import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.util.Random

object SourceDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Data sources
    // 1. Read from a file
    val inpath = "D:\\programs\\sparkPrograms\\FlinkProgarm\\src\\main\\resources\\hello.txt"
    val stream1 = env.readTextFile(inpath)
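    // Note: readTextFile is a bounded source by default (process-once), so this
    // stream emits each line of the file once and then finishes.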
    // 2. Read from a socket stream
    val stream2 = env.socketTextStream("hadoop01", 7777)
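    // For a quick local test, start a netcat listener on the target host first
    // (the hostname and port are just the values this demo happens to use):
    //   nc -lk 7777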
    // 3. Read from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop01:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val stream3 = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    // stream3.print("stream3")
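    // Note: FlinkKafkaConsumer011 is the Kafka 0.11 connector and requires the
    // flink-connector-kafka-0.11 dependency on the classpath (see the build.sbt
    // sketch after this listing).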
    // 4. Custom source
    val stream4: DataStream[SensorReading] = env.addSource(new MySensorSource())
    stream4.print("Stream4")
    env.execute()
  }
  case class SensorReading(id: String, timestamp: Long, temperature: Double)
  class MySensorSource extends SourceFunction[SensorReading] {
    // Flag indicating whether the source is still running
    var running: Boolean = true

    override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
      // Initialize a random number generator
      val rand = new Random()
      // nextGaussian() draws from a standard normal distribution (mean 0, std dev 1),
      // so most values land within roughly +/- 2 sigma of the mean
      var curTemp = 1.to(10).map(
        i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
      )
      while (running) {
        // Update each temperature with a small random walk
        curTemp = curTemp.map(
          t => (t._1, t._2 + rand.nextGaussian())
        )
        // Get the current timestamp
        val curTime = System.currentTimeMillis()
        // Use collect to emit the readings one at a time
        curTemp.foreach(
          t => sourceContext.collect(SensorReading(t._1, curTime, t._2))
        )
        Thread.sleep(100)
      }
    }

    override def cancel(): Unit = {
      running = false
    }
  }
}
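To compile and run this demo you need the Flink streaming Scala API and the Kafka 0.11 connector on the classpath. Below is a minimal build.sbt sketch; the Flink version (1.7.2 here) and the Scala binary version are assumptions, so match them to your cluster.

// build.sbt (sketch): artifact coordinates are real Flink artifacts; versions are illustrative
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Core streaming API for Scala (StreamExecutionEnvironment, DataStream, implicits)
  "org.apache.flink" %% "flink-streaming-scala" % "1.7.2",
  // Kafka 0.11 connector (FlinkKafkaConsumer011)
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.7.2"
)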