关于OK3568J-C在ubuntu20系统上使用python3支持MCP2515的SPI转CAN的实现

  • Post author:
  • Post category:python




关于OK3568J-C在ubuntu20系统上使用python3支持MCP2515的SPI转CAN的实现



一)安装支持包

sudo apt install python3-pip

sudo pip install spidev

sudo pip install python-can



二)说明

飞凌的OK3568J-C有一个50针双列直插,有一个spi2带两个cs,分别代表:

/dev/spidev2.0 ->spi2 cs0,共用SI,SO,CLK

/dev/spidev2.1 ->spi2 cs1,共用SI,SO,CLK

板子自带两路CANFD,我们使用can1做为测试端,连线为:

can1 H ->MCP2515模块H

can1 L ->MCP2515模块L

spi转can驱动过程:

1)初始化:设置寄存器

2)读处理:使用线程监听数据,监听寄存器状态,有数据再读取

3)写操作:写入寄存器,对总线错误进行抛弃数据处理



三)spi转can代码实现

"""
用于完成MCP2515的spi转can2.0B的驱动python支持
编写:宋连猛 2022.09.28
使用:spidev模块,调用了linux spi驱动,配置项有:
#配置
max_speed_hz           #通信时钟最大频率
mode                   #spi mode   0b00~0b11
no_cs                  #设置 SPI_NO_CS标志是否使用CS
threewire              #共享SI/SO信号
bits_per_word          #字节位数
#函数区别:
xfer([])/xfer2([])为同时写读的函数,使用了ioctl()调用SPI_IOC_MESSAGE(n)
writebytes([])/readbytes([])为单一功能函数,使用write()/read()完成
# spi说明:
spi2.0->spi2,cs0
spi2.1->spi2,cs1
spi0不可用
# can口说明:
1)正常数据发送:总线错误时等待正常再发送;此驱动数据发送:总线错误时丢弃发送数据
2)can总线无设备时,会报数据总线错误
"""
import spidev
import time
import copy
from threading import Thread
from queue import Queue


class MCP2515MSG(object):
    """
    Plain record describing one CAN frame handled by the driver.

    Every field starts at a neutral value; the driver fills them in when it
    builds a frame to send or decodes a received one.
    """

    def __init__(self):
        # Extended-ID flag, 0 or 1.
        self.uc_exId = 0
        # CAN identifier.
        self.ul_id = 0
        # Data length.
        self.uc_dlc = 0
        # Data bytes; always an 8-element list.
        self.uc_dta = [0x00 for _ in range(8)]
        # Remote-frame marker: 0x0 for a normal frame, 0x2 for a remote frame.
        self.uc_rtr = 0
        # Filter-hit mode; 14 values exist, defined as 0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40...
        self.uc_filhit = 0
        # Channel name such as "can2" or "can3"; None until assigned.
        self.channel = None


