Flink 使用 HTTPS 连接 Elasticsearch

  • Post author:
  • Post category:其他




一,主程序

package flink.geektime.ds;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.*;

/**
 * Flink job that reads user-behavior events from the Kafka topic {@code user_action}, computes
 * page views (pv) and distinct users (uv) over 10-second event-time windows sliding every second,
 * and writes each window result to Elasticsearch over HTTPS with basic authentication.
 */
public class DataStreamDemo {

    /** Elasticsearch index that receives one document per window result. */
    private static final String INDEX_NAME = "flink-user-action";

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        properties.setProperty("group.id", "geekflinkds");

        FlinkKafkaConsumer<UserBehaviorEvent> kafka =
                new FlinkKafkaConsumer<>("user_action", new UserBehaviorSchema(), properties);

        DataStreamSource<UserBehaviorEvent> userActionDS = env.addSource(kafka);

        SingleOutputStreamOperator<Tuple4<Long, Long, Integer, Integer>> process = userActionDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .forGenerator(new WatermarkGeneratorSupplier<UserBehaviorEvent>() {
                                    @Override
                                    public WatermarkGenerator<UserBehaviorEvent> createWatermarkGenerator(Context context) {
                                        return new UserActionWatermarkGenerator();
                                    }
                                })
                                .withTimestampAssigner(new TimestampAssignerSupplier<UserBehaviorEvent>() {
                                    @Override
                                    public TimestampAssigner<UserBehaviorEvent> createTimestampAssigner(Context context) {
                                        return new UserActionAssigner();
                                    }
                                }))
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(1)))
                .allowedLateness(Time.seconds(5))
                .process(new UserActionWindowFunction());

        process.addSink(buildElasticsearchSink());

        env.execute();
    }

    /**
     * Creates the Elasticsearch 7 sink that indexes each window result into {@link #INDEX_NAME}
     * over HTTPS, authenticating with basic credentials.
     */
    private static ElasticsearchSink<Tuple4<Long, Long, Integer, Integer>> buildElasticsearchSink() {
        List<HttpHost> httpHostList = new ArrayList<>();
        httpHostList.add(new HttpHost("node1", 9200, "https"));

        ElasticsearchSink.Builder<Tuple4<Long, Long, Integer, Integer>> builder = new ElasticsearchSink.Builder<>(
                httpHostList,
                new ElasticsearchSinkFunction<Tuple4<Long, Long, Integer, Integer>>() {
                    public IndexRequest createIndexRequest(Tuple4<Long, Long, Integer, Integer> element) {
                        return Requests.indexRequest()
                                .index(INDEX_NAME)
                                .source(toDocument(element));
                    }

                    @Override
                    public void process(Tuple4<Long, Long, Integer, Integer> element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                });

        builder.setRestClientFactory(
                restClientBuilder -> {
                    restClientBuilder.setHttpClientConfigCallback(
                            httpAsyncClientBuilder -> {
                                // NOTE(review): credentials are hard-coded; move them to configuration.
                                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                                credentialsProvider.setCredentials(AuthScope.ANY,
                                        new UsernamePasswordCredentials("elastic", "123456"));
                                httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                                try {
                                    return httpAsyncClientBuilder.setConnectionManager(SslUtil.getConnectionManager());
                                } catch (Exception e) {
                                    // Returning null here would only surface later as an unrelated NPE
                                    // inside the REST client builder; fail fast with the real cause.
                                    throw new IllegalStateException(
                                            "Failed to create the HTTPS connection manager for Elasticsearch", e);
                                }
                            });
                });

        return builder.build();
    }

    /**
     * Converts one window result into an Elasticsearch document of the form
     * {@code {"data": {"windowStart", "windowEnd", "pv", "uv"}}}.
     *
     * <p>Elasticsearch's XContent serializer has no writer for Flink's {@code Tuple4}, so putting the
     * tuple itself into the source map fails at indexing time. The fields are therefore copied out
     * into plain boxed numbers.
     *
     * @param element (window start, window end, pv, uv)
     * @return a mutable map suitable for {@code IndexRequest#source(Map)}
     */
    private static Map<String, Object> toDocument(Tuple4<Long, Long, Integer, Integer> element) {
        Map<String, Object> data = new HashMap<>();
        data.put("windowStart", element.f0);
        data.put("windowEnd", element.f1);
        data.put("pv", element.f2);
        data.put("uv", element.f3);

        Map<String, Object> json = new HashMap<>();
        json.put("data", data);
        return json;
    }
}



