當前位置:首頁 » 操作系統 » 集合框架源碼

集合框架源碼

發布時間: 2025-10-08 08:21:49

① Fork/Join框架基本使用和原理探究(基礎篇)

前提概述

java7開始引入了一種新的Fork/Join線程池,它可以執行一種特殊的任務:把一個大任務拆成多個小任務並行執行。

我們舉個例子:如果要計算一個超大數組的和,最簡單的做法是用一個循環在一個線程內完成:

演算法原理介紹

相信大家此前或多或少有了解到ForkJoin框架,ForkJoin框架其實就是一個線程池ExecutorService的實現,通過工作竊取(work-stealing)演算法,獲取其他線程中未完成的任務來執行。可以充分利用機器的多處理器優勢,利用空閑的線程去並行快速完成一個可拆分為小任務的大任務,類似於分治演算法。

實現達成目標

ForkJoin的目標,就是利用所有可用的處理能力來提高程序的響應和性能。本文將介紹ForkJoin框架,依次介紹基礎特性、案例使用、源碼剖析和實現亮點。

java.util.concurrent.ForkJoinPool由Java大師DougLea主持編寫,它可以將一個大的任務拆分成多個子任務進行並行處理,最後將子任務結果合並成最後的計算結果,並進行輸出。

基本使用

入門例子,用Fork/Join框架使用示例,在這個示例中我們計算了1-5000累加後的值:

