(3)EII Message Bus编程

  • Post author:
  • Post category:其他




1. 前言

基于上一篇文章对EII Message Bus原理的介绍,本文我们继续来聊聊EII Message Bus编程。另外我们也介绍一下,在EII的容器网络之外(如Host系统之上,或者非EII功能模块中),如何使用EII Message Bus和EII模块实现通讯。

在EII Message Bus的Samples案例中,例如发布程序”Samples/publisher/python/publisher.py”和订阅程序”Samples/subscriber/python/subscriber.py”,其中既包含了对EII ConfigMgr API(负责从ETCD数据库读取模块的配置信息)的使用,又包含了对EII Message Bus API的使用。

这里我们将EII Message Bus的使用单独提取出来,来学习一下EII Message Bus API的使用。



2. 修改发布和订阅程序

  • 首先,我们将”publisher.py”脚本改写成如下内容:
import time
import eii.msgbus as mb
import os
import random

def start_publisher():
    publisher = None

    try:
        msgbus_cfg = {
            'type': 'zmq_tcp', 
            'zmq_tcp_publish': {
                'host': '0.0.0.0', 
                'port': 65021
            }
        }
        topic = 'camera1_stream_results'
        loop_interval = 1

        print('[INFO] Initializing message bus context')
        msgbus_pub = mb.MsgbusContext(msgbus_cfg)
        publisher = msgbus_pub.new_publisher(topic)

        print('[INFO] Running...')
        while True:
            blob = b'\x22' * 10
            meta = {
                'temperature': random.uniform(10,30)
            }

            publisher.publish((meta, blob,))
            print(f'[INFO] Msg published by publisher :  \'{meta}\'')
            time.sleep(loop_interval)
    except KeyboardInterrupt:
        print('[INFO] Quitting...')
    finally:
        if publisher is not None:
            publisher.close()

if __name__ == "__main__":
	start_publisher()

上述发布程序去掉了ConfigMgr相关的操作,将配置信息直接写死(hardcode)在了程序中,例如程序中对”msgbus_cfg”(包含本机IP和端口号Port),“topic”,”loop_interval”的定义。

  • 接着,我们将”subscriber.py”脚本改写成如下内容:
import eii.msgbus as mb
import os
import cfgmgr.config_manager as cfg

def start_subscriber():
    subscriber = None

    try:
        msgbus_cfg = {
            'type': 'zmq_tcp', 
            'camera1_stream_results': {
                'host': '192.168.2.101', 
                'port': 65021
            }
        }
        topic = 'camera1_stream_results'

        print('[INFO] Initializing message bus context')
        msgbus_sub = mb.MsgbusContext(msgbus_cfg)
        subscriber = msgbus_sub.new_subscriber(topic)

        print('[INFO] Running...')
        while True:
            msg, _ = subscriber.recv()
            if msg is not None:
                print(f'[INFO] RECEIVED by subscriber : {msg}')
            else:
                print('[INFO] Receive interrupted')
    except KeyboardInterrupt:
        print('[INFO] Quitting...')
    finally:
        if subscriber is not None:
            subscriber.close()

if __name__ == "__main__":
	start_subscriber()

同样,在订阅程序中,我们去掉了ConfigMgr相关的操作,将配置信息直接写死在了程序中,例如程序中对”msgbus_cfg”,”topic”的定义。



3. 安装环境依赖

以上,我们修改好了发布和订阅程序,接下来我们尝试运行一下。不过EII Message Bus的运行需要安装一些依赖包,我们得先安装环境依赖。

在EII的Samples中,依赖包是在Dockerfile中进行安装的。我们将其中的安装步骤提取出来,尝试在Host系统上安装一下,参考”Samples/publisher/python/ubuntu/Dockerfile”文件,编写如下安装脚本”install.sh”:

#!/bin/bash

set -e
sudo -v
CUR_DIR=$PWD

# Install dep tools, python3, cjson, libzmq
sudo apt update
sudo apt install -y build-essential cmake wget python3.8 python3.8-dev python3-distutils python3-setuptools python3-pip libcjson-dev libzmq3-dev

mkdir -p $CUR_DIR/dpkgs
# Install eii-utils and eii-messagebus
cd $CUR_DIR/dpkgs
wget https://gitee.com/open-edge-insights/eii-manifests/releases/download/v3.0/eii-utils-3.0.0-Linux.deb
wget https://gitee.com/open-edge-insights/eii-manifests/releases/download/v3.0/eii-messagebus-3.0.0-Linux.deb
sudo dpkg -i eii-*

# eii-messagebus python deps
pip3 install eii-messagebus==3.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
  • 在Host系统,打开一个命令行窗口,执行上述安装程序,完成EII Message Bus依赖包的安装。
$ sudo chmod +x install.sh
$ ./install.sh



4. 在Host上运行发布和订阅程序

安装好EII Message Bus的环境依赖后,我们就可以运行”publisher.py”和”subscriber.py”程序了。

  • 打开命令行窗口,执行订阅程序。
$ python subscriber.py
  • 打开另外一个命令行窗口,执行发布程序。
$ python publisher.py
  • 此时,可以看到”subscriber.py”窗口成功接收到消息。



5. EII时序栈订阅Host上EII Message Bus发布的数据

想要让EII时序栈(Telegraf)接收Host上程序发布的数据,其实非常简单。我们打开Telegraf的配置文件”Telegra/config.json”,将”Subscriber”接口的定义由原先从某个容器订阅数据,改为从本机的发布程序的端口订阅数据。

# "Telegra/config.json"文件第38行:
"EndPoint": "ia_ubuntu_python_sample_pub:65020",

# 修改为:
"EndPoint": "192.168.2.101:65021",

只需修改上述一行,我们即配置好了Telegraf从Host上的”publisher.py”程序订阅数据。

接下来,我们启动EII的时序栈。

  • 首先确认模块编排文件”build/usecases/test-emb.yml”包含如下模块。(这里仍然需要带上Samples/publisher和Samples/subscriber模块,属于EII的一个小bug,后续文章再解释)
- ConfigMgrAgent
- Samples/publisher/python/ubuntu
- Samples/subscriber/python/ubuntu
- Telegraf
- InfluxDBConnector
- Grafana
- Kapacitor
  • 编译模块配置信息
$ cd ~/eii/IEdgeInsights/build
$ sudo -E python3 builder.py -f usecases/test-emb.yml
  • 启动EII软件栈。
$ ./eii_start.sh
  • 运行上一章节,准备的发布程序。
$ python publisher.py
  • 打开浏览器,输入网址”localhost:3000″登录Grafana界面 (默认账号: admin, 密码: admin)。

  • 参考上一篇文章,在Grafana界面中查看数据成功收到。


后记

  • 更多EII Message Bus编程相关信息请参考官方教程:https://open-edge-insights.github.io/pages/msgbus.html#



版权声明:本文为weixin_39816256原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。