Integrating Kafka with Spring Boot




1. Dependencies

We will skip the Spring Boot dependencies themselves; the only Kafka-related dependency we need is the spring-kafka integration package:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
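A version note: as far as the 1.x line goes, spring-kafka 1.1.x was built against the 0.10.x kafka-clients, so the broker should be a compatible version; on a newer broker, a newer spring-kafka release is the better match.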

Before the code, here is the configuration file:

#============== kafka ===================
kafka.consumer.zookeeper.connect=10.93.21.21:2181
kafka.consumer.servers=10.93.21.21:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=10.93.21.21:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
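Two things worth noting about these properties. First, they live in a custom kafka.* namespace and are injected with @Value in the configuration classes below; they are not Spring Boot's spring.kafka.* auto-configuration keys. Second, kafka.consumer.zookeeper.connect is never actually read by the consumer configuration, since the new consumer API connects through the bootstrap servers alone (the same goes for kafka.consumer.topic; the listener hard-codes the topic name).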


2. Configuration: Kafka producer

1) @Configuration and @EnableKafka declare this as a config class and switch on Spring Kafka support, which gives us the KafkaTemplate.

2) @Value injects the Kafka settings from application.properties.

3) The beans are created with @Bean.

package com.kangaroo.sentinel.collect.configuration;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;

    @Value("${kafka.producer.retries}")
    private int retries;

    @Value("${kafka.producer.batch.size}")
    private int batchSize;

    @Value("${kafka.producer.linger}")
    private int linger;

    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
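Keep in mind that KafkaTemplate.send() is asynchronous: the call returns as soon as the record is handed to the producer, so a "success" log right after it only means the message was queued. In spring-kafka 1.x, send() returns a ListenableFuture<SendResult<K, V>>, which lets you confirm actual delivery. A minimal sketch (the CallbackSender class is a hypothetical helper, not part of the original post):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class CallbackSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String key, String message) {
        // send() is asynchronous; the future completes once the broker acks the record
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // RecordMetadata carries the partition and offset the broker assigned
                System.out.println("sent to partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("send failed: " + ex.getMessage());
            }
        });
    }
}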

To test our producer, we write a Controller that sends a message to the topic test with the key key:

package com.kangaroo.sentinel.collect.controller;

import com.kangaroo.sentinel.common.response.Response;
import com.kangaroo.sentinel.common.response.ResultCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@RestController
@RequestMapping("/kafka")
public class CollectController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public Response sendKafka(HttpServletRequest request, HttpServletResponse response) {
        try {
            String message = request.getParameter("message");
            logger.info("kafka message = {}", message);
            kafkaTemplate.send("test", "key", message);
            logger.info("Message sent to Kafka.");
            return new Response(ResultCode.SUCCESS, "Message sent to Kafka", null);
        } catch (Exception e) {
            logger.error("Failed to send message to Kafka", e);
            return new Response(ResultCode.EXCEPTION, "Failed to send message to Kafka", null);
        }
    }
}
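With the application running, a request like GET http://localhost:8080/kafka/send?message=hello (assuming the default port 8080) should return a success response and publish the message to the test topic, where the listener defined below picks it up.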


3. Configuration: Kafka consumer

1) @Configuration and @EnableKafka declare this as a config class and switch on Spring Kafka support; on the consumer side, this enables detection of @KafkaListener annotations.

2) @Value injects the Kafka settings from application.properties.

3) The beans are created with @Bean.

package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
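One caveat on factory.setConcurrency(concurrency): with kafka.consumer.concurrency=10 the container starts 10 consumer threads, but Kafka assigns each partition to at most one consumer in a group, so any threads beyond the topic's partition count will sit idle.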

new Listener() creates the bean that processes the data read from Kafka. A simple demo implementation of Listener follows; it just reads and logs the key and message values.

The topics attribute of @KafkaListener specifies the Kafka topic name, which is the name the producer used, i.e. the topic kafkaTemplate specified when sending the message.

package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Listener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("kafka key: " + record.key());
        logger.info("kafka value: " + record.value().toString());
    }
}
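As a variant, @KafkaListener methods do not have to take a raw ConsumerRecord; spring-kafka can bind the payload and selected metadata directly through spring-messaging annotations. A minimal sketch (assuming the same test topic, registered as a bean just like Listener above):

package com.kangaroo.sentinel.collect.configuration;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

public class AnnotatedListener {

    @KafkaListener(topics = {"test"})
    public void listen(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        // the payload arrives already deserialized to String by the StringDeserializer
        System.out.println("received '" + message + "' from partition " + partition);
    }
}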



Copyright notice: This is an original article by baidu_23263735, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.