实现功能:设计一个新的action,实现在冗余链路中的数据包去重
一:在内核级定义OVS action
(一)在datapath/linux/compat/include/linux/openvswitch.h中添加:
/* Datapath action attribute types (excerpt; the existing entries are elided). */
enum ovs_action_attr {
    /* ... */

    /*
     * The new entry is placed after an "#ifndef __KERNEL__ ... #endif"
     * section, so without an explicit value the kernel module and the
     * userspace daemon would assign it different numbers.  The "= 23" is
     * therefore mandatory.
     */
    OVS_ACTION_ATTR_RMDUPQUEUE = 23, /* struct ovs_action_rmdupqueue. */

    __OVS_ACTION_ATTR_MAX,           /* Nothing past this will be accepted
                                      * from userspace. */
    /* ... */
};
(二)注意:指定显式值
OVS_ACTION_ATTR_RMDUPQUEUE = 23:必须为该枚举条目指定显式值。因为它前面有一段只在用户态生效的 #ifndef __KERNEL__ ... #endif 条目,如果不写显式值,内核模块与用户态的 ovs-vswitchd 会为这个新 action 分配不同的编号(这里不加会出错)
(三)定义内核级别的OVS action的消息结构体
/*
 * struct ovs_action_rmdupqueue - %OVS_ACTION_ATTR_RMDUPQUEUE action argument.
 * @queue_id: Number of the de-duplication queue to use.  The kernel action
 *            subtracts 1 before indexing its queue table, so numbering
 *            starts at 1.
 * @max_len: Requested capacity of that queue, in entries.  0 releases the
 *           queue and disables de-duplication for it.
 */
struct ovs_action_rmdupqueue{
uint32_t queue_id;
uint32_t max_len;
};
二:在内核模块中实现自定义action的实现函数,用于调用执行
(一)队列业务实现,在datapath/flow_netlink.h中定义队列
int nsh_hdr_from_nlattr(const struct nlattr *attr, struct nshhdr *nh,
size_t size);
//-------------------queue start---------------------
#define MAX_QUEUE_SIZE 1000 // at most 1000 flows can be served
#define MAX_QUEUE_LEN 1000 // TODO: adaptive algorithm that picks the queue size automatically, or a PRP-style sliding window
/* Ring buffer of integer identifiers used to detect duplicate packets. */
typedef struct
{
int *queue; // storage for the entries (allocated dynamically)
int NUM; // capacity of the queue
int TOP, REAR; // head index and tail (next write) index
int EmpFlag; // state flag: 1 = empty, 2 = full, 3 = partially filled
}Queue;
void InitQueue(Queue* q, int n); // initialise the queue with room for n entries
int EmptyOrFullQueue(Queue q); // report the empty/full state flag
int QueueLength(Queue q); // number of entries currently stored
int PushQueue(Queue* q,int ele); // enqueue; returns 1 when ele was already present
int PopQueue(Queue* q); // dequeue the head entry
int RePushQueue(Queue* q, int ele); // when a value arrives a second time, match it and dequeue up to it
int FindElePos(Queue q, int ele,int* n); // locate an entry
void ShowData(Queue q); // print the queue contents
extern Queue Que[];
//-----------------queue end----------------------------
#endif /* flow_netlink.h */
(二)队列业务实现,在datapath/flow_netlink.c中实现队列
//-------------------queue start---------------------
/*
 * InitQueue - (re)initialise a de-duplication queue.
 * @q: queue to reset; q->queue must be NULL or storage from an earlier call.
 * @n: requested capacity in entries; a value <= 0 only releases the storage.
 *
 * Any previous storage is freed and zeroed room for @n entries is allocated.
 * If the allocation fails the queue is left released (NUM == 0 and
 * queue == NULL), which PushQueue()/PopQueue() treat as "no queue", instead
 * of writing through a NULL pointer.
 *
 * TODO (original author): add a dedicated command so that deleting the flow
 * also releases the queue of the previous action.
 */