{privatestaticfinalIntegerMAX=400;<Integer>{//子任務開始計算的值privateIntegerstartValue;//子任務結束計算的值privateIntegerendValue;publicWorkTask(IntegerstartValue,IntegerendValue){this.startValue=startValue;this.endValue=endValue;}@(){//如果小於最小分片閾值,則說明要進行相關的數據操作//可以正式進行累加計算了if(endValue-startValue<MAX){System.out.println("開始計算的部分:startValue="+startValue+";endValue="+endValue);IntegertotalValue=0;for(intindex=this.startValue;index<=this.endValue;index++){totalValue+=index;}returntotalValue;}//否則再進行任務拆分,拆分成兩個任務else{//因為採用二分法,拆分,所以進行1/2切分數據量WorkTasksubTask1=newWorkTask(startValue,(startValue+endValue)/2);subTask1.fork();//進行拆分機制控制WorkTasksubTask2=newWorkTask((startValue+endValue)/2+1,endValue);subTask2.fork();returnsubTask1.join()+subTask2.join();}}}publicstaticvoidmain(String[]args){//這是Fork/Join框架的線程池ForkJoinPoolpool=newForkJoinPool();ForkJoinTask<Integer>taskFuture=pool.submit(newMyForkJoinTask(1,1001));try{Integerresult=taskFuture.get();System.out.println("result="+result);}catch(InterruptedException|ExecutionExceptione){e.printStackTrace(System.out);}}}

對此我封裝了一個框架集合,基於JDK1.8+中的Fork/Join框架實現,參考的Fork/Join框架主要源代碼也基於JDK1.8+。

WorkTaskCallable實現抽象模型層次操作轉換@Accessors(chain=true)publicclassWorkTaskCallable<T>extendsRecursiveTask<T>{/***斷言操作控制*/@GetterprivatePredicate<T>predicate;/***執行參數化分割條件*/@GetterprivateTsplitParam;/***操作拆分方法操作機制*/@GetterprivateFunction<Object,Object[]>splitFunction;/***操作合並方法操作機制*/@GetterprivateBiFunction<Object,Object,T>mergeFunction;/***操作處理機制*/@Setter@GetterprivateFunction<T,T>processHandler;/***構造器是否進行分割操作*@parampredicate判斷是否進行下一步分割的條件關系*@paramsplitParam分割參數*@paramsplitFunction分割方法*@parammergeFunction合並數據操作*/publicWorkTaskCallable(Predicatepredicate,TsplitParam,Function<Object,Object[]>splitFunction,BiFunction<Object,Object,T>mergeFunction,Function<T,T>processHandler){this.predicate=predicate;this.splitParam=splitParam;this.splitFunction=splitFunction;this.mergeFunction=mergeFunction;this.processHandler=processHandler;}/***實際執行調用操作機制*@return*/@OverrideprotectedTcompute(){if(predicate.test(splitParam)){Object[]result=splitFunction.apply(splitParam);=newWorkTaskCallable(predicate,result[0],splitFunction,mergeFunction,processHandler);workTaskCallable1.fork();=newWorkTaskCallable(predicate,result[1],splitFunction,mergeFunction,processHandler);workTaskCallable2.fork();returnmergeFunction.apply(workTaskCallable1.join(),workTaskCallable2.join());}else{returnprocessHandler.apply(splitParam);}}}ArrayListWorkTaskCallable實現List集合層次操作轉換/***@project-name:wiz-shrding-framework*@package-name:com.wiz.sharding.framework.boot.common.thread.forkjoin*@author:LiBo/Alex*@create-date:2021-09-0917:26*@right:libo-alex4java*@email:[email protected]*@description:*/<List>{staticPredicate<List>predicateFunction=param->param.size()>3;staticFunction<List,List[]>splitFunction=(param)->{if(predicateFunction.test(param)){returnnewList[]{param.subList(0,param.size()/2),param.subList(param.size()/2,param.size())};}else{returnnewList[]{param.subList(0,param.size()+1),Lists.newArrayList()};}};staticBiFunction<List,List,List>mergeFunction=(param1,param2)->{Listdatalist=Lists.newArrayList();datalist.addAll(param2);datalist.addAll(param1);returndatalist;};/***構造器是否進行分割操作*@parampredicate判斷是否進行下一步分割的條件關系*@paramsplitParam分割參數*@paramsplitFunction分割方法*@parammergeFunction合並數據操作*/(Predicate<List>predicate,ListsplitParam,FunctionsplitFunction,BiFunctionmergeFunction,Function<List,List>processHandler){super(predicate,splitParam,splitFunction,mergeFunction,processHandler);}(ListsplitParam,FunctionsplitFunction,BiFunctionmergeFunction,Function<List,List>processHandler){super(predicateFunction,splitParam,splitFunction,mergeFunction,processHandler);}(Predicate<List>predicate,ListsplitParam,Function<List,List>processHandler){this(predicate,splitParam,splitFunction,mergeFunction,processHandler);}(ListsplitParam,Function<List,List>processHandler){this(predicateFunction,splitParam,splitFunction,mergeFunction,processHandler);}publicstaticvoidmain(String[]args){ListdataList=Lists.newArrayList(0,1,2,3,4,5,6,7,8,9);ForkJoinPoolforkJoinPool=ForkJoinPool.commonPool();ForkJoinTask<List>forkJoinResult=forkJoinPool.submit(newArrayListWorkTaskCallable(dataList,param->Lists.newArrayList(param.size())));try{System.out.println(forkJoinResult.get());}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}}ForkJoin代碼分析ForkJoinPool構造函數/***Createsa{@codeForkJoinPool}withparallelismequalto{@link*java.lang.Runtime#availableProcessors},usingthe{@linkplain*#},*noUncaughtExceptionHandler,andnon-asyncLIFOprocessingmode.**@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(){this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),,null,false);}/***Createsa{@codeForkJoinPool}withtheindicatedparallelism*level,the{@linkplain*#},*noUncaughtExceptionHandler,andnon-asyncLIFOprocessingmode.**@*@*equaltozero,*@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(intparallelism){this(parallelism,,null,false);}/***Createsa{@codeForkJoinPool}withthegivenparameters.**@.Fordefaultvalue,*use{@linkjava.lang.Runtime#availableProcessors}.*@.Fordefaultvalue,*use{@link#}.*@**tasks.Fordefaultvalue,use{@codenull}.*@paramasyncModeiftrue,*establisheslocalfirst-in-first-outschelingmodeforforked*tasksthatareneverjoined.Thismodemaybemoreappropriate*thandefaultlocallystack-*workerthreadsonlyprocessevent-styleasynchronoustasks.*Fordefaultvalue,use{@codefalse}.*@*equaltozero,*@*@**becauseitdoesnothold{@link*java.lang.RuntimePermission}{@code("modifyThread")}*/publicForkJoinPool(intparallelism,,,booleanasyncMode){this(checkParallelism(parallelism),checkFactory(factory),handler,(asyncMode?FIFO_QUEUE:LIFO_QUEUE),"ForkJoinPool-"+nextPoolId()+"-worker-");checkPermission();}/***Createsa{@codeForkJoinPool}withthegivenparameters,without*.Invokeddirectlyby*makeCommonPool.*/privateForkJoinPool(intparallelism,,,intmode,StringworkerNamePrefix){this.workerNamePrefix=workerNamePrefix;this.factory=factory;this.ueh=handler;this.mode=(short)mode;this.parallelism=(short)parallelism;longnp=(long)(-parallelism);//offsetctlcountsthis.ctl=((np<<AC_SHIFT)&AC_MASK)|((np<<TC_SHIFT)&TC_MASK);}

parallelism:可並行級別,Fork/Join框架將依據這個並行級別的設定,決定框架內並行執行的線程數量。並行的每一個任務都會有一個線程進行處理,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數量。

factory:當Fork/Join框架創建一個新的線程時,同樣會用到線程創建工廠。只不過這個線程工廠不再需要實現ThreadFactory介面,而是需要實現ForkJoinWorkerThreadFactory介面。後者是一個函數式介面,只需要實現一個名叫newThread的方法。

在Fork/Join框架中有一個默認的ForkJoinWorkerThreadFactory介面實現:。

handler:異常捕獲處理器。當執行的任務中出現異常,並從任務中被拋出時,就會被handler捕獲。

asyncMode:這個參數也非常重要,從字面意思來看是指的非同步模式,它並不是說Fork/Join框架是採用同步模式還是採用非同步模式工作。Fork/Join框架中為每一個獨立工作的線程准備了對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。即是說存在於隊列中的待執行任務,即可以使用先進先出的工作模式,也可以使用後進先出的工作模式。

asyncMode?FIFO_QUEUE:LIFO_QUEUE,

當asyncMode設置為true的時候,隊列採用先進先出方式工作;反之則是採用後進先出的方式工作,該值默認為false

後進先出

先進先出

需要注意點

ForkJoinPool一個構造函數只帶有parallelism參數,既是可以設定Fork/Join框架的最大並行任務數量;另一個構造函數則不帶有任何參數,對於最大並行任務數量也只是一個默認值——當前操作系統可以使用的CPU內核數量(Runtime.getRuntime().availableProcessors())。實際上ForkJoinPool還有一個私有的、原生構造函數,之上提到的三個構造函數都是對這個私有的、原生構造函數的調用。

如果你對Fork/Join框架沒有特定的執行要求,可以直接使用不帶有任何參數的構造函數。也就是說推薦基於當前操作系統可以使用的CPU內核數作為Fork/Join框架內最大並行任務數量,這樣可以保證CPU在處理並行任務時,盡量少發生任務線程間的運行狀態切換(實際上單個CPU內核上的線程間狀態切換基本上無法避免,因為操作系統同時運行多個線程和多個進程)。

從上面的的類關系圖可以看出來,ForkJoin框架的核心是ForkJoinPool類,基於AbstractExecutorService擴展(@sun.misc.Contended註解)。

ForkJoinPool中維護了一個隊列數組WorkQueue[],每個WorkQueue維護一個ForkJoinTask數組和當前工作線程。ForkJoinPool實現了工作竊取(work-stealing)演算法並執行ForkJoinTask。

ForkJoinPool類的屬性介紹

ADD_WO

② 用java語言編寫的jdbc資料庫與java集合框架相連接的程序源代碼

我以前用到的,oracle資料庫的:
package com.icool.common.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
*
* @author ZH_Q
* @version 1.0
*/
public class GetConn {

private Connection conn=null;
private String usName;
private String usPwd;
private String Clfn;
private String dmName;

//調用空參構造,默認啟用本地資料庫
public GetConn() {
this.Clfn ="oracle.jdbc.driver.OracleDriver";
this.dmName ="jdbc:oracle:thin:@localhost:1521:orcl";
this.usPwd = "q792002998";
this.usName = "system";
}

/**
* @return 資料庫連接對象
*/
public Connection getConn() {

try
{
Class.forName(Clfn);
conn = DriverManager.getConnection(dmName,usName,usPwd);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
/**
* @return 根據SQL語句查詢出的結果集
* @throws SQLException
*/
public ResultSet executeQuery(String sql) throws SQLException {

conn =getConn();

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql);

return rs;
}

/**
* @return 影響數據行數
* @throws SQLException
*/
public int executeUpdate(String sql) throws SQLException {

Statement stmt = null;
int i = 0;

getConn();

stmt = conn.createStatement();
i = stmt.executeUpdate(sql);

return i;
}

/**
* @return 根據SQL語句返回預編譯對象
* @throws SQLException
*/
public PreparedStatement PreparedStatement(String sql) throws SQLException {

PreparedStatement pstmt = null;

getConn();

pstmt= conn.prepareStatement(sql);

return pstmt;
}

/**
* @param 關閉資料庫連接
* @throws DataBaseExpection
*/
public void close(){
if(conn!=null) {
try
{
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}

/**
* @param 設置是否自動提交
* @throws SQLException
*/
public void setAutoCommit(boolean b) throws SQLException {

getConn();
conn.setAutoCommit(b);

}

public void commit() throws SQLException {

getConn();
conn.commit();

}
public void rollback() throws SQLException {

getConn();
conn.rollback();
}
}

③ 集合競價選股公式源碼

集合競價選股公式源碼的具體內容需要根據特定的選股策略和需求來編寫,無法直接給出一個通用的源碼。但我可以提供一個大致的框架和思路。

解釋:

集合競價選股公式通常是基於技術分析、基本面分析或其他選股策略來編寫的。源碼的編寫需要使用特定的編程語言,如Python、C++等,來實現選股的邏輯。以下是一個簡化的集合競價選股公式的編寫框架:

確定選股策略:首先確定你要採用的選股策略,如趨勢跟蹤、動量策略、價值投資等。明確策略後,可以進一步分析需要獲取的數據和計算指標。

數據獲取與處理:編寫代碼獲取股票的歷史數據,包括集合競價數據、日常交易數據等。數據獲取後需要進行清洗和處理,以得到用於分析和計算的純凈數據。

演算法實現:根據選股策略設計相應的演算法。例如,如果採用趨勢跟蹤策略,可能需要計算股票的移動平均線、相對強弱指數等。這些計算將用於判斷股票的走勢和買賣點。

規則判斷與信號輸出:基於計算的結果設定選股規則,如當股票價格突破某一水平時發出買入信號。在源碼中實現這些規則判斷,並輸出相應的信號。

優化與測試:對編寫的源碼進行優化和測試,確保其在不同市場環境下的穩定性和准確性。這可能需要使用歷史數據回測或實時交易測試等方法。

請注意,以上只是一個大致的框架,具體的源碼編寫需要根據具體的選股策略和需求來詳細設計和實現。如果你有更具體的需求或問題,可以提供更詳細的信息,以便得到更准確的答案。

熱點內容
安卓不跟手怎麼辦 發布:2025-10-08 10:01:00 瀏覽:503
android獲取所有文件 發布:2025-10-08 09:52:01 瀏覽:654
電腦怎麼樣才是高配置 發布:2025-10-08 09:50:44 瀏覽:944
通分子的演算法 發布:2025-10-08 09:36:15 瀏覽:379
Android繼承 發布:2025-10-08 09:25:06 瀏覽:417
如何跟客戶介紹車配置 發布:2025-10-08 09:25:00 瀏覽:76
百度網盤上傳速度慢怎麼辦 發布:2025-10-08 09:23:34 瀏覽:329
演算法手冊 發布:2025-10-08 09:13:34 瀏覽:991
wr703n編程器 發布:2025-10-08 08:56:20 瀏覽:160
c語言做推理 發布:2025-10-08 08:49:36 瀏覽:788