當前位置:首頁 » 存儲配置 » flink如何管理配置

flink如何管理配置

發布時間: 2023-02-21 04:40:50

Ⅰ FlinkX快速開始(一)

鏈接地址: FlinkX

FlinkX是一個基於Flink的批流統一的數據同步工具,既可以採集靜態的數據,比如Mysql,HDFS等,也可以採集實時變化的數據,比如MySQL binlog,Kafka等

1、前置
需要安裝maven、java8、配置好github相關參數

2、Fork FlinX項目到自己的倉庫中

2、Clone項目到本地
git clone https://github.com/liukunyuan/flinkx.git

3、安裝額外的jar包
1)、cd flinkx/bin
2)、執行sh ./install_jars.sh(windows執行install_jars.bat腳本

4、打包
1)、回到flinkx目錄:cd ..
2)、執行打包命令:mvn clean package -Dmaven.test.skip=true

1、配置flink conf文件(暫時不需要安裝flink)
1)、進入flinkconf目錄
cd flinkconf
2)、修改flink-conf.yaml文件添加一行
rest.bind-port: 8888

2、配置mysqltomysql的json文件,路徑:/Users/jack/Documents/jack-project/flinkx/flinkconf/mysql2mysql.json

3、運行任務

4、查看監控網頁和log.txt文件: http://localhost:8888/

Ⅱ Flink運行模式

    在idea中運行Flink程序的方式就是開發模式。

    Flink中的Local-cluster(本地集群)模式,單節點運行,主要用於測試, 學習。

        獨立集群模式,由Flink自身提供計算資源。

把Flink應用提交給Yarn的ResourceManager

Flink會根據運行在JobManger上的job的需要的slot的數量動態的分配TaskManager資源

Yarn又分3種模式

Session-Cluster模式需要先啟動Flink集群,向Yarn申請資源。以後提交任務都向這里提交。

這個Flink集群會常駐在yarn集群中,除非手工停止。

在向Flink集群提交Job的時候, 如果資源被用完了,則新的Job不能正常提交.

缺點: 如果提交的作業中有長時間執行的大作業, 佔用了該Flink集群的所有資源, 則後續無法提交新的job.

所以, Session-Cluster適合那些需要頻繁提交的多個小Job, 並且執行時間都不長的Job.

一個Job會對應一個Flink集群,每提交一個作業會根據自身的情況,都會單獨向yarn申請資源,直到作業執行完成,一個作業的失敗與否並不會影響下一個作業的正常提交和運行。獨享Dispatcher和ResourceManager,按需接受資源申請;適合規模大長時間運行的作業。

每次提交都會創建一個新的flink集群,任務之間互相獨立,互不影響,方便管理。任務執行完成之後創建的集群也會消失。

Per-job模式執行結果,一個job對應一個Application

Application Mode會在Yarn上啟動集群, 應用jar包的main函數(用戶類的main函數)將會在JobManager上執行. 只要應用程序執行結束, Flink集群會馬上被關閉. 也可以手動停止集群.

與Per-Job-Cluster的區別:就是Application Mode下, 用戶的main函數式在集群中執行的,並且當一個application中有多個job的話,per-job模式則是一個job對應一個yarn中的application,而Application Mode則這個application中對應多個job。

Application Mode模式執行結果,多個job對應一個Application

官方建議:

出於生產的需求, 我們建議使用Per-job or Application Mode,因為他們給應用提供了更好的隔離!

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/

0.Flink任務提交後,Client向HDFS上傳Flink的Jar包和配置

1.向Yarn ResourceManager提交任務,ResourceManager分配Container資源

2.通知對應的NodeManager啟動ApplicationMaster,ApplicationMaster啟動後載入Flink的Jar包和配置構建環境,然後啟動JobManager(Dispatcher)

2.1.Dispatcher啟動JobMaster

3.JobMaster向ResourceManager(Flink)申請資源

4.ResourceManager(Flink)向ResourceManager(Yarn)申請資源啟動TaskManager

5.ResourceManager分配Container資源後,由ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager

6.TaskManager注冊Slot

7.發出提供Slot命令

8.TaskManager向JobMaster提供Slot

9.JobMaster提交要在Slot中執行的任務

Ⅲ Flink kafka kerberos的配置

