kafka环境搭建(Windows/Linux)





(1) Installing ZooKeeper (Windows)



Kafka depends on ZooKeeper, so ZooKeeper must be installed first.


1. Download the latest ZooKeeper release from the official site: http://www.apache.org/dyn/closer.cgi/zookeeper/


2. Extract it to a path of your choice; here it is E:\zookeeper\zookeeper-3.4.10.


3. In the conf directory, copy zoo_sample.cfg and rename the copy to zoo.cfg. Change the value of dataDir in zoo.cfg to E:/data/zookeeper and add the line dataLogDir=E:/log/zookeeper.
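After these edits, the relevant part of zoo.cfg might look like this (the data and log paths are the ones chosen above; the other values are the defaults from zoo_sample.cfg):

```properties
# milliseconds per tick, the basic time unit for heartbeats and timeouts (default)
tickTime=2000
# port that clients connect to (default)
clientPort=2181
# snapshot directory (path chosen in step 3)
dataDir=E:/data/zookeeper
# transaction log directory (line added in step 3)
dataLogDir=E:/log/zookeeper
```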


4. Edit the system environment variables and append ;E:\zookeeper\zookeeper-3.4.10\bin to Path.


5. Open a cmd window and run zkServer. If the console shows the server binding to the client port (2181 by default) without errors, ZooKeeper has started and the installation succeeded.







Installing ZooKeeper (Linux)



1. Connect to the Linux server with a tool such as Xshell, switch to any directory, and download the latest stable ZooKeeper release from http://mirrors.hust.edu.cn/apache/zookeeper/stable/ with the following commands:


cd /usr/soft


wget http://mirrors.hust.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz


2. Extract the archive:


tar -xzvf zookeeper-3.4.10.tar.gz


3. Switch to the conf directory, copy zoo_sample.cfg to zoo.cfg, and adjust the configuration as needed.
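A minimal zoo.cfg for a single-node Linux setup might look like this (the dataDir path is only an example; any writable directory works):

```properties
tickTime=2000
clientPort=2181
# example data directory; change to any writable path
dataDir=/var/lib/zookeeper
```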


4. Switch to the bin directory and start ZooKeeper. Seeing "Starting zookeeper ... STARTED" means it started successfully:


./zkServer.sh start





(2) Installing Kafka (Windows)











1. Download the latest Kafka release from the official site: http://kafka.apache.org/downloads


2. Extract it to a path of your choice; here it is E:\kafka_2.12-0.10.2.0.


3. In E:\kafka_2.12-0.10.2.0\config, edit server.properties and change the value of log.dirs to E:/log/kafka.
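The relevant server.properties entries then look like this (broker.id and zookeeper.connect keep their defaults for a single local broker):

```properties
# unique id of this broker (default)
broker.id=0
# directory where Kafka stores its log segments (changed in step 3)
log.dirs=E:/log/kafka
# the ZooKeeper instance started earlier (default)
zookeeper.connect=localhost:2181
```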


4. Edit the system environment variables and append ;E:\kafka_2.12-0.10.2.0\bin\windows to Path.


5. Start Kafka. In a cmd window, cd to the Kafka root directory E:\kafka_2.12-0.10.2.0 and run:


.\bin\windows\kafka-server-start.bat .\config\server.properties


Seeing "started (kafka.server.KafkaServer)" in the output means the broker started successfully.




If startup fails with an error like "'wmic' is not recognized as an internal or external command...", append the following to the system Path variable:
;C:\Windows\System32\wbem


6. Open a cmd window and create a topic:


kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


7. Open another cmd window and start a console producer:


kafka-console-producer.bat --broker-list localhost:9092 --topic test


8. Open another cmd window and start a console consumer:


kafka-console-consumer.bat --zookeeper localhost:2181 --topic test


9. Type messages in the producer window to test: each line you enter should appear in the consumer window immediately, which confirms that Kafka is installed and working.













Installing Kafka (Linux)






1. Download the latest Kafka release: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz




wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz




2. Extract the archive and rename the directory:



tar -xzvf kafka_2.12-0.10.2.0.tgz

mv kafka_2.12-0.10.2.0 kafka





3. Switch to the bin directory under the kafka directory and use vi to adjust the JVM heap size in kafka-server-start.sh. Change

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

to

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

If your machine has plenty of memory, you can of course leave this unchanged.



4. Switch to the kafka root directory and start Kafka:



bin/kafka-server-start.sh config/server.properties





5. Create a topic:

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

This creates a topic named test with one replica and one partition.

Use the list command to check the topic just created:

bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

6. Start a producer and send messages:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Once it is running, type messages to send them.

Press Ctrl+C to stop sending.

7. Start a consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Once the consumer is running, the messages sent by the producer appear in its console.

You can open two terminals, one to send messages and one to receive them.




(3) Kafka Programming: the Java API


1. Create a new Maven project (Eclipse is used here) and add the Kafka dependency to the pom, as follows:


<dependency>
    	<groupId>org.apache.kafka</groupId>
    	<artifactId>kafka_2.11</artifactId>
    	<version>0.10.2.0</version>
</dependency>


2. Create a producer test class, TestProducer.java:


import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         // "all" blocks until the record is fully committed: the slowest but most durable setting.
         props.put("acks", "all");
         // Number of automatic retries on failed requests; 0 disables retrying.
         props.put("retries", 0);
         // The producer maintains a buffer of unsent records per partition; this is its size in bytes.
         props.put("batch.size", 16384);
         // Records are sent immediately by default; this adds a small delay (in ms) to allow batching.
         props.put("linger.ms", 1);
         // Total memory available for buffering. When it is exhausted, further send calls block;
         // after max.block.ms a TimeoutException is thrown.
         props.put("buffer.memory", 33554432);
         // key.serializer and value.serializer turn the key and value of each ProducerRecord into bytes.
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         // Create the Kafka producer.
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
         // Other useful producer methods:
         //   close()                    - close this producer
         //   close(timeout, timeUnit)   - wait up to timeout for incomplete requests to finish
         //   flush()                    - send all buffered records immediately
         for (int i = 0; i < 100; i++) {
             producer.send(new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i)));
         }
         producer.close();
    }
}


3. Create a consumer test class, TestConsumer.java:


import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        System.out.println("this is the group part test 1");
        // Consumer group id (GroupA or GroupB here).
        props.put("group.id", "GroupA");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout used to detect consumer failures between polls.
        props.put("session.timeout.ms", "30000");
        // Upper bound on the number of records returned by a single poll.
        //props.put("max.poll.records", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to a list of topics.
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // In production, hand records off to a thread pool instead of processing them here.
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

}
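The comment in the poll loop above suggests handing records off to a thread pool rather than processing them inline. A minimal sketch of that pattern, using plain strings in place of ConsumerRecord values so it runs without a broker:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolSketch {
    public static void main(String[] args) throws Exception {
        // Fixed-size pool; in a real consumer, size it to your processing workload.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Stand-ins for record.value() from one batch returned by poll().
        List<String> values = Arrays.asList("0", "1", "2");
        List<Future<String>> results = new ArrayList<Future<String>>();
        for (String v : values) {
            // Submit each record's processing as a separate task.
            results.add(pool.submit(() -> "processed " + v));
        }
        // Collect results in submit order; get() blocks until each task finishes.
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
        pool.shutdown();
    }
}
```

In a real consumer you would submit each batch from the poll loop and keep polling, taking care that offsets are only committed after the tasks for those records complete.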


4. First run (or debug) TestConsumer.



Then run TestProducer. Seeing the messages printed in the TestConsumer console means they were sent and received successfully.





The console consumer started earlier also shows the received data in its command window.









Copyright notice: this is an original article by z8414, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.