新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
Java并发编程中线程池工作原理的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
创新互联建站公司2013年成立,先为衡阳县等服务建站,衡阳县等地企业,进行企业商务咨询服务。为衡阳县企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
ThreadPoolExecutor实现的顶层接口是Executor,在接口Executor中用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器Executor中,由Executor框架完成线程的调配和任务的执行部分。
ExecutorService接口增加了一些能力
扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService则是上层的抽象类:
将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
ThreadPoolExecutor实现最复杂的运行部分:
可以自动创建、管理和复用指定数量的一组线程,适用方只需提交任务即可线程安全,ThreadPoolExecutor内部有状态、核心线程数、非核心线程等属性,广泛使用了CAS和AQS锁机制避免并发带来的冲突问题
提供了核心线程、缓冲阻塞队列、非核心线程、抛弃策略的概念,可以根据实际应用场景进行组合使用
提供了beforeExecute 和afterExecute()可以支持对线程池的功能进行扩展
提高响应速度:任务到达时,相对于手工创建一个线程,直接从线程池中拿线程,速度肯定快很多
提高线程可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统稳定性,使用线程池可以进行同意分配、调优和监控。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize:线程池的核心线程数,一般情况下不管有没有任务都会一直在线程池中一直存活,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)设置为true时,闲置的核心线程会存在超时机制,如果在指定时间没有新任务来时,核心线程也会被终止,而这个时间间隔由第3个属性keepAliveTime指定。
maximumPoolSize:线程池所能容纳的最大线程数,当活动的线程数达到这个值后,后续的新任务将会被阻塞。
keepAliveTime:控制线程闲置时的超时时长,超过则终止该线程。一般情况下用于非核心线程,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)设置为true时,也作用于核心线程。
unit:用于指定keepAliveTime参数的时间单位,TimeUnit是个enum枚举类型,常用的有:TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒) 和 TimeUnit.MILLISECONDS(毫秒)等。
workQueue:线程池的任务队列,通过线程池的execute(Runnable command)方法会将任务Runnable存储在队列中。
threadFactory:线程工厂,它是一个接口,用来为线程池创建新线程的。
handler:拒绝策略,所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。
/** * 任务阻塞队列 */ private final BlockingQueueworkQueue; /** * 非公平的互斥锁(可重入锁) */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 线程集合一个Worker对应一个线程,没有核心线程的说话,只有核心线程数 */ private final HashSet workers = new HashSet (); /** * 配合mainLock通过Condition能够更加精细的控制多线程的休眠与唤醒 */ private final Condition termination = mainLock.newCondition(); /** * 线程池中线程数量曾经达到过的最大值。 */ private int largestPoolSize; /** * 已完成任务数量 */ private long completedTaskCount; /** * ThreadFactory对象,用于创建线程。 */ private volatile ThreadFactory threadFactory; /** * 拒绝策略的处理句柄 * 现在默认提供了CallerRunsPolicy、AbortPolicy、DiscardOldestPolicy、DiscardPolicy */ private volatile RejectedExecutionHandler handler; /** * 线程池维护线程(超过核心线程数)所允许的空闲时间 */ private volatile long keepAliveTime; /** * 允许线程池中的核心线程超时进行销毁 */ private volatile boolean allowCoreThreadTimeOut; /** * 线程池维护线程的最小数量,哪怕是空闲的 */ private volatile int corePoolSize; /** * 线程池维护的最大线程数量,线程数超过这个数量之后新提交的任务就需要进入阻塞队列 */ private volatile int maximumPoolSize;
核心线程数为0,总线程数量阈值为Integer.MAX_VALUE,即可以创建无限的非核心线程
newCachedThreadPool是一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。
会出下面大量的线程对象,导致的OOM
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
先执行SynchronousQueue的offer方法提交任务,并查询线程池中是否有空闲线程来执行SynchronousQueue的poll方法来移除任务。如果有,则配对成功,将任务交给这个空闲线程,否则,配对失败,创建新的线程去处理任务
当线程池中的线程空闲时,会执行SynchronousQueue的poll方法等待执行SynchronousQueue中新提交的任务。若等待超过60s,空闲线程就会终止
执行大量短生命周期任务。因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。
它适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景,SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,使用无界队列LinkedBlockingQueue作为线程池的工作队列
newSingleThreadExecutor创建是一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
当线程池中没有线程时,会创建一个新线程来执行任务。
当前线程池中有一个线程后,将新任务加入LinkedBlockingQueue
线程执行完第一个任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行 。
**适用于串行执行任务场景**
会存在出现阻塞队列堆积过大,导致的OOM
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
corePoolSize等于maximumPoolSize,所以线程池中只有核心线程,使用无界阻塞队列LinkedBlockingQueue作为工作队列
适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
newFixedThreadPool:创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。
会存在出现阻塞队列堆积大,导致的OOM
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }
newScheduledThreadPool创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。
线程总数阈值为Integer.MAX_VALUE,工作队列使用DelayedWorkQueue,非核心线程存活时间为0,所以线程池仅仅包含固定数目的核心线程。
会存在出现阻塞队列堆积过大,导致的OOM
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }
可以看出来上面的方法一共使用了DelayedWorkQueue、LinkedBlockingQueue和 SynchronousQueue。这个就是线程核心之一的阻塞队列。
scheduleAtFixedRate: 按照固定速率周期执行
scheduleWithFixedDelay:上个任务延迟固定时间后执行
它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列;
直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;
有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,如下所示:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。
在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
无界的任务队列:无界任务队列可以使用LinkedBlockingQueue实现,如下所示:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
优先任务队列:优先任务队列通过PriorityBlockingQueue实现,使用平衡二叉树堆,实现的具有优先级的无界阻塞队列
任务会按优先级重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。
PriorityBlockingQueue其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
其实LinkedBlockingQueue也是可以设置界限的,它默认的界限是Integer.MAX_VALUE。同时也支持也支持构造的时候设置队列大小。
无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
当Executor已经关闭,即执行了executorService.shutdown()方法后,或者Executor将有限边界用于最大线程和工作队列容量,且已经饱和时。使用方法execute()提交的新任务将被拒绝. 在以上述情况下,execute方法将调用其RejectedExecutionHandler的rejectExecution()方法
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
也称为终止策略,遭到拒绝将抛出运行时RejectedExecutionException。业务方能通过捕获异常及时得到对本次任务提交的结果反馈。
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
拥有自主反馈控制,让提交者执行提交任务,能够减缓新任务的提交速度。这种情况是需要让所有的任务都执行完毕。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
拒绝任务的处理程序,静默丢弃任务。使用此策略,我们可能无法感知系统的异常状态。慎用~!
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
丢弃队列中最前面的任务,然后重新提交被拒绝的任务。是否要使用此策略需要看业务是否需要新老的替换,慎用~!(LRU)
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
判断线程池中核心线程数是否已达阈值corePoolSize,若否,则创建一个新核心线程执行任务
若核心线程数已达阈值corePoolSize,判断阻塞队列workQueue是否已满,若未满,则将新任务添加进阻塞队列
若满,再判断,线程池中线程数是否达到阈值maximumPoolSize,若否,则新建一个非核心线程执行任务。若达到阈值,则执行线程池饱和策略。
AbortPolicy:直接抛出一个异常,默认策略
DiscardPolicy: 直接丢弃任务
DiscardOldestPolicy:抛弃下一个将要被执行的任务(最旧任务)
CallerRunsPolicy:主线程中执行任务
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
任务的优先级:高,中和低。
任务的执行时间:长,中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
根据任务所需要的cpu和io资源的量可以分为,
CPU密集型任务: 主要是执行计算任务,响应时间很快,cpu一直在运行,这种任务cpu的利用率很高。
IO密集型任务:主要是进行IO操作,执行IO操作的时间较长,这是cpu出于空闲状态,导致cpu的利用率不高。
为了合理最大限度的使用系统资源同时也要保证的程序的高性能,可以给CPU密集型任务和IO密集型任务配置一些线程数。
CPU密集型:线程个数为CPU核数。这几个线程可以并行执行,不存在线程切换到开销,提高了cpu的利用率的同时也减少了切换线程导致的性能损耗
IO密集型:线程个数为CPU核数的两倍。到其中的线程在IO操作的时候,其他线程可以继续用cpu,提高了cpu的利用率。
看完上述内容,你们掌握Java并发编程中线程池工作原理的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!