Java线程池的工作原理


线程池的创建
下方代码块是线程池的完整构造函数。

    public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

corePoolSize:线程池的核心线程数,可以理解为初始化之后就存活的线程,即使没有任何任务也会在线程池中等待任务
maximumPoolSize:最大线程数,创建的线程不能大于此线程数
keepAliveTime:线程的存活时间。适用于corePoolSize的之外的线程,如果到最大存活时间还未被调用就退出此线程
unit:用来指定keepAliveTime的单位,比如秒:TimeUnit.SECONDS
workQueue:官方文档的直译是在执行任务之前用于保存任务的队列。此队列将只保存由execute方法提交的可运行任务。其实就是一个阻塞队列,若核心线程被占满时提交的任务会先放到这个队列里。
threadFactory:线程工厂,用来创建线程。
handler:官方文档直译是当执行被阻塞时要使用的处理程序,也称作拒绝策略。

线程池的执行流程
下方是ThreadPoolExecutor的execute方法

    public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//步骤1
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//步骤2
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//步骤3
reject(command);
}

官方文档给出了三个步骤

    If fewer than corePoolSize threads are running, try to
    start a new thread with the given command as its first
    task. The call to addWorker atomically checks runState and
    workerCount, and so prevents false alarms that would add
    threads when it shouldn’t, by returning false.
    如果当前线程数小于核心线程数,则创建一个线程并尝试使用给定的指令作为其第一个任务启动新线程。对addWorker方法调用会自动检查 runState和workerCount,以此通过返回false来防止在不应该添加线程的情况下出现添加线程的错误。
    If a task can be successfully queued, then we still need to
    double-check whether we should have added a thread (because existing
    ones died since last checking) or that the pool shut down since
    entry into this method. So we recheck state and if necessary roll
    back the enqueuing if stopped, or start a new thread if there are
    none.
    如果任务可以成功进入队列,仍然需要再次检查是否应该添加一个线程(因为现有的线程在上次检查后死亡了),或者池在进入此方法后关闭。因此,我们会重新检查状态,如果停止队列,必要时回滚队列;如果没有线程,则启动一个新线程。
    If we cannot queue task, then we try to add a new thread. If it
    fails, we know we are shut down or saturated and so reject the task.
    如果不能将任务放入队列,则尝试添加一个新线程。如果失败了,则认为终止或者饱和,因此拒绝这个任务。

文档的解释里,第一步中的addWorker是比较难理解的,所以需要继续查看addWorker方法的源码。

addWorker上半部分源码

    private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 检查队列是否为空.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

.........
}

第一个if中表示,如果当前运行的状态是SHUTDOWN,并且传入的任务为空,并且队列不为空的情况,则()中为true,前面加了个!则在这种情况下&&整个判断为false,则不会执行下面的return false。因此这部分主要是为了判断队列是否为空。

wc是当前工作的线程数量
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
这块代码是在创建非核心线程时,即core等于false。判断当前线程数是否大于等于maximumPoolSize,如果大于等于则返回false,即上边说到的步骤3中创建线程失败的情况。

下半部分代码

        boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//创建Worker对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;

这里的步骤主要是

    创建Worker对象,同时也会实例化一个Thread对象。
    启动这个线程

在这个步骤中,他再一次判断了是否需要添加新线程,如上述步骤2中所说,因为现有的线程在上次检查后死亡了,或者池在进入此方法后关闭。因此会重新检查状态,如果停止队列,必要时回滚队列;如果没有线程,则启动一个新线程。

接下来看看Woker的逻辑

        /**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

在Woker中会调用ThreadFactory来创建一个新线程。在上方的t.start()中就会调用这个run方法。

接下来继续看runWoker的逻辑,可以发现主要就是判断任务是否为空,不为空则运行。

    final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//任务判空
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//任务运行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

最后再来看看任务判空中的getTask()方法

    private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// allowCoreThreadTimeOut 默认是false,后者是判断当前线程数是否大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//当前线程数比核心线程数大,则执行poll()方法获取任务,超时时长和单位设置为以下参数
//否则调用take()方法阻塞队列
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

    allowCoreThreadTimeOut,这个变量默认值是false。wc>corePoolSize则是判断当前线程数是否大于corePoolSize。
    如果当前线程数大于corePoolSize,则会调用workQueue的poll方法获取任务,超时时间是keepAliveTime。如果超过keepAliveTime时长,poll返回了null,上边提到的while循序就会退出,线程也就执行完了。如果当前线程数小于corePoolSize,则会调用workQueue的take方法阻塞在当前队列。

所以总结一下流程如下图所示
Java线程池的工作原理

原创:https://www.panoramacn.com
源码网提供WordPress源码,帝国CMS源码discuz源码,微信小程序,小说源码,杰奇源码,thinkphp源码,ecshop模板源码,微擎模板源码,dede源码,织梦源码等。

专业搭建小说网站,小说程序,杰奇系列,微信小说系列,app系列小说

Java线程池的工作原理

免责声明,若由于商用引起版权纠纷,一切责任均由使用者承担。

您必须遵守我们的协议,如果您下载了该资源行为将被视为对《免责声明》全部内容的认可-> 联系客服 投诉资源
www.panoramacn.com资源全部来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。 敬请谅解! 侵权删帖/违法举报/投稿等事物联系邮箱:2640602276@qq.com
未经允许不得转载:书荒源码源码网每日更新网站源码模板! » Java线程池的工作原理
关注我们小说电影免费看
关注我们,获取更多的全网素材资源,有趣有料!
120000+人已关注
分享到:
赞(0) 打赏

评论抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

您的打赏就是我分享的动力!

支付宝扫一扫打赏

微信扫一扫打赏