疯狂的小鸡

多线程-Lock

字数统计: 3.6k阅读时长: 15 min
2018/10/11 Share

饥饿和公平

如果一个线程因为CPU时间全部被其他线程抢走而得不到CPU运行时间,这种状态被称之为“饥饿”。而该线程被“饥饿致死”正是因为它得不到CPU运行时间的机会。解决饥饿的方案被称之为“公平性” – 即所有线程均能公平地获得运行机会。

Java中导致饥饿的原因:

  • 高优先级线程吞噬所有的低优先级线程的CPU时间。
  • 线程被永久堵塞在一个等待进入同步块的状态。
  • 线程在等待一个本身也处于永久等待完成的对象(比如调用这个对象的wait方法)。

首先来看一段简单的同步态代码:

1
2
3
4
5
public class Synchronizer{
public synchronized void doSynchronized(){
//do a lot of work which takes a long time
}
}

如果有一个以上的线程调用doSynchronized()方法,在第一个获得访问的线程未完成前,其他线程将一直处于阻塞状态,而且在这种多线程被阻塞的场景下,接下来将是哪个线程获得访问是没有保障的。

同步锁

为了提高等待线程的公平性,我们首先使用锁方式来替代同步块。
使用lock替代sychronized一例:

1
2
3
4
5
6
7
8
public class Synchronizer{
Lock lock = new Lock();
public void doSynchronized() throws InterruptedException{
this.lock.lock();
//critical section, do a lot of work which takes a long time
this.lock.unlock();
}
}

注:如果用Lock来保护临界区,并且临界区有可能会抛出异常,那么在finally语句中调用unlock()就显得非常重要了。这样可以保证这个锁对象可以被解锁以便其它线程能继续对其加锁。

下面是Lock类做的一个简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Lock{
private boolean isLocked      = false;
private Thread lockingThread = null;

public synchronized void lock() throws InterruptedException{
while(isLocked){
wait();
}
isLocked = true;
lockingThread = Thread.currentThread();
}

public synchronized void unlock(){
if(this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException(
"Calling thread has not locked this lock");

}
isLocked = false;
lockingThread = null;
notify();
}
}

如果存在多线程并发访问lock(),这些线程将阻塞在对lock()方法的访问上。另外,如果锁已经锁上,这里指的是isLocked等于true时,这些线程将阻塞在while(isLocked)循环的wait()调用里面。要记住的是,当线程正在等待进入lock() 时,可以调用wait()释放其锁实例对应的同步锁,使得其他多个线程可以进入lock()方法,并调用wait()方法。

这个锁只是实现了基本的同步锁, 由于同步块不会对等待进入的多个线程谁能获得访问做任何保障,同样当调用notify()时,wait()也不会做保障一定能唤醒线程因此这个版本的Lock类和doSynchronized()那个版本就保障公平性而言,没有任何区别。

公平锁

当前的Lock类版本调用自己的wait()方法,如果每个线程在不同的对象上调用wait(),那么只有一个线程会在该对象上调用wait(),Lock类可以决定哪个对象能对其调用notify(),因此能做到有效的选择唤醒哪个线程。