class myCAN(object):
    """
    用于完成CAN2/3接口的spi数据收发
    """

    def __init__(self):
        """
        Create the SPI handles, set default bus parameters and start the worker thread.

        No SPI device is opened here; that happens in openCan2()/openCan3().
        """
        # One spidev handle per CAN port.
        self.spi1 = spidev.SpiDev()
        self.spi2 = spidev.SpiDev()

        # SPI settings applied to a handle when its port is opened.
        self.max_speed_hz = 6000000  # 6000 kHz
        self.delay_usec = 100  # delay in microseconds, meant for xfer
        self.mode = 0b00  # SPI mode value
        self.bits_per_word = 8

        # Bus/device numbers handed to SpiDev.open().
        self.spi_bus = 2
        self.spi1_device = 0  # spi2, cs0 on the schematic
        self.spi2_device = 1  # spi2, cs1 on the schematic

        # Receive control: per-channel enable flags and the user callback.
        self.can2flag = False
        self.can3flag = False
        self.callback = None

        # Outgoing frames wait in one queue per channel.
        self.send_q_can2 = Queue()
        self.send_q_can3 = Queue()

        # Register/opcode constants must exist before the worker runs.
        self.defineVariable()

        # Start the daemon worker; it loops for as long as startrecv is True.
        self.startrecv = True
        self.recv_thread = Thread(target=self.recvThread, daemon=True)
        self.recv_thread.start()

    def defineVariable(self):
        """
        定义变量
        :return:None
        """
        # Begin mt
        self.MCP_SIDH = 0
        self.MCP_SIDL = 1
        self.MCP_EID8 = 2
        self.MCP_EID0 = 3
        self.MCP_TXB_EXIDE_M = 0x08  # In TXBnSIDL
        self.MCP_DLC_MASK = 0x0F  # 4 LSBits
        self.MCP_RTR_MASK = 0x40  # (1<<6) Bit 6
        self.MCP_RXB_RX_ANY = 0x60
        self.MCP_RXB_RX_EXT = 0x40
        self.MCP_RXB_RX_STD = 0x20
        self.MCP_RXB_RX_STDEXT = 0x00
        self.MCP_RXB_RX_MASK = 0x60
        self.MCP_RXB_BUKT_MASK = (1 << 2)
        # Bits in the TXBnCTRL registers.
        self.MCP_TXB_TXBUFE_M = 0x80
        self.MCP_TXB_ABTF_M = 0x40
        self.MCP_TXB_MLOA_M = 0x20
        self.MCP_TXB_TXERR_M = 0x10
        self.MCP_TXB_TXREQ_M = 0x08
        self.MCP_TXB_TXIE_M = 0x04
        self.MCP_TXB_TXP10_M = 0x03
        self.MCP_TXB_RTR_M = 0x40  # In TXBnDLC
        self.MCP_RXB_IDE_M = 0x08  # In RXBnSIDL
        self.MCP_RXB_RTR_M = 0x40  # In RXBnDLC
        self.MCP_STAT_RXIF_MASK = 0x03
        self.MCP_STAT_RX0IF = (1 << 0)
        self.MCP_STAT_RX1IF = (1 << 1)
        self.MCP_EFLG_RX1OVR = (1 << 7)
        self.MCP_EFLG_RX0OVR = (1 << 6)
        self.MCP_EFLG_TXBO = (1 << 5)
        self.MCP_EFLG_TXEP = (1 << 4)
        self.MCP_EFLG_RXEP = (1 << 3)
        self.MCP_EFLG_TXWAR = (1 << 2)
        self.MCP_EFLG_RXWAR = (1 << 1)
        self.MCP_EFLG_EWARN = (1 << 0)
        self.MCP_EFLG_ERRORMASK = 0xF8  # 5 MS-Bits
        # End mt
        # Define MCP2515 register addresses
        self.MCP_RXF0SIDH = 0x00
        self.MCP_RXF0SIDL = 0x01
        self.MCP_RXF0EID8 = 0x02
        self.MCP_RXF0EID0 = 0x03
        self.MCP_RXF1SIDH = 0x04
        self.MCP_RXF1SIDL = 0x05
        self.MCP_RXF1EID8 = 0x06
        self.MCP_RXF1EID0 = 0x07
        self.MCP_RXF2SIDH = 0x08
        self.MCP_RXF2SIDL = 0x09
        self.MCP_RXF2EID8 = 0x0A
        self.MCP_RXF2EID0 = 0x0B
        self.MCP_CANSTAT = 0x0E
        self.MCP_CANCTRL = 0x0F
        self.MCP_RXF3SIDH = 0x10
        self.MCP_RXF3SIDL = 0x11
        self.MCP_RXF3EID8 = 0x12
        self.MCP_RXF3EID0 = 0x13
        self.MCP_RXF4SIDH = 0x14
        self.MCP_RXF4SIDL = 0x15
        self.MCP_RXF4EID8 = 0x16
        self.MCP_RXF4EID0 = 0x17
        self.MCP_RXF5SIDH = 0x18
        self.MCP_RXF5SIDL = 0x19
        self.MCP_RXF5EID8 = 0x1A
        self.MCP_RXF5EID0 = 0x1B
        self.MCP_TEC = 0x1C
        self.MCP_REC = 0x1D
        self.MCP_RXM0SIDH = 0x20
        self.MCP_RXM0SIDL = 0x21
        self.MCP_RXM0EID8 = 0x22
        self.MCP_RXM0EID0 = 0x23
        self.MCP_RXM1SIDH = 0x24
        self.MCP_RXM1SIDL = 0x25
        self.MCP_RXM1EID8 = 0x26
        self.MCP_RXM1EID0 = 0x27
        self.MCP_CNF3 = 0x28
        self.MCP_CNF2 = 0x29
        self.MCP_CNF1 = 0x2A
        self.MCP_CANINTE = 0x2B
        self.MCP_CANINTF = 0x2C
        self.MCP_EFLG = 0x2D
        self.MCP_TXB0CTRL = 0x30
        self.MCP_TXB1CTRL = 0x40
        self.MCP_TXB2CTRL = 0x50
        self.MCP_RXB0CTRL = 0x60
        self.MCP_RXB0SIDH = 0x61
        self.MCP_RXB1CTRL = 0x70
        self.MCP_RXB1SIDH = 0x71
        self.MCP_TX_INT = 0x1C  # Enable all transmit interrupts
        self.MCP_TX01_INT = 0x0C  # Enable TXB0 and TXB1 interrupts
        self.MCP_RX_INT = 0x03  # Enable receive interrupts
        self.MCP_NO_INT = 0x00  # Disable all interrupts
        self.MCP_TX01_MASK = 0x14
        self.MCP_TX_MASK = 0x54
        # Define SPI Instruction Set
        self.MCP_WRITE = 0x02
        self.MCP_READ = 0x03
        self.MCP_BITMOD = 0x05
        self.MCP_LOAD_TX0 = 0x40
        self.MCP_LOAD_TX1 = 0x42
        self.MCP_LOAD_TX2 = 0x44
        self.MCP_RTS_TX0 = 0x81
        self.MCP_RTS_TX1 = 0x82
        self.MCP_RTS_TX2 = 0x84
        self.MCP_RTS_ALL = 0x87
        self.MCP_READ_RX0 = 0x90
        self.MCP_READ_RX1 = 0x94
        self.MCP_READ_STATUS = 0xA0
        self.MCP_RX_STATUS = 0xB0
        self.MCP_RESET = 0xC0
        # CANCTRL Register Values
        self.MODE_NORMAL = 0x00
        self.MODE_SLEEP = 0x20
        self.MODE_LOOPBACK = 0x40
        self.MODE_LISTENONLY = 0x60
        self.MODE_CONFIG = 0x80
        self.MODE_POWERUP = 0xE0
        self.MODE_MASK = 0xE0
        self.ABORT_TX = 0x10
        self.MODE_ONESHOT = 0x08
        self.CLKOUT_ENABLE = 0x04
        self.CLKOUT_DISABLE = 0x00
        self.CLKOUT_PS1 = 0x00
        self.CLKOUT_PS2 = 0x01
        self.CLKOUT_PS4 = 0x02
        self.CLKOUT_PS8 = 0x03
        # CNF1 Register Values
        self.SJW1 = 0x00
        self.SJW2 = 0x40
        self.SJW3 = 0x80
        self.SJW4 = 0xC0
        # CNF2 Register Values
        self.BTLMODE = 0x80
        self.SAMPLE_1X = 0x00
        self.SAMPLE_3X = 0x40
        # CNF3 Register Values
        self.SOF_ENABLE = 0x80
        self.SOF_DISABLE = 0x00
        self.WAKFIL_ENABLE = 0x40
        self.WAKFIL_DISABLE = 0x00
        # CANINTF Register Bits
        self.MCP_RX0IF = 0x01
        self.MCP_RX1IF = 0x02
        self.MCP_TX0IF = 0x04
        self.MCP_TX1IF = 0x08
        self.MCP_TX2IF = 0x10
        self.MCP_ERRIF = 0x20
        self.MCP_WAKIF = 0x40
        self.MCP_MERRF = 0x80

    def recvThread(self):
        """
        接收线程处理函数
        :return:None
        """
        while self.startrecv:  # 当接收开始时有效
            try:
                if self.can2flag:  # 当can2有效时
                    msg = self.ObjMcp2515_MainDeal(2)
                    # 当数据接收时,调用有效的回调函数
                    if msg != None and self.callback != None:
                        self.callback(msg)
                if self.can3flag:  # 当can3有效时
                    msg = self.ObjMcp2515_MainDeal(3)
                    # 当数据接收时,调用有效的回调函数
                    if msg != None and self.callback != None:
                        self.callback(msg)
                if not self.send_q_can2.empty():  # 当发送数据有效时
                    msg = self.send_q_can2.get()  # 获取数据
                    self.PhyMcp2515_CANTx(2, msg)
                if not self.send_q_can3.empty():  # 当发送数据有效时
                    msg = self.send_q_can3.get()  # 获取数据
                    self.PhyMcp2515_CANTx(3, msg)
            except Exception as e:
                print("recvThread():", e)
            finally:  # 正常延时4ms
                time.sleep(0.004)

    def openCan2(self):
        """
        打开can2口
        :return:
        """
        try:
            self.spi1.open(self.spi_bus, self.spi1_device)  # 连接到指定的spi设备     /dev/spidev<2>.<0>
            self.spi1.max_speed_hz = self.max_speed_hz  # 通讯速率
            self.spi1.mode = self.mode  # 使用模式0方式
            self.spi1.bits_per_word = self.bits_per_word  # 字节位数
            self.spi1.no_cs = False  # 设置 SPI_NO_CS标志是否使用CS
            self.spi1.threewire = False  # 共享SI/SO信号
            time.sleep(0.2)  # 等待0.2秒
            # 配置寄存器部分:
            self.PhyMcp2515_PeriphInit(2)  # 初始化
        except Exception as e:
            print("openCan2():", e)

    def openCan3(self):
        """
        打开can3口
        :return:
        """
        try:
            self.spi2.open(self.spi_bus, self.spi2_device)  # 连接到指定的spi设备     /dev/spidev<2>.<1>
            self.spi2.max_speed_hz = self.max_speed_hz  # 通讯速率
            self.spi2.mode = self.mode  # 使用模式0方式
            self.spi2.bits_per_word = self.bits_per_word  # 字节位数
            self.spi2.no_cs = False  # 设置 SPI_NO_CS标志是否使用CS
            self.spi2.threewire = False  # 共享SI/SO信号
            time.sleep(0.2)  # 等待0.2秒
            # 配置寄存器部分:
            self.PhyMcp2515_PeriphInit(3)  # 初始化
        except Exception as e:
            print("openCan3():", e)

    # 通知回调
    def Notifier(self, can2Flag=False, can3Flag=False, callbackFunc=None):
        """
        通知回调
        注:使用线程监听数据
        :param can2Flag: can2读取标记,默认False
        :param can3Flag: can3读取标记,默认False
        :param callbackFunc:回调函数,def callbackFunc(message):pass
        :return:None
        """
        try:
            self.can2flag = can2Flag  # 接收CAN2标记
            self.can3flag = can3Flag  # 接收CAN2标记
            self.callback = callbackFunc  # 回调函数
        except Exception as e:
            print("Notifier():", e)

    def startNotifier(self):
        """
        开始接收处理
        :return: None
        """
        try:
            # if self.recv_thread==None or not self.recv_thread.is_alive():# 当线程无效时创建并开启,否则不创建线程
            #    self.recv_thread = Thread(target=self.recvThread, daemon=True)  # 接收线程
            #    self.startrecv = True  # 开始接收标记为真
            #    self.recv_thread.start()  # 开启接收线程
            pass
        except Exception as e:
            print("startNotifier():", e)

    def stoptNotifier(self):
        """
        停止接收处理
        :return: None
        """
        try:
            ####self.startrecv=False# 开始接收标记为假,线程正常退出
            self.can2flag = False  # can2不接收
            self.can3flag = False  # can3不接收
        except Exception as e:
            print("stoptNotifier():", e)

    def can2Write(self, ID: int = None, data: list = None):
        """
        写入CAN2数据
        :param ID: canID
        :param data: list
        :return: True/False
        """
        result = False
        try:
            if isinstance(ID, int) and isinstance(data, list):  # 当数据有效时
                msg = MCP2515MSG()  # 定义消息对象
                # 赋值消息
                msg.ul_id = id
                msg.uc_exId = False
                msg.uc_dlc = 8
                msg.uc_dta = data
                msg.channel = "can2"
                self.send_q_can2.put(msg)  # 发送到消息队列
                result = True  # 赋值为真
            else:  # 否则为假
                result = False
        except Exception as e:
            print("can2Write():", e)
        finally:
            return result

    def can3Write(self, ID: int = None, data: list = None):
        """
        写入CAN2数据
        :param ID: canID
        :param data: list
        :return: True/False
        """
        result = False
        try:
            if isinstance(ID, int) and isinstance(data, list):  # 当数据有效时
                msg = MCP2515MSG()  # 定义消息对象
                # 赋值消息
                msg.ul_id = id
                msg.uc_exId = False
                msg.uc_dlc = 8
                msg.uc_dta = data
                msg.channel = "can3"
                self.send_q_can3.put(msg)  # 发送到消息队列
                result = True  # 赋值为真
            else:  # 否则为假
                result = False
        except Exception as e:
            print("can3Write():", e)
        finally:
            return result

    def closeCan2(self):
        """
        关闭can2口
        :return:None
        """
        try:
            self.spi1.close()  # 关闭处理
        except Exception as e:
            print("closeCan2():", e)

    def closeCan3(self):
        """
        关闭can3口
        :return:None
        """
        try:
            self.spi2.close()  # 关闭处理
        except Exception as e:
            print("closeCan3():", e)

    def PhyMcp2515_PeriphInit(self, canNum: int = None):
        """
        初始化函数
        :param canNum:can号
        :return:None
        """
        try:
            if canNum != None and canNum in (2, 3):
                self.PhyMcp2515_McpConfig(canNum, canSpeed=100)  # 定义为100kbps
        except Exception as e:
            print("PhyMcp2515_PeriphInit():", e)

    def PhyMcp2515_ReadWrite(self, canNum=None, canData: list = None, length: int = None):
        """
        读写函数
        :param canNum:can号
        :param canData:数据列表
        :param length:数据长度
        :return:data_list/[]
        """
        try:
            if canNum != None and isinstance(canData, list) and isinstance(length, int) and canNum in (2, 3):
                cData = copy.deepcopy(canData)  # 拷贝数据
                if canNum == 2:  # can2
                    # print("PhyMcp2515_ReadWrite():", cData)
                    result = self.spi1.xfer(cData)  # 同时读写
                    # result = self.spi1.xfer(cData+[self.max_speed_hz,self.delay_usec,self.bits_per_word])#同时读写
                else:  # can3
                    # print("PhyMcp2515_ReadWrite():", cData)
                    result = self.spi2.xfer(cData)  # 同时读写
                    # result = self.spi2.xfer(cData+[self.max_speed_hz,self.delay_usec,self.bits_per_word])#同时读写
                # print("PhyMcp2515_ReadWrite()->result:",result)
                return result
            else:
                return []
        except Exception as e:
            print("PhyMcp2515_ReadWrite():", e)
            return []

    def PhyMcp2515_Reset(self, canNum=None):
        """
        复位
        :param canNum:can号
        :return:None
        """
        try:
            if canNum != None and canNum in (2, 3):
                self.PhyMcp2515_ReadWrite(canNum, [self.MCP_RESET], 1)
        except Exception as e:
            print("PhyMcp2515_Reset():", e)

    def PhyMcp2515_ReadRegister(self, canNum, address):
        """
        读寄存器
        :param canNum:can号
        :param address:地址
        :return:data单值/0
        """
        try:
            if canNum != None and canNum in (2, 3):
                result = self.PhyMcp2515_ReadWrite(canNum, [self.MCP_READ, address, 0x00], 3)
                if result != None and len(result) != 0:
                    return result[2]
                else:
                    return 0
        except Exception as e:
            print("PhyMcp2515_ReadRegister():", e)

    def PhyMcp2515_SetRegister(self, canNum, address, value):
        """
        设置寄存器
        :param canNum:can号
        :param address:地址
        :param value:value
        :return:None
        """
        try:
            if canNum != None and isinstance(address, int) and isinstance(value, int) and canNum in (2, 3):
                self.PhyMcp2515_ReadWrite(canNum, [self.MCP_WRITE, address, value], 3)
        except Exception as e:
            print("PhyMcp2515_SetRegister():", e)

    def PhyMcp2515_ConfigRate(self, canNum, canBps):
        """
        配置速率(8M晶振)
        100kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x01,0xBD,0x04
        125kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x01,0xBA,0x03
        200kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x00,0xBD,0x04
        250kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x00,0xBA,0x03
        400kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x00,0xA1,0x01
        500kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x00,0x91,0x01
        配置速率(16M晶振)
        100kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x03,0xBD,0x04
        125kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x03,0xBA,0x03
        200kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x01,0xBD,0x04
        250kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x01,0xBA,0x03
        400kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x00,0xBD,0x04
        500kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x00,0xBA,0x03
        800kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x00,0xA1,0x01
        1000kpbs:MCP_CNF1,MCP_CNF2,MCP_CNF3->0x00,0x98,0x01
        :param canNum:can号
        :param canBps:波特率
        :return:None
        """
        try:
            if canNum != None and isinstance(canBps, int) and canNum in (2, 3):
                # if canBps!=100:
                #    canBps=100
                # 默认100kbps,使用“CAN 波特率计算器(MCP2515) 1.3“工具计算
                self.PhyMcp2515_SetRegister(canNum, self.MCP_CNF1, 0x01)
                self.PhyMcp2515_SetRegister(canNum, self.MCP_CNF2, 0xBD)
                self.PhyMcp2515_SetRegister(canNum, self.MCP_CNF3, 0x04)
        except Exception as e:
            print("PhyMcp2515_ConfigRate():", e)

    def PhyMcp2515_ModifyRegister(self, canNum, address, mask, data):
        """
        修改寄存器
        :param canNum:can号
        :param address:地址
        :param mask:掩码
        :param data:数据
        :return:None
        """
        try:
            if canNum != None and isinstance(address, int) and isinstance(mask, int) and isinstance(data,
                                                                                                    int) and canNum in (
            2, 3):
                self.PhyMcp2515_ReadWrite(canNum, [self.MCP_BITMOD, address, mask, data], 4)
        except Exception as e:
            print("PhyMcp2515_ModifyRegister():", e)

    def PhyMcp2515_SetCanCtrlMode(self, canNum, newmode):
        """
        设置can控制模式
        :param canNum:can号
        :param newmode:模式字
        :return:True/False
        """
        try:
            if canNum != None and canNum in (2, 3):
                self.PhyMcp2515_ModifyRegister(canNum, self.MCP_CANCTRL, self.MODE_MASK, newmode)
                i = self.PhyMcp2515_ReadRegister(canNum, self.MCP_CANCTRL)
                if (i & self.MODE_MASK) == newmode:
                    return True
                else:
                    return False
        except Exception as e:
            print("PhyMcp2515_SetCanCtrlMode():", e)

    def PhyMcp2515_SetRegisterS(self, canNum, address, values: list, n):
        """
        设置寄存器
        :param canNum:can号
        :param address:地址
        :param values:值列表
        :param n:数量
        :return:None
        """
        try:
            if canNum != None and isinstance(address, int) and isinstance(values, list) and isinstance(n,
                                                                                                       int) and canNum in (
            2, 3):
                data = [self.MCP_WRITE, address] + values
                self.PhyMcp2515_ReadWrite(canNum, data, n + 2)
        except Exception as e:
            print("PhyMcp2515_SetRegisterS():", e)

    def PhyMcp2515_WriteCanId(self, canNum=None, mcpAddr=None, is_ext=None, can_id=None):
        """
        写canID
        :param canNun:can号
        :param mcpAddr:地址
        :param is_ext:是否为扩展
        :param can_id:canID
        :return:None
        """
        try:
            if canNum != None and isinstance(mcpAddr, int) and isinstance(is_ext, bool) and isinstance(can_id,
                                                                                                       int) and canNum in (
            2, 3):
                canid = (can_id & 0xffff)
                if is_ext:  # 扩展id
                    data = [0, 0, 0, 0]
                    data[self.MCP_EID0] = (canid & 0xff)
                    data[self.MCP_EID8] = ((canid >> 8) & 0xff)
                    canid = canid // 0x10000
                    data[self.MCP_SIDL] = (canid & 0x03)
                    data[self.MCP_SIDL] += (canid & 0x1c) * 8
                    data[self.MCP_SIDL] |= self.MCP_TXB_EXIDE_M
                    data[self.MCP_SIDH] = canid // 32
                else:  # 标准id
                    data = [0, 0, 0, 0]
                    data[self.MCP_SIDH] = (canid // 8)
                    data[self.MCP_SIDL] = ((canid & 0x07) * 32)
                    data[self.MCP_EID0] = 0
                    data[self.MCP_EID8] = 0
                self.PhyMcp2515_SetRegisterS(canNum, mcpAddr, data, 4)
        except Exception as e:
            print("PhyMcp2515_WriteCanId():", e)

    def PhyMcp2515_InitCanBuffers(self, canNum):
        """
        初始化CAN buffer
        :param canNum:can号
        :return:None
        """
        try:
            if canNum != None and canNum in (2, 3):
                # 设置接收屏蔽寄存器
                self.PhyMcp2515_WriteCanId(canNum, self.MCP_RXM0SIDH, 1, 0x0)
                self.PhyMcp2515_WriteCanId(canNum, self.MCP_RXM1SIDH, 1, 0x0)
                # 设置接收过滤寄存器
                self.PhyMcp2515_WriteCanId(canNum, self.MCP_RXF0SIDH, 1, 0x0)
                self.PhyMcp2515_WriteCanId(canNum, self.MCP_RXF1SIDH, 1, 0x0)
                self.PhyMcp2515_WriteCanId(canNum, self.MCP_RXF2SIDH, 1, 0x0)
                self.PhyMcp2515_WriteCanId(canNum, self.MCP_RXF3SIDH, 1, 0x0)
                self.PhyMcp2515_WriteCanId(canNum, self.MCP_RXF4SIDH, 1, 0x0)
                self.PhyMcp2515_WriteCanId(canNum, self.MCP_RXF5SIDH, 1, 0x0)
                # 清除、停用三个传输缓冲器
                a1, a2, a3 = self.MCP_TXB0CTRL, self.MCP_TXB1CTRL, self.MCP_TXB2CTRL
                for i in range(14):
                    self.PhyMcp2515_SetRegister(canNum, a1, 0x00)
                    self.PhyMcp2515_SetRegister(canNum, a2, 0x00)
                    self.PhyMcp2515_SetRegister(canNum, a3, 0x00)
                    a1 += 1
                    a2 += 1
                    a3 += 1
                self.PhyMcp2515_SetRegister(canNum, self.MCP_RXB0CTRL, 0)
                self.PhyMcp2515_SetRegister(canNum, self.MCP_RXB1CTRL, 0)
        except Exception as e:
            print("PhyMcp2515_InitCanBuffers():", e)

    def PhyMcp2515_ResetCanBuffers(self, canNum):
        """
        初始化CAN buffer,复位时使用
        :param canNum:can号
        :return:None
        """
        try:
            if canNum != None and canNum in (2, 3):
                # RXM0\RXM1
                self.PhyMcp2515_ReadWrite(canNum,
                                          [self.MCP_WRITE, self.MCP_RXM0SIDH, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                           0x00], 10)
                # RXF0-RXF2
                self.PhyMcp2515_ReadWrite(canNum,
                                          [self.MCP_WRITE, self.MCP_RXM0SIDH, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                           0x00, 0x00, 0x00, 0x00, 0x00], 14)
                # RXF3-RXF5
                self.PhyMcp2515_ReadWrite(canNum,
                                          [self.MCP_WRITE, self.MCP_RXF3SIDH, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                           0x00, 0x00, 0x00, 0x00, 0x00], 14)
                # TXB0
                self.PhyMcp2515_ReadWrite(canNum,
                                          [self.MCP_WRITE, self.MCP_TXB0CTRL, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                           0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], 16)
                # TXB1
                self.PhyMcp2515_ReadWrite(canNum,
                                          [self.MCP_WRITE, self.MCP_TXB1CTRL, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                           0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], 16)
                # TXB2
                self.PhyMcp2515_ReadWrite(canNum,
                                          [self.MCP_WRITE, self.MCP_TXB2CTRL, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                           0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], 16)
                self.PhyMcp2515_SetRegister(canNum, self.MCP_RXB0CTRL, 0)
                self.PhyMcp2515_SetRegister(canNum, self.MCP_RXB1CTRL, 0)
        except Exception as e:
            print("PhyMcp2515_ResetCanBuffers():", e)

    def PhyMcp2515_McpConfig(self, canNum, canSpeed):
        """
        配置函数
        :param canNum:can号
        :param canSpeed:can口速率
        :return:None
        """
        try:
            if canNum != None and canNum in (2, 3):
                self.PhyMcp2515_Reset(canNum)
                time.sleep(0.5)
                res = self.PhyMcp2515_SetCanCtrlMode(canNum, self.MODE_CONFIG)  # 进入配置模式
                if res == False:
                    return
                self.PhyMcp2515_ConfigRate(canNum, canSpeed)  # 配置波特率
                self.PhyMcp2515_InitCanBuffers(canNum)  # 初始化数据区
                self.PhyMcp2515_ModifyRegister(canNum, self.MCP_RXB0CTRL, self.MCP_RXB_RX_MASK | self.MCP_RXB_BUKT_MASK,
                                               self.MCP_RXB_RX_ANY | self.MCP_RXB_BUKT_MASK)  # 修改接收控制寄存器
                self.PhyMcp2515_ModifyRegister(canNum, self.MCP_RXB1CTRL, self.MCP_RXB_RX_MASK,
                                               self.MCP_RXB_RX_ANY)  # 修改接收控制寄存器
                self.PhyMcp2515_SetRegister(canNum, self.MCP_CANINTE, self.MCP_RX_INT)  # 打开接收中断
                self.PhyMcp2515_SetCanCtrlMode(canNum, self.MODE_NORMAL)  # 进入正常模式
        except Exception as e:
            print("PhyMcp2515_McpConfig():", e)

    def PhyMcp2515_RxMegDeal(self, canNum):
        """
        读消息合约函数
        :param canNum:can号
        :return:message对象/None
        """
        try:
            if canNum != None and canNum in (2, 3):
                msg = MCP2515MSG()  # 定义消息对象
                uc_PhyMcp2515_canIntf = self.PhyMcp2515_ReadRegister(canNum, self.MCP_CANINTF)
                uc_intFlag = copy.deepcopy(uc_PhyMcp2515_canIntf)
                if ((uc_PhyMcp2515_canIntf & 0x01) == 0x01):
                    uc_intFlag &= ~0x01
                    data = [self.MCP_READ_RX0] + [0x00] * 13
                    PhyMcp2515_BufferRx = self.PhyMcp2515_ReadWrite(canNum, data, 14)  # 接收数据
                    if len(PhyMcp2515_BufferRx) == 0:
                        return
                    if (PhyMcp2515_BufferRx[2] & self.MCP_RXB_IDE_M) != self.MCP_RXB_IDE_M:  # 标准数据
                        msg.ul_id = (PhyMcp2515_BufferRx[1] << 3) | ((PhyMcp2515_BufferRx[2] >> 5) & 0x07)
                        msg.uc_exId = 0
                    else:  # 扩展帧
                        msg.ul_id = PhyMcp2515_BufferRx[4]
                        msg.ul_id |= ((PhyMcp2515_BufferRx[3]) << 8)
                        msg.ul_id |= ((PhyMcp2515_BufferRx[2] & 0x03) << 16)
                        msg.ul_id |= ((PhyMcp2515_BufferRx[2] & 0xe0) << 13)
                        msg.ul_id |= ((PhyMcp2515_BufferRx[1]) << 21)
                        msg.uc_exId = 1  # 扩展帧
                    msg.uc_dlc = (PhyMcp2515_BufferRx[5] & 0x0f)
                    for i in range(8):
                        msg.uc_dta[i] = PhyMcp2515_BufferRx[6 + i]
                    msg.channel = "CAN" + str(canNum)  # 赋值通道号
                if ((uc_PhyMcp2515_canIntf & 0x02) == 0x02):
                    uc_intFlag &= ~0x02
                    data = [self.MCP_READ_RX1] + [0x00] * 13
                    PhyMcp2515_BufferRx = self.PhyMcp2515_ReadWrite(canNum, data, 14)  # 接收数据
                    if len(PhyMcp2515_BufferRx) == 0:
                        return
                    if (PhyMcp2515_BufferRx[2] & self.MCP_RXB_IDE_M) != self.MCP_RXB_IDE_M:  # 标准数据
                        msg.ul_id = (PhyMcp2515_BufferRx[1] << 3) | ((PhyMcp2515_BufferRx[2] >> 5) & 0x07)
                        msg.uc_exId = 0
                    else:  # 扩展帧
                        msg.ul_id = PhyMcp2515_BufferRx[4]
                        msg.ul_id |= ((PhyMcp2515_BufferRx[3]) << 8)
                        msg.ul_id |= ((PhyMcp2515_BufferRx[2] & 0x03) << 16)
                        msg.ul_id |= ((PhyMcp2515_BufferRx[2] & 0xe0) << 13)
                        msg.ul_id |= ((PhyMcp2515_BufferRx[1]) << 21)
                        msg.uc_exId = 1  # 扩展帧
                    msg.uc_dlc = (PhyMcp2515_BufferRx[5] & 0x0f)
                    for i in range(8):
                        msg.uc_dta[i] = PhyMcp2515_BufferRx[6 + i]
                    msg.channel = "CAN" + str(canNum)  # 赋值通道号
                uc_intFlag &= 0x03
                self.PhyMcp2515_ModifyRegister(canNum, self.MCP_CANINTF, uc_intFlag, self.MCP_NO_INT)  # 清除中断标志
                # self.PhyMcp2515_ReadWrite(canNum, [self.MCP_WRITE, self.MCP_CANINTF, self.MCP_NO_INT],3)  # 清除中断标志
                # self.PhyMcp2515_ReadWrite(canNum,[self.MCP_WRITE,self.MCP_CANINTF,uc_intFlag,self.MCP_NO_INT],4)# 清除中断标志,此句有些问题,于指令不符
                if msg.channel == None:  # 当通道为空时,返回None
                    return None
                else:  # 否则,返回消息
                    return msg
            else:
                return None
        except Exception as e:
            print("PhyMcp2515_RxMegDeal():", e)
            return None

    def PhyMcp2515_TxMegDeal(self, canNum, canId, canExtIDE: bool, canDlc, canData: list):
        """
        写消息合约函数
        :param canNum:can号
        :param canId:canID
        :param canExtIDE:是否为扩展
        :param canDlc:数据长度
        :param canData:数据列表
        :return:None
        """
        try:
            if canNum != None and isinstance(canId, int) and isinstance(canExtIDE, bool) and isinstance(canData,list) and len(canData) >= 8 and canNum in (2, 3):  # 数据条件判断
                IsIDLE = (self.PhyMcp2515_ReadRegister(canNum, 0x30) & 0x08) | (
                            self.PhyMcp2515_ReadRegister(canNum, 0x40) & 0x08) | (
                                     self.PhyMcp2515_ReadRegister(canNum, 0x50) & 0x08)
                count = 0  # 计数变量
                while ((IsIDLE >> 3) == 1):  # wait for finish sending
                    time.sleep(0.001)  # 延时1ms
                    IsIDLE = (self.PhyMcp2515_ReadRegister(canNum, 0x30) & 0x08) | (
                                self.PhyMcp2515_ReadRegister(canNum, 0x40) & 0x08) | (
                                         self.PhyMcp2515_ReadRegister(canNum, 0x50) & 0x08)
                    count += 1
                    if count > 3:  # 记录三次直接返回
                        raise IOError("can bus transfer error!")
                PhyMcp2515_BufferTx = []
                PhyMcp2515_BufferTx.append(self.MCP_LOAD_TX0)
                if not canExtIDE:  # 标准帧
                    PhyMcp2515_BufferTx.append(((canId >> 3) & 0xff))
                    PhyMcp2515_BufferTx.append((((canId & 0x07) << 5) | ((0x00 & 0x01) << 3) | ((0x00 >> 16) & 0x03)))
                    PhyMcp2515_BufferTx.append(((0x00 >> 8) & 0xff))
                    PhyMcp2515_BufferTx.append((0x00 & 0xff))
                else:  # 扩展帧,MSB (SID10~SID0,EID17~EID0) LSB
                    PhyMcp2515_BufferTx.append(((canId >> 21) & 0xff))
                    PhyMcp2515_BufferTx.append(
                        (((canId >> 13) & 0xe0) | (self.MCP_TXB_EXIDE_M) | ((canId >> 16) & 0x03)))
                    PhyMcp2515_BufferTx.append(((canId >> 8) & 0xff))
                    PhyMcp2515_BufferTx.append((canId & 0xff))
                PhyMcp2515_BufferTx.append(((0x00 & 0x01) << 6) | (canDlc & 0x0f))
                PhyMcp2515_BufferTx += canData[0:8]
                self.PhyMcp2515_ReadWrite(canNum, PhyMcp2515_BufferTx, 6 + canDlc)
                self.PhyMcp2515_ReadWrite(canNum, [self.MCP_RTS_TX0], 1)  # MCP_TXB0发送请求
        except Exception as e:
            print("PhyMcp2515_TxMegDeal():", e)

    def PhyMcp2515_ErrDeal(self, canNum):
        """
        错误合约
        :param canNum:can号
        :return:None
        """
        try:
            self.PhyMcp2515_PeriphInit(canNum)#调用初始化函数
        except Exception as e:
            print("PhyMcp2515_ErrDeal():", e)

    def PhyMcp2515_CANTx(self, canNum, TxMessage: MCP2515MSG):
        """
        can发送数据,只发送标准帧
        :return:None
        """
        try:
            if canNum != None and isinstance(TxMessage, MCP2515MSG) and canNum in (2, 3):
                self.PhyMcp2515_TxMegDeal(canNum, TxMessage.ul_id, False, TxMessage.uc_dlc, TxMessage.uc_dta)  # 标准数据
                # self.PhyMcp2515_TxMegDeal(canNum, TxMessage.ul_id, True, TxMessage.uc_dlc, TxMessage.uc_dta)  # 扩展数据
        except Exception as e:
            print("PhyMcp2515_CANTx():", e)

    def ObjMcp2515_MainDeal(self, canNum):
        """
        can接收数据
        :param canNum:can口
        :return:message/None
        """
        try:
            if canNum != None and canNum in (2, 3):
                msg = self.PhyMcp2515_RxMegDeal(canNum)
                if msg != None:  # 当消息有效时进入
                    return msg
        except Exception as e:
            print("ObjMcp2515_MainDeal():", e)


def can_calback(message: MCP2515MSG):
    """
    Example receive callback: print the main fields of a received message.

    Prints the ID in hex, whether the extended-ID flag is set, the data
    length and the data list. Any exception is caught and printed.

    :param message: received message object
    :return: None
    """
    try:
        id_text = hex(message.ul_id)
        is_extended = bool(message.uc_exId)
        print("can_calback():", id_text, is_extended, message.uc_dlc, message.uc_dta)
    except Exception as err:
        print("can_calback():", err)


# 调试
if __name__ == "__main__":
    # Demo: open CAN3, send ten frames 50 ms apart, keep listening for about
    # ten seconds, then stop the receiver and close the port.
    # To exercise CAN2 instead, swap the CAN3 calls for their CAN2
    # counterparts: openCan2(), Notifier(can2Flag=True, ...), can2Write(),
    # closeCan2().
    mycan = myCAN()
    mycan.openCan3()
    # Named can_id rather than id so the builtin id() is not shadowed.
    can_id, can_d = 0x10, [1, 2, 3, 4, 5, 6, 7, 8]
    try:
        # Register the receive callback for CAN3.
        mycan.Notifier(can3Flag=True, callbackFunc=can_calback)
        # NOTE(review): startNotifier() is left disabled exactly as in the
        # original demo; presumably Notifier() already starts reception --
        # confirm against the class definition.
        # mycan.startNotifier()
        for _ in range(10):
            mycan.can3Write(can_id, can_d)
            time.sleep(0.05)
        # Stay alive for 50 * 0.2 s so incoming frames can reach the callback.
        for _ in range(50):
            time.sleep(0.2)
    finally:
        # Always release the receiver and the port, even when the demo is
        # interrupted (e.g. Ctrl-C) or one of the calls above raises.
        try:
            mycan.stoptNotifier()
        finally:
            mycan.closeCan3()

测试结果

forlinx@ok3568:~$ cd test/
forlinx@ok3568:~/test$ sudo python3 can_spi_driver.py 
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]



