Published: 2019-01-09

Lock,Semphore,Condition Variable和Monitor

这篇文章是对于并发控制的一些操作符的介绍和整理。

Table of Contents

1 概述

1.1 概述

在一个多线程程序里,各个线程互相协作(共享资源或者串联执行结果等)来完成任务。

为了使得多线程程序的正确性得到保证(因为线程以任意频率交错执行,由操作系统来调度,不由得用户程序控制),我们需要控制多线程间的协作。

我们通过"同步"("synchronization")来控制多个线程间的协作(同步使得我们可以限制多个线程如何交互执行)。

这里的讨论对于“多进程”来讲也是如此。

首先考虑多线程中对共享资源的访问问题: 有一些机制来控制共享资源:"locks", "mutex", "semphores", "monitors", "condition variables" 还有一些模式来控制和协作对共享资源的访问:"Bounded buffer", "producer-consumer"等

1.2 例子

举个基本的例子:银行账号余额的操作:

withdraw (account, amount) {
    balance = get_balance(account);
    balance = balance – amount;
    put_balance(account, balance);
    return balance;
}

两个线程同时执行上述方法,由于程序不可控的"context switch", 会导致当账户减了钱还没设置回去后,又由另外一个程序获取旧值操作,导致意外结果:

balance = get_balance(account);
balance = balance – amount;

// -> context switch <-

balance = get_balance(account);
balance = balance – amount;
put_balance(account, balance);

// -> context switch <-

put_balance(account, balance);

问题在于两个线程访问一个共享资源却没有“同步”,所以导致了“竞争”(race condition)。 所以需要一种机制,来控制对共享资源的访问。

1.3 哪些是共享资源

一个多线程程序执行数据结构示意:

<插入pic1>

  1. 线程的本地变量不是共享的,它们存储在自己的stack上
  2. 全局变量和静态的一些数据是共享的,它们存储在static data上
  3. 动态对象和其他一些heap上的对象是共享的,它们存储在heap上

2 Lock

2.1 介绍

我们用互斥("mutal exclusion")来同步对共享资源的访问。用互斥动作包过的代码称为"critical section".

Lock提供了两个方法:

  1. acquire() : 进入critical section
  2. releaase() : 离开critical section

线程在acquire和release之间,就称之为线程“持有”这个lock.

acquire会处在等待状态,直到其它线程释放lock.

lock可以分为“空转锁”("spin lock")和“等待锁”("mutex").

采用lock机制,上面的银行账户操作例子可以写成:

withdraw (account, amount) {
    acquire(lock);

    balance = get_balance(account);
    balance = balance – amount;
    put_balance(account, balance);

    release(lock);

    return balance;
}

2.2 实现1

// 这是一个有缺陷的尝试

struct lock {
    int held = 0;
}
void acquire (lock) {
    while (lock->held);  // 如果被其它线程acquire了,这里会一直在运行(空转),所以这个实现是spin lock
    // 缺陷: 如果在这里发生了context switch, 那么会导致race condition
    lock->held = 1;
}
void release (lock) {
    lock->held = 0;
}

所以我们在实现锁的时候还需要一种类似锁的机制? 因为当中一些操作需要原子化(atomic)。

硬件提供给我们一些帮助: Atomic 指令(如"test-and-set")和 "disable/enable interrupts"(防止context switch)

2.3 实现2

// 使用test-and-set实现一个spinlock, 有缺陷的

struct lock {
    int held = 0;
}
void acquire (lock) {
    while (test-and-set(&lock->held));
}
void release (lock) {
    lock->held = 0;
}


spinlock的问题:

浪费资源,因为cpu一直在运行。 而且在单核处理器上时,当一个线程在acquires时spinning,它一直占有cpu, 另外一个持有lock的线程无法进行下去。这就得第一个线程主动调用"yield","sleep"或者发起context switch才行。

2.4 实现3

// 采用disable interrupts方式, 有缺陷的

struct lock {
}
void acquire (lock) {
    disable interrupts;
}
void release (lock) {
    enable interrupts;
}

问题:

acquire时disable interrupts了后,也阻断了(在critical section里)发生其它外部事件(比如timer)来触发context switch的可能。 而且在真实的操作系统上只有kernel才能这么做。并且disable interrupts是一个并不高效的操作。

2.5 实现4

// Higer-level synchronization
// 1. 只在acquire和release里disable interrupts, 而不是像上面那样在critical section里
// 2. 采用queue来保存acquire hold的线程,并让其sleep,等待release的唤醒,而避免了spinning

struct lock {
    int held = 0;
    queue Q;
}
void acquire (lock) {
    Disable interrupts;
    while (lock->held) {
        put current thread on queue Q;
        block current thread;
    }
    lock->held = 1;
    Enable interrupts;
}

void release (lock) {
    Disable interrupts;
    if (Q) remove waiting thread;
    unblock waiting thread;
    lock->held = 0;
    Enable interrupts;
}


3 Semphore

3.1 介绍

lock提供了一种互斥控制的机制。但是它的语义有限,仅仅是对资源的互斥访问。 在其他一些情况下需要有更好的语义的操作符来控制对于共享资源的访问。 就像接下来要介绍的:semaphore, condition variable, monitor。这些机制可以基于lock构建。

semphore也是用来提供对critical section的保护。 它提供了一种控制共享资源池的方式。

semphore是一个共享的计数器, 其支持两种操作:

  1. Wait() , 又称 P() , down()

等待semphore大于0,然后减一。

  1. Signal() , 又称 V()

对semphore加1

Semphore可以分为:

  1. "Mutex semphore"(or binary semaphores), 保证了对资源的单一访问
  2. "Counting semphore"(or general semphore),用于处理对资源的多方访问

上面银行账户的例子采用semphore:

struct Semaphore {
    int value;
    Queue q;
} S;

withdraw (account, amount) {
    wait(S);

    balance = get_balance(account);
    balance = balance – amount;
    put_balance(account, balance);

    signal(S);

    return balance;
}

3.2 实现1

// 有缺陷的实现
// 由于P和V里有需要atomic的操作
// 和上面lock的实现一样, 在P和V里面有可能发生context switch

P(sem) {
   while (sem <= 0)
   ;
   sem = sem - 1;
}


V(sem) {
   sem = sem + 1;
}



3.3 实现2

// 有缺陷的实现
// 采用mutex,来保证P和V的critical section
// 但和spinning lock一样, 会发生spinning

P (csem) {
   while (1)  {
      Acquire_mutex (csem.mutex);
      if (csem.value <= 0) {
         Release_mutex (csem.mutex);
         continue;
      }
      else {
          csem.value = csem.value – 1;
          Release_mutex (csem.mutex);
          break;
      }
   }
}


V (csem) {
    Acquire_mutex (csem.mutex);
    csem.value = csem.value + 1;
    Release_mutex (csem.mutex);
}


3.4 实现3

// 采用queue/block的方式来处理spinning


Struct Semphore {
    int value
    mutex mutex
    queue queue
}
P (csem) {
   while (1)  {
      Acquire_mutex (csem.mutex);
      if (csem.value <= 0) {
         insert_queue (getpid(), csem.queue);  /* Add this thread to */
         Release_mutex_and_block (csem.mutex); /* atomic: lost wake-up */
      }
      else {
          csem.value = csem.value – 1;
          Release_mutex (csem.mutex);
          break;
      }
   }
}


V (csem)
{
    Acquire_mutex (csem.mutex);

    csem.value = csem.value + 1;
    dequeue_and_wakeup (csem.queue)

    Release_mutex (csem.mutex);
}


// 注意,在P()里, 释放mutex并且sleep的需要原子化,
// 不然,当线程1释放mutex并且sleep前发生context switch,这时候其它线程执行V()尝试唤醒线程1
// 然后线程1执行sleep又等带其它唤醒它。
// 操作系统提供了这样的系统调用:sleep(mutex)来实现原子操作

3.5 总结

semphore可以被用来解决任何传统同步问题,但有些缺点:

  1. 本质上是共享的全局变量
  2. semphore和保护的data之间没有直接联系
  3. 同时用作critical sections(mutual exclusion)和 coordination(scheduling)(有时需要注释才清楚使用场景)
  4. 不能保证合适的使用(有时难于发现错误)

4 Condition Variable

4.1 简介

Condition Variable 提供了一种等等事件发生的方式。

Condition variable可以:

  1. 等待某些condition发生
  2. 通知其他线程某个condition发生
  3. 是一个线程间发信号的有用方式

Condition variable 表示的是一个事件, 并不能用来存储或者获取值。

不同于semaphore,它不对资源进行计数和追踪。

Condition Variable支持3种操作:

  1. Wait(mutex)
  2. Signal(mutex)
  3. Broadcase(mutex)

为什么要和mutxe一起使用呢? 因为Condition Variable的使用通常需要测试某个条件,如果条件满足那么wait, 但为了控制wait前不会context switch,那么需要mutex保护。

4.2 使用例子

// 例子1, 等待couter大于等于10
pthread_mutex_t myLock;
pthread_cond_t myCV;
int counter = 0;


/* Thread A */
pthread_mutex_lock(&myLock);

while (counter < 10) {
    pthread_cond_wait(&myCV, &myLock);  // 当counter小于10, 等待 , myLock用来保护counter
}

pthread_mutex_unlock(&myLock);



/* Thread B */
pthread_mutex_lock(&myLock);

counter++;
if (counter == 10) {
    pthread_cond_signal(&myCV);
}

pthread_mutex_unlock(&myLock);





// 例子2: Bounded buffer using CVS



int theArray[ARRAY_SIZE], size;
pthread_mutex_t theLock;
pthread_cond_t theCV;

/* Initialize */
pthread_mutex_init(&theLock, NULL);
pthread_condvar_init(&theCV, NULL);

void put(int val) {
    pthread_mutex_lock(&theLock);
    while (size == ARRAY_SIZE) {
        pthread_cond_wait(&theCV, &theLock);
    }
    addItemToArray(val);
    size++;

    pthread_cond_signal(&theCV);

    pthread_mutex_unlock(&theLock);
}




int get() {
    int item;
    pthread_mutex_lock(&theLock);
    while (size == 0) {
        pthread_cond_wait(&theCV, &theLock);
    }
    item = getItemFromArray();
    size--;

    pthread_cond_signal(&theCV)

    pthread_mutex_unlock(&theLock);
    return item;
}




// 改进版,更高效

int theArray[ARRAY_SIZE], size;
pthread_mutex_t theLock;
pthread_cond_t theCV;

/* Initialize */
pthread_mutex_init(&theLock, NULL);
pthread_condvar_init(&theCV, NULL);

void put(int val) {
    pthread_mutex_lock(&theLock);
    while (size == ARRAY_SIZE) {
        pthread_cond_wait(&theCV, &theLock);
    }
    addItemToArray(val);
    size++;
    if (size == 1) {  // <-- 差异
     pthread_cond_broadcast(&theCV);
    }
    pthread_mutex_unlock(&theLock);
}




int get() {
    int item;
    pthread_mutex_lock(&theLock);
    while (size == 0) {
        pthread_cond_wait(&theCV, &theLock);
    }
    item = getItemFromArray();
    size--;
    if (size == ARRAY_SIZE-1) {  // <-- 差异
     pthread_cond_broadcast(&theCV);
    }
    pthread_mutex_unlock(&theLock);
    return item;
}


4.3 实现

struct condition {
    proc next;  /* doubly linked list implementation of */
    proc prev;  /* queue for blocked threads */
    mutex listLock; /*protects queue */
};


void wait (condition *c, mutex *mx)
  {
    mutex_acquire(&c->listLock);  /* protect the queue */
    enqueue (&c->next, &c->prev, thr_self()); /* enqueue */
    mutex_release (&c->listLock); /* we're done with the list */

    /* The suspend and release_mutex() operation should be atomic */
    release_mutex (mx);
    thr_suspend (self);  /* Sleep 'til someone wakes us */

    mutex_acquire (mx); /* Woke up -- our turn, get resource lock */

    return;
  }


