并发模型:线程与锁模型

互斥和内存模型

互斥:用锁保证某一时间仅有一个线程可以访问数据;
可能带来的麻烦:竞态条件和死锁。

线程

并发的基本单元:线程,可以讲线程看做控制流;
线程间通信方式:共享内存。

1
2
3
4
5
6
7
8
9
10
11
def say_hi(name)
puts "Hi #{name}!"
end

def say_hi_to_folks(folks)
folks.inject([]) do |threads_array, name|
threads_array << Thread.new { say_hi(name) }
end.each(&:join)
end

say_hi_to_folks %w(Larry Jack)

执行结果有可能是

1
2
Hi Larry!
Hi Jack!

或者是

1
2
Hi Jack!
Hi Larry!

多线程的运行结果依赖于时序,多次运行结果并不稳定。

*注:这里使用 Jruby(基于 JVM),两个线程分别为主线程和子线程,由于 Ruby 中没有相对应的「让出线程」方法 Thread.yield() ,而在 Ruby 中相接近的 Thread.pass 实验效果又很差,故而改由两个子线程举例(为啥要用JRuby? 因为不用编译)

锁儿

多个线程共享内存时,避免同时修改同一个部分内存造成的问题,需要用达到线程互斥的目的。某一时间,至多有一个线程持有锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 class Counter
def initialize
@count = 0
end
def increment
@count += 1
end
def count
@count
end
end
counter = Counter.new

def thread(counter)
10_000.times { counter.increment }
end

t1 = Thread.new { thread(counter) }
t2 = Thread.new { thread(counter) }

[t1, t2].each(&:join)
puts counter.count

执行结果

1
2
3
4
5
6
➜  /private/tmp ruby:(system: jruby 1.7.19)
$ ruby test.rb
13779
➜ /private/tmp ruby:(system: jruby 1.7.19)
$ ruby test.rb
16440

这段代码创建了一个 counter 对象和两个线程,每个线程调用 counter.increment 10,000次。这段代码看上去很简单,但很脆弱。

几乎每次运行都将获得不同的结果,产生这个结果的原因是两个线程使用 counter.count 时发生了竞态条件(即代码行为取决于各操作的时序)

我们来看一下 JVM 是如何解释 ++count 的。其字节码:

1
2
3
4
getfield #2 ;//获取count的值
iconst_1 ;//设置加数
iadd ;//count加设置好的加数
putfield #2 ;//将更新的值写回count

这就是通称的读-改-写模式。

如果两个线程同时调用 increment ,线程1执行 getfield #2 ,获取值42。在线程1执行其他动作之前,线程2也执行了 getfield #2 ,获得值42。不过,现在两个线程都将获得的值加1,将43写回count中。导致 count 只被增加了一次。

竞态条件的解决方案:对 count 进行同步(synchronize)访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Counter
attr_reader :count
def initialize
@count = 0
@counter_mutex = Mutex.new
end

def increment
@counter_mutex.synchronize { @count += 1 }
end
end

def thread(counter)
10_000.times { counter.increment }
end

counter = Counter.new
t1 = Thread.new { thread(counter) }
t2 = Thread.new { thread(counter) }

[t1, t2].each(&:join)
puts counter.count

执行结果

1
2
3
➜  /private/tmp ruby:(system: jruby 1.7.19)
$ ruby test.rb
20000

线程进入 increment 方法时,获得 counter_mutex 锁,函数返回的时候释放该锁。同一时间最多有一个进程可以执行函数体,其他线程调用方法时将被阻塞,直到锁被释放。

优化的副作用

What is the meaning of my life?

由于没有通读过Ruby源码,无法确定这个Bug是否能用Ruby来复现,先用Java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Puzzle {
static boolean answerReady = false;
static int answer = 0;
static Thread t1 = new Thread() {
public void run() {
answer = 42;
answerReady = true;
}
};
static Thread t2 = new Thread() {
public void run() {
if (answerReady)
System.out.println("The meaning of life is: " + answer);
else
System.out.println("I don't know the answer");
}
};
public static void main(String[] args) throws InterruptedException {
t1.start(); t2.start();
t1.join(); t2.join();
}
}

根据线程执行的时序,这段代码的输出可能是:

1
The meaning of life is: 42

或者是

1
I don't know the answer

但是还有一种结果可能是:

1
The meaning of life is: 0

这说明了,当 answerReady 为 true 时 answer 可能为0!

就好像第六行和第七行颠倒了执行顺序。但是乱序执行是完全可能发生的:

  1. 编译器的静态优化可以打乱代码的执行顺序(编译原理)
  2. JVM的动态有话也会打乱代码的执行顺序(JVM)
  3. 硬件可以通过乱序执行来优化其性能(计算机体系结构)

比乱序执行更糟糕的时,有时一个线程产生的修改可能对另一个线程不可见,

如果讲 run() 写成:

1
2
3
4
5
public void run() {
while (!answerReady)
Thread.sleep(100);
System.out.println("The meaning of life is: " + answer);
}

answerReady 可能不会变成 true 代码运行后无法退出。

显然,我们需要一个明确的标准来告诉我们,优化会产生什么副作用影响,这就是 Java 内存模型(其他语言应该也有类似的东西)。Btw,经过本天才的多次试验,Ruby这边可以复现啦!!!

复现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
require 'singleton'  

class Puzzle
include Singleton

def initialize
@answer_ready = false
@answer = 0
end

def thread1
Thread.new do
@answer = 'eat'
@answer_ready = true
end
end

def thread2
Thread.new do
meaning_of_life = "The meaning of life is: #{@answer}"
no_answer = "I don't know the answer"
puts @answer_ready ? meaning_of_life : no_answer
end
end