Flink消費集成kerberos認證的kafka集群時,需要做一些配置才可以正常執行。     Flink版本:1.8;kafka版本:2.0.1;Flink模式:Standalone     //指示是否從 Kerberos ticket 緩存中讀取     security.kerberos.login.use-ticket-cache: false1    //Kerberos 密鑰表文件的絕對路徑     security.kerberos.login.keytab: /data/home/keytab/flink.keytab    //認證主體名稱     security.kerberos.login.principal: [email protected]     //Kerberos登陸contexts     security.kerberos.login.contexts: Client,KafkaClient   val properties: Properties =new Properties()   properties.setProperty("bootstrap.servers","broker:9092")   properties.setProperty("group.id","testKafka")   properties.setProperty("security.protocol","SASL_PLAINTEXT")   properties.setProperty("sasl.mechanism","GSSAPI")   properties.setProperty("sasl.kerberos.service.name","kafka")   consumer =new   FlinkKafkaConsumer[String]("flink",new SimpleStringSchema(), properties)     參數說明 :security.protocol      運行參數可以配置為PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四種協議,分別對應Fusion Insight Kafka集群的21005/21007/21008/21009埠。 如果配置了SASL,則必須配置sasl.kerberos.service.name為kafka,並在conf/flink-conf.yaml中配置security.kerberos.login相關配置項。如果配置了SSL,則必須配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,後者表示truststore密碼。

Ⅳ Flink內存管理

java所有數據類型對應的位元組大小

java對象的組成 : 對象頭,實例數據,對齊部分

jvm 序列化缺點

上面圖為TaskManager內存模型,左邊為細分的內存模型,右邊為整體內存模型,該圖摘自Flink官網

heap內存在jvm啟動的時候申請的一塊不變的內存區域,該內存實際上是Flink和task公用的一塊區域,在flink層面通過控制來區分框架使用和task內存,heap內存管理起來是比較容易的,實際上non-heap的內存是難管理的一塊,如果管理不當或者使用不當可能造成內存泄漏或者內存無限增長等問題

內存參數配置

在flink中對內存進行了抽象成了MemorySegment,�默認情況下,一個 MemorySegment 對應著一個 32KB 大小的內存塊,這塊內存既可以是堆上內存( byte數組) ,也可以是堆外內存(nio的ByteBufferr ) .

同時MemorySegment也提供了對二進制數據的操作方法,以及讀取位元組數組序列化以及序列化位元組數組的方法等

下面是類繼承圖,該類有兩MemorySegment實現類有兩個分別為使用heap的以及混合的即有heap和non-heap,對於內存的訪問有子類具體的實現

MemorySemgent是flink內存分配的最小單元了,對於數據誇MemorySemgent保存,那麼對於上層的使用者來說,需要考慮考慮所有的細節,由於過於繁瑣,所以在MemorySemgent上又抽象了一層內存也,內存也是在MemorySemgent數據訪問上的視圖,對數據輸入和輸出分別抽象為DataInputView/DataOutputView,有了這一層,上層使用者無需關心跨MemorySemgent的細節問題,內存也對自動處理跨MemorySemgent的內存操作

DataInputView

DataInputView繼承DataInput,DataInputView是對MemorySemgent讀取的抽象視圖,提供一系列讀取二進制數據不同類型的方法,AbstractPageInputView是DataInputView的一個抽象實現類,並且基本所有InputView都實現了該類,即所有實現該類的InputView都支持Page

InputView持有了多個MemorySemgent的引用(可以基於數組,list,deque等),這些MemorySemgent被視為一個內存頁,可以順序,隨機等方式讀取數據,要基於不同的實現類,實現類不同讀取方式不同

方法圖

DataOutputView

與DataInputView相對應,繼承Output,並有一個擁有Page功能的抽象類(AbstractPagedOutputView),其大部outputView的實現都是繼承自該抽象類,對一組MemorySemgent提供一個基於頁的寫入功能

方法圖

類繼承圖

用於網路io數據的包裝,每個buffer持有一個MemorySegment的引用,resultPartition寫數據的時候,會向LocalBufferPool申請Buffer,會返回BufferBuilder,通過BufferBuilder想Buffe r<實際寫入的是MemorySegment> 寫數據

BufferBuilder是在上游Task中,負責想Buffer寫入數據,BufferConsumer位於下游,與BufferBuilder相對應,用於消費Buffer的數據,每個bufferBuilder對應一個bufferConsumer

常用參數介紹

buffer申請

