1、kafka消费线程类
public class ConsumerThread implements Runnable {
private ConsumerRecords<String, String> records;
private KafkaConsumer<String, String> consumer;
public ConsumerThread(ConsumerRecords<String, String> records,
KafkaConsumer<String, String> consumer) {
this.records = records;
this.consumer = consumer;
}
@Override
public void run() {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records
.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println("当前线程:" + Thread.currentThread() + ","
+ "偏移量:" + record.offset() + "," + "主题:"
+ record.topic() + "," + "分区:" + record.partition()
+ "," + "获取的消息:" + record.value());
}
// 消费者自己手动提交消费的offest,确保消息正确处理后再提交
long lastOffset = partitionRecords.get(partitionRecords.size() - 1)
.offset();
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(lastOffset + 1)));
}
}
}
多线程消费代码
@Slf4j
@Component
public class KafkaManyThreadConsumer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final static KafkaConsumer<String, String> kafkaConsumer;
private final static int corePoolSize = 2;
private final static int maximumPoolSize = 4;
private final static long keepAliveTime = 10;
private static BlockingQueue<Runnable> workQueue;
private static ThreadPoolExecutor executor;
public KafkaManyThreadConsumer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
static {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "test-group-id");
// 关闭自动提交
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumer = new KafkaConsumer<String, String>(props);
kafkaConsumer.subscribe(Arrays.asList("topic.partition"));
workQueue = new ArrayBlockingQueue<>(100000);
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
workQueue, new ThreadPoolExecutor.AbortPolicy());
executor.prestartAllCoreThreads(); // 预启动所有核心线程
}
public void manyThreadConsumer() {
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(10);
if (null != records) {
executor.submit(new ConsumerThread(records, kafkaConsumer));
}
}
}
public void shutdown() {
try {
if (kafkaConsumer != null) {
kafkaConsumer.close();
}
if (executor != null) {
executor.shutdown();
}
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
log.info("Timeout");
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
程序启动后自动执行消费消息
@Component
public class ApplicationRunnerImpl implements ApplicationRunner {
private final KafkaManyThreadConsumer kafkaManyThreadConsumer;
public ApplicationRunnerImpl(KafkaManyThreadConsumer kafkaManyThreadConsumer) {
this.kafkaManyThreadConsumer = kafkaManyThreadConsumer;
}
@Override
public void run(ApplicationArguments args) {
try {
kafkaManyThreadConsumer.manyThreadConsumer();
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaManyThreadConsumer.shutdown();
}
}
}
生产者
@GetMapping("/kafka/many")
public void testKafka() {
for (int i = 0; i <20; i++) {
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
Long orderId = new SnowflakeGenerator().next();
//key值取hash值对分区数量取模
final Integer partition =Math.abs(orderId.hashCode())%3;
final Order order = new Order();
order.setOrderNo(orderId+"");
order.setCreateTime(new Date());
order.setPhone('1' + RandomUtil.randomNumbers(10));
log.info("kafka 发送消息"+orderId + "分区====="+partition);
kafkaService.sendMsg("topic.partition",partition,orderId+"", JSON.toJSONString(order));
}
}
版权声明:本文为weixin_42324471原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。