def main
[thread1, thread2].each(&:join)
end
end

Puzzle.instance.main

实验过程:

1
2
3
4
5
6
#!/bin/bash

while [ "$result" != "The meaning of life is: 0" ]; do
result="$(ruby test.rb)"
echo $result
done

实验结果:

1
2
3
4
5
6
7
8
9
➜  /tmp ruby:(system: ruby 2.1.5p273)
$ sh test.sh
The meaning of life is: eat
The meaning of life is: eat
I don't know the answer
The meaning of life is: eat
I don't know the answer
I don't know the answer
The meaning of life is: 0

内存可见性

Java 内存模型定义了何时一个线程对内存的修改对另一个线程可见。基本原则是,如果读线程和写线程不进行同步,就不能保证可见性。

除了 increment 之外, count 的 getter 方法也需要进行同步。否则 count 方法可能获得一个失效的值:对于前面交互的两个线程, conter 在 join 之后调用因此是线程安全的。但这种设计为其他调用 conter 的方法埋下了隐患。

所以,「竞态条件」和「内存可见性」都可能让多线程程序运行结果出错。除此之外,还有一类问题:「死锁」。

推荐阅读

深入理解Java内存模型系列
内存可见性

哲学家用餐问题(Dining philosophers problem)

简单来说:有五个哲学家坐在一张圆桌上,每个人之间放着一只餐叉,这样桌上就有五只餐叉。哲学家只会做两件事,吃饭,或者思考。吃东西的时候,他们就停止思考,思考的时候也停止吃东西。
来自Wikipedia

是的,他们每个人都会用到别人用过的餐叉,开不开心。

这个例子一般用来说明死锁问题,经典的场景之一:一名哲学家拿起了自己左手的餐叉,并为其加锁(以免同时被自己左边的哲学家拿到),而后等待自己右手的餐叉锁的释放。

然而,如果五个哲学家同时处于这个状态,就会死锁。

举个栗子:
Chopstick.java

1
2
class Chopstick {
}

Philosopher.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Random;

class Philosopher extends Thread {
private int id;
private Chopstick left, right;
private Random random;

public Philosopher(Chopstick left, Chopstick right, int id) {
this.left = left; this.right = right; this.id = id;
random = new Random();
}

public void run() {
try {
while (true) {
Thread.sleep( random.nextInt(1000) ); // Think for a while
synchronized (left) { // Grab left chopstick
System.out.println("Philosopher#" + id + " take left Chopstick");

synchronized (right) { // Grab right chopstick
System.out.println("Philosopher#" + id + " take right Chopstick");
Thread.sleep( random.nextInt(1000) ); // Eat for a while
}
System.out.println("Philosopher#" + id + " put right chopsticks");
}
System.out.println("Philosopher#" + id + " put left chopsticks");
}
} catch (InterruptedException e) {}
}
}

DiningPhilosophers.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.Random;

public class DiningPhilosophers {

public static void main(String[] args) throws InterruptedException {
Philosopher[] philosophers = new Philosopher[5];
Chopstick[] chopsticks = new Chopstick[5];

for (int i = 0; i < 5; ++i)
chopsticks[i] = new Chopstick();

for (int i = 0; i < 5; ++i) {
philosophers[i] = new Philosopher(chopsticks[i], chopsticks[(i + 1) % 5], i);
philosophers[i].start();
}

for (int i = 0; i < 5; ++i)
philosophers[i].join();
}
}

这个栗子属于可以锁得死死的那种。

因为全局的多个代码块可能会共同使用一些锁,所以我们可以通过为所有的锁添加一个偏序关系,来避免死锁状态的产生。

Philosopher.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Philosopher extends Thread {
private int id;
private Chopstick first, secound;
private Random random;

public Philosopher(Chopstick left, Chopstick right, int id) {
this.id = id;
if (left.getId() < right.getId()) {
this.first = left; this.second = right;
} else {
this.first = right; this.second = left;
}
random = new Random();
}
public void run() {
try {
while (true) {
Thread.sleep(random.nextInt(1000));
synchronized(first) {
System.out.println("Philosopher#" + id + " take Chopstick#" + first.getId());

synchronized(second) {
System.out.println( "Philosopher#" +
id +" take Chopstick#" + second.getId() );
Thread.sleep( random.nextInt(1000) );
}
System.out.println("Philosopher#" + id + " put Chopstick#" + second.getId());
}
System.out.println("Philosopher#" + id + " put Chopstick#" + first.getId());
}
} catch (InterruptedException e) {}
}
}

外星方法

这里我们构造有一个类从一个URL进行下载, 用 ProgressListeners 监听下载速度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Downloader extends Thread {
private InputStream in;
private OutputStream out;
private ArrayList<ProgressListener> listeners;

public Downloader(URL url, String outputFilename) throws IOException {
in = url.openConnect().getInputSteam();
out = new FileOutputStream(outputFilename);
listeners = new ArrayList<ProgressListener>();
}

public synchronized void addListener(ProgressListener listener) {
listeners.add(listener);
}

public synchronized void removeListener(ProgressListener listener) {
listeners.remove(listener);
}

private synchronized void updateProgress(int n) {
for (ProgressListener listener: listeners)
listener.onProgress(n);
}

public void run() {
int n = 0, total = 0;
byte[] buffer = new byte[1024];

try {
while ( (n = in.read(buffer)) != -1 ) {
out.write(buffer, 0, n);
total += n;
updateProgress(total);
}
out.flush();
} catch (IOException e) {}
}
}

未完待续

Reference

《七周七并发模型》

评论