>
Home

登龙(DLonng)

选择大于努力

Linux 高级编程 - 消息队列 Msg Queue


版权声明:本文为 DLonng 原创文章,可以随意转载,但必须在明确位置注明出处!

消息队列 Msg Queue

如果你在 Windows 上开发过应用程序,想必你应该听过消息队列这个概念。在 Windows 中每个程序都有一个消息队列,整个程序在一个 loop 中等待从消息队列中取消息并执行,所以称 Windows 上的程序为事件驱动型。同样在 Linux 开发中也有消息队列这个概念,不过 Linux 中的消息队列是用来进行 IPC 的,本质上跟共享内存一样也是内存维护的一片内存区域。这篇文章带你学习消息队列的相关操作和内核机制。

系统中的消息队列

可以使用 ipcs -q 查看系统当前使用的消息队列(以下简称 MQ):

ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages

我的系统当前没有任何 MQ,后面我们会用程序创建一个 MQ,然后再用这个命令查看。

消息队列的原理

MQ 传递的是消息,消息即是我们需要在进程间传递的数据。MQ 采用链表来实现消息队列,该链表是由系统内核维护,系统中可能有很多的 MQ,每个 MQ 用消息队列描述符(消息队列 ID:qid)来区分,qid 是唯一的,用来区分不同的 MQ。在进行进程间通信时,一个进程将消息加到 MQ 尾端,另一个进程从消息队列中取消息(不一定以先进先出来取消息,也可以按照消息类型字段取消息),这样就实现了进程间的通信。如下 MQ 的模型:

msg queue

进程 A 向内核维护的消息队列中发消息,进程 B 从消息队列中取消息,从而实现了 A 和 B 的进程间通信。了解了原理,来看看如何使用 MQ。

使用消息队列

MQ 的 API 操作与共享内存几乎是相同的,分为下面 4 个步骤:

  1. 创建和访问 MQ
  2. 发送消息
  3. 接受消息
  4. 删除 MQ

来分别学习这些函数。

1. 创建:msgget

使用 msgget 可以创建一个消息队列,需要指定创建的 key 和标志,key 与返回的 qid 有关系。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/*
 * key:用来指定返回 MQ 的 ID
 * msgflg:创建的标志,例如 IPC_CREAT
 * return:成功返回队列 ID,失败返回 -1, 并设置 erron
 */
int msgget(key_t key, int msgflg);

2. 发送:msgsnd

使用 msgsnd 来发送一个消息,必须要有写消息队列的权限

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/*
 * msgid:消息队列 ID
 * msgp:指向 msgbuf 的指针,用来指定发送的消息
 * msgsz:要发送消息的长度
 * msgflg:创建标记,如果指定 IPC_NOWAIT,失败会立刻返回
 * return:成功返回 0, 失败返回 -1, 并设置 erron
 */
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

3. 发送:msgrcv

使用 msgrcv 来从 msgqid 标识的 MQ 中读取一个消息放到 msgp 指定的内存中,必须要有读消息队列的权限

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/*
 * msgid:消息队列 ID
 * msgp:指向 msgbuf 的指针,用来接收消息
 * msgsz:要接收消息的长度
 * msgtyp:接收消息的方式
 * msgflg:创建标记,如果指定 IPC_NOWAIT,获取失败会立刻返回
 * return:成功返回实际读取消息的字节数, 失败返回 -1, 并设置 erron
 */
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

注意:参数 msgsz 指定由 msgp 参数指向的结构的成员 mtext 的最大大小(以字节为单位),msgtyp 也有 3 种方式:

  1. msgtyp = 0:读取队列中的第一条消息
  2. msgtyp > 0:读取队列中类型为 msgtyp 的第一条消息,除非在 msgflg 中指定了 MSG_EXCEPT,否则将读取类型不等于 msgtyp 的队列中的第一条消息。
  3. msgtyp < 0:读取队列中最小类型小于或等于 msgtyp 绝对值的第一条消息

4. 删除及控制:msgctl

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/*
 * msqid:消息队列 ID
 * cmd:控制命令,例如 IPC_RMID 删除命令
 * buf:存储消息队列的相关信息的 buf
 * return:成功根据不同的 cmd 有不同的返回值,失败返回 -1, 并设置 erron
 */
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

例子:使用消息队列来进程间通信

我们也写两个程序来用 MQ 来进程间通信,一个进程 A 写入字符串数据到消息队列,另一个进程 B 从队列中取出数据。

write_msg.c

// write_msg.c

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

struct msgbuf {
	long mtype;
	char mtext[255];
};