void InitQueue(Queue *q, int n)
{
    /* kfree(NULL) is a no-op, so the old storage can be dropped unguarded. */
    kfree(q->queue);
    q->queue = NULL;
    q->NUM = 0;

    if (n > 0) {
        /*
         * NOTE(review): this is reached from rmdup_queue() while a packet is
         * being processed, where sleeping is presumably not allowed, hence
         * GFP_ATOMIC rather than GFP_KERNEL -- confirm the calling context.
         * kcalloc() zeroes the memory and checks n * size for overflow.
         */
        q->queue = kcalloc(n, sizeof(*q->queue), GFP_ATOMIC);
        if (q->queue)
            q->NUM = n;
    }

    q->TOP = q->REAR = 0;
    q->EmpFlag = 1; /* empty */
}
/*
 * Return the queue's state flag unchanged.  The queue operations store
 * 1 for an empty queue, 2 for a full one and 3 otherwise.
 */
int EmptyOrFullQueue(Queue q)
{
    const int state = q.EmpFlag;

    return state;
}
/*
 * Return the number of entries currently held in @q.
 *
 * The state flag decides the two ambiguous cases (TOP == REAR means either
 * empty or full); otherwise the length is the distance from head to tail,
 * wrapping around the end of the storage.
 */
int QueueLength(Queue q)
{
    int span;

    switch (EmptyOrFullQueue(q)) {
    case 1:                 /* empty */
        return 0;
    case 2:                 /* full */
        return q.NUM;
    default:
        break;
    }

    span = q.REAR - q.TOP;
    if (q.TOP > q.REAR)
        span += q.NUM;      /* tail has wrapped around */
    return span;
}
/*
 * Insert @ele into @q unless it is already present.
 *
 * Returns 1 when @ele was found in the queue (a duplicate; RePushQueue() has
 * then dequeued everything up to and including the match), and 0 otherwise,
 * including when the queue has no storage.  When the queue is full the
 * oldest entry is dropped to make room.
 *
 * TODO (original author): define a dedicated protocol field instead of
 * relying on the IP identification value.
 */
int PushQueue(Queue *q, int ele)
{
    int tail;

    if (q->NUM <= 0)
        return 0;           /* storage has been released */

    if (RePushQueue(q, ele) == 1)
        return 1;           /* duplicate */

    if (q->EmpFlag == 2)
        PopQueue(q);        /* full: evict the head first */

    tail = q->REAR;
    q->queue[tail] = ele;
    q->REAR = (tail + 1) % q->NUM;

    /* Head meeting tail right after an insert means the queue is full. */
    q->EmpFlag = (q->TOP == q->REAR) ? 2 : 3;
    return 0;
}
/*
 * Remove and return the head entry of @q.
 *
 * Returns -2 when the queue has no storage and -1 when it is empty.  These
 * are in-band values: a stored entry equal to -1 or -2 cannot be told apart
 * from the error codes by the caller.
 *
 * The head slot is read only after both checks have passed, so an empty or
 * unallocated queue is never dereferenced.
 *
 * TODO (original author): the queue space could be partitioned dynamically.
 */
int PopQueue(Queue *q)
{
    int head_val;

    if (q->NUM <= 0 || !q->queue)
        return -2;          /* storage has been released */
    if (q->EmpFlag == 1)
        return -1;          /* nothing to dequeue */

    head_val = q->queue[q->TOP];

    q->TOP++;
    if (q->TOP == q->NUM)
        q->TOP = 0;

    /* Head meeting tail right after a removal means the queue is empty. */
    q->EmpFlag = (q->TOP == q->REAR) ? 1 : 3;
    return head_val;
}
/*
 * Handle the second arrival of @ele.
 *
 * If @ele is stored in @q, the head is advanced past the match, which
 * dequeues the matched entry and everything older.  Returns 1 in that case
 * and 0 when @ele is not present (the caller may then insert it).
 */
