文章详情

  • 游戏榜单
  • 软件榜单
关闭导航
热搜榜
热门下载
热门标签
php爱好者> php文档>消息队列

消息队列

时间:2010-11-14  来源:liurhyme

                           消息队列

   消息队列是一个存放在内核中的消息链表,每个消息队列由消息队列标识符标识。
   消息队列存放在内核中,只有重启内核(即操作系统重启)或者显示地删除一个消息队列时,该消息队列才会真正删除。
   消息队列是随着内核的存在而存在的,每个消息队列在系统范围内对应唯一的键值。要获得消息队列的描述符,只需要提供该消息队列的键值即可。

基本概念

1.队列

     队列是信息的线性表,它的访问次序是先进先出(FIFO)。也就是说,置入队列中的

第一个数据项将是从队列中第一次读出的数据项,置入的第二项将是读出的第二项,依

此类推。这是队列允许的唯一存取操作,其它随机访问是不允许的。这种数据结构保证

对数据资源的请求将严格按照先后顺序进行,因而可用于对事件的调度并起到I/O缓冲

的作用。

2.报文

     发送进程和接收进程进行信息的交换,一般是通过将信息划分为若干段放入数据交

换缓冲器中,进程间通过对该缓冲器的存取来实现通信。因此,数据是以不连续的形式

在进程间传送,这些不连续的部分就叫报文。

3.消息队列

     将报文按队列的结构进行组织就叫消息队列。该队列用于存放正被发送或接收的

每一个报文的标题信息。每一个消息队列还对应有一个数据结构,它含有消息队列的

存取权限,和消息队列的当前状态信息等信息。消息队列可进行"发送"和"接收"操作。

消息队列的编程要点及运作过程

1.消息队列的创建

     在报文能够发送和接收之前,必须创建一个能够唯一被识别出的消息队列和数据结

构,这个被创建的唯一标识符叫做消息队列描述符(msqid),用来识别或引用相关的消

息队列和数据结构。用msgget(longkey,intmsgflg)系统调用来创建消息队列,其中

key是一个长整型,可由用户设定也可通过ftok()获得。msgflg的值是八进制的消息队

列操作权和控制命令的组合。操作权定义为:

操作允许权八进制整数

用户可读0400

用户可写0200

同组可读0040

同组可写0020

其它可读0004

其它可写0002


     操作权可相加而派生,如用户可"读"、"写"的权限为0400|0200=0600。控制命令

可取IPC_CREAT或IPC_EXCL。如果要创建一个key=888且属主和同组可读写的消息队列,

执行以下系统调用msgget(0x888,0660|IPC_CREAT)。创建后可用ipcs命令看到以下信

息:


IPCstatusfrom/dev/memasofSun

Jan2506:49:521970

TIDKEYMODEOWNERGROUP

MessageQueues:

.q70x00000888--rw-rw    

rootsystem

...


     它的消息队列描述符是7,属主是root,同组是system,存取权是属主、用户可读写

。如果执行msgget(0x888,0660|IPC_CREAT)时,与0x888对应的消息队列已存在,则返

回该消息队列的描述符msqid。
     2.消息的发送

     消息队列一经创建即可用msgsnd(intmsqid,void*msgp,size_tmsgsz,intmsgflg)

发送消息。msgqid是经msgget创建的消息队列描述符,msgp是指向消息段的指针,该

指针所指结构含有报文类型和要发送或接收的报文:

structmsgbuf{

longmtype;/*消息类型*/

charmtext[512];

/*消息正文,512暂定为消息段的大小*/

}


     msgsz是msgp参量指向的数据结构中字符数组的长度,即报文长度,最大值由MSGMAX

确定。msgflg是当消息队列满时(队列中无空闲空间),系统要采取的行动.如果

msgflg&IPC_NOWAIT=真,调用进程立即返回,不发送该消息。如果

msgflg&IPC_NOWAIT=假,调用进程暂停执行,处于"挂起"状态,且不发送该消息。直到

下列情况之一出现:
     引起暂停的条件不再存在,如队列出现空闲,即可发送
     该消系队列被从系统中删去
     调用进程接收到一个要捕捉的信号,如中断信号,此时不发送消息,调用进程按

signal中描述的方式执行。

     如果msgsnd返回0则发送成功。返回-1则表示发送失败,错误类型可具体查看