二,SslUtil

package flink.geektime.ds;

import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import java.io.Serializable;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;


/**
 * Factory for the non-blocking connection manager the Elasticsearch REST client uses for HTTPS.
 *
 * <p><b>Security warning:</b> the manager produced here trusts every server certificate and skips
 * host-name verification. That makes self-signed cluster certificates work, but it also offers no
 * protection against man-in-the-middle attacks. Use it only in test environments.
 */
public class SslUtil implements Serializable {

    /**
     * Builds a pooling async connection manager with only the {@code "https"} scheme registered.
     *
     * @return a new connection manager backed by a fresh {@code DefaultConnectingIOReactor}
     * @throws Exception if the SSL context or the I/O reactor cannot be created
     */
    public static PoolingNHttpClientConnectionManager getConnectionManager() throws Exception {
        // Accept any certificate chain, regardless of issuer.
        SSLContext trustAllContext = SSLContexts.custom()
                .loadTrustMaterial(null, (chain, authType) -> true)
                .build();

        // Accept whatever host name the server certificate presents.
        SchemeIOSessionStrategy httpsStrategy =
                new SSLIOSessionStrategy(trustAllContext, (hostname, session) -> true);

        Registry<SchemeIOSessionStrategy> strategies = RegistryBuilder.<SchemeIOSessionStrategy>create()
                .register("https", httpsStrategy)
                .build();

        return new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(), strategies);
    }
}



三,UserActionAssigner

package flink.geektime.ds;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;

/**
 * Uses each event's own {@code ts} field as its Flink event-time timestamp.
 */
public class UserActionAssigner implements TimestampAssigner<UserBehaviorEvent> {

    /**
     * Returns the event's {@code ts} value.
     *
     * @param event           the incoming user-behavior event
     * @param recordTimestamp timestamp already attached to the record; ignored here
     * @return {@code event.getTs()} unboxed to a primitive long; throws
     *         {@code NullPointerException} when {@code ts} is null
     */
    @Override
    public long extractTimestamp(UserBehaviorEvent event, long recordTimestamp) {
        final long eventTime = event.getTs();
        return eventTime;
    }
}



四,UserActionWatermarkGenerator

package flink.geektime.ds;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/**
 * Periodic watermark generator for user-behavior events.
 *
 * <p>It tracks the largest event timestamp seen so far. On every periodic callback it emits
 * {@code maxTimestamp - 1} as the watermark, or {@code Long.MIN_VALUE} before the first event.
 */
public class UserActionWatermarkGenerator implements WatermarkGenerator<UserBehaviorEvent> {

    /** Largest event timestamp observed so far; {@code Long.MIN_VALUE} until the first event. */
    private long maxTimestamp = Long.MIN_VALUE;

    /**
     * Records the event's timestamp if it is the largest seen so far.
     *
     * @param userBehaviorEvent the event (unused)
     * @param lts               the timestamp assigned to the event
     * @param watermarkOutput   unused; watermarks are only emitted periodically
     */
    @Override
    public void onEvent(UserBehaviorEvent userBehaviorEvent, long lts, WatermarkOutput watermarkOutput) {
        // Keep the maximum so an out-of-order (older) event cannot pull the watermark backwards.
        maxTimestamp = Math.max(maxTimestamp, lts);
    }

    /**
     * Emits the current watermark.
     *
     * @param watermarkOutput sink for the emitted watermark
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // Before any event arrives, "maxTimestamp - 1" would overflow from Long.MIN_VALUE to
        // Long.MAX_VALUE, which Flink treats as end of event time: every window would fire at
        // once and all later events would be dropped as late.
        long effectiveWatermark = maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
        watermarkOutput.emitWatermark(new Watermark(effectiveWatermark));
    }
}



五,UserActionWindowFunction

package flink.geektime.ds;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Set;

/**
 * Aggregates all events of one (non-keyed) time window into
 * {@code (windowStart, windowEnd, pv, uv)}.
 */
public class UserActionWindowFunction extends ProcessAllWindowFunction<UserBehaviorEvent, Tuple4<Long,Long,Integer,Integer>, TimeWindow> {