void signal (condition *c)
  {
    thread_id tid;

    mutex_acquire (c->listlock); /* protect the queue */
    tid = dequeue(&c->next, &c->prev);
    mutex_release (listLock);

    if (tid>0)
      thr_continue (tid);

    return;
  }



void broadcast (condition *c)
  {
    thread_id tid;

    mutex_acquire (c->listLock); /* protect the queue */
    while (&c->next) /* queue is not empty */
    {
      tid = dequeue(&c->next, &c->prev); /* wake one */
      thr_continue (tid); /* Make it runnable */
    }
    mutex_release (c->listLock); /* done with the queue */
  }

4.4 和semphore关系

尽管condition variable和semphore可以用来互相实现对方,他们有不同的语义。

4.4.1 wait和signal差异

condition variable wait时,需要通过lock控制,wait时要释放lock,然后block

semphore wait时,仅仅block。

condition variable signal时,如果没有等待的线程,那么这次signal就回丢失

semphore signal时,如果没有等待的线程,那么这次signal会被计数留作将来使用。

5 TODO Monitor

5.1 概述

Monitort提供了一种对对象的并发控制封装机制, monitor对象里包含的变量、conditionVariables和方法都是受保护的(并发控制有保证)。

Monitor里,同一时间最多只有一个线程活跃。

当线程1进入monitor后,线程2线程3想访问monitor方法需要进入等待队列

当线程1还在monitor里,但因某种情况wait后,线程2获取进入monitor的权利

当线程2在monitor里因为某种情况signal后,接下来的操作分两种: "Hoare Monitor" 和 "Mesa Monitor".

5.2 Hoare Monitor

线程2 signal后,立即运行waiting的线程1,把持有的锁交给线程1。线程2进入等待状态

5.3 Mesa Monitor

线程2 signal后,waiting的线程1进入monitor的ready queue,线程2继续运行。线程1等到其可以获取到锁后再运行

这也是phtreads, java, C#里的实现

5.4 差别

Hoare Monitor里, 当线程1醒来后,触发线程2 signal的condition一定会成立。而在 Mesa Monitor里,醒来的线程1先要去再检查condition,确保其成立(因为可能在线程1等到获取锁的时候被另外的线程4等修改)

Hoare
  if (empty)
    wait(condition);


Mesa
  while (empty)
    wait(condition);

差异对比:
Mesa更少的context switch和更容易支持broadcase,更易使用,更加高效

Hoare可操作性少,但构建的程序更加容易理解

5.5 例子

// Monitor Bounded Buffer
Monitor bounded_buffer {
    Resource buffer[N];

    // Variables for indexing buffer
    // monitor invariant involves these vars
    Condition not_full; // space in buffer
    Condition not_empty; // value in buffer

    void put_resource (Resource R) {
        while (buffer array is full)
            wait(not_full);

        Add R to buffer array;
        signal(not_empty);
    }

    Resource get_resource() {
        while (buffer array is empty)
            wait(not_empty);

        Get resource R from buffer array;
        signal(not_full);
        return R;
    }
} // end monitor


// 这里实现了一个Monitor Bounded buffer, 多线程对其方法put_resource和get_resource的使用是受保护的。

5.6 Java与Monitor

任何Java对象里都隐式包含了lock和condition variable。

最多一个线程可以进入到Monitor里, 线程通过的方式执行标有"synchronized"的方法或者语句进入对象的monitor。

一个例子

class BoundedBuffer {
    private int size;
    private int theArray[ARRAY_SIZE];

    public synchronized void put(int x) {
      while (size == ARRAY_SIZE) this.wait();
      theArray[size] = x;
      size++;
      if (size == 1) this.notifyAll();
    }

    public synchronized int get() {
      while (size == 0) this.wait();
      size--;
      if (size == ARRAY_SIZE-1) this.notifyAll();
      return theArray[size];
    }
}

#+ENDSRC

6 参考资料

7 END

Author: Nisen

Email: imnisen@163.com