关于OK3568J-C在ubuntu20系统上使用python3支持MCP2515的SPI转CAN的实现
一)安装支持包
sudo apt install python3-pip
sudo pip install spidev
sudo pip install python-can
二)说明
飞凌的OK3568J-C有一个50针双列直插排针,其中引出了一路spi2,带两个片选(cs),对应的设备节点分别为:
/dev/spidev2.0 ->spi2 cs0,共用SI、SO、CLK
/dev/spidev2.1 ->spi2 cs1,共用SI、SO、CLK
板子自带两路CANFD,我们使用can1作为测试端,连线为:
can1 H ->MCP2515模块H
can1 L ->MCP2515模块L
spi转can驱动过程:
1)初始化:设置寄存器
2)读处理:使用线程监听数据,监听寄存器状态,有数据再读取
3)写操作:写入寄存器,对总线错误进行抛弃数据处理
三)spi转can代码实现
"""
用于完成MCP2515的spi转can2.0B的驱动python支持
编写:宋连猛 2022.09.28
使用:spidev模块,调用了linux spi驱动,配置项有:
#配置
max_speed_hz #通信时钟最大频率
mode #spi mode 0b00~0b11
no_cs #设置 SPI_NO_CS标志是否使用CS
threewire #共享SI/SO信号
bits_per_word #字节位数
#函数区别:
xfer([])/xfer2([])为同时写读的函数,使用了ioctl()调用SPI_IOC_MESSAGE(n)
writebytes([])/readbytes([])为单一功能函数,使用write()/read()完成
# spi说明:
spi2.0->spi2,cs0
spi2.1->spi2,cs1
spi0不可用
# can口说明:
1)正常数据发送:总线错误时等待正常再发送;此驱动数据发送:总线错误时丢弃发送数据
2)can总线无设备时,会报数据总线错误
"""
import spidev
import time
import copy
from threading import Thread
from queue import Queue
class MCP2515MSG:
    """
    Plain record describing one CAN frame handled by the MCP2515 driver.

    Every field starts out zeroed (or None for the channel); the driver
    fills the fields in when a frame is received or queued for sending.
    """

    def __init__(self):
        self.uc_exId = 0  # extended-ID flag, 0 or 1
        self.ul_id = 0  # CAN identifier
        self.uc_dlc = 0  # data length
        self.uc_dta = [0x00 for _ in range(8)]  # data bytes (always 8 slots)
        self.uc_rtr = 0  # remote-frame marker: 0x0 for a normal frame, 0x2 for a remote frame
        self.uc_filhit = 0  # filter mode; 14 in total, defined as macros 0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40...
        self.channel = None  # channel label such as "can2" / "can3"
class myCAN(object):
    """
    SPI data send/receive driver for the CAN2/CAN3 ports.
    """

    def __init__(self):
        # --- SPI handles: spi1 is opened by openCan2(), spi2 by openCan3() ---
        self.spi1 = spidev.SpiDev()
        self.spi2 = spidev.SpiDev()

        # --- SPI link parameters shared by both handles ---
        self.max_speed_hz = 6000000  # clock frequency, 6000 kHz
        self.delay_usec = 100  # delay in microseconds (intended for xfer)
        self.mode = 0b00  # SPI mode 0
        self.bits_per_word = 8  # bits per word
        self.spi_bus = 2  # SPI bus number
        self.spi1_device = 0  # spi2, cs0 on the schematic
        self.spi2_device = 1  # spi2, cs1 on the schematic

        # --- worker-thread state ---
        self.startrecv = False  # loop condition of the worker thread
        self.recv_thread = None  # worker thread object, created below
        self.can2flag = False  # poll CAN2 for received frames when True
        self.can3flag = False  # poll CAN3 for received frames when True
        self.callback = None  # user callback invoked with each received message

        # --- outgoing frames, drained by the worker thread ---
        self.send_q_can2 = Queue()
        self.send_q_can3 = Queue()

        # Register/instruction constants must exist before the worker runs.
        self.defineVariable()

        # Launch the background worker (daemon, so it never blocks exit).
        worker = Thread(target=self.recvThread, daemon=True)
        self.recv_thread = worker
        self.startrecv = True
        worker.start()
