常见 Java 多线程编程模式案例

本文列举了一些常见的 Java 多线程编程模式与案例, 十分十分重要 。务必理解且掌握。

  • ⭐异步模式之生产者消费者
  • ⭐LRU
  • ⭐单例模式
  • ⭐同步模式之顺序控制
  • 实现一个线程安全的队列
  • 终止模式之两阶段终止
  • 使用 Unsafe 实现 CAS
  • 多线程模拟并发售卖
  • 手写限流算法

⭐异步模式之生产者消费者

使用 synchronized、wait、notify

class Message {
    private int id;

    public Message(int id) {
        this.id = id;
    }
};

class MessageQueue {
    private LinkedList<Message> queue;
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<Message>();
    }

    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                System.out.println("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }

    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                System.out.println("库存已达到上限,wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}

public class Main {
    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(2);
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(() -> {
                while(true){
                    Message message = new Message(id);
                    messageQueue.put(message);
                    System.out.println("生产者 " + id + "  Put Message: " + message);
                }
            }, "生产者" + i).start();
        }
        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                System.out.println("消费者 Get message: " + message);
            }
        }, "消费者").start();
    }
}

使用 Condition

class MessageQueue {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();

    private LinkedList<Message> queue;
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<Message>();
    }

    public Message take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println("没货了, wait");
                try {
                    empty.await();
                } catch (InterruptedException e) {
                }
            }
            Message message = queue.removeFirst();
            full.signalAll();
            return message;
        } finally {
            lock.unlock();
        }
    }

    public void put(Message message) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                System.out.println("库存已达到上限,wait");
                try {
                    full.await();
                } catch (InterruptedException e) {
                }
            }
            queue.addLast(message);
            empty.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

使用 BlockingQueue 实现

