异步与多线程

本篇文章的思维导图:

多线程

实现多线程的几种方法:

继承自 Thread

@SpringBootApplication
public class DemoApplication {

  public static void main(String[] args) {
    Thread01 thread01 = new Thread01();
    thread01.run();
  }
}

class Thread01 extends Thread{
  @Override
  public void run() {
    System.out.println("Thread01 运行");
  }
}

实现 Runnable 接口

@SpringBootApplication
public class DemoApplication {

  public static void main(String[] args) {
    Runable01 runable01 = new Runable01();
    new Thread(runable01).start();
  }
}

class Runable01 implements Runnable{
  @Override
  public void run() {
    System.out.println("Runable01 运行中");
  }
}

实现 Callable 接口+ FutureTask

可以拿到返回结果,可以处理异常

@SpringBootApplication
public class DemoApplication {

  public static void main(String[] args) throws  Exception {
    FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
    new Thread(futureTask).start();
    // 阻塞等待整个线程结束,获取返回结果
    Integer integer = futureTask.get();
    System.out.println("线程返回结果 "+integer);
  }
}

class Callable01 implements Callable<Integer> {
  /**
   * 可以有返回值
   * @return
   * @throws Exception
   */
  @Override
  public Integer call() throws Exception {
    System.out.println("Callable01 运行中");
    return 1;
  }
}

Future 接口如下:

public interface Future<V> {
     // 取消任务
  boolean cancel(boolean mayInterruptIfRunning);

  // 获取任务执行结果
  V get() throws InterruptedException, ExecutionException;

  // 获取任务执行结果,带有超时时间限制
  V get(long timeout, TimeUnit unit) throws InterruptedException,                             ExecutionException,  TimeoutException;

  // 判断任务是否已经取消
  boolean isCancelled();

  // 判断任务是否已经结束
  boolean isDone();
}

有关 FutureTask 的更多信息推荐阅读,不会用Java Future,我怀疑你泡茶没我快

使用线程池

@SpringBootApplication
public class DemoApplication {

  public static void main(String[] args) throws  Exception {

    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        5,
        200,
        10,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(1000),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
    );
    executor.execute(()->{
        try {
          System.out.println("thread01 开始执行");
          Thread.sleep(10000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    );
    executor.execute(()->{
      try {
        System.out.println("thread02 开始执行");
        Thread.sleep(20000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
  }
}

接下来我将具体介绍线程池。

线程池

// ThreadPoolExecutor 构造函数
public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
)

ThreadPoolExecutor 的参数:

  • corePoolSize: 池中一直保持的线程的数量,即使线程空闲。除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:池中允许的最大的线程数
  • keepAliveTime:明白 keepAliveTime 需要先明白什么叫救急线程,当当前需要执行的线程数大于核心线程数+阻塞队列的线程数时,多出来的线程会直接转交给救济线程来执行。这些执行完毕之后救急线程就空闲下来了,空闲 keepAliveTime 之后救急线程就会被释放。最终线程池维持在corePoolSize大小
  • unit:时间单位
  • workQueue:阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize 大小,就会放在这里等待空闲线程执行。
  • threadFactory:创建线程的工厂,比如指定线程名等
  • handler:拒绝策略,如果线程满了,线程池就会使用拒绝策略

Executor 框架提供了多种工厂方法来获取不同类型的线程池:

  1. newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
  2. newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
  3. newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行
  4. newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

既然 Executor 可以直接得到一个线程池,我们为什么还要使用 ThreadPoolExecutor 来创建线程池?在阿里巴巴Java开发手册中对这一点做出了解释

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

因为这种方式会使用无界的任务队列,为避免OOM,我们应该使用ThreadPoolExecutor的构造方法手动指定队列的最大长度。

开发中使用线程池的好处:

  1. 降低资源的消耗:通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
  2. 提高响应速度:因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务 的状态,当任务来时无需创建新的线程就能执行
  3. 提高线程的可管理性:线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来 的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配。

在实际开发中应该 ThreadPoolExecutor 的构造参数应该从配置文件中读取。

@ConfigurationProperties(prefix = "gulimall.thread")
@Data
@Component
public class ThreadPoolConfigProperties{
  private Integer coreSize;
  private Integer maxSize;
  private Integer keepAliveSeconds;
}

线程池应当要放入到容器中:

@Configuration
 class MyThreadConfig{
  @Bean
  public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
      return new ThreadPoolExecutor(pool.getCoreSize(),
          pool.getMaxSize(),
          pool.getKeepAliveSeconds(),
          TimeUnit.SECONDS,new LinkedBlockingDeque<>(1000),
          Executors.defaultThreadFactory(),
          new ThreadPoolExecutor.AbortPolicy());
  }
 }

CompletableFuture 异步编排

在电商系统中,获取一个商品信息一般有以下几个步骤:

  1. 获取 sku 的基本信息
  2. 获取 sku 的图片信息
  3. 获取 sku 的促销信息
  4. 获取 spu 的所有销售属性
  5. 获取规格参数及组下的规格参数
  6. spu 详情

假如商品详情页的每个查询,需要每个步骤的时间的总和,效率十分低,可以采用多线程的方式来完成这些步骤,提高响应时间。

使用多线程来实现以上的步骤需要解决的是这些步骤之间的依赖关系,比如,在 sku 的基本信息没有获取之前是无法获取 sku 的促销信息的。因此我们需要解决这些步骤的编排问题,称为 异步编排。

用更专业的术语来讲,所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。

创建异步对象

@SpringBootApplication
public class DemoApplication {

   public static void main(String[] args) throws  Exception {
      // 在项目中应该采用 注入的方式注入线程池
      ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
      );

      CompletableFuture.runAsync(() -> {
         System.out.println("没有返回值");
      },executor);
      CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
         System.out.println("有返回值");
         return 1;
      }, executor);
   }
}
  1. runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
  2. 可以传入自定义的线程池,否则就用默认的线程池;

计算完成时回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。

whenComplete 和 whenCompleteAsync 的区别:

  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • WhenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来执行

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程 执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

// 计算完成时回调方法
CompletableFuture
	.supplyAsync(() -> {
		System.out.println("有返回值");return 1;
	},executor)
	.whenComplete((res,exc)-> {
		System.out.println("返回值为 "+res);
		System.out.println("异常为 "+exc);
	})
	.exceptionally(exc->{
		System.out.println("发生异常");
		return 1;
	});

Handle 方法

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Execut

测试代码如下

CompletableFuture<Integer> fu = CompletableFuture
			.supplyAsync(() -> {
				System.out.println("有返回值");
				return 1;
			}, executor)
			.handle((res, exc) -> {
				System.out.println("返回值是 " + res);
				System.out.println("异常是 " + exc);
				return res;
			});
System.out.println("返回值是: "+fu.get());

线程串行化方法

带有 Async 默认是异步执行的。同之前。

Function<? super T,? extends U>