def defineVariable(self):
    """
    Install the MCP2515 constants (bit masks, register addresses, SPI
    instruction codes and register values) as attributes of this instance.

    :return: None
    """
    constant_table = (
        # Indices into a 4-byte identifier block, plus assorted masks
        ("MCP_SIDH", 0),
        ("MCP_SIDL", 1),
        ("MCP_EID8", 2),
        ("MCP_EID0", 3),
        ("MCP_TXB_EXIDE_M", 0x08),  # in TXBnSIDL
        ("MCP_DLC_MASK", 0x0F),  # 4 LSBits
        ("MCP_RTR_MASK", 0x40),  # (1 << 6), bit 6
        ("MCP_RXB_RX_ANY", 0x60),
        ("MCP_RXB_RX_EXT", 0x40),
        ("MCP_RXB_RX_STD", 0x20),
        ("MCP_RXB_RX_STDEXT", 0x00),
        ("MCP_RXB_RX_MASK", 0x60),
        ("MCP_RXB_BUKT_MASK", 1 << 2),
        # Bits in the TXBnCTRL registers
        ("MCP_TXB_TXBUFE_M", 0x80),
        ("MCP_TXB_ABTF_M", 0x40),
        ("MCP_TXB_MLOA_M", 0x20),
        ("MCP_TXB_TXERR_M", 0x10),
        ("MCP_TXB_TXREQ_M", 0x08),
        ("MCP_TXB_TXIE_M", 0x04),
        ("MCP_TXB_TXP10_M", 0x03),
        ("MCP_TXB_RTR_M", 0x40),  # in TXBnDLC
        ("MCP_RXB_IDE_M", 0x08),  # in RXBnSIDL
        ("MCP_RXB_RTR_M", 0x40),  # in RXBnDLC
        ("MCP_STAT_RXIF_MASK", 0x03),
        ("MCP_STAT_RX0IF", 1 << 0),
        ("MCP_STAT_RX1IF", 1 << 1),
        ("MCP_EFLG_RX1OVR", 1 << 7),
        ("MCP_EFLG_RX0OVR", 1 << 6),
        ("MCP_EFLG_TXBO", 1 << 5),
        ("MCP_EFLG_TXEP", 1 << 4),
        ("MCP_EFLG_RXEP", 1 << 3),
        ("MCP_EFLG_TXWAR", 1 << 2),
        ("MCP_EFLG_RXWAR", 1 << 1),
        ("MCP_EFLG_EWARN", 1 << 0),
        ("MCP_EFLG_ERRORMASK", 0xF8),  # 5 MS-Bits
        # MCP2515 register addresses
        ("MCP_RXF0SIDH", 0x00),
        ("MCP_RXF0SIDL", 0x01),
        ("MCP_RXF0EID8", 0x02),
        ("MCP_RXF0EID0", 0x03),
        ("MCP_RXF1SIDH", 0x04),
        ("MCP_RXF1SIDL", 0x05),
        ("MCP_RXF1EID8", 0x06),
        ("MCP_RXF1EID0", 0x07),
        ("MCP_RXF2SIDH", 0x08),
        ("MCP_RXF2SIDL", 0x09),
        ("MCP_RXF2EID8", 0x0A),
        ("MCP_RXF2EID0", 0x0B),
        ("MCP_CANSTAT", 0x0E),
        ("MCP_CANCTRL", 0x0F),
        ("MCP_RXF3SIDH", 0x10),
        ("MCP_RXF3SIDL", 0x11),
        ("MCP_RXF3EID8", 0x12),
        ("MCP_RXF3EID0", 0x13),
        ("MCP_RXF4SIDH", 0x14),
        ("MCP_RXF4SIDL", 0x15),
        ("MCP_RXF4EID8", 0x16),
        ("MCP_RXF4EID0", 0x17),
        ("MCP_RXF5SIDH", 0x18),
        ("MCP_RXF5SIDL", 0x19),
        ("MCP_RXF5EID8", 0x1A),
        ("MCP_RXF5EID0", 0x1B),
        ("MCP_TEC", 0x1C),
        ("MCP_REC", 0x1D),
        ("MCP_RXM0SIDH", 0x20),
        ("MCP_RXM0SIDL", 0x21),
        ("MCP_RXM0EID8", 0x22),
        ("MCP_RXM0EID0", 0x23),
        ("MCP_RXM1SIDH", 0x24),
        ("MCP_RXM1SIDL", 0x25),
        ("MCP_RXM1EID8", 0x26),
        ("MCP_RXM1EID0", 0x27),
        ("MCP_CNF3", 0x28),
        ("MCP_CNF2", 0x29),
        ("MCP_CNF1", 0x2A),
        ("MCP_CANINTE", 0x2B),
        ("MCP_CANINTF", 0x2C),
        ("MCP_EFLG", 0x2D),
        ("MCP_TXB0CTRL", 0x30),
        ("MCP_TXB1CTRL", 0x40),
        ("MCP_TXB2CTRL", 0x50),
        ("MCP_RXB0CTRL", 0x60),
        ("MCP_RXB0SIDH", 0x61),
        ("MCP_RXB1CTRL", 0x70),
        ("MCP_RXB1SIDH", 0x71),
        ("MCP_TX_INT", 0x1C),  # enable all transmit interrupts
        ("MCP_TX01_INT", 0x0C),  # enable TXB0 and TXB1 interrupts
        ("MCP_RX_INT", 0x03),  # enable receive interrupts
        ("MCP_NO_INT", 0x00),  # disable all interrupts
        ("MCP_TX01_MASK", 0x14),
        ("MCP_TX_MASK", 0x54),
        # SPI instruction set
        ("MCP_WRITE", 0x02),
        ("MCP_READ", 0x03),
        ("MCP_BITMOD", 0x05),
        ("MCP_LOAD_TX0", 0x40),
        ("MCP_LOAD_TX1", 0x42),
        ("MCP_LOAD_TX2", 0x44),
        ("MCP_RTS_TX0", 0x81),
        ("MCP_RTS_TX1", 0x82),
        ("MCP_RTS_TX2", 0x84),
        ("MCP_RTS_ALL", 0x87),
        ("MCP_READ_RX0", 0x90),
        ("MCP_READ_RX1", 0x94),
        ("MCP_READ_STATUS", 0xA0),
        ("MCP_RX_STATUS", 0xB0),
        ("MCP_RESET", 0xC0),
        # CANCTRL register values
        ("MODE_NORMAL", 0x00),
        ("MODE_SLEEP", 0x20),
        ("MODE_LOOPBACK", 0x40),
        ("MODE_LISTENONLY", 0x60),
        ("MODE_CONFIG", 0x80),
        ("MODE_POWERUP", 0xE0),
        ("MODE_MASK", 0xE0),
        ("ABORT_TX", 0x10),
        ("MODE_ONESHOT", 0x08),
        ("CLKOUT_ENABLE", 0x04),
        ("CLKOUT_DISABLE", 0x00),
        ("CLKOUT_PS1", 0x00),
        ("CLKOUT_PS2", 0x01),
        ("CLKOUT_PS4", 0x02),
        ("CLKOUT_PS8", 0x03),
        # CNF1 register values
        ("SJW1", 0x00),
        ("SJW2", 0x40),
        ("SJW3", 0x80),
        ("SJW4", 0xC0),
        # CNF2 register values
        ("BTLMODE", 0x80),
        ("SAMPLE_1X", 0x00),
        ("SAMPLE_3X", 0x40),
        # CNF3 register values
        ("SOF_ENABLE", 0x80),
        ("SOF_DISABLE", 0x00),
        ("WAKFIL_ENABLE", 0x40),
        ("WAKFIL_DISABLE", 0x00),
        # CANINTF register bits
        ("MCP_RX0IF", 0x01),
        ("MCP_RX1IF", 0x02),
        ("MCP_TX0IF", 0x04),
        ("MCP_TX1IF", 0x08),
        ("MCP_TX2IF", 0x10),
        ("MCP_ERRIF", 0x20),
        ("MCP_WAKIF", 0x40),
        ("MCP_MERRF", 0x80),
    )
    # Bind every (name, value) pair on the instance, in table order.
    for attr_name, attr_value in constant_table:
        setattr(self, attr_name, attr_value)
