最近学习Demo的时候,遇上了一个大坑,SpringBoot中使用Stream,
因为,是多个模块之间相互耦合的,最后折腾了4个小时,终于剥离出来了核心配置依赖,
附上配置和代码,有需要的,请自行修改
POM.XML
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<!-- <version>2.3.9.RELEASE</version>-->
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kafka-demo2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-demo2</name>
<description>kafka-demo2</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--日志处理包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
引导类
package com.example.kafkademo2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemo2Application {
public static void main(String[] args) {
SpringApplication.run(KafkaDemo2Application.class, args);
}
}
application.yml
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 10.147.17.93:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
hosts: 10.147.17.93:9092
group: ${spring.application.name}
消费者
package com.example.kafkademo2.sample;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 消费者
*/
public class ConsumerQuickStart {
public static void main(String[] args) {
//1.kafka的配置信息
Properties prop = new Properties();
//链接地址
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.147.17.93:9092");
//key和value的反序列化器
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//设置消费者组
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
//手动提交偏移量
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//2.创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
//3.订阅主题
consumer.subscribe(Collections.singletonList("itcast-topic-out"));
//4.拉取消息
//同步提交和异步提交偏移量
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
//异步提交偏移量
consumer.commitAsync();
}
}catch (Exception e){
e.printStackTrace();
System.out.println("记录错误的信息:"+e);
}finally {
//同步
consumer.commitSync();
}
}
}
生产者
package com.example.kafkademo2.sample;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 生产者
*/
public class ProducerQuickStart {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.kafka链接配置信息
Properties prop = new Properties();
//kafka链接地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.147.17.93:9092");
//key和value的序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//ack配置 消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
//2.创建kafka生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);
//3.发送消息
/**
* 第一个参数 :topic
* 第二个参数:消息的key
* 第三个参数:消息的value
*/
for (int i = 0; i < 5; i++) {
ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("itcast-topic-input","hello kafka");
producer.send(kvProducerRecord);
}
//4.关闭消息通道 必须要关闭,否则消息发送不成功
producer.close();
}
}
配置类
package com.example.kafkademo2.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
聚合类
package com.example.kafkademo2.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Arrays;
@Configuration
@Slf4j
public class KafkaStreamHelloListener {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//创建kstream对象,同时指定从那个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//根据value进行聚合分组
.groupBy((key,value)->value)
//聚合计算时间间隔
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//求单词的个数
.count()
.toStream()
//处理后的结果转换为string字符串
.map((key,value)->{
System.out.println("key:"+key+",value:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
return stream;
}
}
版权声明:本文为u013833472原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。