class MessageQueue {
    private BlockingQueue<Message> queue;
    public MessageQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<Message>(capacity);
    }

    public Message take() {
        Message message = null;
        try {
            message = queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return message;
    }

    public void put(Message message) {
        try {
            queue.put(message);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

⭐LRU

LRU 的非线程安全实现

class LRUCache {

    //HashMap + ArrayDeque(双端队列)实现
    private int capacity;
    private Map<Integer, Integer> map;
    private Queue<Integer> keyQue;

    public LRUCache(int capacity) {
        this.capacity = capacity;
        map = new HashMap<>();
        keyQue = new ArrayDeque<>();
    }
    
    public int get(int key) {
        if (map.containsKey(key)) {
            Integer val = map.get(key);
            keyQue.remove(key);
            keyQue.add(key);
            return val;
        } else {
            return -1;
        }
    }
    
    public void put(int key, int value) {
        if (map.containsKey(key)) {
            keyQue.remove(key);
            keyQue.add(key);
            map.put(key, value);
            return;
        }

        map.put(key, value);
        keyQue.add(key);

        if (map.size() > capacity) {
            Integer oldestKey = keyQue.poll();
            map.remove(oldestKey);
        }
    }
}

重写 LinkedHashMap#removeEldestEntry 并加读写锁

public final class SimpleLRUCache<K, V> {
    private final LinkedHashMap<K, V> cache;
    private final Lock r;
    private final Lock w;

    private SimpleLRUCache(int capacity) {
        this.cache = new LRUMap<K, V>(capacity);
        ReadWriteLock lock = new ReentrantReadWriteLock();

        this.r = lock.readLock();
        this.w = lock.writeLock();
    }

    public static <K, V> SimpleLRUCache<K, V> create(int capacity) {
        return new SimpleLRUCache<K, V>(capacity);
    }

    public void put(K key, V value) {
        w.lock();
        try {
            this.cache.put(key, value);
        } finally {
            w.unlock();
        }
    }

    public V get(K key) {
        r.lock();
        try {
            return this.cache.get(key);
        } finally {
            r.unlock();
        }
    }

    public void remove(K key) {
        w.lock();
        try {
            this.cache.remove(key);
        } finally {
            w.unlock();
        }
    }

    public void clear() {
        w.lock();
        try {
            this.cache.clear();
        } finally {
            w.unlock();
        }
    }

    static class LRUMap<K, V> extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = 1L;
        private final int capacity;

        public LRUMap(int capacity) {
            super((int) Math.ceil(capacity / 0.75) + 1, 0.75f, true);
            this.capacity = capacity;

        }
        // 在 LRUMap 容量超出 capacity 时会驱逐出最老的键值对
        @Override
        protected boolean removeEldestEntry(Entry<K, V> eldest) {
            return size() > capacity;
        }
    }
}

使用 Concurrent 容器

  • ConcurrentLinkedQueue 维护键的放入或使用的先后顺序
  • ConcurrentHashMap 维护键值对映射关系
  • 使用读写锁来保证线程同时更新两个容器的线程安全性
/**
 * @author shuang.kou
 * <p>
 * 使用 ConcurrentHashMap+ConcurrentLinkedQueue+ReadWriteLock实现线程安全的 LRU 缓存
 */
public class MyLruCache<K, V> {

    /**
     * 缓存的最大容量
     */
    private final int maxCapacity;

    private ConcurrentHashMap<K, V> cacheMap;
    private ConcurrentLinkedQueue<K> keys;
    /**
     * 读写锁
     */
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock writeLock = readWriteLock.writeLock();
    private Lock readLock = readWriteLock.readLock();

    public MyLruCache(int maxCapacity) {
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("Illegal max capacity: " + maxCapacity);
        }
        this.maxCapacity = maxCapacity;
        cacheMap = new ConcurrentHashMap<>(maxCapacity);
        keys = new ConcurrentLinkedQueue<>();
    }

    public V put(K key, V value) {
        // 加写锁
        writeLock.lock();
        try {
            //1.key是否存在于当前缓存
            if (cacheMap.containsKey(key)) {
                moveToTailOfQueue(key);
                cacheMap.put(key, value);
                return value;
            }
            //2.是否超出缓存容量,超出的话就移除队列头部的元素以及其对应的缓存
            if (cacheMap.size() == maxCapacity) {
                System.out.println("maxCapacity of cache reached");
                removeOldestKey();
            }
            //3.key不存在于当前缓存。将key添加到队列的尾部并且缓存key及其对应的元素
            keys.add(key);
            cacheMap.put(key, value);
            return value;
        } finally {
            writeLock.unlock();
        }
    }

    public V get(K key) {
        //加读锁
        readLock.lock();
        try {
            //key是否存在于当前缓存
            if (cacheMap.containsKey(key)) {
                // 存在的话就将key移动到队列的尾部
                moveToTailOfQueue(key);
                return cacheMap.get(key);
            }
            //不存在于当前缓存中就返回Null
            return null;
        } finally {
            readLock.unlock();
        }
    }

    public V remove(K key) {
        writeLock.lock();
        try {
            //key是否存在于当前缓存
            if (cacheMap.containsKey(key)) {
                // 存在移除队列和Map中对应的Key
                keys.remove(key);
                return cacheMap.remove(key);
            }
            //不存在于当前缓存中就返回Null
            return null;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * 将元素添加到队列的尾部(put/get的时候执行)
     */
    private void moveToTailOfQueue(K key) {
        keys.remove(key);
        keys.add(key);
    }

    /**
     * 移除队列头部的元素以及其对应的缓存 (缓存容量已满的时候执行)
     */
    private void removeOldestKey() {
        K oldestKey = keys.poll();
        if (oldestKey != null) {
            cacheMap.remove(oldestKey);
        }
    }

    public int size() {
        return cacheMap.size();
    }

}

实现带有过期时间的 LRU

/**
 * @author shuang.kou
 * <p>
 * 使用 ConcurrentHashMap+ConcurrentLinkedQueue+ReadWriteLock+ScheduledExecutorService实现线程安全的 LRU 缓存
 */
public class MyLruCacheWithExpireTime<K, V> {

    /**
     * 缓存的最大容量
     */
    private final int maxCapacity;

    private ConcurrentHashMap<K, V> cacheMap;
    private ConcurrentLinkedQueue<K> keys;
    /**
     * 读写锁
     */
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock writeLock = readWriteLock.writeLock();
    private Lock readLock = readWriteLock.readLock();
    // 定时清除过期键值
    private ScheduledExecutorService scheduledExecutorService;

    public MyLruCacheWithExpireTime(int maxCapacity) {
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("Illegal max capacity: " + maxCapacity);
        }
        this.maxCapacity = maxCapacity;
        cacheMap = new ConcurrentHashMap<>(maxCapacity);
        keys = new ConcurrentLinkedQueue<>();
        scheduledExecutorService = Executors.newScheduledThreadPool(3);
    }

    public V put(K key, V value, long expireTime) {
        // 加写锁
        writeLock.lock();
        try {
            //1.key是否存在于当前缓存
            if (cacheMap.containsKey(key)) {
                moveToTailOfQueue(key);
                cacheMap.put(key, value);
                return value;
            }
            //2.是否超出缓存容量,超出的话就移除队列头部的元素以及其对应的缓存
            if (cacheMap.size() == maxCapacity) {
                System.out.println("maxCapacity of cache reached");
                removeOldestKey();
            }
            //3.key不存在于当前缓存。将key添加到队列的尾部并且缓存key及其对应的元素
            keys.add(key);
            cacheMap.put(key, value);
            if (expireTime > 0) {
                removeAfterExpireTime(key, expireTime);
            }
            return value;
        } finally {
            writeLock.unlock();
        }
    }

    public V get(K key) {
        //加读锁
        readLock.lock();
        try {
            //key是否存在于当前缓存
            if (cacheMap.containsKey(key)) {
                // 存在的话就将key移动到队列的尾部
                moveToTailOfQueue(key);
                return cacheMap.get(key);
            }
            //不存在于当前缓存中就返回Null
            return null;
        } finally {
            readLock.unlock();
        }
    }

    public V remove(K key) {
        writeLock.lock();
        try {
            //key是否存在于当前缓存
            if (cacheMap.containsKey(key)) {
                // 存在移除队列和Map中对应的Key
                keys.remove(key);
                return cacheMap.remove(key);
            }
            //不存在于当前缓存中就返回Null
            return null;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * 将元素添加到队列的尾部(put/get的时候执行)
     */
    private void moveToTailOfQueue(K key) {
        keys.remove(key);
        keys.add(key);
    }

    /**
     * 移除队列头部的元素以及其对应的缓存 (缓存容量已满的时候执行)
     */
    private void removeOldestKey() {
        K oldestKey = keys.poll();
        if (oldestKey != null) {
            cacheMap.remove(oldestKey);
        }
    }

    private void removeAfterExpireTime(K key, long expireTime) {
        scheduledExecutorService.schedule(() -> {
            //过期后清除该键值对
            cacheMap.remove(key);
            keys.remove(key);
        }, expireTime, TimeUnit.MILLISECONDS);
    }

    public int size() {
        return cacheMap.size();
    }

}

⭐单例模式

饿汉

// 问题1:为什么加 final
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例:实现 readResolve 方法
final class Singleton implements Serializable{
    // 问题3:为什么设置为私有? 是否能防止反射创建新的实例?
    private Singleton() {
    }
    //这样初始化是否能保证单例对象创建时的线程安全?
    private static final Singleton INSTANCE = new Singleton();
    
    //问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由
    public static Singleton getInstance() {
        return INSTANCE;
    }
    
    // 防止序列化破坏单例模式
    public Object readResolve(){
        return INSTANCE;
    }
}

枚举

// 问题1:枚举单例是如何限制实例个数的
// 问题2:枚举单例在创建时是否有并发问题
// 问题3:枚举单例能否被反射破坏单例
// 问题4:枚举单例能否被反序列化破坏单例
// 问题5:枚举单例属于懒汉式还是饿汉式
// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做
enum Singleton {
    INSTANCE;
}

线程安全的懒汉单例 1

public final class Singleton {
    private Singleton() {
    }

    private static Singleton INSTANCE = null;

    // 分析这里的线程安全, 并说明有什么缺点
    public static synchronized Singleton getInstance() {
        if (INSTANCE != null) {
            return INSTANCE;
        }
        INSTANCE = new Singleton();
        return INSTANCE;
    }
}

线程安全的懒汉单例 2

public final class Singleton {
    private Singleton() {
    }

    // 问题1:解释为什么要加 volatile ?
    private static volatile Singleton INSTANCE = null;

    public static Singleton getInstance() {
        if (INSTANCE != null) {
            return INSTANCE;
        }
        synchronized (Singleton.class) { // t1
            // 问题2:为什么还要在这里加为空判断, 之前不是判断过了吗
            if (INSTANCE != null) {
                // t2
                return INSTANCE;
            }
            INSTANCE = new Singleton();
            return INSTANCE;
        }
    }
}

问题1:解释为什么要加 volatile ?

答: INSTANCE = new Singleton(); 可能会发生重排序,导致 INSTANCE 不为空,但是构造函数没有执行。另外的线程可能获取到一个构造函数还未执行完的 INSTANCE。

问题2:为什么还要在这里加为空判断, 之前不是判断过了吗

答:刚开始 t1 和 t2 在 synchronized 处竞争锁,t1 竞争成功了,并执行了构造函数,给 INSTANCE 赋了值,之后 t2 得到锁,需要判断 INSTANCE 是否为空,防止再次执行构造函数。

静态内部类懒汉单例

public final class Singleton {
    private Singleton() { }
    // 问题1:属于懒汉式还是饿汉式
    private static class LazyHolder {
        static final Singleton INSTANCE = new Singleton();
    }
    // 问题2:在创建时是否有并发问题
    public static Singleton getInstance() {
        return LazyHolder.INSTANCE;
    }
}

LazyHolder 类只有在被使用到的时候才会触发类加载,所以说是懒汉式的。

⭐同步模式之顺序控制

三个线程交叉打印 ABC

使用 Park/Unpark

public class PrintABC {
  
    static Thread threadA, threadB, threadC;
  
      public static void main(String[] args) {
        threadA = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                // 打印当前线程名称
                System.out.print(Thread.currentThread().getName());
                // 唤醒下一个线程
                LockSupport.unpark(threadB);
                // 当前线程阻塞
                LockSupport.park();
            }
        }, "A");
        threadB = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                // 先阻塞等待被唤醒
                LockSupport.park();
                System.out.print(Thread.currentThread().getName());
                // 唤醒下一个线程
                LockSupport.unpark(threadC);
            }
        }, "B");
        threadC = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                // 先阻塞等待被唤醒
                LockSupport.park();
                System.out.print(Thread.currentThread().getName());
                // 唤醒下一个线程
                LockSupport.unpark(threadA);
            }
        }, "C");
        threadA.start();
        threadB.start();
        threadC.start();
    }
}

