linux下通过python实现海康工业相机(MV-CA013-A0UC)USB连接,图像抓拍及实现简单的图像分类

  • Post author:
  • Post category:linux



整体流程:电眼检测设备是否到位 → 信号传递给网络继电器(聚英的设备)→ 服务端监控继电器输入 → 打开相机采集图像 → 图像分类算法 → 根据返回结果进行逻辑处理 → 输出信号


1、下载海康 MVS 客户端,安装包中附带相关的 Python 示例;在其中找到 MvImport 工具包


在这里插入图片描述


2、借助 Flask 部署海康相机抓拍和一个简单的图像分类模型(ResNet)

sys.path.append("./MvImport")  # make the MvImport SDK wrapper package importable
# --- Model deployment -------------------------------------------------------
# Device passed as ``map_location`` when the checkpoint is loaded below.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Preprocessing pipeline applied to an image before it is fed to the model.
data_transform = transforms.Compose(
    [transforms.Resize(256),
     transforms.CenterCrop(224),
     transforms.ToTensor(),
     transforms.Normalize([0.54, 0.40, 0.43], [0.23, 0.22, 0.22])])

# Read the class dictionary (JSON). An explicit exception is raised instead of
# ``assert`` so the check is not stripped when Python runs with -O.
json_path = './class_img/class_carton.json'
if not os.path.exists(json_path):
    raise FileNotFoundError("file: '{}' dose not exist.".format(json_path))
# ``with`` guarantees the file handle is closed once the JSON is parsed.
with open(json_path, "r") as json_file:
    class_indict = json.load(json_file)

# Create the model.
model = action(num_classes=3)
# Load the trained weights into the model.
weights_path = "./class_img/carton_0225.pth"
if not os.path.exists(weights_path):
    raise FileNotFoundError("file: '{}' dose not exist.".format(weights_path))
model.load_state_dict(torch.load(weights_path, map_location=device))

# Hikvision camera initialisation follows; the FeatureFile.ini used there is
# the camera configuration exported from the Hikvision client.
def press_any_key_exit():
    """Wait for a key press on stdin, then restore the terminal settings.

    Canonical input mode and echo are switched off while waiting and the
    previously saved terminal attributes are always written back afterwards.
    """
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    waiting_attrs = list(saved_attrs)
    # Index 3 of the attribute list holds the local-mode flags.
    waiting_attrs[3] = waiting_attrs[3] & ~termios.ICANON & ~termios.ECHO
    termios.tcsetattr(stdin_fd, termios.TCSANOW, waiting_attrs)
    try:
        os.read(stdin_fd, 7)
    except BaseException:
        # Best effort: any failure or interrupt while waiting is ignored.
        pass
    finally:
        termios.tcsetattr(stdin_fd, termios.TCSANOW, saved_attrs)

def _c_chars_to_str(chars):
    """Convert a NUL-terminated sequence of character codes to ``str``.

    Conversion stops at the first 0 so the padding of fixed-size SDK name
    fields does not end up in the printed text.
    """
    text = []
    for code in chars:
        if code == 0:
            break
        text.append(chr(code))
    return "".join(text)


SDKVersion = MvCamera.MV_CC_GetSDKVersion()
print("SDKVersion[0x%x]" % SDKVersion)

deviceList = MV_CC_DEVICE_INFO_LIST()
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE

# Enumerate GigE and USB devices.
ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
if ret != 0:
    print("enum devices fail! ret[0x%x]" % ret)
    sys.exit()

if deviceList.nDeviceNum == 0:
    print("find no device!")
    sys.exit()

print("Find %d devices!" % deviceList.nDeviceNum)

