linux进程间通信msg的内核实现

  • Post author:
  • Post category:linux


最近在研究进程间通信msg相关内容,此文是对所学的阶段性总结。首先看下应用层使用的例子:

发送端

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/msg.h>
#include <errno.h>
 
/* Size in bytes of the text payload carried by one message. */
#define BUF_LEN 64

/*
 * Message layout handed to msgsnd(): a leading long holding the message
 * type, immediately followed by the payload bytes.
 */
struct msg_st {
    long msg_type;      /* message type */
    char buf[BUF_LEN];  /* message text */
};
 
/*
 * Sender: obtain (creating it if needed) the System V message queue with
 * key 123 and post one "send N" message of type 1 every second, forever.
 *
 * Exits with EXIT_FAILURE if the queue cannot be obtained or a send fails;
 * the failing errno value is reported on stderr.
 */
int main(int argc, char **argv)
{
    struct msg_st data;
    int msgid;
    int i_count = 0;

    (void)argc;
    (void)argv;

    /* Obtain the queue; create it with mode 0660 if it does not exist yet. */
    msgid = msgget((key_t)123, 0660 | IPC_CREAT);
    if (msgid == -1)
    {
        fprintf(stderr, "msgget failed error: %d\n", errno);
        exit(EXIT_FAILURE);
    }

    for (;;)
    {
        memset(&data, 0, sizeof(data));
        data.msg_type = 1;
        /* Bounded formatting: can never write past the end of data.buf. */
        snprintf(data.buf, sizeof(data.buf), "send %d", ++i_count);

        /* The size argument counts only the payload, not msg_type. */
        if (msgsnd(msgid, &data, sizeof(data.buf), 0) == -1)
        {
            fprintf(stderr, "msgsnd failed error: %d\n", errno);
            exit(EXIT_FAILURE);
        }

        sleep(1);
    }
}

接收端:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/msg.h>
#include <errno.h>
 
 #define BUF_LEN 64

/*
 * Message layout filled in by msgrcv(): a leading long holding the message
 * type, immediately followed by the payload bytes.
 */
struct msg_st {
    long msg_type;      /* message type */
    char buf[BUF_LEN];  /* message text */
};
 
/*
 * Receiver: obtain (creating it if needed) the System V message queue with
 * key 123 and print every message received from it.
 *
 * A receive interrupted by a signal (EINTR) is retried. Any other msgrcv()
 * failure ends the loop; the queue is then removed (unless the failure was
 * EIDRM, i.e. it is already gone) and the program exits with EXIT_FAILURE.
 */
int main(int argc, char **argv)
{
    int msgid;
    struct msg_st data;
    long msgtype = 0; /* 0: take the first message regardless of its type */
    int rcv_errno = 0;

    (void)argc;
    (void)argv;

    /* Obtain the queue; create it with mode 0660 if it does not exist yet. */
    msgid = msgget((key_t)123, 0660 | IPC_CREAT);
    if (msgid == -1)
    {
        fprintf(stderr, "msgget failed with error: %d\n", errno);
        exit(EXIT_FAILURE);
    }

    for (;;)
    {
        /* On success msgrcv() returns the number of payload bytes copied. */
        ssize_t len = msgrcv(msgid, &data, sizeof(data.buf), msgtype, 0);

        if (len == -1)
        {
            if (errno == EINTR)
            {
                continue;
            }
            /* Never print data.buf here: it holds no valid message. */
            rcv_errno = errno;
            fprintf(stderr, "msgrcv failed with error: %d\n", rcv_errno);
            break;
        }

        /* The payload need not be NUL-terminated, so bound the print by len. */
        printf("receive: %.*s\n", (int)len, data.buf);
    }

    /* Remove the queue, unless the receive failed because it was removed. */
    if (rcv_errno != EIDRM && msgctl(msgid, IPC_RMID, NULL) == -1)
    {
        fprintf(stderr, "msgctl(IPC_RMID) failed\n");
    }

    exit(EXIT_FAILURE);
}

下图是msg内核所涉及的数据结构:

在这里插入图片描述

  1. 创建msg队列
/*
 * msgget(2) entry point: packs the key and flags into an ipc_params and
 * delegates to ipcget() using the message-queue callbacks.
 */
SYSCALL_DEFINE2(msgget, key_t, key, int, msgflg)
{
	/* Callbacks handed to ipcget() for message queues. */
	static const struct ipc_ops msg_ops = {
		.getnew = newque,
		.associate = msg_security,
	};
	struct ipc_params msg_params;
	/* The IPC data structures used below are all reached through ns. */
	struct ipc_namespace *ns = current->nsproxy->ipc_ns;

	msg_params.flg = msgflg;	/* 0660 | IPC_CREAT in the user-space example */
	msg_params.key = key;		/* 123 in the user-space example */

	return ipcget(ns, &msg_ids(ns), &msg_ops, &msg_params);
}

/*
 * Dispatch on the key: IPC_PRIVATE is handled by ipcget_new(), every other
 * key by ipcget_public(). Returns whatever the chosen helper returns.
 */
int ipcget(struct ipc_namespace *ns, struct ipc_ids *ids,
			const struct ipc_ops *ops, struct ipc_params *params)
{
	/* A key other than IPC_PRIVATE (e.g. 123 in the example) goes here. */
	if (params->key != IPC_PRIVATE)
		return ipcget_public(ns, ids, ops, params);

	return ipcget_new(ns, ids, ops, params);
}

/*
 * Look up the IPC object for a non-private key, creating it through
 * ops->getnew() when the key is unused and IPC_CREAT is set.
 *
 * Returns the result of ops->getnew() or ipc_check_perms(), or a negative
 * errno: -ENOENT when the key is unused and IPC_CREAT is not set, -EEXIST
 * when the key exists and both IPC_CREAT and IPC_EXCL are set, or the error
 * from ops->more_checks(). ids->rwsem is held for writing throughout.
 */
static int ipcget_public(struct ipc_namespace *ns, struct ipc_ids *ids,
		const struct ipc_ops *ops, struct ipc_params *params)
{
	struct kern_ipc_perm *ipcp;
	int flg = params->flg; // 0660 | IPC_CREAT in the msgget example
	int err;

	/*
	 * Take the lock as a writer since we are potentially going to add
	 * a new entry + read locks are not "upgradable"
	 */
	down_write(&ids->rwsem);
	ipcp = ipc_findkey(ids, params->key);
	if (ipcp == NULL) { // taken the first time a key is used
		/* key not used */
		if (!(flg & IPC_CREAT)) // no object for this key and IPC_CREAT not set: fail
			err = -ENOENT;
		else
			err = ops->getnew(ns, params); // getnew is newque for the msgget system call
	} else {
		/* ipc object has been locked by ipc_findkey() */

		// The object already exists and both IPC_CREAT and IPC_EXCL are set: fail
		if (flg & IPC_CREAT && flg & IPC_EXCL)
			err = -EEXIST;
		else {
			err = 0;
			if (ops->more_checks)
				err = ops->more_checks(ipcp, params);
			if (!err)
				/*
				 * ipc_check_perms returns the IPC id on
				 * success
				 */
				err = ipc_check_perms(ns, ipcp, ops, params);
		}
		ipc_unlock(ipcp);
	}
	up_write(&ids->rwsem);

	return err;
}

/*
 * Create a new message queue for params->key in namespace ns.
 *
 * Allocates and initializes a msg_queue, then registers its embedded
 * kern_ipc_perm with ipc_addid(). Returns the new queue's id
 * (msq->q_perm.id) on success, -ENOMEM if the allocation fails, or the
 * error returned by security_msg_queue_alloc() or ipc_addid().
 */
static int newque(struct ipc_namespace *ns, struct ipc_params *params)
{
	struct msg_queue *msq;
	int retval;
	key_t key = params->key; // first msgget argument: 123 in the example
	int msgflg = params->flg; // second msgget argument: 0660 | IPC_CREAT in the example

	// Allocate the msg_queue object
	msq = kvmalloc(sizeof(*msq), GFP_KERNEL);
	if (unlikely(!msq))
		return -ENOMEM;

	// Only the permission bits of the flags are kept as the queue mode
	msq->q_perm.mode = msgflg & S_IRWXUGO;
	msq->q_perm.key = key;

	msq->q_perm.security = NULL;
	retval = security_msg_queue_alloc(msq);
	if (retval) {
		kvfree(msq);
		return retval;
	}

	// Initialize the queue's bookkeeping fields
	msq->q_stime = msq->q_rtime = 0;
	msq->q_ctime = ktime_get_real_seconds();
	msq->q_cbytes = msq->q_qnum = 0;
	msq->q_qbytes = ns->msg_ctlmnb;
	msq->q_lspid = msq->q_lrpid = 0;
	INIT_LIST_HEAD(&msq->q_messages);
	INIT_LIST_HEAD(&msq->q_receivers);
	INIT_LIST_HEAD(&msq->q_senders);

	/* ipc_addid() locks msq upon success. */
	// #define msg_ids(ns)	((ns)->ids[IPC_MSG_IDS])
	// Allocate a free message id; note the second argument: it is the
	// kern_ipc_perm embedded in msq that gets registered
	retval = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni);
	if (retval < 0) {
		// The queue is released through an RCU callback, not freed directly
		call_rcu(&msq->q_perm.rcu, msg_rcu_free);
		return retval;
	}

	// Drop the object lock and the RCU read lock taken by ipc_addid()
	ipc_unlock_object(&msq->q_perm);
	rcu_read_unlock();

	return msq->q_perm.id;
}

/*
 * Register a new IPC object: allocate an id for it in ids->ipcs_idr and,
 * for keyed objects, insert it into the ids->key_ht hash table.
 *
 * @ids:   the id set to add to
 * @new:   the object's permission structure; its refcount, lock, deleted
 *         flag, owner/creator ids and id are initialized here
 * @limit: maximum number of objects allowed (clamped to IPCMNI)
 *
 * Returns the allocated index on success; in that case new->lock is held
 * and the RCU read lock is still taken, so the caller must release both.
 * Returns -ENOSPC when the tables are not initialized or the limit is
 * reached, or the negative error from the id allocation / hash insertion
 * (both locks are released on that path).
 */
int ipc_addid(struct ipc_ids *ids, struct kern_ipc_perm *new, int limit)
{
	kuid_t euid;
	kgid_t egid;
	int id, err;

	if (limit > IPCMNI)
		limit = IPCMNI;
	// Fail if the tables are NOT initialized or the in-use count has reached the limit
	if (!ids->tables_initialized || ids->in_use >= limit)
		return -ENOSPC;

	// Prepare for the GFP_NOWAIT id allocation below.
	// NOTE(review): the original note says idr_preload() disables kernel
	// preemption (until idr_preload_end()) - confirm against the IDR docs
	idr_preload(GFP_KERNEL);

	// The object starts out with a single reference
	refcount_set(&new->refcount, 1);
	spin_lock_init(&new->lock);
	new->deleted = false;
	rcu_read_lock();
	spin_lock(&new->lock);

	current_euid_egid(&euid, &egid);
	new->cuid = new->uid = euid; // creator and owner user id
	new->gid = new->cgid = egid; // owner and creator group id

	/* 
	The core of the function is here:
	#define ipc_idr_alloc(ids, new)					\
	idr_alloc(&(ids)->ipcs_idr, (new), 0, 0, GFP_NOWAIT)
	According to the original note, ipc_idr_alloc calls idr_alloc, which in
	turn calls idr_alloc_cmn
	*/
	id = ipc_idr_alloc(ids, new);
	idr_preload_end();

	// Keyed (non-IPC_PRIVATE) objects are also inserted into the key hash table
	if (id >= 0 && new->key != IPC_PRIVATE) {
		err = rhashtable_insert_fast(&ids->key_ht, &new->khtnode,
					     ipc_kht_params);
		if (err < 0) {
			// Undo the id allocation and report the insertion error
			idr_remove(&ids->ipcs_idr, id);
			id = err;
		}
	}
	if (id < 0) {
		spin_unlock(&new->lock);
		rcu_read_unlock();
		return id;
	}

	ids->in_use++;
	if (id > ids->max_id)
		ids->max_id = id;

	new->id = ipc_buildid(id, ids, new);

	// Success: new->lock and the RCU read lock are intentionally still held
	return id;
}

/*
 * Allocate a free id in the range given by start/end and store ptr in the
 * slot found for it.
 *
 * ptr is, on the newque path, the kern_ipc_perm embedded in the msg_queue
 * that newque allocated dynamically.
 *
 * Returns 0 on success and stores the allocated id in *index when index is
 * non-NULL; returns -EINVAL if ptr looks like an internal radix tree node,
 * or the error encoded in the slot lookup result.
 */
int idr_alloc_cmn(struct idr *idr, void *ptr, unsigned long *index,
		  unsigned long start, unsigned long end, gfp_t gfp,
		  bool ext)
{
	struct radix_tree_iter iter;
	void __rcu **slot;

	if (WARN_ON_ONCE(radix_tree_is_internal_node(ptr)))
		return -EINVAL;

	radix_tree_iter_init(&iter, start);
	if (ext)
		slot = idr_get_free_ext(&idr->idr_rt, &iter, gfp, end);
	else
		/*
		newque takes this branch.
		idr->idr_rt is a radix tree (type radix_tree_root). According to the
		original note, this call searches the tree for the smallest free
		message id and returns the slot for it.
		For an introduction to radix trees see: https://zhuanlan.zhihu.com/p/533338300
		*/
		slot = idr_get_free(&idr->idr_rt, &iter, gfp, end);
	if (IS_ERR(slot))
		return PTR_ERR(slot);

	// Note: the slot's content is replaced with ptr here (on the newque path,
	// the kern_ipc_perm embedded in the dynamically allocated msg_queue)
	radix_tree_iter_replace(&idr->idr_rt, &iter, slot, ptr);
	// The entry is no longer free, so clear its IDR_FREE tag
	radix_tree_iter_tag_clear(&idr->idr_rt, &iter, IDR_FREE);

	if (index)
		*index = iter.index; // the id that was found
	return 0;
}

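总结:msgget先通过ipc_findkey按key查找已存在的队列;如果不存在且设置了IPC_CREAT,则由newque创建msg_queue,并通过ipc_addid在current->nsproxy->ipc_ns->ids[IPC_MSG_IDS].ipcs_idr.idr_rt基树中分配新的消息ID,同时把key插入ids->key_ht哈希表。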

  2. 发送消息
/*
 * msgsnd(2) entry point: reads the message type from the user buffer and
 * hands the message text pointer on to do_msgsnd().
 * Returns -EFAULT if the type cannot be read from user space, otherwise
 * the result of do_msgsnd().
 */
SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,
		int, msgflg)
{
	long type;	/* message type read from msgp->mtype */

	if (get_user(type, &msgp->mtype) != 0)
		return -EFAULT;

	return do_msgsnd(msqid, type, msgp->mtext, msgsz, msgflg);
}

/*
 * Send one message of type mtype and size msgsz to queue msqid.
 *
 * The text is copied from user space first; the queue is then looked up
 * and locked. While the message does not fit, the caller either fails with
 * -EAGAIN (IPC_NOWAIT) or registers as a waiting sender and sleeps. Once it
 * fits, the message is handed to a waiting receiver or appended to
 * msq->q_messages.
 *
 * Returns 0 on success or a negative errno: -EINVAL for bad size, id or
 * type, -EACCES without write permission, -EIDRM if the queue was removed,
 * -EAGAIN if full with IPC_NOWAIT, -ERESTARTNOHAND on a pending signal, or
 * an error from load_msg(), the queue lookup or the security hook.
 */
static long do_msgsnd(int msqid, long mtype, void __user *mtext,
		size_t msgsz, int msgflg)
{
	struct msg_queue *msq;
	struct msg_msg *msg;
	int err;
	struct ipc_namespace *ns;
	DEFINE_WAKE_Q(wake_q);

	ns = current->nsproxy->ipc_ns;

	// Validate the message size and the queue id
	if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)
		return -EINVAL;
	// Message types below 1 are rejected
	if (mtype < 1)
		return -EINVAL;

	// Load the message content from user space
	msg = load_msg(mtext, msgsz);
	if (IS_ERR(msg))
		return PTR_ERR(msg);

	msg->m_type = mtype;
	msg->m_ts = msgsz;

	rcu_read_lock();
	/*
	msq_obtain_object_check looks up the kern_ipc_perm for the message id and
	then returns the msg_queue that contains it
	*/
	msq = msq_obtain_object_check(ns, msqid);
	if (IS_ERR(msq)) {
		err = PTR_ERR(msq);
		goto out_unlock1;
	}

	ipc_lock_object(&msq->q_perm);

	for (;;) {
		struct msg_sender s;

		err = -EACCES;
		// Check write permission
		if (ipcperms(ns, &msq->q_perm, S_IWUGO))
			goto out_unlock0;

		/* raced with RMID? */
		if (!ipc_valid_object(&msq->q_perm)) { // NOTE(review): presumably tests !perm->deleted - confirm
			err = -EIDRM;
			goto out_unlock0;
		}

		err = security_msg_queue_msgsnd(msq, msg, msgflg);
		if (err)
			goto out_unlock0;

		// Check whether the message fits; per the original note this covers
		// both the bytes in the queue and the number of queued messages
		if (msg_fits_inqueue(msq, msgsz))
			break;

		/* queue full, wait: */
		if (msgflg & IPC_NOWAIT) {
			err = -EAGAIN;
			goto out_unlock0;
		}

		/* enqueue the sender and prepare to block */
		ss_add(msq, &s, msgsz);

		// Hold a reference on the queue while the lock is dropped
		if (!ipc_rcu_getref(&msq->q_perm)) {
			err = -EIDRM;
			goto out_unlock0;
		}

		ipc_unlock_object(&msq->q_perm);
		rcu_read_unlock();
		schedule(); // yield the CPU

		rcu_read_lock();
		ipc_lock_object(&msq->q_perm);

		ipc_rcu_putref(&msq->q_perm, msg_rcu_free);
		/* raced with RMID? */
		if (!ipc_valid_object(&msq->q_perm)) {
			err = -EIDRM;
			goto out_unlock0;
		}
		ss_del(&s);

		if (signal_pending(current)) {
			err = -ERESTARTNOHAND;
			goto out_unlock0;
		}

	}

	// Record the last sender and the last send time
	msq->q_lspid = task_tgid_vnr(current);
	msq->q_stime = get_seconds();

	// Try to hand the message directly to a waiting receiver
	// (one queued on msq->q_receivers)
	if (!pipelined_send(msq, msg, &wake_q)) {
		/* no one is waiting for this message, enqueue it */
		// No receiver took it: append the message to msq->q_messages
		list_add_tail(&msg->m_list, &msq->q_messages);
		msq->q_cbytes += msgsz;
		msq->q_qnum++;
		atomic_add(msgsz, &ns->msg_bytes);
		atomic_inc(&ns->msg_hdrs);
	}

	err = 0;
	// The message is no longer owned here, so it must not be freed below
	msg = NULL;

out_unlock0:
	ipc_unlock_object(&msq->q_perm);
	wake_up_q(&wake_q);
out_unlock1:
	rcu_read_unlock();
	if (msg != NULL)
		free_msg(msg);
	return err;
}

/*
 * Resolve a message queue id to its msg_queue.
 *
 * The lookup yields the kern_ipc_perm registered for the id (the pointer
 * stored by radix_tree_iter_replace() in idr_alloc_cmn); the enclosing
 * msg_queue is then recovered from it. An error pointer from the lookup is
 * passed through unchanged.
 */
static inline struct msg_queue *msq_obtain_object_check(struct ipc_namespace *ns,
							int id)
{
	struct kern_ipc_perm *perm;

	perm = ipc_obtain_object_check(&msg_ids(ns), id);
	if (!IS_ERR(perm))
		/* q_perm is embedded in msg_queue: step back to the container. */
		return container_of(perm, struct msg_queue, q_perm);

	return ERR_CAST(perm);
}

总结:根据消息ID,在idr的idr_rt中搜索,找到对应的kern_ipc_perm,再找到msg_queue。如果有消息接收者(msg_queue的q_receivers的链表不为空,链接的是msg_receiver结构),将消息添加到msg_receiver的r_msg上;如果没有消息接收者,将消息添加到msg_queue的q_messages链表上。

  3. 接收msg
/*
 * Receive one message from queue msqid into the user buffer buf.
 *
 * msgtyp and msgflg select which message is wanted (see convert_mode()).
 * If a suitable message is queued it is unlinked (or only copied when
 * MSG_COPY is set) and passed to msg_handler. Otherwise the caller fails
 * with -ENOMSG (IPC_NOWAIT) or registers itself on msq->q_receivers and
 * sleeps until a message is handed over, the queue goes away or a signal
 * is pending.
 *
 * Returns the value of msg_handler() on success or a negative errno:
 * -EINVAL for bad arguments, -EACCES without read permission, -EIDRM if
 * the queue was removed, -E2BIG if the message exceeds bufsz without
 * MSG_NOERROR, -ENOMSG, -ERESTARTNOHAND, or an error from the helpers.
 */
static long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgflg,
	       long (*msg_handler)(void __user *, struct msg_msg *, size_t))
{
	int mode;
	struct msg_queue *msq;
	struct ipc_namespace *ns;
	struct msg_msg *msg, *copy = NULL;
	DEFINE_WAKE_Q(wake_q);

	ns = current->nsproxy->ipc_ns;

	if (msqid < 0 || (long) bufsz < 0)
		return -EINVAL;

	// The copy flag is set: it is rejected together with MSG_EXCEPT or
	// without IPC_NOWAIT
	if (msgflg & MSG_COPY) {
		if ((msgflg & MSG_EXCEPT) || !(msgflg & IPC_NOWAIT))
			return -EINVAL;
		copy = prepare_copy(buf, min_t(size_t, bufsz, ns->msg_ctlmax));
		if (IS_ERR(copy))
			return PTR_ERR(copy);
	}

	// Translate the requested type and flags into a search mode
	mode = convert_mode(&msgtyp, msgflg);

	rcu_read_lock();
	// Get the msg_queue for the message id, same logic as in msgsnd
	msq = msq_obtain_object_check(ns, msqid);
	if (IS_ERR(msq)) {
		rcu_read_unlock();
		free_copy(copy);
		return PTR_ERR(msq);
	}

	for (;;) {
		struct msg_receiver msr_d;

		msg = ERR_PTR(-EACCES);
		if (ipcperms(ns, &msq->q_perm, S_IRUGO))
			goto out_unlock1;

		ipc_lock_object(&msq->q_perm);

		/* raced with RMID? */
		if (!ipc_valid_object(&msq->q_perm)) {
			msg = ERR_PTR(-EIDRM);
			goto out_unlock0;
		}

		// Look for a matching message
		msg = find_msg(msq, &msgtyp, mode);
		if (!IS_ERR(msg)) {
			/*
			 * Found a suitable message.
			 * Unlink it from the queue.
			 */
			// The user buffer is smaller than the message
			if ((bufsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
				msg = ERR_PTR(-E2BIG);
				goto out_unlock0;
			}
			/*
			 * If we are copying, then do not unlink message and do
			 * not update queue parameters.
			 * With MSG_COPY set, the message is not removed from the list.
			 */
			if (msgflg & MSG_COPY) {
				msg = copy_msg(msg, copy);
				goto out_unlock0;
			}

			// Remove the message from the q_messages list
			list_del(&msg->m_list);
			// Update the queue statistics
			msq->q_qnum--;
			msq->q_rtime = get_seconds();
			msq->q_lrpid = task_tgid_vnr(current);
			msq->q_cbytes -= msg->m_ts;
			atomic_sub(msg->m_ts, &ns->msg_bytes);
			atomic_dec(&ns->msg_hdrs);
			ss_wakeup(msq, &wake_q, false);

			goto out_unlock0;
		}

		/* No message waiting. Wait for a message */
		// No readable message
		if (msgflg & IPC_NOWAIT) {
			msg = ERR_PTR(-ENOMSG);
			goto out_unlock0;
		}

		// Link this msg_receiver onto the q_receivers list (see how
		// pipelined_send is used in do_msgsnd)
		list_add_tail(&msr_d.r_list, &msq->q_receivers);
		msr_d.r_tsk = current;
		msr_d.r_msgtype = msgtyp;
		msr_d.r_mode = mode;
		if (msgflg & MSG_NOERROR)
			msr_d.r_maxsize = INT_MAX;
		else
			msr_d.r_maxsize = bufsz;
		// -EAGAIN marks "nothing delivered yet"; it is checked after waking up
		msr_d.r_msg = ERR_PTR(-EAGAIN);
		__set_current_state(TASK_INTERRUPTIBLE);

		ipc_unlock_object(&msq->q_perm);
		rcu_read_unlock();
		schedule(); // yield the CPU

		/*
		 * Lockless receive, part 1:
		 * We don't hold a reference to the queue and getting a
		 * reference would defeat the idea of a lockless operation,
		 * thus the code relies on rcu to guarantee the existence of
		 * msq:
		 * Prior to destruction, expunge_all(-EIDRM) changes r_msg.
		 * Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
		 */
		rcu_read_lock();

		/*
		 * Lockless receive, part 2:
		 * The work in pipelined_send() and expunge_all():
		 * - Set pointer to message
		 * - Queue the receiver task for later wakeup
		 * - Wake up the process after the lock is dropped.
		 *
		 * Should the process wake up before this wakeup (due to a
		 * signal) it will either see the message and continue ...
		 */
		msg = READ_ONCE(msr_d.r_msg);
		if (msg != ERR_PTR(-EAGAIN))
			goto out_unlock1;

		 /*
		  * ... or see -EAGAIN, acquire the lock to check the message
		  * again.
		  */
		ipc_lock_object(&msq->q_perm);

		msg = msr_d.r_msg;
		if (msg != ERR_PTR(-EAGAIN))
			goto out_unlock0;

		// Remove this msg_receiver from the q_receivers list
		list_del(&msr_d.r_list);

		// Check whether the current task has a signal to handle
		// (non-zero means a signal is pending)
		if (signal_pending(current)) {
			msg = ERR_PTR(-ERESTARTNOHAND);
			goto out_unlock0;
		}

		ipc_unlock_object(&msq->q_perm);
	}

out_unlock0:
	ipc_unlock_object(&msq->q_perm);
	wake_up_q(&wake_q);
out_unlock1:
	rcu_read_unlock();
	if (IS_ERR(msg)) {
		free_copy(copy);
		return PTR_ERR(msg);
	}

	// Hand the message to the caller-supplied handler, then release it
	bufsz = msg_handler(buf, msg, bufsz);
	free_msg(msg);

	return bufsz;
}

/*
 * Map the msgrcv() type argument and flags to a search mode:
 *   MSG_COPY set                => SEARCH_NUMBER
 *   *msgtyp == 0                => SEARCH_ANY (first message, any type)
 *   *msgtyp <  0                => SEARCH_LESSEQUAL, with *msgtyp replaced
 *                                  by its absolute value
 *   *msgtyp >  0 and MSG_EXCEPT => SEARCH_NOTEQUAL
 *   *msgtyp >  0 otherwise      => SEARCH_EQUAL
 */
static inline int convert_mode(long *msgtyp, int msgflg)
{
	long type;

	if (msgflg & MSG_COPY)
		return SEARCH_NUMBER;

	type = *msgtyp;
	if (type == 0)
		return SEARCH_ANY;
	if (type > 0)
		return (msgflg & MSG_EXCEPT) ? SEARCH_NOTEQUAL : SEARCH_EQUAL;

	/* -LONG_MIN is undefined, so clamp that one value to LONG_MAX. */
	*msgtyp = (type == LONG_MIN) ? LONG_MAX : -type;
	return SEARCH_LESSEQUAL;
}

/*
 * Scan msq->q_messages (where messages are queued when no receiver was
 * waiting) for a message accepted by both testmsg() and the security hook.
 *
 * - SEARCH_LESSEQUAL: an accepted message whose type is not 1 becomes the
 *   current candidate and *msgtyp is lowered to its type minus one before
 *   the scan continues; an accepted message of type 1 is returned at once.
 * - SEARCH_NUMBER: returns the accepted message whose zero-based position
 *   among the accepted messages equals *msgtyp.
 * - Any other mode: returns the first accepted message.
 *
 * Returns the message, or ERR_PTR(-EAGAIN) if none qualifies.
 */
static struct msg_msg *find_msg(struct msg_queue *msq, long *msgtyp, int mode)
{
	struct msg_msg *cur, *best = NULL;
	long nmatched = 0;

	list_for_each_entry(cur, &msq->q_messages, m_list) {
		if (!testmsg(cur, *msgtyp, mode))
			continue;
		if (security_msg_queue_msgrcv(msq, cur, current, *msgtyp, mode))
			continue;

		if (mode == SEARCH_LESSEQUAL && cur->m_type != 1) {
			*msgtyp = cur->m_type - 1;
			best = cur;
		} else if (mode == SEARCH_NUMBER) {
			if (*msgtyp == nmatched)
				return cur;
		} else {
			return cur;
		}
		nmatched++;
	}

	return best ? best : ERR_PTR(-EAGAIN);
}

总结:如果msg_queue的q_messages链表上有消息,按类型获取消息;如果q_messages没有消息,或没有指定类型的消息,则在设置了IPC_NOWAIT时直接返回-ENOMSG,否则将一个msg_receiver对象链接到q_receivers链表上,等待消息的到来。



版权声明:本文为weixin_46381158原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。