使用 synchronized、wait、notify

class Wait_Notify_ACB {

    private int num;
    private static final Object LOCK = new Object();

    private void printABC(String name, int targetNum) {
        for (int i = 0; i < 10; i++) {
            synchronized (LOCK) {
                while (num % 3 != targetNum) {
                    try {
                        LOCK.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                num++;
                System.out.print(name);
                LOCK.notifyAll();
            }
        }

    }

    public static void main(String[] args) {
        Wait_Notify_ACB  task = new Wait_Notify_ACB ();
        new Thread(() -> {
            task.printABC("A", 0);
        }, "A").start();
        new Thread(() -> {
            task.printABC("B", 1);
        }, "B").start();
        new Thread(() -> {
            task.printABC("C", 2);
        }, "C").start();
    }
}

使用 Lock

class Lock_ABC {

    private int num;   // 当前状态值:保证三个线程之间交替打印
    private Lock lock = new ReentrantLock();

    private void printABC(String name, int targetNum) {
        for (int i = 0; i < 10; ) {
            lock.lock();
            if (num % 3 == targetNum) {
                num++;
                i++;
                System.out.print(name);
            }
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Lock_ABC lockABC = new Lock_ABC();

        new Thread(() -> {
            lockABC.printABC("A", 0);
        }, "A").start();

        new Thread(() -> {
            lockABC.printABC("B", 1);
        }, "B").start();

        new Thread(() -> {
            lockABC.printABC("C", 2);
        }, "C").start();
    }
}

使用 Lock+Condition

避免无效唤醒,指定下一个被唤醒的线程

class LockConditionABC {

    private int num;
    private static Lock lock = new ReentrantLock();
    private static Condition c1 = lock.newCondition();
    private static Condition c2 = lock.newCondition();
    private static Condition c3 = lock.newCondition();

    private void printABC(String name, int targetNum, Condition currentThread, Condition nextThread) {
        for (int i = 0; i < 10; ) {
            lock.lock();
            try {
                while (num % 3 != targetNum) {
                    currentThread.await();
                }
                num++;
                i++;
                System.out.print(name);
                nextThread.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        LockConditionABC print = new LockConditionABC();
        new Thread(() -> {
            print.printABC("A", 0, c1, c2);
        }, "A").start();
        new Thread(() -> {
            print.printABC("B", 1, c2, c3);
        }, "B").start();
        new Thread(() -> {
            print.printABC("C", 2, c3, c1);
        }, "C").start();
    }
}

使用 Semaphore

class SemaphoreABC {

    private static Semaphore s1 = new Semaphore(1);  //先打印A,所以设s1中的计数器值为1
    private static Semaphore s2 = new Semaphore(0);
    private static Semaphore s3 = new Semaphore(0);
    

    private void printABC(String name, Semaphore currentThread, Semaphore nextThread) {
        for (int i = 0; i < 10; i++) {
            try {
                currentThread.acquire();   //阻塞当前线程,即调用当前线程acquire(),计数器减1为0
                System.out.print(name);
                nextThread.release();    //唤醒下一个线程,即调用下一个线程线程release(),计数器加1

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SemaphoreABC printer = new SemaphoreABC();
        new Thread(() -> {
            printer.printABC("A", s1, s2);
        }, "A").start();
        Thread.sleep(10);
        new Thread(() -> {
            printer.printABC("B", s2, s3);
        }, "B").start();
        Thread.sleep(10);
        new Thread(() -> {
            printer.printABC("C", s3, s1);
        }, "C").start();
    }
}

线程交替打印奇偶数

使用 synchronized、wait、notify

使用两个线程,两个线程竞争同一把对象锁,可以通过构造方法注入。


Object wait() 方法让当前线程进入等待状态。直到其他线程调用此对象的 notify() 方法notifyAll() 方法。当前线程必须是此对象的监视器所有者,否则还是会发生 IllegalMonitorStateException 异常。如果当前线程在等待之前或在等待时被任何线程中断,则会抛出 InterruptedException 异常。


Object notify() 方法用于唤醒一个在此对象监视器上等待的线程。如果所有的线程都在此对象上等待,那么只会选择一个线程,选择是任意性的,并在对实现做出决定时发生。一个线程在对象监视器上等待可以调用 wait() 方法。notify() 方法只能被作为此对象监视器的所有者的线程来调用。

一个线程要想成为对象监视器的所有者,可以使用以下 3 种方法:

  • 执行对象的同步实例方法
  • 使用 synchronized 内置锁
  • 对于 Class 类型的对象,执行同步静态方法

一次只能有一个线程拥有对象的监视器。如果当前线程不是此对象监视器的所有者的话会抛出 IllegalMonitorStateException 异常。

class PrintOddEven {

    private static final int MAX = 10;
    private int num = 0;
    private Object o = new Object();

    private void printOddOrEven(String name, int flag) {
        while (true) {
            while (num < MAX) {
                synchronized (o) {
                    // 检查条件是否满足,如果不满足就 wait
                    while (num % 2 != flag) {
                        try {
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    System.out.println(name + " : " + num);
                    num++;
                    o.notify();
                }
            }
        }
    }

    public static void main(String[] args) {
        PrintOddEven task = new PrintOddEven();
        new Thread(() -> {
            task.printOddOrEven("Odd thread", 1);
        }).start();
        new Thread(() -> {
            task.printOddOrEven("Even thread", 0);
        }).start();
    }
}

总结 synchronized、wait、notify 的使用模板:

synchronized (monitor){
    while(condition not matched){
        monitor.wait();        
    }
    doSomething();
    monitor.notify();
}

为什么 synchronized 里面使用 while 而不是 if 呢?

为了防止虚假唤醒,虚假唤醒通俗来说就是当一个条件满足时可以唤醒多个线程,但是有些被唤醒的线程是不符合 if/while 的执行条件的。

就是用 if 判断的话,唤醒后线程会从 wait 之后的代码开始运行,但是不会重新判断 if 条件,直接继续运行 if 代码块之后的代码,而如果使用 while 的话,也会从wait 之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行 while 代码块之后的代码块,成立的话继续 wait

使用 Lock

class PrintOddEven{

    private int num;
    private Lock lock = new ReentrantLock();

    private void printOddOrEven(String name, int flag){
        for (int i = 0; i < 10;) {
            lock.lock();
            if(num % 2 == flag){
                System.out.println(name + ": " + num);
                num++;
                i++; // 只有打印成功了才递增计数
            }
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        PrintOddEven task = new PrintOddEven();
        new Thread(()->{
            task.printOddOrEven("Odd thread",1);
        }).start();
        new Thread(()->{
            task.printOddOrEven("Even thread",0);
        }).start();
    }
}

使用 volatile

volatile 修饰的变量可以保证变量在线程间的可见性,常用作标识位出现。

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 定义锁
        ReentrantLock lock = new ReentrantLock();
        // 打印线程A的condition
        Condition conditionA = lock.newCondition();
        // 打印线程B的condition
        Condition conditionB = lock.newCondition();
        // 打印线程C的condition
        Condition conditionC = lock.newCondition();
        // 实例化A线程
        Thread printerA = new Thread(new ReentrantLockSyncPrinter(lock, conditionA, conditionB));
        // 实例化B线程
        Thread printerB = new Thread(new ReentrantLockSyncPrinter(lock, conditionB, conditionC));
        // 实例化C线程
        Thread printerC = new Thread(new ReentrantLockSyncPrinter(lock, conditionC, conditionA));

        printerA.setName("线程A");
        printerB.setName("线程B");
        printerC.setName("线程C");

        printerA.start();
        Thread.sleep(100);
        printerB.start();
        printerC.start();

    }

}

class ReentrantLockSyncPrinter implements Runnable {
    // 初始值
    private static int num = 1;
    private static final int MAX = 60;
    // 打印次数
    private static final int COUNT = 5;
    // 打印锁
    final ReentrantLock reentrantLock;
    // 本线程打印所需的condition
    final Condition currCondition;
    // 下一个线程打印所需要的condition
    final Condition nextCondition;

    public ReentrantLockSyncPrinter(ReentrantLock reentrantLock, Condition currCondition, Condition nextCondition) {
        this.reentrantLock = reentrantLock;
        this.currCondition = currCondition;
        this.nextCondition = nextCondition;
    }

    @Override
    public void run() {
        // 获取锁,进入临界区
        reentrantLock.lock();
        try {
            while (num < MAX) {
                System.out.print(Thread.currentThread().getName() + "-->");
                // 连续打印COUNT次
                for (int i = 0; i < COUNT; i++) {
                    //打印字符
                    System.out.print(num + " ");
                    num++;
                }
                System.out.println();
                // 使用nextCondition唤醒下一个线程
                // 因为只有一个线程在等待,所以signal或者signalAll都可以
                nextCondition.signal();
                try {
                    // 本线程让出锁并等待唤醒
                    currCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            // 释放锁
            reentrantLock.unlock();
        }
    }
}

三个线程按序输出

利用三个线程,第一个线程打印1 2 3 4 5, 第二个线程打印 6 7 8 9 10, 第三个线程打印11 12 13 14 15,然后第一个线程再打印16 17 18 19 20,然后第二个线程接着打印21… 依此类推,直到打印到60。

通过一个共享的ReentrantLock控制三个线程的访问,我们都知道ReentrantLock是可重入锁。我们可以通过条件变量Condition实现线程A给线程B发送信号唤醒B,线程B给线程C发送信号唤醒C,这样依次循环执行打印。

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 定义锁
        ReentrantLock lock = new ReentrantLock();
        // 打印线程A的condition
        Condition conditionA = lock.newCondition();
        // 打印线程B的condition
        Condition conditionB = lock.newCondition();
        // 打印线程C的condition
        Condition conditionC = lock.newCondition();
        // 实例化A线程
        Thread printerA = new Thread(new ReentrantLockSyncPrinter(lock, conditionA, conditionB));
        // 实例化B线程
        Thread printerB = new Thread(new ReentrantLockSyncPrinter(lock, conditionB, conditionC));
        // 实例化C线程
        Thread printerC = new Thread(new ReentrantLockSyncPrinter(lock, conditionC, conditionA));

        printerA.setName("线程A");
        printerB.setName("线程B");
        printerC.setName("线程C");

        printerA.start();
        Thread.sleep(100);
        printerB.start();
        printerC.start();

    }

}

class ReentrantLockSyncPrinter implements Runnable {
    // 初始值
    private static int num = 1;
    private static final int MAX = 60;
    // 打印次数
    private static final int COUNT = 5;
    // 打印锁
    final ReentrantLock reentrantLock;
    // 本线程打印所需的condition
    final Condition currCondition;
    // 下一个线程打印所需要的condition
    final Condition nextCondition;

    public ReentrantLockSyncPrinter(ReentrantLock reentrantLock, Condition currCondition, Condition nextCondition) {
        this.reentrantLock = reentrantLock;
        this.currCondition = currCondition;
        this.nextCondition = nextCondition;
    }

    @Override
    public void run() {
        // 获取锁,进入临界区
        reentrantLock.lock();
        try {
            while (num < MAX) {
                System.out.print(Thread.currentThread().getName() + "-->");
                // 连续打印COUNT次
                for (int i = 0; i < COUNT; i++) {
                    //打印字符
                    System.out.print(num + " ");
                    num++;
                }
                System.out.println();
                // 使用nextCondition唤醒下一个线程
                // 因为只有一个线程在等待,所以signal或者signalAll都可以
                nextCondition.signal();
                try {
                    // 本线程让出锁并等待唤醒
                    currCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            // 释放锁
            reentrantLock.unlock();
        }
    }
}

实现一个线程安全的阻塞队列

使用 synchronized、wait、notify

public class ArrayBlockingQueue {
    private Object[] array; //数组
    private int head; //头
    private int tail; //尾
    private volatile int size; //元素个数
 
    public ArrayBlockingQueue(int capacity) {
        this.array = new Object[capacity];
    }
 
    //写入元素
    public synchronized void put(Object o) throws InterruptedException {
        //当队列满时,阻塞
        while (size == array.length) {
            this.wait();
        }
        array[tail++] = o;
        if (tail == array.length) {
            tail = 0;
        }
        size ++;
        //唤醒线程
        this.notifyAll();
    }
 
    //取出元素
    public synchronized Object get() throws InterruptedException {
        //当队列为空,阻塞
        while (size == 0) {
            this.wait();
        }
        Object o = array[head++];
        if (head == array.length) {
            head = 0;
        }
        size --;
        //唤醒线程
        this.notifyAll();
        return o;
    }
 
}

使用 Condition

public class ArrayBlockingQueue {
    private Object[] array; //数组
    private int head; //头
    private int tail; //尾
    private volatile int size; //元素个数
    private ReentrantLock lock = new ReentrantLock(); //锁
    private Condition notEmpty = lock.newCondition(); //非空
    private Condition notFull = lock.newCondition();  //非满
 
    public ArrayBlockingQueue(int capacity) {
        this.array = new Object[capacity];
    }
 
    //写入元素
    public void put(Object o) throws InterruptedException {
        try{
            lock.lock();
            //当队列满时,阻塞
            while (size == array.length) {
                notFull.wait();
            }
            array[tail++] = o;
            if (tail == array.length) {
                tail = 0;
            }
            size ++;
            //唤醒线程
            notEmpty.notifyAll();
        } finally {
            lock.unlock();
        }
    }
 
    //取出元素
    public Object get() throws InterruptedException {
        lock.lock();
        try {
            //当队列为空,阻塞
            while (size == 0) {
                notEmpty.wait();
            }
            Object o = array[head++];
            if (head == array.length) {
                head = 0;
            }
            size --;
            //唤醒线程
            notFull.notifyAll();
            return o;
        } finally {
            lock.unlock();
        }
    }
 
}

终止模式之两阶段终止

两阶段终止模式又叫线程优雅停止。在一个线程 T1 中如何 “优雅” 终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。

错误方法:使用线程对象的 stop() 方法停止线程:stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,其它线程将永远无法获取锁

利用 isInterrupted

interrupt 可以打断正在执行的线程,无论这个线程是在 sleep,wait, 还是正常运行

class WorkerThread {
    private Thread thread;

    public void start() {
        thread = new Thread(() -> {
            while (true) {
                Thread current = Thread.currentThread();
                if (current.isInterrupted()) {
                    System.out.println("料理后事");
                    break;
                }
                try {
                    System.out.println("在努力工作");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    current.interrupt();
                }
            }
        }, "工作线程");
        thread.start();
    }

    public void stop() {
        thread.interrupt();
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        WorkerThread workerThread = new WorkerThread();
        workerThread.start();
        Thread.sleep(5000);
        workerThread.stop();
    }
}

利用停止标记

class WorkerThread {
    private Thread thread;
    private volatile boolean stop = false;

    public void start() {
        thread = new Thread(() -> {
            while (true) {
                Thread current = Thread.currentThread();
                if (stop) {
                    System.out.println("料理后事");
                    break;
                }
                try {
                    System.out.println("在努力工作");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {}
            }
        }, "工作线程");
        thread.start();
    }

    public void stop() {
        stop = true;
        thread.interrupt();
    }
}

使用 Unsafe 实现 CAS

class MyAtomicInteger {
    private volatile int value;
    //提供底层CAS操作的对象
    private static final Unsafe unsafe;
    //属性在内存中相对于对象的地址偏移量
    private static final long valueOffset;

    static {
        try {
            //getUnsafe()方法不好使,通过反射获取unsafe对象
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            //Unsafe类中的theUnsafe属性为private,设置可访问权限
            field.setAccessible(true);
            unsafe = (Unsafe) field.get(null);
            //通过unsafe对象提供的方法获取value属性偏移量
            valueOffset = unsafe.objectFieldOffset
                    (MyAtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    /**
     * CAS操作方法
     *
     * @param current 旧值
     * @param update  期望值
     * @return 修改是否成功
     */
    public boolean compareAndSet(int current, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, current, update);
    }

    //相当于AtomicInteger中getAndAdd()方法
    public void add() {
        int current;
        do {
            //线程获取当前值
            current = unsafe.getIntVolatile(this, valueOffset);
        } while (!compareAndSet(current, current + 1));//修改失败则重试
    }

    public int getValue() {
        return value;
    }
}

public class Main {

    public static void main(String[] args) throws InterruptedException {
        MyAtomicInteger atomicInteger = new MyAtomicInteger();
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    atomicInteger.add();
                }
            }).start();
        }
        Thread.sleep(1000);//等待所有线程执行完毕再输出
        System.out.println(atomicInteger.getValue());
    }
}

多线程模拟并发售卖

创建若干线程代表车票售卖窗口,每个窗口售卖的车票不能有重复,要求写代码实现。

public class Main {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        Thread t1 = new Thread(ticket);
        Thread t2 = new Thread(ticket);
        Thread t3 = new Thread(ticket);
        Thread t4 = new Thread(ticket);
        t1.setName("窗口A");
        t2.setName("窗口B");
        t3.setName("窗口C");
        t4.setName("窗口D");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

class Ticket implements Runnable {
    int serialNumber = 1;

    //synchronized为线程加锁,保证每次只有一个线程拿到锁进行售票
    @Override
    public synchronized void run() {
        while (true) {
            if (serialNumber <= 100) {
                System.out.println(Thread.currentThread().getName() + "卖出" + serialNumber + "号票");
                serialNumber++;
            }
            try {
                // 这里wait()休眠一段时间,表示当前线程释放锁,别的线程可以拿到锁进行售票
                // 注意wait()方法必须在synchronized代码里使用,因为想要释放锁的前提是,你必须首先持有锁
                this.wait(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

手写限流算法

限流算法也是面试的重点之一,面试官可能不会要求你手写限流算法,但是你必须知道有哪些限流算法,以及这些 限流算法特点以及优缺点 。下面给出一下参考资料:

参考