example
创建producer发送信息给消费者
object kafakaTest extends App {
//设定配置相关
private val prop = new Properties()
prop.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")
prop.setProperty("acks", "0")
prop.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
prop.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//获取生产者对象
private val producer = new KafkaProducer[Integer, String](prop)
//创建记录,就是要发送什么信息
private val message = new ProducerRecord[Integer, String]("pet","you are good")
//发送信息
producer.send(message)
//释放资源
producer.close()
}
获取配置文件的另一种方法
object kafakaTest2 extends App {
private val prop = new Properties()
//另一种方式获取配置信息,需要把配置文件放到idea的resource文件下
prop.load(kafakaTest2.getClass.getClassLoader.getResourceAsStream("producer.properties"))
//获取生产者对象
private val producer = new KafkaProducer[Integer, String](prop)
//获取记录
private val message = new ProducerRecord[Integer, String]("pet", "you are rich")
producer.send(message)
producer.close()
}
创建producer时需要的配置信息解析
bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092 ## kafka的服务器
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer ##Key的序列化器
value.serializer=org.apache.kafka.common.serialization.StringSerializer ##value的序列化器
acks=[0|-1|1|all] ##消息确认机制
0: 不做确认,直管发送消息即可
-1|all: 不仅leader需要将数据写入本地磁盘,并确认,还需要同步的等待其它followers进行确认
1:只需要leader进行消息确认即可,后期follower可以从leader进行同步
batch.size=1024 #每个分区内的用户缓存未发送record记录的空间大小
如果缓存区中的数据,没有沾满,也就是任然有未用的空间,那么也会将请求发送出去,为了较少请求次数,我们可以配置linger.ms大于0,
linger.ms=10 ## 不管缓冲区是否被占满,延迟10ms发送request
buffer.memory=10240 #控制的是一个producer中的所有的缓存空间
retries=0 #发送消息失败之后的重试次数
send方法是异步方法
public class kafkaTest3 {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
InputStream is = kafkaTest3.class.getClassLoader().getResourceAsStream("producer.properties");
Properties properties = new Properties();
properties.load(is);
//创建producer对象
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
//创建一个记录
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("pet", "good story");
//Asynchronously send a record to a topic. Equivalent to send(record, null)
//异步方法,数据会发送到缓冲区中,就立即返回,无序等待缓冲区中的数据被冲刷出去.
//缓冲区的特点:满足条件时,(比如缓冲区已满,或者是达到时间阈值)就会被冲刷出去
//利用producer的send方法发送记录
Future<RecordMetadata> send = producer.send(record);
//解析发送后的返回值
//send方法是有返回值的
RecordMetadata metadata = send.get();
//获取topic
String topic = metadata.topic();//pet
int partition = metadata.partition();//2
long offset = metadata.offset();//-1
boolean hasOffset = metadata.hasOffset();//false
boolean hasTimestamp = metadata.hasTimestamp();//true
System.out.println(topic + " " + partition + " " + offset + " " + hasOffset + " " + hasTimestamp);
Thread.sleep(10000);
producer.close();
}
}
ps
参数linger.ms
缓冲区延迟多少秒后,批量发送到集群上.
总结
- 创建proucer对象
- 创建待发送的信息对象
- 发送消息
版权声明:本文为u010711495原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。