flink如何管理配置
Ⅰ FlinkX快速开始(一)
链接地址: FlinkX
FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如Mysql,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等
1、前置
需要安装maven、java8、配置好github相关参数
2、Fork FlinX项目到自己的仓库中
2、Clone项目到本地
git clone https://github.com/liukunyuan/flinkx.git
3、安装额外的jar包
1)、cd flinkx/bin
2)、执行sh ./install_jars.sh(windows执行install_jars.bat脚本)
4、打包
1)、回到flinkx目录:cd ..
2)、执行打包命令:mvn clean package -Dmaven.test.skip=true
1、配置flink conf文件(暂时不需要安装flink)
1)、进入flinkconf目录
cd flinkconf
2)、修改flink-conf.yaml文件添加一行
rest.bind-port: 8888
2、配置mysqltomysql的json文件,路径:/Users/jack/Documents/jack-project/flinkx/flinkconf/mysql2mysql.json
3、运行任务
4、查看监控网页和log.txt文件: http://localhost:8888/
Ⅱ Flink运行模式
在idea中运行Flink程序的方式就是开发模式。
Flink中的Local-cluster(本地集群)模式,单节点运行,主要用于测试, 学习。
独立集群模式,由Flink自身提供计算资源。
把Flink应用提交给Yarn的ResourceManager
Flink会根据运行在JobManger上的job的需要的slot的数量动态的分配TaskManager资源
Yarn又分3种模式
Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。
这个Flink集群会常驻在yarn集群中,除非手工停止。
在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.
缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.
所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job.
一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
Per-job模式执行结果,一个job对应一个Application
Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.
与Per-Job-Cluster的区别:就是Application Mode下, 用户的main函数式在集群中执行的,并且当一个application中有多个job的话,per-job模式则是一个job对应一个yarn中的application,而Application Mode则这个application中对应多个job。
Application Mode模式执行结果,多个job对应一个Application
官方建议:
出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/
0.Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
1.向Yarn ResourceManager提交任务,ResourceManager分配Container资源
2.通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager(Dispatcher)
2.1.Dispatcher启动JobMaster
3.JobMaster向ResourceManager(Flink)申请资源
4.ResourceManager(Flink)向ResourceManager(Yarn)申请资源启动TaskManager
5.ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
6.TaskManager注册Slot
7.发出提供Slot命令
8.TaskManager向JobMaster提供Slot
9.JobMaster提交要在Slot中执行的任务
Ⅲ Flink kafka kerberos的配置
Flink消费集成kerberos认证的kafka集群时,需要做一些配置才可以正常执行。 Flink版本:1.8;kafka版本:2.0.1;Flink模式:Standalone //指示是否从 Kerberos ticket 缓存中读取 security.kerberos.login.use-ticket-cache: false1 //Kerberos 密钥表文件的绝对路径 security.kerberos.login.keytab: /data/home/keytab/flink.keytab //认证主体名称 security.kerberos.login.principal: [email protected] //Kerberos登陆contexts security.kerberos.login.contexts: Client,KafkaClient val properties: Properties =new Properties() properties.setProperty("bootstrap.servers","broker:9092") properties.setProperty("group.id","testKafka") properties.setProperty("security.protocol","SASL_PLAINTEXT") properties.setProperty("sasl.mechanism","GSSAPI") properties.setProperty("sasl.kerberos.service.name","kafka") consumer =new FlinkKafkaConsumer[String]("flink",new SimpleStringSchema(), properties) 参数说明 :security.protocol 运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应Fusion Insight Kafka集群的21005/21007/21008/21009端口。 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。如果配置了SSL,则必须配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。Ⅳ Flink内存管理
java所有数据类型对应的字节大小
java对象的组成 : 对象头,实例数据,对齐部分
jvm 序列化缺点
上面图为TaskManager内存模型,左边为细分的内存模型,右边为整体内存模型,该图摘自Flink官网
heap内存在jvm启动的时候申请的一块不变的内存区域,该内存实际上是Flink和task公用的一块区域,在flink层面通过控制来区分框架使用和task内存,heap内存管理起来是比较容易的,实际上non-heap的内存是难管理的一块,如果管理不当或者使用不当可能造成内存泄漏或者内存无限增长等问题
内存参数配置
在flink中对内存进行了抽象成了MemorySegment,�默认情况下,一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆上内存( byte数组) ,也可以是堆外内存(nio的ByteBufferr ) .
同时MemorySegment也提供了对二进制数据的操作方法,以及读取字节数组序列化以及序列化字节数组的方法等
下面是类继承图,该类有两MemorySegment实现类有两个分别为使用heap的以及混合的即有heap和non-heap,对于内存的访问有子类具体的实现
MemorySemgent是flink内存分配的最小单元了,对于数据夸MemorySemgent保存,那么对于上层的使用者来说,需要考虑考虑所有的细节,由于过于繁琐,所以在MemorySemgent上又抽象了一层内存也,内存也是在MemorySemgent数据访问上的视图,对数据输入和输出分别抽象为DataInputView/DataOutputView,有了这一层,上层使用者无需关心跨MemorySemgent的细节问题,内存也对自动处理跨MemorySemgent的内存操作
DataInputView
DataInputView继承DataInput,DataInputView是对MemorySemgent读取的抽象视图,提供一系列读取二进制数据不同类型的方法,AbstractPageInputView是DataInputView的一个抽象实现类,并且基本所有InputView都实现了该类,即所有实现该类的InputView都支持Page
InputView持有了多个MemorySemgent的引用(可以基于数组,list,deque等),这些MemorySemgent被视为一个内存页,可以顺序,随机等方式读取数据,要基于不同的实现类,实现类不同读取方式不同
方法图
DataOutputView
与DataInputView相对应,继承Output,并有一个拥有Page功能的抽象类(AbstractPagedOutputView),其大部outputView的实现都是继承自该抽象类,对一组MemorySemgent提供一个基于页的写入功能
方法图
类继承图
用于网络io数据的包装,每个buffer持有一个MemorySegment的引用,resultPartition写数据的时候,会向LocalBufferPool申请Buffer,会返回BufferBuilder,通过BufferBuilder想Buffe r<实际写入的是MemorySegment> 写数据
BufferBuilder是在上游Task中,负责想Buffer写入数据,BufferConsumer位于下游,与BufferBuilder相对应,用于消费Buffer的数据,每个bufferBuilder对应一个bufferConsumer
常用参数介绍
buffer申请
buffer回收
当buffer用完之后需要进行回收比如在netty的clientHandler收到响应之后进行处理就会把buffer回收掉,buffer回收之后并不会释放memorySegment,而是放回池中,变为可用内存,反复使用
flink托管的内存,托管内存使用堆外内存,用于批处理缓存排序等以及提供rocksDB内存
NetworkBufferPool是一个固定大小的MemorySegment实例吃,用于网络栈中,NettyBufferPool会为每个ResultPartition创建属于自己的LocalBufferPool,NettyBufferPool会作为全局的pool来提供内存,LocalBufferPool会通过限制来控制自己内存的申请,防止过多申请
LocalBufferPool继承关系,实现了bufferRecycler的接口,用于回收自己持有的buffer
在数据接收的时候会将数据封装成NettyBuffer,在数据发送的时候会通过BufferBilder向MemorySegment写入数据,然后通过BufferConsumer读取MemorySegment的数据
BufferManager主要用于为RemoteInputChannel提供buffer的,bufferManager在启动的时候会向全局bufferPool请求自己的独有buffer,当bufferManager的buffer不够的时候,则会向localBufferPool请求buffer,此时请求的buffer为浮动buffer
实际上提供的buffer是
Ⅳ Flink 配置Kafka数据源
flink 中已经预置了 kafka 相关的数据源实现 FlinkKafkaConsumer010 ,先看下具体的实现:
kafka 的 Consumer 有一堆实现,不过最终都是继承自 FlinkKafkaConsumerBase ,而这个抽象类则是继承 RichParallelSourceFunction ,是不是很眼熟,跟自定义 mysql 数据源继承的抽象类 RichSourceFunction 很类似。
可以看到,这里有很多构造函数,我们直接使用即可。
说明:
a、这里直接使用 properties 对象来设置 kafka 相关配置,比如 brokers 、 zk 、 groupId 、 序列化 、 反序列化 等。
b、使用 FlinkKafkaConsumer010 构造函数,指定 topic 、 properties 配置
c、 SimpleStringSchema 仅针对 String 类型数据的序列化及反序列化,如果 kafka 中消息的内容不是 String ,则会报错;看下 SimpleStringSchema 的定义:
d、这里直接把获取到的消息打印出来。
Ⅵ Flink状态管理和恢复机制
1、什么是状态?
2、Flink状态类型有哪几种?
3、状态有什么作用?
4、如何使用状态,实现什么样的API?
5、什么是checkpoint与savepoint?
6、如何使用checkpoint与savepoint?
7、checkpoint原理是什么?
8、checkpint存储到hdfs上又是什么意思?
<1> 增量计算
聚合操作、机器学习训练模型迭代运算时保存当前模型等等
<2> 容错
Job故障重启、升级
定义: 某task或者operator 在某一时刻的在内存中的状态。
而checkpoint是,对于这个中间结果进行一次快照。
作用:State是可以被记录的,在失败的情况下可以恢复。
checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了一个job下所有task/operator某时刻的状态。
比如任务挂掉的时候或被手动停止的时候,可以从挂掉的点重新继续消费。
基本类型:Operator state、Keyed state
特殊的 Broadcast State
适用场景:
增量计算:
<1>聚合操作
<2>机器学习训练模型迭代运算时保存当前模型
等等
容错:
Job故障重启
使用状态,必须使用RichFunction,因为状态是使用RuntimeContext访问的,只能在RichFunction中访问
假设现在存在输入源数据格式为(EventID,Value)
输出数据,直接flatMap即可,无状态。
如果要输出某EventID最大值/最小值等,HashMap是否可以?
程序一旦Crash,如何恢复?
答案:Flink提供了一套状态保存的方法,不需要借助第三方存储系统来解决状态存储问题。
Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能有很多个key,从而对应多个keyed state。
所以一个并行度为4的source,即有4个实例,那么就会有4个状态
举例:Flink中的Kafka Connector,就使用了operator state。有几个并行度,就会有几个connector实例,消费的分区不一样,它会在每个connector实例中,保存该实例中消费topic的所有(partition,offset)映射。
数据结构:ListState<T>
一般编码过程:实现CheckpointedFunction接口,必须实现两个函数,分别是:
initializeState和snapshotState
如何保存状态?
通常是定义一个private transient ListState<Long> checkPointList;
注意:使用Operator State最好不要在keyBy之后使用,另外不要将太大的state存放到这个里面。
是基于KeyStream之上的状态,keyBy之后的Operator State。
那么,一个并行度为3的keyed Opreator有几个状态,这个就不一定是3了,这里有几个状态是由keyby之后有几个key所决定的。
案例:有一个事件流Tuple2[eventId,val],求不同的事件eventId下,相邻3个val的平均值,事件流如下:
(1,4),(2,3),(3,1),(1,2),(3,2),(1,2),(2,2),(2,9)
那么事件1:8/3=2
那么事件2:14/3=4
Keyed State的数据结构类型有:
ValueState<T>:update(T)
ListState<T>:add(T)、get(T)和clear(T)
RecingState<T>:add(T)、receFunction()
MapState<UK,UV>:put(UK,UV)、putAll(Map<UK,UV>)、get(UK)
FlatMapFunction是无状态函数;RichFlatMapFunction是有状态函数
这里没有实现CheckpointedFunction接口,而是直接调用方法 getRuntimeContext(),然后使用getState方法来获取状态值。
特殊场景: 来自一个流的一些数据需要广播到所有下游任务,在这些任务中,这些数据被本地存储并且用于处理另一个流上的所有处理元素 。例如:一个低吞吐量流,其中包含一组规则,我们希望对来自另一个流的所有元素按照规则进行计算
典型应用:常规事件流.connect(规则流)
常规事件流.connect(配置流)
<1> 创建常规事件流DataStream或者KeyedDataStream
<2> 创建BroadcastedStream:创建规则流/配置流(低吞吐)并广播
<3> 连接两个Stream并实现计算处理
process(可以是BroadcastProcessFunction 或者 KeyedBroadcastProcessFunction )
BroadcastProcessFunction:
processElement(...):负责处理非广播流中的传入元素
processBroadcastElement(...):负责处理广播流中的传入元素(如规则),一般广播流的元素添加到状态里去备用,processElement处理业务数据时就可以使用
ReadOnlyContext和Context:
ReadOnlyContext对Broadcast State只有只读权限,Conetxt有写权限
KeyedBroadcastProcessFunction:
注意:
<1> Flink之间没有跨Task的通信
<2> 每个任务的广播状态的元素顺序有可能不一样
<3> Broadcast State保存在内存中(并不在RocksDB)
Ⅶ flink配置和内存
1.Hosts and Ports
metrics.internal.query-service.port "0" String Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both.
rest.bind-address (none) String
rest.bind-port "8081" String
taskmanager.data.port 0 Integer 任务管理器的外部端口,用于数据交换操作。
taskmanager.host (none) String
taskmanager.rpc.port "0" String
2.Fault Tolerance
restart-strategy 重启策略 默认空
restart-strategy.fixed-delay.attempts 固定周期重启
restart-strategy.fixed-delay.delay 固定周期重启
restart-strategy.failure-rate.delay 失败率重启
restart-strategy.failure-rate.failure-rate-interval 失败率重启
restart-strategy.failure-rate.max-failures-per-interval 失败率重启
state.backend.incremental 增量checkpoint(仅对rocksdb支持)
state.backend.local-recovery 状态后端配置本地恢复,默认false,只支持键控状态后端
state.checkpoints.num-retained 保留的最大已完成检查点数,默认1
taskmanager.state.local.root-dirs 本地恢复的根目录
3.High Availability
high-availability 默认为NONE To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.
high-availability.cluster-id 高可用flink集群ID
high-availability.storageDir 元数据路径
high-availability.zookeeper.path.root 配置ZK路径
high-availability.zookeeper.quorum 配置ZK集群
4.Memory Configuration
在大多数情况下,用户只需要设置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取决于设置方式),并可能通过调整JVM堆与托管内存的比率taskmanager.memory.managed.fraction。
jobmanager.memory.enable-jvm-direct-memory-limit 是否启用jm进程的JVM直接内存限制,默认false
jobmanager.memory.flink.size 默认none。这包括JobManager消耗的所有内存。非容器配置
jobmanager.memory.heap.size 默认none。默认为总内存减去JVM,network,managed等
jobmanager.memory.jvm-metaspace.size jm的元空间大小,默认256mb
jobmanager.memory.jvm-overhead.fraction jm保留总进程内存的比例,默认0.1
jobmanager.memory.jvm-overhead.max 最大JVM开销,默认1gb
jobmanager.memory.jvm-overhead.min 最小JVM开销,默认192mb
jobmanager.memory.off-heap.size jm的堆外内存,默认128mb,如果第一个参数被启用,这个将生效
jobmanager.memory.process.size JobManager的总进程内存大小。容器化配置这个,为容器总大小
taskmanager.memory.flink.size TaskExecutor的总Flink内存大小。默认none,非容器配置
taskmanager.memory.framework.heap.size TaskExecutor的框架堆内存大小。默认128mb
taskmanager.memory.framework.off-heap.size TaskExecutor的框架堆外内存大小。默认128mb
taskmanager.memory.jvm-metaspace.size TaskExecutor的JVM元空间大小。默认256mb
taskmanager.memory.jvm-overhead.fraction 要为JVM开销保留的总进程内存的分数。默认0.1
taskmanager.memory.jvm-overhead.max TaskExecutor的最大JVM开销最大大小,默认1gb
taskmanager.memory.jvm-overhead.min TaskExecutor的最小JVM开销大小。默认192mb
taskmanager.memory.managed.consumer-weights 消费者权重。DATAPROC(用于流式RocksDB状态后端和批量内置算法)和PYTHON(用于Python进程),默认DATAPROC:70,PYTHON:30
taskmanager.memory.managed.fraction 如果未显式指定托管内存大小,则将用作托管内存的Flink总内存的分数,默认0.4
taskmanager.memory.managed.size TaskExecutor的托管内存大小。默认none
taskmanager.memory.network.fraction 用作网络内存的总Flink内存的分数,默认0.1
taskmanager.memory.network.max TaskExecutor的最大网络内存大小。默认1gb
taskmanager.memory.network.min TaskExecutor的最小网络内存大小。默认64mb
taskmanager.memory.process.size TaskExecutor的总进程内存大小。默认none,容器配置
taskmanager.memory.task.heap.size tm内存,默认none,
taskmanager.memory.task.off-heap.size tm堆外内存,默认0
5.Miscellaneous Options
fs.allowed-fallback-filesystems none
fs.default-scheme none
io.tmp.dirs 'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html
Ⅷ Flink HistoryServer配置(简单三步完成)
允许您查询JobManager存档的已完成作业的状态和统计信息。(官网原话)
最适合用于:了解 flink过去完成任务的状态,以及有状态作业的恢复(保存了最后一次的checkpoint地址)
官网地址: https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/historyserver.html
官网配置参数: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#history-server
修改flink-1.11.2/conf/flink-conf.yaml文件
两张图:
historyserver.web.tmpdir的默认配置图:
historyserver.web.tmpdir的自定义路径配置图:
在hdfs的/flink目录下创建completed-jobs目录(权限可以改成777)
启动/关闭命令:
1、查看启动状态
2、分别启一个per-job任务、sql任务、基于session启的任务,过一会全部cancel掉,都可以在hdfs路径和/tmp下的自定义目录看到相关数据,最后可以在host:8082上面看到你刚才canceled的任务,如下图:
3、访问hdfs路径:
4、访问 http://host:8082 可以查看到历史完成任务状态:
生产中遇到突然这个服务丢失,然后重启任务失败。通过排查任务是historyserver.web.tmpdir: /tmp/flinkhistoryserver/这个路径被删除了。
Ⅸ Flink如何管理Kafka 消费位点(译文)
Checkpointing 是 Flink 故障恢复的内部机制。一个 checkpoint 就是 Flink应用程序产生的状态的一个副本。如果 Flink 任务发生故障,它会从 checkpoint 中载入之前的状态来恢复任务,就好像故障没有发生一样。
Checkpoints 是 Flink 容错的基础,并且确保了 Flink 流式应用在失败时的完整性。Checkpoints 可以通过 Flink 设置定时触发。
Flink Kafka consumer 使用 Flink 的 checkpoint 机制来存储 Kafka 每个分区的位点到 state。当 Flink 执行 checkpoint 时,Kafka 的每个分区的位点都被存储到 checkpoint 指定的 filesystem 中。Flink 的 checkpoint 机制确保了所有任务算子的状态是一致的,也就是说这些状态具有相同的数据输入。当所有的任务算子成功存储他们自己的状态后,代表一次 checkpoint 的完成。因此,当任务从故障中恢复时,Flink 保证了exactly-once。
下面将一步一步的演示 Flink 是如何通过 checkpoint 来管理 Kafka 的 offset 的。
下面的例子从两个分区的 Kafka topic 中读取数据,每个分区的数据是 “A”, “B”, “C”, ”D”, “E”。假设每个分区都是从 0 开始读取。
假设 Flink Kafka consumer 从分区 0 开始读取数据 “A”,那么此时第一个 consumer 的位点从 0 变成 1。如下图所示。
此时数据 “A” 到达 Flink Job 中的 Map Task。两个 consumer 继续读取数据 (从分区 0 读取数据 “B” ,从分区 1 读取数据 “A”)。 offsets 分别被更新成 2 和 1。与此同时,假设 Flink 从 source 端开始执行 checkpoint。
到这里,Flink Kafka consumer tasks 已经执行了一次快照,offsets也保存到了 state 中(“offset = 2, 1”) 。此时 source tasks 在 数据 “B” 和 “A” 后面,向下游发送一个 checkpoint barrier。checkpoint barriers 是 Flink 用来对齐每个任务算子的 checkpoint,以确保整个 checkpoint 的一致性。分区 1 的数据 “A” 到达 Flink Map Task, 与此同时分区 0 的 consumer 继续读取下一个消息(message “C”)。
Flink Map Task 收到上游两个 source 的 checkpoint barriers 然后开始执行 checkpoint ,把 state 保存到 filesystem。同时,消费者继续从Kafka分区读取更多事件。
假设 Flink Map Task 是 Flink Job 的最末端,那么当它完成 checkpoint 后,就会立马通知 Flink Job Master。当 job 的所有 task 都确认其 state 已经 “checkpointed”,Job Master将完成这次的整个 checkpoint。 之后,checkpoint 可以用于故障恢复。
如果发生故障(例如,worker 挂掉),则所有任务将重启,并且它们的状态将被重置为最近一次的 checkpoint 的状态。 如下图所示。
source 任务将分别从 offset 2 和 1 开始消费。当任务重启完成, 将会正常运行,就像之前没发生故障一样。
PS: 文中提到的 checkpoint 对齐,我说下我的理解,假设一个 Flink Job 有 Source -> Map -> Sink,其中Sink有多个输入。那么当一次checkpoint的 barrier从source发出时,到sink这里,多个输入需要等待其它的输入的barrier已经到达,经过对齐后,sink才会继续处理消息。这里就是exactly-once和at-least-once的区别。
The End
原文链接: How Apache Flink manages Kafka consumer offsets
Ⅹ Flink生产配置
1、ssh端口号
修改默认ssh端口号: export FLINK_SSH_OPTS="-p 53742"
2、flink-conf.yaml