一、加载内核模块并启用 CAN 接口
# Load the CAN kernel modules used by the commands below.
for module in can can-dev can-raw can-bcm; do
    sudo modprobe "$module"
done

# Bring up can0 through can5 with bitrate 500000 and restart-ms 300.
for i in 0 1 2 3 4 5; do
    sudo ip link set "can${i}" up type can bitrate 500000 restart-ms 300
    # sudo ip link set "can${i}" down
done
二、在 shell 中查看 CAN 收发
# Send one frame with ID 505 and eight data bytes on can0
cansend can0 505#0000F1FFFFFFFFF1
# In a second terminal: receive on can0 and keep only the lines containing 505
candump can0 | grep 505
# Send a frame, then print the current Unix time in seconds as a send timestamp
cansend can9 2BD#0000553900000001 && date +%s
# Receive with timestamps printed (-t a) and keep only the matching 2BD frames
sudo candump can9 -t a | grep 2BD | grep "00 00 55 39"
# cantools: decode the candump output using a DBC file
candump vcan0 | python3 -m cantools decode --single-line tests/files/dbc/motohawk.dbc
三、C++ 接收
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <linux/can.h>
#include <linux/can/raw.h>
/// Receives CAN frames from interface "can1" through a raw SocketCAN socket
/// and prints each one as: <ifname> 0x<can_id> [<dlc>] <data bytes in hex>.
///
/// @return 1 if the socket cannot be set up or a read fails; the receive loop
///         otherwise runs until the process is terminated.
int main() {
    const char* ifname = "can1";

    // Create the SocketCAN socket.
    const int sock = socket(PF_CAN, SOCK_RAW, CAN_RAW);
    if (sock == -1) {
        std::cerr << "Failed to create socket." << std::endl;
        return 1;
    }

    // Look up the interface index. The request is zeroed first so that
    // ifr_name is NUL-terminated whatever the length of ifname.
    struct ifreq ifr;
    std::memset(&ifr, 0, sizeof(ifr));
    std::strncpy(ifr.ifr_name, ifname, IFNAMSIZ - 1);
    if (ioctl(sock, SIOCGIFINDEX, &ifr) == -1) {
        std::cerr << "Failed to get interface index." << std::endl;
        close(sock);
        return 1;
    }

    // Bind the socket to the CAN interface.
    struct sockaddr_can addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.can_family = AF_CAN;
    addr.can_ifindex = ifr.ifr_ifindex;
    if (bind(sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1) {
        std::cerr << "Failed to bind socket to interface." << std::endl;
        close(sock);
        return 1;
    }

    // Receive CAN frames.
    int exit_code = 0;
    struct can_frame frame;
    while (true) {
        const ssize_t nbytes = read(sock, &frame, sizeof(frame));
        if (nbytes < 0) {
            // Without this check a failing read (e.g. the interface going
            // down) would make the loop spin forever without printing.
            std::cerr << "Failed to read CAN frame." << std::endl;
            exit_code = 1;
            break;
        }
        if (static_cast<size_t>(nbytes) < sizeof(frame)) {
            // Not a complete frame: the contents of `frame` cannot be trusted.
            continue;
        }

        std::cout << ifr.ifr_name << " ";
        // The raw can_id is printed, so any flag bits stored in it show up too.
        std::cout << "0x" << std::hex << static_cast<int>(frame.can_id) << " ";
        std::cout << "[" << static_cast<int>(frame.can_dlc) << "] ";
        for (int i = 0; i < frame.can_dlc; ++i) {
            std::cout << std::hex << static_cast<int>(frame.data[i]) << " ";
        }
        // std::endl flushes so every frame appears immediately.
        std::cout << std::endl;
    }

    // Close the socket.
    close(sock);
    return exit_code;
}
C++ 发送
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <linux/can.h>
#include <linux/can/raw.h>
/// Sends one CAN frame (ID 0x505, eight zero data bytes) on interface "can0"
/// through a raw SocketCAN socket.
///
/// @return 0 when the frame was written completely, 1 on any failure.
int main() {
    const char* ifname = "can0";

    // Create the SocketCAN socket.
    const int sock = socket(PF_CAN, SOCK_RAW, CAN_RAW);
    if (sock == -1) {
        std::cerr << "Failed to create socket." << std::endl;
        return 1;
    }

    // Look up the interface index. The request is zeroed first so that
    // ifr_name is NUL-terminated whatever the length of ifname.
    struct ifreq ifr;
    std::memset(&ifr, 0, sizeof(ifr));
    std::strncpy(ifr.ifr_name, ifname, IFNAMSIZ - 1);
    if (ioctl(sock, SIOCGIFINDEX, &ifr) == -1) {
        std::cerr << "Failed to get interface index." << std::endl;
        close(sock);
        return 1;
    }

    // Bind the socket to the CAN interface.
    struct sockaddr_can addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.can_family = AF_CAN;
    addr.can_ifindex = ifr.ifr_ifindex;
    if (bind(sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1) {
        std::cerr << "Failed to bind socket to interface." << std::endl;
        close(sock);
        return 1;
    }

    // Prepare the CAN frame. The whole struct is zeroed, not just `data`, so
    // the padding/reserved bytes handed to the kernel are never uninitialised.
    struct can_frame frame;
    std::memset(&frame, 0, sizeof(frame));
    frame.can_id = 0x505;  // CAN ID
    frame.can_dlc = 8;     // payload length: 8 bytes (all zero)

    // Send the frame; anything other than a full-size write is a failure.
    const ssize_t nbytes = write(sock, &frame, sizeof(frame));
    if (nbytes != static_cast<ssize_t>(sizeof(frame))) {
        std::cerr << "Failed to send CAN frame." << std::endl;
        close(sock);
        return 1;
    }
    std::cout << "CAN frame sent successfully." << std::endl;

    // Close the socket.
    close(sock);
    return 0;
}
四、Python 收发
import can
import binascii
def send_one():
    """Send a single CAN message on ``can0`` and print whether it was sent."""
    # Bus configuration is documented at
    # https://python-can.readthedocs.io/en/stable/configuration.html
    # ``interface`` is used instead of the deprecated ``bustype`` keyword.
    with can.Bus(interface='socketcan', channel="can0", bitrate=500000, recv_timeout=1.0) as bus:
        # Other backends are selected the same way:
        # bus = can.Bus(interface='socketcan', channel='vcan0', bitrate=250000)
        # bus = can.Bus(interface='pcan', channel='PCAN_USBBUS1', bitrate=250000)
        # bus = can.Bus(interface='ixxat', channel=0, bitrate=250000)
        # bus = can.Bus(interface='vector', app_name='CANalyzer', channel=0, bitrate=250000)
        # ...
        msg = can.Message(
            arbitration_id=0x505, data=[0, 25, 0, 1, 3, 1, 4, 1], is_extended_id=True
        )
        # Adjust is_extended_id to the frame format actually used on the bus.
        try:
            bus.send(msg)
            print(f"Message sent on {bus.channel_info}")
        except can.CanError:
            print("Message NOT sent")
def receive_all():
    """Print every message received on ``can0``.

    Each line shows the channel, the arbitration ID in hex, the DLC wrapped in
    a list, and the payload as space-separated upper-case hex byte pairs.
    """
    # ``interface`` is used instead of the deprecated ``bustype`` keyword.
    with can.Bus(interface='socketcan', channel="can0", bitrate=500000, recv_timeout=1.0) as bus:
        for msg in bus:
            formatted_string = ' '.join(f"{byte:02X}" for byte in msg.data)
            print(msg.channel, hex(msg.arbitration_id), [msg.dlc], formatted_string)
if __name__ == "__main__":
    # Listen on the bus; call send_one() here instead to transmit a frame.
    receive_all()
通过dbc文件发送msg
import can
import time
import cantools
import json
def send_one(example_message, data):
    """Send ``data`` on ``can3`` using the frame ID of a cantools message.

    Args:
        example_message: cantools message; supplies ``frame_id`` and the
            frame format (``is_extended_frame``).
        data: payload bytes to send.
    """
    # ``interface`` is used instead of the deprecated ``bustype`` keyword.
    with can.Bus(interface='socketcan', channel="can3", bitrate=500000, recv_timeout=1.0) as bus:
        # NOTE(review): presumably gives the interface time to settle before
        # sending - confirm whether this delay is still needed.
        time.sleep(1)
        # Take the frame format from the DBC rather than always sending an
        # extended (29-bit) frame.
        msg = can.Message(
            arbitration_id=example_message.frame_id,
            data=data,
            is_extended_id=example_message.is_extended_frame,
        )
        try:
            bus.send(msg)
            print("[info] send:", bus.channel, hex(msg.arbitration_id), list(msg.data), time.time())
        except KeyboardInterrupt:
            print("=======================")
if __name__ == "__main__":
    db = cantools.database.load_file('NT2_Body_CAN_Official_Release_V_1_6_DA_03.dbc')
    example_message = db.get_message_by_name('CCU_01')

    # Print a table of the message's signals (name, start bit, length) and
    # write an all-zero template that can be edited into config.json.
    template = {}
    name_width = max(len(sig.name) for sig in example_message.signals) + 4
    for sig in example_message.signals:
        print("{:<{}} {}\t {}\t 0\t".format(sig.name, name_width, sig.start, sig.length))
        template[sig.name] = 0
    with open("原始.json", "w") as json_file:
        json.dump(template, json_file, indent=4)

    # config.json maps signal names to raw (unscaled) values.
    with open("config.json", "r") as json_file:
        src = json.load(json_file)

    # Start from all-zero signals, then overlay the configured raw values after
    # checking that each one fits into its signal's bit length.
    raw_values = dict(template)
    for key, value in src.items():
        max_value = (1 << example_message.get_signal_by_name(key).length) - 1
        if value > max_value:
            raise ValueError("{} max value is {}; but value is {}".format(key, max_value, value))
        raw_values[key] = value

    # Let cantools place each signal according to its start bit and byte order.
    # Shifting by ``start`` and serialising the integer big-endian does not
    # follow the DBC bit numbering, so it produces a wrong payload layout.
    byte_code = example_message.encode(raw_values, scaling=False)
    print(int.from_bytes(byte_code, "big"))
    print(byte_code)
    send_one(example_message, byte_code)
通过配置json文件,发送msg
import can
import time
import cantools
import json
from cantools.database.can.signal import NamedSignalValue
def set_signal_values(message, values):
    """Build a value for every signal of ``message``.

    Args:
        message: object whose ``signals`` each expose ``name``, ``scale`` and
            ``offset``.
        values: mapping of signal name to raw value.

    Returns:
        dict of signal name to ``raw * scale + offset`` for signals present in
        ``values``, and to 0 for all other signals.
    """
    signal_values = {}
    for signal in message.signals:
        if signal.name in values:
            signal_values[signal.name] = values[signal.name] * signal.scale + signal.offset
        else:
            signal_values[signal.name] = 0
    return signal_values
def send_one(can_id, example_message, data):
    """Send ``data`` using the frame ID of a cantools message.

    Args:
        can_id: value passed to ``can.Bus`` as ``channel`` (e.g. ``"can3"``).
        example_message: cantools message; supplies ``frame_id`` and the
            frame format (``is_extended_frame``).
        data: payload bytes to send.
    """
    # ``interface`` is used instead of the deprecated ``bustype`` keyword.
    with can.Bus(interface='socketcan', channel=can_id, bitrate=500000, recv_timeout=1.0) as bus:
        # NOTE(review): presumably gives the interface time to settle before
        # sending - confirm whether this delay is still needed.
        time.sleep(1)
        # Take the frame format from the DBC rather than always sending an
        # extended (29-bit) frame.
        msg = can.Message(
            arbitration_id=example_message.frame_id,
            data=data,
            is_extended_id=example_message.is_extended_frame,
        )
        try:
            bus.send(msg)
            print("[info] send:", bus.channel, hex(msg.arbitration_id), list(msg.data), time.time())
        except KeyboardInterrupt:
            print("=======================")
def print_dic(example_message_tmp):
    """Print a JSON object mapping every signal name of the message to 0."""
    template = {sig.name: 0 for sig in example_message_tmp.signals}
    print(json.dumps(template, indent=4))
def send_msg(testcase):
    """Encode and send the message described by one test case.

    Args:
        testcase: dict with keys ``dbc_file`` (DBC path), ``msg_id`` (message
            name in the DBC), ``msg`` (signal name -> raw value) and
            ``can_id`` (channel passed on to ``send_one``).

    Raises:
        ValueError: if a raw value does not fit into its signal's bit length.
    """
    db = cantools.database.load_file(testcase["dbc_file"])
    example_message = db.get_message_by_name(testcase["msg_id"])
    # print_dic(example_message)

    # Reject raw values that need more bits than the signal has.
    for key, value in testcase["msg"].items():
        max_value = (1 << example_message.get_signal_by_name(key).length) - 1
        if value > max_value:
            raise ValueError("{} max value is {}; but value is {}".format(key, max_value, value))

    # Pass the signal values, not the message name: with the name string,
    # no signal is ever found and every signal is encoded as 0.
    signal_values = set_signal_values(example_message, testcase["msg"])
    byte_code = example_message.encode(signal_values)
    send_one(testcase["can_id"], example_message, byte_code)
if __name__ == "__main__":
    with open("config.json", "r") as json_file:
        src = json.load(json_file)
    # One pass over the test cases; raise the range() count to repeat the list.
    for _ in range(1):
        for testcase in src["testcases"]:
            send_msg(testcase)
json文件
{
"testcases": [
{
"can_id": "can3",
"dbc_file": "NT2_Body_CAN_Official_Release_V_1_6_DA_03.dbc",
"msg_id": "CCU_01",
"msg": {
"AmbTemp": 0,
"PwrLimOfChi": 0,
"PtcTotActPwr": 0,
"AmbTempValid": 0,
"PtcTotReqdPwr": 0,
"PtcReFailSts": 0,
"ComprActPwr": 1,
"PtcFrntOnOffSts": 0,
"ComprReqdPwr": 0,
"CmprOnOffSts": 0,
"PtcReOnOffSts": 0,
"PtcFrntFailSts": 0,
"ComprlimSts": 0,
"ComprFaultSts": 0,
"CbnACSysSts": 0,
"CCUActPwrLimEna": 0,
"EXVChillerFailr": 0,
"SOVTXVChillerFailr": 0,
"CoolgFanDutyCycReq": 0
}
},
{
"can_id": "can3",
"dbc_file": "NT2_Body_CAN_Official_Release_V_1_6_DA_03.dbc",
"msg_id": "CCU_01",
"msg": {
"AmbTemp": 0,
"PwrLimOfChi": 0,
"PtcTotActPwr": 0,
"AmbTempValid": 0,
"PtcTotReqdPwr": 0,
"PtcReFailSts": 0,
"ComprActPwr": 1,
"PtcFrntOnOffSts": 0,
"ComprReqdPwr": 0,
"CmprOnOffSts": 0,
"PtcReOnOffSts": 0,
"PtcFrntFailSts": 0,
"ComprlimSts": 0,
"ComprFaultSts": 0,
"CbnACSysSts": 0,
"CCUActPwrLimEna": 0,
"EXVChillerFailr": 0,
"SOVTXVChillerFailr": 0,
"CoolgFanDutyCycReq": 0
}
}
]
}
版权声明:本文为m0_56306892原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。