一,主程序
package flink.geektime.ds;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.*;
/**
 * Flink job entry point.
 *
 * <p>Reads {@link UserBehaviorEvent} records from the Kafka topic {@code user_action}, assigns
 * event-time timestamps and watermarks, aggregates all events in a sliding event-time window
 * (10 s size, 1 s slide, 5 s allowed lateness) with {@link UserActionWindowFunction}, and indexes
 * every window result into the Elasticsearch index {@code flink-user-action} over HTTPS.
 */
public class DataStreamDemo {

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Kafka source configuration.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        properties.setProperty("group.id", "geekflinkds");
        FlinkKafkaConsumer<UserBehaviorEvent> kafka =
                new FlinkKafkaConsumer<>("user_action", new UserBehaviorSchema(), properties);
        DataStreamSource<UserBehaviorEvent> userActionDS = env.addSource(kafka);

        // Event-time windowed aggregation; each result is (window start, window end, pv, distinct users).
        SingleOutputStreamOperator<Tuple4<Long, Long, Integer, Integer>> process = userActionDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .forGenerator(new WatermarkGeneratorSupplier<UserBehaviorEvent>() {
                                    @Override
                                    public WatermarkGenerator<UserBehaviorEvent> createWatermarkGenerator(Context context) {
                                        return new UserActionWatermarkGenerator();
                                    }
                                })
                                .withTimestampAssigner(new TimestampAssignerSupplier<UserBehaviorEvent>() {
                                    @Override
                                    public TimestampAssigner<UserBehaviorEvent> createTimestampAssigner(Context context) {
                                        return new UserActionAssigner();
                                    }
                                }))
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(1)))
                .allowedLateness(Time.seconds(5))
                .process(new UserActionWindowFunction());

        // Elasticsearch sink: one index request per window result.
        List<HttpHost> httpHostList = new ArrayList<>();
        httpHostList.add(new HttpHost("node1", 9200, "https"));
        ElasticsearchSink.Builder<Tuple4<Long, Long, Integer, Integer>> builder =
                new ElasticsearchSink.Builder<>(httpHostList,
                        new ElasticsearchSinkFunction<Tuple4<Long, Long, Integer, Integer>>() {
                            /** Wraps the window result under the {@code data} key of the indexed document. */
                            public IndexRequest createIndexRequest(Tuple4<Long, Long, Integer, Integer> element) {
                                Map<String, Tuple4<Long, Long, Integer, Integer>> json = new HashMap<>();
                                json.put("data", element);
                                return Requests.indexRequest()
                                        .index("flink-user-action")
                                        .source(json);
                            }

                            @Override
                            public void process(Tuple4<Long, Long, Integer, Integer> element,
                                                RuntimeContext ctx,
                                                RequestIndexer indexer) {
                                indexer.add(createIndexRequest(element));
                            }
                        });

        builder.setRestClientFactory(
                restClientBuilder -> {
                    restClientBuilder.setHttpClientConfigCallback(
                            httpAsyncClientBuilder -> {
                                // NOTE(review): credentials are hard-coded here; consider loading them
                                // from configuration or a secret store instead of source code.
                                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                                credentialsProvider.setCredentials(AuthScope.ANY,
                                        new UsernamePasswordCredentials("elastic", "123456"));
                                httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                                try {
                                    return httpAsyncClientBuilder.setConnectionManager(SslUtil.getConnectionManager());
                                } catch (Exception e) {
                                    // Fail fast with the real cause instead of returning null, which
                                    // would only surface later as an unexplained failure in the client.
                                    throw new IllegalStateException(
                                            "Failed to create the TLS connection manager for Elasticsearch", e);
                                }
                            }
                    );
                });

        process.addSink(builder.build());
        env.execute();
    }
}
二,SslUtil
package flink.geektime.ds;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import java.io.Serializable;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
/**
 * Helper that builds the non-blocking HTTP connection manager used for HTTPS connections.
 *
 * <p>NOTE(review): the SSL context trusts every certificate chain and the hostname verifier
 * accepts every host, so TLS peer verification is effectively disabled. Confirm this is
 * intended before using it outside a test environment.
 */
public class SslUtil implements Serializable {

