package com.zl.kafkademo;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.Properties;
/**
 * Demo Kafka producer that is also a Quartz {@link Job}.
 *
 * <p>Shows three ways of sending a record to the {@code kafka-study} topic
 * (fire-and-forget, synchronous, and asynchronous with a callback). When started through
 * {@link #main(String[])} it schedules itself with a trigger that repeats every second, and
 * each firing performs one synchronous send.
 *
 * <p>A single {@link KafkaProducer} is shared by every send method, so no individual send
 * method may close it.
 *
 * @author le
 * @since 2019/4/23 22:05
 */
public class MyProducer implements Job {

    /** Topic every demo record is written to. */
    private static final String TOPIC = "kafka-study";

    /** Key attached to every demo record. */
    private static final String RECORD_KEY = "name";

    /** Producer shared by all send methods and all job executions. */
    private static final KafkaProducer<String, String> producer;

    static {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
    }

    /**
     * Mode 1: sends a record and ignores the outcome.
     *
     * <p>The record is flushed before returning, but the shared producer is left open so
     * that later sends (including the scheduled job) keep working.
     */
    private static void sendMessageForgetResult() {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(TOPIC, RECORD_KEY, "Forget_result");
        producer.send(record);
        // flush() rather than close(): closing the shared producer here would make every
        // subsequent send on it fail.
        producer.flush();
    }

    /**
     * Mode 2: sends a record synchronously and waits for the broker's acknowledgement.
     *
     * <p>Prints the topic, partition and offset of the acknowledged record.
     *
     * @return the metadata of the acknowledged record
     * @throws Exception if waiting for the result is interrupted or the send fails
     */
    private static RecordMetadata sendMessageSync() throws Exception {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(TOPIC, RECORD_KEY, "sync");
        RecordMetadata result = producer.send(record).get();
        System.out.println(result.topic());
        System.out.println(result.partition());
        System.out.println(result.offset());
        return result;
    }

    /**
     * Mode 3: sends a record asynchronously; {@link MyProducerCallback} is passed to the
     * producer to receive the outcome.
     */
    private static void sendMessageCallback() {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(TOPIC, RECORD_KEY, "callback");
        producer.send(record, new MyProducerCallback());
    }

    /**
     * Scheduled job body: performs one synchronous send.
     *
     * @param jobExecutionContext the Quartz execution context (unused)
     * @throws JobExecutionException if the send fails or the thread is interrupted while
     *         waiting for the result; the original exception is attached as the cause
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            sendMessageSync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the thread that runs the job can observe it.
            Thread.currentThread().interrupt();
            System.out.println("error:" + e);
            throw new JobExecutionException("Interrupted while sending to " + TOPIC, e);
        } catch (Exception e) {
            // Keep the console message so the failure stays visible in this demo, and
            // report it to the scheduler instead of silently swallowing it.
            System.out.println("error:" + e);
            throw new JobExecutionException("Failed to send to " + TOPIC, e);
        }
    }

    /** Prints the outcome of an asynchronous send. */
    private static class MyProducerCallback implements Callback {

        /**
         * Prints the stack trace when the send failed; otherwise prints the topic,
         * partition and offset of the record.
         *
         * @param recordMetadata metadata of the record (only read when {@code e} is null)
         * @param e the failure, or {@code null} when the send succeeded
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
                return;
            }
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
            System.out.println("Coming in MyProducerCallback");
        }
    }

    /**
     * Schedules this class as a Quartz job that fires every second, indefinitely.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // The other two send modes can be tried by uncommenting one of these calls:
        // sendMessageForgetResult();
        // sendMessageCallback();
        JobDetail job = JobBuilder.newJob(MyProducer.class).build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever())
                .build();
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.scheduleJob(job, trigger);
            scheduler.start();
        } catch (Exception e) {
            // Top-level boundary: scheduler failures and unexpected runtime errors are
            // reported the same way.
            e.printStackTrace();
        }
    }
}
需要引入文件:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
测试方法:
MAC下操作指令:
1、创建主题:
./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
2、运行上述程序,执行定时任务
3、查看消费情况
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning
windows操作指令:
1、进入 D:\zookeeper-3.4.14\bin 打开新的cmd,输入“zkServer”,运行Zookeeper
2、进入 D:\kafka_2.11-0.11.0.0 运行cmd
.\bin\windows\kafka-server-start.bat .\config\server.properties
3、 创建主题,
进入D:\kafka_2.11-0.11.0.0运行cmd,输入:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看已创建主题:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
查看指定主题的详细信息:
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
查看主题消费详情:
.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning