IPC学习(4)--posix semaphore
时间:2006-04-04 来源:rwen2012
sem_open() sem_init()
\ /
\ /
sem_wait()
sem_trywait()
sem_post()
sem_getvalue()
/ \
/ \
sem_close() sem_destroy()
sem_unlink()
(1) (2)
其中1为有名的信号量(路径名), 2为共享内存的信号量,有名的信号量通常用于相互间不相关的进程间通讯。
1.信号量可以像互斥变量那样可以用于实现进程间的互斥,比较如下:
initialize mutex;
pthread_mutex_lock(mutex);
critical region
pthread_mutex_unlock(mutex);
互斥变量方法
------------
initialize semaphore sem to 1;
sem_wait(&sem);
critical region
sem_post(&sem);
信号量方法,这里用到的是二进制的信号量
=============
2.对于生产者,消费者问题,也可以有类似于互斥变量的方法使进程之间实现同步:
initialize semaphore get to 0;
initialize semaphore put to 1;
for ( ; ; ) {
sem_wait(&put);
put into buffer
sem_post(&get);
}
producer
-----------------------------
for ( ; ; ) {
sem_wait(&get);
put into buffer
sem_post(&put);
}
consumer
================================
3.比较一下互斥变量和信号量方法的异同:
1). 互斥变量要自己才能解开自己加的锁,而信号量则可以相互角锁,如上。
2). 互斥变量只有锁或不锁两种状态,这相当于二进制信号量,而信号量可以是一个资源的计数器。
3). 互斥变量中用条件变量信号来通知(pthread_cond_signal)另一个在等待(pthread_cond_wait)的线程某个资源是否可用,而信号是可能丢失的,这就存在了不可靠的因素。信号量则不会,一个sem_post的信号量可以在以后的任何时候被取到而不会有丢失的情况。
4.信号量还可以实现类似于记录锁的功能,使读写同步。
#define LOCK_PATH "pxsemlock"
sem_t *locksem;
int initflag;
void mylock(init fd)
{
if (initflag == 0) {
locksem = sem_open(px_ipc_name(LOCK_PATH), O_CREAT, FILE_MODE, 1);
initflag = 1;
}
sem_wait(locksem);
}
void my_unlock(int fd)
{
sem_post(locksem);
}
即是,某个进程只有获取该信号量时才能对文件进行读写。
5.下面是一个生产者,消费者的例子,用信号量实现他们之间的同步。(多生产者,单消费者情况)
//fron UNPv2
#define NBUFF 10
#define MAXNTHREADS 100
int nitems, nproducers; /* read-only by producer and consumer */
struct { /* data shared by producers and consumer */
int buff[NBUFF];
int nput;
int nputval;
sem_t mutex, nempty, nstored; /* semaphores, not pointers */
} shared;
void *produce(void *), *consume(void *);
int
main(int argc, char **argv)
{
int i, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3)
err_quit("usage: prodcons3 <#items> <#producers>");
nitems = atoi(argv[1]);
nproducers = min(atoi(argv[2]), MAXNTHREADS);
/* 4initialize three semaphores */
Sem_init(&shared.mutex, 0, 1);
Sem_init(&shared.nempty, 0, NBUFF);
Sem_init(&shared.nstored, 0, 0);
/* 4create all producers and one consumer */
Set_concurrency(nproducers + 1);
for (i = 0; i < nproducers; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
Pthread_create(&tid_consume, NULL, consume, NULL);
/* 4wait for all producers and the consumer */
for (i = 0; i < nproducers; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
Pthread_join(tid_consume, NULL);
Sem_destroy(&shared.mutex);
Sem_destroy(&shared.nempty);
Sem_destroy(&shared.nstored);
exit(0);
}
/* end main */
/* include produce */
void *
produce(void *arg)
{
for ( ; ; ) {
Sem_wait(&shared.nempty); /* wait for at least 1 empty slot */
Sem_wait(&shared.mutex);
if (shared.nput >= nitems) {
Sem_post(&shared.nempty);
Sem_post(&shared.mutex);
return(NULL); /* all done */
}
shared.buff[shared.nput % NBUFF] = shared.nputval;
shared.nput++;
shared.nputval++;
Sem_post(&shared.mutex);
Sem_post(&shared.nstored); /* 1 more stored item */
*((int *) arg) += 1;
}
}
/* end produce */
/* include consume */
void *
consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
Sem_wait(&shared.nstored); /* wait for at least 1 stored item */
Sem_wait(&shared.mutex);
if (shared.buff[i % NBUFF] != i)
printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
Sem_post(&shared.mutex);
Sem_post(&shared.nempty); /* 1 more empty slot */
}
return(NULL);
}
/* end consume */
//=========================================
\ /
\ /
sem_wait()
sem_trywait()
sem_post()
sem_getvalue()
/ \
/ \
sem_close() sem_destroy()
sem_unlink()
(1) (2)
其中1为有名的信号量(路径名), 2为共享内存的信号量,有名的信号量通常用于相互间不相关的进程间通讯。
1.信号量可以像互斥变量那样可以用于实现进程间的互斥,比较如下:
initialize mutex;
pthread_mutex_lock(mutex);
critical region
pthread_mutex_unlock(mutex);
互斥变量方法
------------
initialize semaphore sem to 1;
sem_wait(&sem);
critical region
sem_post(&sem);
信号量方法,这里用到的是二进制的信号量
=============
2.对于生产者,消费者问题,也可以有类似于互斥变量的方法使进程之间实现同步:
initialize semaphore get to 0;
initialize semaphore put to 1;
for ( ; ; ) {
sem_wait(&put);
put into buffer
sem_post(&get);
}
producer
-----------------------------
for ( ; ; ) {
sem_wait(&get);
put into buffer
sem_post(&put);
}
consumer
================================
3.比较一下互斥变量和信号量方法的异同:
1). 互斥变量要自己才能解开自己加的锁,而信号量则可以相互角锁,如上。
2). 互斥变量只有锁或不锁两种状态,这相当于二进制信号量,而信号量可以是一个资源的计数器。
3). 互斥变量中用条件变量信号来通知(pthread_cond_signal)另一个在等待(pthread_cond_wait)的线程某个资源是否可用,而信号是可能丢失的,这就存在了不可靠的因素。信号量则不会,一个sem_post的信号量可以在以后的任何时候被取到而不会有丢失的情况。
4.信号量还可以实现类似于记录锁的功能,使读写同步。
#define LOCK_PATH "pxsemlock"
sem_t *locksem;
int initflag;
void mylock(init fd)
{
if (initflag == 0) {
locksem = sem_open(px_ipc_name(LOCK_PATH), O_CREAT, FILE_MODE, 1);
initflag = 1;
}
sem_wait(locksem);
}
void my_unlock(int fd)
{
sem_post(locksem);
}
即是,某个进程只有获取该信号量时才能对文件进行读写。
5.下面是一个生产者,消费者的例子,用信号量实现他们之间的同步。(多生产者,单消费者情况)
//fron UNPv2
#define NBUFF 10
#define MAXNTHREADS 100
int nitems, nproducers; /* read-only by producer and consumer */
struct { /* data shared by producers and consumer */
int buff[NBUFF];
int nput;
int nputval;
sem_t mutex, nempty, nstored; /* semaphores, not pointers */
} shared;
void *produce(void *), *consume(void *);
int
main(int argc, char **argv)
{
int i, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3)
err_quit("usage: prodcons3 <#items> <#producers>");
nitems = atoi(argv[1]);
nproducers = min(atoi(argv[2]), MAXNTHREADS);
/* 4initialize three semaphores */
Sem_init(&shared.mutex, 0, 1);
Sem_init(&shared.nempty, 0, NBUFF);
Sem_init(&shared.nstored, 0, 0);
/* 4create all producers and one consumer */
Set_concurrency(nproducers + 1);
for (i = 0; i < nproducers; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
Pthread_create(&tid_consume, NULL, consume, NULL);
/* 4wait for all producers and the consumer */
for (i = 0; i < nproducers; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
Pthread_join(tid_consume, NULL);
Sem_destroy(&shared.mutex);
Sem_destroy(&shared.nempty);
Sem_destroy(&shared.nstored);
exit(0);
}
/* end main */
/* include produce */
void *
produce(void *arg)
{
for ( ; ; ) {
Sem_wait(&shared.nempty); /* wait for at least 1 empty slot */
Sem_wait(&shared.mutex);
if (shared.nput >= nitems) {
Sem_post(&shared.nempty);
Sem_post(&shared.mutex);
return(NULL); /* all done */
}
shared.buff[shared.nput % NBUFF] = shared.nputval;
shared.nput++;
shared.nputval++;
Sem_post(&shared.mutex);
Sem_post(&shared.nstored); /* 1 more stored item */
*((int *) arg) += 1;
}
}
/* end produce */
/* include consume */
void *
consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
Sem_wait(&shared.nstored); /* wait for at least 1 stored item */
Sem_wait(&shared.mutex);
if (shared.buff[i % NBUFF] != i)
printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
Sem_post(&shared.mutex);
Sem_post(&shared.nempty); /* 1 more empty slot */
}
return(NULL);
}
/* end consume */
//=========================================
相关阅读 更多 +