首先,在没有flask的时候,我写了kafka的程序,一个生产者,一个消费者。
生产者进程,它负责读取本地的一张图片,把图片的二进制数据以消息的形式发送给kafka,代码如下:
from kafka import KafkaProducer
# get the binary data of a picture
f=open(‘/home/seven/Pictures/fff.png’,’rb’)
data=f.read()
f.close()
# create a producer
producer=KafkaProducer(bootstrap_servers=[‘localhost:9092’],key_serializer=str.encode)
# send th binary data to kafka
producer.send(‘img_msg’,key=”Hello,Assassin424214141″,value=data)
消费者进程,它负责接收消息,并用消息内容还原出一张图片存储在磁盘上,代码如下:
from kafka import KafkaConsumer
# create a consumer
consumer = KafkaConsumer(‘img_msg’,bootstrap_servers=[‘localhost:9092’])
# receive messages
for message in consumer:
# print the mess