# 条件变量

线程有时需要检查某一条件(condition)满足后才能继续运行:子线程 join 父线程。这也就意味着父线程需要等待子线程运行完才能继续运行。

  • 设计一个共享变量:这样父线程就需要一直自旋来等待子线程运行结束,将共享变量改为父线程可用状态。这种设计缺点就是浪费时间,某些情况甚至是错误的。
  • 加入等待队列:其实整个设计思想和 A 篇中我们讲到设计锁的思想很相似,这里的条件变量就是” “。当条件不满足时,线程就将自己加入队列,等待该条件。某个线程修改了条件状态,就唤醒一个或多个等待线程。

定义:条件变量是一个显示的队列,当某些执行状态(即条件)不满足时,线程就将自己加入队列,等待条件

关于条件变量的两种操作: wait()singal()wait() 职责是释放锁,并让调用线程休眠(原子地),当线程被唤醒时,需要重新竞争锁再返回调用者这是为了避免线程陷入休眠时产生一些竞态条件signal() 唤醒等待在某个条件变量上的睡眠线程。

wait 会释放锁,牢记,实际上,在 Java 语言中, Object 也有 await() 函数,该方法也会释放锁。

int done = 0;
pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m);
pthread_cond_signal(pthread_cond_t *c); 
void thread_exit() {
    lock(&m);// 假设已经初始化了一个宏变量 m 作为锁
    done = 1;
    pthread_cond_signal(&c);// 假设已经初始化了一个宏变量 c 作为等待队列
    unlock(&m);
}
void *child(void *arg) {
    printf("child\n");
    thread_exit();
    
    return NULL;
}
void thread_join() {
    lock(&m);
    while(done == 0) 
        pthread_cond_wait(&c, &m);
    unlock(&m);
}
int main(int argc, char *agrv[]) {
    printf("parent: begin\n");
    thread p; 
    thread_create(&p, NULL, child, NULL);
    
    thread_join();
    printf("parent: end\n");
	return 0;
}

设想如果在发送信号和等待时都不加锁会发生什么问题?在操作系统 A 篇中讲过:改变锁的状态时一定要是线程安全的。这里保证互斥除了锁,还有条件变量,也就是信号。

DIjkstra 最早在 “私有信号量” 提出等待条件这个思想,Hoare 在关于观察者的工作中,将这种类似的思想称为条件变量

void thr_exit() {
    done = 1;
    pthread_cond_signal(&c);
}
void thr_join() {
    while(done == 0)
        pthread_cond_wait(&c, &m);
}

设想,如果父线程调用 thr_join() ,检查到 done==0 ,然后试图休眠,此时发生中断,然后子线程修改 done=1 ,发出信号,此时子线程检查到没有等待线程。父线程再次运行,进入休眠,之后就会一直休眠,没有线程唤醒。

发信号时总是持有锁,但也有一些情况可以不加锁,而这可能是你应该避免的。因此,为 了简单,请在调用 signal 时持有锁(hold the lock when calling signal)。同样的,在 wait 时也必须持有锁,这是 wait 语法强制要求的。因为 wait 调 用总是假设你调用它时已经持有锁、调用者睡眠之前会释放锁以及返回前重新持有锁。因此,这个提示 的一般化形式是正确的:调用 signalwait 时要持有锁(hold the lock when calling signal or wait)。

在操作系统 A 篇中我们最后给了一个设计锁的例子,结构体里面也有一个队列 queue_t *q ,该队列是竞争锁失败后,线程进入阻塞队列等待锁空闲。线程为了节省时间也会休眠。但是条件变量不同的是需要线程主动调用 wait 主动放弃锁的占用,进入等待队列从而休眠,需要等到一个指定条件发生才会被唤醒。

# 生产 / 消费问题

这也是 Dijkstra 提出来的 —— 生产者消费者问题。我们回顾前面的代码,你也许认为将:

while(done == 0)
    wait();

改成:

if(done == 0)
    wait();

也是符合逻辑的。在父线程 / 子线程的案例中,这确实是合理的,但是在使用条件变量时,应该总是使用 while ,在生产者消费者问题中就能够体现出来。

先看伪代码: count==1 表示缓冲区有数据可以被消费,消费之后就为 0。

void *producer(void *arg) {
    int i;
    for(i = 0; i < loops; i++) {
        lock(&m);
        if(count == 1)
            wait(&c,&m);
        put(i);
        signal(&c);
        unlock(&m);
    }
}
void *consumer(void *arg) {
    int i;
    for(i = 0; i < loops; i++) {
        lock(&m);
        if(count == 1)
            wait(&c,&m);
        int tmp = get();
        signal(&c);
        unlock(&m);
        printf("%d\n",tmp);
    }
}

上述代码使用的就是 if 语句,我们假设只有一个生产者。如果只有一个消费者,那么这段代码不会出现问题。但是当出现两个消费者时,就会导致错误。

首先,这个问题肯定是当缓冲区没有数据,就不应该调用 get 的。

设想 C1C2 两个消费者此时都 wait ,等待缓冲区加入数据。生产者产生数据后,唤醒了 C1 并拿到了锁, C1 开始执行 int tmp = get(); 这段代码。然后继续唤醒等待队列中的线程。当然有可能唤醒 C2 线程并且拿到锁,那么 C2 此时也会执行 get() 函数,但是此时缓冲区并没有数据,缓冲区无法消费,就会断言触发,提示整个并发环境出现了问题。所以需要将 if 改成 while ,每次被唤醒都需要重新检查一下缓冲区是否还可以消费。

其实导致我们需要循环检查缓冲区消费情况的本质原因是:消费者在休眠的时候 count 可能会减小。