四)can1测试端代码

#!/usr/bin/env python
# coding: utf-8

"""
spi转can接收数据端(CAN1)处理 测试程序
编写: 宋连猛 2022.9.28
"""
import can
import threading
import random
import os
import time
from queue import Queue

# Interface setup. The can0 commands are kept for reference but are currently unused.
#os.system("sudo ip link set can0 down")
os.system("sudo ip link set can1 down")
# Bring the interface up configured for CAN FD; adjust the bit rates (or switch to CAN 2.0B) as needed.
# Author's notes: in the printed Message, FD frames carry an "F" flag and classic frames do not;
# in FD mode the rate appears to be recovered by sampling, so a mismatch of the two rates seems to be tolerated.
# Even in CAN 2.0B mode, data is reportedly received correctly when both devices are FD-capable interfaces.
#os.system("sudo ip link set can0 up type can bitrate 100000 sample-point 0.75 dbitrate 5000000 dsample-point 0.8 fd on")
os.system("sudo ip link set can1 up type can bitrate 100000 sample-point 0.75 dbitrate 5000000 dsample-point 0.8 fd on")
#os.system("sudo ip link set can0 up")
os.system("sudo ip link set can1 up")

# Module-level state
q_data=Queue()# queue that hands received payloads over to the sender thread