    /**
     * Counts the window's events (pv) and its distinct user ids (uv), then emits one tuple.
     *
     * <p>A {@code null} user id is counted as one distinct user, because {@link HashSet} accepts
     * {@code null}.
     *
     * @param context   gives access to the window bounds
     * @param iterable  all events assigned to this window
     * @param collector receives exactly one result tuple per invocation
     */
    @Override
    public void process(Context context, Iterable<UserBehaviorEvent> iterable,
                        Collector<Tuple4<Long,Long,Integer,Integer>> collector) throws Exception {
        Set<Integer> userIds = new HashSet<>();
        // Primitive counter: a boxed Integer would allocate or re-box on every increment.
        int pv = 0;
        for (UserBehaviorEvent event : iterable) {
            pv++;
            userIds.add(event.getUserId());
        }

        TimeWindow window = context.window();
        collector.collect(Tuple4.of(window.getStart(), window.getEnd(), pv, userIds.size()));
    }
}



六,UserBehaviorEvent

package flink.geektime.ds;


/**
 * A single user-behavior record consumed from Kafka.
 *
 * <p>It is a mutable POJO with a public no-arg constructor and getter/setter pairs, as required by
 * Flink's POJO serializer and by JSON deserialization.
 */
public class UserBehaviorEvent  {
    private Integer userId;
    private Integer itemId;
    private String category;
    private String clientIp;
    private String action;
    /** Event time used by the watermarking; presumably epoch milliseconds — TODO confirm with producer. */
    private Long ts;

    /** Creates an empty event; required for POJO/JSON deserialization. */
    public UserBehaviorEvent() {
    }

    /** Creates a fully populated event. */
    public UserBehaviorEvent(Integer userId, Integer itemId, String category, String clientIp, String action, Long ts) {
        this.userId = userId;
        this.itemId = itemId;
        this.category = category;
        this.clientIp = clientIp;
        this.action = action;
        this.ts = ts;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getItemId() {
        return itemId;
    }

    public void setItemId(Integer itemId) {
        this.itemId = itemId;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public String getClientIp() {
        return clientIp;
    }

    public void setClientIp(String clientIp) {
        this.clientIp = clientIp;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    /**
     * Returns a human-readable representation listing every field, for logs and debugging.
     * Without this override, only the class name and identity hash would be printed.
     */
    @Override
    public String toString() {
        return "UserBehaviorEvent{"
                + "userId=" + userId
                + ", itemId=" + itemId
                + ", category=" + category
                + ", clientIp=" + clientIp
                + ", action=" + action
                + ", ts=" + ts
                + '}';
    }
}



七,UserBehaviorSchema

package flink.geektime.ds;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * JSON (fastjson) serialization schema for {@link UserBehaviorEvent}, usable with Kafka
 * sources and sinks. {@link #serialize} and {@link #deserialize} use the same JSON format.
 */
public class UserBehaviorSchema implements DeserializationSchema<UserBehaviorEvent>, SerializationSchema<UserBehaviorEvent> {

    /**
     * Parses one JSON-encoded event.
     *
     * @param bytes raw record value; may be {@code null} or empty (e.g. a Kafka tombstone)
     * @return the parsed event, or {@code null} when there is no payload. The Collector-based
     *         default {@code deserialize} in Flink 1.11+ skips {@code null} results.
     * @throws IOException declared by the interface
     */
    @Override
    public UserBehaviorEvent deserialize(byte[] bytes) throws IOException {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        return JSON.parseObject(bytes, UserBehaviorEvent.class);
    }

    /** The stream is unbounded; no element marks its end. */
    @Override
    public boolean isEndOfStream(UserBehaviorEvent userBehaviorEvent) {
        return false;
    }

    /**
     * Encodes the event as UTF-8 JSON, the format {@link #deserialize} reads back.
     * {@code Object#toString()} was used previously; it does not produce JSON and broke the
     * round trip.
     */
    @Override
    public byte[] serialize(UserBehaviorEvent userBehaviorEvent) {
        return JSON.toJSONString(userBehaviorEvent).getBytes(StandardCharsets.UTF_8);
    }

    /** Type information Flink uses for the produced {@link UserBehaviorEvent} elements. */
    @Override
    public TypeInformation<UserBehaviorEvent> getProducedType() {
        return TypeInformation.of(UserBehaviorEvent.class);
    }
}



版权声明:本文为epitomizelu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。