但是仅仅是将 if 改为 while 还是存在问题,主要原因是生产者和消费者都位于同一个休眠(等待,阻塞)队列(这是我自己总结的原因,先不用急着反驳)。

设想这种情况: P1 作为生产者, C1C2 作为两个消费者. C1C2 因为缓冲区没有数据先休眠,进入队列。当 P1 向缓冲区放入数据,然后进入队列。此时 C1 被唤醒,消费了数据之后,需要唤醒一个队列中的线程。

此时应该唤醒一个生产者,因为缓冲区没有数据了,但是不同的队列的管理方式不同,并不是所有队列都是 FIFO ,所以程序运行到后期你根本不知道下一个被唤醒的是生产者还是消费者。如果恰好此时 C2 被唤醒了,进入 while 循环发现缓冲区没有数据,于是继续 wait 休眠,进入队列。此时三个线程都休眠了,就不会再有线程来唤醒队列中的线程了,程序崩溃。

所以只需要使用两个队列即可,标准的说法就是使用两个条件变量。

cond_t empty, fill;
// producer
while(count == 1)
    wait(&empty, &m); // 当缓冲区为空时,就需要从 empty 唤醒生产者
put(i);
signal(&fill);
// consumer
while(count == 0)
    wait(&fill, &m);
int tmp = get();
signal(&empty);

# 覆盖条件

这其实就是 signal_all() 方法,将休眠的线程全部唤醒。有时这样的操作是有必要的,尽管他比较损耗性能。

举个内存分配的问题,当 allocate(int size) 遇到了内存不足时,就会休眠等待,相应的,线程释放内存会发出信号说有更多内存空间,但是,代码中有一个问题就是,应该缓行哪个线程?

极端的情况就是:共有 3 个线程,2 个都在休眠等待,最后一个线程释放了内存后,现在还有 50 的内存,线程 A 需要 100,线程 B 需要 10,但是不能够保证一定唤醒 B 线程,如果唤醒了 A 线程,那么所有线程都会进入休眠,所以有人的解决方案就是将所有线程都唤醒,这就是覆盖条件。

其实单论这个问题的话,可以用优先队列来解决,当然我只是借这个问题引出覆盖条件的概念。

# 信号量

信号量是有一个整数值的对象,可以用两个函数操作它。在 POSIX 标准中,是 sem_wait()sem_post() 。当线程完成一次对信号量对象的等待(wait)时,计数值减一;当线程完成一次对信号量的释放(release)时,计数值加一。

# 应用:锁

最简单的例子就是使用信号量作为锁,那么同时间进入临界区只能有一个线程,假设最开始 A 线程进入临界区,此时需要调用 wait() ,将信号量减一,如果此时不允许其他线程进入,那么信号量此时至少为 0. 因为其他线程进入临界区之前也需要调用一下 wait() ,此时再判断信号量为负,不能进入临界区,进入休眠。因为锁只有两种状态,所以这种用法有时也叫做二值信号量

所以信号量作为锁,初始化的计数值应该为 1。 图中的 signal 相当于 POSIX 中的 sme_post()

# 应用:条件变量

本文最开始讲解条件变量时的例子是父线程创建子线程,并等待子线程执行结束。用一个显示队列作为条件变量,使用 wait/signal 来保证线程安全。

下面讲一下如何用信号量来实现这种效果。

首先要理解一个点,信号量并不是什么很特别的东西,本质就是一个整数,只是说,操作系统存在调用,如 sem_wait()sem_signal() 。重要的是它能够让线程停下来等待,放弃 CPU 资源进入休眠。

父线程等待子线程的例子中,逻辑为:

void *child(void *arg) {
    printf("child\n");
    sem_post(&s);
    return NULL;
}
sem_init(&s, 0, x); // 0 表示同一进程所有线程都共享信号量,计数值初始化为 x
printf("parent:begin\n");
thread_create(c, NULL, child, NULL); // 创建子线程并运行
sem_wait(&s);
printf("parent:end\n");

使用信号量代替条件变量,计数值初始化为 0。

需要思考父线程先执行 sem_wait() 和子线程先执行 sem_post() 两种情况。

# 应用:实现信号量

该部分模拟实现一下 sem_waitsem_post 。《操作系统导论》一书中将这个自定义信号量称为 Zemaphore ,我们延用一下。

typedef struct _Zem_t {
    int val; // 计数值
    pthread_cond_t cond; // 休眠 / 等待队列,肯定要有,不然你以为谁替你维护一个队列
    pthread_mutex_t lock; // 锁
}
void Zem_init(Zem_t *s, int val) {
    s->val = val;
    Cond_init(&s->cond); // 初始化阻塞队列
    Mutex_init(&s->lock); // 初始化锁,
}
void Zem_wait(Zem_t *s) {
    Mutex_lock(&s->lock);
    
    while(s->val <= 0) // 进入队列休眠 
        Cond_wait(&s->cond, &s->lock);
    s->val--;
    Mutex_unlock(&s->lock);
}
void Zem_post(Zem_t *s) {
    Mutex_lock(&s->lock);
    s->val++;
    Cond_signal(&s->cond);
    Mutex_unlock(&s->lock);
}

只用了一把锁、一个条件变量和一个状态的变量来记录信号量的值。是不是感觉信号量就是将锁和条件变量封装了一下。

# 参考

https://pages.cs.wisc.edu/~remzi/OSTEP/Chinese/30.pdf

https://pages.cs.wisc.edu/~remzi/OSTEP/Chinese/31.pdf

https://zh.m.wikipedia.org/zh-hans/ 信号量