1. 前言
基于上一篇文章对EII Message Bus原理的介绍,本文我们继续来聊聊EII Message Bus编程。另外我们也介绍一下,在EII的容器网络之外(如Host系统之上,或者非EII功能模块中),如何使用EII Message Bus和EII模块实现通讯。
在EII Message Bus的Samples案例中,例如发布程序”Samples/publisher/python/publisher.py”和订阅程序”Samples/subscriber/python/subscriber.py”,其中既包含了对EII ConfigMgr API(负责从ETCD数据库读取模块的配置信息)的使用,又包含了对EII Message Bus API的使用。
这里我们将EII Message Bus的使用单独提取出来,来学习一下EII Message Bus API的使用。
2. 修改发布和订阅程序
- 首先,我们将”publisher.py”脚本改写成如下内容:
import time
import eii.msgbus as mb
import os
import random
def start_publisher():
publisher = None
try:
msgbus_cfg = {
'type': 'zmq_tcp',
'zmq_tcp_publish': {
'host': '0.0.0.0',
'port': 65021
}
}
topic = 'camera1_stream_results'
loop_interval = 1
print('[INFO] Initializing message bus context')
msgbus_pub = mb.MsgbusContext(msgbus_cfg)
publisher = msgbus_pub.new_publisher(topic)
print('[INFO] Running...')
while True:
blob = b'\x22' * 10
meta = {
'temperature': random.uniform(10,30)
}
publisher.publish((meta, blob,))
print(f'[INFO] Msg published by publisher : \'{meta}\'')
time.sleep(loop_interval)
except KeyboardInterrupt:
print('[INFO] Quitting...')
finally:
if publisher is not None:
publisher.close()
if __name__ == "__main__":
start_publisher()
上述发布程序去掉了ConfigMgr相关的操作,将配置信息直接写死(hardcode)在了程序中,例如程序中对”msgbus_cfg”(包含本机IP和端口号Port),“topic”,”loop_interval”的定义。
- 接着,我们将”subscriber.py”脚本改写成如下内容:
import eii.msgbus as mb
import os
import cfgmgr.config_manager as cfg
def start_subscriber():
subscriber = None
try:
msgbus_cfg = {
'type': 'zmq_tcp',
'camera1_stream_results': {
'host': '192.168.2.101',
'port': 65021
}
}
topic = 'camera1_stream_results'
print('[INFO] Initializing message bus context')
msgbus_sub = mb.MsgbusContext(msgbus_cfg)
subscriber = msgbus_sub.new_subscriber(topic)
print('[INFO] Running...')
while True:
msg, _ = subscriber.recv()
if msg is not None:
print(f'[INFO] RECEIVED by subscriber : {msg}')
else:
print('[INFO] Receive interrupted')
except KeyboardInterrupt:
print('[INFO] Quitting...')
finally:
if subscriber is not None:
subscriber.close()
if __name__ == "__main__":
start_subscriber()
同样,在订阅程序中,我们去掉了ConfigMgr相关的操作,将配置信息直接写死在了程序中,例如程序中对”msgbus_cfg”,”topic”的定义。
3. 安装环境依赖
以上,我们修改好了发布和订阅程序,接下来我们尝试运行一下。不过EII Message Bus的运行需要安装一些依赖包,我们得先安装环境依赖。
在EII的Samples中,依赖包是在Dockerfile中进行安装的。我们将其中的安装步骤提取出来,尝试在Host系统上安装一下,参考”Samples/publisher/python/ubuntu/Dockerfile”文件,编写如下安装脚本”install.sh”:
#!/bin/bash
set -e
sudo -v
CUR_DIR=$PWD
# Install dep tools, python3, cjson, libzmq
sudo apt update
sudo apt install -y build-essential cmake wget python3.8 python3.8-dev python3-distutils python3-setuptools python3-pip libcjson-dev libzmq3-dev
mkdir -p $CUR_DIR/dpkgs
# Install eii-utils and eii-messagebus
cd $CUR_DIR/dpkgs
wget https://gitee.com/open-edge-insights/eii-manifests/releases/download/v3.0/eii-utils-3.0.0-Linux.deb
wget https://gitee.com/open-edge-insights/eii-manifests/releases/download/v3.0/eii-messagebus-3.0.0-Linux.deb
sudo dpkg -i eii-*
# eii-messagebus python deps
pip3 install eii-messagebus==3.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
- 在Host系统,打开一个命令行窗口,执行上述安装程序,完成EII Message Bus依赖包的安装。
$ sudo chmod +x install.sh
$ ./install.sh
4. 在Host上运行发布和订阅程序
安装好EII Message Bus的环境依赖后,我们就可以运行”publisher.py”和”subscriber.py”程序了。
- 打开命令行窗口,执行订阅程序。
$ python subscriber.py
- 打开另外一个命令行窗口,执行发布程序。
$ python publisher.py
- 此时,可以看到”subscriber.py”窗口成功接收到消息。
5. EII时序栈订阅Host上EII Message Bus发布的数据
想要让EII时序栈(Telegraf)接收Host上程序发布的数据,其实非常简单。我们打开Telegraf的配置文件”Telegra/config.json”,将”Subscriber”接口的定义由原先从某个容器订阅数据,改为从本机的发布程序的端口订阅数据。
# "Telegra/config.json"文件第38行:
"EndPoint": "ia_ubuntu_python_sample_pub:65020",
# 修改为:
"EndPoint": "192.168.2.101:65021",
只需修改上述一行,我们即配置好了Telegraf从Host上的”publisher.py”程序订阅数据。
接下来,我们启动EII的时序栈。
- 首先确认模块编排文件”build/usecases/test-emb.yml”包含如下模块。(这里仍然需要带上Samples/publisher和Samples/subscriber模块,属于EII的一个小bug,后续文章再解释)
- ConfigMgrAgent
- Samples/publisher/python/ubuntu
- Samples/subscriber/python/ubuntu
- Telegraf
- InfluxDBConnector
- Grafana
- Kapacitor
- 编译模块配置信息
$ cd ~/eii/IEdgeInsights/build
$ sudo -E python3 builder.py -f usecases/test-emb.yml
- 启动EII软件栈。
$ ./eii_start.sh
- 运行上一章节,准备的发布程序。
$ python publisher.py
-
打开浏览器,输入网址”localhost:3000″登录Grafana界面 (默认账号: admin, 密码: admin)。
-
参考上一篇文章,在Grafana界面中查看数据成功收到。
后记
- 更多EII Message Bus编程相关信息请参考官方教程:https://open-edge-insights.github.io/pages/msgbus.html#