int RePushQueue(Queue *q, int ele)
{
    int n = 0;  /* receives the count of dequeued entries; not used here */
    int pos = FindElePos(*q, ele, &n);

    if (pos == -1)
        return 0;           /* not a duplicate */

    q->TOP = pos;

    /*
     * At least the matched entry was removed, so the queue cannot be full
     * any more; it is empty exactly when the head has caught up with the
     * tail.  Deriving the flag from the indices avoids consulting the old
     * flag, which may still say "full" and would hide the empty case.
     */
    q->EmpFlag = (q->TOP == q->REAR) ? 1 : 3;
    return 1;               /* duplicate found */
}
/*
 * Search @q from head to tail for @ele.
 *
 * On a match, stores in *@n how many entries lie between the head and the
 * match (inclusive) and returns the index following the match, i.e. the new
 * head once those entries are dequeued.  Returns -1 and stores 0 in *@n when
 * @ele is not present.  @n may be NULL if the count is not needed.
 */
int FindElePos(Queue q, int ele, int *n)
{
    const int len = QueueLength(q);     /* invariant during the scan */
    int i;

    if (n)
        *n = 0;

    for (i = 0; i < len; i++) {
        if (q.queue[(i + q.TOP) % q.NUM] == ele) {
            if (n)
                *n = i + 1;
            return (i + q.TOP + 1) % q.NUM;
        }
    }
    return -1;
}
//-----------------queue end----------------------------
队列操作:对于第二次到达的数据,若是在队列中匹配到,则将该数据以及前面的数据全部出队。
(三)在datapath/actions.c中的队列去重实现:
static bool rmdup_queue(struct sk_buff *skb, struct sw_flow_key *key,
const struct nlattr *attr)
{
/* since we can't use rand() in the kernel */
struct ovs_action_rmdupqueue* rdque_act = nla_data(attr);
uint32_t queue_id = rdque_act->queue_id - 1; //can`t waste kernel space 改进:交换机返回设置后的队列号给控制器,用来进行记录调配https://www.cnblogs.com/ssyfj/p/11623514.html
uint32_t max_len = rdque_act->max_len;
struct iphdr *ip_header = (struct iphdr *)skb_network_header(skb);
unsigned int ident = (unsigned int)ip_header->id; //获取IP报文首部的id标识字段作为去重标准
if(max_len == 0 && Que[queue_id].NUM == 0) //don`t need to remove the redundancy packet
return false; //去重复,后面进行删除---false不进行去重复
else if (max_len == 0 && Que[queue_id].NUM != 0){ //init queue infomation
InitQueue(&Que[queue_id],max_len);
return false; //去重复,后面进行删除---false不进行去重复
}
else if (max_len != 0 && max_len != Que[queue_id].NUM){ //reinit the queue
InitQueue(&Que[queue_id],max_len);
return PushQueue(&Que[queue_id],ident); //PushQueue返回1,表示有重复,返回0,表示没有重复
}
else{ //judge the packet, decide to remove this packet
return PushQueue(&Que[queue_id],ident); //PushQueue返回1,表示有重复,返回0,表示没有重复
}
}
(四)do_execute_actions方法实现对OVS action中OVS_ACTION_ATTR_RMDUPQUEUE去重业务的调用
/* Execute a list of actions against 'skb'. */
static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
struct sw_flow_key *key,
const struct nlattr *attr, int len)
{
const struct nlattr *a;
int rem;
for (a = attr, rem = len; rem > 0;
a = nla_next(a, &rem)) {
int err = 0;
switch (nla_type(a)) {
case OVS_ACTION_ATTR_OUTPUT:
....
break;
case OVS_ACTION_ATTR_RMDUPQUEUE:
if(rmdup_queue(skb, key, a)) //当我们的队列发现存在重复,则进行去重操作(删除当前数据包)
{
while (rem) {
a = nla_next(a, &rem);
}
}
break;
case OVS_ACTION_ATTR_PUSH_MPLS:
err = push_mpls(skb, key, nla_data(a));
break;
三:在用户态下定义OVS action
(一)所有action定义在include/openvswitch/ofp-actions.h中添加
OFPACT(GOTO_TABLE, ofpact_goto_table, ofpact, "goto_table") \
OFPACT(RMDUPQUEUE, ofpact_rmdupqueue, ofpact, "rmdupqueue")
(二)实现OVS action的消息体
/* OFPACT_RMDUPQUEUE.
 *
 * Used for OFPAT_RMDUPQUEUE.  Holds the two parameters of the rmdupqueue
 * action. */
struct ofpact_rmdupqueue {
OFPACT_PADDED_MEMBERS(
struct ofpact ofpact;
uint32_t queue_id; /* Number of the de-duplication queue. */
uint32_t max_len; /* Requested capacity of the queue, in entries. */
);
uint8_t data[]; /* NOTE(review): nothing in the visible code reads or writes this trailing member -- confirm it is needed. */
};
四:用户态下的OVS action实现(与内核态定义的OVS action有关,引入了内核定义的头文件,联系内核态和用户态两者的OVS action),这里我们使用OVS_NOT_REACHED,不实现用户态的action
(一)lib/odp-execute.c:用户态调用,执行action
void
odp_execute_actions(void *dp, struct dp_packet_batch *batch, bool steal,
const struct nlattr *actions, size_t actions_len,
odp_execute_cb dp_execute_action)
{
struct dp_packet *packet;
......
switch ((enum ovs_action_attr) type) {
case OVS_ACTION_ATTR_UNSPEC:
case OVS_ACTION_ATTR_RMDUPQUEUE:
case __OVS_ACTION_ATTR_MAX:
OVS_NOT_REACHED();
}
}
dp_packet_delete_batch(batch, steal);
}
(二)lib/dpif.c:
/* Excerpt: where the new action must be listed in dpif_execute_helper_cb(). */
static void
dpif_execute_helper_cb( /* ... */ )
{
/* ... */
switch ((enum ovs_action_attr)type) {
/* ... */
case OVS_ACTION_ATTR_RMDUPQUEUE:
/* The action has no userspace implementation, so this case is
 * declared unreachable. */
OVS_NOT_REACHED();
}
}
(三)ofproto/ofproto-dpif-ipfix.c:
/* Excerpt: the new action is listed in the IPFIX action reader so the switch
 * covers it; it contributes nothing to IPFIX. */
void
dpif_ipfix_read_actions( /* ... */ )
{
/* ... */
switch (type) {
/* ... */
case OVS_ACTION_ATTR_RMDUPQUEUE:
/* Again, ignore for now. Not needed. */
break;
}
}
(四)ofproto/ofproto-dpif-sflow.c:
/* Excerpt: the new action is listed in the sFlow action reader so the switch
 * covers it; it contributes nothing to sFlow. */
void
dpif_sflow_read_actions( /* ... */ )
{
switch (type) {
/* ... */
case OVS_ACTION_ATTR_RMDUPQUEUE:
/* Ignore sFlow for now, unless needed. */
break;
}
}
五:多功能聚合lib/odp-util.c(这里的格式化、解析都是针对内核态实现)
(一)实现格式化操作,获取了内核OVS action消息体中的内容,格式化为字符串形式
/* Append the textual form of a datapath rmdupqueue action to 'ds', e.g.
 * "rmdupqueue(queue_id=3,max_len=100)". */
static void
format_odp_rmdupqueue_action(struct ds *ds, const struct ovs_action_rmdupqueue *rdq_act)
{
    const uint32_t id = rdq_act->queue_id;
    const uint32_t capacity = rdq_act->max_len;

    ds_put_format(ds, "rmdupqueue(queue_id=%"PRIu32",max_len=%"PRIu32")",
                  id, capacity);
}
static void
format_odp_action(struct ds *ds, const struct nlattr *a,
const struct hmap *portno_names)
{
int expected_len;
enum ovs_action_attr type = nl_attr_type(a);
switch (type) {
case OVS_ACTION_ATTR_METER:
ds_put_format(ds, "meter(%"PRIu32")", nl_attr_get_u32(a));
break;
......
case OVS_ACTION_ATTR_RMDUPQUEUE:
format_odp_rmdupqueue_action(ds, nl_attr_get(a));
break;
case OVS_ACTION_ATTR_UNSPEC:
case __OVS_ACTION_ATTR_MAX:
default:
format_generic_odp_action(ds, a);
break;
}
}
(二)实现解析化操作,根据我们的字符串,进行解析,从而将从字符串中获取的信息放入内核OVS action消息体中
static int
parse_odp_action(const char *s, const struct simap *port_names,
struct ofpbuf *actions)
{
{
uint32_t port;
int n;
if (ovs_scan(s, "%"SCNi32"%n", &port, &n)) {
nl_msg_put_u32(actions, OVS_ACTION_ATTR_OUTPUT, port);
return n;
}
}
......
{
uint32_t queue_id,max_len;
struct ovs_action_rmdupqueue rdque;
int n;
if (ovs_scan(s, "rmdupqueue(queue_id=%"SCNi32",max_len=%"SCNi32")%n", &queue_id, &max_len, &n)) {
rdque.queue_id = queue_id;
rdque.max_len = max_len;
nl_msg_put_unspec(actions, OVS_ACTION_ATTR_RMDUPQUEUE,
&rdque, sizeof rdque);
return n;
}
}
{
if (!strncmp(s, "clone(", 6)) {
.....
.....
(三)设置action长度
static int
odp_action_len(uint16_t type)
{
if (type > OVS_ACTION_ATTR_MAX) {
return -1;
}
switch ((enum ovs_action_attr) type) {
case OVS_ACTION_ATTR_OUTPUT: return sizeof(uint32_t);
......
case OVS_ACTION_ATTR_RMDUPQUEUE: return sizeof(struct ovs_action_rmdupqueue);
case OVS_ACTION_ATTR_UNSPEC:
case __OVS_ACTION_ATTR_MAX:
return ATTR_LEN_INVALID;
}
return ATTR_LEN_INVALID;
}
六:定义一个OpenFlow action
(一)lib / ofp-actions.c:引入添加自己的操作代码,作为OpenFlow的扩展
enum ofp_raw_action_type {
    /* ... */

    /* NX1.3+(47): struct nx_action_decap, ... */
    NXAST_RAW_DECAP,

    /* OF1.0+(30): struct ofp10_action_rmdupqueue. */
    OFPAT_RAW_RMDUPQUEUE,

    /* ... */
};
(二)注释说明以及自动生成函数的补充
注释非常重要,说明了协议版本,序号,构造openflow消息所需参数 有些函数头是根据协议版本、您选择的代码和操作所需的参数类型自动生成的。 后面的序号是独一无二的,不能在同一协议版本中出现两个一样的序号
put_OFPAT_action构造openflow消息 put_OFPAT_RMDUPQUEUE: 根据 下面的消息结构体构造出openflow消息
(三)定义对应的OpenFlow action的消息结构体
/* Wire format of the rmdupqueue OpenFlow action (24 bytes).
 *
 * NOTE(review): the layout keeps the vendor/subtype slots of a vendor action
 * header, although the action is registered as "OF1.0+(30)"; the visible
 * encode code only fills in queue_id and max_len -- confirm the slots are
 * intended. */
struct ofp10_action_rmdupqueue {
ovs_be16 type; /* Action type code. */
ovs_be16 len; /* Length of this structure: 24. */
ovs_be32 vendor; /* Vendor slot; not written by the visible code. */
ovs_be16 subtype; /* Subtype slot; not written by the visible code. */
uint8_t zeros[6]; /* Padding. */
ovs_be32 queue_id; /* Number of the de-duplication queue. */
ovs_be32 max_len; /* Requested queue capacity, in entries. */
};
OFP_ASSERT(sizeof(struct ofp10_action_rmdupqueue) == 24); // the size must be a multiple of 8 bytes
七:OpenFlow Action与OpenVSwitch action的转化(编码、解码、格式化….)
(一)补充各个函数的说明
(1)ofpact_decode---->decode_OFPAT_RAW_RMDUPQUEUE: 解openflow消息生成openvswitch action (2)ofpact_encode---->encode_RMDUPQUEUE: 从ofpact_type构造openflow消息 (3)ofpact_parse---->parse_RMDUPQUEUE: 从字符串解析构造openvswitch action (4)ofpact_format---->format_RMDUPQUEUE: 将openvswitch action转化为string (5)ofpact_check---->check_RMDUPQUEUE:校验openvswitch action
注意:
在我们添加流表项时,会先执行解析、检查、编码操作(从字符串解析OVS action,然后构造openflow消息)
在我们使用ovs-ofctl dump-flows 交换机,会先执行解码、格式化操作(将openflow消息转换为OVS action,然后根据OVS action中的参数去格式化为字符串显示)
(二)在lib/ofp-actions.c中修改代码:定义新的动作,编码,解码,形式化和校验。
/* Uses a queue to remove duplicate packets.
 *
 * Wire format of the rmdupqueue OpenFlow action (24 bytes); the encode,
 * decode, parse, format and check helpers are implemented against it. */
struct ofp10_action_rmdupqueue {
ovs_be16 type; /* Action type code. */
ovs_be16 len; /* Length of this structure: 24. */
ovs_be32 vendor; /* Vendor slot; not written by the visible code. */
ovs_be16 subtype; /* Subtype slot; not written by the visible code. */
uint8_t zeros[6]; /* Padding. */
ovs_be32 queue_id; /* Number of the de-duplication queue. */
ovs_be32 max_len; /* Requested queue capacity, in entries. */
};
OFP_ASSERT(sizeof(struct ofp10_action_rmdupqueue) == 24); // encode/decode etc. only need to be implemented for this message body
/* Encodes 'rdque' as an OpenFlow action appended to 'out'.
 *
 * The ofpact keeps its fields in host byte order while the wire structure
 * declares them as ovs_be32, so each value is converted with htonl().
 * decode_OFPAT_RAW_RMDUPQUEUE() below performs the inverse conversion; the
 * two must always agree. */
static void
encode_RMDUPQUEUE(const struct ofpact_rmdupqueue *rdque,
                  enum ofp_version ofp_version OVS_UNUSED,
                  struct ofpbuf *out)
{
    struct ofp10_action_rmdupqueue *of_rdque = put_OFPAT_RMDUPQUEUE(out);

    of_rdque->queue_id = htonl(rdque->queue_id);
    of_rdque->max_len = htonl(rdque->max_len);
}

/* Decodes the wire action 'of_rdque' into a new ofpact appended to 'out',
 * converting the big-endian wire fields back to host byte order.
 *
 * Always returns 0; the values themselves are not validated here. */
static enum ofperr
decode_OFPAT_RAW_RMDUPQUEUE(const struct ofp10_action_rmdupqueue *of_rdque,
                            enum ofp_version ofp_version OVS_UNUSED,
                            struct ofpbuf *out)
{
    struct ofpact_rmdupqueue *rdque = ofpact_put_RMDUPQUEUE(out);

    rdque->queue_id = ntohl(of_rdque->queue_id);
    rdque->max_len = ntohl(of_rdque->max_len);
    return 0;
}
/* Helper for parse_RMDUPQUEUE().
 *
 * Parses 'arg', a comma-separated list of "queue_id=N" and "max_len=N"
 * settings, into a new rmdupqueue ofpact appended to 'ofpacts'.  A setting
 * that is omitted keeps whatever value ofpact_put_RMDUPQUEUE() initialised.
 *
 * Returns NULL on success, otherwise an error message.  An unrecognised key
 * is reported as an error rather than silently ignored, so a typo such as
 * "maxlen=100" cannot install a flow with the wrong parameters.
 * NOTE(review): the error string is presumably heap-allocated and owned by
 * the caller, as with str_to_u32() -- confirm. */
static char * OVS_WARN_UNUSED_RESULT
parse_rdque(char *arg, struct ofpbuf *ofpacts)
{
    struct ofpact_rmdupqueue *rdque = ofpact_put_RMDUPQUEUE(ofpacts);
    char *key, *value;

    while (ofputil_parse_key_value(&arg, &key, &value)) {
        char *error;

        if (!strcmp(key, "queue_id")) {
            error = str_to_u32(value, &rdque->queue_id);
        } else if (!strcmp(key, "max_len")) {
            error = str_to_u32(value, &rdque->max_len);
        } else {
            error = xasprintf("invalid key '%s' in 'rmdupqueue' argument",
                              key);
        }

        if (error) {
            return error;
        }
    }
    return NULL;
}
/* Turns the textual arguments of an rmdupqueue action into an ofpact stored
 * in 'pp->ofpacts'.  Example:
 *
 *   ovs-ofctl add-flow ... actions=rmdupqueue(queue_id=3,max_len=100),output:"s2-eth0"
 *
 * Returns whatever parse_rdque() returns: NULL on success, otherwise an
 * error string. */
static char * OVS_WARN_UNUSED_RESULT
parse_RMDUPQUEUE(char *arg, const struct ofpact_parse_params *pp)
{
    struct ofpbuf *dst = pp->ofpacts;

    return parse_rdque(arg, dst);
}
/* Appends the textual form of 'rdque' to 'fp->s', in the same
 * "rmdupqueue(queue_id=N,max_len=N)" syntax that the parser accepts.
 *
 * Parameter names could be wrapped in colors.param / colors.end if coloured
 * output is wanted. */
static void
format_RMDUPQUEUE(const struct ofpact_rmdupqueue *rdque,
                  const struct ofpact_format_params *fp)
{
    struct ds *out = fp->s;

    ds_put_format(out, "rmdupqueue(queue_id=%"PRIu32, rdque->queue_id);
    ds_put_format(out, ",max_len=%"PRIu32")", rdque->max_len);
}
/* ... */
/* Validates an rmdupqueue action before it is installed.
 *
 * Queue numbers start at 1: the kernel action subtracts 1 from queue_id
 * before indexing its queue table, so 0 (also the value left behind when
 * "queue_id=" is omitted) would wrap around to an enormous index.  Such an
 * action is rejected here with OFPERR_OFPBAC_BAD_ARGUMENT.
 *
 * Returns 0 if the action is acceptable. */
static enum ofperr
check_RMDUPQUEUE(const struct ofpact_rmdupqueue *rdque,
                 const struct ofpact_check_params *cp OVS_UNUSED)
{
    if (rdque->queue_id == 0) {
        return OFPERR_OFPBAC_BAD_ARGUMENT;
    }
    return 0;
}
(三)lib/ofp-actions.c:
/* Excerpt: for the new action, ofpact_next_flattened() simply advances to
 * the following action via ofpact_next(). */
struct ofpact *
ofpact_next_flattened(const struct ofpact *ofpact)
{
switch (ofpact->type) {
/* ... */
case OFPACT_RMDUPQUEUE:
return ofpact_next(ofpact);
}
/* ... */
}
/* ... */
/* Excerpt: the new action shares the default case and therefore maps to the
 * Apply-Actions instruction type. */
enum ovs_instruction_type
ovs_instruction_type_from_ofpact_type(enum ofpact_type type)
{
switch (type) {
/* ... */
case OFPACT_RMDUPQUEUE:
default:
return OVSINST_OFPIT11_APPLY_ACTIONS;
/* ... */
}
}
/* ... */
/* Excerpt: the new action shares the default case, so it is reported as not
 * outputting to 'port'. */
static bool
ofpact_outputs_to_port(const struct ofpact *ofpact, ofp_port_t port)
{
switch (ofpact->type) {
/* ... */
case OFPACT_RMDUPQUEUE:
default:
return false;
}
}
static enum action_set_class
action_set_classify(const struct ofpact a*)
{
switch (a->type) {
/* ... */
/* NEVER */
/* ... */
case OFPACT_RMDUPQUEUE:
return ACTION_SLOT_INVALID;
/* ... */
}
}
八:处理内核数据路径和用户级守护程序之间的通信
在某些情况下,守护程序和内核模块通过Netlink套接字相互通信。
守护程序在到达时将流操作向下发送到内核(用于数据包处理),并在到达时轮询来自内核的任何上行调用。
通常,当到达的数据包与任何已知条目都不匹配时(即必须将该数据包发送到控制器,或者需要具体实例化通配符规则),就会发生这种情况。
(一)实现通信ofproto/ofproto-dpif-xlate.c:
/* Put this with the other "compose" functions. */
static void
compose_rmdupqueue_action(struct xlate_ctx *ctx, struct ofpact_rmdupqueue *op) //可以看出是将用户态的消息体传入内核态中了
{
struct {
ovs_be32 queue_id;
ovs_be32 max_len;
} odp_pd_label;
odp_pd_label.queue_id = op->queue_id;
odp_pd_label.max_len = op->max_len;
nl_msg_put_unspec(ctx->odp_actions, OVS_ACTION_ATTR_RMDUPQUEUE,
&odp_pd_label, sizeof odp_pd_label);
}
static void
do_xlate_actions(const struct ofpact *ofpacts, size_t ofpacts_len,
struct xlate_ctx *ctx, bool is_last_action,
bool group_bucket_action)
{
struct flow_wildcards *wc = ctx->wc;
......
switch (a->type) {
......
case OFPACT_RMDUPQUEUE:
compose_rmdupqueue_action(ctx, ofpact_get_RMDUPQUEUE(a));
break;
case OFPACT_CLONE:
......
}
}
/* No action can undo the packet drop: reflect this. */
/* Excerpt: returns false as soon as an rmdupqueue action is found in
 * 'ofpacts', and true if the visible cases never return false. */
static bool
reversible_actions(const struct ofpact *ofpacts, size_t ofpacts_len)
{
const struct ofpact *a;
OFPACT_FOR_EACH (a, ofpacts, ofpacts_len) {
switch (a->type) {
/*... */
case OFPACT_RMDUPQUEUE:
/* The action may drop the packet, which cannot be reversed. */
return false;
}
}
return true;
}
/* ... */
/* RMDUPQUEUE likely doesn't require explicit thawing. */
/* Excerpt: the new action is listed with the cases that need no work here. */
static void
freeze_unroll_actions( /* ... */ )
{
/* ... */
switch (a->type) {
case OFPACT_RMDUPQUEUE:
/* These may not generate PACKET INs. */
break;
}
}
/* ... */
/* Naturally, don't need to recirculate since we don't change packets. */
/* Excerpt: the new action shares the default case and does nothing here. */
static void
recirc_for_mpls(const struct ofpact *a, struct xlate_ctx *ctx)
{
/* ... */
switch (a->type) {
case OFPACT_RMDUPQUEUE:
default:
break;
}
}
九:datapath/flow_netlink.c:内核部分,是对参数长度和值的最后检查
/* Excerpt: kernel-side validation of actions received from userspace.  The
 * new action needs an entry in the expected-length table and a case in the
 * per-type switch. */
static int __ovs_nla_copy_actions( /*...*/ )
{
/* ... */
static const u32 action_lens[OVS_ACTION_ATTR_MAX + 1] = {
/* ... */
/* Expected payload size of the new action. */
[OVS_ACTION_ATTR_RMDUPQUEUE] = sizeof(struct ovs_action_rmdupqueue),
};
/* ... */
/* Be careful here, your compiler may not catch this one
* even with -Werror */
switch (type) {
/* ... */
case OVS_ACTION_ATTR_RMDUPQUEUE:
/* Finalest sanity checks in the kernel. */
/* NOTE(review): no value check is performed; queue_id is later used
 * (minus 1) as an array index and max_len as an allocation size, so
 * range checks would belong here -- confirm and add. */
break;
/* ... */
}
/* ... */
}
原文链接:
https://www.cnblogs.com/ssyfj/p/13620781.html
(免费订阅,永久学习)学习地址:
Dpdk/网络协议栈/vpp/OvS/DDos/NFV/虚拟化/高性能专家-学习视频教程-腾讯课堂