0%

java-并发(二)

五、线程同步

java允许多线程并发控制,当多个线程同时操作一个可共享的资源变量时(如数据的增删改查),将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,从而保证了该变量的唯一性和准确性。

1、使用重入锁(Lock)实现线程同步

​ 在JavaSE5.0中新增了一个java.util.concurrent包来支持同步。ReentrantLock类是可重入、互斥、实现了Lock接口的锁,它与使用synchronized方法和快具有相同的基本行为和语义,并且扩展了其能力。ReenreantLock类的常用方法有:

1
2
3
ReentrantLock() : 创建一个ReentrantLock实例         
lock() : 获得锁
unlock() : 释放锁

注:ReentrantLock()还有一个可以创建公平锁的构造方法,但由于能大幅度降低程序运行效率,不推荐使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//只给出要修改的代码,其余代码与上同
class Bank {

private int account = 100;
//需要声明这个锁
private Lock lock = new ReentrantLock();
public int getAccount() {
return account;
}
//这里不再需要synchronized
public void save(int money) {
lock.lock();//
try{
account += money;
}finally{
lock.unlock();//解锁线程
}

}

lock.lock();确保只有一个线程进入临界区,一旦一个线程进入之后,会获得锁对象,其他线程无法通过lock语句。当其他线程调用lock时,它们会被阻塞,知道第一个线程释放锁对象。

lock.unlock();解锁操作,一定要放到finally里,因为如果try语句里出了问题,锁必须被释放,否则其他线程将永远被阻塞

因为系统会随机为线程分配资源,所以在线程获得锁对象之后,可能被系统剥夺运行权,这时候其他线程来访问,但是发现有锁,进不去,只能等拿到锁对象的线程把里面的代码执行完毕后,释放锁,第二个线程才能运行。

2、synchronzied关键字

前面我们讲了ReentrantLock锁对象的使用,但是在系统里面我们不一定要使用ReentrantLock锁,Java中还提供了一个内部的隐式锁,关键字是synchronized.

举个例子:

1
2
3
public synchronized void Method() {
//do some work...
}

synchronized关键字说明

  总的说来,synchronized关键字可以作为函数的修饰符,也可作为函数内的语句,也就是平时说的同步方法和同步语句块。如果再细的分类,synchronized可作用于instance变量(成员变量)、object reference(对象实例引用)、static函数和class literals(类名称字面常量)身上。

在进一步阐述之前,我们需要明确几点:

  • 无论synchronized关键字加在方法上还是对象上,它取得的锁都是对象,而不是把一段代码或函数当作锁――而且同步方法很可能还会被其他线程的对象访问。

  • 每个对象只有一个锁(lock)与之相关联。JVM会给类的每个实例化的对象赋予一个单独的锁。

  • 实现同步是要很大的系统开销作为代价的,甚至可能造成死锁,所以尽量避免无谓的同步控制。

  注意:在同步块和同步方法中,是给类或类的对象进行加锁,而不是给方法加锁。所谓的需要获得对象的锁才能执行方法,也是针对线程而言的。

synchronized 方法:

通过在方法声明中加入 synchronized关键字来声明 synchronized 方法。如下:

1
2
3
4
5
6
class MyClass{
private String name;
public synchronized void setName(String name){
this.name = name;
}
}

  synchronized方法控制对类成员变量的访问:每个类实例对应一把锁,每个 synchronized 方法都必须获得调用该方法的类实例的锁方能执行,否则所属线程阻塞,方法一旦执行,就独占该锁,直到从该方法返回时才将锁释放,此后被阻塞的线程方能获得该锁,重新进入可执行状态。这种机制确保了同一时刻对于每一个类实例,其所有声明为 synchronized 的成员函数中至多只有一个处于可执行状态(因为至多只有一个能够获得该类实例对应的锁),从而有效避免了类成员变量的访问冲突(只要所有可能访问类成员变量的方法均被声明为 synchronized)。

  在 Java 中,不光是类实例,每一个类也对应一把锁,这样我们也可将类的静态成员函数声明为 synchronized ,以控制其对类的静态成员变量的访问。

  synchronized 方法的缺陷:若将一个大的方法声明为synchronized 将会大大影响效率,典型地,若将线程类的方法 run()声明为 synchronized ,由于在线程的整个生命期内它一直在运行,因此将导致它对本类任何 synchronized 方法的调用都永远不会成功。

synchronized 块:

  synchronized 块是这样一个代码块,其中的代码必须获得对象 syncObject (如前所述,可以是类实例或类)的锁方能执行,具体机制同前所述。由于可以针对任意代码块,且可任意指定上锁的对象,故灵活性较高。

  通过 synchronized关键字来声明synchronized 块。语法如下:

1
2
3
synchronized(syncObject){  
//允许访问控制的代码  
} 

3. 同步一个类

1
2
3
4
5
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}