# Print a short description of every enumerated device.
for i in range(0, deviceList.nDeviceNum):
    mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
    if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
        gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo
        print("\ngige device: [%d]" % i)
        # Stop at the NUL terminator, exactly as the USB branch does.
        strModeName = _c_chars_to_str(gige_info.chModelName)
        print("device model name: %s" % strModeName)

        # Split the 32-bit address into its four octets, highest byte first.
        current_ip = gige_info.nCurrentIp
        nip1 = (current_ip & 0xff000000) >> 24
        nip2 = (current_ip & 0x00ff0000) >> 16
        nip3 = (current_ip & 0x0000ff00) >> 8
        nip4 = current_ip & 0x000000ff
        print("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
    elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
        usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo
        print("\nu3v device: [%d]" % i)
        strModeName = _c_chars_to_str(usb_info.chModelName)
        print("device model name: %s" % strModeName)

        strSerialNumber = _c_chars_to_str(usb_info.chSerialNumber)
        print("user serial number: %s" % strSerialNumber)

# On Python 3 the first device is always used (no interactive prompt).
# ``sys.version_info`` is compared numerically instead of comparing the
# ``sys.version`` string.
if sys.version_info[0] >= 3:
    nConnectionNum = 0
else:
    nConnectionNum = raw_input("please input the number of the device to connect:")

if int(nConnectionNum) >= deviceList.nDeviceNum:
    print("intput error!")
    sys.exit()

# Create the camera object.
cam = MvCamera()

# Select the device and create a handle for it.
stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents

ret = cam.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
    print("create handle fail! ret[0x%x]" % ret)
    sys.exit()

# Open the device with exclusive access.
ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
    print("open device fail! ret[0x%x]" % ret)
    sys.exit()

# Set trigger mode to off.
ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
if ret != 0:
    print("set trigger mode fail! ret[0x%x]" % ret)
    sys.exit()

# Import the camera properties from the file. A failure is reported but is
# not fatal; the success message is only printed when the load succeeded.
ret = cam.MV_CC_FeatureLoad("FeatureFile.ini")
if MV_OK != ret:
    print("load feature fail! ret [0x%x]" % ret)
else:
    print("finish import the camera properties from the file")

# Query the payload size; it is used to size the frame buffer below.
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))

ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
    print("get payload size fail! ret[0x%x]" % ret)
    sys.exit()
nPayloadSize = stParam.nCurValue

# Start grabbing images.
ret = cam.MV_CC_StartGrabbing()
if ret != 0:
    print("start grabbing fail! ret[0x%x]" % ret)
    sys.exit()

# Frame buffer and its size, kept at module level for later capture calls.
data_buf = (c_ubyte * nPayloadSize)()
nDataSize = nPayloadSize
print("press a key to stop grabbing.")
g_bExit = True


完整代码如下

from flask import Flask, request
import sys
import termios
import cv2 as cv
from ctypes import *

import os
import time
import os
import json
import time

import torch
from PIL import Image,ImageFile
from torchvision import transforms
import matplotlib.pyplot as plt

from class_img.model import resnet34,action
sys.path.append("./MvImport")
from MvCameraControl_class import *
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Module-level exit flag (reassigned after camera start-up further below).
g_bExit = False

app = Flask(__name__)

# Device passed as ``map_location`` when the checkpoint is loaded below.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Preprocessing pipeline applied to an image before it is fed to the model.
data_transform = transforms.Compose(
    [transforms.Resize(256),
     transforms.CenterCrop(224),
     transforms.ToTensor(),
     transforms.Normalize([0.54, 0.40, 0.43], [0.23, 0.22, 0.22])])

# Read the class dictionary (JSON). An explicit exception is raised instead of
# ``assert`` so the check is not stripped when Python runs with -O.
json_path = './class_img/class_carton.json'
if not os.path.exists(json_path):
    raise FileNotFoundError("file: '{}' dose not exist.".format(json_path))
# ``with`` guarantees the file handle is closed once the JSON is parsed.
with open(json_path, "r") as json_file:
    class_indict = json.load(json_file)

