Making confluent_kafka support SASL_PLAINTEXT



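A colleague had been developing with kafka-python. After we enabled ACLs, we found that kafka-python, surprisingly, could not handle our SASL_PLAINTEXT setup: according to its documentation, PLAIN is the only SASL mechanism it supports.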


https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html


sasl_mechanism (str) – string picking sasl mechanism when security_protocol is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. Default: None

I looked into confluent-kafka. It does support this, but librdkafka has to be recompiled.


Otherwise you get this error:

KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: No provider for SASL mechanism GSSAPI: recompile librdkafka with libsasl2 or openssl support. Current build options: PLAIN SASL_SCRAM"}
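
The useful part of this message is the trailing "Current build options" list, which names the SASL providers compiled into the loaded librdkafka. In the message above the missing provider is GSSAPI, while PLAIN and SASL_SCRAM are built in. As a minimal sketch, the list can be pulled out of the error text with the standard library alone; the helper name parse_build_options is hypothetical and not part of confluent_kafka.

```python
import re


def parse_build_options(error_text):
    """Return the providers listed after 'Current build options:' in the error text."""
    # Hypothetical helper: only scans the message string, it does not query librdkafka.
    match = re.search(r"Current build options:\s*([A-Z0-9_ ]+)", error_text)
    if match is None:
        return []
    return match.group(1).split()


# Message text copied from the KafkaException shown above.
error_text = (
    "Failed to create producer: No provider for SASL mechanism GSSAPI: "
    "recompile librdkafka with libsasl2 or openssl support. "
    "Current build options: PLAIN SASL_SCRAM"
)
providers = parse_build_options(error_text)
print(providers)
```

If the provider you need is missing from that list, the loaded librdkafka has to be replaced with a build that includes it.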



Install confluent-kafka

pip install confluent-kafka
Collecting confluent-kafka
  Downloading https://files.pythonhosted.org/packages/2a/ba/dccb27376453f91ad8fa57f75a7ba5dc188023700c1789273dec976477b2/confluent_kafka-0.11.6-cp37-cp37m-manylinux1_x86_64.whl (3.9MB)
    100% |████████████████████████████████| 3.9MB 984kB/s 



Install librdkafka

Clone the repository:

[root@node004110 18:21:05 /tmp]
git clone https://github.com/edenhill/librdkafka.git
Cloning into 'librdkafka'...
remote: Enumerating objects: 213, done.
remote: Counting objects: 100% (213/213), done.
remote: Compressing objects: 100% (111/111), done.
remote: Total 18133 (delta 133), reused 149 (delta 98), pack-reused 17920
Receiving objects: 100% (18133/18133), 11.15 MiB | 946.00 KiB/s, done.
Resolving deltas: 100% (13758/13758), done.

Check the dependencies

rpm -qa| grep openssl
openssl-1.0.2k-16.el7.x86_64
openssl-libs-1.0.2k-16.el7.x86_64
openssl-devel-1.0.2k-16.el7.x86_64

Build requirements for librdkafka:

  • The GNU toolchain
  • GNU make
  • pthreads
  • zlib-dev (optional, for gzip compression support)
  • libssl-dev (optional, for SSL and SASL SCRAM support); on CentOS the package is openssl-devel
  • libsasl2-dev (optional, for SASL GSSAPI support)
  • libzstd-dev (optional, for ZStd compression support)
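
As a rough cross-check of the optional libraries in the list above, the sketch below asks Python's ctypes.util.find_library whether each shared library is visible on the system. This is only an approximation under stated assumptions: find_library looks for the runtime library, not the headers from the -dev / -devel packages that the build needs, so a hit here does not guarantee that configure will succeed. The mapping OPTIONAL_LIBS and the function name are hypothetical.

```python
from ctypes.util import find_library

# Short linker names for the optional libraries, with the feature each one enables.
OPTIONAL_LIBS = {
    "z": "gzip compression",
    "ssl": "SSL and SASL SCRAM",
    "sasl2": "SASL GSSAPI",
    "zstd": "ZStd compression",
}


def check_optional_libs(libs=OPTIONAL_LIBS):
    """Map each library name to what find_library reports, or None if not found."""
    return {name: find_library(name) for name in libs}


found = check_optional_libs()
for name, path in found.items():
    status = path if path else "not found"
    print(f"lib{name} ({OPTIONAL_LIBS[name]}): {status}")
```

The package manager query shown earlier (rpm -qa) remains the more reliable check for the development packages themselves.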

Build and install

./configure
make && make install

Make sure the libssl checks report ok (these lines are printed by ./configure):

checking for libssl (by pkg-config)... ok
checking for libssl (by compile)... ok (cached)
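
If the build output is saved to a file, the check lines can be scanned instead of read by eye. The sketch below assumes every relevant line has the shape "checking for NAME (by METHOD)... STATUS", as in the two lines above, and treats any status that does not start with "ok" as a failure. The rule that one passing method is enough for a library is also an assumption, and parse_checks is a hypothetical helper.

```python
import re

# Matches lines such as: checking for libssl (by pkg-config)... ok
CHECK_LINE = re.compile(r"^checking for (\S+) \(by ([^)]+)\)\.\.\. (.*)$")


def parse_checks(configure_output):
    """Return {library: bool}; True if at least one check for that library reports ok."""
    results = {}
    for line in configure_output.splitlines():
        match = CHECK_LINE.match(line.strip())
        if match is None:
            continue  # not a dependency check line
        name, _method, status = match.groups()
        passed = status.startswith("ok")
        results[name] = results.get(name, False) or passed
    return results


# The two lines shown above, used as sample input.
sample = """checking for libssl (by pkg-config)... ok
checking for libssl (by compile)... ok (cached)"""
checks = parse_checks(sample)
print(checks)
```

For SCRAM support the entry to look at is libssl; for GSSAPI it would be the libsasl2 check.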



Replace the library file

Locate it:

find / -name "librdkafka*" 
/root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/librdkafka.so.1

Replace it:

#cd /root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/

[root@node004110 18:24:03 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ll
total 10316
-rwxr-xr-x 1 root root 2903144 Mar 8 18:20 libcrypto-4c524931.so.1.0.0
-rwxr-xr-x 1 root root 6944424 Mar 8 18:20 librdkafka.so.1
-rwxr-xr-x 1 root root 584072 Mar 8 18:20 libssl-01b7eff1.so.1.0.0
-rwxr-xr-x 1 root root 87848 Mar 8 18:20 libz-a147dcb0.so.1.2.3
-rw-r--r-- 1 root root 35336 Mar 8 18:20 monitoring-interceptor.so

[root@node004110 18:24:04 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#mv librdkafka.so.1 librdkafka.so.1.bak

[root@node004110 18:24:11 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ln -s /usr/local/lib/librdkafka.so.1 librdkafka.so.1

[root@node004110 18:24:23 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ll
total 10316
-rwxr-xr-x 1 root root 2903144 Mar 8 18:20 libcrypto-4c524931.so.1.0.0
lrwxrwxrwx 1 root root 30 Mar 8 18:24 librdkafka.so.1 -> /usr/local/lib/librdkafka.so.1
-rwxr-xr-x 1 root root 6944424 Mar 8 18:20 librdkafka.so.1.bak
-rwxr-xr-x 1 root root 584072 Mar 8 18:20 libssl-01b7eff1.so.1.0.0
-rwxr-xr-x 1 root root 87848 Mar 8 18:20 libz-a147dcb0.so.1.2.3
-rw-r--r-- 1 root root 35336 Mar 8 18:20 monitoring-interceptor.so
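
The same backup-and-symlink step can be sketched in Python, which makes it easier to refuse a second run that would overwrite the backup. The function swap_library and its guards are hypothetical additions, not part of the original procedure, and the demonstration runs in a temporary directory rather than the real site-packages path.

```python
import os
import tempfile
from pathlib import Path


def swap_library(libs_dir, new_lib, name="librdkafka.so.1"):
    """Rename the bundled library to <name>.bak and symlink <name> to new_lib."""
    bundled = Path(libs_dir) / name
    backup = bundled.with_name(name + ".bak")
    if bundled.is_symlink():
        raise RuntimeError(f"{bundled} is already a symlink; nothing to do")
    if backup.exists():
        raise RuntimeError(f"{backup} already exists; refusing to overwrite it")
    bundled.rename(backup)        # same effect as: mv librdkafka.so.1 librdkafka.so.1.bak
    os.symlink(new_lib, bundled)  # same effect as: ln -s <new_lib> librdkafka.so.1
    return backup


# Dry run against throwaway files instead of the real .libs directory.
with tempfile.TemporaryDirectory() as tmp:
    libs_dir = Path(tmp) / ".libs"
    libs_dir.mkdir()
    (libs_dir / "librdkafka.so.1").write_bytes(b"bundled")
    new_lib = Path(tmp) / "new-librdkafka.so.1"
    new_lib.write_bytes(b"rebuilt")
    backup = swap_library(libs_dir, new_lib)
    backup_name = backup.name
    link_target = os.readlink(libs_dir / "librdkafka.so.1")
    print(backup_name, "->", link_target)
```

On the real system the arguments would be the .libs directory found above and /usr/local/lib/librdkafka.so.1. Afterwards, confluent_kafka.libversion() should report the version of the librdkafka that is actually loaded, which is a quick way to confirm the swap took effect.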



Verify

#python
Python 3.7.2 (default, Mar 4 2019, 16:55:21) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from confluent_kafka import Producer     
>>> p = Producer({'bootstrap.servers': '192.168.4.114:9092', 'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism':'SCRAM-SHA-256','sasl.username':'admin','sasl.password':'your-admin-pass'})        
>>> def delivery_report(err, msg):
...     if err is not None:
...         print('Message delivery failed: {}'.format(err))
...     else:
...         print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
... 
>>> for data in ['hello','word']:
...     p.produce('test_acl', data.encode('utf-8'), callback=delivery_report)
... 
>>> p.poll(10) 
Message delivered to test_acl [0]
Message delivered to test_acl [0]
1
>>> p.flush()
0
>>> quit()

The messages were received successfully:

[root@node004114 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.114:9092 --topic test_acl --consumer.config config/client-sasl.properties --from-beginning
hello
word
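
The consumer side can also be checked from Python with the same SASL settings as the producer. The sketch below is a minimal version under stated assumptions: the helper names and the group id acl-check are hypothetical, the credentials are the placeholders used above, and read_messages only works with confluent_kafka installed and a reachable broker, so it is defined but not called here.

```python
def scram_client_config(bootstrap_servers, username, password, group_id=None):
    """Build a config dict for SASL_PLAINTEXT with SCRAM-SHA-256, as in the producer above."""
    conf = {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": "SCRAM-SHA-256",
        "sasl.username": username,
        "sasl.password": password,
    }
    if group_id is not None:
        # Consumers need a group id; "earliest" mirrors --from-beginning for a new group.
        conf["group.id"] = group_id
        conf["auto.offset.reset"] = "earliest"
    return conf


def read_messages(conf, topic, max_messages=2, timeout=10.0):
    """Poll up to max_messages values from topic; requires confluent_kafka and a broker."""
    # Imported lazily so the config helper above works without the library installed.
    from confluent_kafka import Consumer, KafkaError

    consumer = Consumer(conf)
    values = []
    try:
        consumer.subscribe([topic])
        while len(values) < max_messages:
            msg = consumer.poll(timeout)
            if msg is None:
                break  # nothing arrived within the timeout
            err = msg.error()
            if err is not None:
                if err.code() == KafkaError._PARTITION_EOF:
                    continue  # end of partition, not a failure
                raise RuntimeError(str(err))
            values.append(msg.value().decode("utf-8"))
    finally:
        consumer.close()
    return values


# "acl-check" is a made-up group id; the password is the placeholder from the session above.
conf = scram_client_config("192.168.4.114:9092", "admin", "your-admin-pass", group_id="acl-check")
print(conf["security.protocol"], conf["sasl.mechanism"])
```

Calling read_messages(conf, 'test_acl') against the broker above would be the Python counterpart of the console consumer command. The account used must also have read permission on the topic and the consumer group under the ACLs.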



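Copyright notice: this is an original article by ashic, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.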