Original article:
https://blog.csdn.net/qq_39800434/article/details/84982549
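Requirement
For work, I needed to add partitions to every topic in our test-environment Kafka cluster. There are a lot of topics, so doing it by hand on the command line was not realistic. (While writing this post it occurred to me that it actually is: a script with a loop would do, and it might even be simpler -_-||.) So I looked for a way to do it in code through a client connection.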
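For reference, the scripted alternative mentioned above boils down to running one kafka-topics.sh --alter command per topic. The sketch below only builds and prints those command lines; the class name AlterCommands and the topic names are made up for illustration, and the flags are the ZooKeeper-based ones that I believe the 0.11-era tool accepts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds one "alter partitions" command line per topic.
final class AlterCommands {

    static List<String> build(String zkConnect, List<String> topics, int partitions) {
        List<String> commands = new ArrayList<>();
        for (String topic : topics) {
            commands.add("./bin/kafka-topics.sh --zookeeper " + zkConnect
                    + " --alter --topic " + topic
                    + " --partitions " + partitions);
        }
        return commands;
    }

    public static void main(String[] args) {
        // Topic names here are placeholders, not topics from the original cluster.
        List<String> topics = Arrays.asList("topic-a", "topic-b");
        for (String command : build("10.1.24.216:2181", topics, 4)) {
            System.out.println(command);
        }
    }
}
```

The printed lines can be reviewed first and then pasted into a shell, which is a little safer than executing them blindly.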
The code is as follows:
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.Map;
import scala.collection.Seq;

import java.util.List;
import java.util.Properties;

/**
 * Created by NightWatch on 2018/12/11.
 */
public class TopicSh {

    // ZooKeeper connection string (host:port)
    private static String hostAndPort = "10.1.24.216:2181";

    // Create a topic with 1 partition and a replication factor of 1
    @Test
    public void createTopic() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            AdminUtils.createTopic(zkUtils, "topic-name", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
        } finally {
            zkUtils.close();
        }
    }

    // List all topics
    @Test
    public void listTopicOne() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            Map<String, Properties> map = AdminUtils.fetchAllTopicConfigs(zkUtils);
            // mapAsJavaMap is the non-deprecated name for asJavaMap
            java.util.Map<String, Properties> javaMap = JavaConversions.mapAsJavaMap(map);
            for (java.util.Map.Entry<String, Properties> entry : javaMap.entrySet()) {
                // Topic name
                String topic = entry.getKey();
                System.out.println(topic);
                // TODO: I haven't figured out what this value is yet; I'll look into it when I have time
                Properties value = entry.getValue();
            }
        } finally {
            zkUtils.close();
        }
    }

    // List all topics; this version is a bit simpler
    @Test
    public void listTopicTwo() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            Seq<String> allTopics = zkUtils.getAllTopics();
            List<String> topicsList = JavaConversions.seqAsJavaList(allTopics);
            // TODO: I don't fully understand this syntax either, same as above
            topicsList.forEach(System.out::println);
        } finally {
            zkUtils.close();
        }
    }

    // Change the number of partitions of every topic
    @Test
    public void alterTopicPartition() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            List<String> topics = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
            for (String topic : topics) {
                // 4 is the new total partition count; Kafka only allows increasing it
                AdminUtils.addPartitions(zkUtils, topic, 4, "", true, RackAwareMode.Enforced$.MODULE$);
            }
        } finally {
            zkUtils.close();
        }
    }
}
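One thing to watch in the partition loop: it calls addPartitions for every topic, and as far as I know Kafka rejects the call when the requested count is not larger than the current one. A minimal sketch of filtering the topic list first, assuming the current partition counts have already been collected into a map (how to collect them is not shown). PartitionPlan and topicsToExpand are hypothetical names, and skipping topics that start with "__" (such as __consumer_offsets) is my own assumption, not something the original code does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: decides which topics actually need more partitions.
final class PartitionPlan {

    // Returns topics whose current partition count is below the target,
    // skipping internal topics (assumed here to be those prefixed with "__").
    static List<String> topicsToExpand(Map<String, Integer> currentCounts, int target) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : currentCounts.entrySet()) {
            String topic = entry.getKey();
            if (topic.startsWith("__")) {
                continue; // leave internal topics alone
            }
            if (entry.getValue() < target) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample data for illustration only.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("orders", 1);
        counts.put("payments", 4);
        counts.put("__consumer_offsets", 50);
        counts.put("logs", 2);
        System.out.println(topicsToExpand(counts, 4)); // prints [orders, logs]
    }
}
```

Only the topics returned here would then be passed to addPartitions, so one topic that is already at the target does not abort the whole run.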
Problem encountered
Running it produced the following error:
org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:676)
at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:672)
at kafka.utils.ZkUtils.getChildrenParentMayNotExist(ZkUtils.scala:568)
at kafka.utils.ZkUtils.getAllBrokersInCluster(ZkUtils.scala:161)
at kafka.admin.AdminUtils$.getBrokerMetadatas(AdminUtils.scala:380)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:402)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
at com.bicon.kafka.TopicSh.createTopic(TopicSh.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids
at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1472)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1500)
at org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:119)
at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:679)
at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:676)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:990)
... 35 more
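The error says the operation was not authorized. At first I guessed it might be a file-permission problem,
so I took a look at ZooKeeper's dataDir directory: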
[root@localhost zookeeper]# ll
total 4
drwxr-xr-x 2 root root 4096 Dec  7 09:41 version-2
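Sure enough, non-root users have no write permission there, but I doubted that was the real cause. A quick search on Baidu suggested it indeed was not.
It turns out that ZooKeeper nodes can have ACLs set on them. Log in to the ZooKeeper shell to check: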
[root@localhost kafka_2.11-0.11.0.1]# ./bin/zookeeper-shell.sh 28.2.5.89:2181
Connecting to 28.2.5.89:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, brokers, zookeeper, kafka-acl, kafka-acl-changes, admin, isr_change_notification, controller_epoch, kafka-manager, consumers, latest_producer_id_block, config]
getAcl /brokers/ids
'ip,'10.1.24.216
: cdrwa
Sure enough, someone had set an ACL on the node, so I changed it:
setAcl /brokers/ids world:anyone:cdrwa
cZxid = 0x5
ctime = Wed Dec 06 20:17:05 CST 2017
mZxid = 0x5
mtime = Wed Dec 06 20:17:05 CST 2017
pZxid = 0x1fd7
cversion = 63
dataVersion = 0
aclVersion = 2
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
getAcl /brokers/ids
'world,'anyone
: cdrwa
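I ran the program again, and it succeeded!
Note: this is a test environment, so loosening the ACL like this does no harm here, but please don't follow my example.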
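To read the getAcl output: each entry is a scheme, an id, and a permission string. 'ip,'10.1.24.216 : cdrwa grants all five permissions only to clients connecting from that address, which would explain why listing the children of /brokers/ids from another machine failed with NoAuth (listing children needs READ). The sketch below decodes the permission letters; the bit values are the ones I understand ZooKeeper's ZooDefs.Perms to define, and the class name ZkPerms is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decodes a ZooKeeper permission string such as "cdrwa".
final class ZkPerms {

    // Bit values mirroring ZooKeeper's ZooDefs.Perms (stated here as an assumption).
    static final int READ = 1;
    static final int WRITE = 2;
    static final int CREATE = 4;
    static final int DELETE = 8;
    static final int ADMIN = 16;

    // Converts permission letters (c, d, r, w, a) into a bitmask.
    static int toMask(String letters) {
        int mask = 0;
        for (char ch : letters.toCharArray()) {
            switch (ch) {
                case 'r': mask |= READ; break;
                case 'w': mask |= WRITE; break;
                case 'c': mask |= CREATE; break;
                case 'd': mask |= DELETE; break;
                case 'a': mask |= ADMIN; break;
                default:
                    throw new IllegalArgumentException("Unknown permission letter: " + ch);
            }
        }
        return mask;
    }

    // Lists the permission names contained in a bitmask.
    static List<String> names(int mask) {
        List<String> out = new ArrayList<>();
        if ((mask & READ) != 0) out.add("READ");
        if ((mask & WRITE) != 0) out.add("WRITE");
        if ((mask & CREATE) != 0) out.add("CREATE");
        if ((mask & DELETE) != 0) out.add("DELETE");
        if ((mask & ADMIN) != 0) out.add("ADMIN");
        return out;
    }

    public static void main(String[] args) {
        int mask = toMask("cdrwa");
        System.out.println(mask + " " + names(mask)); // prints 31 [READ, WRITE, CREATE, DELETE, ADMIN]
    }
}
```

So world:anyone:cdrwa hands every permission to every client, which is why it is only tolerable on a throwaway test cluster.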
Finally, here is the article that pointed me in the right direction:
https://my.oschina.net/anxiaole/blog/1814143