# CAN bus objects: sending and receiving may use the same interface
# CAN 2.0B, 100 kbps
bus_tx = can.interface.Bus(bustype='socketcan', channel='can1', bitrate=100000,fd=False)
bus_rx = can.interface.Bus(bustype='socketcan', channel='can1', bitrate=100000,fd=False)# receives frames sent by the peer
# CAN FD, 100 kbps and 5 Mbps
#bus_tx = can.interface.Bus(bustype='socketcan', channel='can1', bitrate=100000,dbitrate=5000000,fd=True)
#bus_rx = can.interface.Bus(bustype='socketcan', channel='can1', bitrate=100000,dbitrate=5000000,fd=True)
def message(message):
    """
    Listener callback: print a received frame and queue its payload.

    Frames that are None or not flagged as received (is_rx false) are
    ignored. For every other frame the extended-ID flag, DLC, arbitration ID
    and data are printed and the data is put on the module-level q_data queue.

    :param message: frame object handed over by the notifier
    :return: None
    """
    if message is None or not message.is_rx:
        return
    print("message=", message.is_extended_id, message.dlc, message.arbitration_id, message.data)
    q_data.put(message.data)  # hand the payload to the sender
# Listener callbacks the notifier invokes for frames read from bus_rx.
# Put can.Printer() in front of `message` to also print every received frame.
listeners = [message]
notifier = can.Notifier([bus_rx], listeners)

