本文列举了一些常见的 Java 多线程编程模式与案例, 十分十分重要 。务必理解且掌握。
- ⭐异步模式之生产者消费者
- ⭐LRU
- ⭐单例模式
- ⭐同步模式之顺序控制
- 实现一个线程安全的队列
- 终止模式之两阶段终止
- 使用 Unsafe 实现 CAS
- 多线程模拟并发售卖
- 手写限流算法
⭐异步模式之生产者消费者
使用 synchronized、wait、notify
class Message {
private int id;
public Message(int id) {
this.id = id;
}
};
class MessageQueue {
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<Message>();
}
public Message take() {
synchronized (queue) {
while (queue.isEmpty()) {
System.out.println("没货了, wait");
try {
queue.wait();
} catch (InterruptedException e) {
}
}
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
while (queue.size() == capacity) {
System.out.println("库存已达到上限,wait");
try {
queue.wait();
} catch (InterruptedException e) {
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}
public class Main {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(2);
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(() -> {
while(true){
Message message = new Message(id);
messageQueue.put(message);
System.out.println("生产者 " + id + " Put Message: " + message);
}
}, "生产者" + i).start();
}
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
System.out.println("消费者 Get message: " + message);
}
}, "消费者").start();
}
}
使用 Condition
class MessageQueue {
private static ReentrantLock lock = new ReentrantLock();
private static Condition full = lock.newCondition();
private static Condition empty = lock.newCondition();
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<Message>();
}
public Message take() {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("没货了, wait");
try {
empty.await();
} catch (InterruptedException e) {
}
}
Message message = queue.removeFirst();
full.signalAll();
return message;
} finally {
lock.unlock();
}
}
public void put(Message message) {
lock.lock();
try {
while (queue.size() == capacity) {
System.out.println("库存已达到上限,wait");
try {
full.await();
} catch (InterruptedException e) {
}
}
queue.addLast(message);
empty.signalAll();
} finally {
lock.unlock();
}
}
}
使用 BlockingQueue 实现
class MessageQueue {
private BlockingQueue<Message> queue;
public MessageQueue(int capacity) {
this.queue = new ArrayBlockingQueue<Message>(capacity);
}
public Message take() {
Message message = null;
try {
message = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return message;
}
public void put(Message message) {
try {
queue.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
⭐LRU
LRU 的非线程安全实现
class LRUCache {
//HashMap + ArrayDeque(双端队列)实现
private int capacity;
private Map<Integer, Integer> map;
private Queue<Integer> keyQue;
public LRUCache(int capacity) {
this.capacity = capacity;
map = new HashMap<>();
keyQue = new ArrayDeque<>();
}
public int get(int key) {
if (map.containsKey(key)) {
Integer val = map.get(key);
keyQue.remove(key);
keyQue.add(key);
return val;
} else {
return -1;
}
}
public void put(int key, int value) {
if (map.containsKey(key)) {
keyQue.remove(key);
keyQue.add(key);
map.put(key, value);
return;
}
map.put(key, value);
keyQue.add(key);
if (map.size() > capacity) {
Integer oldestKey = keyQue.poll();
map.remove(oldestKey);
}
}
}
重写 LinkedHashMap#removeEldestEntry 并加读写锁
public final class SimpleLRUCache<K, V> {
private final LinkedHashMap<K, V> cache;
private final Lock r;
private final Lock w;
private SimpleLRUCache(int capacity) {
this.cache = new LRUMap<K, V>(capacity);
ReadWriteLock lock = new ReentrantReadWriteLock();
this.r = lock.readLock();
this.w = lock.writeLock();
}
public static <K, V> SimpleLRUCache<K, V> create(int capacity) {
return new SimpleLRUCache<K, V>(capacity);
}
public void put(K key, V value) {
w.lock();
try {
this.cache.put(key, value);
} finally {
w.unlock();
}
}
public V get(K key) {
r.lock();
try {
return this.cache.get(key);
} finally {
r.unlock();
}
}
public void remove(K key) {
w.lock();
try {
this.cache.remove(key);
} finally {
w.unlock();
}
}
public void clear() {
w.lock();
try {
this.cache.clear();
} finally {
w.unlock();
}
}
static class LRUMap<K, V> extends LinkedHashMap<K, V> {
private static final long serialVersionUID = 1L;
private final int capacity;
public LRUMap(int capacity) {
super((int) Math.ceil(capacity / 0.75) + 1, 0.75f, true);
this.capacity = capacity;
}
// 在 LRUMap 容量超出 capacity 时会驱逐出最老的键值对
@Override
protected boolean removeEldestEntry(Entry<K, V> eldest) {
return size() > capacity;
}
}
}
使用 Concurrent 容器
- ConcurrentLinkedQueue 维护键的放入或使用的先后顺序
- ConcurrentHashMap 维护键值对映射关系
- 使用读写锁来保证线程同时更新两个容器的线程安全性
/**
* @author shuang.kou
* <p>
* 使用 ConcurrentHashMap+ConcurrentLinkedQueue+ReadWriteLock实现线程安全的 LRU 缓存
*/
public class MyLruCache<K, V> {
/**
* 缓存的最大容量
*/
private final int maxCapacity;
private ConcurrentHashMap<K, V> cacheMap;
private ConcurrentLinkedQueue<K> keys;
/**
* 读写锁
*/
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock writeLock = readWriteLock.writeLock();
private Lock readLock = readWriteLock.readLock();
public MyLruCache(int maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException("Illegal max capacity: " + maxCapacity);
}
this.maxCapacity = maxCapacity;
cacheMap = new ConcurrentHashMap<>(maxCapacity);
keys = new ConcurrentLinkedQueue<>();
}
public V put(K key, V value) {
// 加写锁
writeLock.lock();
try {
//1.key是否存在于当前缓存
if (cacheMap.containsKey(key)) {
moveToTailOfQueue(key);
cacheMap.put(key, value);
return value;
}
//2.是否超出缓存容量,超出的话就移除队列头部的元素以及其对应的缓存
if (cacheMap.size() == maxCapacity) {
System.out.println("maxCapacity of cache reached");
removeOldestKey();
}
//3.key不存在于当前缓存。将key添加到队列的尾部并且缓存key及其对应的元素
keys.add(key);
cacheMap.put(key, value);
return value;
} finally {
writeLock.unlock();
}
}
public V get(K key) {
//加读锁
readLock.lock();
try {
//key是否存在于当前缓存
if (cacheMap.containsKey(key)) {
// 存在的话就将key移动到队列的尾部
moveToTailOfQueue(key);
return cacheMap.get(key);
}
//不存在于当前缓存中就返回Null
return null;
} finally {
readLock.unlock();
}
}
public V remove(K key) {
writeLock.lock();
try {
//key是否存在于当前缓存
if (cacheMap.containsKey(key)) {
// 存在移除队列和Map中对应的Key
keys.remove(key);
return cacheMap.remove(key);
}
//不存在于当前缓存中就返回Null
return null;
} finally {
writeLock.unlock();
}
}
/**
* 将元素添加到队列的尾部(put/get的时候执行)
*/
private void moveToTailOfQueue(K key) {
keys.remove(key);
keys.add(key);
}
/**
* 移除队列头部的元素以及其对应的缓存 (缓存容量已满的时候执行)
*/
private void removeOldestKey() {
K oldestKey = keys.poll();
if (oldestKey != null) {
cacheMap.remove(oldestKey);
}
}
public int size() {
return cacheMap.size();
}
}
实现带有过期时间的 LRU
/**
* @author shuang.kou
* <p>
* 使用 ConcurrentHashMap+ConcurrentLinkedQueue+ReadWriteLock+ScheduledExecutorService实现线程安全的 LRU 缓存
*/
public class MyLruCacheWithExpireTime<K, V> {
/**
* 缓存的最大容量
*/
private final int maxCapacity;
private ConcurrentHashMap<K, V> cacheMap;
private ConcurrentLinkedQueue<K> keys;
/**
* 读写锁
*/
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock writeLock = readWriteLock.writeLock();
private Lock readLock = readWriteLock.readLock();
// 定时清除过期键值
private ScheduledExecutorService scheduledExecutorService;
public MyLruCacheWithExpireTime(int maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException("Illegal max capacity: " + maxCapacity);
}
this.maxCapacity = maxCapacity;
cacheMap = new ConcurrentHashMap<>(maxCapacity);
keys = new ConcurrentLinkedQueue<>();
scheduledExecutorService = Executors.newScheduledThreadPool(3);
}
public V put(K key, V value, long expireTime) {
// 加写锁
writeLock.lock();
try {
//1.key是否存在于当前缓存
if (cacheMap.containsKey(key)) {
moveToTailOfQueue(key);
cacheMap.put(key, value);
return value;
}
//2.是否超出缓存容量,超出的话就移除队列头部的元素以及其对应的缓存
if (cacheMap.size() == maxCapacity) {
System.out.println("maxCapacity of cache reached");
removeOldestKey();
}
//3.key不存在于当前缓存。将key添加到队列的尾部并且缓存key及其对应的元素
keys.add(key);
cacheMap.put(key, value);
if (expireTime > 0) {
removeAfterExpireTime(key, expireTime);
}
return value;
} finally {
writeLock.unlock();
}
}
public V get(K key) {
//加读锁
readLock.lock();
try {
//key是否存在于当前缓存
if (cacheMap.containsKey(key)) {
// 存在的话就将key移动到队列的尾部
moveToTailOfQueue(key);
return cacheMap.get(key);
}
//不存在于当前缓存中就返回Null
return null;
} finally {
readLock.unlock();
}
}
public V remove(K key) {
writeLock.lock();
try {
//key是否存在于当前缓存
if (cacheMap.containsKey(key)) {
// 存在移除队列和Map中对应的Key
keys.remove(key);
return cacheMap.remove(key);
}
//不存在于当前缓存中就返回Null
return null;
} finally {
writeLock.unlock();
}
}
/**
* 将元素添加到队列的尾部(put/get的时候执行)
*/
private void moveToTailOfQueue(K key) {
keys.remove(key);
keys.add(key);
}
/**
* 移除队列头部的元素以及其对应的缓存 (缓存容量已满的时候执行)
*/
private void removeOldestKey() {
K oldestKey = keys.poll();
if (oldestKey != null) {
cacheMap.remove(oldestKey);
}
}
private void removeAfterExpireTime(K key, long expireTime) {
scheduledExecutorService.schedule(() -> {
//过期后清除该键值对
cacheMap.remove(key);
keys.remove(key);
}, expireTime, TimeUnit.MILLISECONDS);
}
public int size() {
return cacheMap.size();
}
}
⭐单例模式
饿汉
// 问题1:为什么加 final
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例:实现 readResolve 方法
final class Singleton implements Serializable{
// 问题3:为什么设置为私有? 是否能防止反射创建新的实例?
private Singleton() {
}
//这样初始化是否能保证单例对象创建时的线程安全?
private static final Singleton INSTANCE = new Singleton();
//问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由
public static Singleton getInstance() {
return INSTANCE;
}
// 防止序列化破坏单例模式
public Object readResolve(){
return INSTANCE;
}
}
枚举
// 问题1:枚举单例是如何限制实例个数的
// 问题2:枚举单例在创建时是否有并发问题
// 问题3:枚举单例能否被反射破坏单例
// 问题4:枚举单例能否被反序列化破坏单例
// 问题5:枚举单例属于懒汉式还是饿汉式
// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做
enum Singleton {
INSTANCE;
}
线程安全的懒汉单例 1
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
// 分析这里的线程安全, 并说明有什么缺点
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
线程安全的懒汉单例 2
public final class Singleton {
private Singleton() {
}
// 问题1:解释为什么要加 volatile ?
private static volatile Singleton INSTANCE = null;
public static Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (Singleton.class) { // t1
// 问题2:为什么还要在这里加为空判断, 之前不是判断过了吗
if (INSTANCE != null) {
// t2
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
}
问题1:解释为什么要加 volatile ?
答: INSTANCE = new Singleton();
可能会发生重排序,导致 INSTANCE 不为空,但是构造函数没有执行。另外的线程可能获取到一个构造函数还未执行完的 INSTANCE。
问题2:为什么还要在这里加为空判断, 之前不是判断过了吗
答:刚开始 t1 和 t2 在 synchronized 处竞争锁,t1 竞争成功了,并执行了构造函数,给 INSTANCE 赋了值,之后 t2 得到锁,需要判断 INSTANCE 是否为空,防止再次执行构造函数。
静态内部类懒汉单例
public final class Singleton {
private Singleton() { }
// 问题1:属于懒汉式还是饿汉式
private static class LazyHolder {
static final Singleton INSTANCE = new Singleton();
}
// 问题2:在创建时是否有并发问题
public static Singleton getInstance() {
return LazyHolder.INSTANCE;
}
}
LazyHolder 类只有在被使用到的时候才会触发类加载,所以说是懒汉式的。
⭐同步模式之顺序控制
三个线程交叉打印 ABC
使用 Park/Unpark
public class PrintABC {
static Thread threadA, threadB, threadC;
public static void main(String[] args) {
threadA = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 打印当前线程名称
System.out.print(Thread.currentThread().getName());
// 唤醒下一个线程
LockSupport.unpark(threadB);
// 当前线程阻塞
LockSupport.park();
}
}, "A");
threadB = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 先阻塞等待被唤醒
LockSupport.park();
System.out.print(Thread.currentThread().getName());
// 唤醒下一个线程
LockSupport.unpark(threadC);
}
}, "B");
threadC = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 先阻塞等待被唤醒
LockSupport.park();
System.out.print(Thread.currentThread().getName());
// 唤醒下一个线程
LockSupport.unpark(threadA);
}
}, "C");
threadA.start();
threadB.start();
threadC.start();
}
}
使用 synchronized、wait、notify
class Wait_Notify_ACB {
private int num;
private static final Object LOCK = new Object();
private void printABC(String name, int targetNum) {
for (int i = 0; i < 10; i++) {
synchronized (LOCK) {
while (num % 3 != targetNum) {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.print(name);
LOCK.notifyAll();
}
}
}
public static void main(String[] args) {
Wait_Notify_ACB task = new Wait_Notify_ACB ();
new Thread(() -> {
task.printABC("A", 0);
}, "A").start();
new Thread(() -> {
task.printABC("B", 1);
}, "B").start();
new Thread(() -> {
task.printABC("C", 2);
}, "C").start();
}
}
使用 Lock
class Lock_ABC {
private int num; // 当前状态值:保证三个线程之间交替打印
private Lock lock = new ReentrantLock();
private void printABC(String name, int targetNum) {
for (int i = 0; i < 10; ) {
lock.lock();
if (num % 3 == targetNum) {
num++;
i++;
System.out.print(name);
}
lock.unlock();
}
}
public static void main(String[] args) {
Lock_ABC lockABC = new Lock_ABC();
new Thread(() -> {
lockABC.printABC("A", 0);
}, "A").start();
new Thread(() -> {
lockABC.printABC("B", 1);
}, "B").start();
new Thread(() -> {
lockABC.printABC("C", 2);
}, "C").start();
}
}
使用 Lock+Condition
避免无效唤醒,指定下一个被唤醒的线程
class LockConditionABC {
private int num;
private static Lock lock = new ReentrantLock();
private static Condition c1 = lock.newCondition();
private static Condition c2 = lock.newCondition();
private static Condition c3 = lock.newCondition();
private void printABC(String name, int targetNum, Condition currentThread, Condition nextThread) {
for (int i = 0; i < 10; ) {
lock.lock();
try {
while (num % 3 != targetNum) {
currentThread.await();
}
num++;
i++;
System.out.print(name);
nextThread.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
LockConditionABC print = new LockConditionABC();
new Thread(() -> {
print.printABC("A", 0, c1, c2);
}, "A").start();
new Thread(() -> {
print.printABC("B", 1, c2, c3);
}, "B").start();
new Thread(() -> {
print.printABC("C", 2, c3, c1);
}, "C").start();
}
}
使用 Semaphore
class SemaphoreABC {
private static Semaphore s1 = new Semaphore(1); //先打印A,所以设s1中的计数器值为1
private static Semaphore s2 = new Semaphore(0);
private static Semaphore s3 = new Semaphore(0);
private void printABC(String name, Semaphore currentThread, Semaphore nextThread) {
for (int i = 0; i < 10; i++) {
try {
currentThread.acquire(); //阻塞当前线程,即调用当前线程acquire(),计数器减1为0
System.out.print(name);
nextThread.release(); //唤醒下一个线程,即调用下一个线程线程release(),计数器加1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
SemaphoreABC printer = new SemaphoreABC();
new Thread(() -> {
printer.printABC("A", s1, s2);
}, "A").start();
Thread.sleep(10);
new Thread(() -> {
printer.printABC("B", s2, s3);
}, "B").start();
Thread.sleep(10);
new Thread(() -> {
printer.printABC("C", s3, s1);
}, "C").start();
}
}
线程交替打印奇偶数
使用 synchronized、wait、notify
使用两个线程,两个线程竞争同一把对象锁,可以通过构造方法注入。
Object wait() 方法让当前线程进入等待状态。直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法。当前线程必须是此对象的监视器所有者,否则还是会发生 IllegalMonitorStateException 异常。如果当前线程在等待之前或在等待时被任何线程中断,则会抛出 InterruptedException 异常。
Object notify() 方法用于唤醒一个在此对象监视器上等待的线程。如果所有的线程都在此对象上等待,那么只会选择一个线程,选择是任意性的,并在对实现做出决定时发生。一个线程在对象监视器上等待可以调用 wait() 方法。notify() 方法只能被作为此对象监视器的所有者的线程来调用。
一个线程要想成为对象监视器的所有者,可以使用以下 3 种方法:
- 执行对象的同步实例方法
- 使用 synchronized 内置锁
- 对于 Class 类型的对象,执行同步静态方法
一次只能有一个线程拥有对象的监视器。如果当前线程不是此对象监视器的所有者的话会抛出 IllegalMonitorStateException 异常。
class PrintOddEven {
private static final int MAX = 10;
private int num = 0;
private Object o = new Object();
private void printOddOrEven(String name, int flag) {
while (true) {
while (num < MAX) {
synchronized (o) {
// 检查条件是否满足,如果不满足就 wait
while (num % 2 != flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(name + " : " + num);
num++;
o.notify();
}
}
}
}
public static void main(String[] args) {
PrintOddEven task = new PrintOddEven();
new Thread(() -> {
task.printOddOrEven("Odd thread", 1);
}).start();
new Thread(() -> {
task.printOddOrEven("Even thread", 0);
}).start();
}
}
总结 synchronized、wait、notify 的使用模板:
synchronized (monitor){
while(condition not matched){
monitor.wait();
}
doSomething();
monitor.notify();
}
为什么 synchronized 里面使用 while 而不是 if 呢?
为了防止虚假唤醒,虚假唤醒通俗来说就是当一个条件满足时可以唤醒多个线程,但是有些被唤醒的线程是不符合 if/while 的执行条件的。
就是用 if 判断的话,唤醒后线程会从 wait 之后的代码开始运行,但是不会重新判断 if 条件,直接继续运行 if 代码块之后的代码,而如果使用 while 的话,也会从wait 之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行 while 代码块之后的代码块,成立的话继续 wait
使用 Lock
class PrintOddEven{
private int num;
private Lock lock = new ReentrantLock();
private void printOddOrEven(String name, int flag){
for (int i = 0; i < 10;) {
lock.lock();
if(num % 2 == flag){
System.out.println(name + ": " + num);
num++;
i++; // 只有打印成功了才递增计数
}
lock.unlock();
}
}
public static void main(String[] args) {
PrintOddEven task = new PrintOddEven();
new Thread(()->{
task.printOddOrEven("Odd thread",1);
}).start();
new Thread(()->{
task.printOddOrEven("Even thread",0);
}).start();
}
}
使用 volatile
volatile 修饰的变量可以保证变量在线程间的可见性,常用作标识位出现。
public class Main {
public static void main(String[] args) throws InterruptedException {
// 定义锁
ReentrantLock lock = new ReentrantLock();
// 打印线程A的condition
Condition conditionA = lock.newCondition();
// 打印线程B的condition
Condition conditionB = lock.newCondition();
// 打印线程C的condition
Condition conditionC = lock.newCondition();
// 实例化A线程
Thread printerA = new Thread(new ReentrantLockSyncPrinter(lock, conditionA, conditionB));
// 实例化B线程
Thread printerB = new Thread(new ReentrantLockSyncPrinter(lock, conditionB, conditionC));
// 实例化C线程
Thread printerC = new Thread(new ReentrantLockSyncPrinter(lock, conditionC, conditionA));
printerA.setName("线程A");
printerB.setName("线程B");
printerC.setName("线程C");
printerA.start();
Thread.sleep(100);
printerB.start();
printerC.start();
}
}
class ReentrantLockSyncPrinter implements Runnable {
// 初始值
private static int num = 1;
private static final int MAX = 60;
// 打印次数
private static final int COUNT = 5;
// 打印锁
final ReentrantLock reentrantLock;
// 本线程打印所需的condition
final Condition currCondition;
// 下一个线程打印所需要的condition
final Condition nextCondition;
public ReentrantLockSyncPrinter(ReentrantLock reentrantLock, Condition currCondition, Condition nextCondition) {
this.reentrantLock = reentrantLock;
this.currCondition = currCondition;
this.nextCondition = nextCondition;
}
@Override
public void run() {
// 获取锁,进入临界区
reentrantLock.lock();
try {
while (num < MAX) {
System.out.print(Thread.currentThread().getName() + "-->");
// 连续打印COUNT次
for (int i = 0; i < COUNT; i++) {
//打印字符
System.out.print(num + " ");
num++;
}
System.out.println();
// 使用nextCondition唤醒下一个线程
// 因为只有一个线程在等待,所以signal或者signalAll都可以
nextCondition.signal();
try {
// 本线程让出锁并等待唤醒
currCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
// 释放锁
reentrantLock.unlock();
}
}
}
三个线程按序输出
利用三个线程,第一个线程打印1 2 3 4 5, 第二个线程打印 6 7 8 9 10, 第三个线程打印11 12 13 14 15,然后第一个线程再打印16 17 18 19 20,然后第二个线程接着打印21… 依此类推,直到打印到60。
通过一个共享的ReentrantLock
控制三个线程的访问,我们都知道ReentrantLock
是可重入锁。我们可以通过条件变量Condition
实现线程A给线程B发送信号唤醒B,线程B给线程C发送信号唤醒C,这样依次循环执行打印。
public class Main {
public static void main(String[] args) throws InterruptedException {
// 定义锁
ReentrantLock lock = new ReentrantLock();
// 打印线程A的condition
Condition conditionA = lock.newCondition();
// 打印线程B的condition
Condition conditionB = lock.newCondition();
// 打印线程C的condition
Condition conditionC = lock.newCondition();
// 实例化A线程
Thread printerA = new Thread(new ReentrantLockSyncPrinter(lock, conditionA, conditionB));
// 实例化B线程
Thread printerB = new Thread(new ReentrantLockSyncPrinter(lock, conditionB, conditionC));
// 实例化C线程
Thread printerC = new Thread(new ReentrantLockSyncPrinter(lock, conditionC, conditionA));
printerA.setName("线程A");
printerB.setName("线程B");
printerC.setName("线程C");
printerA.start();
Thread.sleep(100);
printerB.start();
printerC.start();
}
}
class ReentrantLockSyncPrinter implements Runnable {
// 初始值
private static int num = 1;
private static final int MAX = 60;
// 打印次数
private static final int COUNT = 5;
// 打印锁
final ReentrantLock reentrantLock;
// 本线程打印所需的condition
final Condition currCondition;
// 下一个线程打印所需要的condition
final Condition nextCondition;
public ReentrantLockSyncPrinter(ReentrantLock reentrantLock, Condition currCondition, Condition nextCondition) {
this.reentrantLock = reentrantLock;
this.currCondition = currCondition;
this.nextCondition = nextCondition;
}
@Override
public void run() {
// 获取锁,进入临界区
reentrantLock.lock();
try {
while (num < MAX) {
System.out.print(Thread.currentThread().getName() + "-->");
// 连续打印COUNT次
for (int i = 0; i < COUNT; i++) {
//打印字符
System.out.print(num + " ");
num++;
}
System.out.println();
// 使用nextCondition唤醒下一个线程
// 因为只有一个线程在等待,所以signal或者signalAll都可以
nextCondition.signal();
try {
// 本线程让出锁并等待唤醒
currCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
// 释放锁
reentrantLock.unlock();
}
}
}
实现一个线程安全的阻塞队列
使用 synchronized、wait、notify
public class ArrayBlockingQueue {
private Object[] array; //数组
private int head; //头
private int tail; //尾
private volatile int size; //元素个数
public ArrayBlockingQueue(int capacity) {
this.array = new Object[capacity];
}
//写入元素
public synchronized void put(Object o) throws InterruptedException {
//当队列满时,阻塞
while (size == array.length) {
this.wait();
}
array[tail++] = o;
if (tail == array.length) {
tail = 0;
}
size ++;
//唤醒线程
this.notifyAll();
}
//取出元素
public synchronized Object get() throws InterruptedException {
//当队列为空,阻塞
while (size == 0) {
this.wait();
}
Object o = array[head++];
if (head == array.length) {
head = 0;
}
size --;
//唤醒线程
this.notifyAll();
return o;
}
}
使用 Condition
public class ArrayBlockingQueue {
private Object[] array; //数组
private int head; //头
private int tail; //尾
private volatile int size; //元素个数
private ReentrantLock lock = new ReentrantLock(); //锁
private Condition notEmpty = lock.newCondition(); //非空
private Condition notFull = lock.newCondition(); //非满
public ArrayBlockingQueue(int capacity) {
this.array = new Object[capacity];
}
//写入元素
public void put(Object o) throws InterruptedException {
try{
lock.lock();
//当队列满时,阻塞
while (size == array.length) {
notFull.wait();
}
array[tail++] = o;
if (tail == array.length) {
tail = 0;
}
size ++;
//唤醒线程
notEmpty.notifyAll();
} finally {
lock.unlock();
}
}
//取出元素
public Object get() throws InterruptedException {
lock.lock();
try {
//当队列为空,阻塞
while (size == 0) {
notEmpty.wait();
}
Object o = array[head++];
if (head == array.length) {
head = 0;
}
size --;
//唤醒线程
notFull.notifyAll();
return o;
} finally {
lock.unlock();
}
}
}
终止模式之两阶段终止
两阶段终止模式又叫线程优雅停止。在一个线程 T1 中如何 “优雅” 终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。
错误方法:使用线程对象的 stop() 方法停止线程:stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,其它线程将永远无法获取锁
利用 isInterrupted
interrupt 可以打断正在执行的线程,无论这个线程是在 sleep,wait, 还是正常运行
class WorkerThread {
private Thread thread;
public void start() {
thread = new Thread(() -> {
while (true) {
Thread current = Thread.currentThread();
if (current.isInterrupted()) {
System.out.println("料理后事");
break;
}
try {
System.out.println("在努力工作");
Thread.sleep(1000);
} catch (InterruptedException e) {
current.interrupt();
}
}
}, "工作线程");
thread.start();
}
public void stop() {
thread.interrupt();
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
Thread.sleep(5000);
workerThread.stop();
}
}
利用停止标记
class WorkerThread {
private Thread thread;
private volatile boolean stop = false;
public void start() {
thread = new Thread(() -> {
while (true) {
Thread current = Thread.currentThread();
if (stop) {
System.out.println("料理后事");
break;
}
try {
System.out.println("在努力工作");
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
}, "工作线程");
thread.start();
}
public void stop() {
stop = true;
thread.interrupt();
}
}
使用 Unsafe 实现 CAS
class MyAtomicInteger {
private volatile int value;
//提供底层CAS操作的对象
private static final Unsafe unsafe;
//属性在内存中相对于对象的地址偏移量
private static final long valueOffset;
static {
try {
//getUnsafe()方法不好使,通过反射获取unsafe对象
Field field = Unsafe.class.getDeclaredField("theUnsafe");
//Unsafe类中的theUnsafe属性为private,设置可访问权限
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
//通过unsafe对象提供的方法获取value属性偏移量
valueOffset = unsafe.objectFieldOffset
(MyAtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
/**
* CAS操作方法
*
* @param current 旧值
* @param update 期望值
* @return 修改是否成功
*/
public boolean compareAndSet(int current, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, current, update);
}
//相当于AtomicInteger中getAndAdd()方法
public void add() {
int current;
do {
//线程获取当前值
current = unsafe.getIntVolatile(this, valueOffset);
} while (!compareAndSet(current, current + 1));//修改失败则重试
}
public int getValue() {
return value;
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
MyAtomicInteger atomicInteger = new MyAtomicInteger();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicInteger.add();
}
}).start();
}
Thread.sleep(1000);//等待所有线程执行完毕再输出
System.out.println(atomicInteger.getValue());
}
}
多线程模拟并发售卖
创建若干线程代表车票售卖窗口,每个窗口售卖的车票不能有重复,要求写代码实现。
public class Main {
public static void main(String[] args) {
Ticket ticket = new Ticket();
Thread t1 = new Thread(ticket);
Thread t2 = new Thread(ticket);
Thread t3 = new Thread(ticket);
Thread t4 = new Thread(ticket);
t1.setName("窗口A");
t2.setName("窗口B");
t3.setName("窗口C");
t4.setName("窗口D");
t1.start();
t2.start();
t3.start();
t4.start();
}
}
class Ticket implements Runnable {
int serialNumber = 1;
//synchronized为线程加锁,保证每次只有一个线程拿到锁进行售票
@Override
public synchronized void run() {
while (true) {
if (serialNumber <= 100) {
System.out.println(Thread.currentThread().getName() + "卖出" + serialNumber + "号票");
serialNumber++;
}
try {
// 这里wait()休眠一段时间,表示当前线程释放锁,别的线程可以拿到锁进行售票
// 注意wait()方法必须在synchronized代码里使用,因为想要释放锁的前提是,你必须首先持有锁
this.wait(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
手写限流算法
限流算法也是面试的重点之一,面试官可能不会要求你手写限流算法,但是你必须知道有哪些限流算法,以及这些 限流算法特点以及优缺点 。下面给出一下参考资料:
- 一文搞懂高频面试题之限流算法,从算法原理到实现,再到对比分析 - 知乎 (zhihu.com)
- 基于令牌桶算法的Java限流实现 - 简书 (jianshu.com)
- 高并发系统限流-漏桶算法和令牌桶算法 - xuwc - 博客园 (cnblogs.com)