公平锁实现的基本思想是:每一个调用lock()的线程都会进入一个队列,当解锁后,只有队列里的第一个线程被允许锁住Farlock实例,所有其它的线程都将处于等待状态,直到他们处于队列头部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class FairLock {
private boolean isLocked = false;
private Thread lockingThread = null;
private List<QueueObject> waitingThreads =
new ArrayList<QueueObject>();

public void lock() throws InterruptedException{
QueueObject queueObject = new QueueObject();
boolean isLockedForThisThread = true;
synchronized(this){
waitingThreads.add(queueObject);
}

while(isLockedForThisThread){
synchronized(this){
isLockedForThisThread =
isLocked || waitingThreads.get(0) != queueObject;
if(!isLockedForThisThread){
isLocked = true;
waitingThreads.remove(queueObject);
lockingThread = Thread.currentThread();
return;
}
}
try{
queueObject.doWait();
}catch(InterruptedException e){
synchronized(this) { waitingThreads.remove(queueObject); }
throw e;
}
}
}

public synchronized void unlock(){
if(this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException(
"Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
if(waitingThreads.size() > 0){
waitingThreads.get(0).doNotify();
}
}
}

public class QueueObject {

private boolean isNotified = false;

public synchronized void doWait() throws InterruptedException {
while(!isNotified){
this.wait();
}
this.isNotified = false;
}

public synchronized void doNotify() {
this.isNotified = true;
this.notify();
}

public boolean equals(Object o) {
return this == o;
}
}

FairLock新创建了一个QueueObject的实例,并对每个调用lock()的线程进行入队列。调用unlock()的线程将从队列头部获取QueueObject,并对其调用doNotify(),以唤醒在该对象上等待的线程。通过这种方式,在同一时间仅有一个等待线程获得唤醒,而不是所有的等待线程。

注1:doWait()和doNotify()方法在QueueObject中保存着信号,避免信号丢失。

注2:queueObject.doWait()调用放置在synchronized(this)块之外,以避免被monitor嵌套管程锁死,所以另外的线程可以解锁,只要当没有线程在lock方法的synchronized(this)块中执行即可。

注3:注意到queueObject.doWait()在try – catch块中是怎样调用的。在InterruptedException抛出的情况下,线程得以离开lock(),并需让它从队列中移除。

重入锁

Java中的synchronized同步块是可重入的。这意味着如果一个java线程进入了代码中的synchronized同步块,并因此获得了该同步块使用的同步对象对应的管程上的锁,那么这个线程可以进入由同一个管程对象所同步的另一个java代码块。下面是一个例子:

1
2
3
4
5
6
7
8
9
public class Reentrant{
public synchronized outer(){
inner();
}

public synchronized inner(){
//do something
}
}

注意outer()和inner()都被声明为synchronized,这在Java中和synchronized(this)块等效。如果一个线程调用了outer(),在outer()里调用inner()就没有什么问题,因为这两个方法(代码块)都由同一个管程对象(”this”)所同步。

应用到我们自定义的锁上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Reentrant2{
Lock lock = new Lock();

public outer(){
lock.lock();
inner();
lock.unlock();
}

public synchronized inner(){
lock.lock();
//do something
lock.unlock();
}
}

可见,之前对锁的实现不支持重入,调用outer()的线程首先会锁住Lock实例,然后继续调用inner()。inner()方法中该线程将再一次尝试锁住Lock实例,结果该动作会失败(也就是说该线程会被阻塞),因为这个Lock实例已经在outer()方法中被锁住了。

实现Lock类可重入性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Lock{
boolean isLocked = false;
Thread lockedBy = null;
int lockedCount = 0;

public synchronized void lock()
throws InterruptedException{
Thread callingThread =
Thread.currentThread();
while(isLocked && lockedBy != callingThread){
wait();
}
isLocked = true;
lockedCount++;
lockedBy = callingThread;
}

public synchronized void unlock(){
if(Thread.curentThread() ==
this.lockedBy){
lockedCount--;
if(lockedCount == 0){
isLocked = false;
notify();
}
}
}

...
}

可以看到,如果当前的锁对象没有被加锁,或者当前调用线程已经对该Lock实例加了锁,wait()就不会执行。

除此之外,我们需要记录同一个线程重复对一个锁对象加锁的次数。否则,一次unlock()调用就会解除整个锁,即使当前锁已经被加锁过多次。在unlock()调用没有达到对应lock()调用的次数之前,我们不希望锁被解除。

读写锁

假设你的程序中涉及到对一些共享资源的读和写操作,且写操作没有读操作那么频繁。在没有写操作的时候,两个线程同时读一个资源没有任何问题,所以应该允许多个线程能在同时读取共享资源。但是如果有一个线程想去写这些共享资源,就不应该再有其它线程对该资源进行读或写。也就是说:读-读能共存,读-写不能共存,写-写不能共存)。这就需要一个读/写锁来解决这个问题。

读写条件分析

如果某个线程想要读取资源,只要没有线程正在对该资源进行写操作且没有线程请求对该资源的写操作即可。

我们假设对写操作的请求比对读操作的请求更重要,就要提升写请求的优先级。因为,如果读操作发生的比较频繁,我们又没有提升写操作的优先级,那么就会产生“饥饿”现象。请求写操作的线程会一直阻塞,直到所有的读线程都从ReadWriteLock上解锁了。 因此,只有当没有线程正在锁住ReadWriteLock进行写操作,且没有线程请求该锁准备执行写操作时,才能保证读操作继续。

当其它线程没有对共享资源进行读操作或者写操作时,某个线程就有可能获得该共享资源的写锁,进而对共享资源进行写操作。

一个读写锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ReadWriteLock{
private int readers = 0;
private int writers = 0;
private int writeRequests = 0;

public synchronized void lockRead()
throws InterruptedException{
while(writers > 0 || writeRequests > 0){
wait();
}
readers++;
}

public synchronized void unlockRead(){
readers--;
notifyAll();
}

public synchronized void lockWrite()
throws InterruptedException{
writeRequests++;

while(readers > 0 || writers > 0){
wait();
}
writeRequests--;
writers++;
}

public synchronized void unlockWrite()
throws InterruptedException{
writers--;
notifyAll();
}
}

需要注意的是,在两个释放锁的方法(unlockRead,unlockWrite)中,都调用了notifyAll方法,而不是notify。要解释这个原因,我们可以想象下面一种情形:

如果有线程在等待获取读锁,同时又有线程在等待获取写锁。如果这时其中一个等待读锁的线程被notify方法唤醒,但因为此时仍有请求写锁的线程存在(writeRequests>0),所以被唤醒的线程会再次进入阻塞状态。然而,等待写锁的线程一个也没被唤醒,就像什么也没发生过一样(译者注:信号丢失现象)。如果用的是notifyAll方法,所有的线程都会被唤醒,然后判断能否获得其请求的锁。

用notifyAll还有一个好处。如果有多个读线程在等待读锁且没有线程在等待写锁时,调用unlockWrite()后,所有等待读锁的线程都能立马成功获取读锁 —— 而不是一次只允许一个。

上面实现的读/写锁(ReadWriteLock) 是不可重入的,当一个已经持有写锁的线程再次请求写锁时,就会被阻塞。原因是已经有一个写线程了——就是它自己。ReadWriteLock就会被锁定 。不会再有线程能够成功获取读锁或写锁了。

读锁重入

为了让ReadWriteLock的读锁可重入,我们要先为读锁重入建立规则:

要保证某个线程中的读锁可重入,要么满足获取读锁的条件(没有写或写请求),要么已经持有读锁(不管是否有写请求)。

要确定一个线程是否已经持有读锁,可以用一个map来存储已经持有读锁的线程以及对应线程获取读锁的次数,当需要判断某个线程能否获得读锁时,就利用map中存储的数据进行判断。修改后的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ReadWriteLock{
private Map<Thread, Integer> readingThreads =
new HashMap<Thread, Integer>();

private int writers = 0;
private int writeRequests = 0;

public synchronized void lockRead()
throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(! canGrantReadAccess(callingThread)){
wait();
}

readingThreads.put(callingThread,
(getAccessCount(callingThread) + 1));
}

public synchronized void unlockRead(){
Thread callingThread = Thread.currentThread();
int accessCount = getAccessCount(callingThread);
if(accessCount == 1) {
readingThreads.remove(callingThread);
} else {
readingThreads.put(callingThread, (accessCount -1));
}
notifyAll();
}

private boolean canGrantReadAccess(Thread callingThread){
if(writers > 0) return false;
if(isReader(callingThread) return true;
if(writeRequests > 0) return false;
return true;
}

private int getReadAccessCount(Thread callingThread){
Integer accessCount = readingThreads.get(callingThread);
if(accessCount == null) return 0;
return accessCount.intValue();
}

private boolean isReader(Thread callingThread){
return readingThreads.get(callingThread) != null;
}
}

写锁重入

仅当一个线程已经持有写锁,才允许写锁重入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ReadWriteLock{
private Map<Thread, Integer> readingThreads =
new HashMap<Thread, Integer>();

private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;

public synchronized void lockWrite()
throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(!canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}

public synchronized void unlockWrite()
throws InterruptedException{
writeAccesses--;
if(writeAccesses == 0){
writingThread = null;
}
notifyAll();
}

private boolean canGrantWriteAccess(Thread callingThread){
if(hasReaders()) return false;
if(writingThread == null) return true;
if(!isWriter(callingThread)) return false;
return true;
}

private boolean hasReaders(){
return readingThreads.size() > 0;
}

private boolean isWriter(Thread callingThread){
return writingThread == callingThread;
}
}

读锁升级到写锁

有时,我们希望一个拥有读锁的线程,也能获得写锁。想要允许这样的操作,要求这个线程是唯一一个拥有读锁的线程。
代码修改如下:

1
2
3
4
5
6
7
8
9
10
11
private boolean canGrantWriteAccess(Thread callingThread){
if(isOnlyReader(callingThread)) return true;
if(hasReaders()) return false;
if(writingThread == null) return true;
if(!isWriter(callingThread)) return false;
return true;
}

private boolean isOnlyReader(Thread thread){
return readers == 1 && readingThreads.get(callingThread) != null;
}

写锁降级到读锁

有时拥有写锁的线程也希望得到读锁。如果一个线程拥有了写锁,那么自然其它线程是不可能拥有读锁或写锁了。所以对于一个拥有写锁的线程,再获得读锁,是不会有什么危险的。

重入读写锁完整实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
public class ReadWriteLock{
private Map<Thread, Integer> readingThreads =
new HashMap<Thread, Integer>();

private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;

public synchronized void lockRead()
throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(! canGrantReadAccess(callingThread)){
wait();
}

readingThreads.put(callingThread,
(getReadAccessCount(callingThread) + 1));
}

private boolean canGrantReadAccess(Thread callingThread){
if(isWriter(callingThread)) return true;
if(hasWriter()) return false;
if(isReader(callingThread)) return true;
if(hasWriteRequests()) return false;
return true;
}


public synchronized void unlockRead(){
Thread callingThread = Thread.currentThread();
if(!isReader(callingThread)){
throw new IllegalMonitorStateException(
"Calling Thread does not" +
" hold a read lock on this ReadWriteLock");
}
int accessCount = getReadAccessCount(callingThread);
if(accessCount == 1){
readingThreads.remove(callingThread);
} else {
readingThreads.put(callingThread, (accessCount -1));
}
notifyAll();
}

public synchronized void lockWrite()
throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(!canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}

public synchronized void unlockWrite()
throws InterruptedException{
if(!isWriter(Thread.currentThread()){
throw new IllegalMonitorStateException(
"Calling Thread does not" +
" hold the write lock on this ReadWriteLock");
}
writeAccesses--;
if(writeAccesses == 0){
writingThread = null;
}
notifyAll();
}

private boolean canGrantWriteAccess(Thread callingThread){
if(isOnlyReader(callingThread)) return true;
if(hasReaders()) return false;
if(writingThread == null) return true;
if(!isWriter(callingThread)) return false;
return true;
}


private int getReadAccessCount(Thread callingThread){
Integer accessCount = readingThreads.get(callingThread);
if(accessCount == null) return 0;
return accessCount.intValue();
}


private boolean hasReaders(){
return readingThreads.size() > 0;
}

private boolean isReader(Thread callingThread){
return readingThreads.get(callingThread) != null;
}

private boolean isOnlyReader(Thread callingThread){
return readingThreads.size() == 1 &&
readingThreads.get(callingThread) != null;
}

private boolean hasWriter(){
return writingThread != null;
}

private boolean isWriter(Thread callingThread){
return writingThread == callingThread;
}

private boolean hasWriteRequests(){
return this.writeRequests > 0;
}
}

参考文档:
Jenkov.com/java-concurrency

更多Java基础系列文章,参见Java基础大纲

CATALOG
  1. 1. 饥饿和公平
  2. 2. 同步锁
  3. 3. 公平锁
  4. 4. 重入锁
  5. 5. 读写锁
    1. 5.1. 读写条件分析
    2. 5.2. 一个读写锁实现
    3. 5.3. 读锁重入
    4. 5.4. 写锁重入
    5. 5.5. 读锁升级到写锁
    6. 5.6. 写锁降级到读锁
  6. 6. 重入读写锁完整实现: