JUC

Java并发编程

概述

JUC是java.util.concurrent工具包的简称,是处理多线程的工具包

wait方法和sleep方法区别

  • wait是Object的方法,任何对象实例都可以调用。sleep是Thread的静态方法
  • sleep不会释放锁,它也不需要占用锁。wait会释放锁,但调用它的前提是当前线程占有锁(即代码要在synchronized中)
  • 它们都会被interrupted方法中断

管程

Monitor监视器,是一种同步机制,保证在同一时间,只有一个线程访问被保护的数据或者代码

jvm同步基于进入和退出,使用管程对象实现的

用户线程和守护线程

  • 用户线程:自定义线程,主线程结束了,用户线程还存活,jvm存活
  • 守护线程:比如垃圾回收,没有用户线程了,都是守护线程,jvm结束
1
2
3
4
5
6
7
8
9
10
11
Thread thread=new Thread(()->{
// Thread.currentThread().isDaemon()判断当前线程是否是守护线程
System.out.println(Thread.currentThread().getName()+":::"+Thread.currentThread().isDaemon());
while(true){

}
});
// 设置线程为守护线程,如果主线程结束,守护线程也结束,jvm结束
thread.setDaemon(true);
thread.start();
System.out.println(Thread.currentThread().getName()+"::::主线程结束");

Lock接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Ticket {
// synchronized实现卖票操作
private int number = 30;

public synchronized void sale() {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "买到了::" + (number--) + "票,剩余:" + number);
}
}
}

class LTicket {
// Lock实现卖票操作
private int number = 30;

// 创建可重入锁
public final ReentrantLock lock = new ReentrantLock();

public void sale() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "买到了::" + (number--) + " 剩余:" + number);
}
} finally {
lock.unlock();
}
}
}

线程间通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class T {
private int number = 0;

// synchronized锁
public synchronized void incr() throws InterruptedException {
while (number != 0) { // 必须使用while进行判断,不然会出现虚假唤醒的情况
this.wait(); // 等待
}
number++; // +1
System.out.println(Thread.currentThread().getName() + " :: " + number);
this.notifyAll(); // 通知其他线程
}

public `synchronized`

void decr() throws InterruptedException {
while (number != 1) { // 必须使用while进行判断,不然会出现虚假唤醒的情况
this.wait(); // 等待
}
number--; // -1
System.out.println(Thread.currentThread().getName() + " :: " + number);
this.notifyAll(); // 通知其他线程
}
}

class T2 {
private int number = 0;

// lock锁
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();

public void incr() throws InterruptedException {
lock.lock();
try {
while (number != 0) {
condition.await();
}
number++; // +1
System.out.println(Thread.currentThread().getName() + " :: " + number);
condition.signalAll();
} finally {
lock.unlock();
}
}

public void decr() throws InterruptedException {
lock.lock();
try {
while (number != 1) {
condition.await();
}
number--; // -1
System.out.println(Thread.currentThread().getName() + " :: " + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
}

线程间定制化通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class Ticket {
private int flag = 1;

// lock锁
private final Lock lock = new ReentrantLock();
private final Condition c1 = lock.newCondition();
private final Condition c2 = lock.newCondition();
private final Condition c3 = lock.newCondition();

public void print5(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 1) {
c1.await(); // 等待
}
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + " : 轮数:" + loop);
}
flag = 2; // 修改为2线程
c2.signalAll(); // 2线程取消等待
} finally {
lock.unlock();
}
}

public void print10(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 2) {
c2.await();
}
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + " : 轮数:" + loop);
}
flag = 3;
c3.signalAll();
} finally {
lock.unlock();
}
}

public void print15(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 3) {
c3.await();
}
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + " : 轮数:" + loop);
}
flag = 1;
c1.signalAll();
} finally {
lock.unlock();
}
}
}

public class t1 {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
ticket.print5(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
ticket.print10(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();

new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
ticket.print15(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CC").start();
}
}

集合的线程安全

ArrayList和CopyOnWriteArrayList

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 创建ArrayList,会出现异常
// List<String> list = new ArrayList<>();

// Vector
// List<String> list = new Vector<>();

// Collections方法
// List<String> list = Collections.synchronizedList(new ArrayList<>());

// 推荐使用 java.util包下的
List<String> list=new CopyOnWriteArrayList<>();

for(int i=0;i< 30;i++){
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
},String.valueOf(i)).start();
}

HashSet和CopyOnWriteArraySet

1
2
3
4
5
6
7
8
9
10
11
12
// 出现异常
//Set<String> set = new HashSet<>()

// 推荐使用 java.util包下的
Set<String> set=new CopyOnWriteArraySet<>();

for(int i=0;i< 30;i++){
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(set);
},String.valueOf(i)).start();
}

HashMap和ConcurrentHashMap

1
2
3
4
5
6
7
8
9
10
// 出现异常
// Map<String,Object> map = new HashMap<>();
// 推荐使用 java.util包下的
Map<String, Object> map=new ConcurrentHashMap<>();
for(int i=0;i< 30;i++){
new Thread(()->{
map.put(UUID.randomUUID().toString().substring(0,8),UUID.randomUUID().toString().substring(0,8));
System.out.println(map);
},String.valueOf(i)).start();
}