    /**
     * Creates a pooling connection manager whose {@code https} scheme uses a trust-all SSL
     * context and a hostname verifier that accepts any host.
     *
     * @return a new connection manager backed by a new default connecting I/O reactor
     * @throws Exception if the SSL context or the I/O reactor cannot be created
     */
    public static PoolingNHttpClientConnectionManager getConnectionManager() throws Exception {
        // Accept any certificate chain regardless of issuer.
        TrustStrategy trustEverything = (chain, authType) -> true;
        // Accept any hostname regardless of the certificate subject.
        HostnameVerifier acceptAnyHost = (hostname, session) -> true;

        SSLContext trustAllContext = SSLContexts.custom()
                .loadTrustMaterial(null, trustEverything)
                .build();

        SchemeIOSessionStrategy httpsStrategy = new SSLIOSessionStrategy(trustAllContext, acceptAnyHost);
        Registry<SchemeIOSessionStrategy> schemeRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
                .register("https", httpsStrategy)
                .build();

        return new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(), schemeRegistry);
    }
}
三,UserActionAssigner
package flink.geektime.ds;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
/**
 * Timestamp assigner that uses the event's own {@code ts} field as its event-time timestamp.
 */
public class UserActionAssigner implements TimestampAssigner<UserBehaviorEvent> {

    /**
     * Extracts the event-time timestamp for a record.
     *
     * @param userBehaviorEvent the event being timestamped
     * @param l the timestamp previously attached to the record
     * @return the event's {@code ts} value, or {@code l} when the event carries no {@code ts}
     */
    @Override
    public long extractTimestamp(UserBehaviorEvent userBehaviorEvent, long l) {
        Long ts = userBehaviorEvent.getTs();
        // getTs() returns a boxed Long; unboxing a null would throw NullPointerException,
        // so keep the timestamp already attached to the record in that case.
        return ts != null ? ts : l;
    }
}
四,UserActionWatermarkGenerator
package flink.geektime.ds;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
/**
 * Periodic watermark generator that tracks the highest event timestamp seen so far and emits a
 * watermark one unit below it.
 */
public class UserActionWatermarkGenerator implements WatermarkGenerator<UserBehaviorEvent> {

    /** Highest event timestamp observed so far; {@code Long.MIN_VALUE} until the first event. */
    private long currentWatermark = Long.MIN_VALUE;

    /**
     * Records the timestamp of an incoming event.
     *
     * @param userBehaviorEvent the event (unused; only its timestamp matters)
     * @param lts the timestamp assigned to the event
     * @param watermarkOutput the watermark output (unused; watermarks are emitted periodically)
     */
    @Override
    public void onEvent(UserBehaviorEvent userBehaviorEvent, long lts, WatermarkOutput watermarkOutput) {
        // Keep the maximum so an out-of-order event cannot move the tracked timestamp backwards.
        currentWatermark = Math.max(currentWatermark, lts);
    }

    /**
     * Emits a watermark of (highest timestamp seen - 1); emits nothing before the first event.
     *
     * @param watermarkOutput the output used to emit the watermark
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        if (currentWatermark == Long.MIN_VALUE) {
            // No event seen yet. Subtracting 1 here would overflow to Long.MAX_VALUE and emit an
            // end-of-time watermark, making every subsequent event late.
            return;
        }
        watermarkOutput.emitWatermark(new Watermark(currentWatermark - 1));
    }
}
五,UserActionWindowFunction
package flink.geektime.ds;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.Set;
/**
 * Window function that computes, for every window, the number of events (PV) and the number of
 * distinct user ids (UV).
 *
 * <p>Output tuple: (window start, window end, event count, distinct user count).
 */
public class UserActionWindowFunction
        extends ProcessAllWindowFunction<UserBehaviorEvent, Tuple4<Long, Long, Integer, Integer>, TimeWindow> {

    /**
     * Aggregates all events of one window into a single result tuple.
     *
     * @param context window context, used to read the window bounds
     * @param iterable all events assigned to the window
     * @param collector receives exactly one result tuple per invocation
     * @throws Exception declared by the overridden method
     */
    @Override
    public void process(Context context, Iterable<UserBehaviorEvent> iterable,
                        Collector<Tuple4<Long, Long, Integer, Integer>> collector) throws Exception {
        Set<Integer> userIds = new HashSet<>();
        // Primitive counter: avoids re-boxing an Integer on every iteration.
        int pv = 0;
        for (UserBehaviorEvent event : iterable) {
            pv++;
            userIds.add(event.getUserId());
        }
        TimeWindow window = context.window();
        collector.collect(Tuple4.of(window.getStart(), window.getEnd(), pv, userIds.size()));
    }
}
六,UserBehaviorEvent
package flink.geektime.ds;
/**
 * Mutable bean describing a single user behavior record.
 *
 * <p>Exposes a public no-argument constructor plus a getter and setter for every field.
 */
public class UserBehaviorEvent {