errno。

     3.消息的接收

     用msgrcv(intmsqid,void*msgp,size_tmsgsz,longmsgtyp,intmsgflg)系统调用从

msqid消息队列中读取一条信息并将其放入消息段指针msgp指向的结构。msgsz给出

mtext的字节数,如果所接收的消息比msgsz大且msgflg&MSG_NOERROR为真,则按msgsz

的大小截断而不通知调用进程。msgtyp指定要求的消息类型:

     msgtyp=0接收消息队列中的第一个报文
     msgtyp>0接收消息队列中的类型为msgtyp的第一个报文
     msgtyp<0接收消息队列中小于等于msgtyp绝对值的最低类型的第一个报文

     当队列上没有所期望类型的消息或消息队列为空时msgflg指出调用进程要采取的

行动:如果msgflg&IPC_NOWAIT为真,则调用进程立即结束并返回-1。如

msgflg&IPC_NOWAIT为假,则调用进程暂停执行直至出现:

     队列中放入所需类型的消息,调用进程接收该消息
     msqid消息队列从系统中删除
     调用进程接收到捕获的信号,此时不接收消息,调用进程按signal描述的方式执行

     如果msgrev执行成功,则返回放入mtext中的字节数,失败返回-1,错误类型可查

errno。

     4.消息队列的控制和撤销

     用msgctl(intmsqid,intcmd,structmsqid_ds*buf)系统调用实现对消息队列的控

制。msgqid必须是用msgget创建的消息队列描述符。cmd可以是:

     IPC_STAT查看消息队列的状态,结果放入buf指针指向的结构
     IPC_SET为消息队列设置属主标识,同组标识,操作允许权,最大字节数
     IPC_RMID删除指定的msqid以及相关的消息队列和结构

四、编程示例

     下面给出一个运用消息队列,实现进程通信的实例。以下程序在IBMRS/6000小型机

(AIX操作系统)上和IBMPC(UNIX操作系统)上分别调试通过。该程序主要模拟根据

帐号查询余额的过程。包括三方面:

请求进程从标准输入读入帐号,并将该帐号通过消息队列发送给服务进程;

服务进程接收该帐号后,按照请求的先后顺序在标准输入上输入该帐户的姓名和余额,

并将结果返回给客户进程;

请求进程接收返回的信息,并将结果输出在标准输出上。
     服务进程(msgcenter)先于请求进程(msgreq)启动.客户进程启动时要携带请求编

号,可同时起动多个请求进程。

程序在HP-unix有点问题!! C,S都生成ID一样的消息队列
而且要加两个头文件:#include <stdlib.h> #include <string.h> 
/*请求方程序msgreq.c*/

#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
static struct msgbuf1
{
     long mtype;
     char mtext[100];
} sndbuf, rcvbuf, *msgp ;

extern int errno;

int   main(int argc, char **argv) 

     int rtrn, msqid ;
     char name[10];
     double balance;
    
     if (argc!=2)
     { 
      fprintf(stderr,"msgreq [01-99]\n"); exit(-1); 
     }
     if ((msqid = msgget(0x888, IPC_CREAT|0660)) == -1 )
     {
        fprintf(stderr, "msgget 888 failed !\n"); 
        //exit(-1);
     }
     msgp=&sndbuf;

     sprintf(sndbuf.mtext,"%2.2s",argv[1]);
     printf("输入4位帐号:"); 
     scanf("%s",&sndbuf.mtext[2]);

     sndbuf.mtext[6]=0;
     msgp->mtype=666;
     rtrn=msgsnd(msqid,msgp, strlen(sndbuf.mtext), 0);
     if (rtrn==-1)
     {
         perror("msgsnd"); exit(-1);
      }
     msgp=&rcvbuf;
    
     fprintf(stderr,"等待后台数据处理进程的回答....");

     rtrn=msgrcv(msqid,msgp, 100, atoi(argv[1]), 0);
     if(rtrn==-1)
     { 
      perror("msgrcv"); exit(-1);
     }

     sscanf(rcvbuf.mtext,"%[^|]|%lf",name,&balance);
     printf("\n姓名=%s\n",name); 
     printf("余额=%lf\n",balance);
}

/*服务方程序msgcenter.c*/
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static struct msgbuf1
{
     long mtype;
     char mtext[100];
} sndbuf, rcvbuf , *msgp;