多线程锁

synchronized

synchronized实现同步的基础:Java中的每一个对象都可以作为锁

具体表现为:

  • 对于普通方法,锁的是当前实例对象
  • 对于静态同步方法,锁的是当前类的Class对象
  • 对于同步方法块,锁的是synchronized括号里配置的对象

公平锁和非公平锁

  • 公平锁效率高,会出现一个线程执行完所有代码
  • 非公平锁效率相对低,每个线程都有执行代码的机会
1
2
3
4
// 公平锁
private final Lock lock=new ReentrantLock(true);
// 非公平锁
private final Lock lock=new ReentrantLock();

可重入锁

获取到最外层的锁,内层所有代码都可以执行

synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
// 所有都可打印
Object o = new Object();
new Thread(() -> {
synchronized (o) {
System.out.println("外层");
synchronized (o) {
System.out.println("中层");
synchronized (o) {
System.out.println("内层");
}
}
}
}).start();
1
2
3
4
5
6
7
// 方法锁
public synchronized void add(){
add();
}

// 堆栈溢出异常
new Ticket().add();

lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 打印 外层 内层
Lock lock = new ReentrantLock();
new Thread(() -> {
try {
lock.lock();
System.out.println("外层");
try {
lock.lock();
System.out.println("内层");
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
}).start();

死锁

两个或者两个以上进程在执行过程中,因为争夺资源而造成一种互相等待的现象,如果没有外力干涉,他们无法再执行下去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 死锁
Object a = new Object();
Object b = new Object();

new Thread(()->{
synchronized (a){
System.out.println("获得锁a");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (b){
System.out.println("获得锁b");
}
}
},"A").start();

new Thread(()->{
synchronized (b){
System.out.println("获得锁b");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (a){
System.out.println("获得锁a");
}
}
},"A").start();
}

产生死锁的原因

  • 系统资源不足
  • 进程运行推送顺序不合适
  • 资源分配不当

Callable接口

创建的线程可以有返回值

Runnable接口和Callable接口

  1. 是否有返回值
  2. 是否抛出异常
  3. 实现方法名称不同,一个是run方法,一个是call方法
1
2
3
4
5
6
FutureTask<Integer> task = new FutureTask<>(() -> {
return 10;
});
new Thread(task,"AA");
// 获取返回结果
System.out.println(task.get());

JUC辅助类

CountDownLatch

CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行减1的操作,使用await方法等待计数器不大于0,然后继续执行await方法之后的语句

1
2
3
4
5
6
7
8
9
10
11
12
13
 // 计数器
CountDownLatch countDownLatch = new CountDownLatch(6);

for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 执行完成");
// 每次减1
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
// 等待
countDownLatch.await();
System.out.println("全部执行完成");

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CyclicBarrier cyclicBarrier = new CyclicBarrier(6,()->{
System.out.println("当值达到6的时候执行");
});

for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 执行完成");
// 等待执行
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 3个停车位 6个车抢

// 创建默认3个停车位
Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
// 抢占
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 抢到车位");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + " =======离开车位");
} catch (Exception e) {
e.printStackTrace();
}finally {
// 释放
semaphore.release();
}
}, String.valueOf(i)).start();
}

读写锁

一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享

锁降级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public volatile Map<String, Object> map = new HashMap<>();

// 读写锁
public ReadWriteLock rwLock = new ReentrantReadWriteLock();

public void put(String key, Object v) throws InterruptedException {
// 上写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在写 " + key);
TimeUnit.MILLISECONDS.sleep(300);
map.put(key, v);
System.out.println(Thread.currentThread().getName() + " 写完了 " + key);
} finally {
rwLock.writeLock().unlock();
}
}

public void get(String key) throws InterruptedException {
// 上读锁
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在读 " + key);
TimeUnit.MILLISECONDS.sleep(300);
map.get(key);
System.out.println(Thread.currentThread().getName() + " 读完了 " + key);
} finally {
rwLock.readLock().unlock();
}
}

JUC阻塞队列

可以使得数据由队列的一端输入,从另一端输出

  • ArrayBlockingQueue(常用):由数组结构组成的有界阻塞队列,基于数组的阻塞队列实现,在ArrayBlockingQueue内部维护了一个定长的数组,以便缓存队列中的数据对象,除了一个定长数组外,ArrayBlockingQueue内部保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置
  • LinkedBlockingQueue(常用):由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
  • LinkedTransferQueue:由链表组成的无界阻塞队列
  • LinkedBlockingDeque:由链表组成的双向阻塞队列
方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用

ThreadPool线程池

  1. 使用Executors
  2. 使用new ThreadPoolExecutor
    • corePoolSize:核心线程数(一直存在);线程池创建好以后就准备就绪的线程数量,用来等待接受异步任务去执行
    • maximumPoolSize:最大线程数量
    • keepAliveTime:存活时间。如果当前线程数量大于最大数量,当线程空闲了存活时间后释放空闲的线程
    • unit:时间单位
    • BlockingQueue<Runnable> workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要有线程空闲,就会去队列里面取出新的任务继续执行
    • ThreadFactory threadFactory:线程的创建工厂
    • RejectedExecutionHandler handler:如果队列满了,按照指定的策略拒绝执行任务
1
2
3
4
5
6
7
8
9
10
// 10代表默认线程池大小
Executors.newFixedThreadPool(10); // 固定大小,都不可回收
Executors.newCachedThreadPool(); // 核心是0,所有线程都可以回收
Executors.newScheduledThreadPool(10); // 定时任务的线程池
Executors.newSingleThreadExecutor(); // 单线程线程池,后台从队列中获取任务,挨个执行

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("consumer-queue-thread-%d").build();
ExecutorService service=new ThreadPoolExecutor(10, 100, 25L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), namedThreadFactory);

CompletableFuture

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 第一个参数是任务,第二个参数是自定义的线程池,无返回值
CompletableFuture.runAsync(()->{
// 任务
},service);

// 有返回值,get()方法可以获取返回值
CompletableFuture.supplyAsync(()->{
return 100;
},service);

// 编排处理
CompletableFuture.supplyAsync(()->100)
.whenComplete((res,exc)->{
System.out.println("结果是:"+res); // 100
System.out.println("异常是:"+exc); // null
})
.exceptionally(t->{
// 出现异常的时候返回 10
return 10;
});

// 可以处理返回值
CompletableFuture.supplyAsync(()->100)
.handle((res,exc)->{
System.out.println("结果是:"+res); // 100
System.out.println("异常是:"+exc); // null
if(res!=null){
return res;
}
if(exc!=null){
return 0;
}
return 10;
});

线程串行化

  • thenApply()和thenApplyAsync()方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值
  • thenAccept()和thenAcceptAsync()方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果
  • thenRun()和thenRunAsync()方法:带有Async默认是异步执行的,不接收上一个任务返回值
1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService service=Executors.newSingleThreadExecutor();
CompletableFuture.supplyAsync(()->100)
.thenAccept(res->{
System.out.println("上一个任务的返回值:"+res);
})
.thenApplyAsync(res->{
System.out.println("上一个任务的返回值:"+res);
return 100;
},service)
.thenRunAsync(()->{
System.out.println("无法接收返回值");
},service);

两任务组合-都要完成

  • thenCombine()和thenCombineAsync():组合两个future,获取两个future的返回结果,并返回当前任务的返回值
  • thenAcceptBoth()和thenAcceptBothAsync():组合两个future,获取两个future的返回结果,没有返回值
  • runAfterBoth()和runAfterBothAsync():组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CompletableFuture<Integer> future01=CompletableFuture.supplyAsync(()->{
int i=10;
return i/2;
},service);
CompletableFuture<Integer> future02=CompletableFuture.supplyAsync(()->{
int i=20;
return i/2;
},service);

// 无法感知前两个任务的结果
future01.runAfterBothAsync(future02,()->{
System.out.println("任务1和任务2执行后执行");
},service);

// 感知结果,没有返回值
future01.thenAcceptBothAsync(future02,(res01,res02)->{
System.out.println("任务1返回结果:"+res01);
System.out.println("任务2返回结果:"+res02);
},service);

// 感知结果,并且有返回值
future01.thenCombineAsync(future02,(res01,res02)->{
System.out.println("任务1返回结果:"+res01);
System.out.println("任务2返回结果:"+res02);
return"任务3自己的返回值";
},service);

两任务组合-一个完成

  • applyToEither()和applyToEitherAsync():两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值
  • acceptEither()和acceptEitherAsync():两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
  • runAfterEither()和runAfterEitherAsync():两个任务有一个执行完成,不需要获取future的结果,处理任务,没有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CompletableFuture<Integer> future01=CompletableFuture.supplyAsync(()->{
int i=10;
return i/2;
},service);
CompletableFuture<Integer> future02=CompletableFuture.supplyAsync(()->{
int i=20;
return i/2;
},service);

future01.runAfterEitherAsync(future02,()->{
// 任务1或者任务2只要执行完成一个就执行
},service);

future01.acceptEitherAsync(future02,res->{
// 接收返回结果
System.out.println("任务返回结果:"+res);
},service);

future01.applyToEitherAsync(future02,res->{
// 接收返回结果,并返回结果
System.out.println("任务返回结果:"+res);
return res+1;
},service);

多任务组合

  • allOf():等待所有任务完成
  • anyOf():只要有一个任务完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CompletableFuture<Integer> future01=CompletableFuture.supplyAsync(()->{
int i=10;
return i/2;
},service);
CompletableFuture<Integer> future02=CompletableFuture.supplyAsync(()->{
int i=20;
return i/2;
},service);

CompletableFuture<Void> allOf=CompletableFuture.allOf(future01,future02);
// 等待所有任务执行完成
allOf.get();

CompletableFuture<Object> anyOf=CompletableFuture.anyOf(future01,future02);
// 任意一个任务执行完成即可,返回值是执行成功的那个任务的返回值
anyOf.get();

相关文章

JAVA基础

JDBC

JAVA多线程