# Sender worker
class tx_thread_cl:
    """
    Background worker that sends queued payloads onto a CAN bus.

    Each payload taken from ``q_data`` is sent as a CAN 2.0B standard data
    frame with arbitration ID 0x7E0, followed by a pause of ``send_interval``
    seconds.

    :param bus: bus object whose ``send`` method transmits a message
    :param q_data: queue supplying the payloads to send
    :param send_interval: pause in seconds after each send (default 0.5)
    :param poll_timeout: longest time in seconds the worker waits on an empty
                         queue before re-checking the stop flag (default 0.1)
    """

    def __init__(self, bus, q_data, send_interval=0.5, poll_timeout=0.1):
        self.bus = bus
        self.running = True  # cleared by stop() to ask the worker to exit
        self.finished = False  # set by the worker once it has exited
        self.q_data = q_data
        self.send_interval = send_interval
        self.poll_timeout = poll_timeout
        # Daemon thread: a worker that was never stopped cannot keep the
        # process alive after the main thread has ended.
        self.thread = threading.Thread(
            target=self.tx_callback, args=(self.bus,), daemon=True
        )

    def start(self):
        """Start the worker thread."""
        self.thread.start()

    def tx_callback(self, bus):
        """
        Worker loop: send queued payloads until stop() is called.

        :param bus: bus object used for sending
        :return: None
        """
        # Local import: the file only imports Queue from the queue module.
        from queue import Empty

        try:
            while self.running:
                try:
                    # Timed wait so a stop request is noticed even when no
                    # payload ever arrives; an untimed get() would block
                    # here forever and stop() would have no effect.
                    data = self.q_data.get(timeout=self.poll_timeout)
                except Empty:
                    continue
                # CAN 2.0B standard data frame. Use is_extended_id=True for
                # an extended frame, or is_fd=True for a CAN FD frame.
                msg = can.Message(is_extended_id=False, arbitration_id=0x7E0, data=data, is_fd=False)
                print("msg=", msg)
                bus.send(msg)
                time.sleep(self.send_interval)
        finally:
            # Always report completion, including when send() raised, so a
            # caller waiting on `finished` cannot wait forever.
            self.finished = True

    def stop(self):
        """Ask the worker loop to exit; it returns after its current step."""
        self.running = False


if __name__ == '__main__':
    # Start the sender worker, run until the user presses Enter, then shut
    # everything down in order: worker, notifier, buses.
    tx_service = tx_thread_cl(bus_tx,q_data)
    tx_service.start()
    try:
        # Block until the user presses Enter.
        input()
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl-C is treated as a request to exit as well.
        pass
    finally:
        tx_service.stop()
        # Wait for the worker instead of spinning on `finished`. The timeout
        # lets shutdown proceed even if the worker is still blocked waiting
        # for queue data.
        tx_service.thread.join(timeout=2.0)
        # Stop the notifier before closing the buses it reads from.
        notifier.stop()
        bus_tx.shutdown()
        bus_rx.shutdown()

测试结果

forlinx@ok3568:~/test$ sudo python3 can_test_with_spiToCan.py 
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08
msg= Timestamp:        0.000000        ID: 07e0    S Rx                DL:  8    01 02 03 04 05 06 07 08



结尾

网络上关于SPI转CAN的参考资料大多集中在MCU和Linux内核驱动方面,Python3的实现不太容易找到,希望这篇文章能给大家提供一些思路。我是Simon,在这里期待您的关注。



版权声明:本文为weixin_43521467原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。