def recvThread(self):
    """
    Body of the background worker thread.

    While ``self.startrecv`` is true, each pass polls the enabled ports
    for a received message (handing it to ``self.callback`` when one is
    set), then sends at most one queued frame per port, and finally
    sleeps 4 ms. Any exception raised during a pass is printed and the
    loop carries on.

    :return: None
    """
    while self.startrecv:
        try:
            # Receive side: CAN2 first, then CAN3, each only when enabled.
            for port in (2, 3):
                listening = self.can2flag if port == 2 else self.can3flag
                if not listening:
                    continue
                frame = self.ObjMcp2515_MainDeal(port)
                if frame is not None and self.callback is not None:
                    self.callback(frame)
            # Transmit side: one queued frame per port and pass.
            for port, outbox in ((2, self.send_q_can2), (3, self.send_q_can3)):
                if outbox.empty():
                    continue
                self.PhyMcp2515_CANTx(port, outbox.get())
        except Exception as err:
            print("recvThread():", err)
        finally:
            time.sleep(0.004)  # regular 4 ms pause between passes
def openCan2(self):
    """
    Open the CAN2 port: attach ``self.spi1`` to the SPI device, apply the
    link settings, wait 0.2 s and run the controller initialisation.

    Errors are printed, not raised.

    :return: None
    """
    try:
        link = self.spi1
        link.open(self.spi_bus, self.spi1_device)  # /dev/spidev<2>.<0>
        link.max_speed_hz = self.max_speed_hz  # bus clock
        link.mode = self.mode  # SPI mode 0
        link.bits_per_word = self.bits_per_word  # bits per word
        link.no_cs = False  # SPI_NO_CS flag off: chip select is used
        link.threewire = False  # SI/SO are not shared
        time.sleep(0.2)  # wait 0.2 s before touching the controller
        # Register configuration:
        self.PhyMcp2515_PeriphInit(2)
    except Exception as err:
        print("openCan2():", err)
def openCan3(self):
    """
    Open the CAN3 port: attach ``self.spi2`` to the SPI device, apply the
    link settings, wait 0.2 s and run the controller initialisation.

    Errors are printed, not raised.

    :return: None
    """
    try:
        link = self.spi2
        link.open(self.spi_bus, self.spi2_device)  # /dev/spidev<2>.<1>
        link.max_speed_hz = self.max_speed_hz  # bus clock
        link.mode = self.mode  # SPI mode 0
        link.bits_per_word = self.bits_per_word  # bits per word
        link.no_cs = False  # SPI_NO_CS flag off: chip select is used
        link.threewire = False  # SI/SO are not shared
        time.sleep(0.2)  # wait 0.2 s before touching the controller
        # Register configuration:
        self.PhyMcp2515_PeriphInit(3)
    except Exception as err:
        print("openCan3():", err)
# 通知回调
def Notifier(self, can2Flag=False, can3Flag=False, callbackFunc=None):
    """
    Register which ports are polled for incoming data and who is told.

    The values are simply stored on the instance; the worker thread
    reads them on each pass.

    :param can2Flag: poll CAN2 when True (default False)
    :param can3Flag: poll CAN3 when True (default False)
    :param callbackFunc: callable of the form ``def callbackFunc(message): pass``
    :return: None
    """
    try:
        self.can2flag = can2Flag
        self.can3flag = can3Flag
        self.callback = callbackFunc
    except Exception as err:
        print("Notifier():", err)
def startNotifier(self):
    """
    Start receive processing.

    Currently a no-op kept for interface compatibility: the worker
    thread is already created and started by the constructor.

    :return: None
    """
    return None
def stoptNotifier(self):
    """
    Stop receive processing by switching off polling on both ports.

    The worker thread itself keeps running (``startrecv`` is left
    untouched), so queued frames can still be sent.

    :return: None
    """
    try:
        for flag_name in ("can2flag", "can3flag"):
            setattr(self, flag_name, False)
    except Exception as err:
        print("stoptNotifier():", err)
def can2Write(self, ID: int = None, data: list = None):
    """
    Queue one standard (11-bit ID) data frame for transmission on CAN2.

    The frame is only placed on ``self.send_q_can2``; the worker thread
    performs the actual SPI transfer later.

    :param ID: CAN identifier (int)
    :param data: payload as a list of byte values; at most the first 8
        entries are used, shorter payloads are sent with a matching DLC
    :return: True when the frame was queued, False otherwise
    """
    if not isinstance(ID, int) or not isinstance(data, list):
        return False
    try:
        payload = list(data[:8])  # private copy, capped at the CAN 2.0 maximum
        msg = MCP2515MSG()
        # Use the ID argument: the former `id` resolved to the builtin
        # function (or to an unrelated module-level variable).
        msg.ul_id = ID
        msg.uc_exId = False
        msg.uc_dlc = len(payload)
        # The transmit routine expects 8 data slots, so pad with zeros.
        msg.uc_dta = payload + [0x00] * (8 - len(payload))
        msg.channel = "can2"
        self.send_q_can2.put(msg)
        return True
    except Exception as e:
        print("can2Write():", e)
        return False
def can3Write(self, ID: int = None, data: list = None):
    """
    Queue one standard (11-bit ID) data frame for transmission on CAN3.

    The frame is only placed on ``self.send_q_can3``; the worker thread
    performs the actual SPI transfer later.

    :param ID: CAN identifier (int)
    :param data: payload as a list of byte values; at most the first 8
        entries are used, shorter payloads are sent with a matching DLC
    :return: True when the frame was queued, False otherwise
    """
    if not isinstance(ID, int) or not isinstance(data, list):
        return False
    try:
        payload = list(data[:8])  # private copy, capped at the CAN 2.0 maximum
        msg = MCP2515MSG()
        # Use the ID argument: the former `id` resolved to the builtin
        # function (or to an unrelated module-level variable).
        msg.ul_id = ID
        msg.uc_exId = False
        msg.uc_dlc = len(payload)
        # The transmit routine expects 8 data slots, so pad with zeros.
        msg.uc_dta = payload + [0x00] * (8 - len(payload))
        msg.channel = "can3"
        self.send_q_can3.put(msg)
        return True
    except Exception as e:
        print("can3Write():", e)
        return False
def closeCan2(self):
    """
    Close the CAN2 port by closing its SPI handle (``self.spi1``).

    A failure is printed, not raised.

    :return: None
    """
    try:
        handle = self.spi1
        handle.close()
    except Exception as err:
        print("closeCan2():", err)
def closeCan3(self):
    """
    Close the CAN3 port by closing its SPI handle (``self.spi2``).

    A failure is printed, not raised.

    :return: None
    """
    try:
        handle = self.spi2
        handle.close()
    except Exception as err:
        print("closeCan3():", err)
def PhyMcp2515_PeriphInit(self, canNum: int = None):
    """
    Initialise the controller of one port at 100 kbps.

    Port numbers other than 2 and 3 are ignored; errors are printed.

    :param canNum: CAN port number (2 or 3)
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        self.PhyMcp2515_McpConfig(canNum, canSpeed=100)  # fixed 100 kbps
    except Exception as err:
        print("PhyMcp2515_PeriphInit():", err)
def PhyMcp2515_ReadWrite(self, canNum=None, canData: list = None, length: int = None):
    """
    Perform one full-duplex SPI transfer on the port's SPI handle.

    A deep copy of ``canData`` is handed to ``xfer`` so the caller's list
    is left alone. ``length`` is only type-checked; the whole list is
    always transferred.

    :param canNum: CAN port number (2 uses ``self.spi1``, 3 uses ``self.spi2``)
    :param canData: list of values to clock out
    :param length: number of bytes (must be an int)
    :return: the list returned by ``xfer``, or [] on bad arguments or errors
    """
    try:
        arguments_ok = (
            canNum is not None
            and isinstance(canData, list)
            and isinstance(length, int)
            and canNum in (2, 3)
        )
        if not arguments_ok:
            return []
        outgoing = copy.deepcopy(canData)
        device = self.spi1 if canNum == 2 else self.spi2
        return device.xfer(outgoing)  # simultaneous write and read
    except Exception as err:
        print("PhyMcp2515_ReadWrite():", err)
        return []
def PhyMcp2515_Reset(self, canNum=None):
    """
    Send the single-byte RESET instruction to the controller.

    :param canNum: CAN port number (2 or 3); other values are ignored
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        reset_command = [self.MCP_RESET]
        self.PhyMcp2515_ReadWrite(canNum, reset_command, 1)
    except Exception as err:
        print("PhyMcp2515_Reset():", err)
def PhyMcp2515_ReadRegister(self, canNum, address):
    """
    Read one register through the READ instruction.

    Three bytes are exchanged (instruction, address, dummy); the register
    value is the third byte of the reply.

    Every failure path yields 0 (invalid port, short or empty reply,
    exception), so callers can always apply bit operations to the result.

    :param canNum: CAN port number (2 or 3)
    :param address: register address
    :return: the register value, or 0 on any failure
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return 0
        reply = self.PhyMcp2515_ReadWrite(canNum, [self.MCP_READ, address, 0x00], 3)
        # A usable reply carries the value at index 2; anything shorter
        # means the transfer failed.
        if reply is None or len(reply) < 3:
            return 0
        return reply[2]
    except Exception as e:
        print("PhyMcp2515_ReadRegister():", e)
        return 0
def PhyMcp2515_SetRegister(self, canNum, address, value):
    """
    Write one register through the WRITE instruction.

    The call is silently skipped unless the port is 2 or 3 and both
    ``address`` and ``value`` are ints.

    :param canNum: CAN port number
    :param address: register address
    :param value: value to store
    :return: None
    """
    try:
        arguments_ok = (
            canNum is not None
            and isinstance(address, int)
            and isinstance(value, int)
            and canNum in (2, 3)
        )
        if arguments_ok:
            frame = [self.MCP_WRITE, address, value]
            self.PhyMcp2515_ReadWrite(canNum, frame, 3)
    except Exception as err:
        print("PhyMcp2515_SetRegister():", err)
def PhyMcp2515_ConfigRate(self, canNum, canBps, crystalMHz: int = 8):
    """
    Program the bit-timing registers CNF1/CNF2/CNF3 for a bus speed.

    The requested speed is now honoured (it used to be ignored and
    100 kbps was always programmed). Supported combinations, as
    (CNF1, CNF2, CNF3):

    8 MHz crystal:
        100 kbps: 0x01, 0xBD, 0x04
        125 kbps: 0x01, 0xBA, 0x03
        200 kbps: 0x00, 0xBD, 0x04
        250 kbps: 0x00, 0xBA, 0x03
        400 kbps: 0x00, 0xA1, 0x01
        500 kbps: 0x00, 0x91, 0x01
    16 MHz crystal:
        100 kbps: 0x03, 0xBD, 0x04
        125 kbps: 0x03, 0xBA, 0x03
        200 kbps: 0x01, 0xBD, 0x04
        250 kbps: 0x01, 0xBA, 0x03
        400 kbps: 0x00, 0xBD, 0x04
        500 kbps: 0x00, 0xBA, 0x03
        800 kbps: 0x00, 0xA1, 0x01
        1000 kbps: 0x00, 0x98, 0x01

    An unsupported crystal/speed combination falls back to the historic
    setting (100 kbps with an 8 MHz crystal) and prints a notice.

    :param canNum: CAN port number (2 or 3)
    :param canBps: bus speed in kbps (int)
    :param crystalMHz: oscillator frequency of the MCP2515 module in MHz
        (8 or 16, default 8)
    :return: None
    """
    timing_table = {
        8: {
            100: (0x01, 0xBD, 0x04),
            125: (0x01, 0xBA, 0x03),
            200: (0x00, 0xBD, 0x04),
            250: (0x00, 0xBA, 0x03),
            400: (0x00, 0xA1, 0x01),
            500: (0x00, 0x91, 0x01),
        },
        16: {
            100: (0x03, 0xBD, 0x04),
            125: (0x03, 0xBA, 0x03),
            200: (0x01, 0xBD, 0x04),
            250: (0x01, 0xBA, 0x03),
            400: (0x00, 0xBD, 0x04),
            500: (0x00, 0xBA, 0x03),
            800: (0x00, 0xA1, 0x01),
            1000: (0x00, 0x98, 0x01),
        },
    }
    try:
        if canNum is None or not isinstance(canBps, int) or canNum not in (2, 3):
            return
        timing = timing_table.get(crystalMHz, {}).get(canBps)
        if timing is None:
            # Keep the previous behaviour for anything not in the table.
            print("PhyMcp2515_ConfigRate(): unsupported rate", canBps,
                  "kbps @", crystalMHz, "MHz, using 100 kbps @ 8 MHz")
            timing = timing_table[8][100]
        cnf1, cnf2, cnf3 = timing
        self.PhyMcp2515_SetRegister(canNum, self.MCP_CNF1, cnf1)
        self.PhyMcp2515_SetRegister(canNum, self.MCP_CNF2, cnf2)
        self.PhyMcp2515_SetRegister(canNum, self.MCP_CNF3, cnf3)
    except Exception as e:
        print("PhyMcp2515_ConfigRate():", e)
def PhyMcp2515_ModifyRegister(self, canNum, address, mask, data):
    """
    Change selected bits of a register through the BIT MODIFY instruction.

    The call is silently skipped unless the port is 2 or 3 and
    ``address``, ``mask`` and ``data`` are all ints.

    :param canNum: CAN port number
    :param address: register address
    :param mask: mask byte sent with the instruction
    :param data: data byte sent with the instruction
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        if not (isinstance(address, int) and isinstance(mask, int) and isinstance(data, int)):
            return
        command = [self.MCP_BITMOD, address, mask, data]
        self.PhyMcp2515_ReadWrite(canNum, command, 4)
    except Exception as err:
        print("PhyMcp2515_ModifyRegister():", err)
def PhyMcp2515_SetCanCtrlMode(self, canNum, newmode):
    """
    Request an operating mode and confirm it by reading CANCTRL back.

    The mode bits of CANCTRL are changed with a bit-modify, then the
    register is read and its mode bits compared with ``newmode``.

    Every failure path returns False (invalid port, unreadable register,
    exception), so callers can test the result with a plain truth check.

    :param canNum: CAN port number (2 or 3)
    :param newmode: mode word (one of the MODE_* values)
    :return: True when the read-back mode bits equal ``newmode``, else False
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return False
        self.PhyMcp2515_ModifyRegister(canNum, self.MCP_CANCTRL, self.MODE_MASK, newmode)
        current = self.PhyMcp2515_ReadRegister(canNum, self.MCP_CANCTRL)
        if not isinstance(current, int):
            # The register could not be read; treat as "mode not reached".
            return False
        return (current & self.MODE_MASK) == newmode
    except Exception as e:
        print("PhyMcp2515_SetCanCtrlMode():", e)
        return False
def PhyMcp2515_SetRegisterS(self, canNum, address, values: list, n):
    """
    Write several values with a single WRITE instruction that starts at
    ``address``.

    The call is silently skipped unless the port is 2 or 3, ``address``
    and ``n`` are ints and ``values`` is a list.

    :param canNum: CAN port number
    :param address: address the write starts at
    :param values: list of values to write
    :param n: number of values
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        if not (isinstance(address, int) and isinstance(values, list) and isinstance(n, int)):
            return
        burst = [self.MCP_WRITE, address, *values]
        self.PhyMcp2515_ReadWrite(canNum, burst, n + 2)
    except Exception as err:
        print("PhyMcp2515_SetRegisterS():", err)
def PhyMcp2515_WriteCanId(self, canNum=None, mcpAddr=None, is_ext=None, can_id=None):
    """
    Encode a CAN identifier into the 4-byte SIDH/SIDL/EID8/EID0 layout
    and write it starting at ``mcpAddr``.

    Two defects of the earlier version are fixed here:
    * the upper 13 bits of an extended ID were always written as zero,
      because the ID was masked to 16 bits before they were extracted;
    * ``is_ext`` had to be a real ``bool``, so callers passing ``1``
      were silently ignored. ``bool`` and ``int`` are both accepted now.

    :param canNum: CAN port number (2 or 3)
    :param mcpAddr: address of the first of the four ID registers
    :param is_ext: truthy for a 29-bit extended ID, falsy for an 11-bit ID
    :param can_id: CAN identifier
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        if not isinstance(mcpAddr, int) or not isinstance(can_id, int):
            return
        if not isinstance(is_ext, (bool, int)):
            return
        data = [0, 0, 0, 0]
        if is_ext:
            low16 = can_id & 0xFFFF  # EID15..EID0
            high13 = (can_id >> 16) & 0x1FFF  # SID10..SID0 + EID17..EID16
            data[self.MCP_EID0] = low16 & 0xFF
            data[self.MCP_EID8] = (low16 >> 8) & 0xFF
            # SIDL: SID2..0 in bits 7..5, EXIDE flag, EID17..16 in bits 1..0
            data[self.MCP_SIDL] = (
                (high13 & 0x03)
                | ((high13 & 0x1C) << 3)
                | self.MCP_TXB_EXIDE_M
            )
            data[self.MCP_SIDH] = (high13 >> 5) & 0xFF
        else:
            std_id = can_id & 0x7FF  # keep each register value within one byte
            data[self.MCP_SIDH] = (std_id >> 3) & 0xFF
            data[self.MCP_SIDL] = (std_id & 0x07) << 5
            data[self.MCP_EID0] = 0
            data[self.MCP_EID8] = 0
        self.PhyMcp2515_SetRegisterS(canNum, mcpAddr, data, 4)
    except Exception as e:
        print("PhyMcp2515_WriteCanId():", e)
def PhyMcp2515_InitCanBuffers(self, canNum):
    """
    Clear the acceptance masks/filters and the transmit/receive buffers.

    Steps, in order:
    1. write ID 0 (extended format) into both receive masks;
    2. write ID 0 (extended format) into all six receive filters;
    3. zero 14 consecutive registers of each transmit buffer, starting
       at TXBnCTRL;
    4. zero RXB0CTRL and RXB1CTRL.

    The extended flag is passed as ``True``: it used to be passed as
    ``1``, which the ID writer rejects with its ``bool`` type check, so
    steps 1 and 2 never wrote anything.

    :param canNum: CAN port number (2 or 3)
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        # Receive mask registers
        for mask_reg in (self.MCP_RXM0SIDH, self.MCP_RXM1SIDH):
            self.PhyMcp2515_WriteCanId(canNum, mask_reg, True, 0x0)
        # Receive filter registers
        filter_regs = (
            self.MCP_RXF0SIDH, self.MCP_RXF1SIDH, self.MCP_RXF2SIDH,
            self.MCP_RXF3SIDH, self.MCP_RXF4SIDH, self.MCP_RXF5SIDH,
        )
        for filter_reg in filter_regs:
            self.PhyMcp2515_WriteCanId(canNum, filter_reg, True, 0x0)
        # Clear and deactivate the three transmit buffers
        tx_bases = (self.MCP_TXB0CTRL, self.MCP_TXB1CTRL, self.MCP_TXB2CTRL)
        for offset in range(14):
            for base in tx_bases:
                self.PhyMcp2515_SetRegister(canNum, base + offset, 0x00)
        self.PhyMcp2515_SetRegister(canNum, self.MCP_RXB0CTRL, 0)
        self.PhyMcp2515_SetRegister(canNum, self.MCP_RXB1CTRL, 0)
    except Exception as e:
        print("PhyMcp2515_InitCanBuffers():", e)
