如何優雅的使用和理解線程池?
提示
請帶著這些問題繼續后文,會很大程度上幫助你更好的理解相關知識點。@pdai為什么要有線程池?Java是實現和管理線程池有哪些方式? 請簡單舉例如何使用。為什么很多公司不允許使用Executors去創建線程池? 那么推薦怎么使用呢?ThreadPoolExecutor有哪些核心的配置參數? 請簡要說明ThreadPoolExecutor可以創建哪是哪三種線程池呢?當隊列滿了并且worker的數量達到maxSize的時候,會怎么樣?說說ThreadPoolExecutor有哪些RejectedExecutionHandler策略? 默認是什么策略?簡要說下線程池的任務執行機制? execute –> addWorker –>runworker (getTask)線程池中任務是如何提交的?線程池中任務是如何關閉的?在配置線程池的時候需要考慮哪些配置因素?如何監控線程池的狀態?為什么要有線程池線程池能夠對線程進行統一分配,調優和監控:降低資源消耗(線程無限制地創建,然后使用完畢后銷毀)提高響應速度(無須創建線程)提高線程的可管理性ThreadPoolExecutor例子Java是如何實現和管理線程池的?從JDK 5開始,把工作單元與執行機制分離開來,工作單元包括Runnable和Callable,而執行機制由Executor框架提供。WorkerThreadSimpleThreadPool
程序中我們創建了固定大小為五個工作線程的線程池。然后分配給線程池十個工作,因為線程池大小為五,它將啟動五個工作線程先處理五個工作,其他的工作則處于等待狀態,一旦有工作完成,空閑下來工作線程就會撿取等待隊列里的其他工作進行執行。這里是以上程序的輸出。輸出表明線程池中至始至終只有五個名為 "pool-1-thread-1" 到 "pool-1-thread-5" 的五個線程,這五個線程不隨著工作的完成而消亡,會一直存在,并負責執行分配給線程池的任務,直到線程池消亡。Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實現,但是 ThreadPoolExecutor 提供的功能遠不止于此。我們可以在創建 ThreadPoolExecutor 實例時指定活動線程的數量,我們也可以限制線程池的大小并且創建我們自己的 RejectedExecutionHandler 實現來處理不能適應工作隊列的工作。這里是我們自定義的 RejectedExecutionHandler 接口的實現。RejectedExecutionHandlerImpl.javaThreadPoolExecutor 提供了一些方法,我們可以使用這些方法來查詢 executor 的當前狀態,線程池大小,活動線程數量以及任務數量。因此我是用來一個監控線程在特定的時間間隔內打印 executor 信息。MyMonitorThread.java這里是使用 ThreadPoolExecutor 的線程池實現例子。WorkerPool.java注意在初始化 ThreadPoolExecutor 時,我們保持初始池大小為 2,最大池大小為 4 而工作隊列大小為 2。因此如果已經有四個正在執行的任務而此時分配來更多任務的話,工作隊列將僅僅保留他們(新任務)中的兩個,其他的將會被RejectedExecutionHandlerImpl 處理。上面程序的輸出可以證實以上觀點。注意 executor 的活動任務、完成任務以及所有完成任務,這些數量上的變化。我們可以調用 shutdown() 方法來結束所有提交的任務并終止線程池。ThreadPoolExecutor使用詳解其實java線程池的實現原理很簡單,說白了就是一個線程集合workerSet和一個阻塞隊列workQueue。當用戶向線程池提交一個任務(也就是線程)時,線程池會先將任務放入workQueue中。workerSet中的線程會不斷的從workQueue中獲取線程然后執行。當workQueue中沒有任務的時候,worker就會阻塞,直到隊列中有任務了就取出來繼續執行。Execute原理當一個任務提交至線程池之后:線程池首先當前運行的線程數量是否少于corePoolSize。如果是,則創建一個新的工作線程來執行任務。如果都在執行任務,則進入2.判斷BlockingQueue是否已經滿了,倘若還沒有滿,則將線程放入BlockingQueue。否則進入3.如果創建一個新的工作線程將使當前運行的線程數量超過maximumPoolSize,則交給RejectedExecutionHandler來處理任務。當ThreadPoolExecutor創建新線程時,通過CAS來更新線程池的狀態ctl.參數corePoolSize 線程池中的核心線程數,當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等于corePoolSize, 即使有其他空閑線程能夠執行新來的任務, 也會繼續創建線程;如果當前線程數為corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建并啟動所有核心線程。workQueue 用來保存等待被執行的任務的阻塞隊列. 在JDK中提供了如下阻塞隊列: 具體可以參考JUC 集合: BlockQueue詳解ArrayBlockingQueue: 基于數組結構的有界阻塞隊列,按FIFO排序任務;LinkedBlockingQueue: 基于鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQueue;SynchronousQueue: 一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQueue;PriorityBlockingQueue: 具有優先級的無界阻塞隊列;LinkedBlockingQueue比ArrayBlockingQueue在插入刪除節點性能方面更優,但是二者在put(), take()任務的時均需要加鎖,SynchronousQueue使用無鎖算法,根據節點的狀態判斷執行,而不需要用到鎖,其核心是Transfer.transfer().maximumPoolSize 線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小于maximumPoolSize;當阻塞隊列是無界隊列, 則maximumPoolSize則不起作用, 因為無法提交至核心線程池的線程會一直持續地放入workQueue.keepAliveTime 線程空閑時的存活時間,即當線程沒有任務執行時,該線程繼續存活的時間;默認情況下,該參數只在線程數大于corePoolSize時才有用, 超過這個時間的空閑線程將被終止;unit keepAliveTime的單位threadFactory 創建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設置一個具有識別度的線程名。默認為DefaultThreadFactoryhandler 線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:AbortPolicy: 直接拋出異常,默認策略;CallerRunsPolicy: 用調用者所在的線程來執行任務;DiscardOldestPolicy: 丟棄阻塞隊列中靠最前的任務,并執行當前任務;DiscardPolicy: 直接丟棄任務;當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務。三種類型newFixedThreadPool線程池的線程數量達corePoolSize后,即使線程池沒有可執行任務時,也不會釋放線程。FixedThreadPool的工作隊列為無界隊列LinkedBlockingQueue(隊列容量為Integer.MAX_VALUE), 這會導致以下問題:線程池里的線程數量不超過corePoolSize,這導致了maximumPoolSize和keepAliveTime將會是個無用參數由于使用了無界隊列, 所以FixedThreadPool永遠不會拒絕, 即飽和策略失效newSingleThreadExecutor初始化的線程池中只有一個線程,如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行.由于使用了無界隊列, 所以SingleThreadPool永遠不會拒絕, 即飽和策略失效newCachedThreadPool線程池的線程數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞隊列; 和newFixedThreadPool創建的線程池不同,newCachedThreadPool在沒有任務執行時,當線程的空閑時間超過keepAliveTime,會自動釋放線程資源,當提交新任務時,如果沒有空閑線程,則創建新線程執行任務,會導致一定的系統開銷; 執行過程與前兩種稍微不同:主線程調用SynchronousQueue的offer()方法放入task, 倘若此時線程池中有空閑的線程嘗試讀取 SynchronousQueue的task, 即調用了SynchronousQueue的poll(), 那么主線程將該task交給空閑線程. 否則執行(2)當線程池為空或者沒有空閑的線程, 則創建新的線程執行任務.執行完任務的線程倘若在60s內仍空閑, 則會被終止. 因此長時間空閑的CachedThreadPool不會持有任何線程資源.關閉線程池遍歷線程池中的所有線程,然后逐個調用線程的interrupt方法來中斷線程.關閉方式 - shutdown將線程池里的線程狀態設置成SHUTDOWN狀態, 然后中斷所有沒有正在執行任務的線程.關閉方式 - shutdownNow將線程池里的線程狀態設置成STOP狀態, 然后停止所有正在執行或暫停任務的線程. 只要調用這兩個關閉方法中的任意一個, isShutDown() 返回true. 當所有任務都成功關閉了, isTerminated()返回true.ThreadPoolExecutor源碼詳解幾個關鍵屬性內部狀態其中AtomicInteger變量ctl的功能非常強大: 利用低29位表示線程池中線程數,通過高3位表示線程池的運行狀態:RUNNING: -1 << COUNT_BITS,即高3位為111,該狀態的線程池會接收新任務,并處理阻塞隊列中的任務;SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;STOP : 1 << COUNT_BITS,即高3位為001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;TIDYING : 2 << COUNT_BITS,即高3位為010, 所有的任務都已經終止;TERMINATED: 3 << COUNT_BITS,即高3位為011, terminated()方法已經執行完成任務的執行execute –> addWorker –>runworker (getTask)線程池的工作線程通過Woker類實現,在ReentrantLock鎖的保證下,把Woker實例插入到HashSet后,并啟動Woker中的線程。 從Woker類的構造方法實現可以發現: 線程工廠在創建線程thread時,將Woker實例本身this作為參數傳入,當執行start方法啟動線程thread時,本質是執行了Worker的runWorker方法。 firstTask執行完成之后,通過getTask方法從阻塞隊列中獲取等待的任務,如果隊列中沒有任務,getTask方法會被阻塞并掛起,不會占用cpu資源;execute()方法ThreadPoolExecutor.execute(task)實現了Executor.execute(task)為什么需要double check線程池的狀態?在多線程環境下,線程池的狀態時刻在變化,而ctl.get()是非原子操作,很有可能剛獲取了線程池狀態后線程池狀態就改變了。判斷是否將command加入workque是線程池之前的狀態。倘若沒有double check,萬一線程池處于非running狀態(在多線程環境下很有可能發生),那么command永遠不會執行。addWorker方法從方法execute的實現可以看出: addWorker主要負責創建新的線程并執行任務 線程池創建新線程執行任務時,需要 獲取全局鎖:Worker類的runworker方法繼承了AQS類,可以方便的實現工作線程的中止操作;實現了Runnable接口,可以將自身作為一個任務在工作線程中執行;當前提交的任務firstTask作為參數傳入Worker的構造方法;一些屬性還有構造方法:runWorker方法是線程池的核心:線程啟動之后,通過unlock方法釋放鎖,設置AQS的state為0,表示運行可中斷;Worker執行firstTask或從workQueue中獲取任務:進行加鎖操作,保證thread不被其他線程中斷(除非線程池被中斷)檢查線程池狀態,倘若線程池處于中斷狀態,當前線程將中斷。執行beforeExecute執行任務的run方法執行afterExecute方法解鎖操作通過getTask方法從阻塞隊列中獲取等待的任務,如果隊列中沒有任務,getTask方法會被阻塞并掛起,不會占用cpu資源;getTask方法下面來看一下getTask()方法,這里面涉及到keepAliveTime的使用,從這個方法我們可以看出線程池是怎么讓超過corePoolSize的那部分worker銷毀的。注意這里一段代碼是keepAliveTime起作用的關鍵:
allowCoreThreadTimeOut為false,線程即使空閑也不會被銷毀;倘若為ture,在keepAliveTime內仍空閑則會被銷毀。如果線程允許空閑等待而不被銷毀timed == false,workQueue.take任務: 如果阻塞隊列為空,當前線程會被掛起等待;當隊列中有任務加入時,線程被喚醒,take方法返回任務,并執行;如果線程不允許無休止空閑timed == true, workQueue.poll任務: 如果在keepAliveTime時間內,阻塞隊列還是沒有任務,則返回null;任務的提交submit任務,等待線程池execute執行FutureTask類的get方法時,會把主線程封裝成WaitNode節點并保存在waiters鏈表中, 并阻塞等待運行結果;FutureTask任務執行完成后,通過UNSAFE設置waiters相應的waitNode為null,并通過LockSupport類unpark方法喚醒主線程;在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果。Callable接口類似于Runnable,只是Runnable沒有返回值。Callable任務除了返回正常結果之外,如果發生異常,該異常也會被返回,即Future可以拿到異步執行任務各種結果;Future.get方法會導致主線程阻塞,直到Callable任務執行完成;submit方法AbstractExecutorService.submit()實現了ExecutorService.submit() 可以獲取執行完的返回值, 而ThreadPoolExecutor 是AbstractExecutorService.submit()的子類,所以submit方法也是ThreadPoolExecutor`的方法。通過submit方法提交的Callable任務會被封裝成了一個FutureTask對象。通過Executor.execute方法提交FutureTask到線程池中等待被執行,最終執行的是FutureTask的run方法;FutureTask對象public class FutureTask<V> implements RunnableFuture<V> 可以將FutureTask提交至線程池中等待被執行(通過FutureTask的run方法來執行)內部狀態內部狀態的修改通過sun.misc.Unsafe修改get方法內部通過awaitDone方法對主線程進行阻塞,具體實現如下:
如果主線程被中斷,則拋出中斷異常;判斷FutureTask當前的state,如果大于COMPLETING,說明任務已經執行完成,則直接返回;如果當前state等于COMPLETING,說明任務已經執行完,這時主線程只需通過yield方法讓出cpu資源,等待state變成NORMAL;通過WaitNode類封裝當前線程,并通過UNSAFE添加到waiters鏈表;最終通過LockSupport的park或parkNanos掛起線程;run方法FutureTask.run方法是在線程池中被執行的,而非主線程通過執行Callable任務的call方法;如果call執行成功,則通過set方法保存結果;如果call執行有異常,則通過setException保存異常;任務的關閉shutdown方法會將線程池的狀態設置為SHUTDOWN,線程池進入這個狀態后,就拒絕再接受任務,然后會將剩余的任務全部執行完shutdownNow做的比較絕,它先將線程池狀態設置為STOP,然后拒絕所有提交的任務。最后中斷左右正在運行中的worker,然后清空任務隊列。
更深入理解為什么線程池不允許使用Executors去創建? 推薦方式是什么?線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。 說明:Executors各個方法的弊端:newFixedThreadPool和newSingleThreadExecutor: ??主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。newCachedThreadPool和newScheduledThreadPool: ??主要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM。推薦方式 1首先引入:commons-lang3包推薦方式 2首先引入:com.google.guava包推薦方式 3spring配置線程池方式:自定義線程工廠bean需要實現ThreadFactory,可參考該接口的其它默認實現類,使用方式直接注入bean調用execute(Runnable task)方法即可配置線程池需要考慮因素從任務的優先級,任務的執行時間長短,任務的性質(CPU密集/ IO密集),任務的依賴關系這四個角度來分析。并且近可能地使用有界的工作隊列。性質不同的任務可用使用不同規模的線程池分開處理:CPU密集型: 盡可能少的線程,Ncpu+1IO密集型: 盡可能多的線程, Ncpu*2,比如數據庫連接池混合型: CPU密集型的任務與IO密集型任務的執行時間差別較小,拆分為兩個線程池;否則沒有必要拆分。監控線程池的狀態可以使用ThreadPoolExecutor以下方法:getTaskCount() Returns the approximate total number of tasks that have ever been scheduled for execution.getCompletedTaskCount() Returns the approximate total number of tasks that have completed execution. 返回結果少于getTaskCount()。getLargestPoolSize() Returns the largest number of threads that have ever simultaneously been in the pool. 返回結果小于等于maximumPoolSizegetPoolSize() Returns the current number of threads in the pool.getActiveCount() Returns the approximate number of threads that are actively executing tasks.參考文章《Java并發編程藝術》https://www.jianshu.com/p/87bff5cc8d8chttps://blog.csdn.net/programmer_at/article/details/79799267https://blog.csdn.net/u013332124/article/details/79587436https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice由于問答代碼塊插入受限,部分代碼未完全展示,若有需要可閱讀原文:戳我閱讀原文