前言 在介绍线程池之前,并发编程我们先回顾下线程的程池基本知识。其中线程池包括ThreadPoolExecutor 默认线程和ScheduledThreadPoolExecutor 定时线程池 ,原理本篇重点介绍ThreadPoolExecutor线程池。解析 线程 线程是并发编程调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,程池JVM使用的原理是KLT模型,Java线程与OS线程保持 1:1 的解析映射关系,也就是并发编程说有一个Java线程也会在操作系统里有一个对应的线程。 内核线程模型 内核线程(KLT):系统内核管理线程(KLT),程池内核保存线程的原理状态和上下文信息,线程阻塞不会引起进程阻塞。解析在多处理器系统上,并发编程多线程在多处理器上并行运行。程池线程的原理创建、调度和管理由内核完成,效率比ULT要慢,比进程操作快。 用户线程模型 用户线程(ULT):用户程序实现,不依赖操作系统核心,应用提供创建、同步、调度和管理线程的函数来控制用户线程。高防服务器不需要用户态/内核态切换,速度快。内核对ULT无感知,线程阻塞则进程(包括它的所有线程)阻塞。 Java线程生命状态 Java线程有多种生命状态: 状态切换如下图所示: Java线程实现方式 Java线程实现方式主要有四种: 其中前两种方式线程执行完后都没有返回值,后两种是带返回值的。 继承Thread类创建线程 Thread类本质上是实现了Runnable接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过Thread类的start()实例方法。start()方法是一个native方法,它将启动一个新线程,亿华云计算并执行run()方法。这种方式实现多线程很简单,通过自己的类直接extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如: 实现Runnable接口创建线程 如果自己的类已经extends另一个类,就无法直接extends Thread,此时,可以实现一个Runnable接口,如下: // 实现Runnable接口的类将被Thread执行,表示一个基本的任务 public interface Runnable { // run方法就是它所有的内容,就是实际执行的任务 public abstract void run(); } 为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例: 事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码: 实现Callable接口通过FutureTask包装器来创建Thread线程 Callable接口(也只有一个方法)定义如下: public interface Callable //Callable同样是任务,与Runnable接口的区别在于它接收泛型,网站模板同时它执行任务后带有返回内容 public class SomeCallable 使用ExecutorService、Callable、Future实现有返回结果的线程 ExecutorService、Callable、Future三个接口实际上都是属于Executor框架。返回结果的线程是在JDK1.5中引入的新特征,有了这种特征就不需要再为了得到返回值而大费周折了。而且自己实现了也可能漏洞百出。(下部分来讲线程池了) 执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了。 再结合线程池接口ExecutorService就可以实现传说中有返回结果的多线程了。 下面提供了一个完整的有返回结果的多线程测试例子。代码如下: 协程 协程(纤程,用户级线程),目的是为了追求最大力度的发挥硬件性能和提升软件的速度,协程基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任务;等完成或达到某个条件时,再还原原来的栈信息并继续执行(整个过程不需要上下文切换)。 协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。 协程的目的:当我们在使用多线程的时候,如果存在长时间的I/O操作。这个时候线程一直处于阻塞状态,如果线程很多的时候,会存在很多线程处于空闲状态,造成了资源应用不彻底。相对的协程不一样了,在单线程中多个任务来回执行如果出现长时间的I/O操作,让其让出目前的协程调度,执行下一个任务。当然可能所有任务,全部卡在同一个点上,但是这只是针对于单线程而言,当所有数据正常返回时,会同时处理当前的I/O操作。 Java原生不支持协程,在纯java代码里需要使用协程的话需要引入第三方包,如:quasar 线程池 “线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此 Java 中提供线程池对线程进行同一分配、调优和监控。 线程池介绍 在web开发中,服务器需要接受并处理请求,所以会为一个请求分配一个线程来进行处理。如果每次请求都创建一个线程的话实现起来非常简单,但是存在一个问题:如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。 那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢? 这就是线程池的目的。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到多个任务上。 什么时候使用线程池? 线程池优势 Executor框架 Executor接口是线程池框架中最基础的部分,定义来一个用于执行 Runnable 的 execute 方法。下面为它的继承与实现 ExecutorService接口 从图中可以看出 Executor 下有一个重要的子接口 ExecutorService ,其中定义来线程池的具体行为 AbstractExcutorService抽象类 此类的定义并没有特殊的意义仅仅是实现了ExecutorService接口 线程池的具体实现 ThreadPoolExecutor 线程池重点属性 ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。 ctl相关方法 线程池存在5种状态 使用一个整形,前3位表示状态,后29位表示线程容量,也就是说线程最多有 230−1 个 也可以看出当ctl小于零表示线程池仍在运行 RUNNING SHUTDOWN STOP TIDYING TERMINATED 进入TERMINATED的条件如下: 线程池参数 corePoolSize 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。 maximumPoolSize 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize; keepAliveTim 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime; unit keepAliveTime的单位; workQueue 用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列: 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务; 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene; 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene; 4、priorityBlockingQuene:具有优先级的无界阻塞队列; threadFactory 它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。 handler 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略: 上面的4种策略都是ThreadPoolExecutor的内部类。 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。 线程池的创建 有四个构造函数,其他三个都是调用下面代码中的这个构造函数 线程池监控 线程池原理 核心方法分析 由于篇幅有限,核心方法解析请阅读文末的扩展链接。 PS:以上代码提交在 Github : https://github.com/Niuh-Study/niuh-juc-final.gitexecute(Runnable command):履行Ruannable类型的任务; submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象; shutdown():在完成已提交的任务后封闭办事,不再接管新任务; shutdownNow():停止所有正在履行的任务并封闭办事; isTerminated():测试是否所有任务都履行完毕了; isShutdown():测试是否该ExecutorService已被关闭; awaitTermination(long,TimeUnit):接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用; invokeAll :作用是等待所有的任务执行完成后统一返回; invokeAny :将第一个得到的结果作为返回值,然后立刻终止所有的线程。如果设置了超时时间,未超时完成则正常返回结果,如果超时未完成则报超时异常。
public abstract class AbstractExecutorService implements ExecutorService { //此方法很简单就是对runnable保证,将其包装为一个FutureTask protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } //包装callable为FutureTask //FutureTask其实就是对Callable的一个封装 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } //提交一个Runnable类型的任务 public Future<?> submit(Runnable task) { //如果为null则抛出NPE if (task == null) throw new NullPointerException(); //包装任务为一个Future RunnableFuture<Void> ftask = newTaskFor(task, null); //将任务丢给执行器,而此处会抛出拒绝异常,在讲述ThreadPoolExecutor的时候有讲述,不记得的读者可以去再看看 execute(ftask); return ftask; } //与上方方法相同只不过指定了返回结果 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } //与上方方法相同只是换成了callable public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } //执行集合tasks结果是最后一个执行结束的任务结果 //可以设置超时 timed为true并且nanos是未来的一个时间 //任何一个任务完成都将会返回结果 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { //传入的任务集合不能为null if (tasks == null) throw new NullPointerException(); //传入的任务数不能是0 int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); //满足上面的校验后将任务分装到一个ArrayList中 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); //并且创建一个执行器传入this //这里简单讲述他的执行原理,传入this会使用传入的this(类型为Executor)作为执行器用于执行任务,当submit提交任务的时候回将任务 //封装为一个内部的Future并且重写他的done而此方法就是在future完成的时候调用的,而他的写法则是将当前完成的future添加到esc //维护的结果队列中 ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); try { //创建一个执行异常,以便后面抛出 ExecutionException ee = null; //如果开启了超时则计算死线时间如果时间是0则代表没有开启执行超时 final long deadline = timed ? System.nanoTime() + nanos : 0L; //获取任务的迭代器 Iterator<? extends Callable<T>> it = tasks.iterator(); //先获取迭代器中的第一个任务提交给前面创建的ecs执行器 futures.add(ecs.submit(it.next())); //前面记录的任务数减一 --ntasks; //当前激活数为1 int active = 1; //进入死循环 for (;;) { //获取刚才提价的任务是否完成如果完成则f不是null否则为null Future<T> f = ecs.poll(); //如果为null则代表任务还在继续 if (f == null) { //如果当前任务大于0 说明除了刚才的任务还有别的任务存在 if (ntasks > 0) { //则任务数减一 --ntasks; //并且再次提交新的任务 futures.add(ecs.submit(it.next())); //当前的存活的执行任务加一 ++active; } //如果当前存活任务数是0则代表没有任务在执行了从而跳出循环 else if (active == 0) break; //如果当前任务执行设置了超时时间 else if (timed) { //则设置指定的超时时间获取 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); //等待执行超时还没有获取到则抛出超时异常 if (f == null) throw new TimeoutException(); //否则使用当前时间计算剩下的超时时间用于下一个循环使用 nanos = deadline - System.nanoTime(); } //如果没有设置超时则直接获取任务 else f = ecs.take(); } //如果获取到了任务结果f!=null if (f != null) { //激活数减一 --active; try { //返回获取到的结果 return f.get(); //如果获取结果出错则包装异常 } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } //如果异常不是null则抛出如果是则创建一个 if (ee == null) ee = new ExecutionException(); throw ee; } finally { //其他任务则设置取消 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } } //对上方方法的封装 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } //对上方法的封装 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } //相对于上一个方法执行成功任何一个则返回结果而此方法是全部执行完然后统一返回结果 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { //传入的任务集合不能是null if (tasks == null) throw new NullPointerException(); //创建一个集合用来保存获取到的执行future ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); //任务是否执行完成 boolean done = false; try { //遍历传入的任务并且调用执行方法将创建的future添加到集合中 for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } //遍历获取到的future for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); //如果当前任务没有成功则进行f.get方法等待此方法执行成功,如果方法执行异常或者被取消将忽略异常 if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } //到这一步则代表所有的任务都已经有了确切的结果 done = true; //返回任务结果集合 return futures; } finally { //如果不是true是false 则代表执行过程中被中断了则需要对任务进行取消操作,如果正常完成则不会被取消 if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } } //与上方方法的区别在于对于任务集合可以设置超时时间 //这里会针对差异进行讲解 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); //计算设置时长的纳秒时间 long nanos = unit.toNanos(timeout); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) futures.add(newTaskFor(t)); //计算最终计算的确切时间点,运行时长不能超过此时间也就是时间死线 //这里是个细节future创建的时间并没有算作执行时间 final long deadline = System.nanoTime() + nanos; //获取当前结果数 final int size = futures.size(); //遍历将任务进行执行 for (int i = 0; i < size; i++) { execute((Runnable)futures.get(i)); //并且每次都计算死线 nanos = deadline - System.nanoTime(); //如果时间已经超过则返回结果 if (nanos <= 0L) return futures; } //否则遍历future确定每次执行都获取到了结果 for (int i = 0; i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { //如果在等待过程中已经超时则返回当前等待结合 if (nanos <= 0L) return futures; try { //如果没有超过死线则设置从future中获取结果的时间如果超过则会派出timeout f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { //抛出了异常则会返回当前的列表 return futures; } //计算最新的超时时间 nanos = deadline - System.nanoTime(); } } //之前的返回都没有设置为true所以在finally中都会设置为取消唯独正常执行完成到此处返回的结果才是最终的结果 done = true; return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } } }
ThreadPoolExecutor 默认线程池 ScheduledThreadPoolExecutor 定时线程池 (下篇再做介绍)