def PhyMcp2515_ResetCanBuffers(self, canNum):
    """
    Zero the masks, filters and transmit buffers with burst writes
    (intended for use after a reset).

    Each burst is one WRITE instruction carrying a start address and a
    run of zero bytes:
    * RXM0 + RXM1: 8 bytes from MCP_RXM0SIDH
    * RXF0..RXF2: 12 bytes from MCP_RXF0SIDH
    * RXF3..RXF5: 12 bytes from MCP_RXF3SIDH
    * TXB0/TXB1/TXB2: 14 bytes each from TXBnCTRL
    Finally RXB0CTRL and RXB1CTRL are set to 0.

    The RXF0..RXF2 burst used to start at MCP_RXM0SIDH (0x20); 12 bytes
    from there run through addresses 0x28..0x2B, i.e. over CNF3, CNF2,
    CNF1 and CANINTE, and the filters were never cleared. It now starts
    at MCP_RXF0SIDH.

    :param canNum: CAN port number (2 or 3)
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        bursts = (
            (self.MCP_RXM0SIDH, 8),  # RXM0 / RXM1
            (self.MCP_RXF0SIDH, 12),  # RXF0-RXF2
            (self.MCP_RXF3SIDH, 12),  # RXF3-RXF5
            (self.MCP_TXB0CTRL, 14),  # TXB0
            (self.MCP_TXB1CTRL, 14),  # TXB1
            (self.MCP_TXB2CTRL, 14),  # TXB2
        )
        for start_address, zero_count in bursts:
            frame = [self.MCP_WRITE, start_address] + [0x00] * zero_count
            self.PhyMcp2515_ReadWrite(canNum, frame, zero_count + 2)
        self.PhyMcp2515_SetRegister(canNum, self.MCP_RXB0CTRL, 0)
        self.PhyMcp2515_SetRegister(canNum, self.MCP_RXB1CTRL, 0)
    except Exception as e:
        print("PhyMcp2515_ResetCanBuffers():", e)
def PhyMcp2515_McpConfig(self, canNum, canSpeed):
    """
    Bring the controller from reset into normal mode.

    Sequence: reset, wait 0.5 s, enter configuration mode, program the
    bit rate, clear the buffers, set both receive buffers to accept any
    message (with the rollover bit on RXB0), enable the receive
    interrupts, switch to normal mode.

    Configuration is abandoned whenever configuration mode is not
    confirmed. The former ``res == False`` test let a ``None`` result
    (returned on error paths) fall through and configure a chip that was
    not in configuration mode. A failed switch to normal mode is now
    reported as well.

    :param canNum: CAN port number (2 or 3)
    :param canSpeed: bus speed, passed on to the bit-rate routine
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        self.PhyMcp2515_Reset(canNum)
        time.sleep(0.5)  # let the chip come out of reset
        if not self.PhyMcp2515_SetCanCtrlMode(canNum, self.MODE_CONFIG):
            print("PhyMcp2515_McpConfig(): could not enter configuration mode")
            return
        self.PhyMcp2515_ConfigRate(canNum, canSpeed)  # bit rate
        self.PhyMcp2515_InitCanBuffers(canNum)  # masks, filters, buffers
        # Receive control registers: accept any message; RXB0 also gets
        # the rollover (BUKT) bit.
        self.PhyMcp2515_ModifyRegister(
            canNum, self.MCP_RXB0CTRL,
            self.MCP_RXB_RX_MASK | self.MCP_RXB_BUKT_MASK,
            self.MCP_RXB_RX_ANY | self.MCP_RXB_BUKT_MASK)
        self.PhyMcp2515_ModifyRegister(
            canNum, self.MCP_RXB1CTRL, self.MCP_RXB_RX_MASK, self.MCP_RXB_RX_ANY)
        self.PhyMcp2515_SetRegister(canNum, self.MCP_CANINTE, self.MCP_RX_INT)  # receive interrupts on
        if not self.PhyMcp2515_SetCanCtrlMode(canNum, self.MODE_NORMAL):
            print("PhyMcp2515_McpConfig(): could not enter normal mode")
    except Exception as e:
        print("PhyMcp2515_McpConfig():", e)
def PhyMcp2515_RxMegDeal(self, canNum):
    """
    Fetch at most one pending frame from the controller.

    CANINTF is read first. If the RX0 flag (bit 0) is set, receive
    buffer 0 is read with READ_RX0; otherwise, if the RX1 flag (bit 1)
    is set, receive buffer 1 is read with READ_RX1. Only one buffer is
    read per call, so when both hold a frame the second one stays in the
    chip for the next call. (Previously both were read into the same
    message object and the RX0 frame was overwritten and lost.)

    No explicit flag clearing is done afterwards.
    NOTE(review): per the MCP2515 datasheet the READ RX BUFFER
    instruction clears the matching RXnIF flag once chip select is
    released — confirm against the datasheet for the part in use. The
    bit-modify the old code issued always had mask 0 on this path.

    Caveat: buffer 0 is always served first, so under sustained load a
    newer frame in buffer 0 can be returned before an older one still
    waiting in buffer 1.

    :param canNum: CAN port number (2 or 3)
    :return: a filled MCP2515MSG, or None when nothing was received or
        an error occurred
    """

    def unpack(raw):
        # Reply layout by index: [1]=SIDH [2]=SIDL [3]=EID8 [4]=EID0
        # [5]=DLC [6..13]=data; raw[0] arrives while the instruction
        # byte is being sent and is not used.
        frame = MCP2515MSG()
        if (raw[2] & self.MCP_RXB_IDE_M) != self.MCP_RXB_IDE_M:
            # Standard frame: 11-bit ID from SIDH and the top 3 bits of SIDL
            frame.ul_id = (raw[1] << 3) | ((raw[2] >> 5) & 0x07)
            frame.uc_exId = 0
        else:
            # Extended frame: 29-bit ID spread over all four ID bytes
            frame.ul_id = (
                raw[4]
                | (raw[3] << 8)
                | ((raw[2] & 0x03) << 16)
                | ((raw[2] & 0xE0) << 13)
                | (raw[1] << 21)
            )
            frame.uc_exId = 1
        frame.uc_dlc = raw[5] & 0x0F
        frame.uc_dta = list(raw[6:14])
        frame.channel = "CAN" + str(canNum)
        return frame

    try:
        if canNum is None or canNum not in (2, 3):
            return None
        pending = self.PhyMcp2515_ReadRegister(canNum, self.MCP_CANINTF)
        if not isinstance(pending, int):
            return None
        if pending & 0x01:
            instruction = self.MCP_READ_RX0
        elif pending & 0x02:
            instruction = self.MCP_READ_RX1
        else:
            return None  # nothing waiting
        raw = self.PhyMcp2515_ReadWrite(canNum, [instruction] + [0x00] * 13, 14)
        if raw is None or len(raw) < 14:
            return None  # transfer failed or came back short
        return unpack(raw)
    except Exception as e:
        print("PhyMcp2515_RxMegDeal():", e)
        return None