extern int errno;

int main()

     int rtrn, msqid ;
     char strbuf[100];
    
     if ( (msqid = msgget(0x888, IPC_CREAT|0600)) == -1 )
     {
         fprintf(stderr, "msgget 888 failed !\n"); exit(-1);
     }

     while(1) 
     {
         msgp=&rcvbuf;
         fprintf(stderr,"等待前台进程的请求....");
        
         rtrn=msgrcv(msqid, msgp, 100, 666 ,MSG_NOERROR);   
         if(rtrn==-1)
         { 
             perror("msgrcv");exit(-1); 
         }
         msgp=&sndbuf;
         sprintf(strbuf,"%2.2s\0",rcvbuf.mtext);
         msgp->mtype=atoi(strbuf);
         printf("\n输入帐号=%4.4s的帐户姓名:",&rcvbuf.mtext[2]); 
        
         scanf("%s",sndbuf.mtext);
         strcat(sndbuf.mtext,"|");
         printf("输入该帐户余额:"); 
         scanf("%s",strbuf);
         strcat(sndbuf.mtext,strbuf);
         rtrn=msgsnd(msqid,msgp, strlen(sndbuf.mtext), 0);
         if (rtrn==-1)
         {
             perror("msgsnd"); 
             exit(-1);
         }
     }
}

eg.2

编写了012号与013号程序,分别是关于消息队列和共享内存的,在机器上也已经调试过了,现在把代码贴在下面:

/*********************程序相关信息*********************
程序编号:012
程序编写起始日期:2008.11.1
程序编写完成日期:2008.11.1
程序修改日期:                                   修改备注:
程序目的:学习linux消息队列通信
所用主要函数:msgget(),msgsnd(),msgrcv(),msgctl()
程序存疑:
程序完成地点: 宿舍内
*********************程序相关信息*********************/
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
int main()
{
    int pid,msqid;//后者为消息队列识别代号
    struct msgbuf
    {
        long mtype;//消息类型
        char mtext[20];//消息内容
    }send_buf,receive_buf;
    if((msqid=msgget(IPC_PRIVATE,0700))<0)//建立消息队列
    {
        printf("msgget建立消息队列失败。\n");
        exit(1);
    }
    else
        printf("msgget建立消息队列成功,该消息队列识别代号为%d。\n",msqid);
    if((pid=fork())<0)
    {
        printf("fork()函数调用失败!\n");
        exit(2);
    }
    else if(pid>0)//父进程,发送消息到消息队列
    {
        send_buf.mtype=1;
        strcpy(send_buf.mtext,"My test information");
        printf("发送到消息队列的信息内容为:%s\n",send_buf.mtext);
        if(msgsnd(msqid,&send_buf,20,IPC_NOWAIT)<0)//发送send_buf中的信息到msqid对应的消息队列
        {
            printf("msgsnd消息发送失败。\n");
            exit(3);
        }
        else
            printf("msgsnd消息发送成功。\n");
        sleep(2);
        exit(0);
    }
    else//子进程,从消息队列中接收消息]
    {
        sleep(2);//等待父进程发送消息完成
        int infolen;//读到的信息数据长度
        if((infolen=msgrcv(msqid,&receive_buf,20,0,IPC_NOWAIT))<0)//自消息队列接收信息
        {
            printf("msgrcv读取信息错误。\n");
            exit(4);
        }
        else
            printf("msgrcv读取信息成功。\n");
        printf("自消息队列读取到的内容为%s,共读取%d个字节。\n",receive_buf.mtext,infolen);
        if((msgctl(msqid,IPC_RMID,NULL))<0)//删除msqid对应的消息队列
        {
            printf("msgctl函数调用出现错误。\n");
            exit(5);
        }
        else
        {
            printf("识别代号为%d的消息队列已经被成功删除。\n",msqid);
            exit(0);
        }
    }
}
/*********************程序运行结果*********************
[root@localhost temp]# ./msg
msgget建立消息队列成功,该消息队列识别代号为98304。
发送到消息队列的信息内容为:My test information
msgsnd消息发送成功。
msgrcv读取信息成功。
自消息队列读取到的内容为My test information,共读取20个字节。
识别代号为98304的消息队列已经被成功删除。
***********************************************************/