int main() {
	// 1. 创建一个消息队列,用 key = 123 来唯一表示这个队列
	int msg_id = msgget(123, IPC_CREAT | 0666);
	if (msg_id != -1) {

		// 2. 初始化要发生的消息
		struct msgbuf mybuf;
		mybuf.mtype = 1;
		strcpy(mybuf.mtext, "I'm send process.\n");

		// 3. 发送消息
		if (msgsnd(msg_id, &mybuf, sizeof(mybuf.mtext), 0))
			printf("success\n");			
		else
			perror("msgsnd:");
	} else {
		perror("msgget:");
	}

	return 0;
}

read_msg.c

这是读取进程:

// read_msg.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

struct msgbuf {
	long mtype;
	char mtext[255];
};

int main() {
	// 1. 获取消息队列
	int msg_id = msgget(123, IPC_CREAT | 0666);
	if (msg_id != -1) {
		struct msgbuf mybuf;
		// 2. 接收第一条消息,存到 mybuf 中
		if (msgrcv(msg_id, &mybuf, sizeof(mybuf.mtext), 0, IPC_NOWAIT) != -1) {
			printf("read success: %s\n", mybuf.mtext);
			// 3. 接收完就删除这个消息队列
			if (msgctl(msg_id, IPC_RMID, 0) != -1)
				printf("delete msg success\n");
		} else {
			perror("msgsnd:");
		}

	} else {
		perror("msgget:");
	}

	return 0;
}

编译:

gcc write_msg.c -o write_msg
gcc read_msg.c -o read_msg

运行:

./write_msg

msgsnd:: success

消息队列创建成功,并且消息也发送了,我们再次用 ipcs -q 看看存不存在这个队列:

ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages
0x0000007b 32768      orange     666        255          1       

成功输出了我们创建的消息队列,并且大小等于 255 B,里面有一条我们刚才发送的消息,来读取这个消息:

./read_msg

read success: I'm send process.

delete msg success

消息接收成功,并且也删除了消息队列,看看有没有删除成功:

ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages

删除成功!一样,来看看消息队列在内核中的实现。

消息队列在内核中的实现

1. struct msg_queue

消息队列在内核中用下面的数据结构表示

// Linux 3.4: _msg_sm.h
struct msg_queue {
	struct list_head list_elem;
	struct msg_mgr *msg_mgr;
	u32 max_msgs;		/* Node message depth */
	u32 msgq_id;		/* Node environment pointer */
	struct list_head msg_free_list;	/* Free MsgFrames ready to be filled */

	/* Filled MsgFramess waiting to be read */
	struct list_head msg_used_list;
	void *arg;		/* Handle passed to mgr on_exit callback */
	struct sync_object *sync_event;	/* Signalled when message is ready */
	struct sync_object *sync_done;	/* For synchronizing cleanup */
	struct sync_object *sync_done_ack;	/* For synchronizing cleanup */
	struct ntfy_object *ntfy_obj;	/* For notification of message ready */
	bool done;		/* TRUE <==> deleting the object */
	u32 io_msg_pend;	/* Number of pending MSG_get/put calls */
};

可以看到消息队列实际上就是一个链表。

2. 分配获取消息队列

首先上层应用通过调用 msgget 来进行系统调用,然后陷入内核:

int	msgget(key, msgflg)
	key_t key;
	int msgflg;
{
	return INLINE_SYSCALL (ipc, 5, IPCOP_msgget, key, msgflg, 0, NULL);
}

消息队列的内核实现机制和共享内存几乎相同:在内核开辟一片内存空间存放消息队列,不同的进程使用这个消息队列(内存空间)来通信,与共享内存使用相同的一组回调函数 ipc_ops

msgget 的基本的调用过程如下:

msgget

比较重要的是最后一步回调 newque 函数,这个函数在内核中分配了一个消息队列 msg:

newque

3. 发送消息

上层的 msgsnd 系统调用最后会调用到内核的 do_msgsnd 函数,过程如下:

sendmsg

具体的过程这里就不分析了,主要就是在链表中添加一项。

4. 接受消息

上层的 msgrcv 系统调用最后会调用到内核的 do_msgrcv 函数,过程如下:

rcvmsg

同样接收消息会从链表中删除一项,具体的过程可以根据这个路线仔细分析。

结语

本次我们学习了跟共享内存类似的 IPC 方式:消息队列 Msg Queue,它的使用 API 和内核实现原理跟共享内存是差不多,建议你对照两者进行学习,以此来更好的理解。

感谢你的阅读,我们下次再见 :)

本文原创首发于微信公号「登龙」,分享机器学习、算法编程、Python、机器人技术等原创文章,扫码即可关注

DLonng at 08/07/17