def PhyMcp2515_TxMegDeal(self, canNum, canId, canExtIDE: bool, canDlc, canData: list):
    """
    Load one frame through the LOAD_TX0 instruction and request its
    transmission with RTS_TX0.

    Before loading, bit 3 (0x08) of registers 0x30, 0x40 and 0x50 is
    checked; while any of them is set the routine waits 1 ms and checks
    again, giving up with an IOError after the fourth re-check. That
    error, like any other, is caught here and printed, so the frame is
    dropped.

    Nothing happens unless the port is 2 or 3, ``canId`` is an int,
    ``canExtIDE`` is a bool and ``canData`` is a list of at least 8 items.

    :param canNum: CAN port number
    :param canId: CAN identifier
    :param canExtIDE: True for an extended frame, False for a standard one
    :param canDlc: data length code
    :param canData: data list (the first 8 entries are transferred)
    :return: None
    """
    try:
        arguments_ok = (
            canNum is not None
            and isinstance(canId, int)
            and isinstance(canExtIDE, bool)
            and isinstance(canData, list)
            and len(canData) >= 8
            and canNum in (2, 3)
        )
        if not arguments_ok:
            return

        def request_bits():
            # OR together bit 3 of the three control registers
            bits = 0
            for ctrl_reg in (0x30, 0x40, 0x50):
                bits |= self.PhyMcp2515_ReadRegister(canNum, ctrl_reg) & 0x08
            return bits

        busy = request_bits()
        rechecks = 0
        while (busy >> 3) == 1:  # a transmission is still pending
            time.sleep(0.001)  # 1 ms
            busy = request_bits()
            rechecks += 1
            if rechecks > 3:
                raise IOError("can bus transfer error!")

        frame = [self.MCP_LOAD_TX0]
        if canExtIDE:
            # Extended frame, MSB (SID10~SID0, EID17~EID0) LSB
            frame.extend([
                (canId >> 21) & 0xff,
                ((canId >> 13) & 0xe0) | self.MCP_TXB_EXIDE_M | ((canId >> 16) & 0x03),
                (canId >> 8) & 0xff,
                canId & 0xff,
            ])
        else:
            # Standard frame: only SIDH/SIDL carry ID bits
            frame.extend([
                (canId >> 3) & 0xff,
                (canId & 0x07) << 5,
                0x00,
                0x00,
            ])
        frame.append(canDlc & 0x0f)  # data frame: the RTR bit stays 0
        frame.extend(canData[0:8])
        self.PhyMcp2515_ReadWrite(canNum, frame, 6 + canDlc)
        self.PhyMcp2515_ReadWrite(canNum, [self.MCP_RTS_TX0], 1)  # request-to-send for TXB0
    except Exception as err:
        print("PhyMcp2515_TxMegDeal():", err)
def PhyMcp2515_ErrDeal(self, canNum):
    """
    Error handling: run the port initialisation again.

    :param canNum: CAN port number
    :return: None
    """
    try:
        reinitialise = self.PhyMcp2515_PeriphInit
        reinitialise(canNum)
    except Exception as err:
        print("PhyMcp2515_ErrDeal():", err)
