最近在研究进程间通信中的 System V 消息队列(msg),本文是对所学内容的阶段性总结。首先看一个应用层的使用示例。
发送端:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/msg.h>
#include <errno.h>
/* Payload capacity of one message, in bytes. */
enum { BUF_LEN = 64 };

/*
 * Message layout passed to msgsnd()/msgrcv(): a leading long holding the
 * message type (must be >= 1 when sending), followed by the payload bytes.
 */
struct msg_st
{
    long msg_type;     /* message type */
    char buf[BUF_LEN]; /* message text */
};
/*
 * Sender: creates (or opens) the message queue with key 123 and sends a
 * numbered text message of type 1 once per second, forever.
 * Exits with EXIT_FAILURE if the queue cannot be obtained or a send fails
 * for any reason other than being interrupted by a signal.
 */
int main(int argc, char **argv)
{
    struct msg_st data;
    int msgid = -1;
    /* unsigned: wraps instead of hitting signed-overflow UB */
    unsigned int i_count = 0;

    (void)argc;
    (void)argv;

    // Create the queue for key 123, or open it if it already exists
    msgid = msgget((key_t)123, 0660 | IPC_CREAT);
    if (msgid == -1)
    {
        fprintf(stderr, "msgget failed error: %d (%s)\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }

    while (1)
    {
        memset(&data, 0, sizeof(data));
        // msgsnd() rejects message types < 1
        data.msg_type = 1;
        // snprintf bounds the write to buf and always NUL-terminates it
        snprintf(data.buf, sizeof(data.buf), "send %u", ++i_count);

        // The size argument counts only the payload (buf), not msg_type
        while (msgsnd(msgid, (void *)&data, sizeof(data.buf), 0) == -1)
        {
            // A signal interrupted a blocking send (queue full): retry
            if (errno == EINTR)
                continue;
            fprintf(stderr, "msgsnd failed error: %d (%s)\n", errno, strerror(errno));
            exit(EXIT_FAILURE);
        }
        sleep(1);
    }
    exit(EXIT_SUCCESS);
}
接收端:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/msg.h>
#include <errno.h>
/* Payload capacity of one message, in bytes. */
enum { BUF_LEN = 64 };

/*
 * Message layout passed to msgsnd()/msgrcv(): a leading long holding the
 * message type (must be >= 1 when sending), followed by the payload bytes.
 */
struct msg_st
{
    long msg_type;     /* message type */
    char buf[BUF_LEN]; /* message text */
};
/*
 * Receiver: creates (or opens) the message queue with key 123 and prints
 * every message received, of any type.
 * Interrupted waits are retried. Any other msgrcv() error ends the loop:
 * the queue is then removed and the program exits with EXIT_FAILURE.
 */
int main(int argc, char **argv)
{
    int msgid = -1;
    struct msg_st data;
    long int msgtype = 0; // 0 = receive messages of any type
    ssize_t len;

    (void)argc;
    (void)argv;

    // Create the queue for key 123, or open it if the sender already did
    msgid = msgget((key_t)123, 0660 | IPC_CREAT);
    if (msgid == -1)
    {
        fprintf(stderr, "msgget failed with error: %d\n", errno);
        exit(EXIT_FAILURE);
    }
    while (1)
    {
        // Blocks until a message arrives; len = payload bytes copied into buf
        len = msgrcv(msgid, (void *)&data, sizeof(data.buf), msgtype, 0);
        if (len == -1)
        {
            // Interrupted by a signal: wait again
            if (errno == EINTR)
                continue;
            // Other errors (e.g. EIDRM: queue removed, E2BIG: message too
            // large) would repeat forever, and buf holds no new data
            fprintf(stderr, "msgrcv failed with error: %d\n", errno);
            break;
        }
        // The payload need not be NUL-terminated: print at most len bytes
        printf("receive: %.*s\n", (int)len, data.buf);
    }
    // Remove the message queue
    if (msgctl(msgid, IPC_RMID, NULL) == -1)
    {
        fprintf(stderr, "msgctl(IPC_RMID) failed\n");
    }
    // Only reached after a fatal msgrcv() error
    exit(EXIT_FAILURE);
}
下图是msg内核所涉及的数据结构:
- 创建msg队列
/*
 * msgget(2): look up or create a message queue for key in the caller's IPC
 * namespace. Queue creation is delegated to newque() through msg_ops.getnew.
 */
SYSCALL_DEFINE2(msgget, key_t, key, int, msgflg)
{
	static const struct ipc_ops msg_ops = {
		.getnew = newque,
		.associate = msg_security,
	};
	/* All msg-related kernel state hangs off the caller's IPC namespace */
	struct ipc_namespace *ipc_ns = current->nsproxy->ipc_ns;
	struct ipc_params params;

	params.key = key;     /* e.g. 123 in the user-space example */
	params.flg = msgflg;  /* e.g. 0660 | IPC_CREAT */

	return ipcget(ipc_ns, &msg_ids(ipc_ns), &msg_ops, &params);
}
/*
 * Generic entry point shared by msgget/semget/shmget.
 * An IPC_PRIVATE key goes to ipcget_new(). Any other key goes through
 * ipcget_public(), which first looks the key up in ids.
 */
int ipcget(struct ipc_namespace *ns, struct ipc_ids *ids,
		const struct ipc_ops *ops, struct ipc_params *params)
{
	/* Ordinary keys (such as 123 in the example) take this path */
	if (params->key != IPC_PRIVATE)
		return ipcget_public(ns, ids, ops, params);

	return ipcget_new(ns, ids, ops, params);
}
/*
 * Key-based lookup/creation for ipcget() (key != IPC_PRIVATE).
 * - No object with params->key: fail with -ENOENT unless IPC_CREAT is set,
 *   in which case ops->getnew() creates one (newque() for msgget).
 * - Object exists: fail with -EEXIST if both IPC_CREAT and IPC_EXCL are
 *   set; otherwise run the optional ops->more_checks hook and then
 *   ipc_check_perms().
 * Runs with ids->rwsem held for writing.
 * Returns the IPC id on success or a negative errno.
 */
static int ipcget_public(struct ipc_namespace *ns, struct ipc_ids *ids,
		const struct ipc_ops *ops, struct ipc_params *params)
{
	struct kern_ipc_perm *ipcp;
	int flg = params->flg; // e.g. 0660 | IPC_CREAT in the example
	int err;

	/*
	 * Take the lock as a writer since we are potentially going to add
	 * a new entry + read locks are not "upgradable"
	 */
	down_write(&ids->rwsem);
	ipcp = ipc_findkey(ids, params->key);
	if (ipcp == NULL) { // first call for this key takes this branch
		/* key not used */
		if (!(flg & IPC_CREAT)) // no queue with this key and IPC_CREAT not set: error
			err = -ENOENT;
		else
			err = ops->getnew(ns, params); // getnew is newque (see the msgget syscall)
	} else {
		/* ipc object has been locked by ipc_findkey() */
		// The queue already exists and IPC_CREAT|IPC_EXCL demanded a new one: error
		if (flg & IPC_CREAT && flg & IPC_EXCL)
			err = -EEXIST;
		else {
			err = 0;
			if (ops->more_checks)
				err = ops->more_checks(ipcp, params);
			if (!err)
				/*
				 * ipc_check_perms returns the IPC id on
				 * success
				 */
				err = ipc_check_perms(ns, ipcp, ops, params);
		}
		ipc_unlock(ipcp);
	}
	up_write(&ids->rwsem);

	return err;
}
/*
 * ipc_ops.getnew hook for msgget(): allocate and initialise a msg_queue,
 * register it in msg_ids(ns) via ipc_addid() and return its id
 * (msq->q_perm.id), or a negative errno on failure.
 */
static int newque(struct ipc_namespace *ns, struct ipc_params *params)
{
	struct msg_queue *msq;
	int retval;
	key_t key = params->key; // first msgget() argument, e.g. 123
	int msgflg = params->flg; // second msgget() argument, e.g. 0660 | IPC_CREAT

	// Allocate the msg_queue object
	msq = kvmalloc(sizeof(*msq), GFP_KERNEL);
	if (unlikely(!msq))
		return -ENOMEM;

	// Only the permission bits of msgflg become the queue mode
	msq->q_perm.mode = msgflg & S_IRWXUGO;
	msq->q_perm.key = key;

	msq->q_perm.security = NULL;
	retval = security_msg_queue_alloc(msq);
	if (retval) {
		kvfree(msq);
		return retval;
	}

	// Initialise timestamps and statistics; q_qbytes starts at
	// ns->msg_ctlmnb (presumably the per-queue byte limit)
	msq->q_stime = msq->q_rtime = 0;
	msq->q_ctime = ktime_get_real_seconds();
	msq->q_cbytes = msq->q_qnum = 0;
	msq->q_qbytes = ns->msg_ctlmnb;
	msq->q_lspid = msq->q_lrpid = 0;
	// Lists of queued messages, waiting receivers and waiting senders
	INIT_LIST_HEAD(&msq->q_messages);
	INIT_LIST_HEAD(&msq->q_receivers);
	INIT_LIST_HEAD(&msq->q_senders);

	/* ipc_addid() locks msq upon success. */
	// #define msg_ids(ns) ((ns)->ids[IPC_MSG_IDS])
	// Allocate a free id. Note the second argument: &msq->q_perm is the
	// pointer stored in the idr. ns->msg_ctlmni limits how many queues exist.
	retval = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni);
	if (retval < 0) {
		// Free the queue through RCU
		call_rcu(&msq->q_perm.rcu, msg_rcu_free);
		return retval;
	}

	// ipc_addid() returned holding the object lock and rcu_read_lock()
	ipc_unlock_object(&msq->q_perm);
	rcu_read_unlock();

	return msq->q_perm.id;
}
/*
 * Insert a new IPC object into ids and allocate its index.
 * @ids:   id table for one IPC type (msg_ids(ns) when called from newque())
 * @new:   kern_ipc_perm embedded in the new object
 * @limit: maximum number of objects in use (clamped to IPCMNI)
 *
 * On success returns the allocated index (>= 0), sets new->id via
 * ipc_buildid(), and leaves rcu_read_lock() and new->lock held.
 * On failure returns a negative errno with both released; -ENOSPC means
 * the tables are not initialised or the limit is reached.
 */
int ipc_addid(struct ipc_ids *ids, struct kern_ipc_perm *new, int limit)
{
	kuid_t euid;
	kgid_t egid;
	int id, err;

	if (limit > IPCMNI)
		limit = IPCMNI;

	// Fail if the id tables are not initialised yet or the limit is reached
	if (!ids->tables_initialized || ids->in_use >= limit)
		return -ENOSPC;

	// Preload idr memory for the GFP_NOWAIT allocation below; this also
	// disables kernel preemption until idr_preload_end()
	idr_preload(GFP_KERNEL);

	// The new object starts with a single reference
	refcount_set(&new->refcount, 1);
	spin_lock_init(&new->lock);
	new->deleted = false;
	rcu_read_lock();
	spin_lock(&new->lock);

	current_euid_egid(&euid, &egid);
	new->cuid = new->uid = euid; // creator and owner uid: caller's euid
	new->gid = new->cgid = egid; // owner and creator gid: caller's egid

	/*
	 * The core step:
	 * #define ipc_idr_alloc(ids, new) \
	 * idr_alloc(&(ids)->ipcs_idr, (new), 0, 0, GFP_NOWAIT)
	 * ipc_idr_alloc calls idr_alloc -> idr_alloc_cmn in turn.
	 */
	id = ipc_idr_alloc(ids, new);
	idr_preload_end();

	// Objects with a real key (not IPC_PRIVATE) also go into the key hash
	// table; if that fails, undo the idr allocation
	if (id >= 0 && new->key != IPC_PRIVATE) {
		err = rhashtable_insert_fast(&ids->key_ht, &new->khtnode,
					     ipc_kht_params);
		if (err < 0) {
			idr_remove(&ids->ipcs_idr, id);
			id = err;
		}
	}
	if (id < 0) {
		spin_unlock(&new->lock);
		rcu_read_unlock();
		return id;
	}

	// Book-keeping: objects in use and the highest index handed out
	ids->in_use++;
	if (id > ids->max_id)
		ids->max_id = id;
	new->id = ipc_buildid(id, ids, new);

	return id;
}
/*
 * Core of idr_alloc(): find a free slot in the idr's radix tree starting at
 * index start and store ptr in it.
 * For msgget, ptr is the kern_ipc_perm embedded in the msg_queue allocated
 * by newque().
 * Returns 0 on success and writes the chosen index to *index (when index is
 * non-NULL); returns a negative errno on failure.
 */
int idr_alloc_cmn(struct idr *idr, void *ptr, unsigned long *index,
		unsigned long start, unsigned long end, gfp_t gfp,
		bool ext)
{
	struct radix_tree_iter iter;
	void __rcu **slot;

	if (WARN_ON_ONCE(radix_tree_is_internal_node(ptr)))
		return -EINVAL;

	radix_tree_iter_init(&iter, start);
	if (ext)
		slot = idr_get_free_ext(&idr->idr_rt, &iter, gfp, end);
	else
		/*
		 * newque() takes this branch.
		 * idr->idr_rt is a radix tree (struct radix_tree_root).
		 * idr_get_free() searches it for the smallest free id and
		 * returns a pointer to the corresponding slot.
		 * For an introduction to radix trees see:
		 * https://zhuanlan.zhihu.com/p/533338300
		 */
		slot = idr_get_free(&idr->idr_rt, &iter, gfp, end);
	if (IS_ERR(slot))
		return PTR_ERR(slot);

	// Key step: store ptr (the msg_queue's kern_ipc_perm from newque()) in the free slot
	radix_tree_iter_replace(&idr->idr_rt, &iter, slot, ptr);
	// The slot is now in use: clear its IDR_FREE tag
	radix_tree_iter_tag_clear(&idr->idr_rt, &iter, IDR_FREE);

	if (index)
		*index = iter.index; // the id that was allocated
	return 0;
}
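总结:msgget 先在 current->nsproxy->ipc_ns->ids[IPC_MSG_IDS](即 msg_ids(ns))中按 key 查找已存在的队列(ipc_findkey;非 IPC_PRIVATE 的新对象在创建时会被插入 ids->key_ht 哈希表)。若不存在且设置了 IPC_CREAT,则由 newque 创建 msg_queue,并通过 ipcs_idr.idr_rt 基树为其分配最小的可用 ID,最后返回由 ipc_buildid 生成的队列 ID。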
- 发送消息
/*
 * msgsnd(2): read the message type from the user buffer, then pass the
 * user pointer to the message text on to do_msgsnd().
 */
SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,
		int, msgflg)
{
	long type; /* message type, first field of the user's msgbuf */

	if (get_user(type, &msgp->mtype))
		return -EFAULT;

	/* mtext is still a user-space pointer; do_msgsnd() copies it in */
	return do_msgsnd(msqid, type, msgp->mtext, msgsz, msgflg);
}
/*
 * Body of msgsnd(2). Validates the request, copies the message text in from
 * user space, looks up the queue, and waits (unless IPC_NOWAIT) until the
 * message fits. The message is then offered to waiting receivers via
 * pipelined_send() (not shown). If none takes it, it is appended to
 * q_messages.
 * Returns 0 on success or a negative errno.
 */
static long do_msgsnd(int msqid, long mtype, void __user *mtext,
		size_t msgsz, int msgflg)
{
	struct msg_queue *msq;
	struct msg_msg *msg;
	int err;
	struct ipc_namespace *ns;
	DEFINE_WAKE_Q(wake_q);

	ns = current->nsproxy->ipc_ns;

	// Reject sizes above ns->msg_ctlmax or negative, and negative queue ids
	if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)
		return -EINVAL;
	// Message types must be >= 1
	if (mtype < 1)
		return -EINVAL;

	// Copy the message text in from user space
	msg = load_msg(mtext, msgsz);
	if (IS_ERR(msg))
		return PTR_ERR(msg);

	msg->m_type = mtype;
	msg->m_ts = msgsz;

	rcu_read_lock();
	/*
	 * msq_obtain_object_check() looks up the kern_ipc_perm for msqid and
	 * returns the msg_queue that embeds it.
	 */
	msq = msq_obtain_object_check(ns, msqid);
	if (IS_ERR(msq)) {
		err = PTR_ERR(msq);
		goto out_unlock1;
	}

	ipc_lock_object(&msq->q_perm);

	for (;;) {
		struct msg_sender s;

		err = -EACCES;
		// Caller needs write permission on the queue
		if (ipcperms(ns, &msq->q_perm, S_IWUGO))
			goto out_unlock0;

		/* raced with RMID? */
		if (!ipc_valid_object(&msq->q_perm)) { // => !perm->deleted
			err = -EIDRM;
			goto out_unlock0;
		}

		err = security_msg_queue_msgsnd(msq, msg, msgflg);
		if (err)
			goto out_unlock0;

		// Does the message fit within the queue's byte and message-count limits?
		if (msg_fits_inqueue(msq, msgsz))
			break;

		/* queue full, wait: */
		if (msgflg & IPC_NOWAIT) {
			err = -EAGAIN;
			goto out_unlock0;
		}

		/* enqueue the sender and prepare to block */
		ss_add(msq, &s, msgsz);

		if (!ipc_rcu_getref(&msq->q_perm)) {
			err = -EIDRM;
			goto out_unlock0;
		}

		ipc_unlock_object(&msq->q_perm);
		rcu_read_unlock();
		// Sleep with all locks dropped; ss_add() (not shown) presumably set
		// the task state so that a later wakeup resumes us here
		schedule();

		rcu_read_lock();
		ipc_lock_object(&msq->q_perm);

		ipc_rcu_putref(&msq->q_perm, msg_rcu_free);
		/* raced with RMID? */
		if (!ipc_valid_object(&msq->q_perm)) {
			err = -EIDRM;
			goto out_unlock0;
		}

		ss_del(&s);

		if (signal_pending(current)) {
			err = -ERESTARTNOHAND;
			goto out_unlock0;
		}

	}

	msq->q_lspid = task_tgid_vnr(current);
	msq->q_stime = get_seconds();

	// Try to hand the message straight to a receiver waiting on msq->q_receivers
	if (!pipelined_send(msq, msg, &wake_q)) {
		/* no one is waiting for this message, enqueue it */
		// Append to msq->q_messages and update queue/namespace statistics
		list_add_tail(&msg->m_list, &msq->q_messages);
		msq->q_cbytes += msgsz;
		msq->q_qnum++;
		atomic_add(msgsz, &ns->msg_bytes);
		atomic_inc(&ns->msg_hdrs);
	}

	err = 0;
	// The message now belongs to the queue or a receiver: do not free it below
	msg = NULL;

out_unlock0:
	ipc_unlock_object(&msq->q_perm);
	wake_up_q(&wake_q);
out_unlock1:
	rcu_read_unlock();
	if (msg != NULL)
		free_msg(msg);
	return err;
}
/*
 * Map a queue id to its msg_queue. The idr of msg_ids(ns) stores the
 * kern_ipc_perm embedded in each msg_queue (see idr_alloc_cmn's
 * radix_tree_iter_replace), so container_of() recovers the queue.
 * Returns an ERR_PTR if the id lookup fails.
 */
static inline struct msg_queue *msq_obtain_object_check(struct ipc_namespace *ns,
							int id)
{
	struct kern_ipc_perm *perm;

	perm = ipc_obtain_object_check(&msg_ids(ns), id);
	if (!IS_ERR(perm))
		return container_of(perm, struct msg_queue, q_perm);

	return ERR_CAST(perm);
}
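总结:根据消息ID,在 idr 的 idr_rt 基树中找到对应的 kern_ipc_perm,再通过 container_of 得到其所在的 msg_queue。如果队列已满(msg_fits_inqueue 不满足):设置了 IPC_NOWAIT 时直接返回 -EAGAIN;否则发送者通过 ss_add 登记后睡眠等待。消息可以放入队列时,如果有等待中的接收者(msg_queue 的 q_receivers 链表不为空,链接的是 msg_receiver 结构),pipelined_send 会将消息直接交给该接收者(设置其 r_msg);如果没有接收者接收,则将消息添加到 msg_queue 的 q_messages 链表上。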
- 接收msg
/*
 * Body of msgrcv(2). Looks on q_messages for a message selected by
 * msgtyp/msgflg (see convert_mode() and find_msg()). If none is found and
 * IPC_NOWAIT is not set, the caller registers on q_receivers and sleeps
 * until a message is delivered, the queue is removed, or a signal arrives.
 * With MSG_COPY (requires IPC_NOWAIT, excludes MSG_EXCEPT) the message is
 * copied into a preallocated buffer and left on the queue.
 * On success returns msg_handler(buf, msg, bufsz), which presumably copies
 * the message out to buf. Otherwise returns a negative errno.
 */
static long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgflg,
		long (*msg_handler)(void __user *, struct msg_msg *, size_t))
{
	int mode;
	struct msg_queue *msq;
	struct ipc_namespace *ns;
	struct msg_msg *msg, *copy = NULL;
	DEFINE_WAKE_Q(wake_q);

	ns = current->nsproxy->ipc_ns;
	if (msqid < 0 || (long) bufsz < 0)
		return -EINVAL;

	// MSG_COPY: copy a message without dequeuing it. Only valid together
	// with IPC_NOWAIT and without MSG_EXCEPT.
	if (msgflg & MSG_COPY) {
		if ((msgflg & MSG_EXCEPT) || !(msgflg & IPC_NOWAIT))
			return -EINVAL;
		copy = prepare_copy(buf, min_t(size_t, bufsz, ns->msg_ctlmax));
		if (IS_ERR(copy))
			return PTR_ERR(copy);
	}
	// Turn msgtyp/msgflg into a search mode (may rewrite msgtyp)
	mode = convert_mode(&msgtyp, msgflg);

	rcu_read_lock();
	// Look up the msg_queue for msqid, exactly as msgsnd does
	msq = msq_obtain_object_check(ns, msqid);
	if (IS_ERR(msq)) {
		rcu_read_unlock();
		free_copy(copy);
		return PTR_ERR(msq);
	}

	for (;;) {
		struct msg_receiver msr_d;

		msg = ERR_PTR(-EACCES);
		if (ipcperms(ns, &msq->q_perm, S_IRUGO))
			goto out_unlock1;

		ipc_lock_object(&msq->q_perm);

		/* raced with RMID? */
		if (!ipc_valid_object(&msq->q_perm)) {
			msg = ERR_PTR(-EIDRM);
			goto out_unlock0;
		}

		// Search q_messages for a matching message
		msg = find_msg(msq, &msgtyp, mode);
		if (!IS_ERR(msg)) {
			/*
			 * Found a suitable message.
			 * Unlink it from the queue.
			 */
			// The user buffer is smaller than the message: fail with
			// -E2BIG unless MSG_NOERROR is set
			if ((bufsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
				msg = ERR_PTR(-E2BIG);
				goto out_unlock0;
			}
			/*
			 * If we are copying, then do not unlink message and do
			 * not update queue parameters (the message stays on
			 * q_messages).
			 */
			if (msgflg & MSG_COPY) {
				msg = copy_msg(msg, copy);
				goto out_unlock0;
			}

			// Unlink the message from q_messages
			list_del(&msg->m_list);
			// Update queue and namespace statistics
			msq->q_qnum--;
			msq->q_rtime = get_seconds();
			msq->q_lrpid = task_tgid_vnr(current);
			msq->q_cbytes -= msg->m_ts;
			atomic_sub(msg->m_ts, &ns->msg_bytes);
			atomic_dec(&ns->msg_hdrs);
			// Space was freed: ss_wakeup() (not shown) presumably wakes
			// senders that blocked on a full queue
			ss_wakeup(msq, &wake_q, false);

			goto out_unlock0;
		}

		/* No message waiting. Wait for a message */
		// With IPC_NOWAIT, fail immediately
		if (msgflg & IPC_NOWAIT) {
			msg = ERR_PTR(-ENOMSG);
			goto out_unlock0;
		}

		// Register as a waiting receiver on q_receivers. pipelined_send()
		// in msgsnd delivers matching messages by setting r_msg.
		list_add_tail(&msr_d.r_list, &msq->q_receivers);
		msr_d.r_tsk = current;
		msr_d.r_msgtype = msgtyp;
		msr_d.r_mode = mode;
		if (msgflg & MSG_NOERROR)
			msr_d.r_maxsize = INT_MAX;
		else
			msr_d.r_maxsize = bufsz;
		msr_d.r_msg = ERR_PTR(-EAGAIN);
		__set_current_state(TASK_INTERRUPTIBLE);

		ipc_unlock_object(&msq->q_perm);
		rcu_read_unlock();
		schedule(); // sleep, giving up the CPU

		/*
		 * Lockless receive, part 1:
		 * We don't hold a reference to the queue and getting a
		 * reference would defeat the idea of a lockless operation,
		 * thus the code relies on rcu to guarantee the existence of
		 * msq:
		 * Prior to destruction, expunge_all(-EIDRM) changes r_msg.
		 * Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
		 */
		rcu_read_lock();

		/*
		 * Lockless receive, part 2:
		 * The work in pipelined_send() and expunge_all():
		 * - Set pointer to message
		 * - Queue the receiver task for later wakeup
		 * - Wake up the process after the lock is dropped.
		 *
		 * Should the process wake up before this wakeup (due to a
		 * signal) it will either see the message and continue ...
		 */
		msg = READ_ONCE(msr_d.r_msg);
		if (msg != ERR_PTR(-EAGAIN))
			goto out_unlock1;

		 /*
		  * ... or see -EAGAIN, acquire the lock to check the message
		  * again.
		  */
		ipc_lock_object(&msq->q_perm);

		msg = msr_d.r_msg;
		if (msg != ERR_PTR(-EAGAIN))
			goto out_unlock0;

		// Nothing was delivered: leave q_receivers
		list_del(&msr_d.r_list);
		// A pending signal aborts the wait; otherwise loop and search again
		if (signal_pending(current)) {
			msg = ERR_PTR(-ERESTARTNOHAND);
			goto out_unlock0;
		}

		ipc_unlock_object(&msq->q_perm);
	}

out_unlock0:
	ipc_unlock_object(&msq->q_perm);
	wake_up_q(&wake_q);
out_unlock1:
	rcu_read_unlock();
	if (IS_ERR(msg)) {
		free_copy(copy);
		return PTR_ERR(msg);
	}

	bufsz = msg_handler(buf, msg, bufsz);
	free_msg(msg);

	return bufsz;
}
/*
 * Translate msgrcv()'s msgtyp/msgflg into a find_msg() search mode:
 *   MSG_COPY            -> SEARCH_NUMBER   (msgtyp is an index)
 *   msgtyp == 0         -> SEARCH_ANY      (first message, any type)
 *   msgtyp < 0          -> SEARCH_LESSEQUAL (*msgtyp becomes |msgtyp|)
 *   msgtyp > 0, EXCEPT  -> SEARCH_NOTEQUAL (first message whose type differs)
 *   msgtyp > 0          -> SEARCH_EQUAL    (first message of exactly that type)
 */
static inline int convert_mode(long *msgtyp, int msgflg)
{
	long type;

	if (msgflg & MSG_COPY)
		return SEARCH_NUMBER;

	type = *msgtyp;

	if (type > 0)
		return (msgflg & MSG_EXCEPT) ? SEARCH_NOTEQUAL : SEARCH_EQUAL;

	if (type == 0)
		return SEARCH_ANY;

	/* Negative: negate in place; -LONG_MIN is undefined, so saturate */
	*msgtyp = (type == LONG_MIN) ? LONG_MAX : -type;
	return SEARCH_LESSEQUAL;
}
/*
 * Search q_messages for a message accepted by testmsg() (not shown) and
 * by the security hook, according to mode:
 * - SEARCH_NUMBER (MSG_COPY): *msgtyp is a 0-based index among accepted
 *   messages; that message is returned.
 * - SEARCH_LESSEQUAL: each accepted message lowers *msgtyp to its type - 1,
 *   so only strictly smaller types can replace it. Assuming testmsg()
 *   checks m_type <= *msgtyp, the result is the earliest message with the
 *   lowest type. A type-1 message is returned at once, because msgsnd
 *   rejects types < 1.
 * - Other modes: the first accepted message is returned.
 * The returned message is still linked on q_messages. Returns
 * ERR_PTR(-EAGAIN) if nothing matches.
 */
static struct msg_msg *find_msg(struct msg_queue *msq, long *msgtyp, int mode)
{
	struct msg_msg *msg, *found = NULL;
	long count = 0;

	// Messages not handed directly to a waiting receiver are queued on
	// q_messages (see do_msgsnd)
	list_for_each_entry(msg, &msq->q_messages, m_list) {
		if (testmsg(msg, *msgtyp, mode) &&
		    !security_msg_queue_msgrcv(msq, msg, current,
					       *msgtyp, mode)) {
			if (mode == SEARCH_LESSEQUAL && msg->m_type != 1) {
				// Remember this candidate; from now on require a smaller type
				*msgtyp = msg->m_type - 1;
				found = msg;
			} else if (mode == SEARCH_NUMBER) {
				// MSG_COPY: return the count-th accepted message
				if (*msgtyp == count)
					return msg;
			} else // first match, or a type-1 message under SEARCH_LESSEQUAL
				return msg;
			count++;
		}
	}

	// For SEARCH_LESSEQUAL, found holds the best candidate (if any)
	return found ?: ERR_PTR(-EAGAIN);
}
总结:如果 msg_queue 的 q_messages 链表上有符合条件的消息,就按类型(由 convert_mode 确定的查找模式)取出消息。如果 q_messages 上没有消息,或没有指定类型的消息:设置了 IPC_NOWAIT 时直接返回 -ENOMSG;否则将一个 msg_receiver 对象链接到 q_receivers 链表上,睡眠等待消息的到来。
版权声明:本文为weixin_46381158原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。