/*********************程序相关信息*********************
程序编号:013
程序编写起始日期:2008.11.1
程序编写完成日期:2008.11.1
程序修改日期:                                   修改备注:
程序目的:学习linux共享内存
所用主要函数:shmget(),shmat(),shmctl(),shmdt()
程序存疑:
程序完成地点: 宿舍内
*********************程序相关信息*********************/
#include<sys/ipc.h>
#include<sys/shm.h>
#include<stdlib.h>
#include<string.h>
#include<stdio.h>
int main()
{
    int pid,shmid;//后者为共享内存识别代号
    char *write_address;
    char *read_address;
    struct shmid_ds dsbuf;
    if((shmid=shmget(IPC_PRIVATE,32,0))<0)//分配共享内存
    {
        printf("shmid共享内存分配出现错误。\n");
        exit(1);
    }
    else
        printf("shmid共享内存分配成功,共享内存识别代号为:%d。\n",shmid);
    if((pid=fork())<0)
    {
        printf("fork函数调用出现错误!\n");
        exit(2);
    }
    else if(pid>0)//父进程,向共享内存中写入数据
    {
        printf("父进程的ID是:%d\n",getpid());
        write_address=(char *)shmat(shmid,NULL,0);//连接共享内存
        if((int)write_address==-1)
        {
            printf("shmat连接共享内存错误。\n");
            exit(3);
        }
        else
        {
            printf("shmat连接共享内存成功。\n");
            strcpy(write_address,"我是写入共享内存的测试数据");//将数据写入共享内存
            printf("写入共享内存的信息为“%s”。\n",write_address);
            if((shmdt((void *)write_address))<0)//断开与共享内存的连接
                printf("shmdt共享内存断开错误。\n");
            else
                printf("shmdt共享内存断开成功。\n");
            sleep(2);
            return;
        }
    }
    else//子进程,从共享内存中读取数据
    {
        sleep(2);//等待父进程写入共享内存完毕
        printf("子进程ID是:%d\n",getpid());
        if((shmctl(shmid,IPC_STAT,&dsbuf))<0)
        {
            printf("shmctl获取共享内存数据结构出现错误。\n");
            exit(4);
        }
        else
        {
            printf("shmctl获取共享内存数据结构成功。\n建立这个共享内存的进程ID是:%d\n",dsbuf.shm_cpid);
            printf("该共享内存的大小为:%d\n",dsbuf.shm_segsz);
            if((read_address=(char *)shmat(shmid,0,0))<0)//连接共享内存
            {
                printf("shmat连接共享内存出现错误。\n");
                exit(5);
            }
            else
            {
                printf("自共享内存中读取的信息为:“%s”。\n",read_address);
                printf("最后一个操作该共享内存的进程ID是:%d\n",dsbuf.shm_lpid);
                if((shmdt((void *)read_address))<0)//断开与共享内存的连接
                {
                    printf("shmdt共享内存断开错误。\n");
                    exit(6);
                }
                else
                    printf("shmdt共享内存断开成功。\n");
                if(shmctl(shmid,IPC_RMID,NULL)<0)//删除共享内存及其数据结构
                {
                    printf("shmctl删除共享内存及其数据结构出现错误。\n");
                    exit(7);
                }
                else
                    printf("shmctl删除共享内存及其数据结构成功。\n");
                exit(0);
            }
        }    
    }
}
/*********************程序运行结果*********************
[root@localhost temp]# ./shm
shmid共享内存分配成功,共享内存识别代号为:1703947。
父进程的ID是:7647
shmat连接共享内存成功。
写入共享内存的信息为“我是写入共享内存的测试数据”。
shmdt共享内存断开成功。
子进程ID是:7648
shmctl获取共享内存数据结构成功。
建立这个共享内存的进程ID是:7647
该共享内存的大小为:32
自共享内存中读取的信息为:“我是写入共享内存的测试数据”。
最后一个操作该共享内存的进程ID是:7647
shmdt共享内存断开成功。
shmctl删除共享内存及其数据结构成功。
***********************************************************/






相关阅读 更多 +
排行榜 更多 +
辰域智控app

辰域智控app

系统工具 下载
网医联盟app

网医联盟app

运动健身 下载
汇丰汇选App

汇丰汇选App

金融理财 下载