Environment
getExecutionEnvironment
:创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。
// 批处理
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 流处理
val env = StreamExecutionEnvironment.getExecutionEnvironment
createLocalEnvironment
:返回本地执行环境,需要在调用时指定默认的并行度
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
createRemoteEnvironment
:返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")
Source之从集合中读取数据
SensorReading.scala
// 定义样例类,传感器id,时间戳,温度
case class SensorReading(id: String, timestamp: Long, temperature: Double)
SourceForCollection.scala
// 隐式转换很重要
import org.apache.flink.streaming.api.scala._
/**
* 从集合中获取数据
*/
object SourceForCollection {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从集合中读取数据
val listDstream : DataStream[SensorReading] = env.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.8),
SensorReading("sensor_6", 1547718201, 15.4),
SensorReading("sensor_7", 1547718202, 6.7),
SensorReading("sensor_10&#
版权声明:本文为qq_40180229原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。