# Create the model.
model = action(num_classes=3)
# Load the trained weights into the model.
weights_path = "./class_img/carton_0225.pth"
if not os.path.exists(weights_path):
    raise FileNotFoundError("file: '{}' dose not exist.".format(weights_path))
model.load_state_dict(torch.load(weights_path, map_location=device))


# Hikvision camera initialisation
def press_any_key_exit():
    """Wait for a key press on stdin, then restore the terminal settings.

    Canonical input mode and echo are switched off while waiting and the
    previously saved terminal attributes are always written back afterwards.
    """
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    waiting_attrs = list(saved_attrs)
    # Index 3 of the attribute list holds the local-mode flags.
    waiting_attrs[3] = waiting_attrs[3] & ~termios.ICANON & ~termios.ECHO
    termios.tcsetattr(stdin_fd, termios.TCSANOW, waiting_attrs)
    try:
        os.read(stdin_fd, 7)
    except BaseException:
        # Best effort: any failure or interrupt while waiting is ignored.
        pass
    finally:
        termios.tcsetattr(stdin_fd, termios.TCSANOW, saved_attrs)

def _c_chars_to_str(chars):
    """Convert a NUL-terminated sequence of character codes to ``str``.

    Conversion stops at the first 0 so the padding of fixed-size SDK name
    fields does not end up in the printed text.
    """
    text = []
    for code in chars:
        if code == 0:
            break
        text.append(chr(code))
    return "".join(text)


SDKVersion = MvCamera.MV_CC_GetSDKVersion()
print("SDKVersion[0x%x]" % SDKVersion)

deviceList = MV_CC_DEVICE_INFO_LIST()
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE

# Enumerate GigE and USB devices.
ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
if ret != 0:
    print("enum devices fail! ret[0x%x]" % ret)
    sys.exit()

if deviceList.nDeviceNum == 0:
    print("find no device!")
    sys.exit()

print("Find %d devices!" % deviceList.nDeviceNum)

# Print a short description of every enumerated device.
for i in range(0, deviceList.nDeviceNum):
    mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
    if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
        gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo
        print("\ngige device: [%d]" % i)
        # Stop at the NUL terminator, exactly as the USB branch does.
        strModeName = _c_chars_to_str(gige_info.chModelName)
        print("device model name: %s" % strModeName)

        # Split the 32-bit address into its four octets, highest byte first.
        current_ip = gige_info.nCurrentIp
        nip1 = (current_ip & 0xff000000) >> 24
        nip2 = (current_ip & 0x00ff0000) >> 16
        nip3 = (current_ip & 0x0000ff00) >> 8
        nip4 = current_ip & 0x000000ff
        print("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
    elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
        usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo
        print("\nu3v device: [%d]" % i)
        strModeName = _c_chars_to_str(usb_info.chModelName)
        print("device model name: %s" % strModeName)

        strSerialNumber = _c_chars_to_str(usb_info.chSerialNumber)
        print("user serial number: %s" % strSerialNumber)

# On Python 3 the first device is always used (no interactive prompt).
# ``sys.version_info`` is compared numerically instead of comparing the
# ``sys.version`` string.
if sys.version_info[0] >= 3:
    nConnectionNum = 0
else:
    nConnectionNum = raw_input("please input the number of the device to connect:")

if int(nConnectionNum) >= deviceList.nDeviceNum:
    print("intput error!")
    sys.exit()

# Create the camera object.
cam = MvCamera()

# Select the device and create a handle for it.
stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents

ret = cam.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
    print("create handle fail! ret[0x%x]" % ret)
    sys.exit()

# Open the device with exclusive access.
ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
    print("open device fail! ret[0x%x]" % ret)
    sys.exit()

# Set trigger mode to off.
ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
if ret != 0:
    print("set trigger mode fail! ret[0x%x]" % ret)
    sys.exit()

# Import the camera properties from the file. A failure is reported but is
# not fatal; the success message is only printed when the load succeeded.
ret = cam.MV_CC_FeatureLoad("FeatureFile.ini")
if MV_OK != ret:
    print("load feature fail! ret [0x%x]" % ret)
else:
    print("finish import the camera properties from the file")

# Query the payload size; it is used to size the frame buffer below.
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))

ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
    print("get payload size fail! ret[0x%x]" % ret)
    sys.exit()
