join线程java
1. java线程中wait、await、sleep、yield、join用法总结
Java线程中wait、await、sleep、yield、join用法总结1. wait 用法:wait方法用于使当前线程等待,直到其他线程调用此对象的notify或notifyAll方法将其唤醒。调用wait方法时,线程必须拥有该对象的监视器。 特点:wait是Object类的方法,必须在同步代码块或同步方法中调用。
2. await 用法:await方法用于使当前线程等待,直到另一个线程调用与当前await调用关联的Condition对象的signal或signalAll方法。与wait不同,await属于java.util.concurrent.locks.Condition接口,通常与ReentrantLock一起使用。 特点:await需要在lock.lock和lock.unlock之间调用,以实现更灵活的线程同步。
3. sleep 用法:sleep方法使当前线程休眠指定的毫秒数。线程在休眠期间不会释放任何监视器。 特点:sleep是Thread类的方法,用于控制线程的执行节奏,但不涉及线程间的通信。
4. yield 用法:yield方法提示调度器当前线程愿意放弃当前对处理器的使用。调度器可以忽略这个提示。 特点:yield是Thread类的方法,通常用于提高程序的并发性能,但不能保证线程会立即放弃处理器。
5. join 用法:join方法使当前线程等待,直到调用join方法的线程执行完毕。这可以用于确保某些线程在继续执行之前完成其任务。 特点:join是Thread类的方法,常用于控制线程的执行顺序。
总结对比:
- wait、await:都用于使线程等待,但wait是Object类的方法,通常与synchronized一起使用;await是Condition接口的方法,通常与ReentrantLock一起使用。
- sleep:使线程休眠,不涉及线程间的通信。
- yield:提示调度器当前线程愿意放弃处理器,但不保证立即放弃。
- join:使当前线程等待另一个线程执行完毕,常用于控制线程执行顺序。
2. Java线程中wait、await、sleep、yield、join用法总结
一、wait()、notify()、notifyAll()用法
测试代码:
打印日志:
从日志中我们可以看出waitTest方法阻塞直到被notifyTest唤醒。
二、await()、signal()、signalAll()用法
java.util.concurrent类库中提供的Condition类来实现线程之间的协调。
测试代码:
打印日志:
从日志中可以看出我们得到了和wait同样的效果。
三、yield()、join()用法
yield测试代码:
打印结果:
可以看出虽然主线程调用了yield,但是仍然又开始执行了,因此并不能保证轮流执行。
join测试代码:
打印日志:
从日志中我们可以看出主线程在线程执行完成后才开始执行。
四、wait()、await()、sleep()、yield、join对比
通过表格对比(join的情况下,t1指代当前线程,t2代表其他线程)
3. Fork/Join框架基本使用和原理探究(基础篇)
前提概述Java7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
算法原理介绍相信大家此前或多或少有了解到ForkJoin框架,ForkJoin框架其实就是一个线程池ExecutorService的实现,通过工作窃取(work-stealing)算法,获取其他线程中未完成的任务来执行。可以充分利用机器的多处理器优势,利用空闲的线程去并行快速完成一个可拆分为小任务的大任务,类似于分治算法。
实现达成目标ForkJoin的目标,就是利用所有可用的处理能力来提高程序的响应和性能。本文将介绍ForkJoin框架,依次介绍基础特性、案例使用、源码剖析和实现亮点。
java.util.concurrent.ForkJoinPool由Java大师DougLea主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。
基本使用入门例子,用Fork/Join框架使用示例,在这个示例中我们计算了1-5000累加后的值:
{privatestaticfinalIntegerMAX=400;<Integer>{//子任务开始计算的值privateIntegerstartValue;//子任务结束计算的值privateIntegerendValue;publicWorkTask(IntegerstartValue,IntegerendValue){this.startValue=startValue;this.endValue=endValue;}@(){//如果小于最小分片阈值,则说明要进行相关的数据操作//可以正式进行累加计算了if(endValue-startValue<MAX){System.out.println("开始计算的部分:startValue="+startValue+";endValue="+endValue);IntegertotalValue=0;for(intindex=this.startValue;index<=this.endValue;index++){totalValue+=index;}returntotalValue;}//否则再进行任务拆分,拆分成两个任务else{//因为采用二分法,拆分,所以进行1/2切分数据量WorkTasksubTask1=newWorkTask(startValue,(startValue+endValue)/2);subTask1.fork();//进行拆分机制控制WorkTasksubTask2=newWorkTask((startValue+endValue)/2+1,endValue);subTask2.fork();returnsubTask1.join()+subTask2.join();}}}publicstaticvoidmain(String[]args){//这是Fork/Join框架的线程池ForkJoinPoolpool=newForkJoinPool();ForkJoinTask<Integer>taskFuture=pool.submit(newMyForkJoinTask(1,1001));try{Integerresult=taskFuture.get();System.out.println("result="+result);}catch(InterruptedException|ExecutionExceptione){e.printStackTrace(System.out);}}}对此我封装了一个框架集合,基于JDK1.8+中的Fork/Join框架实现,参考的Fork/Join框架主要源代码也基于JDK1.8+。
WorkTaskCallable实现抽象模型层次操作转换@Accessors(chain=true)publicclassWorkTaskCallable<T>extendsRecursiveTask<T>{/***断言操作控制*/@GetterprivatePredicate<T>predicate;/***执行参数化分割条件*/@GetterprivateTsplitParam;/***操作拆分方法操作机制*/@GetterprivateFunction<Object,Object[]>splitFunction;/***操作合并方法操作机制*/@GetterprivateBiFunction<Object,Object,T>mergeFunction;/***操作处理机制*/@Setter@GetterprivateFunction<T,T>processHandler;/***构造器是否进行分割操作*@parampredicate判断是否进行下一步分割的条件关系*@paramsplitParam分割参数*@paramsplitFunction分割方法*@parammergeFunction合并数据操作*/publicWorkTaskCallable(Predicatepredicate,TsplitParam,Function<Object,Object[]>splitFunction,BiFunction<Object,Object,T>mergeFunction,Function<T,T>processHandler){this.predicate=predicate;this.splitParam=splitParam;this.splitFunction=splitFunction;this.mergeFunction=mergeFunction;this.processHandler=processHandler;}/***实际执行调用操作机制*@return*/@OverrideprotectedTcompute(){if(predicate.test(splitParam)){Object[]result=splitFunction.apply(splitParam);=newWorkTaskCallable(predicate,result[0],splitFunction,mergeFunction,processHandler);workTaskCallable1.fork();=newWorkTaskCallable(predicate,result[1],splitFunction,mergeFunction,processHandler);workTaskCallable2.fork();returnmergeFunction.apply(workTaskCallable1.join(),workTaskCallable2.join());}else{returnprocessHandler.apply(splitParam);}}}ArrayListWorkTaskCallable实现List集合层次操作转换/***@project-name:wiz-shrding-framework*@package-name:com.wiz.sharding.framework.boot.common.thread.forkjoin*@author:LiBo/Alex*@create-date:2021-09-0917:26*@right:libo-alex4java*@email:[email protected]*@description:*/<List>{staticPredicate<List>predicateFunction=param->param.size()>3;staticFunction<List,List[]>splitFunction=(param)->{if(predicateFunction.test(param)){returnnewList[]{param.subList(0,param.size()/2),param.subList(param.size()/2,param.size())};}else{returnnewList[]{param.subList(0,param.size()+1),Lists.newArrayList()};}};staticBiFunction<List,List,List>mergeFunction=(param1,param2)->{Listdatalist=Lists.newArrayList();datalist.addAll(param2);datalist.addAll(param1);returndatalist;};/***构造器是否进行分割操作*@parampredicate判断是否进行下一步分割的条件关系*@paramsplitParam分割参数*@paramsplitFunction分割方法*@parammergeFunction合并数据操作*/(Predicate<List>predicate,ListsplitParam,FunctionsplitFunction,BiFunctionmergeFunction,Function<List,List>processHandler){super(predicate,splitParam,splitFunction,mergeFunction,processHandler);}(ListsplitParam,FunctionsplitFunction,BiFunctionmergeFunction,Function<List,List>processHandler){super(predicateFunction,splitParam,splitFunction,mergeFunction,processHandler);}(Predicate<List>predicate,ListsplitParam,Function<List,List>processHandler){this(predicate,splitParam,splitFunction,mergeFunction,processHandler);}(ListsplitParam,Function<List,List>processHandler){this(predicateFunction,splitParam,splitFunction,mergeFunction,processHandler);}publicstaticvoidmain(String[]args){ListdataList=Lists.newArrayList(0,1,2,3,4,5,6,7,8,9);ForkJoinPoolforkJoinPool=ForkJoinPool.commonPool();ForkJoinTask<List>forkJoinResult=forkJoinPool.submit(newArrayListWorkTaskCallable(dataList,param->Lists.newArrayList(param.size())));try{System.out.println(forkJoinResult.get());}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}}ForkJoin代码分析ForkJoinPool构造函数/***Createsa{@codeForkJoinPool}withparallelismequalto{@link*java.lang.Runtime#availableProcessors},usingthe{@linkplain*#},*noUncaughtExceptionHandler,andnon-asyncLIFOprocessingmode.**@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(){this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),,null,false);}/***Createsa{@codeForkJoinPool}withtheindicatedparallelism*level,the{@linkplain*#},*noUncaughtExceptionHandler,andnon-asyncLIFOprocessingmode.**@*@*equaltozero,*@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(intparallelism){this(parallelism,,null,false);}/***Createsa{@codeForkJoinPool}withthegivenparameters.**@.Fordefaultvalue,*use{@linkjava.lang.Runtime#availableProcessors}.*@.Fordefaultvalue,*use{@link#}.*@**tasks.Fordefaultvalue,use{@codenull}.*@paramasyncModeiftrue,*establisheslocalfirst-in-first-outschelingmodeforforked*tasksthatareneverjoined.Thismodemaybemoreappropriate*thandefaultlocallystack-*workerthreadsonlyprocessevent-styleasynchronoustasks.*Fordefaultvalue,use{@codefalse}.*@*equaltozero,*@*@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(intparallelism,,,booleanasyncMode){this(checkParallelism(parallelism),checkFactory(factory),handler,(asyncMode?FIFO_QUEUE:LIFO_QUEUE),"ForkJoinPool-"+nextPoolId()+"-worker-");checkPermission();}/***Createsa{@codeForkJoinPool}withthegivenparameters,without*.Invokeddirectlyby*makeCommonPool.*/privateForkJoinPool(intparallelism,,,intmode,StringworkerNamePrefix){this.workerNamePrefix=workerNamePrefix;this.factory=factory;this.ueh=handler;this.mode=(short)mode;this.parallelism=(short)parallelism;longnp=(long)(-parallelism);//offsetctlcountsthis.ctl=((np<<AC_SHIFT)&AC_MASK)|((np<<TC_SHIFT)&TC_MASK);}parallelism:可并行级别,Fork/Join框架将依据这个并行级别的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理,但是千万不要将这个属性理解成Fork/Join框架中最多存在的线程数量。
factory:当Fork/Join框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread的方法。
在Fork/Join框架中有一个默认的ForkJoinWorkerThreadFactory接口实现:。
handler:异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获。
asyncMode:这个参数也非常重要,从字面意思来看是指的异步模式,它并不是说Fork/Join框架是采用同步模式还是采用异步模式工作。Fork/Join框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即是说存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式。
asyncMode?FIFO_QUEUE:LIFO_QUEUE,
当asyncMode设置为true的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为false
后进先出
先进先出
需要注意点ForkJoinPool一个构造函数只带有parallelism参数,既是可以设定Fork/Join框架的最大并行任务数量;另一个构造函数则不带有任何参数,对于最大并行任务数量也只是一个默认值——当前操作系统可以使用的CPU内核数量(Runtime.getRuntime().availableProcessors())。实际上ForkJoinPool还有一个私有的、原生构造函数,之上提到的三个构造函数都是对这个私有的、原生构造函数的调用。
如果你对Fork/Join框架没有特定的执行要求,可以直接使用不带有任何参数的构造函数。也就是说推荐基于当前操作系统可以使用的CPU内核数作为Fork/Join框架内最大并行任务数量,这样可以保证CPU在处理并行任务时,尽量少发生任务线程间的运行状态切换(实际上单个CPU内核上的线程间状态切换基本上无法避免,因为操作系统同时运行多个线程和多个进程)。
从上面的的类关系图可以看出来,ForkJoin框架的核心是ForkJoinPool类,基于AbstractExecutorService扩展(@sun.misc.Contended注解)。
ForkJoinPool中维护了一个队列数组WorkQueue[],每个WorkQueue维护一个ForkJoinTask数组和当前工作线程。ForkJoinPool实现了工作窃取(work-stealing)算法并执行ForkJoinTask。
ForkJoinPool类的属性介绍ADD_WO
4. 详解java Thread中的join方法
在Java编程中,Thread类的join()方法发挥着关键作用。当需要控制线程执行顺序时,它能让调用线程暂停,直至被调用的线程完成。在主线程(如main())中,join()尤其有用,它会阻止主线程直到目标线程结束,例如:
当调用t1.join()时,main()线程会被暂停,直到t1线程完全执行完毕,然后main()线程才会继续执行。
join()方法的工作原理主要依赖于Java内存模型中的同步机制。通过查看Thread类的源码,我们发现join()实际上调用了wait()方法,使调用线程进入等待状态,直到目标线程结束。由于wait()方法前有synchronized修饰,这意味着主线程(t1线程的持有者)会在一个锁定的上下文中等待,如下所示:
代码等效于:synchronized(this) { wait(); },使得主线程进入等待队列,直到t1线程结束。
然而,wait()方法本身并不会唤醒主线程,唤醒过程隐藏在Java虚拟机(JVM)的底层。当t1线程执行完毕,JVM会自动调用lock.notify_all()方法,将主线程从等待队列中唤醒。
总结起来,join()方法的使用需要注意以下两点:
1. 它让调用线程暂停,直到目标线程结束。
2. 唤醒机制由JVM内部的notify_all()方法控制,确保线程按照预期顺序执行。
理解这些原理,能帮助你更有效地管理和控制Java线程。