1、竞争条件
1.甚么是竞争条件?
两个或多个进程读写某些同享数据,而最后的结果取决于进程运行的精确时序,称为竞争条件。
2.怎样避免竞争条件?
要避免这类毛病,关键是找出某种途径来禁止多个进程同时读写同享的数据。换言之,我们需要互斥!即以某种手段确保当1个进程在使用1个同享变量或文件时,其他进程不能做一样的操作。
3.甚么叫临界区?
我们把对同享内存进行访问的程序片断称作临界区域或临界区。使得两个进程不可能同时处于临界区中,就可以够避免竞争条件。
4.1个好的避免出现竞争条件的解决方案,需要满足以下4个条件:
a).任何两个进程不能同时处于临界区。
b).不应当对CPU的速度和数量做任何假定。
c).临界区外运行的进程不得阻塞其他进程。
d).不得使进程无穷期等待进入临界区。
2、互斥方式
2.1.忙等待的互斥
1).屏蔽中断
在单处理器系统中,最简单的办法是使每一个进程在刚刚进入临界区后立即屏蔽所有中断,并在将要离开之前再打开中断。屏蔽中断后,时钟中断也被屏蔽。CPU只有在产生时钟中断或其他中断时才会进行进程切换。所以,在屏蔽中断后CPU将不会被切换到其他进程。因而,1旦某个进程屏蔽中断后,它就能够检查和修改内存,而没必要担心其他进程参与。
优点:足够简单、明了。
缺点:把屏蔽中断的权利交给用户进程是不明智的。假想1下,如果1个用户进程屏蔽中断后不再打开,其结果将会如何?
只能适用于单CPU系统。如果系统是多处理器,则屏蔽中断仅仅对履行disable指令的那个CPU有效,其他CPU仍将继续运行并切换进程,而且可以访问同享内存。
2).锁变量
这是1种软件解决方案。假想有1个同享(锁)变量,其初始值为0,当1个进程想进入其临界区时,它首先测试这把锁。如果该锁的值为0,则该进程将其设置为1并进入临界区。若这把锁的值已为1,则该进程将等待直到其值变成0。
疏漏:假定1个进程读出锁变量的值并发现它为0,而恰好在它将其值设置为1之前,另外一个进程被调度运行,将该锁变量设置为1。当第1个进程再次运行时,它一样也将该锁设置为1,并进入临界区。此时同时有两个进程进入临界区。所以这类方案不可行!
3).严格轮换法
设置1个整型变量turn,初始值为0,用于记录轮到哪一个进程进入临界区,并检查或更新同享内存。开始时,进程0检查turn,发现其值为0,因而进入临界区。进程1也发现其值为0,所以在1个等待循环中不停地测试turn,看其值什么时候变成1,等到其值变成1,则进入临界区。
这类方式可用下面的两段代码描写:
进程0:
while(TRUE){
while (turn != 0);
critical_region();
turn = 1;
noncritical_region();
}
进程1:
while(TRUE){
while (turn != 1);
critical_region();
turn = 1;
noncritical_region();
}
可以看到,进程0在离开临界区前,它将turn的值设置为1,以便允许进程1进入临界区。
优点:简单、有效;
缺点:性能较低,CPU的利用率不高,由于触及到忙等待的问题。连续测试1个变量直到某个值出现为止,称为忙等待。只有在有理由认为等待时间是非常短的情况下,才能使用忙等待。用于忙等待的锁,称为自旋锁。
试着斟酌这样1种情况:假定turn的值为0,此时进程0先进入临界区,现在进程0很快就履行完其全部循环,它退出临界区,并将trun的值设置为1。此时,turn的值为1,两个进程都在其临界区外履行。突然,进程0结束了非临界区的操作,并且尝试进入临界区。但是,此时它没法进入临界区,由于turn的值为1,而此时进程1还在忙于履行非临界区的操作,进程0只有继续while循环,直到进程1把turn的值改成0。这说明在1个进程比另外1个进程慢了很多的情况下,轮番进入临界区其实不是1个好办法。这类情况违背了前面叙述的条件3:进程0被1个临界区以外的进程阻塞。
4).Peterson算法
这类方法是由两个ANSI C编写的进程:
#define FALSE 0
#define TRUE 1
#define N 2
int turn;
int interested[N];
void enter_region(int process)
{
int other;
other = 1 - process;
interested[process] = TRUE;
turn = process;
while(turn == process && interested[other] == TRUE);
}
void leave_region(int process)
{
interested[process] = FALSE;
}
看看它是如何工作的:1开始,没有任何进程处于临界区中,现在进程0调用enter_region(),并传入自己的进程ID,它通过设置其数组元素和将turn设置为0来标识希望进入临界区。由于进程1其实不想进入临界区,所以enter_region()很快便返回。如果进程1现在调用enter_region(),进程1将在此处挂起直到interested[0]变成FALSE,该事件只有在进程0调用leave_region()退出临界区时才会产生。
再斟酌两个进程几近同时调用enter_region()的情况,他们都将自己的进程ID写入到了turn变量,但只有后被保存进入的进程ID才有效,假定进程1是后存入ID的,则turn的值为1,。当两个进程都运行到while循环的时候,进程0将循环0次并进入临界区,因此时turn为1,process为0,虽然interested[1]为TRUE;而进程1将不停地循环且不能进入临界区,因turn的值为1,process也为1,切interested[1]为TRUE。
缺点:while循环会致使进程处于忙等待的状态,直到条件满足可以进入临界区为止。
5).TSL指令
这是1种需要硬件支持的方案。1般存在这样1条指令:TSL RX,LOCK 称为测试并加锁(Test and Set Lock)指令,它将1个内存字lock读取到寄存器RX中,然后再该内存地址上存储1个非零值。读字和写字操作保证是不可分割的原子操作。履行TSL指令的CPU将锁住内存总线,以制止其他CPU在本指令结束之前访问内存。当LOCK的值为0时,任何进程都可使用TSL指令将其值设置为1,并读写同享内存。当操作结束,进程用1条普通的move指令将LOCK的值重新设置为0。
enter_region:
TSL RX,LOCK ;复制锁变量到寄存器RX,并将锁变量设置为1
CMP RX,#0 ;锁变量是0吗?
JNE enter_region ;若不是零,说明已被设置,所以跳转到enter_region开始循环履行
RET ;若是零,返回调用者,可以进入临界区
leave_region:
MOVE LOCK,#0 ;将0存入锁变量
RET ;返回调用者,离开临界区
缺点:进程在进入临界区前,先调用enter_region,这将致使忙等待,直到锁空闲为止。与基于临界区问题的所有解法1样,进程必须在正确的时间调用enter_region和leave_region,解法才能见效。如果1个进程有讹诈行动,则互斥将会失败。
6).XCHG指令
该指令是1个可替换TSL的指令。它原子性的交换了两个位置的内容。用它实现互斥的方法以下:
enter_region:
MOVE RX,#1 ;在寄存器中放入1个1
XCHG RX,LOCK ;将LOCK和RX的值交换
CMP RX,#0 ;判断RX的值是不是为0?
JNE enter_region ;如果不为0,说明已被设置,跳转到enter_region循环履行
RET ;如果为0,则返回调用者,可进入临界区
leave_region:
MOVE LOCK,#0 ;将LOCK设置为0
RET ;返回调用者,离开临界区
缺点:和TSL指令1样,存在忙等待的问题。
7).总结
Peterson解法、TSL和XCHG解法都是正确的,但它们都有忙等待的缺点。这些解法在本质上是这样的:当1个进程想进入临界区时,先检查是不是允许进入,若不允许,则进程将原地等待,直到允许为止。
这类方法不但浪费了CPU时间,而且还可能引发料想不到的结果。斟酌1台计算机具有两个进程,H优先级较高,L优先级较低。调度规则规定,只要H处于就绪态它就能够运行。在某1时刻,L处于临界区中,此时H变到就绪态,准备运行。现在H开始忙等待,由于L此时处于临界区中。但由于H就绪时L不会被调度,也就没法离开临界区,所以H将永久忙等待下去。这类情况有时候被称为优先级反转问题。
2.2.寻觅更加高效的解决方案
从上文的总结可以看出,我们得寻觅1种更加高效的解决方案:当1个进程没法进入临界区时,将其阻塞,而不是忙等待,也许是个不错的方案。最简单的实现方案就是sleep和wakeup系统调用。sleep是1个将引发调用进程阻塞的系统调用,即被挂起,直到另外一个进程将其唤醒。wakeup调用有1个参数,即要被唤醒的进程。
作为使用这个方案的1个例子,我们斟酌生产者和消费者问题:两个进程同享1个公共的固定大小的缓冲区,其中1个是生产者,将信息放入缓冲区;另外一个是消费者,从缓冲区中取出信息。当缓冲区为空时,消费者应当阻塞,直到生产者生产了1份数据并唤醒它;当缓冲区满了时,生产者应当阻塞,直到消费者消费了1份数据并重新唤醒它。
这个例子可以大致表述为以下的代码:
#define MAXN 100 //缓冲区的容量
int count = 0; //缓冲区中的数据项数量
void producer(void)
{
int item;
while(TRUE)
{
item = produce_item(); //产生1项新数据
if(count == N) sleep(); //缓冲区已满了,生产者阻塞
insert_item(item);
count = count + 1;
if(count == 1) wakeup(consumer); //缓冲区空吗?如果不空,则唤醒消费者
}
}
void consumer(void)
{
int item;
while(TRUE)
{
if(count == 0) sleep(); //缓冲区空吗?如果为空,消费者阻塞
item = remove_item();
count = count - 1;
if(count == N - 1) wakeup(producer);//缓冲区还满吗?如果不满了,唤醒生产者
consume_item(item);
}
}
上述的代码是存在竞争条件的,其缘由是对count的访问未加任何限制。有可能出现以下情况:缓冲区为空,消费者进程刚刚读取count的值发现它为0。此时调度程序决定暂停消费者并启动运行生产者进程。生产者向缓冲区中加入1个数据项,count加1,。现在count的值为1,它推断由于刚才count的值为0,所以消费者此时1定在睡眠,因而生产者调用wakeup来唤醒消费者。但是,消费者此时在逻辑上并未睡眠,只不过是被调度程序转移到了就绪队列中,以等待下1次运行,所以wakeup信号丢失。当消费者下次运行时,它将测试先前读到的count值,发现它为0,因而睡眠。生产者早晚会填满全部缓冲区,然后睡眠,这样1来,两个进程都将永久睡眠下去。
问题的实质在于发给1个(尚)未睡眠的进程的wakeup信号丢失了。如果它没有丢失,则1切都很正常。解决它的方法就是使用信号量。
2.3.信号量
信号量,它使用1个整型变量来累计唤醒次数,供以后使用。它可以为0,表示没有保存下来的唤醒操作,也能够为正值,表示1个或多个唤醒操作。
对信号量有两种操作:down和up,它们分别对应1般化后的sleep和wakeup操作。
对1个信号量履行down操作,则是检测其值是不是大于0,若大于0,则将其值减1(即用掉1个保存的唤醒信号)并继续;若其值为0,则进程将睡眠,而此时down操作并未结束。1个信号量的down和up操作都是原子操作。所谓原子操作,是指1组相干联的操作要末都不中断的履行,要末都不履行。
up操作对信号量的值增加1.如果1个或多个进程在该信号量上睡眠,没法完成1个先前的down操作,则由系统选择其中1个(如随机挑选)并允许该进程完成它的down操作。因而,对1个有进程在其上睡眠的信号量履行1次up操作后,该信号量的值仍旧是0,但在其上睡眠的进程却少了1个。
如果使用多个CPU,则每一个信号量应当由1个锁变量进行保护,以确保同1时刻只有1个CPU在对信号量进行操作。
试着用信号量来解决生产者和消费者问题:
#define N 100 //缓冲区最大容量
typedef int Semaphore;
Semaphore mutex = 1; //提供互斥,控制对临界区的访问(2元信号量实现互斥)
Semaphore empty = N;
Semaphore full = 0;
void producer(void)
{
int item;
while(TRUE)
{
item = produce_item();
down(&empty);
down(&mutex);
insert_item(item);
up(&mutex);
up(&full);
}
}
void consumer(void)
{
int item;
while(TRUE)
{
down(&full);
down(&mutex);
item = remove_item();
up(&mutex);
up(&empty);
consume_item(item);
}
}
在上述的示例中,我们用信号量实现了两种操作:互斥和同步。mutex就是1个互斥信号量,它相当于1个锁变量,每一个进程在进入临界区前履行1个down操作,比在刚刚退出时履行1个up操作,就可以实现互斥了。信号量full和empty用来保证某种事件的顺序产生和不产生,例如当缓冲区满时生产者停止,和缓冲区空时消费者停止。
2.4.互斥量
如果不需要信号量的计数能力,有时可使用信号量的1个简化版本,称为互斥量(mutex)。它仅仅适用于管理同享资源或1小段代码。它实现简单并有效,因此在实现用户空间线程包时非常有用。
互斥量可以处于两种状态:解锁和加锁。它有两个进程,当1个进程(线程)需要访问临界区时,它调用mutex_lock,如果该互斥量当前是解锁的,此调用成功,调用线程可以自由进入该临界区。如果该互斥量已加锁,调用线程被阻塞,直到在临界区中的线程完成工作并调用mutext_unlock。此时如果有多个线程阻塞在该互斥量上,将随机选择1个线程并允许它取得锁。
如果有可用的TSL或XCHG指令,要在用户空间实现互斥量就很简单了。
mutex_lock:
TSL RX,MUTEX
CMP RX,$0 //互斥量是0吗?
JE ok //如果是0,则跳转到ok处,此时加锁成功,可以进入临界区
CALL thread_yield //如果不是0,将自己阻塞挂起,并让出CPU以调度另外一个线程
JMP mutex_lock //稍后再试
ok:
RET //返回调用者,进入临界区
mutex_unlock:
MOVE MUTEX,#0
RET
mutex_lock与enter_region的代码很类似,但有1个关键的区分:当enter_region进入临界区失败时,它始终重复测试锁(忙等待)。实际上,如果是在进程中,由于时钟超时作用,会调度其他进程运行,这样早晚具有锁的进程会进入临界区运行并释放锁,而忙等待的进程会因此而取得锁。而在用户线程中,情况有所不同,由于没有时钟停止运行时间太长的线程,会致使通过忙等待的方式来试图取得锁的线程将永久循环下去,决不会得到锁,由于这个试图取得锁的线程不会让其他线程运行从而释放锁。
2.5.Pthread中的互斥
Pthread提供许多可以用来同步线程的函数。其基本机制是使用1个可以被锁定和解锁的互斥量来保护每一个临界区。它提供了创建和撤消互斥量的方法pthread_mutex_init和pthread_mutex_destroy,也能够通过pthread_mutex_lock给互斥量加锁,如果该互斥量已被加锁时,则会阻塞调用者。还有1个调用可以用来尝试锁住1个互斥量,当互斥量已被加锁时会返回毛病代码而不是阻塞调用者,这个调用是pthread_mutex_trylock。最后,pthread_mutex_unlock用来给1个互斥量解锁,并在1个或多个线程等待它的情况下正确的释放1个线程。互斥量也能够有属性,但是这些属性只有在某些特殊的场合下使用。
除互斥量以外,pthread还提供了另外一种同步机制:条件变量。互斥量在允许或阻塞对临界区的访问上是很有用的,条件变量则允许线程由于1些未到达的条件而阻塞。绝大部份情况下这两种方法是1起使用的。这里的条件变量实际上类似于信号量。它也有专门的调用来创建和撤消,也能够有属性。最重要的两个操作是pthread_cond_wait和pthread_cond_signal。前者阻塞调用线程直到另外一其他线程向它发送信号。被阻塞的线程常常是在等待发信号的线程去做某些工作、释放某些资源或是进行其他的1些操作,只有完成后被阻塞的线程才可以继续运行。当多个线程被阻塞并等待同1个信号时,可使用pthread_cond_broadcast调用去唤醒它们所有。
条件变量与互斥量常常1起使用。这类模式用于让1个线程锁住1个互斥量,然后当它不能取得它期待的结果时等待1个条件变量。最后另外一个线程会向它发送信号,使它可以继续运行。pthread_cond_wait方法原子性的调用并解锁它持有的互斥量,由于这个缘由,互斥量是它的参数之1。
值得1说的是,条件变量不同于信号量,虽然它们很类似。条件变量(不像信号量)不会存在于内存中。如果将1个信号发送给1个没有线程在等待的条件变量,那末这个信号就会丢失。即是说条件变量不是计数器,也不能像信号量那样累计信号以便以后使用。所以,如果向1个条件变量发送信号,但是在该条件变量上并没有等待的进程,则该信号会永久丢失。
斟酌实现1个生产者和消费者问题:
#include <stdio.h>
#include <pthread.h>
#define MAX 10 //生产的数据数量
pthread_mutex_t the_mutex; //互斥锁
pthread_cond_t condC, condP; //消费者和生产者的条件变量
int buff = 0;
void *producer(void *ptr)
{
for (int i = 1; i <= MAX; ++i)
{
printf("生产者线程第【%d】次运行.\n",i);
pthread_mutex_lock(&the_mutex);
while(buff != 0)
{
printf("生产者线程睡眠.\n");
pthread_cond_wait(&condP, &the_mutex); //如果缓冲区中还有数据,则睡眠并等待消费者的唤醒信号
}
buff = i;
pthread_cond_signal(&condC); //通知消费者消费产品
pthread_mutex_unlock(&the_mutex);
}
printf("生产者线程运行终了!\n");
pthread_exit(NULL);
return NULL;
}
void *consumer(void *ptr)
{
for (int i = 1; i <= MAX; ++i)
{
printf("消费者线程第【%d】次运行.\n",i);
pthread_mutex_lock(&the_mutex);
while(buff == 0)
{
printf("消费者线程睡眠.\n");
pthread_cond_wait(&condC, &the_mutex); //如果没有可消费的产品,则睡眠并等待生产者的唤醒信号
}
printf("%d\n", buff);
buff = 0;
pthread_cond_signal(&condP); //通知生产者生产产品
pthread_mutex_unlock(&the_mutex);
}
printf("消费者线程运行终了!\n");
pthread_exit(NULL);
return NULL;
}
int main(int argc, char const *argv[])
{
pthread_t pro, con;
pthread_mutex_init(&the_mutex, NULL);
pthread_cond_init(&condC, NULL);
pthread_cond_init(&condC, NULL);
pthread_create(&con, NULL, consumer, NULL);
pthread_create(&pro, NULL, producer, NULL);
pthread_join(con, NULL);
pthread_join(pro, NULL);
pthread_cond_destroy(&condC);
pthread_cond_destroy(&condP);
pthread_mutex_destroy(&the_mutex);
return 0;
}