一、前言
在 Java 中,线程池有着比较方便的实现类 ThreadPoolExecutor,但是这个实现类并不是开箱即用的,在实际的业务中需要一定的知识基础才能发挥出线程池的好处。Elasticsearch 中就封装了一系列更实用的线程池实现,本篇文章就结合 ES 源码来探索一下大神们是如何实现线程池的。
二、基础知识
先回顾一下 Java 的线程池实现中任务调度的原理,有以下几个重要参数:当前线程数 current,核心线程数 core,最大线程数 max,存储等待任务的队列 queue,任务调度可以用以下流程图表示:
可以参考一下 Java 线程池类 ThreadPoolExecutor 的构造函数:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其中的 corePoolSize、maximumPoolSize 和 workQueue 上面介绍过了。其余几个参数含义如下:
keepAliveTime:当当前线程数大于核心线程数是,空闲线程等待新任务的时间,超过这个时间就会销毁该空闲线程 unit:keepAliveTime 的单位 threadFactory:线程池创建线程的工厂类,可以自定义线程的名字 handler:线程任务的拒绝 handler,只需要重写 RejectedExecutionHandler 接口的 rejectedExecution 类就可以自定义线程被拒绝后的逻辑
结合着这几个参数可以构建出适用场景不同的线程池模型。
三、Elasticsearch 线程池实现
ES 的线程池类是 ThreadPool,其中把不同的任务用不同的线程池分开。这样的好处一是隔离线程资源,避免当一种任务繁忙时影响到另一种任务的运行;二是可以为不同类型的任务分配不同类型的线程池,提高处理效率。
ES 预定义好了三种线程池:
- newSinglePrioritizing:固定单线程的线程池,只在 MasterService 中用于更新集群元数据,其中的任务队列使用了优先级队列 PriorityBlockingQueue。队列中存入的内容 TieBreakingPrioritizedRunnable 包含优先级 priority 和插入顺序 insertionOrder,当线程从队列中拿任务时会调用队列的 poll 方法,队列会返回优先级最高且插入最早的任务:
@Override
public int compareTo(PrioritizedRunnable pr) {
int res = super.compareTo(pr); // 父类的调用为priority.compareTo(pr.priority),先比较优先级
if (res != 0 || !(pr instanceof TieBreakingPrioritizedRunnable)) {
return res;
}
return insertionOrder < ((TieBreakingPrioritizedRunnable) pr).insertionOrder ? -1 : 1;
}
- newScaling:线程可扩容的线程池,类似 Java 默认线程池,设定 corePoolSize 和 maximumPoolSize。但是具体逻辑与默认线程池差别很大,首先是队列上,继承了一个性能很好的无界队列 LinkedTransferQueue,但是重写了将任务写入队列的 offer 方法,代码如下:
// check if there might be spare capacity in the thread
// pool executor
int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
if (left > 0) {
// reject queuing the task to force the thread pool
// executor to add a worker if it can; combined
// with ForceQueuePolicy, this causes the thread
// pool to always scale up to max pool size and we
// only queue when there is no spare capacity
return false;
} else {
return super.offer(e);
}
由上文可以知道,默认线程池的逻辑是:如果队列写入失败,并且当前线程数小于最大线程数时,才会增加线程并执行传入的任务。重写这里的 offer 方法之后实际上将写入队列和增加线程的顺序颠倒过来了,传入一个新任务时,如果线程没有到最大线程,直接增加线程,直到线程达到最大线程,任务再写入队列。由于 newScaling 线程池的任务类型包含 FLUSH,REFRESH 这些 IO 密集型任务,这样处理的原因比较显而易见了————提前增大线程数量可以获得更高的吞吐量。
另一个不同点是拒绝策略,使用 newScaling 线程池的任务都是系统层级或者非 CPU 密集型的,如 GENERIC,MANAGEMENT,REFREASH,FLUSH 等,因此没有必要将任务拒绝,所以在拒绝策略的定义中,实际上将要被拒绝的任务重新放进队列中:
static class ForceQueuePolicy implements XRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// force queue policy should only be used with a scaling queue
assert executor.getQueue() instanceof ExecutorScalingQueue;
executor.getQueue().put(r);
} catch (final InterruptedException e) {
// a scaling queue never blocks so a put to it can never be interrupted
throw new AssertionError(e);
}
}
@Override
public long rejected() {
return 0;
}
}
- newFixed:线程数固定的线程池,corePoolSize 和 maximumPoolSize 值相同,因此线程池正常执行时线程数时固定的。使用这种线程池的任务都是 CPU 密集型的任务,如 WRITE 和 FORCE_MERGE 等。因为对于 CPU 密集型任务,过大的线程数由于线程切换的损耗会使得任务效率下降。newFixed 线程池队列分两种情况:
BlockingQueue<Runnable> queue;
if (queueCapacity < 0) {
queue = ConcurrentCollections.newBlockingQueue(); // 具体是LinkedTransferQueue
} else {
queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
}
当 queueCapacity 小于 0 时,队列是 LinkedTransferQueue,为无界队列。当 queueCapacity 大于 0 时,队列是 SizeBlockingQueue,实际上是一个封装了的 LinkedTransferQueue,为其加上了容量,参考 offer 方法:
@Override
public boolean offer(E e) {
while (true) {
final int current = size.get();
if (current >= capacity()) {
return false; // 当队列容量超过限制时,拒绝写入队列
}
if (size.compareAndSet(current, 1 + current)) {
break;
}
}
boolean offered = queue.offer(e);
if (!offered) {
size.decrementAndGet();
}
return offered;
}
在这种情况下就需要拒绝超过限额的请求,拒绝逻辑代码就不一样了,除了一些特殊任务的 isForceExecution()方法返回为 true 时会将该任务强行放入队列,其他情况下任务会被拒绝:
public class EsAbortPolicy implements XRejectedExecutionHandler {
private final CounterMetric rejected = new CounterMetric();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof AbstractRunnable) {
if (((AbstractRunnable) r).isForceExecution()) {
BlockingQueue<Runnable> queue = executor.getQueue();
if (!(queue instanceof SizeBlockingQueue)) {
throw new IllegalStateException("forced execution, but expected a size queue");
}
try {
((SizeBlockingQueue) queue).forcePut(r); // 当任务的isForceExecution方法返回true时,强行写入队列
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("forced execution, but got interrupted", e);
}
return;
}
}
rejected.inc();
throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
}
@Override
public long rejected() {
return rejected.count();
}
}
另外,在 ES7.7.0 版本之前,还有一种专门给 SEARCH 任务使用的线程池 newAutoQueueFixed,它与 newFixed 线程池相似,只是队列大小是自适应的,本文不再赘述。
四、总结
可以看到,Elasticsearch 根据不同的任务类型精心设计了不同的线程池,并且根据任务的特性设置了不同的线程数和队列参数。相关代码清晰易读,非常值得学习。