一、JUC简介
在Java5.0提供了java.util.concurrent
包,简称JUC,即Java并发编程工具包。JUC更好的支持高并发任务。
具体的有以下三个包:
java.util.concurrent
java.util.concurrent.atomic
java.util.concurrent.locks
二、Lock锁
1、传统的synchronized锁
/** * synchronized售票例子 */ public class SynSaleTicket { //真正在公司开发,遵守oop思想,降低耦合性 //线程就是一个单独的资源类,没有任何附属操作,里面只包含属性、方法 public static void main(String[] args) { //并发:多个线程操作同一个资源 Ticket ticket = new Ticket(); //lambda表达式 new Thread(() -> { for (int i = 0; i < 40; i++) { ticket.sale(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 40; i++) { ticket.sale(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 40; i++) { ticket.sale(); } }, "C").start(); } } //资源类 class Ticket { //票数 private int number = 30; //买票方法 //synchronized本质就是队列+锁 public synchronized void sale() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "抢到了第" + (number--) + "张票,剩下" + number); } } }
2、JUC包下的Lock接口
查看jdk1.8官方文档可以看到有三个实现类:
- ReentrantLock:可重入锁(常用)
- ReentrantReadWriteLock.ReadLock:可重入读锁
- ReentrantReadWriteLock.WriteLock:可重入写锁
//lock锁用法: //1. 创建锁对象 Lock l = ...; //2. 加锁 l.lock(); //3. 解锁 try {} finally { l.unlock(); }
1. 公平锁和非公平锁
通俗的说公平锁其实就是买票都需要排队按队伍顺序遵循先来后到的原则获得锁,非公平锁就是有人开VIP可以插队获得锁。
- 公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
- 优点:所有的线程都能得到资源,不会饿死在队列中。
- 缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,cpu唤醒阻塞线程的开销会很大。
- 非公平锁:多个线程获取锁的时候,不会按照申请锁的顺序去获得锁,会直接尝试获取锁,如果能获取到锁,就直接获得锁,如果获取不到,再进入等待队列乖乖等待。
- 优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
- 缺点:可能导致队列中排队的线程一直获取不到锁或者长时间获取不到锁,活活饿死。
//ReentrantLock无参构造,相当于ReentrantLock(false) public ReentrantLock() { sync = new NonfairSync();//默认是非公平锁 } //ReentrantLock有参构造,fair参数决定是否选为公平锁,true是,false否 public ReentrantLock(boolean fair) { sync = fair ? new FairSync()/*公平锁*/ : new NonfairSync()/*非公平锁/*; }
package com.hao.demo01; //使用juc包下的锁 import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Lock售票例子 */ public class LockSaleTicket { //使用Lock锁来解决买票问题 public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(() -> {for (int i = 0; i < 40; i++) ticket.sale();}, "A").start(); new Thread(() -> {for (int i = 0; i < 40; i++) ticket.sale();}, "B").start(); new Thread(() -> {for (int i = 0; i < 40; i++) ticket.sale();}, "C").start(); } } class Ticket2 { //票数 private int number = 30; //Lock锁 Lock lock = new ReentrantLock(); //买票方法 public void sale() { lock.lock();//加锁 try { if (number > 0) { System.out.println(Thread.currentThread().getName() + "抢到了第" + (number--) + "张票,剩下" + number); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//解锁 } } }
3、Synchronized 和 Lock锁的区别
区别 | Synchronized | Lock |
---|---|---|
是否关键字 | Synchronized是Java内置关键字 | Lock类是一个接口 |
是否可尝试获取锁 | Synchronized无法判断是否获取锁的状态 | Lock可以判断是否获取到锁 |
是否自动释放锁 | Synchronized会自动释放锁(a 线程执行完同步代码会释放锁;b 线程执行过程中发生异常会释放锁) | Lock需在finally中手工释放锁,否则容易造成线程死锁 |
是否一直阻塞 | 用Synchronized关键字修饰的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去 | Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了 |
是否可重入、中断、公平锁 | Synchronized的锁可重入、不可中断、非公平 | Lock锁可重入、可中断、可公平(也可非公平) |
使用场合 | Synchronized锁适合代码少量的同步问题 | Lock锁适合大量同步的代码的同步问题 |
总体来说,Lock锁比synchronized更加灵活,提供了更加丰富的API进行同步操作,也可以结合Condition条件实现比较复杂的线程间同步通信。
三、线程通信
1、传统的Synchronized生产者与消费者
使用 Synchronized + wait + notify 这一套完成线程通信。
/** * 生产者和消费者问题! */ public class TestPC { //使用多个线程操作同一个变量num=0 //线程A负责num+1,完事之后通知B操作。线程B负责num-1,完事之后通知A操作 public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); } } //资源类 class Data { //变量 private int num = 0; //加 public synchronized void increment() throws InterruptedException { //判断等待 if (num != 0) { this.wait(); } num++; System.out.println(Thread.currentThread().getName() + "=>" + num); //通知其他线程,+1执行完毕 this.notifyAll(); } //减 public synchronized void decrement() throws InterruptedException { //判断等待 if (num == 0) { this.wait(); } num--; System.out.println(Thread.currentThread().getName() + "=>" + num); //通知其他线程,-1执行完毕 this.notifyAll(); } }
如果此时再加两个C、D线程,两加两减,发现运行出问题。
1. 问题诞生:虚假唤醒
A=>1 B=>0 C=>1 A=>2 //出现问题 C=>3 B=>2 B=>1 B=>0 C=>1 A=>2 Process finished with exit code 0
查看官方文档解释Object类下的wait()方法
也就是说,这里用if判断的话,被唤醒后的线程将不会重新进入if判断,代码直接运行if代码块之后的代码,而使用while的话,被唤醒后的线程会重新判断循环条件,如果不成立再执行while代码块之后的代码。
解决办法:将 if 换成 while
/** * 生产者和消费者问题! */ public class TestPC { //使用多个线程操作同一个变量num=0 //线程A负责num+1,完事之后通知B操作。线程B负责num-1,完事之后通知A操作 public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } } //资源类 class Data { //变量 private int num = 0; //加 public synchronized void increment() throws InterruptedException { //判断等待 while (num != 0) { this.wait(); } num++; System.out.println(Thread.currentThread().getName() + "=>" + num); //通知其他线程,+1执行完毕 this.notifyAll(); } //减 public synchronized void decrement() throws InterruptedException { //判断等待 while (num == 0) { this.wait(); } num--; System.out.println(Thread.currentThread().getName() + "=>" + num); //通知其他线程,-1执行完毕 this.notifyAll(); } }
2、JUC版的生产者和消费者
思考,我们的传统的Synchronized锁被JUC版的Lock锁替代了,那传统的wait()和notify()有没有被替换呢?
当然有滴!
我们查看java.util.concurrent.locks包下的Lock接口下的方法
点Condition进入看
Condition接口属于JUC包下的,继续查看官方文档的解释
继续查看Condition接口中的方法
其中有 await()等待 和 signal()唤醒 方法替换。
于是新的一套锁方案出来了:Lock + await + signal
要用await()和signal()就必须得到Condition实例,看官方文档怎么说?
/** * 生产者和消费者问题! */ public class TestLockPC { public static void main(String[] args) { Data2 data = new Data2(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } } class Data2 { private int num = 0; //创建Lock锁实例 private final Lock lock = new ReentrantLock(); //创建Condition实例 private final Condition condition = lock.newCondition(); //加 public void increment() throws InterruptedException { lock.lock();//加锁 try { //判断等待 while (num != 0) { condition.await(); } num++; System.out.println(Thread.currentThread().getName() + "=>" + num); //通知其他线程,+1执行完毕 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//解锁 } } //减 public void decrement() throws InterruptedException { lock.lock();//加锁 try { //判断等待 while (num == 0) { condition.await(); } num--; System.out.println(Thread.currentThread().getName() + "=>" + num); //通知其他线程,-1执行完毕 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock();//解锁 } } }
A=>1 B=>0 C=>1 B=>0 C=>1 B=>0 C=>1 D=>0 A=>1 D=>0 A=>1 D=>0 Process finished with exit code 0
乍一看,Condition也没什么牛逼的呀,还有什么牛逼的功能吗?
当然有滴!
我们看运行出来的结果线程还是随机状态的,A B C B C B C D A D A D,我们想要实现A执行完通知B,B执行完通知C,C执行完通知D,D执行完通知A,实现 A B C D 按顺序执行。
使用Condition精准通知和唤醒线程。
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 精准通知和精准唤醒线程。 */ public class TestAccuratePC { public static void main(String[] args) { Data3 data = new Data3(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printC(); } }, "C").start(); } } class Data3 { //创建Lock锁实例 private final Lock lock = new ReentrantLock(); //创建Condition锁实例 private final Condition condition1 = lock.newCondition(); private final Condition condition2 = lock.newCondition(); private final Condition condition3 = lock.newCondition(); private int num = 1; //假如num等于1就让A执行,2就让B执行,3就让C执行 public void printA() { lock.lock();//加锁 try { while (num != 1) { condition1.await();//等待 } System.out.println(Thread.currentThread().getName() + "=>AAAAAA"); num = 2; //唤醒指定的人,B condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//解锁 } } public void printB() { lock.lock();//加锁 try { while (num != 2) { condition2.await();//等待 } System.out.println(Thread.currentThread().getName() + "=>BBBBBB"); num = 3; //唤醒指定的人,C condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//解锁 } } public void printC() { lock.lock();//加锁 try { while (num != 3) { condition3.await();//等待 } System.out.println(Thread.currentThread().getName() + "=>CCCCCC"); num = 1; //唤醒指定的人,A condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//解锁 } } }
A=>AAAAAA B=>BBBBBB C=>CCCCCC A=>AAAAAA B=>BBBBBB C=>CCCCCC A=>AAAAAA B=>BBBBBB C=>CCCCCC Process finished with exit code 0
四、8锁现象
通过8个问题彻底理解锁。
锁只会锁:对象、class
问题1
import java.util.concurrent.TimeUnit; /** * 题1:两个同步方法,两条线程分别执行,先打印 “发短信” 还是 “打电话” */ public class Demo01 { public static void main(String[] args) throws InterruptedException { Phone phone = new Phone(); new Thread(() -> { phone.sendSms(); }, "A").start(); TimeUnit.SECONDS.sleep(1);//JUC包下的sleep,睡1秒 new Thread(() -> { phone.call(); }, "B").start(); } } class Phone{ public synchronized void sendSms(){ System.out.println("发短信"); } public synchronized void call(){ System.out.println("打电话"); } }
结果是先发短信后打电话。
如果你认为发短信的方法先被调用就先执行,那你再看下面的情况。
问题2
import java.util.concurrent.TimeUnit; /** * 题2:两个同步方法,两条线程分别执行,发短信睡3秒,先打印 “发短信” 还是 “打电话” */ public class Demo01 { public static void main(String[] args) throws InterruptedException { Phone phone = new Phone(); new Thread(() -> { phone.sendSms(); }, "A").start(); TimeUnit.SECONDS.sleep(1);//JUC包下的sleep,睡1秒 new Thread(() -> { phone.call(); }, "B").start(); } } class Phone{ public synchronized void sendSms(){ //让sendSms()方法睡3秒 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call(){ System.out.println("打电话"); } }
还是先发短信后打电话。
原因:因为Synchronized锁的对象是方法调用者,两个方法用的同一把锁,所以谁先拿到锁,谁就先执行!
问题3
import java.util.concurrent.TimeUnit; /** * 题3:新增一个普通方法hello(),A线程调用sendSms(),B线程调用hello() */ public class Demo02 { public static void main(String[] args) throws InterruptedException { Phone2 phone = new Phone2(); new Thread(() -> { phone.sendSms(); }, "A").start(); TimeUnit.SECONDS.sleep(1);//JUC包下的sleep,睡1秒 new Thread(() -> { phone.hello(); }, "B").start(); } } class Phone2{ public synchronized void sendSms(){ try { TimeUnit.SECONDS.sleep(3);//睡3秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call(){ System.out.println("打电话"); } //这里没有锁,不用去拿锁。 public void hello(){ System.out.println("hello"); } }
结果:先打印hello,再打印发短信
原因:hello()方法是一个普通方法,所以不受synchronized锁的影响,不用等待锁释放。
问题4
import java.util.concurrent.TimeUnit; /** * 题4:两个对象,对象1调用sendSms(),对象2调用call() */ public class Demo02 { public static void main(String[] args) throws InterruptedException { Phone2 phone1 = new Phone2(); Phone2 phone2 = new Phone2(); new Thread(() -> { phone1.sendSms(); }, "A").start(); TimeUnit.SECONDS.sleep(1);//JUC包下的sleep,睡1秒 new Thread(() -> { phone2.call(); }, "B").start(); } } class Phone2{ public synchronized void sendSms(){ try { TimeUnit.SECONDS.sleep(3);//睡3秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call(){ System.out.println("打电话"); } }
结果:先打电话,再发短信
原因:因为两个对象有两把锁,各自用各自的锁,发短信的方法延迟了3秒,所以打电话的方法先执行。
问题5
import java.util.concurrent.TimeUnit; /** * 题5:两个静态同步方法,一个对象,先打印 “发短信” 还是 “打电话” */ public class Test3 { public static void main(String[] args) throws InterruptedException { Phone3phone = new Phone3(); new Thread(() -> { phone.sendSms(); }, "A").start(); TimeUnit.SECONDS.sleep(1);//JUC包下的sleep,睡1秒 new Thread(() -> { phone.call(); }, "B").start(); } } //只有唯一一个class对象 class Phone3{ //Synchronized锁的是方法的调用者! //static静态方法,类一加载就有了,锁的是class public static synchronized void sendSms(){ try { TimeUnit.SECONDS.sleep(3);//睡3秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public static synchronized void call(){ System.out.println("打电话"); } }
结果:先发短信,后打电话
问题6
import java.util.concurrent.TimeUnit; /** * 题6:两个静态同步方法,两个对象,对象1调用sendSms(),对象2调用call(),先打印 “发短信” 还是 “打电话”? */ public class Test3 { public static void main(String[] args) throws InterruptedException { Phone3 phone1 = new Phone3(); Phone3 phone2 = new Phone3(); new Thread(() -> { phone1.sendSms(); }, "A").start(); TimeUnit.SECONDS.sleep(1);//JUC包下的sleep,睡1秒 new Thread(() -> { phone2.call(); }, "B").start(); } } //只有唯一一个Class对象 class Phone3{ //Synchronized锁的是方法的调用者! //static静态方法,类一加载就有了,锁的是class public static synchronized void sendSms(){ try { TimeUnit.SECONDS.sleep(3);//睡3秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public static synchronized void call(){ System.out.println("打电话"); } }
结果:先发短信,后打电话
原因:静态方法是和类一起加载的,也就是说这个静态方法是属于这个类的,如果在static方法中使用synchronized锁,锁的将是Class对象,而Class对象又是全局唯一的,不管new多少个对象,它的Class对象只有一个,所以谁先拿到了锁谁就先执行。
问题7
import java.util.concurrent.TimeUnit; /** * 题7:一个静态同步方法,一个普通同步方法,一个对象,先打印 “发短信” 还是 “打电话”? */ public class Test4 { public static void main(String[] args) throws InterruptedException { Phone4 phone = new Phone4(); new Thread(() -> { phone.sendSms(); }, "A").start(); TimeUnit.SECONDS.sleep(1);//JUC包下的sleep,睡1秒 new Thread(() -> { phone.call(); }, "B").start(); } } class Phone4{ //静态同步方法,锁的是class类模板 public static synchronized void sendSms(){ try { TimeUnit.SECONDS.sleep(3);//睡3秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } //普通同步方法,锁的是调用者 public synchronized void call(){ System.out.println("打电话"); } }
结果:先打电话,后发短信
问题8
import java.util.concurrent.TimeUnit; /** * 题8:一个静态同步方法,一个普通同步方法,两个对象,对象1调用sendSms(),对象2调用call(),先打印 “发短信” 还是 “打电话”? */ public class Test4 { public static void main(String[] args) throws InterruptedException { Phone4 phone1 = new Phone4(); Phone4 phone2 = new Phone4(); new Thread(() -> { phone1.sendSms(); }, "A").start(); TimeUnit.SECONDS.sleep(1);//JUC包下的sleep,睡1秒 new Thread(() -> { phone2.call(); }, "B").start(); } } class Phone4{ //静态同步方法,锁的是class类模板 public static synchronized void sendSms(){ try { TimeUnit.SECONDS.sleep(3);//睡3秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } //普通同步方法,锁的是调用者 public synchronized void call(){ System.out.println("打电话"); } }
结果:先打电话,后发短信
原因:静态同步方法锁的是class对象,普通同步方法锁的是调用者,锁的是不同对象,所以后者不需要等待前者释放锁。
小结:总之synchronized锁就锁两个东西,一个是new出来的this对象,一个是类Class唯一的模板。
五、不安全的集合
1、List
我们通常使用的ArrayList就是线程不安全的,举个简单的例子
/** * List集合 */ public class TestList { public static void main(String[] args) { List<String> list = new ArrayList<>(); for (int i = 1; i <= 10; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(list); }, String.valueOf(i)).start(); } } }
//并发修改异常 Exception in thread "4" java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) at java.util.ArrayList$Itr.next(ArrayList.java:859) at java.util.AbstractCollection.toString(AbstractCollection.java:461) at java.lang.String.valueOf(String.java:2994) at java.io.PrintStream.println(PrintStream.java:821) at com.hnguigu.unsafe.TestList.lambda$main$0(TestList.java:20) at java.lang.Thread.run(Thread.java:748) Process finished with exit code 0
解决方案,有以下几种!
import java.util.concurrent.CopyOnWriteArrayList; public class TestList{ public static void main(String[] args) { /** * 解决方案 * 1. List<String> list = new Vector<>(); * 2. List<String> list = Collections.synchronizedList(new ArrayList<>()); * 3. List<String> list = new CopyOnWriteArrayList<>(); */ List<String> list = new CopyOnWriteArrayList<>(); for (int i = 1; i <=10; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(list); },String.valueOf(i)).start(); } } }
写入时复制(CopyOnWrite,简称COW)思想是计算机程序设计领域中的一种通用优化策略。
CopyOnWrite容器即写入时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读还是会读到旧的数据。CopyOnWrite并发容器用于读多写少的并发场景。
CopyOnWriteArrayList 比 Vector厉害在哪里?
CopyOnWriteArrayList底层采用了Lock锁,是JDK层面的,效率高!
Vector底层采用了Synchronized加锁的方式,保证了数据的安全性,但是效率低下!
解释一下:Synchronized是Java内置的机制,是JVM层面的,效率低是因为底层操作依赖于操作系统,操作系统切换线程要从用户态切换到内核态,花费很多时间。
优点:
- CopyOnWriteArrayList 并发安全且性能比 Vector 好。Vector 是增删改查方法都加了synchronized 来保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而 CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于 Vector。
缺点:
-
数据一致性问题。这种实现只是保证数据的最终一致性,不能保证数据的实时一致性。在添加到拷贝数据而还没进行替换的时候,读到的仍然是旧数据。
-
内存占用问题。如果对象比较大,频繁地进行替换会消耗内存,从而引发 Java 的 GC 问题,这个时候,我们应该考虑其他的容器,例如 ConcurrentHashMap。
2、Set
Set和List同样是多线程下不安全的集合类,同样会报并发修改异常!
/** * Set集合 */ public class TestSet { public static void main(String[] args) { Set<String> set = new HashSet<>(); for (int i = 1; i <= 20; i++) { new Thread(() -> { set.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(set); }, String.valueOf(i)).start(); } } }
解决方案:
import java.util.concurrent.CopyOnWriteArraySet; public class TestSet { public static void main(String[] args) { /** * 解决方案 * set集合没有可替换的集合 * 1. Set<String> set = Collections.synchronizedSet(new HashSet<>()); * 2. Set<String> set = new CopyOnWriteArraySet<>(); */ Set<String> set = new CopyOnWriteArraySet<>(); for (int i = 1; i <= 20; i++) { new Thread(() -> { set.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(set); }, String.valueOf(i)).start(); } } }
和面试官谈到这里,一般都会问hashSet的底层实现原理。
底层其实就是用hashMap实现的
HashSet底层使用了哈希表来支持的,特点:存储快 往HashSet添加元素的时候,HashSet会先调用元素的HashCode方法得到元素的哈希值,然后通过元素的哈希值经过异或移位等运算,就可以算出该元素在哈希表中的存储位置。如果算出的元素存储的位置目前没有任何元素存储,那么该元素可以直接存储在该位置上;如果算出的元素的存储位置上目前已经有了其他的元素,那么还会调用该元素的equals方法 ,与该位置的元素进行比较一次,如果过equals方法返回的是true,那么该位置上的元素就会被视为重复元素,不允许被添加,如果false,则允许添加。
3、Map
和List、Set一样,也会有并发修改异常!
import java.util.concurrent.ConcurrentHashMap; /** * Map集合 */ public class TestMap { public static void main(String[] args) { //默认等价于:new HashMap<>(16,0.75); //初始容量、加载因子 /** * 解决方案 * 1. Map<String, String> hashMap = Collections.synchronizedMap(new HashMap<>()); * 2. Map<String, String> hashMap = new ConcurrentHashMap<>(); */ Map<String, String> hashMap = new ConcurrentHashMap<>(); for (int i = 0; i < 20; i++) { new Thread(()->{ hashMap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5)); System.out.println(hashMap); },String.valueOf(i)).start(); } } }
六、彻底了解Callable
Callable是JUC包下的,查看以下官方文档
得到结论:
- 返回结果
- 抛出异常
- 方法不同,run()/call()
泛型的参数等于方法的返回值。
Thread构造参数只认识Runnable,那我们继续查看Runnable官方文档
里面有一个实现类:FutureTask,继续往下看
FutureTask的构造方法接受Callable参数。我们可以通过这个适配类来启动线程,这下就圆满了!
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * Callable */ public class TestCallable { public static void main(String[] args) throws ExecutionException, InterruptedException { //思考:怎么启动这个线程 MyThread myThread = new MyThread(); FutureTask task = new FutureTask(myThread);//适配类 new Thread(task,"A").start();//线程的启动方式,有且只有一个 new Thread(task,"B").start();//结果会被缓存,提高效率。 //这个get()方法可能会阻塞,因为它要等call()方法执行完毕,等待结果返回,一般放到最后一行执行,或者通过异步通信来处理! Integer num = (Integer) task.get();//获取Callable的返回值 System.out.println(num); } } class MyThread implements Callable<Integer> { @Override public Integer call() { System.out.println("call()"); //如果这里是耗时操作,返回值就会等待 return 1024; } }
//输出结果: call() 1024 Process finished with exit code 0
注意点:1、结果有缓存,2、返回值可能阻塞
七、JUC包下的常用辅助类
1、CountDownLatch
import java.util.concurrent.CountDownLatch; /** * CountDownLatch计数器 */ public class TestCountDownLatch { public static void main(String[] args) throws InterruptedException { //它是一个减法计数器 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 1; i <= 6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+" Go Out!"); countDownLatch.countDown();//数量-1 },String.valueOf(i)).start(); } countDownLatch.await();//等待计数器归零,然后才向下执行。 System.out.println("Close Door"); //想象放学了,学生一个一个的走出教室,等待学生全部走完了,才能锁门! } }
2、CyclicBarrier
看不懂?没关系,我也看不懂,咱看代码!
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * CyclicBarrier计数器 */ public class TestCyclicBarrier { public static void main(String[] args) { //它是一个加法计数器 //要求:集齐七颗龙珠召唤神龙! CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{ System.out.println("恭喜你!召唤神龙成功"); }); for (int i = 1; i <= 7; i++) { //jdk1.8不需要加final int finalI = i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"获得了第"+ finalI +"颗龙珠"); try { cyclicBarrier.await();//等待集齐 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } //假如需要集齐8颗龙珠,只集齐了7颗,程序一直等待。 } }
3、Semaphore
import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * Semaphore信号量 */ public class TestSemaphore { public static void main(String[] args) { //线程数量:停车位,限流 Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 6; i++) { int finalI = i; new Thread(()->{ try { semaphore.acquire();//acquire()得到许可证 System.out.println(Thread.currentThread().getName()+"抢到了车位"); TimeUnit.SECONDS.sleep(2);//睡2秒 System.out.println(Thread.currentThread().getName()+"释放了车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release();//release()释放许可证 } },String.valueOf(i)).start(); //想象抢车位,总共有3个车位,6台车去抢,先抢到了3个车位的车用完之后就走了,剩下的3个去抢车位 } } }
semaphore.acquire():获得资源,如果资源已经使用完了,就等待资源释放后再进行使用!
semaphore.release():释放资源,会将当前的信号量释放+1,然后唤醒等待的线程!
作用: 多个共享资源互斥的使用!并发限流,控制最大的线程数!
八、读写锁
读写锁(Readers-Writer Lock)顾名思义是一把锁分为两部分:读锁和写锁,其中读锁允许多个线程同时获得,因为读操作本身是线程安全的,而写锁则是互斥锁,不允许多个线程同时获得写锁,并且写操作和读操作也是互斥的。总结来说,读写锁的特点是:读读不互斥、读写互斥、写写互斥。
ReadWriteLock读写锁:读的时候可以被多线程共享,写的时候只能一个线程去写
/** * ReadWriteLock读写锁 */ public class TestReadWriteLock { public static void main(String[] args) { MyCache myCache = new MyCache(); for (int i = 1; i <= 5; i++) { int finalI = i; new Thread(() -> { myCache.put(String.valueOf(finalI), String.valueOf(finalI)); }, String.valueOf(i)).start(); } for (int i = 1; i <= 5; i++) { int finalI = i; new Thread(() -> { myCache.get(String.valueOf(finalI)); }, String.valueOf(i)).start(); } } } //自定义缓存 class MyCache { //volatile保证了变量的可见性,后面会细讲 private volatile Map<String, Object> map = new HashMap<>(); //没加锁导致线程插队写入。 //存--->写 public void put(String key, Object value) { System.out.println(Thread.currentThread().getName() + "开始写入"); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完成~"); } //取--->读 public void get(String key) { System.out.println(Thread.currentThread().getName() + "开始读取"); map.get(key); System.out.println(Thread.currentThread().getName() + "读取完成~"); } }
2开始写入 4开始写入 //2还没写入完成,就被其他线程插进来了。 1开始写入 3开始写入 1写入完成~ 4写入完成~ 2写入完成~ Process finished with exit code 0
注意:这里我们可以采用重量锁Synchronized和轻量锁Lock来保证数据的安全。
但是我们这里使用更加细粒度的ReadWriteLock读写锁来完成。
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; //读读并发、读写互斥、写写互斥 public class TestReadWriteLock { public static void main(String[] args) { MyCache myCache = new MyCache(); for (int i = 1; i <= 5; i++) { int finalI = i; new Thread(() -> { myCache.put(String.valueOf(finalI), String.valueOf(finalI)); }, String.valueOf(i)).start(); } for (int i = 1; i <= 5; i++) { int finalI = i; new Thread(() -> { myCache.get(String.valueOf(finalI)); }, String.valueOf(i)).start(); } } } //自定义缓存 class MyCache { //volatile保证了变量的可见性,后面会细讲 private volatile Map<String, Object> map = new HashMap<>(); //读写锁 private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //存--->写 public void put(String key, Object value) { readWriteLock.writeLock().lock();//加写锁 try { System.out.println(Thread.currentThread().getName() + "开始写入"); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完成~"); } catch (Exception e) { e.printStackTrace(); }finally { readWriteLock.writeLock().unlock();//解写锁 } } //取--->读 public void get(String key) { readWriteLock.readLock().lock();//加读锁 try { System.out.println(Thread.currentThread().getName() + "开始读取"); map.get(key); System.out.println(Thread.currentThread().getName() + "读取完成~"); } catch (Exception e) { e.printStackTrace(); }finally { readWriteLock.readLock().unlock();//解读锁 } } }
1开始写入 1写入完成~ 3开始写入 3写入完成~ 4开始写入 4写入完成~ 2开始写入 2写入完成~ 5开始写入 5写入完成~ 2开始读取 1开始读取 3开始读取 3读取完成~ 1读取完成~ 5开始读取 5读取完成~ 4开始读取 4读取完成~ 2读取完成~ Process finished with exit code 0
读锁与写锁互斥,加入读锁是为了防止读的时候写线程进来,打破原子性。
你们可能在外面听到的读写锁叫:
- 共享锁(读锁):多个线程共享资源。
- 排它锁又或者独占锁(写锁):单个线程独占资源。
1、为什么会有读写锁?
Synchronized 和 ReentrantLock 都是独占锁,即在同一时刻只有一个线程获取到锁。
然而在有些业务场景中,我们大多在读取数据,很少写入数据,这种情况下,如果仍使用独占锁,效率将及其低下。
于是Java就提供了ReentrantReadWriteLock锁来达到保证线程安全的前提下提高并发效率。
九、阻塞队列
在了解阻塞队列之前,先了解队列。
1、什么是队列?
Queue:队列,是一种特殊的线性表,是一种先进先出(FIFO)的数据结构。(FIFO:First In First Out)
在Java中,队列是一种基本的集合类型。与List和Set是同级别的,都有共同父类Collection接口。
它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。进行插入操作的端称为队尾,进行删除操作的端称为队头。
在现实生活中队列也很常见,比如排队买票,排队上车,食堂打饭一个个等待处理,这些都是一种队列模型。而且在程序设计中队列使用也非常广泛:比如多线程中等待处理的任务、排队等待获取某个锁的线程等。
Queue的接口定义如下:
public interface Queue<E> extends Collection<E> { //向队列中添加一个元素;如果有空间则添加成功返回true,否则则抛出IllegalStateException异常 boolean add(E e); //向队列中添加一个元素;如果有空间则添加成功返回true,否则返回false boolean offer(E e); //从队列中删除一个元素;如果元素存在则返回队首元素,否则抛出NoSuchElementException异常 E remove(); //从队列中删除一个元素;如果元素存在则返回队首元素,否则返回null E poll(); //从队列获取一个元素,但是不删除;如果元素存在则返回队首元素,否则抛出NoSuchElementException异常 E element(); //从队列获取一个元素,但是不删除;如果元素存在则返回队首元素,否则返回null E peek(); }
2、什么是双端队列?
Deque:双端队列,是一种具有队列和栈的性质的数据结构。是限定插入和删除操作在表的两端进行的线性表。(double-ended queue,双端队列)
在双端队列中,使用first
和last
来表示队列的首、尾两端,可以在双端进行元素的插入和删除。
offerFirst(A)表示在队首进行元素插入,pollLast()表示在队尾进行元素删除。
可以看到Deque是Queue的子接口,这就表示除了拥有Queue接口的方法之外,还定义了一些特殊的双端队列方法。
public interface Deque<E> extends Queue<E> { // 向队首添加一个元素;如果有空间则添加成功返回true,否则则抛出`IllegalStateException`异常 void addFirst(E e); // 向队尾添加一个元素;如果有空间则添加成功返回true,否则则抛出`IllegalStateException`异常 void addLast(E e); // 向队首添加一个元素;如果有空间则添加成功返回true,否则返回false boolean offerFirst(E e); // 向队尾添加一个元素;如果有空间则添加成功返回true,否则返回false boolean offerLast(E e); // 从队首删除一个元素;如果元素存在则返回队首元素,否则抛出`NoSuchElementException`异常 E removeFirst(); // 从队尾删除一个元素;如果元素存在则返回队尾元素,否则抛出`NoSuchElementException`异常 E removeLast(); // 从队首删除一个元素;如果元素存在则返回队首元素,否则返回null E pollFirst(); // 从队尾删除一个元素;如果元素存在则返回队首元素,否则返回null E pollLast(); // 从队首获取一个元素,但是不删除;如果元素存在则返回队首元素,否则抛出`NoSuchElementException`异常 E getFirst(); // 从队尾获取一个元素,但是不删除;如果元素存在则返回队尾元素,否则抛出`NoSuchElementException`异常 E getLast(); // 从队首获取一个元素,但是不删除;如果元素存在则返回队首元素,否则返回null E peekFirst(); // 从队尾获取一个元素,但是不删除;如果元素存在则返回队尾元素,否则返回null E peekLast(); // 如果元素o存在,则从队列中删除第一次出现的该元素 boolean removeFirstOccurrence(Object o); // 如果元素o存在,则从队列中删除最后一次出现的该元素 boolean removeLastOccurrence(Object o); // 其他方法省略.... }
它们其中有一个实现类:LinkedList是基于链表实现的List的一个数据集合,LinkedList还实现了Queue接口和Deque接口。也就是说我们还可以直接使用LinkedList来实现队列的操作。这里就不举例了。
Java中Queue的实现其实是非线程安全的,如果在多线程环境下进行Queue的入队和出队操作,会产生不一致的情况。所以Java也提供了线程安全的队列类——阻塞队列BlockingQueue。
3、什么是阻塞队列?
在Java中,提供了两种线程安全队列的实现方式:一种是阻塞机制,另一种是非阻塞机制。
使用阻塞机制的队列,是通过锁实现的,在入队和出队时通过加锁避免并发操作。而使用非阻塞机制的队列,是通过使用CAS方式实现,比ConcurrentLinkedQueue。
BlockingQueue:阻塞队列,是基于阻塞机制实现的线程安全的队列。而阻塞机制的实现是通过在入队和出队时加锁的方式避免并发操作。
BlockingQueue不同于普通的Queue的区别主要是:
- 通过在入队和出队时进行加锁,保证了队列线程安全
- 支持阻塞的入队和出队方法:当队列满时,会阻塞入队的线程,直到队列不满;当队列为空时,会阻塞出队的线程,直到队列中有元素。
BlockingQueue常用于生产者-消费者模型中,往队列里添加元素的是生产者,从队列中获取元素的是消费者;通常情况下生产者和消费者都是由多个线程组成;下图所示则为一个最常见的生产者-消费者模型,生产者和消费者之间通过队列平衡两者的的处理能力、进行解耦等。
BlockingQueue接口定义如下:
public interface BlockingQueue<E> extends Queue<E> { //入队一个元素,有空间则添加并返回true,没有则抛出IllegalStateException异常。 boolean add(E e); //入队一个元素,有空间则添加并返回true,没有则返回false。 boolean offer(E e); //入队一个元素,有空间则添加,没有则一直阻塞等待。 void put(E e) throws InterruptedException; //入队一个元素,有空间则添加并返回true,没有则等timeout时间,添加失败则返回false boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //出队一个元素,有元素就出队,没有则一直阻塞等待。 E take() throws InterruptedException; //出队一个元素,有元素就出队,没有元素则等timeout时间,出队失败则返回false E poll(long timeout, TimeUnit unit) throws InterruptedException; //返回该队列剩余的容量(如果没有限制则返回Integer.MAX_VALUE) int remainingCapacity(); //如果元素o在队列中存在,则从队列中删除 boolean remove(Object o); //判断队列中是否存在元素o public boolean contains(Object o); //将队列中的所有元素出队,并添加到给定的集合c中,返回出队的元素数量 int drainTo(Collection<? super E> c); //将队列中的元素出队,限制数量maxElements个,并添加到给定的集合c中,返回出队的元素数量 int drainTo(Collection<? super E> c, int maxElements); }
BlockingQueue主要提供了四类方法:
方法 | 抛出异常 | 有返回值 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
出队 | remove() | poll() | take() | poll(time,unit) |
获取队首元素 | element() | peek() | 没有 | 没有 |
BlockingQueue除了和Queue相同的抛出异常和有返回值方法之外,还提供了两种阻塞方法:
- 当队列没有空间/元素时,一直阻塞。
- 在指定时间尝试入队/出队。
BlockingQueue实现类:
实现类 | 功能 |
---|---|
ArrayBlockingQueue | 基于数组的阻塞队列,维护了一个定长数组,以便缓存队列中的数据对象,所以是有界队列 |
LinkedBlockingQueue | 基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),默认是一个无界队列;也可以通过构造方法中的capacity设置最大元素数量,所以也可以是有界队列 |
SynchronousQueue | 一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并且立刻消费,所以是无界队列 |
PriorityBlockingQueue | 基于优先级的阻塞队列,底层基于数组实现,是一个无界队列 |
DelayQueue | 延迟队列,其中的元素只有到了其指定的延迟时间,才能够从队列中出队 |
其中在日常开发中用的比较多的是 ArrayBlockingQueue 和 LinkedBlockingQueue。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; /** * 阻塞队列 */ public class TestArrayBlockingQueue { public static void main(String[] args) throws InterruptedException { test4(); } /** * 抛出异常 */ public static void test1(){ //需要指定队列的大小 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); //Exception in thread "main" java.lang.IllegalStateException: Queue full //System.out.println(blockingQueue.add("d")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); //Exception in thread "main" java.util.NoSuchElementException //System.out.println(blockingQueue.remove()); //System.out.println(blockingQueue.element());//获得队列首个元素 } /** * 有返回值,不抛出异常 */ public static void test2(){ ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("张三")); System.out.println(blockingQueue.offer("李四")); System.out.println(blockingQueue.offer("王五")); System.out.println(blockingQueue.offer("王五"));//false,不抛出异常 System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll());//null //System.out.println(blockingQueue.peek());//获得队列首个元素 } /** * 阻塞等待(一直等待) */ public static void test3() throws InterruptedException { ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("黄子澄"); blockingQueue.put("路哈"); blockingQueue.put("灌水同"); //blockingQueue.put("李成");//队列没有空间,程序一直等待。 System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); //System.out.println(blockingQueue.take());//一直阻塞 } /** * 阻塞等待(超时等待) */ public static void test4() throws InterruptedException { ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("唱")); System.out.println(blockingQueue.offer("跳")); System.out.println(blockingQueue.offer("rap")); //System.out.println(blockingQueue.offer("鸡你太美", 5, TimeUnit.SECONDS));//等待5秒后如果还是失败则终止程序并返回false。 System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll(5,TimeUnit.SECONDS));//等待5秒如果还是失败终止程序并返回null; } }
十、同步队列
SynchronousQueue同步队列同样也是BlockingQueue接口的实现类,它是一种无缓冲的等待队列,也就是说存入一个元素,必须等待取出来之后,才能继续往里面队列存。
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * 同步队列 */ public class TestSynchronousQueue { public static void main(String[] args) { //声明一个同步队列 SynchronousQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "put 1"); blockingQueue.put(String.valueOf(1)); System.out.println(Thread.currentThread().getName() + "put 2"); blockingQueue.put(String.valueOf(2)); System.out.println(Thread.currentThread().getName() + "put 3"); blockingQueue.put(String.valueOf(3)); } catch (InterruptedException e) { e.printStackTrace(); } }, "A").start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "B").start(); } }
A put 1 B=>1 A put 2 B=>2 A put 3 B=>3 Process finished with exit code 0
SynchronousQueue同步队列与其他的BlockingQueue不同,同步队列没有容量,也可以称为容量为1的队列,put了一个元素,就必须从里面先take出来,否则一直阻塞。
十一、线程池(重点)
之前我们是使用线程的时候就去创建一个线程,看起来很方便快捷,但是如果并发的线程数量较多时,而每一个线程且都只是执行一个时间很短的任务,这样会引发一些问题:
-
线程的创建和销毁都需要时间,线程数量过多时,耗费大量时间,影响效率。
-
频繁的创建和销毁线程会占用大量的内存,还可能引发内存抖动,频繁触发GC,最直接的表现就是卡顿。长而久之,内存资源占用过多或者内存碎片过多,系统甚至会出现OOM。
-
在操作系统中,CPU都是遵循时间片轮转机制进行处理任务,线程数量过多时,必然会引发CPU频繁的进行线程上下文切换。这个代价是非常昂贵的。
有没有一种技术来优化这些资源的使用嘞?
1、池化技术
把一些能够复用的东西(比如说数据库连接、线程)放到池中,避免重复创建、销毁的开销,从而极大提高性能。
在Java中线程池就可以达到这样的效果。
2、线程池的概念
线程池,本质上是一种对象池,用于管理线程资源。在任务执行前,需要从线程池中拿出线程来执行。在任务执行完成之后,需要把线程放回线程池。通过线程的这种反复利用机制,可以有效地避免直接创建线程所带来的坏处。
3、线程池的使用
四大方法、七大参数、四种拒绝策略。
1. 四大方法。
Java通过了JUC包下的Executors工具类提供了4种创建线程池的方式。
newFixedThreadPool
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 创建固定线程数量的线程池 * 创建一个固定大小的线程池,该方法可指定线程池的固定大小,对于超出的线程会在LinkedBlockingQueue队列中等待 * 核心线程数可以指定,线程空闲时间为0 */ public class TestCreatePool { public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(5); try { for (int i = 0; i < 10; i++) { //通过线程池开启线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); }finally { //关闭线程池 threadPool.shutdown(); } } } /* pool-1-thread-2 pool-1-thread-5 pool-1-thread-4 pool-1-thread-3 pool-1-thread-3 pool-1-thread-3 pool-1-thread-1 pool-1-thread-4 pool-1-thread-5 pool-1-thread-2 */
newCachedThreadPool
/** * 可缓存无界线程池 * 当线程池中的线程空闲时间超过60s则会自动回收空线程 * 当任务超过线程池的线程数则创建新线程。线程池的大小上限Integer.MAX_VALUE,约等于21亿 */ public class TestCreatePool { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); try { for (int i = 0; i < 10; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); }finally { //关闭线程池 threadPool.shutdown(); } } } /* pool-1-thread-1 pool-1-thread-5 pool-1-thread-4 pool-1-thread-7 pool-1-thread-3 pool-1-thread-2 pool-1-thread-6 pool-1-thread-8 pool-1-thread-10 pool-1-thread-9 */
newSingleThreadExecutor
/** * 创建一个单线程化的线程池 * 该方法无参数,所有任务都保存队列LinkedBlockingQueue中,核心线程数为1,线程空闲时间为0 * 等待唯一的单线程来执行任务,并保证所有任务按照指定顺序(FIFO或优先级)执行 */ public class TestCreatePool { public static void main(String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); try { for (int i = 0; i < 10; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); }finally { //关闭线程池 threadPool.shutdown(); } } } /* pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 */
newScheduledThreadPool
/** * 创建一个定长的线程池 * 可以指定线程池核心线程数,支持定时及周期性任务的执行 */ public class TestCreatePool { public static void main(String[] args) { ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1); //延迟5s后执行一次任务 threadPool.schedule(()->{ System.out.println(Thread.currentThread().getName() + ",5s"); },5,TimeUnit.SECONDS); //定时任务:延迟2s后,每3s一次地周期性执行任务 ScheduledFuture<?> scheduledFuture = threadPool.scheduleAtFixedRate(() -> { System.out.println(Thread.currentThread().getName() + ",3s"); }, 2, 3, TimeUnit.SECONDS); //20秒之后关闭周期性执行任务 threadPool.schedule(()->{ scheduledFuture.cancel(true); },20 ,TimeUnit.SECONDS); //延迟0秒,等上个任务执行完才开始2s计时 threadPool.scheduleWithFixedDelay(()->{ System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date());); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } },0,2,TimeUnit.SECONDS); // schedule(Runnable command, long delay, TimeUnit unit),延迟一定时间后执行Runnable里的任务; // scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),延迟一定时间后,以间隔period时间的频率周期性地执行任务; // scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit),与 scheduleAtFixedRate()方法很类似, /* 但是不同的是 scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。 scheduleWithFixedDelay,是以上一个任务结束时开始计时,period时间过去后,立即执行。 */ // 注意:线程池一旦关闭,周期任务不再执行。 } }
看一下阿里巴巴开发手册并发编程的规范
叫我们线程池不要使用Executors创建,要用ThreadPoolExecutor创建。
四大方法源码分析
//newFixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //newCachedThreadPool public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //newSingleThreadExecutor public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //newScheduledThreadPool public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
本质还是创建一个ThreadPoolExecutor。
2. 七大参数
//ThreadPoolExecutor源码分析 public ThreadPoolExecutor(int corePoolSize,//核心线程大小 int maximumPoolSize,//最大核心线程大小 long keepAliveTime,//存活时间 TimeUnit unit,//超时单位 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory,//线程工厂 RejectedExecutionHandler handler//拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
画个图理解每个参数的含义
3. 四大拒绝策略
跟踪源码
查看官方文档解释
具体是什么意思我们通过代码测试。
4. 手动创建线程池
import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 使用ThreadPoolExecutor创建线程池 */ public class TestThreadPool { public static void main(String[] args) { //工作中通过ThreadPoolExecutor手动创建线程池 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 2,//corePoolSize核心线程数 5,//maximumPoolSize最大线程数 5,//keepAliveTime线程存活时间 TimeUnit.SECONDS,//存活时间单位 new LinkedBlockingDeque<>(3),//workQueue阻塞队列 Executors.defaultThreadFactory(),//threadFactory线程工厂一般使用默认就行 new ThreadPoolExecutor.AbortPolicy());//handler拒绝策略 try { //使用线程池来创建线程 //循环次数就代表线程数 for (int i = 1; i <= 2; i++) { poolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); }finally { //线程池用完,程序结束,关闭线程池 poolExecutor.shutdown(); } } }
//第一次2个线程,正常处理(现有2个线程处理业务) pool-1-thread-1 pool-1-thread-2 Process finished with exit code 0 //第二次5个线程,核心线程只有2个处理业务,剩下3个线程在阻塞队列(现有2个线程处理) pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 Process finished with exit code 0 //第三次6个线程,核心线程和阻塞队列都满了,于是开启1个最大线程窗口处理(现有3个线程处理业务) pool-1-thread-1 pool-1-thread-3 pool-1-thread-2 pool-1-thread-3 pool-1-thread-1 pool-1-thread-2 Process finished with exit code 0 //第四次7个线程,开启2个最大线程窗口处理(现有4个线程处理业务) pool-1-thread-2 pool-1-thread-3 pool-1-thread-1 pool-1-thread-3 pool-1-thread-4 pool-1-thread-2 pool-1-thread-1 Process finished with exit code 0 //第五次8个线程,开启3个最大线程窗口处理(现有5个线程处理业务) pool-1-thread-1 pool-1-thread-4 pool-1-thread-4 pool-1-thread-5 pool-1-thread-3 pool-1-thread-2 pool-1-thread-2 pool-1-thread-3 Process finished with exit code 0 //第六次9个线程,最大线程窗口也慢了,就启动AbortPolicy拒绝策略(现有5个线程处理业务) pool-1-thread-2 pool-1-thread-4 pool-1-thread-3 pool-1-thread-2 pool-1-thread-5 pool-1-thread-1 pool-1-thread-3 pool-1-thread-4 java.util.concurrent.RejectedExecutionException: Task com.hnguigu.pool.TestThreadPool$$Lambda$1/1831932724@7699a589 rejected from java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 5, active threads = 2, queued tasks = 0, completed tasks = 6] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.hnguigu.pool.TestThreadPool.main(TestThreadPool.java:30) Process finished with exit code 0 //CallerRunsPolicy拒绝策略 pool-1-thread-2 pool-1-thread-3 pool-1-thread-4 pool-1-thread-3 main pool-1-thread-1 pool-1-thread-4 pool-1-thread-2 pool-1-thread-5 Process finished with exit code 0 //DiscardOldestPolicy拒绝策略 pool-1-thread-1 pool-1-thread-4 pool-1-thread-5 pool-1-thread-3 pool-1-thread-3 pool-1-thread-2 pool-1-thread-4 pool-1-thread-1 Process finished with exit code 0 //DiscardPolicy拒绝策略 pool-1-thread-1 pool-1-thread-5 pool-1-thread-2 pool-1-thread-4 pool-1-thread-3 pool-1-thread-3 pool-1-thread-1 pool-1-thread-5 Process finished with exit code 0
- new ThreadPoolExecutor.AbortPolicy():拒绝处理该任务,并抛出RejectedExecutionException异常
- new ThreadPoolExecutor.CallerRunsPolicy():由调用线程处理该任务,如果调用线程是主线程,那么主线程会调用执行器中的execute方法来执行该任务。
- new ThreadPoolExecutor.DiscardOldestPolicy():尝试丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)如果竞争失败还是会被丢弃。
- new ThreadPoolExecutor.DiscardPolicy():放弃当前任务,不会抛出异常。
5. 调优
线程的本质就是为了执行任务,在计算机的世界里,任务大致分为两类,CPU密集型任务和 IO密集型任务。
- CPU密集型任务,比如公式计算、资源解码等。这类任务要进行大量的计算,全都依赖CPU的运算能力,持久消耗CPU资源。所以针对这类任务,其实不应该开启大量线程。因为线程越多,花在线程切换的时间就越多,CPU执行效率就越低,一般CPU密集型任务同时进行的数量等于CPU的核心数,最多再加个1。
- IO密集型任务,比如网络读写、文件读写等。这类任务不需要消耗太多的CPU资源,绝大部分时间是在IO操作上。所以针对这类任务,可以开启大量线程去提高CPU的执行效率,一般IO密集型任务同时进行的数量等于CPU的核心数的两倍。
获取CPU核心数:Runtime.getRuntime().availableProcessors()
public class TestThreadPool { public static void main(String[] args) { System.out.println("CPU核数:"+Runtime.getRuntime().availableProcessors()); //工作中通过ThreadPoolExecutor手动创建线程池 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(),//最大核心线程数 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(),//线程工厂一般使用默认就行 new ThreadPoolExecutor.DiscardPolicy()); } }
当面试官问你,如何设置线程池最大核心线程数?
答:CPU密集型(CPU是几核就设置多少)、IO密集型 (判断程序中十分耗IO的线程,然后为其分配,耗IO的线程 * 2)---->属于调优问题。
十二、四大函数式接口
新时代的程序员必须要会:lambda表达式、链式编程、函数式接口、Stream流式计算。
函数式接口:在java中是指有且仅只有一个抽象方法的接口。即适用于函数式编程场景的接口。而Java中的,函数式编程体现就是Lambda表达式,所以函数式接口就是可以适用于Lambda使用的接口。
某个接口上声明了 @FunctionalInterface 注解,那么编译器就会按照函数式接口的定义来要求该接口。
四大函数式接口指的是Consumer、Function、Predicate、Supplier。位于java.util.function包下。
1、Function(函数型接口)
@FunctionalInterface public interface Function<T, R> { //接受一个参数T,返回一个结果R R apply(T t); }
public class Test { public static void main(String[] args) { //相当于一个工具类,可以根据自己的需求来实现其功能。 Function<String, String> fun1 = new Function<String, String>() { @Override public String apply(String s) { return s.toUpperCase(); } }; //lambda简化后 Function<String, String> fun2 = s -> s.toUpperCase(); System.out.println(fun2.apply("hello,world"));//HELLO,WORLD } }
2、Predicate(断定型接口)
@FunctionalInterface public interface Predicate<T> { //接受一个参数T,返回boolean类型的值 boolean test(T t); }
public class TestPredicate { public static void main(String[] args) { //相当于一个工具类,可以根据自己的需求来实现其功能。 Predicate<String> fun1 = new Predicate<String>() { @Override public boolean test(String s) { return s.isEmpty(); } }; //lambda简化后 Predicate<String> fun2 = s -> s.isEmpty(); System.out.println(fun2.test(""));//true } }
3、Consumer(消费型接口)
@FunctionalInterface public interface Consumer<T> { //接收一个参数T,没有返回值 void accept(T t); }
public class TestConsumer { public static void main(String[] args) { //因为没有出参,常用于打印,发短信等 Consumer<String> fun1 = new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }; //lambda简化后 Consumer<String> fun2 = s -> System.out.println(s); fun2.accept("你好啊,陈冠希!");//你好啊,陈冠希! } }
4、Supplier(供给型接口)
@FunctionalInterface public interface Supplier<T> { //没有输入参数,只有返回值T T get(); }
public class TestSupplier { public static void main(String[] args) { //常用于符合条件时调用获取结果;运行结果提前定义,但不运行。 Supplier<Double> fun1 = new Supplier<Double>() { @Override public Double get() { return Math.PI; } }; //lambda简化后 Supplier<Double> fun2 = () -> Math.PI; System.out.println(fun2.get());//3.141592653589793 } }
十三、Stream流
Stream 流是 Java 8 新提供给开发者的一组操作集合的 API,将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选、排序、聚合等。元素流在管道中经过中间操作(intermediate operation)的处理,最后由终端操作 (terminal operation) 得到前面处理的结果。Stream 流可以极大的提高开发效率,也可以使用它写出更加简洁明了的代码。
1、Stream流的创建
常见有三种创建stream流的方式:
- 集合:Collection.stream()
- 数组:Arrays.stream()
- 静态方法:Stream.of()
public class TestStream { public static void main(String[] args) { //1、使用Collection集合 List<String> list = Arrays.asList("a", "b", "c"); Stream<String> stream1 = list.stream();//创建一个顺序流 Stream<String> stringStream = list.parallelStream();//创建一个并行流 //2、使用Arrays.stream(T[] array)方法用数组创建流 int[] array = {1,2,3,4,5,6}; IntStream stream2 = Arrays.stream(array); //3、使用Stream.of()静态方法 Stream<Integer> stream3 = Stream.of(1,2,3,4,5,6); } }
stream流是串行流(顺序流),由主线程按顺序对流进行操作,而parallelStream是并行流,内部以多线程并行的方式对流进行操作。但前提是流中的数据处理没有顺序要求,如果流中的数据量足够大,并行流可以加快处速度。除了直接创建并行流,还可以通过parallel()把顺序流转换成并行流:
2、Stream流的使用
java.util.stream包下的Stream接口查看API文档
以下案例使用到了:lambda、链式编程、函数式接口、流式计算
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * 用户实体类 */ //get、set、无参、有参 @Data @NoArgsConstructor @AllArgsConstructor public class User { private int id; private String name; private int age; }
import java.util.Arrays; import java.util.List; /** * 题目要求:一分钟内完成此题,只能用一行代码完成! * 现有5个用户,进行筛选! * 1、ID 必须是偶数 * 2、Age 必须大于23岁 * 3、Name 转为大写字母 * 4、Name 倒序 * 5、只输出一个用户 */ public class Test { public static void main(String[] args) { User u1 = new User(1, "a", 21); User u2 = new User(2, "b", 22); User u3 = new User(3, "c", 23); User u4 = new User(4, "d", 24); User u5 = new User(6, "e", 25); //1、将对象存入集合 List<User> list = Arrays.asList(u1, u2, u3, u4, u5); //2、通过stream流筛选数据 list.stream() .filter(user -> user.getId()%2==0) .filter(user -> user.getAge() > 23) .map(user -> { user.setName(user.getName().toUpperCase()); return user; }) .sorted((user1,user2)->user2.getName().compareTo(user2.getName())) .limit(1) .forEach(System.out::println);//方法引用 } } //输出结果:User(id=4, name=D, age=24)
十四、Fork/Join
Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork/Join适用于大数据量计算,小数据量完全没必要使用。
1、工作窃取
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
为什么使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
2、Fork/Join的使用
需要用到两个类:
-
ForkJoinTask:创建一个ForkJoin任务。因为它是一个抽象类,通常不直接使用,通常继承它的子类来完成。
- RecursiveAction:递归事件,用于没有返回值的任务。
- RecursiveTask:递归任务:用于有返回值的任务。
-
ForkJoinPool:ForkJoinTask通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
举个例子:现需要计算一个超大的累加数(10_0000_0000)的和,你会怎么做?
- 办法一:用一个循环在一个线程内完成(效率低)
- 办法二:把计算的大任务拆分成多个小任务,并行执行。(效率高)
- 办法三:使用stream并行流(效率封顶!)
分析:使用Fork/Join框架首先考虑如何分割任务,设定一个阈值10000,如果我们要计算的数值超过阈值,那么就使用Fork/Join来分割任务计算,没有超过就正常计算。
我们来看如何使用Fork/Join对大数据进行并行求和:
import java.util.concurrent.RecursiveTask; /** * 1、设置阈值 * 2、ForkJoinTask创建子任务 * 3、ForkJoinPool执行任务 */ public class ForkJoinDemo extends RecursiveTask<Long> { //因为数值比较大所以使用Long类型 private static final int THRESHOLD = 10000;//设置阈值 private Long start; private Long end; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { Long sum = 0L; if ((end - start) <= THRESHOLD) { //如果任务足够小,我们就正常计算。 for (Long i = start; i <= end; i++) { sum += i; } } else { //如果任务大于阈值,就分裂子任务计算。 Long middle = (start + end) >> 1; //中间值 //创建分割任务,将一个任务拆分成两个子任务 ForkJoinDemo f1 = new ForkJoinDemo(start, middle); ForkJoinDemo f2 = new ForkJoinDemo(middle + 1, end); //压入线程队列,异步执行子任务 f1.fork(); f2.fork(); //等子任务执行完,得到其结果 Long j1 = f1.join(); Long j2 = f2.join(); //最后合并结果 sum = j1 + j2; } return sum; } }
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; /** * 测试 */ public class Test { public static void main(String[] args) { //test1();//sum=500000000500000000,耗时=6412 //test2();//sum=500000000500000000,耗时=6101 test3();//sum=500000000500000000,耗时=126 //注意:这里使用Long包装类,jvm会帮我们自动装箱自动拆箱会消费时间 } //普通方法 public static void test1() { Long startTime = System.currentTimeMillis(); Long sum = 0L; for (Long i = 1L; i <= 10_0000_0000; i++) { sum += i; } Long endTime = System.currentTimeMillis(); System.out.println("sum=" + sum + ",耗时=" + (endTime - startTime)); } //使用ForkJoin的方法 public static void test2() { try { Long startTime = System.currentTimeMillis(); //创建ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool(); //创建任务 ForkJoinTask<Long> forkJoinTask = new ForkJoinDemo(1L, 10_0000_0000L); //提交任务 ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTask); Long sum = submit.get(); Long endTime = System.currentTimeMillis(); System.out.println("sum=" + sum + ",耗时=" + (endTime - startTime)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //使用stream并行流的方法 public static void test3() { Long startTime = System.currentTimeMillis(); long sum = LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(0, Long::sum); Long endTime = System.currentTimeMillis(); System.out.println("sum=" + sum + ",耗时=" + (endTime - startTime)); } }
Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
十五、异步回调
异步回调:在发起一个异步任务的同时指定一个函数,在异步任务完成时会调用这个函数,这个函数就叫回调函数。
Future接口表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。
使用 Future 获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了 CompletableFuture,它针对 Future 做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture 是 Future 的实现类。一个completableFuture对象代表着一个任务。
1、创建无返回值的异步任务
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class Demo01{ public static void main(String[] args) throws ExecutionException, InterruptedException { //runAsync执行一个异步任务,没有返回值 CompletableFuture<Void> future = CompletableFuture.runAsync(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); System.out.println("1111111"); future.get();//阻塞获取执行结果 System.out.println("2222222"); } } /* 输出结果: 1111111 ForkJoinPool.commonPool-worker-9 2222222 */
2、创建有返回值的异步任务
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class Demo01{ public static void main(String[] args) throws ExecutionException, InterruptedException { //supplyAsync执行一个异步任务,有返回值 CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()-> { System.out.println(Thread.currentThread().getName()); //int i = 1 / 0;//异常则回调exceptionally return 1024; }); //success回调函数 CompletableFuture<Integer> whenComplete = completableFuture.whenComplete((t, u) -> { System.out.println("t=>" + t);//正常返回值 System.out.println("u=>" + u);//抛出异常的错误信息 }).exceptionally((e) -> { //error回调函数 System.out.println(e.getMessage()); //异步任务执行异常则返回500 return 500; }); System.out.println(whenComplete.get());//获取异步任务执行结果 } } /* 正常输出结果: ForkJoinPool.commonPool-worker-9 t=>1024 u=>null 1024 异常输出结果: ForkJoinPool.commonPool-worker-9 t=>null u=>java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero 500 */
十六、JMM
JMM就是Java内存模型(java memory model)。因为在不同的硬件生产商和不同的操作系统下,内存的访问有一定的差异,所以会造成相同的代码运行在不同的系统上会出现各种问题。所以java内存模型(JMM)屏蔽掉各种硬件和操作系统的内存访问差异,以实现让java程序在各种平台下都能达到一致的并发效果。
JMM也是属于JVM的一部分,只是JMM是一种抽象的概念,是一组规则,并不实际存在。JMM规定了内存主要划分为主内存和工作内存两种。
JVM在设计时候考虑到,如果Java线程每次读取和写入变量都直接操作主内存,对性能影响比较大,所以每条线程拥有各自的工作内存,从主内存中拷贝一份变量放入工作内存中,线程对变量的读和写都是在自己的工作内存中操作,执行完相关的指令和操作,最后写回主内存,而不是直接操作主内存中的变量。
但这样会引发一个问题:当某个线程修改了自己工作内存中的变量,对其他的线程是不可见的,就会导致线程不安全问题。
因此JMM制定了一套标准来保证开发者在编写多线程程序的时候,能够控制什么时候内存会被同步给其他线程。
1、内存交互操作
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
- lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read(读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load(载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use(使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign(赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store(存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write(写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存(执行store和write操作)
2、三大模型特征
原子性
一次操作或者多次操作,要么所有的操作全部都得到执行并且不会受到任何因素的干扰而中断,要么都不执行。
在 Java 中,可以借助synchronized 、各种 Lock 以及各种原子类实现原子性。
synchronized 和各种 Lock 可以保证任一时刻只有一个线程访问该代码块,因此可以保障原子性。各种原子类是利用 CAS (compare and swap) 操作(可能也会用到 volatile或者final关键字)来保证原子操作。
可见性
当一个线程对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。
在 Java 中,可以借助synchronized 、volatile 以及各种 Lock 实现可见性。
-
volatile关键字要求被修改之后的变量要求立即更新到主内存,每次使用前从主内存处进行读取。
-
synchronized保证unlock之前必须先把变量刷新回主内存。
-
final修饰的字段在构造器中一旦完成初始化,并且构造器没有this逸出,那么其他线程就能看到final字段的值。
有序性
在本线程内观察,所有的操作都是有序的;而在一个线程内观察另一个线程,所有操作都是无序的。前半句指 as-if-serial 语义:线程内似表现为串行,后半句是指:“指令重排序现象”和“工作内存与主内存同步延迟现象”。处理器为了提高程序的运行效率,提高并行效率,可能会对代码进行优化。编译器认为,重排序后的代码执行效率更优。这样一来,代码的执行顺序就未必是编写代码时候的顺序了,在多线程的情况下就可能会出错。
Java 语言提供了 volatile 和 synchronized 两个关键字来保证线程之间操作的有序性。
-
volatile关键字是使用内存屏障达到禁止指令重排序,以保证有序性。
-
synchronized是一个线程lock之后,必须unlock后,其他线程才可以重新lock,使得被synchronized包住的代码块在多线程之间是串行执行的。
同步规定:
- 线程解锁前,必须把共享变量的值刷新回主内存。
- 线程加锁前,必须将主内存的最新值读取到自己的工作内存。
- 加锁和解锁是同一把锁。
十七、Volatile
volatile在java语言中是一个关键字,用于修饰变量。被volatile修饰的变量后,表示这个变量在不同线程中是共享,编译器与运行时都会注意到这个变量是共享的,因此不会对该变量进行重排序。
特点:1.保证了可见性 2.不保证原子性 3.禁止指令重排。
1、验证volatile保证可见性
public class TestVolatile { //标志位 static boolean flag= true; public static void main(String[] args) throws InterruptedException { //子线程 new Thread(()->{ while(flag){} }).start(); TimeUnit.SECONDS.sleep(2); //main线程 flag = false; System.out.println(flag);//false } }
以上的情况,按道理说程序应该是停止的。但事实并不是这样。程序输出false,程序死循环。
flag变量被拷贝到子线程和main线程的工作内存,子线程开始循环,此时的flag=true,此时的main线程修改flag=false,并写入了主内存,但是子线程还是在循环,并不知道其他线程修改了主存中的变量。
但是此时用volatile关键字修饰变量,main线程修改完flag变量之后,会立即将变量写回主存,并且告知子线程,子线程发现自己的变量失效后,会重新去主存中访问flag变量,此时flag=false,循环退出。
//加入volatile保证可见性。 volatile static boolean flag = true;
2、验证volatile不保证原子性
//以下几句代码能保证原子性吗? int i = 2; int j = i; i++; i = i + 1;
第一句是基本类型赋值操作,必定是原子性操作。
第二句先读取i的值,再赋值到j,两步操作,不能保证原子性。
第三和第四句其实是等效的,先读取i的值,再+1,最后赋值到i,三步操作了,不能保证原子性。
JMM只能保证基本的原子性,如果要保证一个代码块的原子性,提供了monitorenter 和 moniterexit 两个字节码指令,也就是 synchronized 关键字。因此在 synchronized 块之间的操作都是原子性的。
public class TestVolatile2 { //volatile不保证多线程条件下的原子性 private volatile static int num = 0; public static void add(){ num++;//不是原子性操作 } public static void main(String[] args) { //理论应该是num = 20000 for (int i = 1; i <= 20; i++) { new Thread(()->{ for (int j = 0; j < 1000; j++) { add(); } }).start(); } //保证线程全部执行完 while(Thread.activeCount()>2){ //如果存活的线程大于2,main gc,那就礼让 Thread.yield(); } System.out.println(Thread.currentThread().getName()+"=>"+num); } }
我们发现所有的线程跑完了num的值还是没有20000,我们之前学了锁,给add()方法加上synchronized是不是就可以保证原子性了,确实是这样的。但是我们这里是要验证volatile关键字是否保证原子性。我们给num关键字加上volatile关键字,num的值依旧到不了20000,证明了volatile不保证原子性。
这时候面试官就为难你了.
如果不加lock和synchronized怎么保证原子性?
num++看上去是一行代码,其实底层拆分了三行,第一步读取num的值,第二步num+1,第三步给num赋值。
JUC剩下最后一个java.util.concurrent.atomic包没讲了,这里直接派上用场。
使用java.util.concurrent.atomic包下的原子类解决问题
import java.util.concurrent.atomic.AtomicInteger; public class TestVolatile2 { //原子类的Integer private static AtomicInteger num = new AtomicInteger(); public synchronized static void add(){ num.getAndIncrement();//atomicInteger+1的方法 } public static void main(String[] args) { //理论应该是num = 20000 for (int i = 1; i <= 20; i++) { new Thread(()->{ for (int j = 0; j < 1000; j++) { add(); } }).start(); } //保证线程全部执行完 while(Thread.activeCount()>2){ //如果存活的线程大于2,main gc,那就礼让 Thread.yield(); } System.out.println(Thread.currentThread().getName()+"=>"+num);//20000 } }
看方法源码,Unsafe类里面全是native方法,这些类的底层都和操作系统挂钩!它的num++,是在内存中修改值,底层是CAS保证的原子性。
3、禁止指令重排详解
为了提升执行速度/性能,计算机在执行程序代码的时候,会对指令进行重排序。
简单来说就是系统在执行代码的时候并不一定是按照你写的代码的顺序依次执行。
我们去执行程序会经历这样的过程。
源代码 ---> 编译器优化重排 --> 指令并行重排 --> 内存系统重排 --> 执行
int x = 1; //1行 int y = 2; //2行 x = x + 2; //3行 y = x * x; //4行 //我们期望的执行顺序是 1234,可能执行的时候是 2134 1324 //可不可能是4123? 不可能的嗷 //处理器在进行指令重排:会考虑,数据之间的依赖性。
指令重排序在单线程是没有问题的,不会影响执行结果,而且还提高了性能。但是在多线程的环境下就不能保证一定不会影响执行结果了。
现在 x y a b 默认都是0
线程A | 线程B |
---|---|
x = a | y = b |
b = 1 | a = 2 |
正常执行结果:x = 0,y = 0,指令重排后
线程A | 线程B |
---|---|
b = 1 | a = 2 |
x = a | y = b |
指令重排后执行结果:x = 2,y = 1
volatile可以禁止指令重排
怎么实现的呢?
指令并行重排和内存系统重排都属于是处理器级别的指令重排序。对于处理器,通过插入内存屏障的方式来禁止特定类型的处理器重排序。
内存屏障(Memory Barrier,或有时叫做内存栅栏,Memory Fence)是一种 CPU 指令,用来禁止处理器指令发生重排序(像屏障一样),从而保障指令执行的有序性。另外,为了达到屏障的效果,它也会使处理器写入、读取值之前,将主内存的值写入高速缓存,清空无效队列,从而保障变量的可见性。
volatile禁止重排序原理
在每个volatile读操作后插入LoadLoad屏障,在读操作后插入LoadStore屏障。
在每个volatile写操作的前面插入一个StoreStore屏障,后面插入一个SotreLoad屏障。
面试官:那么你知道在哪里用这个内存屏障用得最多呢?单例模式
十八、彻底玩转单例模式
单例模式(Singleton Pattern)是 Java 中最简单的设计模式之一。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。
这种模式涉及到一个单一的类,该类负责创建自己的对象,同时确保只有单个对象被创建。这个类提供了一种访问其唯一的对象的方式,可以直接访问,不需要实例化该类的对象。
注意:
1、单例类只能有一个实例。
2、单例类必须自己创建自己的唯一实例。
3、单例类必须给所有其他对象提供这一实例。
意图:保证一个类仅有一个实例,并提供一个访问它的全局访问点。
主要解决:一个全局使用的类频繁地创建与销毁。
何时使用:当您想控制实例数目,节省系统资源的时候。
如何解决:判断系统是否已经有这个单例,如果有则返回,如果没有则创建。
关键代码:构造函数是私有的。
优点:
- 在内存里只有一个实例,减少了内存的开销,尤其是频繁的创建和销毁实例(比如管理学院首页页面缓存)。
- 避免对资源的多重占用(比如写文件操作)
缺点:没有接口,不能继承,与单一职责原则冲突,一个类应该只关心内部逻辑,而不关心外面怎么样来实例化。
注意事项:getInstance() 方法中需要使用同步锁 synchronized (Singleton.class) 防止多线程同时进入造成 instance 被多次实例化。
/** * 单例模式 */ public class SingleObject { //创建 SingleObject 的一个对象 private static SingleObject instance = new SingleObject(); //私有化构造函数,这样该类就不会被实例化 private SingleObject(){} //获取唯一可用的对象 public static SingleObject getInstance(){ return instance; } public void showMessage(){ System.out.println("Hello,World!"); } }
public class SingletonPatternDemo { public static void main(String[] args) { //不合法的构造函数 //编译时错误:构造函数 SingleObject() 是不可见的 //SingleObject object = new SingleObject(); //获取唯一可用的对象 SingleObject object = SingleObject.getInstance(); //显示消息 object.showMessage(); //Hello World! } }
1、单例模式的几种实现方式
单例模式的实现有多种方式,如下所示:
1. 饿汉式(可用)
优点:没有加锁,执行效率会提高。
缺点:在类装载的时候就完成实例化,没有达到Lazy Loading的效果。如果从始至终从未使用过这个实例,则会造成内存的浪费。
public class Hungry { private final static Hungry INSTANCE = new Hungry(); private Hungry(){} public static Hungry getInstance(){ return INSTANCE; } }
2. 懒汉式(不可用)
优点:这种方式Lazy Loading的效果很明显
缺点:因为没有加锁 synchronized,所以严格意义上它并不算单例模式。不要求线程安全,在多线程不能正常工作。
public class LazyMan { private static LazyMan lazyMan; private LazyMan(){} public static LazyMan getInstance(){ if(lazyMan == null){ lazyMan = new LazyMan(); } return lazyMan; } }
单线程下确实是单例的,看看多线程。
/** * 多线程测试懒汉式的单例模式 */ public class LazyMan { private static LazyMan lazyMan; private LazyMan(){ //打印线程 System.out.println(Thread.currentThread().getName()); } public static LazyMan getInstance(){ if(lazyMan == null){ lazyMan = new LazyMan(); } return lazyMan; } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ LazyMan.getInstance(); }).start(); } } }
//多线程下出现多个实例 Thread-1 Thread-2 Thread-4 Thread-0 Process finished with exit code 0
解决办法第一个想到就是给方法加锁。但是给方法加synchronized锁,会导致很大的性能开销,并且加锁其实只需要在第一次初始化的时候用到,之后的调用都没必要再进行加锁。
所以我们使用synchronized块锁类
public class LazyMan { private static LazyMan lazyMan; private LazyMan(){ System.out.println(Thread.currentThread().getName()); } public static LazyMan getInstance(){ if(lazyMan == null){ synchronized (LazyMan.class){ lazyMan = new LazyMan(); } } return lazyMan; } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ LazyMan.getInstance(); }).start(); } } }
这种情况还是会出现多实例,如果两个线程同时进入了if判断,其中A线程抢到了锁,B线程就在这里等待,等A线程执行锁块里代码完成之后就释放锁,B线程就拿锁,继续执行锁块代码,从而导致创建了多个实例。继续优化
双检锁/双重校验锁(DCL,即 double-checked locking)(推荐)
优点:这种方式采用双锁机制,安全且在多线程情况下能保持高性能。
public class LazyMan { //加入volatile禁止指令重排 private volatile static LazyMan lazyMan; private LazyMan(){ System.out.println(Thread.currentThread().getName()); } public static LazyMan getInstance(){ if(lazyMan == null){ synchronized (LazyMan.class){ if(lazyMan == null){ lazyMan = new LazyMan(); } } } return lazyMan; } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ LazyMan.getInstance(); }).start(); } } }
为什么要加入volatile关键字:因为 “lazyMan = new LazyMan” 不是原子性操作,底层会经过三步操作:
- 为对象分配内存空间。
- 执行构造方法初始化对象。
- 将这个对象指向分配的内存空间。
由于编译器为了性能原因,可能会将第二步和第三步操作交换顺序,也就是指令重排。顺序就变成这样了:
- 为对象分配内存空间。
- 将这个对象指向分配的内存空间。
- 执行构造方法初始化对象。
如果A线程执行到第二步操作的时候,此时进来一个B线程,B线程进入if判断的时候检查该对象不是null值,直接return返回去,而此时的对象还未完成初始化。所以B线程就会访问到一个未初始化的对象。加入volatile解决问题。
3.登记式/静态内部类(推荐)
这个时候就炫技,我静态内部类玩得挺6,我用静态内部类来实现单例模式。
优点:避免了线程不安全,延迟加载,效率高。
这种方式跟饿汉式方式采用的机制类似,但又有不同。两者都是采用了类装载的机制来保证初始化实例时只有一个线程。不同的地方在饿汉式方式是只要Singleton类被装载就会实例化,没有Lazy-Loading的作用,而静态内部类方式在Singleton类被装载时并不会立即实例化,而是在需要实例化时,调用getInstance方法,才会装载SingletonInstance类,从而完成Singleton的实例化。
类的静态属性只会在第一次加载类的时候初始化,所以在这里,JVM帮助我们保证了线程的安全性,在类进行初始化时,别的线程是无法进入的。
public class Holder { private Holder(){} //调用getInstance()方法的时候才会实例化 public static Holder getInstance(){ return InnerClass.INSTANCE; } //静态内部类去实例化 public static class InnerClass{ private static final Holder INSTANCE = new Holder(); } }
但是这几种单例模式都是不安全的,因为我们之前学过一种非常牛逼的技术叫“反射”,使用反射可用破解这几种单例模式。
import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; /** * 反射破解懒汉式单例模式 */ public class Singleton { private volatile static Singleton instance; private Singleton(){} public static Singleton getInstance(){ if(instance == null){ synchronized (Singleton.class){ if(instance == null){ instance = new Singleton(); } } } return instance; } public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { //正常方式 Singleton instance1 = Singleton.getInstance(); //反射破坏 Constructor<Singleton> constructor = Singleton.class.getDeclaredConstructor(null);//获取指定形参的构造器 constructor.setAccessible(true);//关闭安全检测,也就是忽略private Singleton instance2 = constructor.newInstance();//反射创建实例 System.out.println(instance1); System.out.println(instance2); } }
com.hnguigu.single.Singleton@74a14482 com.hnguigu.single.Singleton@1540e19d Process finished with exit code 0
我们发现确实通过反射破坏了单例模式,怎么解决呢?因为反射是走的无参构造器,于是我们可用在无参构造器加锁判断。
public class Singleton { private volatile static Singleton instance; private Singleton(){ synchronized (Singleton.class){ if(instance!=null){ throw new RuntimeException("不要试图使用反射破坏异常"); } } } public static Singleton getInstance(){ if(instance == null){ synchronized (Singleton.class){ if(instance == null){ instance = new Singleton(); } } } return instance; } public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { //正常方式 Singleton instance1 = Singleton.getInstance(); //反射破坏 Constructor<Singleton> constructor = Singleton.class.getDeclaredConstructor(null);//获取指定形参的构造器 constructor.setAccessible(true);//关闭安全检测,也就是忽略private Singleton instance2 = constructor.newInstance();//反射创建实例 System.out.println(instance1); System.out.println(instance2); } }
确实报异常了,刚刚是有一个对象是以正常方式创建的,那如果我两个对象都是反射方式创建的呢?
public class Singleton { private volatile static Singleton instance; private Singleton(){ synchronized (Singleton.class){ if(instance!=null){ throw new RuntimeException("不要试图使用反射破坏异常"); } } } public static Singleton getInstance(){ if(instance == null){ synchronized (Singleton.class){ if(instance == null){ instance = new Singleton(); } } } return instance; } public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { //反射破坏 Constructor<Singleton> constructor = Singleton.class.getDeclaredConstructor(null);//获取指定形参的构造器 constructor.setAccessible(true);//关闭安全检测,也就是忽略private Singleton instance1 = constructor.newInstance();//反射创建实例 Singleton instance2 = constructor.newInstance();//反射创建实例 System.out.println(instance1); System.out.println(instance2); } }
com.hnguigu.single.Singleton@74a14482 com.hnguigu.single.Singleton@1540e19d Process finished with exit code 0
还是遭到破坏了,我们可用通过红绿灯来防止破坏,设置一个标志位来判断
public class Singleton { private volatile static Singleton instance; //标志位 private static boolean flag = false; private Singleton(){ synchronized (Singleton.class){ if(flag == false){ flag = true; }else{ throw new RuntimeException("不要试图使用反射破坏异常"); } } } public static Singleton getInstance(){ if(instance == null){ synchronized (Singleton.class){ if(instance == null){ instance = new Singleton(); } } } return instance; } public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { //正常方式 //Singleton instance1 = Singleton.getInstance(); //反射破坏 Constructor<Singleton> constructor = Singleton.class.getDeclaredConstructor(null);//获取指定形参的构造器 constructor.setAccessible(true);//关闭安全检测,也就是忽略private Singleton instance2 = constructor.newInstance();//反射创建实例 Singleton instance1 = constructor.newInstance();//反射创建实例 System.out.println(instance1); System.out.println(instance2); } }
如果不通过反编译的情况下,是找不到这个标志位的,我们还可以对标志位进行加密处理,使其单例变得更安全。
但是再牛逼的加密方式,也会有解密办法,假设我们拿到了标志位,是可以通过反射来转换标志位的。
但是 “道高一尺魔高一丈”,正义终将战胜邪恶!我们查看反射创建实例源码
有一个 “Cannot reflectively create enum objects” 异常,意思就是无法以反射方式创建枚举对象,条件是必须是枚举类型。
4、枚举(推荐)
借助JDK1.5中添加的枚举来实现单例模式。它不仅能避免多线程同步问题,而且还自动支持序列化机制,防止反序列化重新创建新的对象,绝对防止多次实例化。
public enum Singleton { INSTANCE; public void whateverMethod() { } }
我们验证枚举类对象不能被反射创建,查看枚举类里的构造器
public enum EnumSingle { INSTANCE; public EnumSingle getInstance(){ return INSTANCE; } } class Test{ public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { EnumSingle instance1 = EnumSingle.INSTANCE; Constructor<EnumSingle> constructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class); EnumSingle instance2 = constructor.newInstance(); System.out.println(instance1); System.out.println(instance2); } }
Exception in thread "main" java.lang.IllegalArgumentException: Cannot reflectively create enum objects at java.lang.reflect.Constructor.newInstance(Constructor.java:417) at com.hnguigu.single.Test.main(EnumSingle.java:25) Process finished with exit code 1
十九、深入理解CAS
CAS全称叫“Compare-And-Swap”,直译就是 “比较并交换”。它是一条CPU的原子指令(并发原语),其作用就是让CPU先比较两个值是否相等,如果相等则更新为新值,这个过程是原子的。
CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能。
1、使用
CAS操作是原子性的,所以多线程并发使用CAS更新数据时,可以不使用锁。JDK中大量使用了CAS来更新数据而防止加锁(synchronized 重量级锁)来保持原子更新。
我们来看下AtomicInteger类的核心源码:
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // 设置为使用Unsafe.compareAndSwapInt进行更新 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; /* Unsafe类负责执行CAS并发原语,由JVM转换为汇编。 Java无法操作内存,C++可以操作内存,Java可以调用C++(native方法)。 Unsafe类就相当于Java的后门,可以通过这个类来操作内存 */ static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; public AtomicInteger(int initialValue) { value = initialValue; } }
public class CASDemo { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(2020); //boolean compareAndSet(int expect, int update) //expect:期望,update:更新 //如果实际的值和我期望的值是相同的,那么就更新,否则就不更新 System.out.println(atomicInteger.compareAndSet(2020, 2021));//true System.out.println(atomicInteger.get());//2021 System.out.println(atomicInteger.compareAndSet(1999, 2021));//false System.out.println(atomicInteger.get());//2021 atomicInteger.getAndIncrement();//++操作 } }
以getAndIncrement()的源码为例:
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } //参数:var1当前对象,var2该变量值在内存中的偏移地址,var4需要增加的值大小 public final int getAndAddInt(Object var1, long var2, int var4) { int var5;//代表要交换的值。 do { //getIntVolatile获取内存地址中的值 var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
compareAndSwapInt该方法中使用了自旋锁以保证其原子性。
假设主内存值为 v 等于10,此时有 T1、T2两个线程进入到该方法,根据 Java 内存模型(JMM)我们可以知道,线程 T1 和线程 T2 都会将主内存的值10拷贝到自己的工作内存。
1、当线程 T1 和线程 T2 都通过getIntVolatile(var1, var2)赋值给了变量 var5 之后,线程 T1 被挂起;
2、线程 T2 调用方法compareAndSwapInt,因为当中的期望值 var5 和当前主内存值相同,比较成功,更新当前内存的值为 11,返回 true,退出循环;
3、线程 T1 被唤醒,在执行compareAndSwapInt方法的时候,由于当前内存的值已经为11,和 工作内存 var5 的值10不同了,所以比较不成功,返回 false,继续执行循环;
4、线程 T1 重新从主内存获取当前的最新值11赋值给 var5;
5、线程 T1 继续进行比较,若此时没有其他线程对主内存的进行修改,比较更新成功 ,退出循环;否则继续执行步骤4。
虽然CAS没有加锁保证了一致性,并发性有所提高 ,但是也产生了一系列的问题,比如循环时间长开销大、只能保证一个共享变量的原子操作、会产生ABA问题。
2、ABA问题
使用 CAS 会产生 ABA 问题,这是因为 CAS 算法是在某一时刻取出内存值然后在当前的时刻进行比较,中间存在一个时间差,在这个时间差里就可能会产生 ABA 问题。
ABA问题的过程就是:现有 T1 T2 线程从内存中获取值为 A,T2将值改成了B,然后又改回成A,T2退出。T1进行操作,使用预期值和内存中的值比较,发现都是A,修改成功然后退出。CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。我们称这种现象为ABA问题。
解决思路:可以添加版本号,每次值更新的时候将版本号+1。
对应的思想就是乐观锁。
3、原子引用
带版本号的原子操作。
从Java 1.5开始,java.util.concurrent.atomic包里提供了一个类AtomicStampedReference。其中的compareAndSet方法如下
public boolean compareAndSet(V expectedReference,//期望被修改的值 V newReference,//新的值 int expectedStamp,//期望被修改的版本号 int newStamp) {//新的版本号 Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicStampedReference; /** * ABA问题 */ public class ABADemo { /* * AtomicStampedReference:注意如果泛型是基本数据类型的包装类,注意对象的引用问题 * 正常业务操作下,这里面比较的是一个个对象 */ private static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1); public static void main(String[] args) { new Thread(() -> { int stamp = stampedReference.getStamp();//版本号 System.out.println("当前线程的名字:" + Thread.currentThread().getName() + ",版本号:" + stamp + ",值:" + stampedReference.getReference()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1); System.out.println("当前线程的名字:" + Thread.currentThread().getName() + ",版本号:" + stampedReference.getStamp() + ",值:" + stampedReference.getReference()); stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1); System.out.println("当前线程的名字:" + Thread.currentThread().getName() + ",版本号:" + stampedReference.getStamp() + ",值:" + stampedReference.getReference()); System.out.println("T1完成CAS操作~"); }, "T1").start(); new Thread(() -> { int stamp = stampedReference.getStamp();//版本号 System.out.println("当前线程的名字:" + Thread.currentThread().getName() + ",版本号:" + stamp + ",值:" + stampedReference.getReference()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //版本号对不上,修改不成功! boolean flag = stampedReference.compareAndSet(100, 66, stamp, stamp + 1);//false System.out.println("当前线程的名字:" + Thread.currentThread().getName() + ",修改成功与否:" + flag + ",最新版本号:" + stampedReference.getStamp() + ",最新值:" + stampedReference.getReference()); }, "T2").start(); } }
当前线程的名字:T1,版本号:1,值:100 当前线程的名字:T2,版本号:1,值:100 当前线程的名字:T1,版本号:2,值:101 当前线程的名字:T1,版本号:3,值:100 T1完成CAS操作~ 当前线程的名字:T2,修改成功与否:false,最新版本号:3,最新值:100 Process finished with exit code 0
二十、彻底吃透各种锁
1、公平锁和非公平锁
这里前面讲过就不再赘述。
2、可重入锁
可重入锁:某个线程已经获得某个锁,还可以再次获得锁而不会出现死锁。
/** * 可重入锁的案例,synchronized */ public class WhatReentrantLock { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.sendSms(); },"A").start(); new Thread(()->{ phone.sendSms(); },"B").start(); /* A=>sms A=>call B=>sms B=>call */ } } class Phone{ public synchronized void sendSms(){ System.out.println(Thread.currentThread().getName()+"=>"+"sms"); call(); } public synchronized void call(){ System.out.println(Thread.currentThread().getName()+"=>"+"call"); } }
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 可重入锁,ReentrantLock */ public class WhatReentrantLock2 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.sendSms(); },"A").start(); new Thread(()->{ phone.sendSms(); },"B").start(); /* A=>sms A=>call B=>sms B=>call */ } } class Phone2{ Lock lock = new ReentrantLock(); public void sendSms(){ //这是两把锁,两把钥匙 //lock锁必须配对,否则就会死锁 lock.lock(); try { System.out.println(Thread.currentThread().getName()+"=>"+"sms"); call();//这里也是一把锁 } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void call(){ lock.lock(); try { System.out.println(Thread.currentThread().getName()+"=>"+"call"); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } }
lock锁必须配对,相当于lock和 unlock 必须数量相同;
3、自旋锁
由于多线程的核心是CPU的时间分片,所以同一时刻只能有一个线程获取到锁。那么没有获取到锁的线程应该怎么办?
通常有两种处理方式:一种是没有获取到锁的线程就一直循环等待判断该资源是否已经释放锁,这种锁叫做自旋锁,它不用将线程阻塞起来(NON-BLOCKING);还有一种处理方式就是把自己阻塞起来,等待重新调度请求,这种叫做互斥锁。
自旋锁:当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被别人获取(占用),该线程将会等待,间隔一段时间后会再次尝试获取。这种采用循环加锁 -> 等待的机制被称为自旋锁(spinlock)。
- 优点:如果持有锁的线程能在短时间内释放锁资源,那么避免了用户进程和内核切换的消耗。
- 缺点:如果持有锁的线程长时间占用锁执行同步块,其他线程就一直占着CPU资源不进行释放,进而会影响整体系统的性能。
解决办法:可以给自旋锁设定一个自旋时间,等时间一到立即释放自旋锁。
自己创建一个自旋锁
import java.util.concurrent.atomic.AtomicReference; /** * 自旋锁 */ public class SpinLock { private AtomicReference<Thread> atomicReference = new AtomicReference<>(); //加锁 public void myLock(){ Thread thread = Thread.currentThread(); //自旋锁 while(!atomicReference.compareAndSet(null,thread)){} System.out.println(thread.getName()+"=>lock"); } //解锁 public void myUnLock(){ Thread thread = Thread.currentThread(); if(!atomicReference.compareAndSet(thread,null)){ throw new RuntimeException("释放锁失败!"); } System.out.println(thread.getName()+"=>unlock"); } }
import java.util.concurrent.TimeUnit; /** * 测试自旋锁 */ public class TestSpinLock { public static void main(String[] args) throws InterruptedException { //ReentrantLock reentrantLock = new ReentrantLock(); //reentrantLock.lock(); //reentrantLock.unlock(); //自己写的锁,底层使用自旋锁,cas实现 SpinLock lock = new SpinLock(); new Thread(()->{ lock.myLock(); try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { e.printStackTrace(); }finally { lock.myUnLock(); } },"T1").start(); TimeUnit.SECONDS.sleep(1);//让t1先获得锁 new Thread(()->{ lock.myLock(); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }finally { lock.myUnLock(); } },"T2").start(); } }
T1=>lock T1=>unlock T2=>lock T2=>unlock //t1进来获得锁之后,t2进入自旋,等待t1释放锁后,t2退出自旋然后获得锁
4、死锁
多个线程各自占有一些共享资源,并且互相等待其他线程占有的资源才能运行,而导致两个或多个线程在等待对方释放锁资源,都停止执行的情形,某一个代码块同时拥有两个以上对象的锁时,就可能会发生“死锁”的问题。
import java.util.concurrent.TimeUnit; public class DeadLockDemo { public static void main(String[] args) { Makeup makeup1 = new Makeup(0, "灰姑娘"); Makeup makeup2 = new Makeup(1, "白雪公主"); makeup1.start(); makeup2.start(); //最终结果:程序僵持运行着 } } //口红 class Lipstick{ String name = "迪奥口红"; } //镜子 class Mirror{ String name = "魔镜"; } //化妆 class Makeup extends Thread{ //使用static保证只有一份 static Lipstick lipstick = new Lipstick(); static Mirror mirror = new Mirror(); int choice;//选择 String girlName;//选择化妆的人 Makeup(int choice,String girlName){ this.choice = choice; this.girlName = girlName; } @Override public void run() { try { makeup(); } catch (InterruptedException e) { e.printStackTrace(); } } //化妆 private void makeup() throws InterruptedException { if(choice==0){ synchronized (lipstick){//获得口红的锁 System.out.println(this.girlName + "--->获得" + lipstick.name); TimeUnit.SECONDS.sleep(1); synchronized (mirror){//一秒钟之后想要镜子的锁 System.out.println(this.girlName + "--->获得" + mirror.name); } } }else{ synchronized (mirror){//获得镜子的锁 System.out.println(this.girlName + "--->获得" + mirror.name); TimeUnit.SECONDS.sleep(2); synchronized (lipstick){//两秒钟之后想要口红的锁 System.out.println(this.girlName + "--->获得" + lipstick.name); } } } } }
灰姑娘拿着口红的锁不释放,随后一秒钟后又要魔镜的锁,白雪公主拿着魔镜的锁不释放,两秒钟后又要口红的锁,双方都不释放已经使用完了的锁资源,僵持形成死锁。
产生死锁的四个必要条件:
- 互斥条件:一个资源每次只能被一个进程使用。
- 请求与保持条件:一个进程因请求资源而阻塞时,对以获得的资源保持不放。
- 不剥夺条件:进程已获得的资源,在未使用完毕之前,不能被强行抢走。
- 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。
上面就是形成死锁的必要条件,只需要解决其中任意一个或者多个条件就可以避免死锁的发生。
死锁排查
jps进程状态工具 jps.exe 工具是 jdk 自带的,在 %JAVA_HOME%/bin 目录下。
第一步:打开idea提供terminal终端命令行,使用jps -l
查看进程
第二步:使用jstack 进程号
查看堆栈信息
一般情况信息在最后面
好文要顶!