Flink可以对数据进行分段计算.类似于批处理,批的概念相当于一段时间内的数据,窗口定义了时间的范围
窗口函数代码
//Time.seconds(2)表示窗口时间范围为2秒 Time.seconds(1) 表示窗口时间跨度为1秒
data.timeWindow(Time.seconds(2), Time.seconds(1));
时间窗口长度为2秒,两次执行的时间跨度为1秒
每次触发的窗口都可以看出是一批数据,能进行聚合操作
自定义聚合代码
import org.apache.flink.api.common.functions.AggregateFunction;
/**
* Flink 自定义聚合函数
*
* @param <T1>
*/
public class MyCountAggregate<T1> implements AggregateFunction<T1, Long, Long> {
/**
* 以一个窗口数据为一批次
* 一个批次中的数据出现一个新的key都会调用这个方法,用于初始化key的默认值
*
* @return
*/
@Override
public Long createAccumulator() { //初始化值数值调用
return 0L;
}
/**
* 以一个窗口数据为一批次
* 每一个key都会调用这个方法,相同的key调用这个方法中的t2属于同一个引用,这个t2通常用于记录累计次数
*
* @param t1 传入的key
* @param t2 相同key的引用
* @return
*/
@Override
public Long add(T1 t1, Long t2) {
t2 = t2 + 1;
return t2;
}
/**
* 以一个窗口数据为一批次
* 输出一格窗口中的某一类key合并后(相同的key都调用了add之后)产生的结果
*
* @param t
* @return
*/
@Override
public Long getResult(Long t) {
System.out.println("getResult->" + t);
return t;
}
/**
* 以一个窗口数据为一批次
* Flink是分布式处理数据的,一个窗口的数据可能分布在多个服务器中,merge相当于是把不同的服务器的数据进行合并
*
* @param t1
* @param t2
* @return
*/
@Override
public Long merge(Long t1, Long t2) {
System.out.println("merge->" + t1 + t2);
return t1 + t2;
}
}
自定义窗口数据打印
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* 自定义窗口函数
*/
public class MyCountWindowFunction implements WindowFunction<Long, String, String, TimeWindow> {
@Override
public void apply(String value, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
out.collect("窗口时间:" + window.getEnd());
out.collect("key: " + value + " 累计: " + input.iterator().next());//
}
}
使用自定义聚合使用
import com.jimu.flink_example.deno.MyCountAggregate;
import com.jimu.flink_example.deno.MyCountWindowFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* Flink 单次统计
*/
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(6);//设置flink分布式计算的迸发度
DataStreamSource<String> data = env.socketTextStream("localhost", 9999, "\n");
//单词切割
SingleOutputStreamOperator<String> flatMap = data.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String word : value.split(",")) {
out.collect(word);//单词切割
}
}
});
//单词记录为1
SingleOutputStreamOperator<Tuple2<String, Long>> map = flatMap.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
return new Tuple2<>(value, 1L);
}
});
//分区
KeyedStream<Tuple2<String, Long>, String> word = map.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> tp2) throws Exception {
return tp2.f0;//根据Tuple2的f0 即 key分区
}
});
//时间窗口长度为5秒,窗口移动为1秒进行计算
WindowedStream<Tuple2<String, Long>, String, TimeWindow> windowedStream = word.timeWindow(Time.seconds(5), Time.seconds(5));
//自定义聚合
SingleOutputStreamOperator aggregate = windowedStream.aggregate(new MyCountAggregate(),new MyCountWindowFunction());
aggregate.addSink(new PrintSinkFunction());
env.execute("Socket Window WordCount"); //开始执行
}
}
版权声明:本文为weixin_42660202原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。