hive編譯源碼圖
『壹』 hive自定義udf函數,在udf函數內怎麼讀取hive表數據
最近感受了hive的udf函數的強大威力了,不僅可以使用很多已經有的udf函數,還可以自己定義符合業務場景的udf函數,下面就說一下如何寫udf/udaf/udtf函數,算是一個入門介紹吧。
First, you need to create a new class that extends UDF, with one or more methods named evaluate.
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public final class Lower extends UDF {
public Text evaluate(final Text s) {
if (s == null) { return null; }
return new Text(s.toString().toLowerCase());
}
}
After compiling your code to a jar, you need to add this to the hive classpath.
add jar my_jar.jar;
Once hive is started up with your jars in the classpath, the final step is to register your function
create temporary function my_lower as 'com.example.hive.udf.Lower';
上面主要描述了實現一個udf的過程,首先自然是實現一個UDF函數,然後編譯為jar並加入到hive的classpath中,最後創建一個臨時變數名字讓hive中調用。轉載,僅供參考。
『貳』 源碼級解讀如何解決Spark-sql讀取hive分區表執行效率低問題
問題描述
在開發過程中使用spark去讀取hive分區表的過程中(或者使用hive on spark、nodepad開發工具),部分開發人員未注意添加分區屬性過濾導致在執行過程中載入了全量數據,引起任務執行效率低、磁碟IO大量損耗等問題。
解決辦法
1、自定義規則CheckPartitionTable類,實現Rule,通過以下方式創建SparkSession。
2、自定義規則CheckPartitionTable類,實現Rule,將規則類追加至Optimizer.batches: Seq[Batch]中,如下。
規則內容實現
1、CheckPartitionTable規則執行類,需要通過引入sparkSession從而獲取到引入conf;需要繼承Rule[LogicalPlan];
2、通過splitPredicates方法,分離分區謂詞,得到分區謂詞表達式。在sql解析過程中將謂詞解析為TreeNode,此處採用遞歸的方式獲取分區謂詞。
3、判斷是否是分區表,且是否添加分區欄位。
4、實現Rule的apply方法
大數據和雲計算的關系
大數據JUC面試題
大數據之Kafka集群部署
大數據logstsh架構
大數據技術kafka的零拷貝
『叄』 spark thrift server 與 網易 kyuubi thrift server
thrift server可以實現通過jdbc, beeline等工具,實現連接到spark集群,並提交sql查詢的機制。
默認情況下,cdh安裝的spark沒有包含thrift server模塊,因此我們需要重新編譯spark。
另外,為了不影響cdh自帶的spark,而且spark目前都是基於yarn運行的,本身也沒有什麼獨立的服務部署(除了history sever)。
所以,在一個集群中,可以部署安裝多個版本的spark。
我們使用源碼編譯的spark 2.4.0(其中hive的版本是1.2.1)
cdh集成的spark版本和Hive版本如下:
使用jdk1.8
修改spark提供的mvn,使用自行安裝的maven 3.8.1
使用make-distribution.sh可以幫助與我們編譯之後打包成tgz文件
修改pom.xml文件的配置如下。
最後,執行編譯命令如下:
這樣打出的包,就含有thrift server的jar包了。
最終打包文件,根目錄下。
之後就是解壓到其他目錄下後即可。
將hive-site.xml的文件連接過來,這樣spark就可以讀取hive的表了。
為了確保spark提交到yarn上運行,需要配置
cp spark-defaults.conf.template spar-defaults.conf
另外,可以在spark-env.sh中設置環境變數。
HADOOP_CONF_DIR
環境變數,也可以在/etc/profile中設置
啟動日誌可以查看,注意下埠佔用問題,如下。
啟動時候,使用beeline工具連接上,主要這里不用使用cdh默認安裝hive提供的beeline工具,應為版本太高。
使用編譯後spark生成beeline工具
參考beeline使用教程。
https://github.com/apache/incubator-kyuubi
kyuubi是基於thrift sever二次開發,在系能和安全上優於thrift server。
鑒於目前hive的版本是2.1,而最新的kyuubi的hive是2.3,所以採用前天版本的kyuubi,採用0.7版本,保證hive的版本小於當前集群中的hive版本。
使用build目錄下的dist腳本進行編譯和打包。
編譯成功後,會在更目錄下出現tar.gz的壓縮文件,如上圖。
之後解壓到目錄下。
配置bin/kyuubi-env.sh腳本,設置spark路徑
執行bin/start-kyuubi.sh命令即可。
訪問的方式同樣採用beelin,注意使用上面章節的beeline工具。
訪問後,可以通過beeline訪問到hive的表(在spark中已經配置了hive-site.xml)
!connect jdbc: hive2://xxxx:10009 即可。
『肆』 如何用sql來寫動態sql,本文主要是hiveql
動態SQL語句在編譯時,並不知道SQL語句的內容,SQL語句的內容「不確定」,只有在運行時,才建立、解析並執行SQL語句。利用動態SQL,在存儲過程中,可以動態創建表、視圖、觸發器等。
動態SQL主要用在以下兩種場景:
編譯時,無法確定SQL語句的內容
靜態SQL不支持的SQL語句,就比如上面代碼中的create
我們可以看到,靜態SQL在編譯時就已經提前檢查了SQL正確性,以及涉及的資料庫對象和對應的許可權關系,而動態SQL則需要在運行的時候才能判斷,所以,靜態SQL的效率高於動態SQL。說了這么多概念的東西,我們現在就來實際看看如何編寫動態SQL,以及如何運行動態SQL。
『伍』 hue/oozie 調度shell執行hive腳本
前面已經有篇文章介紹如何編譯包含hive的spark-assembly.jar了,不清楚的可以翻看一下前面的文章。clouderamanager裝好的spark,直接執行spark-shell進入命令行後,寫入如下語句:valhiveContext=neworg.apache.spark.sql.hive.HiveContext(sc)你會發現沒法執行通過,因為cm裝的原生的spark是不支持sparkhql的,我們需要手動進行一些調整:第一步,將編譯好的包含hive的JAR包上傳到hdfs上配置的默認的spark的sharelib目錄:/user/spark/share/lib第二步:在你要運行spark-shell腳本的節點上的/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/lib/目錄下面,下載這個jar到這個目錄:hadoopfs-gethdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar(具體路徑替換成你自己的)。然後這個目錄下面原來會有個軟鏈接spark-assembly.jar指向的是spark-assembly-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,我們把這個軟鏈接刪除掉重新創建一個同名的軟鏈接:ln-sspark-assembly-with-hive-maven.jarspark-assembly.jar,指向我們剛下載下來的那個JAR包,這個JAR包會在啟動spark-shell腳本時裝載到driverprogram的classpath中去的,sparkContext也是在driver中創建出來的,所以需要將我們編譯的JAR包替換掉原來的spark-assembly.jar包,這樣在啟動spark-shell的時候,包含hive的spark-assembly就被裝載到classpath中去了。第三步:在/opt/cloudera/parcels/CDH/lib/spark/conf/目錄下面創建一個hive-site.xml。/opt/cloudera/parcels/CDH/lib/spark/conf目錄是默認的spark的配置目錄,當然你可以修改默認配置目錄的位置。hive-site.xml內容如下:hive.metastore.localfalsehive.metastore.uristhrift://n1:9083hive.metastore.client.socket.timeout300hive.metastore.warehouse.dir/user/hive/warehouse這個應該大家都懂的,總要讓spark找到hive的元數據在哪吧,於是就有了上面一些配置。第四步:修改/opt/cloudera/parcels/CDH/lib/spark/conf/spark-defaults.conf,添加一個屬性:spark.yarn.jar=hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar。這個是讓每個executor下載到本地然後裝載到自己的classpath下面去的,主要是用在yarn-cluster模式。local模式由於driver和executor是同一個進程所以沒關系。以上完事之後,運行spark-shell,再輸入:valhiveContext=neworg.apache.spark.sql.hive.HiveContext(sc)應該就沒問題了。我們再執行一個語句驗證一下是不是連接的我們指定的hive元資料庫:hiveContext.sql("showtables").take(10)//取前十個表看看最後要重點說明一下這里的第二步第三步和第四步,如果是yarn-cluster模式的話,應該替換掉集群所有節點的spark-assembly.jar集群所有節點的sparkconf目錄都需要添加hive-site.xml,每個節點spark-defaults.conf都需要添加spark.yarn.jar=hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar。可以寫個shell腳本來替換,不然手動一個一個節點去替換也是蠻累的。
『陸』 hive核心組件及流程(一)
依賴第三方組件: Meta store(mysql),hdfs,MapRece
hive:
Client客戶端 CLI、JDBC
Driver連接客戶端與服務端的橋梁
SQL Pareser解析器,將SQL轉換為抽象語法樹AST
1.將HQL語句轉換為Token
2.對Token進行解析,生成AST
Physical Plan編譯器將AST編譯生成邏激虛帆輯譽磨執行計劃
Query Optimizer優化器,對邏輯執行計劃進行優化
1.將AST轉換為QueryBlock
2.將QueryBlock轉換為OperatorTree
3.OperatorTree進行邏輯優化
4.生成TaskTree
5.TaskTree執行物理優化
Execution執行器把邏輯執行計劃轉換成可以運行的物理計劃
1.獲取MR臨時工作目錄
3.定義Mapper和Recer
2.定義Partitioner
4.實例化Job
5.提交Job
1.以Antlr定義的語法規則,對SQL完成詞法解析,將SQL轉換為AST
2.遍歷AST,抽象出查詢基本組成單元QueryBlock。
3.遍歷QueryBlock,將其轉換為OperatorTree,邏輯執行單元
4.利用邏輯優化器對OperatorTree進行邏輯優化。
5.遍歷OperatorTree轉換為TaskTree,將邏輯執行計劃轉化為物理執行計劃
6.使用物理優化器對TaskTree進行物理優化
7.生成最終的執行計劃,提交執行
$HIVE_HOME/bin/hive可以進入客戶端
$HIVE_HOME/bin/hive -e "{SQL語句}"可以執行SQL語句
$HIVE_HOME/bin/hive -f {SQL文件名.sql}可以執行sql文件
開啟hiveserver2服務,可以通過JDBC提交SQL
創建Driver
創建OptionsProcessor
初始化log4j
標准輸入輸出以及錯誤輸出流的定義,後續需要輸入 SQL 以及列印控制台信息
解析輸入的參數,包含"-e -f -v -database"
讀取輸入的sql
按照";"分割的方式解析
解析單行SQL
遇到為"quit"或者"exit"退出
遇到為"source"開頭,執行 SQL 文件,讀取文件並解析
如果命令以"!"開頭,則表示用戶需要執行 shell命令
以上三種都不是的情況下執行SQL,進行SQL解析
獲取當前系統時間
獲取系統結束時間
編譯SQL語句
SQL生成AST,構建詞法解析器,將關鍵詞替換為TOKEN,明雹進行語法解析,生成最終AST
處理AST,轉換為QueryBlock然後轉換為OperatorTree,對Operator進行邏輯優化,然後轉換為任務樹,然後進行物理優化。
根據任務樹構建MrJob
添加啟動任務,根據是否可以並行來決定是否並行啟動Task
設置MR任務的InputFormat、OutputFormat 等等這些 MRJob 的執行類
構建執行MR任務的命令
向yarn提交任務
列印頭信息
獲取結果集並獲取抓取到的條數
列印SQL執行時間及數據條數
『柒』 Hive UDF 第一篇:怎麼實現自己的 hive 自定義函數
看到這篇文章的同學相信都是 大數據相關技術愛好者 、從業人員或者業界大佬。這篇文章篇基礎,屬於上手性指引,如有不對的地方歡迎指正
文章的源代碼github地址在文末
為什麼要有 UDF
縱然 內置函數(Build-in Function)再豐富,也會遇到 無法完全滿足我們特定場景的需要,這時我們需要 UDF;或者我們通過內置函數可以實現,但是業務代碼冗長,又或者業務代碼不僅冗長而且可能要多處維護,那麼這時候我們需要 UDF。
強如 Oracle 這一霸佔傳統金融行業大部分資料庫份額的廠商,一樣提供 用戶自定義函數的介面,Hive 這一開源生態的神級項目,自然也不會少。
總之:UDF 就是為靈活性而存在,為可擴展性而存在
選擇合適的 UDF
按照不同使用場景需要, Hive 已經為我們將 UDF 歸為3類,分別對應3種不同使用場景
1. 轉換函數 UDF :適用於行級別轉換操作,數據行中的一列後者幾列,生成一列或者幾列,效源世果可參照 內置函數的 upper() 講一列的字元串所有字元 統一轉換為 大寫字元;
2. 聚合函數 UDAF: 適用與多行進行聚合成一行,或者多行分組聚合成相對小的多行,效果參照 內置函數 sum() ,一般配合 group by 使用較多
3. 表生成函數 UDTF: 適用一行生成多行場景 ,效果參照 內置函數 explode()
實現你的 UDF 邏輯
1. 轉換函數UDF 實現 :
以前在 Oracle 上使用 decode 做 枚舉型值轉譯用得方便,到 hive 上了 沒有這么方便的函數,只能用 case when 或者 if 這種相對繁瑣,代碼行數偏大
為實現這個UDF,我們需要繼承Hive相應類,實現3個函數
1. 第一個函數,初始化:這里做2個事情
1> 檢查傳入欄位的合法性,我兆正這里函數使用時傳入參數必須是大於等於4個的 偶數,參照 oracle decode
2> 我這里返回的是 字元串類型(如果你在這里有疑問:為什麼返回字元串要這樣做, 我在第二篇中努力講清楚)
如果有些需要共享連接之類資源的場景,也可以在這里做,這個方法是 每生成一個 mapper對象 執行一次,也可以理解: MR 中 每個 parttion 才執行一次 ,我在實際工作中有一個 需要遠端解密的場景就用到了
2. 第二個函數,是真正轉換函數
3. 第三個函數,hive收集元數據信息的函數
2. 聚合函數 UDAF實現:
抱歉,我並沒有合適的公開的 案例代碼直接貼出來,後面有合適的
3. 表生成函數 UDTF 實現:
表生成函數有很多使用場景,我這直接上我的實例:我們在 實際數據倉庫研發中,對緩慢變化維度 經常使用 拉鏈雹猜肢表 來實現;而在 hive 中是不支持 不等值條件寫在 on 條件里 ,只能通過 where 子句實現,但 where 子句明顯是過程中產生 笛卡爾積 的做法,
為規避這個情況 寫了一個通用的 日粒度拉鏈表 爆炸展開的 函數, 我們在 與事實表連接時 將很方便地實現 等值連接,也方便 HIve 或者 Spark 的 執行計劃 優化,避免全量的shulffe,等值join 會方便 按連接鍵分區
實現UDTF 需要繼承 相應類,實現 3個函數
1. 第一個函數,初始化:所做事情參照 UDF
2. 第二個函數,循環生成記錄並 通過 forward 函數輸出
3. 第三個函數,關閉函數,按需使用
我這里什麼都沒做
注冊UDF 函數
臨時 UDF,當前會話
永久且全局 UDF
這樣做之後,會在 hive 的元資料庫中查到函數的信息,若遇到 跨 session 無法使用,可選擇 在相應的 session 中做函數重載
全局 UDF 不僅可在 Hive 中使用,亦可在 spark sql 中使用 ,hive 的 hiveserver2 的其他 jdbc/odbc 連接中使用
[這篇文章完整代碼在我的 github 上開源項目中,其中包含 類decode/日期轉 星座 /IPv4與整數互轉 以及適用 拉鏈表爆炸展開的表生成函數](https://github.com/Jiafan/hudf)
參考
為了進一步學習 和使用 UDF相關的功能,可以參考 官方文檔 和 Clouder 文檔
[Hive官方文檔-UDF](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inOperators)
[Cloudera的關於Hive UDF 的文檔](https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.0/using-hiveql/content/hive_create_udf.html)
下一篇
Hive UDF 第二篇:實現UDF為什麼要實現這幾個方法
[源代碼倉庫](https://github.com/Jiafan/hudf.git)
『捌』 Hive內置函數之時間函數
零、生產常用組合方式
(0.1)離線數倉獲取昨天的日期作為分區,格式yyyyMMdd
regexp_replace(date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),1) ,'-','')
或者
date_format(date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),1),'yyyyMMdd')
一、源碼部分
Hive的函數類為:org.apache.hadoop.hive.ql.exec.FunctionRegistry
二、常用時間函數
對於函數,除了知道怎麼用,還需要知道返回值是什麼類型,這里給出官方文檔,文檔中給出了函數的返回值類型
官方文檔見: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions
(2.1)from_unixtime(bigint unixtime[, string format])
示例:
select from_unixtime(1591627588); -- 2020-06-08 22:46:28
select from_unixtime(1591627588,'yyyyMMddHHmmss'); -- 20200608224628
(2.2)unix_timestamp()、unix_timestamp(string date)、unix_timestamp(string date, string pattern)
示例:
select unix_timestamp('2020-06-08 22:50:00'); -- 1591627800
select unix_timestamp('20200608225000','yyyyMMddHHmmss'); -- 1591627800
(2.3)to_date(string timestamp)
示例:
SELECT to_date('2009-07-30 04:17:52'); -- 2009-07-30
(2.4)year(string date)、month(string date)、day(string date)、hour(string date)、minute(string date)、second(string date)
這些函數是差不多的,都是從一個時間字元串中抽取出某個特定的時間欄位。具有相同功能的還有extract(field FROM source)函數
示例:
SELECT day('2009-07-29 20:30:40'); -- 29
SELECT minute('2009-07-29 20:30:40'); -- 30
(2.5)date_add(date/timestamp/string startdate, tinyint/smallint/int days)、date_sub(date/timestamp/string startdate, tinyint/smallint/int days)
這兩個功能是類似的
示例:
SELECT date_add('2009-07-30 20:50:59', 1); -- 2009-07-31
(2.6)datediff(string enddate, string startdate)
截圖中結果是錯誤的,應該為-1。
示例:
SELECT datediff('2009-06-30', '2009-07-02'); -- -2
SELECT datediff('2009-07-30', '2009-07-28'); -- 2
(2.7)current_date、current_timestamp
這兩個函數使用desc function extended 查看會報錯
示例:
(2.8)date_format(date/timestamp/string ts, string fmt)
示例:
SELECT date_format('2015-04-08', 'yyyyMMdd'); -- 20150408