Learning Flink from Scratch (2): DataSources, with Kafka and Socket Source Examples



I: Types of data sources

* 1. Collection-based: bounded datasets, mainly for local testing
* 2. File-based: suited to watching a file for changes and reading its contents
* 3. Socket-based: listen on a host and port and read data from the socket
* 4. Custom addSource: unbounded sources (see the sketch below)
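
For item 4, here is a minimal sketch of a custom unbounded source against Flink 1.8.x's SourceFunction interface (the CounterSource class name is made up for illustration):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CounterSource implements SourceFunction<Long> {
    private volatile boolean running = true;
    private long counter = 0;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // emit an ever-growing sequence until the job cancels the source
        while (running) {
            ctx.collect(counter++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

A source like this is attached with env.addSource(new CounterSource()), which is exactly how the Kafka consumer is wired up in Part IV below.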

II: Details

    1. From a collection

     DataStreamSource<OUT> input = env.fromCollection(Collection<OUT> data);

    2. From an iterator

     DataStreamSource<OUT> input = env.fromCollection(Iterator<OUT> data, Class<OUT> type);

    3. From given elements

     DataStreamSource<OUT> input = env.fromElements(OUT... data);

    4. Create a parallel data stream from a splittable iterator

    DataStreamSource<OUT> input = env.fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type);

    5. Create a parallel data stream that generates a sequence of numbers in a given range (items 1, 3, and 5 are combined into a runnable sketch at the end of this part)

    DataStreamSource<Long> input = env.generateSequence(from, to);

    6. File-based

   DataStreamSource<String> text = env.readTextFile(path);

    Read a file with a given file input format:

    DataStreamSource<OUT> text = env.readFile(fileInputFormat, path);

    Note: both methods above delegate to the same internal method, which reads the file at the given path using the given fileInputFormat.
         Depending on the provided watchType, this source either periodically (every interval milliseconds)
         monitors the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY),
         or processes the data currently in the path once and exits (FileProcessingMode.PROCESS_ONCE).
         A pathFilter can be supplied to exclude files from processing, for example:

    DataStreamSource<MyEvent> stream = env.readFile(
            myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
            FilePathFilter.createDefaultFilter(), typeInfo
    );
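
The collection-based sources above (items 1, 3, and 5) combined into one runnable sketch; the class name is assumed:

import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // item 1: bounded stream from a Java collection
        DataStreamSource<Integer> fromList = env.fromCollection(Arrays.asList(1, 2, 3));
        // item 3: bounded stream from literal elements
        DataStreamSource<String> fromElements = env.fromElements("hello", "flink");
        // item 5: parallel stream of the numbers 1..10
        DataStreamSource<Long> sequence = env.generateSequence(1, 10);

        fromList.print();
        fromElements.print();
        sequence.print();
        env.execute("collection sources demo");
    }
}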

III: Socket demo

1. Maven dependencies

<properties>
   <flink.version>1.8.1</flink.version>
   <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>

The two properties set the Flink version and the Scala language version, respectively.

A windowed word-count pipeline over the socket source looks like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9999)
        .flatMap(new SocketTextStreamWordCount.LinSplitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

The word-count class:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author zhouxl
 * Socket word-count demo
 */
public class SocketTextStreamWordCount {
    public static void  main(String[] args) throws Exception{
      //check the arguments
        if(args.length !=2){
            System.err.println("USAGE:\\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
       String hostname =args[0];
       Integer port =Integer.parseInt(args[1]);


       //set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      //read data from the socket
        DataStreamSource<String> stream  =env.socketTextStream(hostname,port);
      //count words
        SingleOutputStreamOperator<Tuple2<String,Integer>> sum =stream.flatMap(new LinSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }


    public static  final  class LinSplitter implements FlatMapFunction<String,Tuple2<String,Integer>>{


        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
           String[] tokens =s.toLowerCase().split("\\W+");

           for(String token :tokens){
               if(token.length()>0){
                   collector.collect(new Tuple2<String, Integer>(token,1));
               }
           }

        }
    }
}

Results

1. Package the jar with Maven.

2. Start a console listener (the socket source needs it up before the job connects): nc -l 9000

3. Start the Flink cluster (via its start script) and submit the job:

flink run -c com.tuya.SocketTextStreamWordCount /Users/zhouxl/flinkdemo/target/original-flinkdemo-1.0-SNAPSHOT.jar 127.0.0.1 9000

Once the job is running, any text typed into the nc console shows up as word counts in the output files under Flink's log folder.

IV: Kafka demo

1. Dependencies: the same as in Part III, plus the Kafka connector (see the sketch below).
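
A sketch of the connector dependency, assuming Flink 1.8.1's universal Kafka connector artifact:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>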

2. Define the DTO passed through Kafka

@Data
public class Metric {

    public String name;

    public long timestamp;

}
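
@Data is Lombok's annotation and generates the getters and setters used by the producer below; a dependency sketch (the version here is an assumption):

<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <version>1.18.8</version>
   <scope>provided</scope>
</dependency>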

3. Kafka producer class

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @author zhouxl
 * Kafka producer utility class
 */
public class KafkaUtils {

    public static final String broker_list = "<kafka address>";

    public static final String topic = "flink_demo";

    public static void writeToKafka(Integer count) throws InterruptedException {

        // producer settings; see the ProducerConfig source or the official docs for details
        Map<String, Object> config = new HashMap<>(16);
        // kafka server address
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker_list);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024 * 5);

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);
        Metric metric = new Metric();
        metric.setTimestamp(System.currentTimeMillis());
        metric.setName("mem");
        // serialize the DTO to JSON and send it
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, JSON.toJSONString(metric));
        Future<RecordMetadata> future = producer.send(record);
        producer.flush();
        try {
            future.get();
            System.out.println("sent (done=" + future.isDone() + "), data: " + JSON.toJSONString(metric));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Simulate sending data to Kafka
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        Integer count = 1;
        while (true) {
            Thread.sleep(3000);
            writeToKafka(count++);
        }
    }
}

4. Flink stream processing

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * @author zhouxl
 * Consume Kafka messages
 */
public class Main {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();

        props.put("bootstrap.servers", "<kafka address>");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> dataStreamSource = env.addSource(
           new FlinkKafkaConsumer<String>("flink_demo",
                   // plain String deserialization
                   new SimpleStringSchema(),
                   props
                   )
        ).setParallelism(1);

        // print the data read from kafka to the console
        dataStreamSource.print();

        env.execute("Flink Kafka source demo");
    }
}
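
To work with typed objects instead of raw JSON strings, each record can be mapped back into the Metric DTO; a minimal sketch assuming the fastjson library already used by the producer class:

// requires: import com.alibaba.fastjson.JSON;
//           import org.apache.flink.streaming.api.datastream.DataStream;
DataStream<Metric> metrics = dataStreamSource
        .map(value -> JSON.parseObject(value, Metric.class))
        // the lambda erases the type, so declare it explicitly
        .returns(Metric.class);
metrics.print();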

5. Results

The messages sent (producer console):

The messages received (Flink console):

Summary: squeezing out the time for this wasn't easy!