nPayloadSize = stParam.nCurValue

# Start grabbing images.
ret = cam.MV_CC_StartGrabbing()
if ret != 0:
    print("start grabbing fail! ret[0x%x]" % ret)
    sys.exit()

# Frame buffer and its size, shared with the capture endpoint.
data_buf = (c_ubyte * nPayloadSize)()
nDataSize = nPayloadSize
print("press a key to stop grabbing.")
g_bExit = True


# Flask capture endpoint
@app.route('/detect', methods=['POST'])
def upload_image1():
    """Capture one frame, classify it and return the result.

    The request body is not read: a POST to this route only triggers a
    capture. The route accepts POST exclusively, so no other method reaches
    this function and no fallback response is needed.

    Returns:
        The JSON string returned by ``grab_img``.
    """
    start = time.time()
    result = grab_img(cam, data_buf, nDataSize)
    # Elapsed time of capture plus classification, in seconds.
    print(time.time() - start)
    return result

# Hikvision capture and classification
def _save_frame_as_bmp(cam, data_buf, stFrameInfo, file_path):
    """Convert the frame held in ``data_buf`` to BMP and write it to disk.

    Returns the status code of ``MV_CC_SaveImageEx2``; the file is written
    only when that code is 0.
    """
    stConvertParam = MV_SAVE_IMAGE_PARAM_EX()
    stConvertParam.nWidth = stFrameInfo.nWidth
    stConvertParam.nHeight = stFrameInfo.nHeight
    stConvertParam.pData = data_buf
    stConvertParam.nDataLen = stFrameInfo.nFrameLen
    stConvertParam.enPixelType = stFrameInfo.enPixelType
    stConvertParam.enImageType = MV_Image_Bmp

    # Output buffer: 3 bytes per pixel plus 54 extra bytes (presumably the
    # BMP file and info headers).
    bmpsize = stFrameInfo.nWidth * stFrameInfo.nHeight * 3 + 54
    stConvertParam.nBufferSize = bmpsize
    bmp_buf = (c_ubyte * bmpsize)()
    stConvertParam.pImageBuffer = bmp_buf

    ret = cam.MV_CC_SaveImageEx2(stConvertParam)
    if ret != 0:
        return ret

    # nImageLen is the length of the converted image; nDataLen is the length
    # of the raw input frame and must not be used to size the output file.
    image_len = stConvertParam.nImageLen
    img_buff = (c_ubyte * image_len)()
    memmove(byref(img_buff), stConvertParam.pImageBuffer, image_len)
    with open(file_path, 'wb') as file_open:
        file_open.write(img_buff)
    return ret


