1. Sink to ClickHouse
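The example below reads comma-separated lines from a socket, parses each one into an STest record, and writes it to ClickHouse through a custom RichSinkFunction. STest is imported from com.bawei.operators4 and is not shown here; judging by the fields the sink accesses, its shape is presumably:

case class STest(name: String, subject: String, score: Int, ts: Long)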
import com.bawei.operators4.STest
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import java.sql.{Connection, DriverManager, PreparedStatement}
object SinkToClickhouse {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[String] = env.socketTextStream("hdp1", 9999)
val mapDS: DataStream[STest] = stream.map(x => {
val li: Array[String] = x.split(",")
STest(li(0), li(1), li(2).trim.toInt, li(3).trim.toLong)
})
mapDS.addSink(new ClickhouseSinkFunction)
env.execute()
}
}
class ClickhouseSinkFunction extends RichSinkFunction[STest]{
var conn: Connection = null
var ps: PreparedStatement = null
// Open resources: create the JDBC connection and the prepared statement
override def open(parameters: Configuration): Unit = {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
conn = DriverManager.getConnection("jdbc:clickhouse://hdp1:8123", "default", "")
ps = conn.prepareStatement("insert into t_score values (?,?,?,?)")
}
// Close resources
override def close(): Unit = {
ps.close()
conn.close()
}
override def invoke(value: STest, context: SinkFunction.Context): Unit = {
ps.setString(1,value.name)
ps.setString(2,value.subject)
ps.setInt(3,value.score)
ps.setLong(4,value.ts)
ps.executeUpdate()
}
}
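ClickHouse generally performs better with batched inserts than with one executeUpdate per record. A hedged sketch of a batching variant of the write path (the counter field and the batch size of 100 are assumptions; any remaining batch is flushed in close()):

// Batching variant (a sketch, not the original code)
var batched = 0

override def invoke(value: STest, context: SinkFunction.Context): Unit = {
  ps.setString(1, value.name)
  ps.setString(2, value.subject)
  ps.setInt(3, value.score)
  ps.setLong(4, value.ts)
  ps.addBatch()            // accumulate instead of writing one row at a time
  batched += 1
  if (batched >= 100) {    // batch size is an assumption
    ps.executeBatch()
    batched = 0
  }
}

override def close(): Unit = {
  if (batched > 0) ps.executeBatch() // flush whatever is left
  ps.close()
  conn.close()
}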
2. Sink to a File
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.concurrent.TimeUnit
object SinkToFile {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[String] = env.socketTextStream("hdp1", 9999)
//Sink the stream to the file system
/*val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(new Path("result/sink"), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(15))
.withInactivityInterval(TimeUnit.SECONDS.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build()
stream.addSink(sink)*/
stream.writeAsText("result2/sink")
env.execute()
}
}
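Note that writeAsText is a convenience sink for testing and fails if result2/sink already exists from a previous run; the commented-out StreamingFileSink above is the more robust option. If simply overwriting the output is acceptable, a hedged alternative is:

import org.apache.flink.core.fs.FileSystem

// Overwrite the existing output instead of failing on an existing path
stream.writeAsText("result2/sink", FileSystem.WriteMode.OVERWRITE)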
3. Sink to HBase
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import java.util.Date
object SinkToHbase {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[String] = env.socketTextStream("hdp1", 9999)
//Sink the stream to HBase
stream.addSink(new HbaseSink)
env.execute()
}
}
//HBase shell: create 't_result','info'
class HbaseSink extends RichSinkFunction[String]{
var connection: Connection = null
override def open(parameters: Configuration): Unit = {
val config = HBaseConfiguration.create
config.set("hbase.zookeeper.quorum", "hdp1")
connection = ConnectionFactory.createConnection(config)
}
override def close(): Unit = {
connection.close()
}
override def invoke(value: String, context: SinkFunction.Context): Unit = {
//Incoming lines look like "hello kitty", "hello snoopy", "hello a a"
//Equivalent HBase shell command: put 't_result','<timestamp>+hello kitty','info:line','hello kitty'
val t_result: Table = connection.getTable(TableName.valueOf("t_result"))
val put: Put = new Put(Bytes.toBytes(new Date().getTime + value))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(value))
t_result.put(put)
t_result.close() // release the per-record table handle
}
}
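The sink above obtains a Table handle for every record. For higher throughput, a hedged alternative is to create a BufferedMutator once in open() and let the HBase client batch the puts; a sketch using the same table and column family as above (everything else is an assumption):

import org.apache.hadoop.hbase.client.BufferedMutator

class BufferedHbaseSink extends RichSinkFunction[String] {
  var connection: Connection = null
  var mutator: BufferedMutator = null

  override def open(parameters: Configuration): Unit = {
    val config = HBaseConfiguration.create
    config.set("hbase.zookeeper.quorum", "hdp1")
    connection = ConnectionFactory.createConnection(config)
    mutator = connection.getBufferedMutator(TableName.valueOf("t_result")) // batches puts internally
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    val put = new Put(Bytes.toBytes(new Date().getTime + value))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(value))
    mutator.mutate(put) // buffered; flushed by the client or on close()
  }

  override def close(): Unit = {
    if (mutator != null) mutator.close() // flushes any pending mutations
    if (connection != null) connection.close()
  }
}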
4. Sink to Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import java.util.Properties
object SinkToKafka {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[String] = env.socketTextStream("hdp1", 9999)
//Sink the stream to Kafka
val properties = new Properties
properties.setProperty("bootstrap.servers", "hdp1:9092")
val myProducer = new FlinkKafkaProducer[String]("test", new SimpleStringSchema(), properties)
stream.addSink(myProducer)
env.execute()
}
}
5. Sink to MySQL
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.sql.{Connection, DriverManager, PreparedStatement}
object SinkToMysql {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[String] = env.socketTextStream("hdp1", 9999)
//Word count over 5-second processing-time tumbling windows, then sink the result to MySQL
val counts = stream.flatMap {_.toLowerCase.split("\\W+") filter {_.nonEmpty}}
.map {(_, 1)}
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
counts.addSink(new MysqlSink)
env.execute()
}
}
class MysqlSink extends RichSinkFunction[(String,Int)]{
var conn: Connection = null
var ps: PreparedStatement = null
//Open resources: create the JDBC connection and the prepared statement
override def open(parameters: Configuration): Unit = {
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection("jdbc:mysql://hdp1:3306/mydb?characterEncoding=utf8", "root", "root")
ps = conn.prepareStatement("insert into t_wc values(?,?)")
}
//Close resources
override def close(): Unit = {
ps.close()
conn.close()
}
//For each element of the stream, write it to the target table
override def invoke(value: (String, Int), context: SinkFunction.Context): Unit = {
ps.setString(1,value._1)
ps.setInt(2,value._2)
ps.executeUpdate()
}
}
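Each window fire runs a fresh INSERT, so the same word shows up once per window in t_wc. If t_wc keys on the word and a running total is wanted instead, a hedged variant of the prepared statement (the column names word and cnt, and word as the primary key, are assumptions):

// Upsert: accumulate counts across window fires instead of inserting duplicate rows
ps = conn.prepareStatement(
  "insert into t_wc(word, cnt) values(?, ?) on duplicate key update cnt = cnt + values(cnt)")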
6. Source from ClickHouse
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
object SourceFromClickhouse {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//1. Read the ClickHouse data via a custom source
val stream: DataStream[(String, Int)] = env.addSource(new ClickhouseSourceFunction)
//2. Print the records
stream.print()
env.execute()
}
}
class ClickhouseSourceFunction extends RichSourceFunction[(String,Int)] {
var conn: Connection = null
//Open resources: create the JDBC connection
override def open(parameters: Configuration): Unit = {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
conn= DriverManager.getConnection("jdbc:clickhouse://hdp1:8123", "default", "")
}
//Close resources
override def close(): Unit = {
conn.close()
}
//Run the query and emit each row downstream
override def run(sourceContext: SourceFunction.SourceContext[(String, Int)]): Unit = {
val ps: PreparedStatement = conn.prepareStatement("select * from t_student")
val rs: ResultSet = ps.executeQuery()
while (rs.next()){
val name: String = rs.getString("name")
val age: Int = rs.getInt("age")
sourceContext.collect((name,age))
}
}
//The query above is bounded, so run() returns on its own and there is nothing to interrupt
override def cancel(): Unit = {}
}
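run() executes the query once and then returns, which ends this bounded stream, so cancel() has nothing to do. If the source should instead keep polling, the usual pattern is to guard the loop with a volatile flag and flip it in cancel(). A hedged sketch (the 5-second poll interval is an assumption):

class PollingClickhouseSource extends RichSourceFunction[(String, Int)] {
  @volatile private var running = true
  var conn: Connection = null

  override def open(parameters: Configuration): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    conn = DriverManager.getConnection("jdbc:clickhouse://hdp1:8123", "default", "")
  }

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    while (running) {
      val ps = conn.prepareStatement("select * from t_student")
      val rs = ps.executeQuery()
      while (rs.next()) {
        ctx.collect((rs.getString("name"), rs.getInt("age")))
      }
      rs.close()
      ps.close()
      Thread.sleep(5000) // poll interval, an assumption
    }
  }

  override def cancel(): Unit = running = false

  override def close(): Unit = conn.close()
}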
7. Source from HBase
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes
import java.util
object SourceFromHbase {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[String] = env.addSource(new HbaseSource)
stream.print()
env.execute()
}
}
class HbaseSource extends RichSourceFunction[String] {
var connection: Connection = null
override def open(parameters: Configuration): Unit = {
val config = HBaseConfiguration.create
config.set("hbase.zookeeper.quorum", "hdp1")
connection = ConnectionFactory.createConnection(config)
}
override def close(): Unit = {
connection.close()
}
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
val t_stu: Table = connection.getTable(TableName.valueOf("t_stu"))
val scan: Scan = new Scan
val scanner: ResultScanner = t_stu.getScanner(scan)
var result: Result = scanner.next
while (result != null) { //outer loop: iterate over the rows
val cells: util.List[Cell] = result.listCells
import scala.collection.JavaConversions._
for (c <- cells) { //inner loop: iterate over the cells in the row
val rk: String = Bytes.toString(CellUtil.cloneRow(c))
val f: String = Bytes.toString(CellUtil.cloneFamily(c))
val q: String = Bytes.toString(CellUtil.cloneQualifier(c))
val v: String = Bytes.toString(CellUtil.cloneValue(c))
sourceContext.collect(rk + "," + f + "," + q + "," + v)
}
result = scanner.next
}
scanner.close()
t_stu.close()
}
//The scan is bounded, so run() returns on its own and there is nothing to interrupt
override def cancel(): Unit = {}
}
8. Source from Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.util.Properties
object SourceFromKafka {
def main(args: Array[String]): Unit = {
//1. Create the stream execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//2. Build the Kafka consumer source from the environment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hdp1:9092")
properties.setProperty("group.id", "group1")
val stream = env
.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties))
//3. Transformations / analysis (omitted here)
//4. Sink the data (printed below)
stream.print()
//5. Submit the job
env.execute()
}
}
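By default the consumer resumes from the group's committed offsets. To control where a brand-new group starts reading, a standard Kafka consumer property can be set on the same Properties object before building the consumer (a hedged example; "earliest" replays the topic from the beginning):

properties.setProperty("auto.offset.reset", "earliest") // or "latest"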
9. Source from MySQL
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
object SourceFromMysql {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//1. Read the MySQL data via a custom source
//val stream: DataStream[String] = env.addSource(new UDFSourceFunction)
val stream: DataStream[(String, Int)] = env.addSource(new MysqlSourceFunction)
//2. Print the records
stream.print()
env.execute()
}
}
/*
class UDFSourceFunction extends SourceFunction[String] {
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
for(i<- 0.to(10)){
sourceContext.collect("xiaoming"+i)
}
}
override def cancel(): Unit = ???
}*/
class MysqlSourceFunction extends RichSourceFunction[(String,Int)] {
var conn: Connection = null
//Open resources: create the JDBC connection
override def open(parameters: Configuration): Unit = {
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection("jdbc:mysql://hdp1:3306/mydb?characterEncoding=utf8", "root", "root")
}
//Close resources
override def close(): Unit = {
conn.close()
}
//Run the query and emit each row downstream
override def run(sourceContext: SourceFunction.SourceContext[(String, Int)]): Unit = {
val ps: PreparedStatement = conn.prepareStatement("select * from t_student")
val rs: ResultSet = ps.executeQuery()
while (rs.next()){
val name: String = rs.getString("name")
val age: Int = rs.getInt("age")
sourceContext.collect((name,age))
}
}
//The query above is bounded, so run() returns on its own and there is nothing to interrupt
override def cancel(): Unit = {}
}