hive编译源码图
‘壹’ hive自定义udf函数,在udf函数内怎么读取hive表数据
最近感受了hive的udf函数的强大威力了,不仅可以使用很多已经有的udf函数,还可以自己定义符合业务场景的udf函数,下面就说一下如何写udf/udaf/udtf函数,算是一个入门介绍吧。
First, you need to create a new class that extends UDF, with one or more methods named evaluate.
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public final class Lower extends UDF {
public Text evaluate(final Text s) {
if (s == null) { return null; }
return new Text(s.toString().toLowerCase());
}
}
After compiling your code to a jar, you need to add this to the hive classpath.
add jar my_jar.jar;
Once hive is started up with your jars in the classpath, the final step is to register your function
create temporary function my_lower as 'com.example.hive.udf.Lower';
上面主要描述了实现一个udf的过程,首先自然是实现一个UDF函数,然后编译为jar并加入到hive的classpath中,最后创建一个临时变量名字让hive中调用。转载,仅供参考。
‘贰’ 源码级解读如何解决Spark-sql读取hive分区表执行效率低问题
问题描述
在开发过程中使用spark去读取hive分区表的过程中(或者使用hive on spark、nodepad开发工具),部分开发人员未注意添加分区属性过滤导致在执行过程中加载了全量数据,引起任务执行效率低、磁盘IO大量损耗等问题。
解决办法
1、自定义规则CheckPartitionTable类,实现Rule,通过以下方式创建SparkSession。
2、自定义规则CheckPartitionTable类,实现Rule,将规则类追加至Optimizer.batches: Seq[Batch]中,如下。
规则内容实现
1、CheckPartitionTable规则执行类,需要通过引入sparkSession从而获取到引入conf;需要继承Rule[LogicalPlan];
2、通过splitPredicates方法,分离分区谓词,得到分区谓词表达式。在sql解析过程中将谓词解析为TreeNode,此处采用递归的方式获取分区谓词。
3、判断是否是分区表,且是否添加分区字段。
4、实现Rule的apply方法
大数据和云计算的关系
大数据JUC面试题
大数据之Kafka集群部署
大数据logstsh架构
大数据技术kafka的零拷贝
‘叁’ spark thrift server 与 网易 kyuubi thrift server
thrift server可以实现通过jdbc, beeline等工具,实现连接到spark集群,并提交sql查询的机制。
默认情况下,cdh安装的spark没有包含thrift server模块,因此我们需要重新编译spark。
另外,为了不影响cdh自带的spark,而且spark目前都是基于yarn运行的,本身也没有什么独立的服务部署(除了history sever)。
所以,在一个集群中,可以部署安装多个版本的spark。
我们使用源码编译的spark 2.4.0(其中hive的版本是1.2.1)
cdh集成的spark版本和Hive版本如下:
使用jdk1.8
修改spark提供的mvn,使用自行安装的maven 3.8.1
使用make-distribution.sh可以帮助与我们编译之后打包成tgz文件
修改pom.xml文件的配置如下。
最后,执行编译命令如下:
这样打出的包,就含有thrift server的jar包了。
最终打包文件,根目录下。
之后就是解压到其他目录下后即可。
将hive-site.xml的文件连接过来,这样spark就可以读取hive的表了。
为了确保spark提交到yarn上运行,需要配置
cp spark-defaults.conf.template spar-defaults.conf
另外,可以在spark-env.sh中设置环境变量。
HADOOP_CONF_DIR
环境变量,也可以在/etc/profile中设置
启动日志可以查看,注意下端口占用问题,如下。
启动时候,使用beeline工具连接上,主要这里不用使用cdh默认安装hive提供的beeline工具,应为版本太高。
使用编译后spark生成beeline工具
参考beeline使用教程。
https://github.com/apache/incubator-kyuubi
kyuubi是基于thrift sever二次开发,在系能和安全上优于thrift server。
鉴于目前hive的版本是2.1,而最新的kyuubi的hive是2.3,所以采用前天版本的kyuubi,采用0.7版本,保证hive的版本小于当前集群中的hive版本。
使用build目录下的dist脚本进行编译和打包。
编译成功后,会在更目录下出现tar.gz的压缩文件,如上图。
之后解压到目录下。
配置bin/kyuubi-env.sh脚本,设置spark路径
执行bin/start-kyuubi.sh命令即可。
访问的方式同样采用beelin,注意使用上面章节的beeline工具。
访问后,可以通过beeline访问到hive的表(在spark中已经配置了hive-site.xml)
!connect jdbc: hive2://xxxx:10009 即可。
‘肆’ 如何用sql来写动态sql,本文主要是hiveql
动态SQL语句在编译时,并不知道SQL语句的内容,SQL语句的内容“不确定”,只有在运行时,才建立、解析并执行SQL语句。利用动态SQL,在存储过程中,可以动态创建表、视图、触发器等。
动态SQL主要用在以下两种场景:
编译时,无法确定SQL语句的内容
静态SQL不支持的SQL语句,就比如上面代码中的create
我们可以看到,静态SQL在编译时就已经提前检查了SQL正确性,以及涉及的数据库对象和对应的权限关系,而动态SQL则需要在运行的时候才能判断,所以,静态SQL的效率高于动态SQL。说了这么多概念的东西,我们现在就来实际看看如何编写动态SQL,以及如何运行动态SQL。
‘伍’ hue/oozie 调度shell执行hive脚本
前面已经有篇文章介绍如何编译包含hive的spark-assembly.jar了,不清楚的可以翻看一下前面的文章。clouderamanager装好的spark,直接执行spark-shell进入命令行后,写入如下语句:valhiveContext=neworg.apache.spark.sql.hive.HiveContext(sc)你会发现没法执行通过,因为cm装的原生的spark是不支持sparkhql的,我们需要手动进行一些调整:第一步,将编译好的包含hive的JAR包上传到hdfs上配置的默认的spark的sharelib目录:/user/spark/share/lib第二步:在你要运行spark-shell脚本的节点上的/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/lib/目录下面,下载这个jar到这个目录:hadoopfs-gethdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar(具体路径替换成你自己的)。然后这个目录下面原来会有个软链接spark-assembly.jar指向的是spark-assembly-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,我们把这个软链接删除掉重新创建一个同名的软链接:ln-sspark-assembly-with-hive-maven.jarspark-assembly.jar,指向我们刚下载下来的那个JAR包,这个JAR包会在启动spark-shell脚本时装载到driverprogram的classpath中去的,sparkContext也是在driver中创建出来的,所以需要将我们编译的JAR包替换掉原来的spark-assembly.jar包,这样在启动spark-shell的时候,包含hive的spark-assembly就被装载到classpath中去了。第三步:在/opt/cloudera/parcels/CDH/lib/spark/conf/目录下面创建一个hive-site.xml。/opt/cloudera/parcels/CDH/lib/spark/conf目录是默认的spark的配置目录,当然你可以修改默认配置目录的位置。hive-site.xml内容如下:hive.metastore.localfalsehive.metastore.uristhrift://n1:9083hive.metastore.client.socket.timeout300hive.metastore.warehouse.dir/user/hive/warehouse这个应该大家都懂的,总要让spark找到hive的元数据在哪吧,于是就有了上面一些配置。第四步:修改/opt/cloudera/parcels/CDH/lib/spark/conf/spark-defaults.conf,添加一个属性:spark.yarn.jar=hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar。这个是让每个executor下载到本地然后装载到自己的classpath下面去的,主要是用在yarn-cluster模式。local模式由于driver和executor是同一个进程所以没关系。以上完事之后,运行spark-shell,再输入:valhiveContext=neworg.apache.spark.sql.hive.HiveContext(sc)应该就没问题了。我们再执行一个语句验证一下是不是连接的我们指定的hive元数据库:hiveContext.sql("showtables").take(10)//取前十个表看看最后要重点说明一下这里的第二步第三步和第四步,如果是yarn-cluster模式的话,应该替换掉集群所有节点的spark-assembly.jar集群所有节点的sparkconf目录都需要添加hive-site.xml,每个节点spark-defaults.conf都需要添加spark.yarn.jar=hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar。可以写个shell脚本来替换,不然手动一个一个节点去替换也是蛮累的。
‘陆’ hive核心组件及流程(一)
依赖第三方组件: Meta store(mysql),hdfs,MapRece
hive:
Client客户端 CLI、JDBC
Driver连接客户端与服务端的桥梁
SQL Pareser解析器,将SQL转换为抽象语法树AST
1.将HQL语句转换为Token
2.对Token进行解析,生成AST
Physical Plan编译器将AST编译生成逻激虚帆辑誉磨执行计划
Query Optimizer优化器,对逻辑执行计划进行优化
1.将AST转换为QueryBlock
2.将QueryBlock转换为OperatorTree
3.OperatorTree进行逻辑优化
4.生成TaskTree
5.TaskTree执行物理优化
Execution执行器把逻辑执行计划转换成可以运行的物理计划
1.获取MR临时工作目录
3.定义Mapper和Recer
2.定义Partitioner
4.实例化Job
5.提交Job
1.以Antlr定义的语法规则,对SQL完成词法解析,将SQL转换为AST
2.遍历AST,抽象出查询基本组成单元QueryBlock。
3.遍历QueryBlock,将其转换为OperatorTree,逻辑执行单元
4.利用逻辑优化器对OperatorTree进行逻辑优化。
5.遍历OperatorTree转换为TaskTree,将逻辑执行计划转化为物理执行计划
6.使用物理优化器对TaskTree进行物理优化
7.生成最终的执行计划,提交执行
$HIVE_HOME/bin/hive可以进入客户端
$HIVE_HOME/bin/hive -e "{SQL语句}"可以执行SQL语句
$HIVE_HOME/bin/hive -f {SQL文件名.sql}可以执行sql文件
开启hiveserver2服务,可以通过JDBC提交SQL
创建Driver
创建OptionsProcessor
初始化log4j
标准输入输出以及错误输出流的定义,后续需要输入 SQL 以及打印控制台信息
解析输入的参数,包含"-e -f -v -database"
读取输入的sql
按照";"分割的方式解析
解析单行SQL
遇到为"quit"或者"exit"退出
遇到为"source"开头,执行 SQL 文件,读取文件并解析
如果命令以"!"开头,则表示用户需要执行 shell命令
以上三种都不是的情况下执行SQL,进行SQL解析
获取当前系统时间
获取系统结束时间
编译SQL语句
SQL生成AST,构建词法解析器,将关键词替换为TOKEN,明雹进行语法解析,生成最终AST
处理AST,转换为QueryBlock然后转换为OperatorTree,对Operator进行逻辑优化,然后转换为任务树,然后进行物理优化。
根据任务树构建MrJob
添加启动任务,根据是否可以并行来决定是否并行启动Task
设置MR任务的InputFormat、OutputFormat 等等这些 MRJob 的执行类
构建执行MR任务的命令
向yarn提交任务
打印头信息
获取结果集并获取抓取到的条数
打印SQL执行时间及数据条数
‘柒’ Hive UDF 第一篇:怎么实现自己的 hive 自定义函数
看到这篇文章的同学相信都是 大数据相关技术爱好者 、从业人员或者业界大佬。这篇文章篇基础,属于上手性指引,如有不对的地方欢迎指正
文章的源代码github地址在文末
为什么要有 UDF
纵然 内置函数(Build-in Function)再丰富,也会遇到 无法完全满足我们特定场景的需要,这时我们需要 UDF;或者我们通过内置函数可以实现,但是业务代码冗长,又或者业务代码不仅冗长而且可能要多处维护,那么这时候我们需要 UDF。
强如 Oracle 这一霸占传统金融行业大部分数据库份额的厂商,一样提供 用户自定义函数的接口,Hive 这一开源生态的神级项目,自然也不会少。
总之:UDF 就是为灵活性而存在,为可扩展性而存在
选择合适的 UDF
按照不同使用场景需要, Hive 已经为我们将 UDF 归为3类,分别对应3种不同使用场景
1. 转换函数 UDF :适用于行级别转换操作,数据行中的一列后者几列,生成一列或者几列,效源世果可参照 内置函数的 upper() 讲一列的字符串所有字符 统一转换为 大写字符;
2. 聚合函数 UDAF: 适用与多行进行聚合成一行,或者多行分组聚合成相对小的多行,效果参照 内置函数 sum() ,一般配合 group by 使用较多
3. 表生成函数 UDTF: 适用一行生成多行场景 ,效果参照 内置函数 explode()
实现你的 UDF 逻辑
1. 转换函数UDF 实现 :
以前在 Oracle 上使用 decode 做 枚举型值转译用得方便,到 hive 上了 没有这么方便的函数,只能用 case when 或者 if 这种相对繁琐,代码行数偏大
为实现这个UDF,我们需要继承Hive相应类,实现3个函数
1. 第一个函数,初始化:这里做2个事情
1> 检查传入字段的合法性,我兆正这里函数使用时传入参数必须是大于等于4个的 偶数,参照 oracle decode
2> 我这里返回的是 字符串类型(如果你在这里有疑问:为什么返回字符串要这样做, 我在第二篇中努力讲清楚)
如果有些需要共享连接之类资源的场景,也可以在这里做,这个方法是 每生成一个 mapper对象 执行一次,也可以理解: MR 中 每个 parttion 才执行一次 ,我在实际工作中有一个 需要远端解密的场景就用到了
2. 第二个函数,是真正转换函数
3. 第三个函数,hive收集元数据信息的函数
2. 聚合函数 UDAF实现:
抱歉,我并没有合适的公开的 案例代码直接贴出来,后面有合适的
3. 表生成函数 UDTF 实现:
表生成函数有很多使用场景,我这直接上我的实例:我们在 实际数据仓库研发中,对缓慢变化维度 经常使用 拉链雹猜肢表 来实现;而在 hive 中是不支持 不等值条件写在 on 条件里 ,只能通过 where 子句实现,但 where 子句明显是过程中产生 笛卡尔积 的做法,
为规避这个情况 写了一个通用的 日粒度拉链表 爆炸展开的 函数, 我们在 与事实表连接时 将很方便地实现 等值连接,也方便 HIve 或者 Spark 的 执行计划 优化,避免全量的shulffe,等值join 会方便 按连接键分区
实现UDTF 需要继承 相应类,实现 3个函数
1. 第一个函数,初始化:所做事情参照 UDF
2. 第二个函数,循环生成记录并 通过 forward 函数输出
3. 第三个函数,关闭函数,按需使用
我这里什么都没做
注册UDF 函数
临时 UDF,当前会话
永久且全局 UDF
这样做之后,会在 hive 的元数据库中查到函数的信息,若遇到 跨 session 无法使用,可选择 在相应的 session 中做函数重载
全局 UDF 不仅可在 Hive 中使用,亦可在 spark sql 中使用 ,hive 的 hiveserver2 的其他 jdbc/odbc 连接中使用
[这篇文章完整代码在我的 github 上开源项目中,其中包含 类decode/日期转 星座 /IPv4与整数互转 以及适用 拉链表爆炸展开的表生成函数](https://github.com/Jiafan/hudf)
参考
为了进一步学习 和使用 UDF相关的功能,可以参考 官方文档 和 Clouder 文档
[Hive官方文档-UDF](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inOperators)
[Cloudera的关于Hive UDF 的文档](https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.0/using-hiveql/content/hive_create_udf.html)
下一篇
Hive UDF 第二篇:实现UDF为什么要实现这几个方法
[源代码仓库](https://github.com/Jiafan/hudf.git)
‘捌’ Hive内置函数之时间函数
零、生产常用组合方式
(0.1)离线数仓获取昨天的日期作为分区,格式yyyyMMdd
regexp_replace(date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),1) ,'-','')
或者
date_format(date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),1),'yyyyMMdd')
一、源码部分
Hive的函数类为:org.apache.hadoop.hive.ql.exec.FunctionRegistry
二、常用时间函数
对于函数,除了知道怎么用,还需要知道返回值是什么类型,这里给出官方文档,文档中给出了函数的返回值类型
官方文档见: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions
(2.1)from_unixtime(bigint unixtime[, string format])
示例:
select from_unixtime(1591627588); -- 2020-06-08 22:46:28
select from_unixtime(1591627588,'yyyyMMddHHmmss'); -- 20200608224628
(2.2)unix_timestamp()、unix_timestamp(string date)、unix_timestamp(string date, string pattern)
示例:
select unix_timestamp('2020-06-08 22:50:00'); -- 1591627800
select unix_timestamp('20200608225000','yyyyMMddHHmmss'); -- 1591627800
(2.3)to_date(string timestamp)
示例:
SELECT to_date('2009-07-30 04:17:52'); -- 2009-07-30
(2.4)year(string date)、month(string date)、day(string date)、hour(string date)、minute(string date)、second(string date)
这些函数是差不多的,都是从一个时间字符串中抽取出某个特定的时间字段。具有相同功能的还有extract(field FROM source)函数
示例:
SELECT day('2009-07-29 20:30:40'); -- 29
SELECT minute('2009-07-29 20:30:40'); -- 30
(2.5)date_add(date/timestamp/string startdate, tinyint/smallint/int days)、date_sub(date/timestamp/string startdate, tinyint/smallint/int days)
这两个功能是类似的
示例:
SELECT date_add('2009-07-30 20:50:59', 1); -- 2009-07-31
(2.6)datediff(string enddate, string startdate)
截图中结果是错误的,应该为-1。
示例:
SELECT datediff('2009-06-30', '2009-07-02'); -- -2
SELECT datediff('2009-07-30', '2009-07-28'); -- 2
(2.7)current_date、current_timestamp
这两个函数使用desc function extended 查看会报错
示例:
(2.8)date_format(date/timestamp/string ts, string fmt)
示例:
SELECT date_format('2015-04-08', 'yyyyMMdd'); -- 20150408