join線程java
1. java線程中wait、await、sleep、yield、join用法總結
Java線程中wait、await、sleep、yield、join用法總結1. wait 用法:wait方法用於使當前線程等待,直到其他線程調用此對象的notify或notifyAll方法將其喚醒。調用wait方法時,線程必須擁有該對象的監視器。 特點:wait是Object類的方法,必須在同步代碼塊或同步方法中調用。
2. await 用法:await方法用於使當前線程等待,直到另一個線程調用與當前await調用關聯的Condition對象的signal或signalAll方法。與wait不同,await屬於java.util.concurrent.locks.Condition介面,通常與ReentrantLock一起使用。 特點:await需要在lock.lock和lock.unlock之間調用,以實現更靈活的線程同步。
3. sleep 用法:sleep方法使當前線程休眠指定的毫秒數。線程在休眠期間不會釋放任何監視器。 特點:sleep是Thread類的方法,用於控制線程的執行節奏,但不涉及線程間的通信。
4. yield 用法:yield方法提示調度器當前線程願意放棄當前對處理器的使用。調度器可以忽略這個提示。 特點:yield是Thread類的方法,通常用於提高程序的並發性能,但不能保證線程會立即放棄處理器。
5. join 用法:join方法使當前線程等待,直到調用join方法的線程執行完畢。這可以用於確保某些線程在繼續執行之前完成其任務。 特點:join是Thread類的方法,常用於控制線程的執行順序。
總結對比:
- wait、await:都用於使線程等待,但wait是Object類的方法,通常與synchronized一起使用;await是Condition介面的方法,通常與ReentrantLock一起使用。
- sleep:使線程休眠,不涉及線程間的通信。
- yield:提示調度器當前線程願意放棄處理器,但不保證立即放棄。
- join:使當前線程等待另一個線程執行完畢,常用於控制線程執行順序。
2. Java線程中wait、await、sleep、yield、join用法總結
一、wait()、notify()、notifyAll()用法
測試代碼:
列印日誌:
從日誌中我們可以看出waitTest方法阻塞直到被notifyTest喚醒。
二、await()、signal()、signalAll()用法
java.util.concurrent類庫中提供的Condition類來實現線程之間的協調。
測試代碼:
列印日誌:
從日誌中可以看出我們得到了和wait同樣的效果。
三、yield()、join()用法
yield測試代碼:
列印結果:
可以看出雖然主線程調用了yield,但是仍然又開始執行了,因此並不能保證輪流執行。
join測試代碼:
列印日誌:
從日誌中我們可以看出主線程在線程執行完成後才開始執行。
四、wait()、await()、sleep()、yield、join對比
通過表格對比(join的情況下,t1指代當前線程,t2代表其他線程)
3. Fork/Join框架基本使用和原理探究(基礎篇)
前提概述Java7開始引入了一種新的Fork/Join線程池,它可以執行一種特殊的任務:把一個大任務拆成多個小任務並行執行。
我們舉個例子:如果要計算一個超大數組的和,最簡單的做法是用一個循環在一個線程內完成:
演算法原理介紹相信大家此前或多或少有了解到ForkJoin框架,ForkJoin框架其實就是一個線程池ExecutorService的實現,通過工作竊取(work-stealing)演算法,獲取其他線程中未完成的任務來執行。可以充分利用機器的多處理器優勢,利用空閑的線程去並行快速完成一個可拆分為小任務的大任務,類似於分治演算法。
實現達成目標ForkJoin的目標,就是利用所有可用的處理能力來提高程序的響應和性能。本文將介紹ForkJoin框架,依次介紹基礎特性、案例使用、源碼剖析和實現亮點。
java.util.concurrent.ForkJoinPool由Java大師DougLea主持編寫,它可以將一個大的任務拆分成多個子任務進行並行處理,最後將子任務結果合並成最後的計算結果,並進行輸出。
基本使用入門例子,用Fork/Join框架使用示例,在這個示例中我們計算了1-5000累加後的值:
{privatestaticfinalIntegerMAX=400;<Integer>{//子任務開始計算的值privateIntegerstartValue;//子任務結束計算的值privateIntegerendValue;publicWorkTask(IntegerstartValue,IntegerendValue){this.startValue=startValue;this.endValue=endValue;}@(){//如果小於最小分片閾值,則說明要進行相關的數據操作//可以正式進行累加計算了if(endValue-startValue<MAX){System.out.println("開始計算的部分:startValue="+startValue+";endValue="+endValue);IntegertotalValue=0;for(intindex=this.startValue;index<=this.endValue;index++){totalValue+=index;}returntotalValue;}//否則再進行任務拆分,拆分成兩個任務else{//因為採用二分法,拆分,所以進行1/2切分數據量WorkTasksubTask1=newWorkTask(startValue,(startValue+endValue)/2);subTask1.fork();//進行拆分機制控制WorkTasksubTask2=newWorkTask((startValue+endValue)/2+1,endValue);subTask2.fork();returnsubTask1.join()+subTask2.join();}}}publicstaticvoidmain(String[]args){//這是Fork/Join框架的線程池ForkJoinPoolpool=newForkJoinPool();ForkJoinTask<Integer>taskFuture=pool.submit(newMyForkJoinTask(1,1001));try{Integerresult=taskFuture.get();System.out.println("result="+result);}catch(InterruptedException|ExecutionExceptione){e.printStackTrace(System.out);}}}對此我封裝了一個框架集合,基於JDK1.8+中的Fork/Join框架實現,參考的Fork/Join框架主要源代碼也基於JDK1.8+。
WorkTaskCallable實現抽象模型層次操作轉換@Accessors(chain=true)publicclassWorkTaskCallable<T>extendsRecursiveTask<T>{/***斷言操作控制*/@GetterprivatePredicate<T>predicate;/***執行參數化分割條件*/@GetterprivateTsplitParam;/***操作拆分方法操作機制*/@GetterprivateFunction<Object,Object[]>splitFunction;/***操作合並方法操作機制*/@GetterprivateBiFunction<Object,Object,T>mergeFunction;/***操作處理機制*/@Setter@GetterprivateFunction<T,T>processHandler;/***構造器是否進行分割操作*@parampredicate判斷是否進行下一步分割的條件關系*@paramsplitParam分割參數*@paramsplitFunction分割方法*@parammergeFunction合並數據操作*/publicWorkTaskCallable(Predicatepredicate,TsplitParam,Function<Object,Object[]>splitFunction,BiFunction<Object,Object,T>mergeFunction,Function<T,T>processHandler){this.predicate=predicate;this.splitParam=splitParam;this.splitFunction=splitFunction;this.mergeFunction=mergeFunction;this.processHandler=processHandler;}/***實際執行調用操作機制*@return*/@OverrideprotectedTcompute(){if(predicate.test(splitParam)){Object[]result=splitFunction.apply(splitParam);=newWorkTaskCallable(predicate,result[0],splitFunction,mergeFunction,processHandler);workTaskCallable1.fork();=newWorkTaskCallable(predicate,result[1],splitFunction,mergeFunction,processHandler);workTaskCallable2.fork();returnmergeFunction.apply(workTaskCallable1.join(),workTaskCallable2.join());}else{returnprocessHandler.apply(splitParam);}}}ArrayListWorkTaskCallable實現List集合層次操作轉換/***@project-name:wiz-shrding-framework*@package-name:com.wiz.sharding.framework.boot.common.thread.forkjoin*@author:LiBo/Alex*@create-date:2021-09-0917:26*@right:libo-alex4java*@email:[email protected]*@description:*/<List>{staticPredicate<List>predicateFunction=param->param.size()>3;staticFunction<List,List[]>splitFunction=(param)->{if(predicateFunction.test(param)){returnnewList[]{param.subList(0,param.size()/2),param.subList(param.size()/2,param.size())};}else{returnnewList[]{param.subList(0,param.size()+1),Lists.newArrayList()};}};staticBiFunction<List,List,List>mergeFunction=(param1,param2)->{Listdatalist=Lists.newArrayList();datalist.addAll(param2);datalist.addAll(param1);returndatalist;};/***構造器是否進行分割操作*@parampredicate判斷是否進行下一步分割的條件關系*@paramsplitParam分割參數*@paramsplitFunction分割方法*@parammergeFunction合並數據操作*/(Predicate<List>predicate,ListsplitParam,FunctionsplitFunction,BiFunctionmergeFunction,Function<List,List>processHandler){super(predicate,splitParam,splitFunction,mergeFunction,processHandler);}(ListsplitParam,FunctionsplitFunction,BiFunctionmergeFunction,Function<List,List>processHandler){super(predicateFunction,splitParam,splitFunction,mergeFunction,processHandler);}(Predicate<List>predicate,ListsplitParam,Function<List,List>processHandler){this(predicate,splitParam,splitFunction,mergeFunction,processHandler);}(ListsplitParam,Function<List,List>processHandler){this(predicateFunction,splitParam,splitFunction,mergeFunction,processHandler);}publicstaticvoidmain(String[]args){ListdataList=Lists.newArrayList(0,1,2,3,4,5,6,7,8,9);ForkJoinPoolforkJoinPool=ForkJoinPool.commonPool();ForkJoinTask<List>forkJoinResult=forkJoinPool.submit(newArrayListWorkTaskCallable(dataList,param->Lists.newArrayList(param.size())));try{System.out.println(forkJoinResult.get());}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}}ForkJoin代碼分析ForkJoinPool構造函數/***Createsa{@codeForkJoinPool}withparallelismequalto{@link*java.lang.Runtime#availableProcessors},usingthe{@linkplain*#},*noUncaughtExceptionHandler,andnon-asyncLIFOprocessingmode.**@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(){this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),,null,false);}/***Createsa{@codeForkJoinPool}withtheindicatedparallelism*level,the{@linkplain*#},*noUncaughtExceptionHandler,andnon-asyncLIFOprocessingmode.**@*@*equaltozero,*@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(intparallelism){this(parallelism,,null,false);}/***Createsa{@codeForkJoinPool}withthegivenparameters.**@.Fordefaultvalue,*use{@linkjava.lang.Runtime#availableProcessors}.*@.Fordefaultvalue,*use{@link#}.*@**tasks.Fordefaultvalue,use{@codenull}.*@paramasyncModeiftrue,*establisheslocalfirst-in-first-outschelingmodeforforked*tasksthatareneverjoined.Thismodemaybemoreappropriate*thandefaultlocallystack-*workerthreadsonlyprocessevent-styleasynchronoustasks.*Fordefaultvalue,use{@codefalse}.*@*equaltozero,*@*@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(intparallelism,,,booleanasyncMode){this(checkParallelism(parallelism),checkFactory(factory),handler,(asyncMode?FIFO_QUEUE:LIFO_QUEUE),"ForkJoinPool-"+nextPoolId()+"-worker-");checkPermission();}/***Createsa{@codeForkJoinPool}withthegivenparameters,without*.Invokeddirectlyby*makeCommonPool.*/privateForkJoinPool(intparallelism,,,intmode,StringworkerNamePrefix){this.workerNamePrefix=workerNamePrefix;this.factory=factory;this.ueh=handler;this.mode=(short)mode;this.parallelism=(short)parallelism;longnp=(long)(-parallelism);//offsetctlcountsthis.ctl=((np<<AC_SHIFT)&AC_MASK)|((np<<TC_SHIFT)&TC_MASK);}parallelism:可並行級別,Fork/Join框架將依據這個並行級別的設定,決定框架內並行執行的線程數量。並行的每一個任務都會有一個線程進行處理,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數量。
factory:當Fork/Join框架創建一個新的線程時,同樣會用到線程創建工廠。只不過這個線程工廠不再需要實現ThreadFactory介面,而是需要實現ForkJoinWorkerThreadFactory介面。後者是一個函數式介面,只需要實現一個名叫newThread的方法。
在Fork/Join框架中有一個默認的ForkJoinWorkerThreadFactory介面實現:。
handler:異常捕獲處理器。當執行的任務中出現異常,並從任務中被拋出時,就會被handler捕獲。
asyncMode:這個參數也非常重要,從字面意思來看是指的非同步模式,它並不是說Fork/Join框架是採用同步模式還是採用非同步模式工作。Fork/Join框架中為每一個獨立工作的線程准備了對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。即是說存在於隊列中的待執行任務,即可以使用先進先出的工作模式,也可以使用後進先出的工作模式。
asyncMode?FIFO_QUEUE:LIFO_QUEUE,
當asyncMode設置為true的時候,隊列採用先進先出方式工作;反之則是採用後進先出的方式工作,該值默認為false
後進先出
先進先出
需要注意點ForkJoinPool一個構造函數只帶有parallelism參數,既是可以設定Fork/Join框架的最大並行任務數量;另一個構造函數則不帶有任何參數,對於最大並行任務數量也只是一個默認值——當前操作系統可以使用的CPU內核數量(Runtime.getRuntime().availableProcessors())。實際上ForkJoinPool還有一個私有的、原生構造函數,之上提到的三個構造函數都是對這個私有的、原生構造函數的調用。
如果你對Fork/Join框架沒有特定的執行要求,可以直接使用不帶有任何參數的構造函數。也就是說推薦基於當前操作系統可以使用的CPU內核數作為Fork/Join框架內最大並行任務數量,這樣可以保證CPU在處理並行任務時,盡量少發生任務線程間的運行狀態切換(實際上單個CPU內核上的線程間狀態切換基本上無法避免,因為操作系統同時運行多個線程和多個進程)。
從上面的的類關系圖可以看出來,ForkJoin框架的核心是ForkJoinPool類,基於AbstractExecutorService擴展(@sun.misc.Contended註解)。
ForkJoinPool中維護了一個隊列數組WorkQueue[],每個WorkQueue維護一個ForkJoinTask數組和當前工作線程。ForkJoinPool實現了工作竊取(work-stealing)演算法並執行ForkJoinTask。
ForkJoinPool類的屬性介紹ADD_WO
4. 詳解java Thread中的join方法
在Java編程中,Thread類的join()方法發揮著關鍵作用。當需要控制線程執行順序時,它能讓調用線程暫停,直至被調用的線程完成。在主線程(如main())中,join()尤其有用,它會阻止主線程直到目標線程結束,例如:
當調用t1.join()時,main()線程會被暫停,直到t1線程完全執行完畢,然後main()線程才會繼續執行。
join()方法的工作原理主要依賴於Java內存模型中的同步機制。通過查看Thread類的源碼,我們發現join()實際上調用了wait()方法,使調用線程進入等待狀態,直到目標線程結束。由於wait()方法前有synchronized修飾,這意味著主線程(t1線程的持有者)會在一個鎖定的上下文中等待,如下所示:
代碼等效於:synchronized(this) { wait(); },使得主線程進入等待隊列,直到t1線程結束。
然而,wait()方法本身並不會喚醒主線程,喚醒過程隱藏在Java虛擬機(JVM)的底層。當t1線程執行完畢,JVM會自動調用lock.notify_all()方法,將主線程從等待隊列中喚醒。
總結起來,join()方法的使用需要注意以下兩點:
1. 它讓調用線程暫停,直到目標線程結束。
2. 喚醒機制由JVM內部的notify_all()方法控制,確保線程按照預期順序執行。
理解這些原理,能幫助你更有效地管理和控制Java線程。