def PhyMcp2515_CANTx(self, canNum, TxMessage: MCP2515MSG):
    """
    Transmit one message on the given port.

    The frame format now follows the message itself: a truthy
    ``TxMessage.uc_exId`` selects an extended frame, anything else a
    standard frame. Previously every message was sent as a standard
    frame regardless of that field. Messages built with the default
    ``uc_exId`` of 0/False are still sent as standard frames.

    :param canNum: CAN port number (2 or 3)
    :param TxMessage: message to send; ignored unless it is an MCP2515MSG
    :return: None
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return
        if not isinstance(TxMessage, MCP2515MSG):
            return
        # The low-level sender insists on a real bool for the format flag.
        extended = bool(TxMessage.uc_exId)
        self.PhyMcp2515_TxMegDeal(
            canNum, TxMessage.ul_id, extended, TxMessage.uc_dlc, TxMessage.uc_dta)
    except Exception as e:
        print("PhyMcp2515_CANTx():", e)
def ObjMcp2515_MainDeal(self, canNum):
    """
    Receive entry point: ask the low-level receive routine for a message.

    :param canNum: CAN port number (2 or 3)
    :return: the received message, or None when there is none, the port
        number is invalid or an error occurred
    """
    try:
        if canNum is None or canNum not in (2, 3):
            return None
        received = self.PhyMcp2515_RxMegDeal(canNum)
        if received is None:
            return None
        return received
    except Exception as err:
        print("ObjMcp2515_MainDeal():", err)
def can_calback(message: MCP2515MSG):
    """
    Example receive callback: print the ID (hex), the extended flag (as
    a bool), the DLC and the data list of a message.

    :param message: the received message
    :return: None
    """
    try:
        frame_id = hex(message.ul_id)
        is_extended = bool(message.uc_exId)
        print("can_calback():", frame_id, is_extended, message.uc_dlc, message.uc_dta)
    except Exception as err:
        print("can_calback():", err)
# 调试
if __name__ == "__main__":
    # Demo on CAN3 (use openCan2 / can2Flag / can2Write / closeCan2 for CAN2).
    mycan = myCAN()  # driver object
    mycan.openCan3()  # open the port
    # NOTE: `id` shadows the builtin; the name is kept as in the original script.
    id, can_d = 0x10, [1, 2, 3, 4, 5, 6, 7, 8]
    # Deliver frames received on CAN3 to can_calback.
    mycan.Notifier(can3Flag=True, callbackFunc=can_calback)
    # Send ten frames, 50 ms apart.
    for _ in range(10):
        mycan.can3Write(id, can_d)
        time.sleep(0.05)
    # Keep listening for a bounded time: 50 x 0.2 s.
    for _ in range(50):
        time.sleep(0.2)
    mycan.stoptNotifier()  # stop receiving
    mycan.closeCan3()  # close the port
测试结果
forlinx@ok3568:~$ cd test/
forlinx@ok3568:~/test$ sudo python3 can_spi_driver.py
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
can_calback(): 0x7e0 False 8 [1, 2, 3, 4, 5, 6, 7, 8]
四)can1测试端代码
#!/usr/bin/env python
# coding: utf-8
"""
spi转can接收数据端(CAN1)处理 测试程序
编写: 宋连猛 2022.9.28
"""
import can
import threading
import random
import os
import time
from queue import Queue
# Interface (re)initialisation; the can0 lines are not needed for now
#os.system("sudo ip link set can0 down")
os.system("sudo ip link set can1 down")
# The interface is brought up as CAN FD; adjust the bit rates / CAN 2.0B setting for your setup.
# Author's observation (unverified): in the Message class an FD frame carries an "F" flag and a
# classic frame does not, and classic CAN 2.0B traffic is still received correctly between two
# FD-capable interfaces.
#os.system("sudo ip link set can0 up type can bitrate 100000 sample-point 0.75 dbitrate 5000000 dsample-point 0.8 fd on")
os.system("sudo ip link set can1 up type can bitrate 100000 sample-point 0.75 dbitrate 5000000 dsample-point 0.8 fd on")
#os.system("sudo ip link set can0 up")
os.system("sudo ip link set can1 up")
# Module-level state
q_data=Queue()# queue carrying received payloads to the sender thread
# CAN buses: sending and receiving may use the same interface
# CAN 2.0B, 100 kbps
bus_tx = can.interface.Bus(bustype='socketcan', channel='can1', bitrate=100000,fd=False)
bus_rx = can.interface.Bus(bustype='socketcan', channel='can1', bitrate=100000,fd=False)# receives frames sent by the other node
# CAN FD, 100 kbps arbitration and 5 Mbps data
#bus_tx = can.interface.Bus(bustype='socketcan', channel='can1', bitrate=100000,dbitrate=5000000,fd=True)
#bus_rx = can.interface.Bus(bustype='socketcan', channel='can1', bitrate=100000,dbitrate=5000000,fd=True)
def message(message):
    """
    Listener callback: print a received frame and put its payload on
    ``q_data``.

    Frames that are None or not marked as received (``is_rx`` false)
    are ignored.
    """
    if message is None or not message.is_rx:
        return
    print("message=",message.is_extended_id,message.dlc,message.arbitration_id,message.data)
    q_data.put(message.data)  # hand the payload to the sender thread
# Listeners handed to the notifier
listeners = [
#can.Printer(), # Callback function, print the received messages
message,# message handler defined above
]
notifier = can.Notifier([bus_rx], listeners)
#发送处理类
class tx_thread_cl:
    """
    Sender service: a thread that takes payloads from a queue and sends
    each one as a standard CAN 2.0B frame with ID 0x7E0, pausing 0.5 s
    after every frame.

    ``stop()`` really ends the thread: the queue is polled with a short
    timeout, so the ``running`` flag is re-checked even when no payload
    arrives (a blocking ``get()`` used to keep the thread alive
    forever). ``finished`` is set on every exit path, including an
    exception raised while sending.
    """

    def __init__(self, bus, q_data):
        """
        :param bus: bus object used for sending (must provide ``send``)
        :param q_data: queue delivering the payloads to send
        """
        self.bus = bus
        self.running = True  # cleared by stop() to end the loop
        self.finished = False  # set once the thread function has returned
        self.q_data = q_data
        self.thread = threading.Thread(target=self.tx_callback, args=(self.bus,))

    def start(self):
        """Start the sender thread."""
        self.thread.start()

    def tx_callback(self, bus):
        """
        Thread body: send queued payloads until ``running`` is cleared.

        :param bus: bus object used for sending
        """
        from queue import Empty  # local import: the file only imports Queue

        try:
            while self.running:
                try:
                    # Wake up regularly so a stop() request is noticed.
                    data = self.q_data.get(timeout=0.1)
                except Empty:
                    continue
                msg = can.Message(is_extended_id=False, arbitration_id=0x7E0, data=data, is_fd=False)  # CAN 2.0B standard frame
                print("msg=",msg)
                bus.send(msg)
                time.sleep(0.5)
        finally:
            self.finished = True

    def stop(self):
        """Ask the sender thread to end; it exits within about 0.1 s when idle."""
        self.running = False
if __name__ == "__main__":
    # Echo test: every payload received on the bus is sent back with ID 0x7E0.
    tx_service = tx_thread_cl(bus_tx, q_data)  # sender service
    tx_service.start()
    # Run until the user presses Enter (end-of-input or Ctrl+C also ends the run).
    try:
        input()
    except (EOFError, KeyboardInterrupt):
        pass
    # Ask the sender to stop and wait a bounded time for its thread,
    # instead of spinning on the `finished` flag at full CPU load.
    tx_service.stop()
    tx_service.thread.join(timeout=2.0)
    # Stop the notifier
    notifier.stop()
    # Shut the buses down
    bus_tx.shutdown()
    bus_rx.shutdown()
测试结果
forlinx@ok3568:~/test$ sudo python3 can_test_with_spiToCan.py
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
message= False 8 16 bytearray(b'\x01\x02\x03\x04\x05\x06\x07\x08')
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
msg= Timestamp: 0.000000 ID: 07e0 S Rx DL: 8 01 02 03 04 05 06 07 08
结尾
网络上spi转can在mcu和linux 内核相关参考比较多,python3的实现不好找,希望此篇文章可以给大家一些思路。我是Simon,在这里期待您的关注。
版权声明:本文为weixin_43521467原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。