    private Integer userId;
    private Integer itemId;
    private String category;
    private String clientIp;
    private String action;
    private Long ts;

    /** Creates an event with every field left {@code null}. */
    public UserBehaviorEvent() {
    }

    /**
     * Creates a fully populated event.
     *
     * @param userId value for the {@code userId} field
     * @param itemId value for the {@code itemId} field
     * @param category value for the {@code category} field
     * @param clientIp value for the {@code clientIp} field
     * @param action value for the {@code action} field
     * @param ts value for the {@code ts} field
     */
    public UserBehaviorEvent(Integer userId, Integer itemId, String category, String clientIp, String action, Long ts) {
        setUserId(userId);
        setItemId(itemId);
        setCategory(category);
        setClientIp(clientIp);
        setAction(action);
        setTs(ts);
    }

    // ---- getters ----

    /** @return the {@code userId} field, possibly {@code null} */
    public Integer getUserId() {
        return this.userId;
    }

    /** @return the {@code itemId} field, possibly {@code null} */
    public Integer getItemId() {
        return this.itemId;
    }

    /** @return the {@code category} field, possibly {@code null} */
    public String getCategory() {
        return this.category;
    }

    /** @return the {@code clientIp} field, possibly {@code null} */
    public String getClientIp() {
        return this.clientIp;
    }

    /** @return the {@code action} field, possibly {@code null} */
    public String getAction() {
        return this.action;
    }

    /** @return the {@code ts} field, possibly {@code null} */
    public Long getTs() {
        return this.ts;
    }

    // ---- setters ----

    /** @param userId new value for the {@code userId} field */
    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    /** @param itemId new value for the {@code itemId} field */
    public void setItemId(Integer itemId) {
        this.itemId = itemId;
    }

    /** @param category new value for the {@code category} field */
    public void setCategory(String category) {
        this.category = category;
    }

    /** @param clientIp new value for the {@code clientIp} field */
    public void setClientIp(String clientIp) {
        this.clientIp = clientIp;
    }

    /** @param action new value for the {@code action} field */
    public void setAction(String action) {
        this.action = action;
    }

    /** @param ts new value for the {@code ts} field */
    public void setTs(Long ts) {
        this.ts = ts;
    }
}
七,UserBehaviorSchema
package flink.geektime.ds;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * JSON (de)serialization schema for {@link UserBehaviorEvent}, backed by fastjson.
 */
public class UserBehaviorSchema implements DeserializationSchema<UserBehaviorEvent>, SerializationSchema<UserBehaviorEvent> {

    /**
     * Parses a JSON payload into an event.
     *
     * @param bytes the raw message payload
     * @return the parsed event, or {@code null} when the payload is {@code null} or empty
     * @throws IOException declared by the implemented interface
     */
    @Override
    public UserBehaviorEvent deserialize(byte[] bytes) throws IOException {
        if (bytes == null || bytes.length == 0) {
            // Nothing to parse; return null rather than failing inside the JSON parser.
            return null;
        }
        return JSON.parseObject(bytes, UserBehaviorEvent.class);
    }

    /**
     * The stream is unbounded, so no element ever marks its end.
     *
     * @param userBehaviorEvent the most recently deserialized event (unused)
     * @return always {@code false}
     */
    @Override
    public boolean isEndOfStream(UserBehaviorEvent userBehaviorEvent) {
        return false;
    }

    /**
     * Serializes an event to JSON bytes so the output can be read back by
     * {@link #deserialize(byte[])}.
     *
     * @param userBehaviorEvent the event to serialize
     * @return the JSON encoding of the event
     */
    @Override
    public byte[] serialize(UserBehaviorEvent userBehaviorEvent) {
        // UserBehaviorEvent does not override toString(), so toString().getBytes() would emit
        // "ClassName@hash" instead of the event's data; write real JSON instead.
        return JSON.toJSONBytes(userBehaviorEvent);
    }

    /**
     * @return type information for {@link UserBehaviorEvent}
     */
    @Override
    public TypeInformation<UserBehaviorEvent> getProducedType() {
        return TypeInformation.of(UserBehaviorEvent.class);
    }
}
版权声明:本文为epitomizelu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。