笔者前段时间做一个消息队列优化程序时涉及到多线程同步问题,顺便看了下大学操作系统课程。将生产者-消费者问题以及哲学家就餐问题实现了下,做以下笔记。
哲学家就餐问题
设有5个哲学家,共享一张放有5把椅子的桌子,每人一把椅子,但是桌子上只有5只筷子,在每人两边分开各放一支;哲学家在就餐时必须试图分两次从两边拾起筷子就餐。
条件:
1)只有拿到2只筷子,哲学家才能吃饭。
2)如果筷子在他人手上,哲学家只能等到他人吃完后才能拿到筷子。
3)任一哲学家在自己未拿到2只筷子吃饭之前,决不放弃自己的筷子。
死锁问题的产生:
每个哲学家都握有自己左边(或都握有右边)的筷子。如果用5信号量S1~S5代表5个筷子资源,用Pa~Pe五个进程代表5为哲学家的话,可以用下述算法描述该死锁情况:
Pa                  Pb                 Pc                Pd                Pe
{P(S1);         {P(S2);          {P(S3);        {P(S4);        {P(S5);
P(S5);             P(S1);            P(S2);          P(S3);          P(S4);
eat;                eat;               eat;              eat;             eat;
V(S1);            V(S2);           V(S3);         V(S3);          V(S4);
V(S5;}           V(S1);}         V(S2);}        V(S4);}       V(S5);
 当且仅当5进程第一轮分别执行完黑线上的操作后(都握有左边筷子),随后才会发生死锁。
降低死锁或者避免死锁情况的出现:
1.哲学家在拿不到第二只筷子时等待随机时间,而不是等待相同时间,死锁的概率就大大降低。
2.拿筷子前,对互斥信号量mutex执行down操作,放回筷子后对mutex执行up操作。理论上可行,但任何时刻都只有一个哲学家在就餐。实际上可以两个同时就餐。
3.使用一个信号量数组(对应每一位哲学家),当一个哲学家左右两个邻居都没有进餐时才可以进入进餐状态,如果所需要的筷子被占用,想进餐的哲学家就会被阻塞。解法多人任意位哲学家都能获得最大并行度。
 
下面是方法的相关引用,宏定义和全局变量定义:
1 #include <unistd.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <pthread.h>
5 #include <semaphore.h>
6
7 #define PHILOSOPHER_NUM 5
8 #define THINKING 0
9 #define HUNGRY 1
10 #define EATING 2
11
12 pthread_mutex_t mutex;
13 sem_t semph[PHILOSOPHER_NUM];
14 #define LEFT (i + PHILOSOPHER_NUM -1)%PHILOSOPHER_NUM //i 的左领居编号
15 #define RIGHT (i + 1)%PHILOSOPHER_NUM //i 的右领居编号
16 int state[PHILOSOPHER_NUM];
      程序是在Unix环境中编写运行的,<unistd.h>Unix系统标准头文件必不可少,<pthread.h>是线程必须库文件,<semaphore.h>则是信号量操作的头文件。注意还有一个是<sys/sem.h>,区别待查。哲学家共有三个state:THINKING,HUNGRY,EATING。LEFT和RIGHT分别表示当前哲学家的左邻居和右邻居。信号量的数据类型为结构sem_t,它本质上是一个长整型的数。初始化信号量函数sem_init() 

extern int sem_init __P ((sem_t *__sem, int __pshared, unsigned int __value));  T>

sem为指向信号量结构的一个指针;pshared不为0时此信号量在进程间共享,否则只能为当前进程的所有线程共享;value给出了信号量的初始值。

      函数sem_post( sem_t *sem )用来增加信号量的值。当有线程阻塞在这个信号量上时,调用这个函数会使其中的一个线程不在阻塞,选择机制同样是由线程的调度策略决定的。它作用是给信号量的值加上一个“1”,它是一个“原子操作”,即同时对同一个信号量做加“1”操作的两个线程是不会冲突的。而同时对同一个文件进行读、加和写操作的两个程序就有可能会引起冲突。

      函数sem_wait( sem_t *sem )被用来阻塞当前线程直到信号量sem的值大于0,解除阻塞后将sem的值减一,表明公共资源经使用后减少。sem_wait函数也是一个原子操作,它的作用是从信号量的值减去一个“1”,但它永远会先等待该信号量为一个非零值才开始做减法。也就是说,如果你对一个值为2的信号量调用sem_wait(),线程将会继续执行,介信号量的值将减到1。如果对一个值为0的信号量调用   

      sem_wait(),这个函数就会地等待直到有其它线程增加了这个值使它不再是0为止。如果有两个线程都在sem_wait()中等待同一个信号量变成非零值,那么当它被第三个线程增加一个“1”时,等待线程中只有一个能够对信号量做减法并继续执行,另一个还将处于等待状态。

      函数sem_trywait ( sem_t *sem )是函数sem_wait()的非阻塞版本,它直接将信号量sem的值减一。

      函数sem_destroy(sem_t *sem)用来释放信号量sem。

      笔者这里用sem_t信号量数组对应每一位哲学家,用互斥锁pthread_mutex_t来维护哲学家状态的改变。pthread_mutex_lock()和pthread_mutex_unlock(),这次互斥锁其实很sem_t信号量初始原子值置1效果一样。
 
以下为就餐问题的具体实现
1 void think(int i)
2 {
3 }
4
5 void eat(int i)
6 {
7 }
8
9 void test(int i)
10 {
11 if((state[i] == HUNGRY )
12 && (state[LEFT] != EATING)
13 && (state[RIGHT]!= EATING))
14 {
15 state[i] = EATING;
16
17 sem_post(&semph[i]);
18 }
19 }
20
21
22
23 void take_forks(int i)
24 {
25 pthread_mutex_lock(&mutex);
26 state[i] = HUNGRY;
27 test(i);
28 pthread_mutex_unlock(&mutex);
29 sem_wait(&semph[i]);
30 }
31
32 void put_forks(int i)
33 {
34 pthread_mutex_lock(&mutex);
35 state[i] = THINKING;
36 test(LEFT);
37 test(RIGHT);
38 pthread_mutex_unlock(&mutex);
39 }
40
41
42 void philosopher(int *index)
43 {
44 int mythreadId = (*index);
45 int sleepTime;
46 bzero(state,0);
47
48 while(1)
49 {
50 think(mythreadId);
51 take_forks(mythreadId);
52 eat(mythreadId);
53 put_forks(mythreadId);
54
55 // sleep a random time : between 1 - 5 s
56 sleepTime = 1 + (int)(5.0*rand()/(RAND_MAX+1.0));
57 usleep(sleepTime*10);
58 }
59 }
这里philosopher()函数是创建的多线程函数,它的参数表示的是第几个线程函数。
函数中while(1)一个就餐操作:
      think——>takeforks——>eat——>putforks。
      think()和eat()操作这里我们暂不关心。主要是在哲学家takefork和putfork时候所做的互斥操作以及如何通过信号量通知其他线程可以继续操作。
      首先任何一个哲学家(ph线程)进行takefork和putfork时候都需要互斥锁mutex控制,保证不会同时有两个哲学家拿起同一个筷子,不然程序进混乱!pthread_mutex_lock(&mutex)锁住,相当于将信号量置0,当别的哲学家(ph线程)希望拿起或者放下fork的时候都会被阻塞。等当前哲学家(ph线程)拿起或者放下fork后,其他哲学家才能继续拿起或者放下fork(当然此处笔者觉得也可以对每个fork对应一个mutex,当前哲学家操作的两个fork置锁,其他的fork对应的mutex可以继续拿起或放下,未具体实现)。当当前哲学家对fork和state改变之后,pthread_mutex_unlock(&mutex)进行开锁。
      然后就是每个哲学家对应的信号量semph,takefork时,首先进行test判断当前哲学家的左右邻居是否是EATING状态,如果都不是表示当前哲学家可以EATING(拿起两个筷子),并将当前哲学家对应的信号量sem_post(),做+1操作。takefork结束时,做sem_wait()-1操作,如果哲学家没有拿起筷子,即没有sem_post()操作,那么当前哲学家的semph被sem_wait为0,被阻塞。如果能够EATING,则当前哲学家信号量仍然为1,post和wait抵消。
      putfork中EATING完之后则test当前哲学家左右邻居是否能够EATING,如果可以则将他们的semph做sem_post()+1操作,之前被挂起的左右邻居就可以EATING继续操作了。
      我们再加上一个main函数
1 int main()
2 {
3 int i,ret;
4 int threadId[PHILOSOPHER_NUM];
5 int errNum=0;
6 pthread_t t_phThread[PHILOSOPHER_NUM];
7 pthread_t t_threadInfo;
8
9 srand(getpid());
10 pthread_mutex_init(&mutex,NULL);
11
12 //ret = pthread_create(&t_threadInfo, NULL, (void *) threadInfo, (void *) NULL);
13 if( ret )
14 {
15 errNum++;
16 }
17
18 for(i=0;i<PHILOSOPHER_NUM;i++)
19 {
20 threadId[i] = i;
21 sem_init(&semph[i],0,1);
22 ret=pthread_create(&t_phThread[i], NULL, (void *)philosopher, (void*)(&threadId[i]));
23 if(ret)
24 {
25 errNum++;
26 }
27
28 usleep(20);
29 }
30 if(errNum)
31 {
32 printf("thread create err! errnum=%d \n",errNum);
33 }
34
35 sleep(1);
36 }
主函数中主要创建philosopher线程,线程的创建函数如下,
#include<pthread.h>
int pthread_create(pthread_t *restrict tidp,const pthread_attr_t *restrict_attr,void*(*start_rtn)  (void*),void *restrict arg);
返回值:若成功则返回0,否则返回出错编号。返回成功时,由tidp指向的内存单元被设置为新创建线程的线程ID。attr参数用于制定各种不同的线程属性。新创建的线程从start_rtn函数的地址开始运行,该函数只有一个无指针参数arg,如果需要向start_rtn函数传递的参数不止一个,那么需要把这些参数放到一个结构中,然后把这个结构的地址作为arg的参数传入。
笔者同时创建了一个打印信息的线程threadInfo。
主函数的最后sleep()是为了使主函数能停留一会,以便线程运行。当main函数执行结束后,它的线程也会结束。如果不想程序结束,可以在主函数中也添加无限循环,main函数也可以类似一个线程函数持续执行。
这里是方式是哲学家在自己未拿到2只筷子吃饭之前,决不放弃自己的筷子,笔者在网络上看到一种可以放弃自己的筷子的方法。同时定义的信号量是对对应的每一只筷子。当前哲学家在检测左右筷子是否可以拿起的时候,如果不能同时拿起则放弃拿起。然后持续执行一直到左右邻居将筷子放下。我将实现程序贴出:
1 #include <unistd.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <pthread.h>
5 #include <semaphore.h>
6
7 #define PHILOSOPHER_NUM 5
8 #define THINKING 1
9 #define HUNGRY 2
10 #define EATING 3
11
12 pthread_mutex_t mutex;
13 sem_t semph[PHILOSOPHER_NUM];
14
15 void philosopher(int *index)
16 {
17 int mythreadId;
18 char myState,strState[128];
19 int leftIndex;
20 int rightIndex;
21 int sleepTime;
22
23 mythreadId = (*index);
24 leftIndex = mythreadId + PHILOSOPHER_NUM-1% PHILOSOPHER_NUM;
25 rightIndex = (mythreadId + 1) % PHILOSOPHER_NUM;
26
27 myState = THINKING;
28
29 while(1)
30 {
31 switch(myState)
32 {
33 case THINKING:
34 myState = HUNGRY;
35 strcpy(strState,"HUNGRY");
36 break;
37 case HUNGRY:
38 strcpy(strState,"HUNGRY");
39 if(!(sem_wait(&semph[leftIndex])))
40 {
41 if(!(sem_wait(&semph[rightIndex])))
42 {
43 myState = EATING;
44 strcpy(strState,"EATING");
45 }
46 else
47 {//不能同时拿起则都不拿起
48 sem_post(&semph[leftIndex]);
49 }
50 }
51 break;
52 case EATING:
53 sem_post(&semph[leftIndex]);
54 sem_post(&semph[rightIndex]);
55
56 myState=THINKING;
57 strcpy(strState,"THINKING");
58 break;
59 }
60 pthread_mutex_lock(&mutex);
61 printf("pholosopher %d begin %s\n",mythreadId,strState);
62 pthread_mutex_unlock(&mutex);
63
64 sleepTime = 1+(int)(5.0*rand()/(RAND_MAX+1.0));
65 usleep(sleepTime);
66
67 }
68 }
69
70 int main()
71 {
72 int i,ret;
73 int threadId[PHILOSOPHER_NUM];
74 int errNum=0;
75 pthread_t t_phThread[PHILOSOPHER_NUM];
76 //pthread_t t_threadInfo;
77
78 srand(getpid());
79 pthread_mutex_init(&mutex,NULL);
80 for(i=0;i<PHILOSOPHER_NUM;i++)
81 {
82 threadId[i] = i;
83 sem_init(&semph[i],0,1);
84 ret=pthread_create(&t_phThread[i], NULL, (void *)philosopher, (void*)(&threadId[i]));
85 if(ret)
86 {
87 errNum++;
88 }
89
90 usleep(20);
91 }
92 if(errNum)
93 {
94 printf("thread create err! errnum=%d \n",errNum);
95 }
96
97 sleep(1);
98 }

作者: margincc 发表于 2011-06-30 14:06 原文链接

推荐.NET配套的通用数据层ORM框架:CYQ.Data 通用数据层框架
新浪微博粉丝精灵,刷粉丝、刷评论、刷转发、企业商家微博营销必备工具"