作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。

1
2
3
4
5
6
7
8
9
10
public class SynchronizedExample {

public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
1
2
3
4
5
6
7
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
}
1
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

synchronized (this)的理解说明

  • 当一个线程正在执行object的一个synchronized(this)同步代码块时,该线程就获得了这个object的对象锁。
  • 当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行。另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块。  
  • 当一个线程访问object的一个synchronized(this)同步代码块时,其他线程对object中所有其它synchronized(this)同步代码块的访问将被阻塞。  
  • 但是,当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该object中的除synchronized(this)同步代码块以外的部分。

总结

同步块、同步方法的锁定说明:

  • 对于同步的方法或者代码块来说,必须获得对象锁才能够进入同步方法或者代码块进行操作;
  • 如果采用普通方法级别的同步,则对象锁即为该方法所在的对象,如果是静态方法,对象锁即指该方法所在的类的锁(类的锁,对所有实例化对象都是唯一的)。
  • 对于代码块,对象锁即指synchronized(obj)中的obj;
  • 静态方法则一定会同步,非静态方法需在单例模式才生效,推荐用静态方法

实现同步的一些技巧

  搞清楚synchronized锁定的是哪个对象,就能帮助我们设计更安全的多线程程序。 还有一些技巧可以让我们对共享资源的同步访问更加安全:

  • 定义private的instance变量(成员变量)+对应的get()方法,而不要定义public/protected的instance变量。如果将变量定义为public,对象在外界可以绕过同步方法的控制而直接取得它,并改动它。这也是JavaBean的标准实现方式之一。
  • 如果instance变量是一个对象(如数组或ArrayList),那上述方法仍然不安全,因为当外界对象通过get()方法拿到这个instance对象的引用后,又将其指向另一个对象,那么这个private变量也就变了,岂不是很危险。这个时候就需要将get()方法也加上synchronized同步,并且,只返回这个private对象的clone(),这样,调用端得到的就是对象副本的引用了。
  • 还有,比较常用的就有:Collections.synchronizedMap(new HashMap()),当然这个MAP就是生命在类中的全局变量,就是一个线程安全的HashMap,web的application是全web容器公用的,所以要使用线程安全来保证数据的正确。

ava中多线程锁释放的条件:

  • 执行完同步代码块,就会释放锁。(synchronized)
  • 在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放。(exception)
  • 在执行同步代码块的过程中,执行了锁所属对象的wait()方法,这个线程会释放锁,进入对象的等待池。(wait)

比较

1. 锁的实现

synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。

2. 性能

新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。

3. 等待可中断

当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。

ReentrantLock 可中断,而 synchronized 不行。

4. 公平锁

公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。

synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。

5. 锁绑定多个条件

一个 ReentrantLock 可以同时绑定多个 Condition 对象。

3、Conditional条件对象

通常,线程拿到锁对象之后,却发现需要满足某一条件才能继续向下执行。

拿银行程序来举例子,我们需要转账方账户有足够的资金才能转出到目标账户,这时候需要用到ReentrantLock对象,因为如果我们已经完成转账方账户有足够的资金的判断之后,线程被其他线程中断,等其他线程执行完之后,转账方的钱又没有了足够的资金,这时候因为系统已经完成了判断,所以会继续向下执行,然后银行系统就会出现问题。

举例:

1
2
3
4
public void Transfer(int from, int to, double amount) {
if (Accounts[from] > amount)//系统在结束判断之后被剥夺运行权,然后账户通过网银转出所有钱,银行凉凉
DoTransfer(from, to, amount);
}

这时候我们就需要使用ReentrantLock对象了,我们修改一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void Transfer(int from, int to, double amount) {
ReentrantLock locker = new ReentrantLock();
locker.lock();
try {
while (Accounts[from] < amount) {
//等待有足够的钱
}
DoTransfer(from, to, amount);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
locker.unlock();
}
}

但是这样又有了问题,当前线程获取了锁对象之后,开始执行代码,发现钱不够,进入等待状态,然后其他线程又因为锁的原因无法给该账户转账,就会一直进入等待状态。

这个问题如何解决呢?

条件对象登场!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void Transfer(int from, int to, double amount) {
ReentrantLock locker = new ReentrantLock();
Condition sufficientFunds = locker.newCondition();//条件对象,
lock.lock();
try {
while (Accounts[from] < amount) {
sufficientFunds.await();
//等待有足够的钱
}
DoTransfer(from, to, amount);
sufficientFunds.signalAll();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
locker.unlock();
}
}

条件对象的关键字是:Condition一个锁对象可以有一个或多个相关的条件对象。可以通过锁对象.newCondition方法获得一个条件对象.

