Using Java to operate ZooKeeper and reproduce the functionality of kafka-topics.sh




Original article:

https://blog.csdn.net/qq_39800434/article/details/84982549



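Requirement

For work, I needed to add partitions to every topic in our test Kafka environment. There are a lot of topics, so doing it by hand on the command line was not realistic. (While writing this post I realized it could be done that way after all: a script with a loop around the command would do, and would probably be simpler -_-||.) So I looked for a way to do it from code, by connecting as a client.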



The code

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.Map;
import scala.collection.Seq;

import java.util.List;
import java.util.Properties;

/**
 * Created by NightWatch on 2018/12/11.
 */
public class TopicSh {

    private static String hostAndPort = "10.1.24.216:2181";

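    @Test
    // Create a topic
    public void createTopic() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            // 1 partition, replication factor 1, no topic-level config overrides
            AdminUtils.createTopic(zkUtils, "topic-name", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
        } finally {
            // Always release the ZooKeeper connection
            zkUtils.close();
        }
    }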

    @Test
    // List all topics
    public void listTopicOne() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            Map<String, Properties> map = AdminUtils.fetchAllTopicConfigs(zkUtils);
            java.util.Map<String, Properties> javaMap = JavaConversions.mapAsJavaMap(map);
            for (java.util.Map.Entry<String, Properties> entry : javaMap.entrySet()) {
                // Topic name
                String topic = entry.getKey();
                System.out.println(topic);
                //TODO: I have not looked into this value yet; judging by the method name it holds the topic's configuration
                Properties value = entry.getValue();
            }
        } finally {
            zkUtils.close();
        }
    }

    @Test
    // List all topics; this version is a bit simpler
    public void listTopicTwo() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            Seq<String> allTopics = zkUtils.getAllTopics();

            List<String> topicsList = JavaConversions.seqAsJavaList(allTopics);

            //TODO: I have not studied this syntax (a method reference) closely either
            topicsList.forEach(System.out::println);
        } finally {
            zkUtils.close();
        }
    }


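    @Test
    // Change the number of partitions of every topic
    public void alterTopicPartition() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            List<String> topics = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
            for (String topic : topics) {
                try {
                    // 4 is the target total number of partitions, not the number to add
                    AdminUtils.addPartitions(zkUtils, topic, 4, "", true, RackAwareMode.Enforced$.MODULE$);
                } catch (RuntimeException e) {
                    // e.g. the topic already has 4 or more partitions; report it and move on to the next topic
                    System.err.println("Skipped " + topic + ": " + e.getMessage());
                }
            }
        } finally {
            zkUtils.close();
        }
    }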
}
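One thing to watch when looping over every topic: as far as I can tell, in the 0.11 line AdminUtils.addPartitions takes the target total and refuses a target that is not larger than the current partition count. Below is a minimal sketch of filtering the topic list before altering anything. It uses only the JDK; the class name PartitionPlan, the method topicsToGrow, and the sample topic names and counts are all hypothetical, and the "__" prefix test is just a heuristic for internal topics such as __consumer_offsets, not an official Kafka rule.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: decides which topics are worth altering.
class PartitionPlan {

    // Returns the topics whose current partition count is below the target.
    // Names starting with "__" are skipped as a heuristic for internal topics.
    static List<String> topicsToGrow(Map<String, Integer> currentCounts, int target) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : currentCounts.entrySet()) {
            boolean internal = e.getKey().startsWith("__");
            if (!internal && e.getValue() < target) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up partition counts; in practice they would be read from the cluster.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("orders", 1);
        counts.put("payments", 4);
        counts.put("audit", 8);
        counts.put("__consumer_offsets", 50);

        System.out.println(topicsToGrow(counts, 4)); // prints [orders]
    }
}
```

Only the names returned by topicsToGrow would then be passed to the alter call, so a single topic that already has enough partitions cannot interrupt the run.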



Problems encountered

Running the code produced the following error:

org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids

	at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
	at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:676)
	at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:672)
	at kafka.utils.ZkUtils.getChildrenParentMayNotExist(ZkUtils.scala:568)
	at kafka.utils.ZkUtils.getAllBrokersInCluster(ZkUtils.scala:161)
	at kafka.admin.AdminUtils$.getBrokerMetadatas(AdminUtils.scala:380)
	at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:402)
	at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
	at com.bicon.kafka.TopicSh.createTopic(TopicSh.java:26)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1472)
	at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1500)
	at org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:119)
	at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:679)
	at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:676)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:990)
	... 35 more

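The error says NoAuth, i.e. the client is not authorized for /brokers/ids. At first I guessed it might be a file permission problem,

so I looked at ZooKeeper's dataDir directory: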

[root@localhost zookeeper]# ll
total 4
drwxr-xr-x 2 root root 4096 Dec  7 09:41 version-2

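Sure enough, non-root users have no write permission there. I still doubted that this was the real problem, so I searched Baidu, and it indeed turned out not to be the cause.

It turns out ZooKeeper nodes can carry ACLs. Connect with the ZooKeeper shell and check: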

[root@localhost kafka_2.11-0.11.0.1]# ./bin/zookeeper-shell.sh 28.2.5.89:2181
Connecting to 28.2.5.89:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, brokers, zookeeper, kafka-acl, kafka-acl-changes, admin, isr_change_notification, controller_epoch, kafka-manager, consumers, latest_producer_id_block, config]
getAcl /brokers/ids
'ip,'10.1.24.216
: cdrwa
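To make that output easier to read: the entry says scheme ip, id 10.1.24.216, permissions cdrwa (create, delete, read, write, admin), so only a client connecting from that address may touch /brokers/ids. Listing children needs the read permission, which is why getChildren in the stack trace fails with NoAuth from any other machine. Here is a simplified model of that check in plain Java. It is not ZooKeeper's implementation: the class AclModel, its methods, and the client address 10.1.24.99 are made up for illustration, and the ip scheme is matched by exact address only.

```java
import java.util.Collections;
import java.util.List;

// Simplified, hypothetical model of a ZooKeeper ACL check (not ZooKeeper's own code).
class AclModel {

    // One ACL entry: scheme, id and permission letters, as shown by getAcl.
    static final class Entry {
        final String scheme;
        final String id;
        final String perms;

        Entry(String scheme, String id, String perms) {
            this.scheme = scheme;
            this.id = id;
            this.perms = perms;
        }
    }

    // Parses "scheme:id:perms", the form passed to setAcl.
    static Entry parse(String spec) {
        int first = spec.indexOf(':');
        int last = spec.lastIndexOf(':');
        if (first < 0 || first == last) {
            throw new IllegalArgumentException("expected scheme:id:perms, got " + spec);
        }
        return new Entry(spec.substring(0, first),
                spec.substring(first + 1, last),
                spec.substring(last + 1));
    }

    // world:anyone matches every client; ip matches one exact address in this model.
    static boolean allows(List<Entry> acl, String clientIp, char perm) {
        for (Entry e : acl) {
            boolean matches = (e.scheme.equals("world") && e.id.equals("anyone"))
                    || (e.scheme.equals("ip") && e.id.equals(clientIp));
            if (matches && e.perms.indexOf(perm) >= 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String client = "10.1.24.99"; // made-up address of the machine running the test

        List<Entry> ipOnly = Collections.singletonList(parse("ip:10.1.24.216:cdrwa"));
        List<Entry> open = Collections.singletonList(parse("world:anyone:cdrwa"));

        // 'r' is the read permission needed to list the children of a node
        System.out.println(allows(ipOnly, client, 'r')); // prints false
        System.out.println(allows(open, client, 'r'));   // prints true
    }
}
```

In this model a narrower alternative to opening the node to everyone would be an ACL list that keeps the existing ip entry and adds a second ip entry for the client machine.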

Sure enough, someone had set an ACL on the node. So I changed it:

setAcl /brokers/ids world:anyone:cdrwa
cZxid = 0x5
ctime = Wed Dec 06 20:17:05 CST 2017
mZxid = 0x5
mtime = Wed Dec 06 20:17:05 CST 2017
pZxid = 0x1fd7
cversion = 63
dataVersion = 0
aclVersion = 2
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
getAcl /brokers/ids
'world,'anyone
: cdrwa

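I ran the program again, and it succeeded!


Note: this is a test environment, so changing the ACL this casually does no harm here, but please do not follow my example.

Finally, here is the article that pointed me in the right direction: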

https://my.oschina.net/anxiaole/blog/1814143