Flink-Environment的三种方式和Source的四种读取方式-从集合中、从kafka中、从文件中、自定义

  • Post author:
  • Post category:其他




Environment


getExecutionEnvironment

:创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。

// 批处理
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 流处理
val env = StreamExecutionEnvironment.getExecutionEnvironment


createLocalEnvironment

:返回本地执行环境,需要在调用时指定默认的并行度

val env = StreamExecutionEnvironment.createLocalEnvironment(1)


createRemoteEnvironment

:返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")



Source之从集合中读取数据

SensorReading.scala

// 定义样例类,传感器id,时间戳,温度
case class SensorReading(id: String, timestamp: Long, temperature: Double)

SourceForCollection.scala

// 隐式转换很重要
import org.apache.flink.streaming.api.scala._

/**
 * 从集合中获取数据
 */
object SourceForCollection {
   
  def main(args: Array[String]): Unit = {
   

    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从集合中读取数据
    val listDstream : DataStream[SensorReading] = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718202, 6.7),
      SensorReading("sensor_10&#



版权声明:本文为qq_40180229原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。