buffer回收

當buffer用完之後需要進行回收比如在netty的clientHandler收到響應之後進行處理就會把buffer回收掉,buffer回收之後並不會釋放memorySegment,而是放回池中,變為可用內存,反復使用

flink託管的內存,託管內存使用堆外內存,用於批處理緩存排序等以及提供rocksDB內存

NetworkBufferPool是一個固定大小的MemorySegment實例吃,用於網路棧中,NettyBufferPool會為每個ResultPartition創建屬於自己的LocalBufferPool,NettyBufferPool會作為全局的pool來提供內存,LocalBufferPool會通過限制來控制自己內存的申請,防止過多申請

LocalBufferPool繼承關系,實現了bufferRecycler的介面,用於回收自己持有的buffer

在數據接收的時候會將數據封裝成NettyBuffer,在數據發送的時候會通過BufferBilder向MemorySegment寫入數據,然後通過BufferConsumer讀取MemorySegment的數據

BufferManager主要用於為RemoteInputChannel提供buffer的,bufferManager在啟動的時候會向全局bufferPool請求自己的獨有buffer,當bufferManager的buffer不夠的時候,則會向localBufferPool請求buffer,此時請求的buffer為浮動buffer

實際上提供的buffer是

Ⅳ Flink 配置Kafka數據源

flink 中已經預置了 kafka 相關的數據源實現 FlinkKafkaConsumer010 ,先看下具體的實現:

kafka Consumer 有一堆實現,不過最終都是繼承自 FlinkKafkaConsumerBase ,而這個抽象類則是繼承 RichParallelSourceFunction ,是不是很眼熟,跟自定義 mysql 數據源繼承的抽象類 RichSourceFunction 很類似。

可以看到,這里有很多構造函數,我們直接使用即可。

說明:

a、這里直接使用 properties 對象來設置 kafka 相關配置,比如 brokers zk groupId 序列化 反序列化 等。

b、使用 FlinkKafkaConsumer010 構造函數,指定 topic properties 配置

c、 SimpleStringSchema 僅針對 String 類型數據的序列化及反序列化,如果 kafka 中消息的內容不是 String ,則會報錯;看下 SimpleStringSchema 的定義:

d、這里直接把獲取到的消息列印出來。

Ⅵ Flink狀態管理和恢復機制

1、什麼是狀態?
2、Flink狀態類型有哪幾種?
3、狀態有什麼作用?
4、如何使用狀態,實現什麼樣的API?
5、什麼是checkpoint與savepoint?
6、如何使用checkpoint與savepoint?
7、checkpoint原理是什麼?
8、checkpint存儲到hdfs上又是什麼意思?

<1> 增量計算
聚合操作、機器學習訓練模型迭代運算時保存當前模型等等
<2> 容錯
Job故障重啟、升級

定義: 某task或者operator 在某一時刻的在內存中的狀態。
而checkpoint是,對於這個中間結果進行一次快照。
作用:State是可以被記錄的,在失敗的情況下可以恢復。
checkpoint則表示了一個Flink Job,在一個特定時刻的一份全局狀態快照,即包含了一個job下所有task/operator某時刻的狀態。

比如任務掛掉的時候或被手動停止的時候,可以從掛掉的點重新繼續消費。
基本類型:Operator state、Keyed state
特殊的 Broadcast State
適用場景:
增量計算:
<1>聚合操作
<2>機器學習訓練模型迭代運算時保存當前模型
等等
容錯:
Job故障重啟

使用狀態,必須使用RichFunction,因為狀態是使用RuntimeContext訪問的,只能在RichFunction中訪問

假設現在存在輸入源數據格式為(EventID,Value)
輸出數據,直接flatMap即可,無狀態。
如果要輸出某EventID最大值/最小值等,HashMap是否可以?
程序一旦Crash,如何恢復?

答案:Flink提供了一套狀態保存的方法,不需要藉助第三方存儲系統來解決狀態存儲問題。

Operator State跟一個特定operator的一個並發實例綁定,整個operator只對應一個state。相比較而言,在一個operator上,可能有很多個key,從而對應多個keyed state。
所以一個並行度為4的source,即有4個實例,那麼就會有4個狀態

舉例:Flink中的Kafka Connector,就使用了operator state。有幾個並行度,就會有幾個connector實例,消費的分區不一樣,它會在每個connector實例中,保存該實例中消費topic的所有(partition,offset)映射。