  • T:上一个任务返回结果的类型
  • U:当前任务的返回值类型

thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

// 计算完成时回调方法
CompletableFuture<Integer> fu = CompletableFuture
		.supplyAsync(() -> {
			System.out.println("有返回值");
			return 1;
		}, executor)
		.thenApply((res1)->{
			System.out.println("thread01 的返回值是: "+res1);
			return 2;
		});
System.out.println("thread02 的返回值是: "+fu.get());

thenAccept

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

CompletableFuture
			.supplyAsync(() -> {
				System.out.println("thread01 有返回值");
				return 1;
			}, executor)
			.thenAccept((res)->{
				System.out.println("thread01 的返回值: "+res);
			});

thenRun

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行 thenRun 的后续操作。

CompletableFuture
			.runAsync(() -> {
				System.out.println("thread01 无返回值");
			}, executor)
			.thenRun(()->{
				System.out.println("thread02 无返回值");
			});

两两任务组合-都要完成

thenCombine

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值。

CompletableFuture<Integer> thread01 = CompletableFuture
		.supplyAsync(() -> {
			System.out.println("thread01");
			return 1;
		}, executor);

CompletableFuture<Integer> future2 = CompletableFuture
		.supplyAsync(() -> {
			System.out.println("thread02");
			return 2;
		}, executor)
		.thenCombine(thread01,(res1,res2)->{
			System.out.println("thread01 的返回值是:"+res1);
			System.out.println("thread02 的返回值是: "+res2);
			return 3;
		});
System.out.println("future2 的返回值是: "+future2.get());

thenAcceptBoth

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。

CompletableFuture<Integer> thread01 = CompletableFuture
		.supplyAsync(() -> {
			System.out.println("thread01");
			return 1;
		}, executor);

CompletableFuture<Void> future2 = CompletableFuture
		.supplyAsync(() -> {
			System.out.println("thread02");
			return 2;
		}, executor)
		.thenAcceptBoth(thread01,(res1,res2)->{
			System.out.println("thread01 的返回值是:"+res1);
			System.out.println("thread02 的返回值是: "+res2);
		});

runAfterBoth

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后, 处理该任务。

CompletableFuture<Void> thread01 = CompletableFuture
		.runAsync(() -> {
			System.out.println("thread01");
			try {
				Thread.sleep(10000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}, executor);
CompletableFuture<Void> thread02 = CompletableFuture
		.runAsync(() -> {
			System.out.println("thread02");
			try {
				Thread.sleep(10000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}, executor).runAfterBoth(thread01,()->{
			System.out.println("thread01 和 thread02 都执行完了");
		});

两两任务组合-一个完成

当两个任务中,任意一个 future 任务完成的时候,执行任务。

applyToEither

两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? sup

acceptEither

两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? supe

runAfterEither

两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

多任务组合

allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
CompletableFuture<Integer> future1 = CompletableFuture
      .supplyAsync(() -> {
         System.out.println("thread01");
         return 1;
      }, executor);

CompletableFuture<Integer> future2 = CompletableFuture
      .supplyAsync(() -> {
         System.out.println("thread02");
         return 2;
      }, executor);

CompletableFuture<Void> future = CompletableFuture.allOf(future2, future1);

future.get();
System.out.println("future1 done");
System.out.println("future2 done");

anyOf

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
CompletableFuture<Integer> future1 = CompletableFuture
      .supplyAsync(() -> {
         System.out.println("thread01");
         return 1;
      }, executor);

CompletableFuture<Integer> future2 = CompletableFuture
      .supplyAsync(() -> {
         System.out.println("thread02");
         return 2;
      }, executor);

CompletableFuture<Object> future = CompletableFuture.anyOf(future2, future1);

参考资料

07、异步&线程池.pdf

Java线程池实现原理及其在美团业务中的实践