集合框架源码
① Fork/Join框架基本使用和原理探究(基础篇)
前提概述java7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
算法原理介绍相信大家此前或多或少有了解到ForkJoin框架,ForkJoin框架其实就是一个线程池ExecutorService的实现,通过工作窃取(work-stealing)算法,获取其他线程中未完成的任务来执行。可以充分利用机器的多处理器优势,利用空闲的线程去并行快速完成一个可拆分为小任务的大任务,类似于分治算法。
实现达成目标ForkJoin的目标,就是利用所有可用的处理能力来提高程序的响应和性能。本文将介绍ForkJoin框架,依次介绍基础特性、案例使用、源码剖析和实现亮点。
java.util.concurrent.ForkJoinPool由Java大师DougLea主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。
基本使用入门例子,用Fork/Join框架使用示例,在这个示例中我们计算了1-5000累加后的值:
{privatestaticfinalIntegerMAX=400;<Integer>{//子任务开始计算的值privateIntegerstartValue;//子任务结束计算的值privateIntegerendValue;publicWorkTask(IntegerstartValue,IntegerendValue){this.startValue=startValue;this.endValue=endValue;}@(){//如果小于最小分片阈值,则说明要进行相关的数据操作//可以正式进行累加计算了if(endValue-startValue<MAX){System.out.println("开始计算的部分:startValue="+startValue+";endValue="+endValue);IntegertotalValue=0;for(intindex=this.startValue;index<=this.endValue;index++){totalValue+=index;}returntotalValue;}//否则再进行任务拆分,拆分成两个任务else{//因为采用二分法,拆分,所以进行1/2切分数据量WorkTasksubTask1=newWorkTask(startValue,(startValue+endValue)/2);subTask1.fork();//进行拆分机制控制WorkTasksubTask2=newWorkTask((startValue+endValue)/2+1,endValue);subTask2.fork();returnsubTask1.join()+subTask2.join();}}}publicstaticvoidmain(String[]args){//这是Fork/Join框架的线程池ForkJoinPoolpool=newForkJoinPool();ForkJoinTask<Integer>taskFuture=pool.submit(newMyForkJoinTask(1,1001));try{Integerresult=taskFuture.get();System.out.println("result="+result);}catch(InterruptedException|ExecutionExceptione){e.printStackTrace(System.out);}}}对此我封装了一个框架集合,基于JDK1.8+中的Fork/Join框架实现,参考的Fork/Join框架主要源代码也基于JDK1.8+。
WorkTaskCallable实现抽象模型层次操作转换@Accessors(chain=true)publicclassWorkTaskCallable<T>extendsRecursiveTask<T>{/***断言操作控制*/@GetterprivatePredicate<T>predicate;/***执行参数化分割条件*/@GetterprivateTsplitParam;/***操作拆分方法操作机制*/@GetterprivateFunction<Object,Object[]>splitFunction;/***操作合并方法操作机制*/@GetterprivateBiFunction<Object,Object,T>mergeFunction;/***操作处理机制*/@Setter@GetterprivateFunction<T,T>processHandler;/***构造器是否进行分割操作*@parampredicate判断是否进行下一步分割的条件关系*@paramsplitParam分割参数*@paramsplitFunction分割方法*@parammergeFunction合并数据操作*/publicWorkTaskCallable(Predicatepredicate,TsplitParam,Function<Object,Object[]>splitFunction,BiFunction<Object,Object,T>mergeFunction,Function<T,T>processHandler){this.predicate=predicate;this.splitParam=splitParam;this.splitFunction=splitFunction;this.mergeFunction=mergeFunction;this.processHandler=processHandler;}/***实际执行调用操作机制*@return*/@OverrideprotectedTcompute(){if(predicate.test(splitParam)){Object[]result=splitFunction.apply(splitParam);=newWorkTaskCallable(predicate,result[0],splitFunction,mergeFunction,processHandler);workTaskCallable1.fork();=newWorkTaskCallable(predicate,result[1],splitFunction,mergeFunction,processHandler);workTaskCallable2.fork();returnmergeFunction.apply(workTaskCallable1.join(),workTaskCallable2.join());}else{returnprocessHandler.apply(splitParam);}}}ArrayListWorkTaskCallable实现List集合层次操作转换/***@project-name:wiz-shrding-framework*@package-name:com.wiz.sharding.framework.boot.common.thread.forkjoin*@author:LiBo/Alex*@create-date:2021-09-0917:26*@right:libo-alex4java*@email:[email protected]*@description:*/<List>{staticPredicate<List>predicateFunction=param->param.size()>3;staticFunction<List,List[]>splitFunction=(param)->{if(predicateFunction.test(param)){returnnewList[]{param.subList(0,param.size()/2),param.subList(param.size()/2,param.size())};}else{returnnewList[]{param.subList(0,param.size()+1),Lists.newArrayList()};}};staticBiFunction<List,List,List>mergeFunction=(param1,param2)->{Listdatalist=Lists.newArrayList();datalist.addAll(param2);datalist.addAll(param1);returndatalist;};/***构造器是否进行分割操作*@parampredicate判断是否进行下一步分割的条件关系*@paramsplitParam分割参数*@paramsplitFunction分割方法*@parammergeFunction合并数据操作*/(Predicate<List>predicate,ListsplitParam,FunctionsplitFunction,BiFunctionmergeFunction,Function<List,List>processHandler){super(predicate,splitParam,splitFunction,mergeFunction,processHandler);}(ListsplitParam,FunctionsplitFunction,BiFunctionmergeFunction,Function<List,List>processHandler){super(predicateFunction,splitParam,splitFunction,mergeFunction,processHandler);}(Predicate<List>predicate,ListsplitParam,Function<List,List>processHandler){this(predicate,splitParam,splitFunction,mergeFunction,processHandler);}(ListsplitParam,Function<List,List>processHandler){this(predicateFunction,splitParam,splitFunction,mergeFunction,processHandler);}publicstaticvoidmain(String[]args){ListdataList=Lists.newArrayList(0,1,2,3,4,5,6,7,8,9);ForkJoinPoolforkJoinPool=ForkJoinPool.commonPool();ForkJoinTask<List>forkJoinResult=forkJoinPool.submit(newArrayListWorkTaskCallable(dataList,param->Lists.newArrayList(param.size())));try{System.out.println(forkJoinResult.get());}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}}ForkJoin代码分析ForkJoinPool构造函数/***Createsa{@codeForkJoinPool}withparallelismequalto{@link*java.lang.Runtime#availableProcessors},usingthe{@linkplain*#},*noUncaughtExceptionHandler,andnon-asyncLIFOprocessingmode.**@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(){this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),,null,false);}/***Createsa{@codeForkJoinPool}withtheindicatedparallelism*level,the{@linkplain*#},*noUncaughtExceptionHandler,andnon-asyncLIFOprocessingmode.**@*@*equaltozero,*@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(intparallelism){this(parallelism,,null,false);}/***Createsa{@codeForkJoinPool}withthegivenparameters.**@.Fordefaultvalue,*use{@linkjava.lang.Runtime#availableProcessors}.*@.Fordefaultvalue,*use{@link#}.*@**tasks.Fordefaultvalue,use{@codenull}.*@paramasyncModeiftrue,*establisheslocalfirst-in-first-outschelingmodeforforked*tasksthatareneverjoined.Thismodemaybemoreappropriate*thandefaultlocallystack-*workerthreadsonlyprocessevent-styleasynchronoustasks.*Fordefaultvalue,use{@codefalse}.*@*equaltozero,*@*@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(intparallelism,,,booleanasyncMode){this(checkParallelism(parallelism),checkFactory(factory),handler,(asyncMode?FIFO_QUEUE:LIFO_QUEUE),"ForkJoinPool-"+nextPoolId()+"-worker-");checkPermission();}/***Createsa{@codeForkJoinPool}withthegivenparameters,without*.Invokeddirectlyby*makeCommonPool.*/privateForkJoinPool(intparallelism,,,intmode,StringworkerNamePrefix){this.workerNamePrefix=workerNamePrefix;this.factory=factory;this.ueh=handler;this.mode=(short)mode;this.parallelism=(short)parallelism;longnp=(long)(-parallelism);//offsetctlcountsthis.ctl=((np<<AC_SHIFT)&AC_MASK)|((np<<TC_SHIFT)&TC_MASK);}parallelism:可并行级别,Fork/Join框架将依据这个并行级别的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理,但是千万不要将这个属性理解成Fork/Join框架中最多存在的线程数量。
factory:当Fork/Join框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread的方法。
在Fork/Join框架中有一个默认的ForkJoinWorkerThreadFactory接口实现:。
handler:异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获。
asyncMode:这个参数也非常重要,从字面意思来看是指的异步模式,它并不是说Fork/Join框架是采用同步模式还是采用异步模式工作。Fork/Join框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即是说存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式。
asyncMode?FIFO_QUEUE:LIFO_QUEUE,
当asyncMode设置为true的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为false
后进先出
先进先出
需要注意点ForkJoinPool一个构造函数只带有parallelism参数,既是可以设定Fork/Join框架的最大并行任务数量;另一个构造函数则不带有任何参数,对于最大并行任务数量也只是一个默认值——当前操作系统可以使用的CPU内核数量(Runtime.getRuntime().availableProcessors())。实际上ForkJoinPool还有一个私有的、原生构造函数,之上提到的三个构造函数都是对这个私有的、原生构造函数的调用。
如果你对Fork/Join框架没有特定的执行要求,可以直接使用不带有任何参数的构造函数。也就是说推荐基于当前操作系统可以使用的CPU内核数作为Fork/Join框架内最大并行任务数量,这样可以保证CPU在处理并行任务时,尽量少发生任务线程间的运行状态切换(实际上单个CPU内核上的线程间状态切换基本上无法避免,因为操作系统同时运行多个线程和多个进程)。
从上面的的类关系图可以看出来,ForkJoin框架的核心是ForkJoinPool类,基于AbstractExecutorService扩展(@sun.misc.Contended注解)。
ForkJoinPool中维护了一个队列数组WorkQueue[],每个WorkQueue维护一个ForkJoinTask数组和当前工作线程。ForkJoinPool实现了工作窃取(work-stealing)算法并执行ForkJoinTask。
ForkJoinPool类的属性介绍ADD_WO
② 用java语言编写的jdbc数据库与java集合框架相连接的程序源代码
我以前用到的,oracle数据库的:
package com.icool.common.util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
/**
*
* @author ZH_Q
* @version 1.0
*/
public class GetConn {
private Connection conn=null;
private String usName;
private String usPwd;
private String Clfn;
private String dmName;
//调用空参构造,默认启用本地数据库
public GetConn() {
this.Clfn ="oracle.jdbc.driver.OracleDriver";
this.dmName ="jdbc:oracle:thin:@localhost:1521:orcl";
this.usPwd = "q792002998";
this.usName = "system";
}
/**
* @return 数据库连接对象
*/
public Connection getConn() {
try
{
Class.forName(Clfn);
conn = DriverManager.getConnection(dmName,usName,usPwd);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
/**
* @return 根据SQL语句查询出的结果集
* @throws SQLException
*/
public ResultSet executeQuery(String sql) throws SQLException {
conn =getConn();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
return rs;
}
/**
* @return 影响数据行数
* @throws SQLException
*/
public int executeUpdate(String sql) throws SQLException {
Statement stmt = null;
int i = 0;
getConn();
stmt = conn.createStatement();
i = stmt.executeUpdate(sql);
return i;
}
/**
* @return 根据SQL语句返回预编译对象
* @throws SQLException
*/
public PreparedStatement PreparedStatement(String sql) throws SQLException {
PreparedStatement pstmt = null;
getConn();
pstmt= conn.prepareStatement(sql);
return pstmt;
}
/**
* @param 关闭数据库连接
* @throws DataBaseExpection
*/
public void close(){
if(conn!=null) {
try
{
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
/**
* @param 设置是否自动提交
* @throws SQLException
*/
public void setAutoCommit(boolean b) throws SQLException {
getConn();
conn.setAutoCommit(b);
}
public void commit() throws SQLException {
getConn();
conn.commit();
}
public void rollback() throws SQLException {
getConn();
conn.rollback();
}
}
③ 集合竞价选股公式源码
集合竞价选股公式源码的具体内容需要根据特定的选股策略和需求来编写,无法直接给出一个通用的源码。但我可以提供一个大致的框架和思路。
解释:
集合竞价选股公式通常是基于技术分析、基本面分析或其他选股策略来编写的。源码的编写需要使用特定的编程语言,如Python、C++等,来实现选股的逻辑。以下是一个简化的集合竞价选股公式的编写框架:
确定选股策略:首先确定你要采用的选股策略,如趋势跟踪、动量策略、价值投资等。明确策略后,可以进一步分析需要获取的数据和计算指标。
数据获取与处理:编写代码获取股票的历史数据,包括集合竞价数据、日常交易数据等。数据获取后需要进行清洗和处理,以得到用于分析和计算的纯净数据。
算法实现:根据选股策略设计相应的算法。例如,如果采用趋势跟踪策略,可能需要计算股票的移动平均线、相对强弱指数等。这些计算将用于判断股票的走势和买卖点。
规则判断与信号输出:基于计算的结果设定选股规则,如当股票价格突破某一水平时发出买入信号。在源码中实现这些规则判断,并输出相应的信号。
优化与测试:对编写的源码进行优化和测试,确保其在不同市场环境下的稳定性和准确性。这可能需要使用历史数据回测或实时交易测试等方法。
请注意,以上只是一个大致的框架,具体的源码编写需要根据具体的选股策略和需求来详细设计和实现。如果你有更具体的需求或问题,可以提供更详细的信息,以便得到更准确的答案。