消息队列
时间:2010-11-14 来源:liurhyme
消息队列是一个存放在内核中的消息链表,每个消息队列由消息队列标识符标识。
消息队列存放在内核中,只有重启内核(即操作系统重启)或者显示地删除一个消息队列时,该消息队列才会真正删除。
消息队列是随着内核的存在而存在的,每个消息队列在系统范围内对应唯一的键值。要获得消息队列的描述符,只需要提供该消息队列的键值即可。
基本概念
1.队列
队列是信息的线性表,它的访问次序是先进先出(FIFO)。也就是说,置入队列中的
第一个数据项将是从队列中第一次读出的数据项,置入的第二项将是读出的第二项,依
此类推。这是队列允许的唯一存取操作,其它随机访问是不允许的。这种数据结构保证
对数据资源的请求将严格按照先后顺序进行,因而可用于对事件的调度并起到I/O缓冲
的作用。
2.报文
发送进程和接收进程进行信息的交换,一般是通过将信息划分为若干段放入数据交
换缓冲器中,进程间通过对该缓冲器的存取来实现通信。因此,数据是以不连续的形式
在进程间传送,这些不连续的部分就叫报文。
3.消息队列
将报文按队列的结构进行组织就叫消息队列。该队列用于存放正被发送或接收的
每一个报文的标题信息。每一个消息队列还对应有一个数据结构,它含有消息队列的
存取权限,和消息队列的当前状态信息等信息。消息队列可进行"发送"和"接收"操作。
消息队列的编程要点及运作过程
1.消息队列的创建
在报文能够发送和接收之前,必须创建一个能够唯一被识别出的消息队列和数据结
构,这个被创建的唯一标识符叫做消息队列描述符(msqid),用来识别或引用相关的消
息队列和数据结构。用msgget(longkey,intmsgflg)系统调用来创建消息队列,其中
key是一个长整型,可由用户设定也可通过ftok()获得。msgflg的值是八进制的消息队
列操作权和控制命令的组合。操作权定义为:
操作允许权八进制整数
用户可读0400
用户可写0200
同组可读0040
同组可写0020
其它可读0004
其它可写0002
操作权可相加而派生,如用户可"读"、"写"的权限为0400|0200=0600。控制命令
可取IPC_CREAT或IPC_EXCL。如果要创建一个key=888且属主和同组可读写的消息队列,
执行以下系统调用msgget(0x888,0660|IPC_CREAT)。创建后可用ipcs命令看到以下信
息:
IPCstatusfrom/dev/memasofSun
Jan2506:49:521970
TIDKEYMODEOWNERGROUP
MessageQueues:
.q70x00000888--rw-rw
rootsystem
...
它的消息队列描述符是7,属主是root,同组是system,存取权是属主、用户可读写
。如果执行msgget(0x888,0660|IPC_CREAT)时,与0x888对应的消息队列已存在,则返
回该消息队列的描述符msqid。
2.消息的发送
消息队列一经创建即可用msgsnd(intmsqid,void*msgp,size_tmsgsz,intmsgflg)
发送消息。msgqid是经msgget创建的消息队列描述符,msgp是指向消息段的指针,该
指针所指结构含有报文类型和要发送或接收的报文:
structmsgbuf{
longmtype;/*消息类型*/
charmtext[512];
/*消息正文,512暂定为消息段的大小*/
}
msgsz是msgp参量指向的数据结构中字符数组的长度,即报文长度,最大值由MSGMAX
确定。msgflg是当消息队列满时(队列中无空闲空间),系统要采取的行动.如果
msgflg&IPC_NOWAIT=真,调用进程立即返回,不发送该消息。如果
msgflg&IPC_NOWAIT=假,调用进程暂停执行,处于"挂起"状态,且不发送该消息。直到
下列情况之一出现:
引起暂停的条件不再存在,如队列出现空闲,即可发送
该消系队列被从系统中删去
调用进程接收到一个要捕捉的信号,如中断信号,此时不发送消息,调用进程按
signal中描述的方式执行。
如果msgsnd返回0则发送成功。返回-1则表示发送失败,错误类型可具体查看
errno。
3.消息的接收
用msgrcv(intmsqid,void*msgp,size_tmsgsz,longmsgtyp,intmsgflg)系统调用从
msqid消息队列中读取一条信息并将其放入消息段指针msgp指向的结构。msgsz给出
mtext的字节数,如果所接收的消息比msgsz大且msgflg&MSG_NOERROR为真,则按msgsz
的大小截断而不通知调用进程。msgtyp指定要求的消息类型:
msgtyp=0接收消息队列中的第一个报文
msgtyp>0接收消息队列中的类型为msgtyp的第一个报文
msgtyp<0接收消息队列中小于等于msgtyp绝对值的最低类型的第一个报文
当队列上没有所期望类型的消息或消息队列为空时msgflg指出调用进程要采取的
行动:如果msgflg&IPC_NOWAIT为真,则调用进程立即结束并返回-1。如
msgflg&IPC_NOWAIT为假,则调用进程暂停执行直至出现:
队列中放入所需类型的消息,调用进程接收该消息
msqid消息队列从系统中删除
调用进程接收到捕获的信号,此时不接收消息,调用进程按signal描述的方式执行
。
如果msgrev执行成功,则返回放入mtext中的字节数,失败返回-1,错误类型可查
errno。
4.消息队列的控制和撤销
用msgctl(intmsqid,intcmd,structmsqid_ds*buf)系统调用实现对消息队列的控
制。msgqid必须是用msgget创建的消息队列描述符。cmd可以是:
IPC_STAT查看消息队列的状态,结果放入buf指针指向的结构
IPC_SET为消息队列设置属主标识,同组标识,操作允许权,最大字节数
IPC_RMID删除指定的msqid以及相关的消息队列和结构
四、编程示例
下面给出一个运用消息队列,实现进程通信的实例。以下程序在IBMRS/6000小型机
(AIX操作系统)上和IBMPC(UNIX操作系统)上分别调试通过。该程序主要模拟根据
帐号查询余额的过程。包括三方面:
请求进程从标准输入读入帐号,并将该帐号通过消息队列发送给服务进程;
服务进程接收该帐号后,按照请求的先后顺序在标准输入上输入该帐户的姓名和余额,
并将结果返回给客户进程;
请求进程接收返回的信息,并将结果输出在标准输出上。
服务进程(msgcenter)先于请求进程(msgreq)启动.客户进程启动时要携带请求编
号,可同时起动多个请求进程。
程序在HP-unix有点问题!! C,S都生成ID一样的消息队列
而且要加两个头文件:#include <stdlib.h> #include <string.h>
/*请求方程序msgreq.c*/
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
static struct msgbuf1
{
long mtype;
char mtext[100];
} sndbuf, rcvbuf, *msgp ;
extern int errno;
int main(int argc, char **argv)
{
int rtrn, msqid ;
char name[10];
double balance;
if (argc!=2)
{
fprintf(stderr,"msgreq [01-99]\n"); exit(-1);
}
if ((msqid = msgget(0x888, IPC_CREAT|0660)) == -1 )
{
fprintf(stderr, "msgget 888 failed !\n");
//exit(-1);
}
msgp=&sndbuf;
sprintf(sndbuf.mtext,"%2.2s",argv[1]);
printf("输入4位帐号:");
scanf("%s",&sndbuf.mtext[2]);
sndbuf.mtext[6]=0;
msgp->mtype=666;
rtrn=msgsnd(msqid,msgp, strlen(sndbuf.mtext), 0);
if (rtrn==-1)
{
perror("msgsnd"); exit(-1);
}
msgp=&rcvbuf;
fprintf(stderr,"等待后台数据处理进程的回答....");
rtrn=msgrcv(msqid,msgp, 100, atoi(argv[1]), 0);
if(rtrn==-1)
{
perror("msgrcv"); exit(-1);
}
sscanf(rcvbuf.mtext,"%[^|]|%lf",name,&balance);
printf("\n姓名=%s\n",name);
printf("余额=%lf\n",balance);
}
/*服务方程序msgcenter.c*/
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static struct msgbuf1
{
long mtype;
char mtext[100];
} sndbuf, rcvbuf , *msgp;
extern int errno;
int main()
{
int rtrn, msqid ;
char strbuf[100];
if ( (msqid = msgget(0x888, IPC_CREAT|0600)) == -1 )
{
fprintf(stderr, "msgget 888 failed !\n"); exit(-1);
}
while(1)
{
msgp=&rcvbuf;
fprintf(stderr,"等待前台进程的请求....");
rtrn=msgrcv(msqid, msgp, 100, 666 ,MSG_NOERROR);
if(rtrn==-1)
{
perror("msgrcv");exit(-1);
}
msgp=&sndbuf;
sprintf(strbuf,"%2.2s\0",rcvbuf.mtext);
msgp->mtype=atoi(strbuf);
printf("\n输入帐号=%4.4s的帐户姓名:",&rcvbuf.mtext[2]);
scanf("%s",sndbuf.mtext);
strcat(sndbuf.mtext,"|");
printf("输入该帐户余额:");
scanf("%s",strbuf);
strcat(sndbuf.mtext,strbuf);
rtrn=msgsnd(msqid,msgp, strlen(sndbuf.mtext), 0);
if (rtrn==-1)
{
perror("msgsnd");
exit(-1);
}
}
}
eg.2
编写了012号与013号程序,分别是关于消息队列和共享内存的,在机器上也已经调试过了,现在把代码贴在下面:
/*********************程序相关信息*********************
程序编号:012
程序编写起始日期:2008.11.1
程序编写完成日期:2008.11.1
程序修改日期: 修改备注:
程序目的:学习linux消息队列通信
所用主要函数:msgget(),msgsnd(),msgrcv(),msgctl()
程序存疑:
程序完成地点: 宿舍内
*********************程序相关信息*********************/
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
int main()
{
int pid,msqid;//后者为消息队列识别代号
struct msgbuf
{
long mtype;//消息类型
char mtext[20];//消息内容
}send_buf,receive_buf;
if((msqid=msgget(IPC_PRIVATE,0700))<0)//建立消息队列
{
printf("msgget建立消息队列失败。\n");
exit(1);
}
else
printf("msgget建立消息队列成功,该消息队列识别代号为%d。\n",msqid);
if((pid=fork())<0)
{
printf("fork()函数调用失败!\n");
exit(2);
}
else if(pid>0)//父进程,发送消息到消息队列
{
send_buf.mtype=1;
strcpy(send_buf.mtext,"My test information");
printf("发送到消息队列的信息内容为:%s\n",send_buf.mtext);
if(msgsnd(msqid,&send_buf,20,IPC_NOWAIT)<0)//发送send_buf中的信息到msqid对应的消息队列
{
printf("msgsnd消息发送失败。\n");
exit(3);
}
else
printf("msgsnd消息发送成功。\n");
sleep(2);
exit(0);
}
else//子进程,从消息队列中接收消息]
{
sleep(2);//等待父进程发送消息完成
int infolen;//读到的信息数据长度
if((infolen=msgrcv(msqid,&receive_buf,20,0,IPC_NOWAIT))<0)//自消息队列接收信息
{
printf("msgrcv读取信息错误。\n");
exit(4);
}
else
printf("msgrcv读取信息成功。\n");
printf("自消息队列读取到的内容为%s,共读取%d个字节。\n",receive_buf.mtext,infolen);
if((msgctl(msqid,IPC_RMID,NULL))<0)//删除msqid对应的消息队列
{
printf("msgctl函数调用出现错误。\n");
exit(5);
}
else
{
printf("识别代号为%d的消息队列已经被成功删除。\n",msqid);
exit(0);
}
}
}
/*********************程序运行结果*********************
[root@localhost temp]# ./msg
msgget建立消息队列成功,该消息队列识别代号为98304。
发送到消息队列的信息内容为:My test information
msgsnd消息发送成功。
msgrcv读取信息成功。
自消息队列读取到的内容为My test information,共读取20个字节。
识别代号为98304的消息队列已经被成功删除。
***********************************************************/
/*********************程序相关信息*********************
程序编号:013
程序编写起始日期:2008.11.1
程序编写完成日期:2008.11.1
程序修改日期: 修改备注:
程序目的:学习linux共享内存
所用主要函数:shmget(),shmat(),shmctl(),shmdt()
程序存疑:
程序完成地点: 宿舍内
*********************程序相关信息*********************/
#include<sys/ipc.h>
#include<sys/shm.h>
#include<stdlib.h>
#include<string.h>
#include<stdio.h>
int main()
{
int pid,shmid;//后者为共享内存识别代号
char *write_address;
char *read_address;
struct shmid_ds dsbuf;
if((shmid=shmget(IPC_PRIVATE,32,0))<0)//分配共享内存
{
printf("shmid共享内存分配出现错误。\n");
exit(1);
}
else
printf("shmid共享内存分配成功,共享内存识别代号为:%d。\n",shmid);
if((pid=fork())<0)
{
printf("fork函数调用出现错误!\n");
exit(2);
}
else if(pid>0)//父进程,向共享内存中写入数据
{
printf("父进程的ID是:%d\n",getpid());
write_address=(char *)shmat(shmid,NULL,0);//连接共享内存
if((int)write_address==-1)
{
printf("shmat连接共享内存错误。\n");
exit(3);
}
else
{
printf("shmat连接共享内存成功。\n");
strcpy(write_address,"我是写入共享内存的测试数据");//将数据写入共享内存
printf("写入共享内存的信息为“%s”。\n",write_address);
if((shmdt((void *)write_address))<0)//断开与共享内存的连接
printf("shmdt共享内存断开错误。\n");
else
printf("shmdt共享内存断开成功。\n");
sleep(2);
return;
}
}
else//子进程,从共享内存中读取数据
{
sleep(2);//等待父进程写入共享内存完毕
printf("子进程ID是:%d\n",getpid());
if((shmctl(shmid,IPC_STAT,&dsbuf))<0)
{
printf("shmctl获取共享内存数据结构出现错误。\n");
exit(4);
}
else
{
printf("shmctl获取共享内存数据结构成功。\n建立这个共享内存的进程ID是:%d\n",dsbuf.shm_cpid);
printf("该共享内存的大小为:%d\n",dsbuf.shm_segsz);
if((read_address=(char *)shmat(shmid,0,0))<0)//连接共享内存
{
printf("shmat连接共享内存出现错误。\n");
exit(5);
}
else
{
printf("自共享内存中读取的信息为:“%s”。\n",read_address);
printf("最后一个操作该共享内存的进程ID是:%d\n",dsbuf.shm_lpid);
if((shmdt((void *)read_address))<0)//断开与共享内存的连接
{
printf("shmdt共享内存断开错误。\n");
exit(6);
}
else
printf("shmdt共享内存断开成功。\n");
if(shmctl(shmid,IPC_RMID,NULL)<0)//删除共享内存及其数据结构
{
printf("shmctl删除共享内存及其数据结构出现错误。\n");
exit(7);
}
else
printf("shmctl删除共享内存及其数据结构成功。\n");
exit(0);
}
}
}
}
/*********************程序运行结果*********************
[root@localhost temp]# ./shm
shmid共享内存分配成功,共享内存识别代号为:1703947。
父进程的ID是:7647
shmat连接共享内存成功。
写入共享内存的信息为“我是写入共享内存的测试数据”。
shmdt共享内存断开成功。
子进程ID是:7648
shmctl获取共享内存数据结构成功。
建立这个共享内存的进程ID是:7647
该共享内存的大小为:32
自共享内存中读取的信息为:“我是写入共享内存的测试数据”。
最后一个操作该共享内存的进程ID是:7647
shmdt共享内存断开成功。
shmctl删除共享内存及其数据结构成功。
***********************************************************/