def _classify_image(file_path):
    """Run the classifier on an image file; return (class name, probability)."""
    img = Image.open(file_path).convert('RGB')
    # Halve the resolution with PIL; ``img`` is a PIL image here, so it has
    # ``width``/``height`` rather than a NumPy-style ``shape``.
    # NOTE(review): the original halving call could not run, so confirm this
    # matches the preprocessing the model was trained with.
    img = img.resize((img.width // 2, img.height // 2))
    img = data_transform(img)
    # Add the batch dimension.
    img = torch.unsqueeze(img, dim=0)

    model.eval()
    with torch.no_grad():
        output = torch.squeeze(model(img))
        predict = torch.softmax(output, dim=0)
        predict_cla = int(torch.argmax(predict))
    class_name = class_indict[str(predict_cla)]
    prob = predict[predict_cla].numpy()
    return class_name, prob


def grab_img(cam, data_buf, nDataSize):
    """Grab one frame, save it as a BMP under ./images/ and classify it.

    Args:
        cam: Opened camera object that is already grabbing.
        data_buf: ctypes buffer that receives the raw frame.
        nDataSize: Size of ``data_buf`` passed to the SDK.

    Returns:
        A JSON string with the keys "class" and "prob". Both values are the
        string "null" when no frame was received or the BMP conversion
        failed, so a capture error never terminates the server process.
    """
    null_result = json.dumps({"class":"null", "prob":"null"})

    stFrameInfo = MV_FRAME_OUT_INFO_EX()
    memset(byref(stFrameInfo), 0, sizeof(stFrameInfo))
    # The last argument is the timeout (presumably milliseconds — confirm
    # against the SDK documentation).
    ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nDataSize, stFrameInfo, 10000)
    if ret != 0:
        print("no data[0x%x]" % ret)
        return null_result

    print("get one frame: Width[%d], Height[%d], PixelType[0x%x], nFrameNum[%d]" % (
        stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.enPixelType, stFrameInfo.nFrameNum))

    # Make sure the output directory exists before writing into it.
    os.makedirs("./images/", exist_ok=True)
    file_name = str(time.time()).replace(".","_")+".bmp"
    file_path = os.path.join("./images/",file_name)

    ret = _save_frame_as_bmp(cam, data_buf, stFrameInfo, file_path)
    if ret != 0:
        print("save file executed failed0:! ret[0x%x]" % ret)
        return null_result

    class_name, prob = _classify_image(file_path)
    print("class: {}   prob: {:.3}".format(class_name, prob))
    return json.dumps({"class":str(class_name), "prob":str(prob)})

if __name__ == "__main__":
    try:
        # threaded=False: every request uses the same module-level ``cam`` and
        # ``data_buf``, so requests are served one at a time.
        app.run('0.0.0.0', port=8989, debug=False, threaded=False, processes=1)
    finally:
        # Release the camera when the server stops: stop grabbing, close the
        # device, then destroy the handle.
        for step, release in (("stop grabbing", cam.MV_CC_StopGrabbing),
                              ("close deivce", cam.MV_CC_CloseDevice),
                              ("destroy handle", cam.MV_CC_DestroyHandle)):
            ret = release()
            if ret != 0:
                print("%s fail! ret[0x%x]" % (step, ret))


如果仅仅需要海康相机抓拍,可以不采用flask部署和检测模型模块代码。


例子:

# -- coding: utf-8 --

import sys
import threading
import os
import termios
import cv2 as cv
from ctypes import *
import time
import numpy as np
sys.path.append("./MvImport")
from MvCameraControl_class import *
g_bExit = False

# Worker function run in the grabbing thread
def work_thread(cam=0, data_buf=0, nDataSize=0):
    """Grab frames in a loop and save each one as a BMP under ./images/.

    Args:
        cam: Opened camera object that is already grabbing.
        data_buf: ctypes buffer that receives the raw frame.
        nDataSize: Size of ``data_buf`` passed to the SDK.

    The loop ends when the module-level ``g_bExit`` flag is true after an
    iteration, or when converting a frame to BMP fails.
    """
    stFrameInfo = MV_FRAME_OUT_INFO_EX()
    memset(byref(stFrameInfo), 0, sizeof(stFrameInfo))
    # Make sure the output directory exists before writing into it.
    os.makedirs("./images/", exist_ok=True)
    while True:
        # The last argument is the timeout (presumably milliseconds — confirm
        # against the SDK documentation).
        ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nDataSize, stFrameInfo, 10000)
        if ret == 0:
            print("get one frame: Width[%d], Height[%d], PixelType[0x%x], nFrameNum[%d]" % (
                stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.enPixelType, stFrameInfo.nFrameNum))
            stConvertParam = MV_SAVE_IMAGE_PARAM_EX()
            stConvertParam.nWidth = stFrameInfo.nWidth
            stConvertParam.nHeight = stFrameInfo.nHeight
            stConvertParam.pData = data_buf
            stConvertParam.nDataLen = stFrameInfo.nFrameLen
            stConvertParam.enPixelType = stFrameInfo.enPixelType
            stConvertParam.enImageType = MV_Image_Bmp
            file_name = str(time.time()).replace(".","_")+".bmp"
            file_path = os.path.join("./images/",file_name)

            # Output buffer: 3 bytes per pixel plus 54 extra bytes (presumably
            # the BMP file and info headers).
            bmpsize = stFrameInfo.nWidth * stFrameInfo.nHeight * 3 + 54
            stConvertParam.nBufferSize = bmpsize
            bmp_buf = (c_ubyte * bmpsize)()
            stConvertParam.pImageBuffer = bmp_buf

            ret = cam.MV_CC_SaveImageEx2(stConvertParam)
            if ret != 0:
                print("save file executed failed0:! ret[0x%x]" % ret)
                # Returning ends this worker thread; the main thread performs
                # the camera clean-up.
                return

            # nImageLen is the length of the converted image; nDataLen is the
            # length of the raw input frame and must not size the output file.
            image_len = stConvertParam.nImageLen
            img_buff = (c_ubyte * image_len)()
            memmove(byref(img_buff), stConvertParam.pImageBuffer, image_len)
            with open(file_path, 'wb') as file_open:
                file_open.write(img_buff)
        else:
            print("no data[0x%x]" % ret)
        if g_bExit == True:
            break


