掃二維碼與項目經理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術咨詢/運營咨詢/技術建議/互聯(lián)網(wǎng)交流
《Java開發(fā)手冊》中強調,線程資源必須通過線程池提供,而創(chuàng)建線程池必須使用ThreadPoolExecutor。手冊主要強調利用線程池避免兩個問題,一是線程過渡切換,二是避免請求過多時造成OOM。但是如果參數(shù)配置錯誤,還是會引發(fā)上面的兩個問題。所以本節(jié)我們主要是討論ThreadPoolExecutor的一些技術細節(jié),并且給出幾個常用的最佳實踐建議。

網(wǎng)站建設哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁設計、網(wǎng)站建設、微信開發(fā)、微信小程序開發(fā)、集團企業(yè)網(wǎng)站建設等服務項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了馬山免費建站歡迎大家使用!
我在查找資料的過程中,發(fā)現(xiàn)有些問題存在爭議。后面發(fā)現(xiàn),一部分原因是因為不同JDK版本的現(xiàn)實是有差異的。因此,下面的分析是基于當下最常用的版本JDK1.8,并且對于存在爭議的問題,我們分析源碼,源碼才是最準確的。
這是一個爭議點。我發(fā)現(xiàn)大部分博文,不論是國內的還是國外的,都是這樣回答這個問題的:
按照上面的描述,如果corePoolSize=0,則會判斷等待隊列的容量,如果還有容量,則排隊,并且不會創(chuàng)建新的線程。
—— 但其實,這是老版本的實現(xiàn)方式,從1.6之后,實現(xiàn)方式就變了。我們直接看execute的源碼(submit也依賴它),我備注出了關鍵一行:
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 注意這一行代碼,添加到等待隊列成功后,判斷當前池內線程數(shù)是否為0,如果是則創(chuàng)建一個firstTask為null的worker,這個worker會從等待隊列中獲取任務并執(zhí)行。
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
答
上述問題需區(qū)分JDK版本。在1.6版本之后,如果corePoolSize=0,提交任務時如果線程池為空,則會立即創(chuàng)建一個線程來執(zhí)行任務(先排隊再獲取);如果提交任務的時候,線程池不為空,則先在等待隊列中排隊,只有隊列滿了才會創(chuàng)建新線程。
所以,優(yōu)化在于,在隊列沒有滿的這段時間內,會有一個線程在消費提交的任務;1.6之前的實現(xiàn)是,必須等隊列滿了之后,才開始消費。
之前有人問過我這個問題,因為他發(fā)現(xiàn)應用中有些Bean創(chuàng)建了線程池,但是這個Bean一般情況下用不到,所以咨詢我是否需要把這個線程池注釋掉,以減少應用運行時的線程數(shù)(該應用運行時線程過多。)
答
不會。從上面的源碼可以看出,在剛剛創(chuàng)建ThreadPoolExecutor的時候,線程并不會立即啟動,而是要等到有任務提交時才會啟動,除非調用了prestartCoreThread/prestartAllCoreThreads事先啟動核心線程。
這個問題有點tricky。首先我們要明確一下概念,雖然在JavaDoc中也使用了“core/non-core threads”這樣的描述,但其實這是一個動態(tài)的概念,JDK并沒有給一部分線程打上“core”的標記,做什么特殊化的處理。這個問題我認為想要探討的是閑置線程終結策略的問題。
在JDK1.6之前,線程池會盡量保持corePoolSize個核心線程,即使這些線程閑置了很長時間。這一點曾被開發(fā)者詬病,所以從JDK1.6開始,提供了方法allowsCoreThreadTimeOut,如果傳參為true,則允許閑置的核心線程被終止。
請注意這種策略和corePoolSize=0的區(qū)別。我總結的區(qū)別是:
所以corePoolSize=0的效果,基本等同于allowsCoreThreadTimeOut=true && corePoolSize=1,但實現(xiàn)細節(jié)其實不同。
答
在JDK1.6之后,如果allowsCoreThreadTimeOut=true,核心線程也可以被終止。
首先我們要明確一下線程池模型。線程池有個內部類Worker,它實現(xiàn)了Runnable接口,首先,它自己要run起來。然后它會在合適的時候獲取我們提交的Runnable任務,然后調用任務的run()接口。一個Worker不終止的話可以不斷執(zhí)行任務。
我們前面說的“線程池中的線程”,其實就是Worker;等待隊列中的元素,是我們提交的Runnable任務。
每一個Worker在創(chuàng)建出來的時候,會調用它本身的run()方法,實現(xiàn)是runWorker(this),這個實現(xiàn)的核心是一個while循環(huán),這個循環(huán)不結束,Worker線程就不會終止,就是這個基本邏輯。
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- // 看這里,核心邏輯在這里
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 注意,核心中的核心在這里
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
答
實現(xiàn)方式非常巧妙,核心線程(Worker)即使一直空閑也不終止,是通過workQueue.take()實現(xiàn)的,它會一直阻塞到從等待隊列中取到新的任務。非核心線程空閑指定時間后終止是通過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)實現(xiàn)的,一個空閑的Worker只等待keepAliveTime,如果還沒有取到任務則循環(huán)終止,線程也就運行結束了。
引申思考
Worker本身就是個線程,它再調用我們傳入的Runnable.run(),會啟動一個子線程么?如果你還沒有答案,再回想一下Runnable和Thread的關系。
籠統(tǒng)地回答是會占用內存,我們分析一下占用了哪些內存。首先,比較普通的一部分,一個線程的內存模型:
我想額外強調是下面這幾個內存占用,需要小心:
答
線程池保持空閑的核心線程是它的默認配置,一般來講是沒有問題的,因為它占用的內存一般不大。怕的就是業(yè)務代碼中使用ThreadLocal緩存的數(shù)據(jù)過大又不清理。
如果你的應用線程數(shù)處于高位,那么需要觀察一下YoungGC的情況,估算一下Eden大小是否足夠。如果不夠的話,可能要謹慎地創(chuàng)建新線程,并且讓空閑的線程終止;必要的時候,可能需要對JVM進行調參。
這也是個爭議點。有的博文說等于0表示空閑線程永遠不會終止,有的說表示執(zhí)行完立刻終止。還有的說等于-1表示空閑線程永遠不會終止。其實稍微看一下源碼知道了,這里我直接拋出答案。
答
在JDK1.8中,keepAliveTime=0表示非核心線程執(zhí)行完立刻終止。
默認情況下,keepAliveTime小于0,初始化的時候才會報錯;但如果allowsCoreThreadTimeOut,keepAliveTime必須大于0,不然初始化報錯。
很多代碼的寫法,我們都習慣按照常見范式去編寫,而沒有去思考為什么。比如:
—— 但是在上面,我提到過,submit()底層實現(xiàn)依賴execute(),兩者應該統(tǒng)一呀,為什么有差異呢?下面再扒一扒submit()的源碼,它的實現(xiàn)蠻有意思。
首先,ThreadPoolExecutor中沒有submit的代碼,而是在它的父類AbstractExecutorService中,有三個submit的重載方法,代碼非常簡單,關鍵代碼就兩行:
- public Future> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture
ftask = newTaskFor(task, null); - execute(ftask);
- return ftask;
- }
- public
Future submit(Runnable task, T result) { - if (task == null) throw new NullPointerException();
- RunnableFuture
ftask = newTaskFor(task, result); - execute(ftask);
- return ftask;
- }
- public
Future submit(Callable task) { - if (task == null) throw new NullPointerException();
- RunnableFuture
ftask = newTaskFor(task); - execute(ftask);
- return ftask;
- }
正是因為這三個重載方法,都調用了execute,所以我才說submit底層依賴execute。通過查看這里execute的實現(xiàn),我們不難發(fā)現(xiàn),它就是ThreadPoolExecutor中的實現(xiàn),所以,造成submit和execute的差異化的代碼,不在這。那么造成差異的一定在newTaskFor方法中。這個方法也就new了一個FutureTask而已,F(xiàn)utureTask實現(xiàn)RunnableFuture接口,RunnableFuture接口繼承Runnable接口和Future接口。而Callable只是FutureTask的一個成員變量。
所以講到這里,就有另一個Java基礎知識點:Callable和Future的關系。我們一般用Callable編寫任務代碼,F(xiàn)uture是異步返回對象,通過它的get方法,阻塞式地獲取結果。FutureTask的核心代碼就是實現(xiàn)了Future接口,也就是get方法的實現(xiàn):
- public V get() throws InterruptedException, ExecutionException {
- int s = state;
- if (s <= COMPLETING)
- // 核心代碼
- s = awaitDone(false, 0L);
- return report(s);
- }
- private int awaitDone(boolean timed, long nanos)
- throws InterruptedException {
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- WaitNode q = null;
- boolean queued = false;
- // 死循環(huán)
- for (;;) {
- if (Thread.interrupted()) {
- removeWaiter(q);
- throw new InterruptedException();
- }
- int s = state;
- // 只有任務的狀態(tài)是’已完成‘,才會跳出死循環(huán)
- if (s > COMPLETING) {
- if (q != null)
- q.thread = null;
- return s;
- }
- else if (s == COMPLETING) // cannot time out yet
- Thread.yield();
- else if (q == null)
- q = new WaitNode();
- else if (!queued)
- queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
- q.next = waiters, q);
- else if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- removeWaiter(q);
- return state;
- }
- LockSupport.parkNanos(this, nanos);
- }
- else
- LockSupport.park(this);
- }
- }
get的核心實現(xiàn)是有個awaitDone方法,這是一個死循環(huán),只有任務的狀態(tài)是“已完成”,才會跳出死循環(huán);否則會依賴UNSAFE包下的LockSupport.park原語進行阻塞,等待LockSupport.unpark信號量。而這個信號量只有當運行結束獲得結果、或者出現(xiàn)異常的情況下,才會發(fā)出來。分別對應方法set和setException。這就是異步執(zhí)行、阻塞獲取的原理,扯得有點遠了。
回到最初我們的疑問,為什么submit之后,通過get方法可以獲取到異常?原因是FutureTask有一個Object類型的outcome成員變量,用來記錄執(zhí)行結果。這個結果可以是傳入的泛型,也可以是Throwable異常:
- public void run() {
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return;
- try {
- Callable
c = callable; - if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- result = null;
- ran = false;
- setException(ex);
- }
- if (ran)
- set(result);
- }
- } finally {
- // runner must be non-null until state is settled to
- // prevent concurrent calls to run()
- runner = null;
- // state must be re-read after nulling runner to prevent
- // leaked interrupts
- int s = state;
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- }
- // get方法中依賴的,報告執(zhí)行結果
- private V report(int s) throws ExecutionException {
- Object x = outcome;
- if (s == NORMAL)
- return (V)x;
- if (s >= CANCELLED)
- throw new CancellationException();
- throw new ExecutionException((Throwable)x);
- }
FutureTask的另一個巧妙的地方就是借用RunnableAdapter內部類,將submit的Runnable封裝成Callable。所以就算你submit的是Runnable,一樣可以用get獲取到異常。
答
答
一般來講,線程池的生命周期跟隨服務的生命周期。如果一個服務(Service)停止服務了,那么需要調用shutdown方法進行關閉。所以ExecutorService.shutdown在Java以及一些中間件的源碼中,是封裝在Service的shutdown方法內的。
如果是Server端不重啟就不停止提供服務,我認為是不需要特殊處理的。
答
本來想分析一下兩者的源碼的,但是發(fā)現(xiàn)本文的篇幅已經過長了,源碼也貼了不少。感興趣的朋友自己看一下即可。
答
| SimpleAsyncTaskExecutor | 每次請求新開線程,沒有最大線程數(shù)設置.不是真的線程池,這個類不重用線程,每次調用都會創(chuàng)建一個新的線程。 |
| SyncTaskExecutor | 不是異步的線程。同步可以用SyncTaskExecutor,但這個可以說不算一個線程池,因為還在原線程執(zhí)行。這個類沒有實現(xiàn)異步調用,只是一個同步操作。 |
| ConcurrentTaskExecutor | Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類。 |
| SimpleThreadPoolTaskExecutor | 監(jiān)聽Spring’s lifecycle callbacks,并且可以和Quartz的Component兼容.是Quartz的SimpleThreadPool的類。線程池同時被quartz和非quartz使用,才需要使用此類。 |
這里我想著重強調的就是SimpleAsyncTaskExecutor,Spring中使用的@Async注解,底層就是基于SimpleAsyncTaskExecutor去執(zhí)行任務,只不過它不是線程池,而是每次都新開一個線程。
另外想要強調的是Executor接口。Java初學者容易想當然的以為Executor結尾的類就是一個線程池,而上面的都是反例。我們可以在JDK的execute方法上看到這個注釋:
- /**
- * Executes the given command at some time in the future. The command
- * may execute in a new thread, in a pooled thread, or in the calling
- * thread, at the discretion of the {@code Executor} implementation.
- */
所以,它的職責并不是提供一個線程池的接口,而是提供一個“將來執(zhí)行命令”的接口。真正能代表線程池意義的,是ThreadPoolExecutor類,而不是Executor接口。
線程池初始化示例:
- private static final ThreadPoolExecutor pool;
- static {
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
- pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
- threadFactory, new ThreadPoolExecutor.AbortPolicy());
- pool.allowCoreThreadTimeOut(true);
- }

我們在微信上24小時期待你的聲音
解答本文疑問/技術咨詢/運營咨詢/技術建議/互聯(lián)網(wǎng)交流