數據結構:ListState<T>

一般編碼過程:實現CheckpointedFunction介面,必須實現兩個函數,分別是:
initializeState和snapshotState

如何保存狀態?
通常是定義一個private transient ListState<Long> checkPointList;

注意:使用Operator State最好不要在keyBy之後使用,另外不要將太大的state存放到這個裡面。

是基於KeyStream之上的狀態,keyBy之後的Operator State。
那麼,一個並行度為3的keyed Opreator有幾個狀態,這個就不一定是3了,這里有幾個狀態是由keyby之後有幾個key所決定的。

案例:有一個事件流Tuple2[eventId,val],求不同的事件eventId下,相鄰3個val的平均值,事件流如下:
(1,4),(2,3),(3,1),(1,2),(3,2),(1,2),(2,2),(2,9)
那麼事件1:8/3=2
那麼事件2:14/3=4

Keyed State的數據結構類型有:
ValueState<T>:update(T)
ListState<T>:add(T)、get(T)和clear(T)
RecingState<T>:add(T)、receFunction()
MapState<UK,UV>:put(UK,UV)、putAll(Map<UK,UV>)、get(UK)

FlatMapFunction是無狀態函數;RichFlatMapFunction是有狀態函數

這里沒有實現CheckpointedFunction介面,而是直接調用方法 getRuntimeContext(),然後使用getState方法來獲取狀態值。

特殊場景: 來自一個流的一些數據需要廣播到所有下游任務,在這些任務中,這些數據被本地存儲並且用於處理另一個流上的所有處理元素 。例如:一個低吞吐量流,其中包含一組規則,我們希望對來自另一個流的所有元素按照規則進行計算

典型應用:常規事件流.connect(規則流)
常規事件流.connect(配置流)

<1> 創建常規事件流DataStream或者KeyedDataStream
<2> 創建BroadcastedStream:創建規則流/配置流(低吞吐)並廣播
<3> 連接兩個Stream並實現計算處理
process(可以是BroadcastProcessFunction 或者 KeyedBroadcastProcessFunction )

BroadcastProcessFunction:

processElement(...):負責處理非廣播流中的傳入元素

processBroadcastElement(...):負責處理廣播流中的傳入元素(如規則),一般廣播流的元素添加到狀態里去備用,processElement處理業務數據時就可以使用

ReadOnlyContext和Context:
ReadOnlyContext對Broadcast State只有隻讀許可權,Conetxt有寫許可權

KeyedBroadcastProcessFunction:

注意:
<1> Flink之間沒有跨Task的通信
<2> 每個任務的廣播狀態的元素順序有可能不一樣
<3> Broadcast State保存在內存中(並不在RocksDB)

Ⅶ flink配置和內存

1.Hosts and Ports
metrics.internal.query-service.port "0" String Accepts a list of ports (「50100,50101」), ranges(「50100-50200」) or a combination of both.
rest.bind-address (none) String
rest.bind-port "8081" String
taskmanager.data.port 0 Integer 任務管理器的外部埠,用於數據交換操作。
taskmanager.host (none) String
taskmanager.rpc.port "0" String
2.Fault Tolerance
restart-strategy 重啟策略 默認空
restart-strategy.fixed-delay.attempts 固定周期重啟
restart-strategy.fixed-delay.delay 固定周期重啟
restart-strategy.failure-rate.delay 失敗率重啟
restart-strategy.failure-rate.failure-rate-interval 失敗率重啟
restart-strategy.failure-rate.max-failures-per-interval 失敗率重啟

state.backend.incremental 增量checkpoint(僅對rocksdb支持)
state.backend.local-recovery 狀態後端配置本地恢復,默認false,只支持鍵控狀態後端
state.checkpoints.num-retained 保留的最大已完成檢查點數,默認1
taskmanager.state.local.root-dirs 本地恢復的根目錄
3.High Availability
high-availability 默認為NONE To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.
high-availability.cluster-id 高可用flink集群ID
high-availability.storageDir 元數據路徑

high-availability.zookeeper.path.root 配置ZK路徑
high-availability.zookeeper.quorum 配置ZK集群
4.Memory Configuration
在大多數情況下,用戶只需要設置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取決於設置方式),並可能通過調整JVM堆與託管內存的比率taskmanager.memory.managed.fraction。
jobmanager.memory.enable-jvm-direct-memory-limit 是否啟用jm進程的JVM直接內存限制,默認false
jobmanager.memory.flink.size 默認none。這包括JobManager消耗的所有內存。非容器配置
jobmanager.memory.heap.size 默認none。默認為總內存減去JVM,network,managed等
jobmanager.memory.jvm-metaspace.size jm的元空間大小,默認256mb
jobmanager.memory.jvm-overhead.fraction jm保留總進程內存的比例,默認0.1
jobmanager.memory.jvm-overhead.max 最大JVM開銷,默認1gb
jobmanager.memory.jvm-overhead.min 最小JVM開銷,默認192mb
jobmanager.memory.off-heap.size jm的堆外內存,默認128mb,如果第一個參數被啟用,這個將生效
jobmanager.memory.process.size JobManager的總進程內存大小。容器化配置這個,為容器總大小
taskmanager.memory.flink.size TaskExecutor的總Flink內存大小。默認none,非容器配置
taskmanager.memory.framework.heap.size TaskExecutor的框架堆內存大小。默認128mb
taskmanager.memory.framework.off-heap.size TaskExecutor的框架堆外內存大小。默認128mb
taskmanager.memory.jvm-metaspace.size TaskExecutor的JVM元空間大小。默認256mb
taskmanager.memory.jvm-overhead.fraction 要為JVM開銷保留的總進程內存的分數。默認0.1
taskmanager.memory.jvm-overhead.max TaskExecutor的最大JVM開銷最大大小,默認1gb
taskmanager.memory.jvm-overhead.min TaskExecutor的最小JVM開銷大小。默認192mb
taskmanager.memory.managed.consumer-weights 消費者權重。DATAPROC(用於流式RocksDB狀態後端和批量內置演算法)和PYTHON(用於Python進程),默認DATAPROC:70,PYTHON:30
taskmanager.memory.managed.fraction 如果未顯式指定託管內存大小,則將用作託管內存的Flink總內存的分數,默認0.4
taskmanager.memory.managed.size TaskExecutor的託管內存大小。默認none
taskmanager.memory.network.fraction 用作網路內存的總Flink內存的分數,默認0.1
taskmanager.memory.network.max TaskExecutor的最大網路內存大小。默認1gb
taskmanager.memory.network.min TaskExecutor的最小網路內存大小。默認64mb
taskmanager.memory.process.size TaskExecutor的總進程內存大小。默認none,容器配置
taskmanager.memory.task.heap.size tm內存,默認none,
taskmanager.memory.task.off-heap.size tm堆外內存,默認0
5.Miscellaneous Options
fs.allowed-fallback-filesystems none
fs.default-scheme none
io.tmp.dirs 'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html

Ⅷ Flink HistoryServer配置(簡單三步完成)

允許您查詢JobManager存檔的已完成作業的狀態和統計信息。(官網原話)
最適合用於:了解 flink過去完成任務的狀態,以及有狀態作業的恢復(保存了最後一次的checkpoint地址)

官網地址: https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/historyserver.html
官網配置參數: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#history-server

修改flink-1.11.2/conf/flink-conf.yaml文件

兩張圖:
historyserver.web.tmpdir的默認配置圖:

historyserver.web.tmpdir的自定義路徑配置圖:

在hdfs的/flink目錄下創建completed-jobs目錄(許可權可以改成777)

啟動/關閉命令:

1、查看啟動狀態

2、分別啟一個per-job任務、sql任務、基於session啟的任務,過一會全部cancel掉,都可以在hdfs路徑和/tmp下的自定義目錄看到相關數據,最後可以在host:8082上面看到你剛才canceled的任務,如下圖:

3、訪問hdfs路徑:

4、訪問 http://host:8082 可以查看到歷史完成任務狀態:

生產中遇到突然這個服務丟失,然後重啟任務失敗。通過排查任務是historyserver.web.tmpdir: /tmp/flinkhistoryserver/這個路徑被刪除了。

Ⅸ Flink如何管理Kafka 消費位點(譯文)

Checkpointing 是 Flink 故障恢復的內部機制。一個 checkpoint 就是 Flink應用程序產生的狀態的一個副本。如果 Flink 任務發生故障,它會從 checkpoint 中載入之前的狀態來恢復任務,就好像故障沒有發生一樣。

Checkpoints 是 Flink 容錯的基礎,並且確保了 Flink 流式應用在失敗時的完整性。Checkpoints 可以通過 Flink 設置定時觸發。

Flink Kafka consumer 使用 Flink 的 checkpoint 機制來存儲 Kafka 每個分區的位點到 state。當 Flink 執行 checkpoint 時,Kafka 的每個分區的位點都被存儲到 checkpoint 指定的 filesystem 中。Flink 的 checkpoint 機制確保了所有任務運算元的狀態是一致的,也就是說這些狀態具有相同的數據輸入。當所有的任務運算元成功存儲他們自己的狀態後,代表一次 checkpoint 的完成。因此,當任務從故障中恢復時,Flink 保證了exactly-once。

下面將一步一步的演示 Flink 是如何通過 checkpoint 來管理 Kafka 的 offset 的。

下面的例子從兩個分區的 Kafka topic 中讀取數據,每個分區的數據是 「A」, 「B」, 「C」, 」D」, 「E」。假設每個分區都是從 0 開始讀取。

假設 Flink Kafka consumer 從分區 0 開始讀取數據 「A」,那麼此時第一個 consumer 的位點從 0 變成 1。如下圖所示。

此時數據 「A」 到達 Flink Job 中的 Map Task。兩個 consumer 繼續讀取數據 (從分區 0 讀取數據 「B」 ,從分區 1 讀取數據 「A」)。 offsets 分別被更新成 2 和 1。與此同時,假設 Flink 從 source 端開始執行 checkpoint。

到這里,Flink Kafka consumer tasks 已經執行了一次快照,offsets也保存到了 state 中(「offset = 2, 1」) 。此時 source tasks 在 數據 「B」 和 「A」 後面,向下游發送一個 checkpoint barrier。checkpoint barriers 是 Flink 用來對齊每個任務運算元的 checkpoint,以確保整個 checkpoint 的一致性。分區 1 的數據 「A」 到達 Flink Map Task, 與此同時分區 0 的 consumer 繼續讀取下一個消息(message 「C」)。

Flink Map Task 收到上游兩個 source 的 checkpoint barriers 然後開始執行 checkpoint ,把 state 保存到 filesystem。同時,消費者繼續從Kafka分區讀取更多事件。

假設 Flink Map Task 是 Flink Job 的最末端,那麼當它完成 checkpoint 後,就會立馬通知 Flink Job Master。當 job 的所有 task 都確認其 state 已經 「checkpointed」,Job Master將完成這次的整個 checkpoint。 之後,checkpoint 可以用於故障恢復。

如果發生故障(例如,worker 掛掉),則所有任務將重啟,並且它們的狀態將被重置為最近一次的 checkpoint 的狀態。 如下圖所示。

source 任務將分別從 offset 2 和 1 開始消費。當任務重啟完成, 將會正常運行,就像之前沒發生故障一樣。

PS: 文中提到的 checkpoint 對齊,我說下我的理解,假設一個 Flink Job 有 Source -> Map -> Sink,其中Sink有多個輸入。那麼當一次checkpoint的 barrier從source發出時,到sink這里,多個輸入需要等待其它的輸入的barrier已經到達,經過對齊後,sink才會繼續處理消息。這里就是exactly-once和at-least-once的區別。

The End
原文鏈接: How Apache Flink manages Kafka consumer offsets

Ⅹ Flink生產配置

1、ssh埠號
修改默認ssh埠號: export FLINK_SSH_OPTS="-p 53742"

2、flink-conf.yaml

熱點內容
api加密java 發布:2025-07-12 18:15:55 瀏覽:443
爬蟲專用編譯器 發布:2025-07-12 18:15:10 瀏覽:458
安卓版糖果傳奇叫什麼 發布:2025-07-12 18:06:26 瀏覽:262
編程素質人 發布:2025-07-12 18:04:17 瀏覽:671
在雲伺服器安裝sqlserver 發布:2025-07-12 18:04:05 瀏覽:737
釘釘oa初始密碼是多少 發布:2025-07-12 18:02:34 瀏覽:72
樓道口防盜門密碼該如何使用 發布:2025-07-12 17:53:55 瀏覽:351
mysql源碼目錄 發布:2025-07-12 17:48:41 瀏覽:442
資料庫導出dmp 發布:2025-07-12 17:39:08 瀏覽:336
濟南少兒編程哪家好 發布:2025-07-12 17:33:09 瀏覽:128