def press_any_key_exit():
    """Wait for a key press on stdin, then restore the terminal settings.

    Canonical input mode and echo are switched off while waiting and the
    previously saved terminal attributes are always written back afterwards.
    """
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    waiting_attrs = list(saved_attrs)
    # Index 3 of the attribute list holds the local-mode flags.
    waiting_attrs[3] = waiting_attrs[3] & ~termios.ICANON & ~termios.ECHO
    termios.tcsetattr(stdin_fd, termios.TCSANOW, waiting_attrs)
    try:
        os.read(stdin_fd, 7)
    except BaseException:
        # Best effort: any failure or interrupt while waiting is ignored.
        pass
    finally:
        termios.tcsetattr(stdin_fd, termios.TCSANOW, saved_attrs)


def _c_chars_to_str(chars):
    """Convert a NUL-terminated sequence of character codes to ``str``.

    Conversion stops at the first 0 so the padding of fixed-size SDK name
    fields does not end up in the printed text.
    """
    text = []
    for code in chars:
        if code == 0:
            break
        text.append(chr(code))
    return "".join(text)


if __name__ == "__main__":

    SDKVersion = MvCamera.MV_CC_GetSDKVersion()
    print("SDKVersion[0x%x]" % SDKVersion)

    deviceList = MV_CC_DEVICE_INFO_LIST()
    tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE

    # Enumerate GigE and USB devices.
    ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
    if ret != 0:
        print("enum devices fail! ret[0x%x]" % ret)
        sys.exit()

    if deviceList.nDeviceNum == 0:
        print("find no device!")
        sys.exit()

    print("Find %d devices!" % deviceList.nDeviceNum)

    # Print a short description of every enumerated device.
    for i in range(0, deviceList.nDeviceNum):
        mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
        if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
            gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo
            print("\ngige device: [%d]" % i)
            # Stop at the NUL terminator, exactly as the USB branch does.
            strModeName = _c_chars_to_str(gige_info.chModelName)
            print("device model name: %s" % strModeName)

            # Split the 32-bit address into its four octets, highest first.
            current_ip = gige_info.nCurrentIp
            nip1 = (current_ip & 0xff000000) >> 24
            nip2 = (current_ip & 0x00ff0000) >> 16
            nip3 = (current_ip & 0x0000ff00) >> 8
            nip4 = current_ip & 0x000000ff
            print("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
        elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
            usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo
            print("\nu3v device: [%d]" % i)
            strModeName = _c_chars_to_str(usb_info.chModelName)
            print("device model name: %s" % strModeName)

            strSerialNumber = _c_chars_to_str(usb_info.chSerialNumber)
            print("user serial number: %s" % strSerialNumber)

    # ``sys.version_info`` is compared numerically instead of comparing the
    # ``sys.version`` string.
    if sys.version_info[0] >= 3:
        nConnectionNum = input("please input the number of the device to connect:")
    else:
        nConnectionNum = raw_input("please input the number of the device to connect:")

    # Reject non-numeric and negative input as well as indices past the end.
    try:
        nConnectionNum = int(nConnectionNum)
    except ValueError:
        nConnectionNum = -1
    if not 0 <= nConnectionNum < deviceList.nDeviceNum:
        print("intput error!")
        sys.exit()

    # Create the camera object.
    cam = MvCamera()

    # Select the device and create a handle for it.
    stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents

    ret = cam.MV_CC_CreateHandle(stDeviceList)
    if ret != 0:
        print("create handle fail! ret[0x%x]" % ret)
        sys.exit()

    # Open the device with exclusive access.
    ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
    if ret != 0:
        print("open device fail! ret[0x%x]" % ret)
        sys.exit()

    # Detection of the optimal network packet size (only works for GigE
    # cameras); left disabled as in the original listing.
    # if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
    #     nPacketSize = cam.MV_CC_GetOptimalPacketSize()
    #     if int(nPacketSize) > 0:
    #         ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize)
    #         if ret != 0:
    #             print("Warning: Set Packet Size fail! ret[0x%x]" % ret)
    #     else:
    #         print("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize)

    # Set trigger mode to off.
    ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
    if ret != 0:
        print("set trigger mode fail! ret[0x%x]" % ret)
        sys.exit()

    # Import the camera properties from the file. A failure is reported but
    # is not fatal; the success message is only printed on success.
    ret = cam.MV_CC_FeatureLoad("FeatureFile.ini")
    if MV_OK != ret:
        print("load feature fail! ret [0x%x]" % ret)
    else:
        print("finish import the camera properties from the file")

    # Query the payload size; it is used to size the frame buffer below.
    stParam = MVCC_INTVALUE()
    memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))

    ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
    if ret != 0:
        print("get payload size fail! ret[0x%x]" % ret)
        sys.exit()
    nPayloadSize = stParam.nCurValue

    # Start grabbing images.
    ret = cam.MV_CC_StartGrabbing()
    if ret != 0:
        print("start grabbing fail! ret[0x%x]" % ret)
        sys.exit()

    data_buf = (c_ubyte * nPayloadSize)()

    # Start the grabbing thread. If it cannot be started, skip the wait and
    # the join and go straight to the clean-up below.
    hThreadHandle = threading.Thread(target=work_thread, args=(cam, data_buf, nPayloadSize))
    try:
        hThreadHandle.start()
    except RuntimeError:
        print("error: unable to start thread")
        hThreadHandle = None

    if hThreadHandle is not None:
        print("press a key to stop grabbing.")
        press_any_key_exit()

        # Ask the worker loop to stop, then wait for it to finish.
        g_bExit = True
        hThreadHandle.join()

    # Stop grabbing images.
    ret = cam.MV_CC_StopGrabbing()
    if ret != 0:
        print("stop grabbing fail! ret[0x%x]" % ret)
        del data_buf
        sys.exit()

    # Close the device.
    ret = cam.MV_CC_CloseDevice()
    if ret != 0:
        print("close deivce fail! ret[0x%x]" % ret)
        del data_buf
        sys.exit()

    # Destroy the handle.
    ret = cam.MV_CC_DestroyHandle()
    if ret != 0:
        print("destroy handle fail! ret[0x%x]" % ret)
        del data_buf
        sys.exit()

    del data_buf



版权声明:本文为weixin_40717742原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。