在进入锁之前,我们创建一个条件,然后如果金额不足,在这里调用条件对象的await方法,通知系统当前线程进入挂起状态,让其他线程执行。这样你这次调用会被锁定,然后系统可以再次调用该方法给其他账户转账,当每一次转账完成后,执行转账操作的线程在底部调用signalAll通知所有线程可以继续运行了,因为我们有可能是转足够的钱给当前账户,这时候有可能该线程会继续执行(不一定是你,是通知所有线程,如果通知的线程还是不符合条件,会继续调用await方法,并完成转账操作,然后通知其他挂起的线程。

你说为啥不直接通知当前线程?不行,可以调用signal方法只通知一个线程,但是如果这个线程操作的账户还是没钱(不是转账给这个账户的情况),那这个线程又进入等待了,这时候已经没有线程能通知其他线程了,程序死锁,所以还是用signal比较保险。

以上是使用ReentrantLock+Condition对象,那你说我要是使用synchronized隐式锁怎么办?

也可以,而且不需要

1
2
3
4
5
6
7
8
public void Transfer(int from, int to, double amount) {
while (Accounts[from] < amount) {
wait();//这个wait方法是定义在Object类里面的,可以直接用,和条件对象的await一样,挂起线程
//等待有足够的钱
}
DoTransfer(from, to, amount);
notifyAll();//通知其他挂起的线程
}

Object类里面定义了wait、notifyAll、notify方法,对应await、signalAll和signal方法,用来操作隐式锁,synchronized只能有一个条件,而ReentrantLock显式声明的锁可以用绑定多个Condition条件.

4、同步代码块

即有synchronized关键字修饰的语句块。

被该关键字修饰的语句块会自动被加上内置锁,从而实现同步。

1
2
3
4
5
public void method3(SomeObject obj){
synchronized(obj) { //锁定的是对象obj的对象锁
//…..
}
}

这时,锁就是obj这个对象,谁拿到这个锁谁就可以运行它所控制的那段代码。当有一个明确的对象作为锁时,就可以按以上方式来写程序;当没有明确的对象作为锁时,但还想让一段代码同步时,可以创建一个特殊的instance变量(必须是一个对象)来充当锁,此时代码如下:

1
2
3
4
5
6
7
8
class Foo implements Runnable{
private byte[] lock = new byte[0]; // 特殊的instance变量;也可以用String常量作为锁
Public void methodA(){
synchronized(lock) { //… }
}
//…..
}
//注:零长度的byte数组对象创建起来将比任何对象都经济――查看编译后的字节码:生成零长度的byte[]对象只需3条操作码,而Object lock = new Object()则需要7行操作码。
1
2
3
4
5
6
7
8
Object locker = new Object();
synchronized (locker) {
*//do some work*
}
*//也可以直接锁当前类的对象*
sychronized(this){
*//do some work*
}

以上代码会获得Object类型locker对象的锁,这种锁是一个特殊的锁,在上面的代码中,创建这个Object类对象只是单纯用来使用其持有的锁.

这种机制叫做同步块,应用场景也很广:有的时候,我们并不是整个一个方法都需要同步,只是方法里的部分代码块需要同步,这种情况下,我们如果将这个方法声明为synchronized,尤其是方法很大的时候,会造成很大的资源浪费。所以在这种情况下我们可以使用synchronized关键字来声明同步块:

1
2
3
4
5
6
public void Method() {
//do some work without synchronized
synchronized (this) {
//do some synchronized operation
}
}

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Bank {  

private int count =0;//账户余额

//存钱
public void addMoney(int money){

synchronized (this) {
count +=money;
}
System.out.println(System.currentTimeMillis()+"存进:"+money);
}

//取钱
public void subMoney(int money){

synchronized (this) {
if(count-money < 0){
System.out.println("余额不足");
return;
}
count -=money;
}
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}

//查询
public void lookMoney(){
System.out.println("账户余额:"+count);
}
}

注:同步是一种高开销的操作,因此应该尽量减少同步的内容。通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。

5、监视器的概念

锁和条件是同步中一个很重要的工具,但是它们并不是面向对象的。多年来,Java的研究人员努力寻找一种方法,可以在不需要考虑如何加锁的情况下,就能保证多线程的安全性。最成功的的一个解决方案叫做monitor监视器,这个对象内置于每一个Object变量中,相当于一个许可证。拿到许可证就可以进行操作,没有拿到则需要阻塞等待。

监视器具有以下特性:

1.监视器是只包含私有域的类

2.每个监视器对象都有一个相关的锁

3.使用监视器对象的锁对所有的方法进行加锁(举个例子:如果调用obj.Method方法,obj对象的锁会在方法调用的时候自动获得,当方法结束或返回之后会自动释放该锁因为所有的域都是私有的,这样可以确保一个线程在操作类对象的时候,没有其他线程可以访问里面的域

4.该锁对象可以有任意多个相关条件

其实我们使用的synchronized关键字就是使用了monitor来实现加锁解锁,所以又被称为内部锁因为Object类实现了监视器,所以对象又被内置于任何一个对象之中。这就是我们为什么可以使用synchronized(locker)的方式锁定一个代码块了,其实只是用到了locker对象中内置的monitor而已。每一个对象的monitor类又是唯一的,所以就是唯一的许可证,拿到许可证的线程才可以执行,执行完后释放对象的monitor才可以被其他线程获取。

举个例子:

1
2
3
synchronized (this) {
//do some synchronized operation
}

它在字节码文件中会被编译为:

1
2
3
monitorenter;//get monitor,enter the synchronized block
//do some synchronized operation
monitorexit;//leavel the synchronized block,release the monitor

6、死锁

产生死锁的必要条件:

互斥条件:进程要求对所分配的资源进行排它性控制,即在一段时间内某资源仅为一进程所占用。
请求和保持条件:当进程因请求资源而阻塞时,对已获得的资源保持不放。
不剥夺条件:进程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。
环路等待条件:在发生死锁时,必然存在一个进程–资源的环形链。

为什么倾向于使用signalAll和notifyAll方式,如果假设使用signal和notify,随机选择的线程发现自己还是不能运行,那么它再次被阻塞。这样就又会造成死锁现象。

7、锁测试和超时

线程在调用lock方法获得另一个线程持有的锁的时候,很可能发生阻塞。应该更加谨慎的申请锁,tryLock方法试图申请一个锁,如果申请成功,返回true,否则,立刻返回false,线程就会离开去做别的事,而不是被阻塞等待锁对象。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
ReentrantLock locker = new ReentrantLock();
if (locker.tryLock()) {
try {
//do some work
} catch (Exception ex) {
ex.printStackTrace();
} finally {
locker.unlock();
}
} else {
//do other work
}

也可以给其指定超时参数,单位有SECONDSMILLISECONDSMICROSEONDSMANOSECONDS.

1
2
3
4
5
6
7
8
9
10
11
12
ReentrantLock locker = new ReentrantLock();
if (locker.tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
//do some work
} catch (Exception ex) {
ex.printStackTrace();
} finally {
locker.unlock();
}
} else {
//do other work
}

lock方法不能被中断,如果一个线程在调用了lock方法后等待锁的时候被中断,中断线程在获得锁之前一直处于阻塞状态。

如果带有超时参数的tryLock方法,那么如果等待期间线程被中断,会抛出InterruptedException异常,这是一个很好的特性,允许程序打破死锁。

8、读写锁

eentrantLock类属于java.util.concurrent.locks包,这个包底下还有一个ReentrantReaderWriterLock类,如果使用多线程对数据读的操作很多,但是写的操作很少的话,可以使用这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock():

public void Read() {
Lock readLocker = rwl.readLock();//创建读取锁对象
readLocker.lock();//使用读取锁对象加锁
try {
//do some work
} catch (Exception ex) {
ex.printStackTrace();
} finally {
readLocker.unlock();
}
}

public void Write() {
Lock writeLocker = rwl.writeLock();//创建写入锁对象
writeLocker.lock();//使用写入锁对象加锁
try {
//do some work
} catch (Exception ex) {
ex.printStackTrace();
} finally {
writeLocker.unlock();
}
}

9、使用特殊域变量(volatile)实现线程同步

• volatile关键字为域变量的访问提供了一种免锁机制;

• 使用volatile修饰域相当于告诉虚拟机该域可能会被其他线程更新;

• 因此每次使用该域就要重新计算,而不是使用寄存器中的值;

• volatile不会提供任何原子操作,它也不能用来修饰final类型的变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class SynchronizedThread {

class Bank {

private volatile int account = 100;

public int getAccount() {
return account;
}

/**
* 用同步方法实现
*
* @param money
*/
public synchronized void save(int money) {
account += money;
}

/**
* 用同步代码块实现
*
* @param money
*/
public void save1(int money) {
synchronized (this) {
account += money;
}
}
}

class NewThread implements Runnable {
private Bank bank;

public NewThread(Bank bank) {
this.bank = bank;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
// bank.save1(10);
bank.save(10);
System.out.println(i + "账户余额为:" +bank.getAccount());
}
}

}

/**
* 建立线程,调用内部类
*/
public void useThread() {
Bank bank = new Bank();
NewThread new_thread = new NewThread(bank);
System.out.println("线程1");
Thread thread1 = new Thread(new_thread);
thread1.start();
System.out.println("线程2");
Thread thread2 = new Thread(new_thread);
thread2.start();
}

public static void main(String[] args) {
SynchronizedThread st = new SynchronizedThread();
st.useThread();
}

注:多线程中的非同步问题主要出现在对域的读写上,如果让域自身避免这个问题,则就不需要修改操作该域的方法。用final域,有锁保护的域和volatile域可以避免非同步的问题。

10、final变量

上一节已经了解到,除非使用锁或volatile修饰符,否则无法从多个线程安全地读取一个域。

还有一种情况可以安全地访问一个共享域,即这个域声明为final时。考虑以下声明:

finalMap<String,Double〉accounts=newHashKap<>0;

其他线程会在构造函数完成构造之后才看到这个accounts变量。

如果不使用final,就不能保证其他线程看到的是accounts更新后的值,它们可能都只是看到null,而不是新构造的HashMap。

当然,对这个映射表的操作并不是线程安全的。如果多个线程在读写这个映射表,仍然需要进行同步

11、线程的局部变量

线程间有时要避免共享变量,使用ThreadLocal辅助类为各个线程提供各自的实例。

例如,SimpleDateFormat类不是线程安全的。

1
2
3
public static final SimpleDateFormat dateFormat = 
new SimpleDateFormat("yyyy-MM-dd");

如果两个线程都执行以下操作:

1
String dateStamp = dateFormat.format(new Date());

结果可能很混乱,因为dateFormat使用的内部数据结构可能会被并发的访问所破坏。当然可以使用同步,但开销很大;或者也可以在需要时构造一个局部SimpleDateFormat对象,不过这也太浪费了。

要为每个线程构造一个实例,可以使用以下代码:

1
2
public static final ThreadLocal<SimpleDateFormat> dateFormat = 
ThreadLocal.withInitial(()->new SimpleDateFormat("yyyy-MM-dd"));

要访问具体的格式化方法,可以调用:

1
String dateStamp = dateFormat.get().format(new Date());

在一个给定线程中首次调用get时,会调用initialValue方法。在此之后,get方法会返回属于当前线程的那个实例。

在多个线程中生成随机数也存在类似的问题。java.util.Random类是线程安全的。但是如果多个线程需要等待一个共享的随机数生成器,这会很低效。

可以使用ThreadLocal辅助类为各个线程提供一个单独的生成器,不过Java SE 7还另外提供了一个便利类。只需要做以下调用:

1
int random = ThreadLocalRandom.current().nextInt(upperBound);

ThreadLocalRandom.current()调用会返回特定于当前线程的Random类实例。

六、线程通信

1、借助于Object类的wait()、notify()和notifyAll()实现通信

​ 线程执行wait()后,就放弃了运行资格,处于冻结状态;

​ 线程运行时,内存中会建立一个线程池,冻结状态的线程都存在于线程池中,notify()执行时唤醒的也是线程池中的线程,线程池中有多个线程时唤醒第一个被冻结的线程。
​ notifyall(), 唤醒线程池中所有线程。
注: (1) wait(), notify(),notifyall()都用在同步里面,因为这3个函数是对持有锁的线程进行操作,而只有同步才有锁,所以要使用在同步中;
​ (2) wait(),notify(),notifyall(), 在使用时必须标识它们所操作的线程持有的锁,因为等待和唤醒必须是同一锁下的线程;而锁可以是任意对象,所以这3个方法都是Object类中的方法。

单个消费者生产者例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class Resource{  //生产者和消费者都要操作的资源  
private String name;
private int count=1;
private boolean flag=false;
public synchronized void set(String name){
if(flag)
try{wait();}catch(Exception e){}
this.name=name+"---"+count++;
System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name);
flag=true;
this.notify();
}
public synchronized void out(){
if(!flag)
try{wait();}catch(Exception e){}
System.out.println(Thread.currentThread().getName()+"...消费者..."+this.name);
flag=false;
this.notify();
}
}
class Producer implements Runnable{
private Resource res;
Producer(Resource res){
this.res=res;
}
public void run(){
while(true){
res.set("商品");
}
}
}
class Consumer implements Runnable{
private Resource res;
Consumer(Resource res){
this.res=res;
}
public void run(){
while(true){
res.out();
}
}
}
public class ProducerConsumerDemo{
public static void main(String[] args){
Resource r=new Resource();
Producer pro=new Producer(r);
Consumer con=new Consumer(r);
Thread t1=new Thread(pro);
Thread t2=new Thread(con);
t1.start();
t2.start();
}
}//运行结果正常,生产者生产一个商品,紧接着消费者消费一个商品。

七、同步器J.U.C-AQS

多线程并发的执行,之间通过某种 共享 状态来同步,只有当状态满足 xxxx 条件,才能触发线程执行 xxxx 。这个共同的语义可以称之为同步器。

可以认为以上所有的锁机制都可以基于同步器定制来实现的。

而juc(java.util.concurrent)里的思想是 将这些场景抽象出来的语义通过统一的同步框架来支持。

juc 里所有的这些锁机制都是基于 AQS ( AbstractQueuedSynchronizer )框架上构建的。下面简单介绍下 AQS( AbstractQueuedSynchronizer )。 可以参考Doug Lea的论文The java.util.concurrent Synchronizer Framework(http://gee.cs.oswego.edu/dl/papers/aqs.pdf)

Java中多线程开发时,离不开线程的分工协作,常用的多线程的同步器有如下几种:

1、CountDownLatch(倒计时门闩)

应用场景:等待一组线程任务完成后在继续执行当前线程。

用法:定义一个CountDownLatch变量latch,在当前线程中调用latch.await()方法,在要等待的一组线程中执行完后调用latch.countDown()方法,这样当该线程都调用过latch.countDown()方法后就开始执行当前线程latch.await()后的方法。


倒计时门闩会导致一条或多条线程在“门口”一直等待,直到另一条线程打开这扇门,线程才得以继续运行。他是由一个计数变量和两个操作组成的,这两个操作分别是“导致一条线程等待直到。

计数变为0”以及“递减计数变量”。

例如:

以下代码是用倒计时门闩实现的一个是所有线程同时执行同时结束之后,才能继续执行主线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
final static int NTHREADS = 3;
public static void main(String[] args) {
final CountDownLatch startSignal = new CountDownLatch(1);
final CountDownLatch doneSignal = new CountDownLatch(NTHREADS);
Runnable r = new Runnable() {

@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "进入等待");
startSignal.await();//3个线程进入等待,直到startSignal.countDown()被调用
System.out.println(Thread.currentThread().getName() + "开始执行任务");
Thread.sleep(200);
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
for (int i = 0; i < NTHREADS; i++) {
es.execute(r);
}
try {
Thread.sleep(1000);
startSignal.countDown();
//3个线程全部开始执行任务,主线程进入等待
System.out.println(Thread.currentThread().getName() + "进入等待");
//直到3个线程全部结束任务,doneSignal.countDown()被调用,主线程开始执行
doneSignal.await();
System.out.println(Thread.currentThread().getName() + "开始执行");
es.shutdownNow();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

2、CyclicBarrier(同步屏障)

应用场景:等待一组线程到达某个点后一起执行,该组线程达到指定点后可以再次循环执行。也可用于一组线程达达某个点后再执行某个方法。

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

用法:定义一个CyclicBarrier变量barrier,线程达到某个约定点时调用barrier.await()方法,当该组所有线程都调用了barrier.await()方法后改组线程一起向下执行。

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

若有多条线程,他们到达屏障时将会被阻塞,只有当所有线程都到达屏障时才能打开屏障,


所有线程同时执行,若有这样的需求可以使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建同步屏障对象,并制定需要等待的线程个数 和 打开屏障时需要执行的任务
CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){
public void run(){
//当所有线程准备完毕后触发此任务
}
});
// 启动三条线程
for( int i=0; i<3; i++ ){
new Thread( new Runnable(){
public void run(){
// 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障)
barrier.await();
// 任务
任务代码……
}
} ).start();
}

倒计时门闩 与 同步屏障 的区别 倒计时门闩只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行;

而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行

3、Semaphore(信号量)

应用场景:对于一组有限制都资源访问。比如餐厅有5个位置但同时有7个人要吃饭,则要控制7个人对餐位的并发实用。

用法:定义Semaphore变量semaphore包含受限的资源个数,每个人要来用餐时先调用semaphore.acquire()方法获取一个餐位(若没有餐位,则阻塞等待),用完餐后调用semaphore.release()释放餐位给其它人用。

信号量维护了一组许可证,以约束访问被限制资源的线程数。当没有可用

的许可证时,线程的获取尝试会一直阻塞,直到其它的线程释放一个许可证。

【信号量
一个信号量管理多个许可证。为了通过信号量,线程通过调用acquire()请求许可。其实没有实际的许可对象,信号连也仅仅是维护一个计数器。
许可的数目是固定的,由此限制了线程通过的数量当一个线程执行完之后,应该调用release()释放许可证,让其他线程有机会执行。事实上,
任意一个线程都有可以释放任意个数的许可证,这可能会增加许可证的个数。所以我建议,如果不是非常明确的知道为什么要释放多个许可证,就一定
是让获得许可证的线程是放一个许可证。

【常用方法
1.构造函数:
  Semaphore(int permits):创建具有给定许可数和非公平设置的Semaphore

​   Semaphore(int permits,boolean fair):此类的构造方法可选地接受一个公平 参数。当设置为 false 时(默认也是false),此类不对线程获取许可的顺序做任何保证。

​ 特别地,闯入是允许的,也就是说可以在已经等待的线程前为调用 acquire() 的线程分配一个许可,从逻辑上说,就是新线程将自己置于等待线程队列的头部。
​ 当公平设置为 true 时,信号量保证对于任何调用获取方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、获得许可。
​ 注意,FIFO 排序必然应用到这些方法内的指定内部执行点。所以,可能某个线程先于另一个线程调用了 acquire,但是却在该线程之后到达排序点,并且从方法返回时也类似。

2.Semaphore还提供一些其他方法:
int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。

【补充
当许可证的个数为1时,可以充当互斥锁使用。

示例代码:

只能同时有5个线程访问的信号量

1
`// 创建信号量对象,并给予3个资源Semaphore semaphore = new Semaphore(3);// 开启10条线程for ( int i=0; i<10; i++ ) {    new Thread( new Runnbale(){        public void run(){            // 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源            semaphore.acquire();            // 任务代码            ……            // 释放资源            semaphore.release();        }    } ).start();}`

4、Exchanger交换器

  • 交换值是同步的;
  • 成对的线程之间交换数据;
  • 可看成是双向的同步队列;
  • 可应用于演算法、流水线设计;

Exchanger类中的主要方法就是:exchange(V x)方法,成对的两个线程之间,都调用了该方法,就能在两个线程彼此都准备好数据后,成功的交换数据给对方,然后各自返回。如果想支持成对的两个线程之间,一个没耐性,等的时间过长,或者被打断了就不交换数据了,可以使用exchange(V x, long timeout, TimeUnit unit)方法。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;

//球线程
class BallTask implements Runnable
{
private Exchanger<String> e;
public BallTask(Exchanger<String> e){
this.e = e;
}

public void run(){
try{
long sleepTime = (long)(Math.random() * 2500) ;
String tName = Thread.currentThread().getName();
System.out.println(tName+"正在买球,用时["+sleepTime+"]才买到球,赶紧去换鱼...");
Thread.sleep(sleepTime);
//这里的str即为交换的东西
String str = e.exchange(tName+":的球");
System.out.println("【"+tName+":的球】换到了-->【"+str+"】");
}

catch(Exception e){
}
finally{
}
}
}

//鱼线程
class FishTask implements Runnable
{
private Exchanger<String> e;
public FishTask(Exchanger<String> e){
this.e = e;
}

public void run(){
try{
long sleepTime = (long)(Math.random() * 2500) ;
String tName = Thread.currentThread().getName();
System.out.println(tName+"正在钓鱼,用时["+sleepTime+"]才钓到鱼,赶紧去换球...");
Thread.sleep(sleepTime);
String str = e.exchange(tName+":的鱼");
System.out.println("【"+tName+":的鱼】换到了-->【"+str+"】");
}
catch(Exception e){
}
finally{
}
}
}

public class ExchangerTest
{
public static void main(String[] args)
{
Exchanger<String> e = new Exchanger<String>();

BallTask bTask = new BallTask(e); //任务:球线程
FishTask fTask = new FishTask(e); //任务:鱼线程

Thread bThread = new Thread(bTask,"Ball");
Thread fThread = new Thread(fTask,"Fish");

bThread.start();
fThread.start();

System.out.println("我是主线程,准备看看你们交易情况...\n\r");

try{
//Thread类中的join方法的主要作用就是同步,它可以使得线程之间的并行执行变为串行执行。
bThread.join();
fThread.join();
}catch(Exception ep){}

System.out.println("\n\r我是主线程,已看到你们的交易结果...");
}
}

Exchanger和Semaphore区别
Exchanger交换器和Semaphore信号量在关于生产者消费者《产1消1模式》运用的区别:

1·Exchanger交换器:成对的两个线程,各个线程有各个线程的自己数据V,A线程拥有V1,B线程拥有V2,V1<…>V2互换。
2·Semaphore信号量:成对的两个线程,只需一个数据池即可,生产者生产数据注入数据池,消费者从数据池取走数据消费。
3·Exchanger交换器:两个线程之间的通讯仅仅一个Exchanger实例即可。
4·Semaphore信号量:两个线程之间的通讯需要两个信号量,生产信号指示灯,消费信号指示灯。
5·Exchanger和Semaphore的共同点:两个线程之间需要同步通讯。生产的过快,没用,必须等消费完了,才能进行下一生产1;同理,消费的过快,也没用,必须等生产完了,才能进行下一消费1。

5、同步队列与等待队列

书上:

【同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueue的put方法时,它会阻塞直到另一个线程调用take方法为止,反之亦然。与Exchanger的情况不同,数据仅仅沿一个方向传递,从生产者到消费者。即使SynchronousQueue类实现了BlockingQueue接口,概念上讲,它依然不是一个队列。它没有包含任何元素,它的size方法总是返回0。】

简单的理解是同步队列存放着竞争同步资源的线程的引用(不是存放线程),而等待队列存放着待唤醒的线程的引用。

同步队列中存放着一个个节点,当线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,首节点表示的获取同步状态成功的线程节点。

Condition维护着一个等待队列与同步队列相似。主要针对await和signal的操作。

例子:

这里实现了三个多线程的run方法。A线程输出A然后通知B,然后B通知C。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static class ThreadA extends Thread{
@Override
public void run(){
try{
lock.lock();
System.out.println("A进程输出" + " : " + ++index);
conditionB.signal();
conditionA.await();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

public static class ThreadB extends Thread{
@Override
public void run(){
try{
lock.lock();
System.out.println("B进程输出" + " : " + ++index);
conditionC.signal();
conditionB.await();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

public static class ThreadC extends Thread{
@Override
public void run(){
try{
lock.lock();
System.out.println("C进程输出" + " : " + ++index);
conditionA.signal();
conditionC.await();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CondtionTest {

public static ReentrantLock lock = new ReentrantLock();
public static Condition conditionA = lock.newCondition();
public static Condition conditionB = lock.newCondition();
public static Condition conditionC = lock.newCondition();
public static int index = 0;
public static void main(String[] args){
ThreadA threadA = new ThreadA();
ThreadB threadB = new ThreadB();
ThreadC threadC = new ThreadC();

threadA.start();//(1)
threadB.start();//(2)
threadC.start();//(3)
}
}

当(1)(2)(3)三个线程被调用时,因为三个线程同时竞争lock,这里假设线程A拿到了lock(线程A虽然是看起来是先start(),但是正在的调用还是看调度程序的,所以这里只能假设是A线程拿到同步资源)。首节点表示的是正在操作同步资源的线程。所以现在的同步队列是:

接着线程A输出了:“A进程输出 : 1”。然后调用conditionB.signal(),其实这一步的signal是没什么意义的,因为conditionB现在没有线程是可以被唤醒的。
当conditionA.await()被执行到的时候,线程A同步队列中被移除,对应操作是锁的释放; 线程A(节点A)接着被加入到ConditionA等待队列,因为线程需要singal信号。

同步队列

A等待队列

现在在同步队列中的首节点是B节点,那么B线程占用了同步资源就可以开始运行了。先是输出“B进程输出 : 2”,同样的signal操作也是没有意义的,因为conditionC是没有可以被唤醒的线程。当conditionB.await()被执行到的时候,线程B同步队列中被移除,线程B(节点B)接着被加入到ConditionB等待队列

同步队列

B等待队列

终于轮到了C线程占用同步资源了,再输出“C进程输出:3”之后,调用conditionA.signal(),注意这个signal是有用的
因为在conditionA的等待队列中A线程是在等待的,把它取出来加入到同步队列中去竞争,但是这个时候线程A还没唤醒。首节点还是C

同步队列

接着conditionC.await()被执行。线程C同步队列中被移除,线程C(节点C)接着被加入到ConditionC等待队列

同步队列

C等待队列

注意到同步队列中的首节点已经变回了节点A了。所以线程A在刚刚等待的地方继续执行,最后释放了lock。但是线程B和线程C最后也没有其他线程去唤醒,状态一直为WAITING,而线程A的状态为TERMINATED。

6、定时器

定时器是一个应用十分广泛的线程工具,可用于调度多个定时任务以后台线程的方式执行,在Java中,可以通过Timer和TimerTask类来实现定义调度的功能。

Timer类

1
2
3
4
5
public Timer()默认的构造方法
public void schedule(TimerTask task, long delay) 在指定的延迟后执行指定的任务
public void schedule(TimerTask task,long delay,long period) 计划重复固定延迟执行指定的任务,在指定的延迟后开始
public void schedule(TimerTask task, Date time) 在指定的时间计划指定的任务
public void schedule(TimerTask task, Date firstTime, long period) 计划重复固定延迟执行指定的任务,在指定的开始时间

TimerTask类

1
2
public abstract void run()	定时器任务执行的动作
public boolean cancel() 取消此定时器任务

代码演示

1、输出爆炸啦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.Timer;
import java.util.TimerTask;

//继承TimerTask,需要重新rum方法
public class Time extends TimerTask {
@Override
public void run() {
System.out.println("爆炸啦");
}
}
import java.util.Timer;

public class TimeTest {
public static void main(String[] args) {
//new一个Timer对象,用来调方法
Timer timer = new Timer();
//调用Timer对象的方法schedule,第一个参数必须是TimerTask对象,Time继承了它因此也是这个对象,第二个参数表示在2秒后运行run方法,这个参数只有在第一次使用run方法,最后一个参数是每隔1秒,运行一次run方法
timer.schedule(new Time(),2000,1000);
}
}

这个结果是,不停的输出爆炸啦,要想停掉的话,可以加入cancel()方法,在run方法最后加入这个话,结果会输出一个爆炸啦,因为执行第一次完后就执行到这一语句,定时器就取消了,如下。

2、

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
javapublic class Time extends TimerTask {
//定义一个Timer对象,到时候可以用它来调用cancel方法
Timer time;
//通过构造器给Timer对象赋值
public Time(Timer time){
this.time=time;
}
@Override
public void run() {
System.out.println("爆炸啦");
//取消定时器
time.cancel();
}
}
public class TimeTest {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new Time(timer),2000,1000);
}
}