當前位置:首頁 » 操作系統 » kafka源碼剖析

kafka源碼剖析

發布時間: 2022-11-15 01:10:33

① 如何保證kafka 的消息機制 ack-fail 源碼跟蹤

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka布式、區(partitioned)、基於備份(replicated)commit-log存儲服務.提供類似於messaging system特性,設計實現完全同)kafka種高吞吐量布式發布訂閱消息系統特性:
(1)、通O(1)磁碟數據結構提供消息持久化種結構於即使數TB消息存儲能夠保持間穩定性能
(2)、高吞吐量:即使非普通硬體kafka支持每秒數十萬消息
(3)、支持通kafka伺服器消費機集群區消息
(4)、支持Hadoop並行數據載入
、用Kafka面自帶腳本進行編譯
載Kafka源碼面自帶gradlew腳本我利用編譯Kafka源碼:
1 # wget
2 # tar -zxf kafka-0.8.1.1-src.tgz
3 # cd kafka-0.8.1.1-src
4 # ./gradlew releaseTarGz
運行面命令進行編譯現異信息:
01 :core:signArchives FAILED
02
03 FAILURE: Build failed with an exception.
04
05 * What went wrong:
06 Execution failed for task ':core:signArchives'.
07 > Cannot perform signing task ':core:signArchives' because it
08 has no configured signatory
09
10 * Try:
11 Run with --stacktrace option to get the stack trace. Run with
12 --info or --debug option to get more log output.
13
14 BUILD FAILED
bug()用面命令進行編譯
1 ./gradlew releaseTarGzAll -x signArchives
候編譯功(編譯程現)編譯程我指定應Scala版本進行編譯:
1 ./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives
編譯完core/build/distributions/面kafka_2.10-0.8.1.1.tgz文件網載直接用
二、利用sbt進行編譯
我同用sbt編譯Kafka步驟:
01 # git clone
02 # cd kafka
03 # git checkout -b 0.8 remotes/origin/0.8
04 # ./sbt update
05 [info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)
06 [info] downloading ...
07 [info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)
08 [info] Done updating.
09 [info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...
10 [info] Done updating.
11 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
12 [info] Done updating.
13 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
14 [info] Done updating.
15 [success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM
16
17 # ./sbt package
18 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
19 Getting Scala 2.8.0 ...
20 :: retrieving :: org.scala-sbt#boot-scala
21 confs: [default]
22 3 artifacts copied, 0 already retrieved (14544kB/27ms)
23 [success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM
於Kafka 0.8及版本需要運行命令:
01 # ./sbt assembly-package-dependency
02 [info] Loading project definition from /export1/spark/kafka/project
03 [warn] Multiple resolvers having different access mechanism configured with
04 same name 'sbt-plugin-releases'. To avoid conflict, Remove plicate project
05 resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
06 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
07 [warn] Credentials file /home/wyp/.m2/.credentials does not exist
08 [info] Including slf4j-api-1.7.2.jar
09 [info] Including metrics-annotation-2.2.0.jar
10 [info] Including scala-compiler.jar
11 [info] Including scala-library.jar
12 [info] Including slf4j-simple-1.6.4.jar
13 [info] Including metrics-core-2.2.0.jar
14 [info] Including snappy-java-1.0.4.1.jar
15 [info] Including zookeeper-3.3.4.jar
16 [info] Including log4j-1.2.15.jar
17 [info] Including zkclient-0.3.jar
18 [info] Including jopt-simple-3.2.jar
19 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
20 [warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
21 [warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE'
22 with strategy 'rename'
23 [warn] Merging 'LICENSE.txt' with strategy 'rename'
24 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
25 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
26 [warn] Strategy 'discard' was applied to a file
27 [warn] Strategy 'rename' was applied to 5 files
28 [success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM
我sbt面指定scala版本:
01 <!--
02 User: 往記憶
03 Date: 14-6-18
04 Time: 20:20
05 bolg:
06 本文址:/archives/1044
07 往記憶博客專注於hadoop、hive、spark、shark、flume技術博客量干貨
08 往記憶博客微信公共帳號:iteblog_hadoop
09 -->
10 sbt "++2.10.3 update"
11 sbt "++2.10.3 package"
12 sbt "++2.10.3 assembly-package-dependency"

② kafka技術內幕與apache kafka源碼剖析看哪一本好,為什麼

Jafka/Kafka
Kafka是Apache下的一個子項目,是一個高性能跨語言分布式Publish/Subscribe消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行消息持久化;高吞吐,在一台普通的伺服器上既可以達到10W/s的吞吐速率;完全的分布式系統,Broker、Procer、Consumer都原生自動支持分布式,自動實現復雜均衡;支持Hadoop數據並行載入,對於像Hadoop的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的並行載入機制來統一了在線和離線的消息處理,這一點也是本課題所研究系統所看重的。Apache Kafka相對於ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。
其他一些隊列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就不再一一分析。

③ Kafka 源碼解析之 Topic 的新建/擴容/刪除

[TOC]
本篇接著講述 Controller 的功能方面的內容,在 Kafka 中,一個 Topic 的新建、擴容或者刪除都是由 Controller 來操作的,本篇文章也是主要聚焦在 Topic 的操作處理上(新建、擴容、刪除),實際上 Topic 的創建在 Kafka 源碼解析之 topic 創建過程(三) 中已經講述過了,本篇與前面不同的是,本篇主要是從 Controller 角度來講述,而且是把新建、擴容、刪除這三個 Topic 級別的操作放在一起做一個總結。

這里把 Topic 新建與擴容放在一起講解,主要是因為無論 Topic 是新建還是擴容,在 Kafka 內部其實都是 Partition 的新建,底層的實現機制是一樣的,Topic 的新建與擴容的整體流程如下圖所示:

Topic 新建與擴容觸發條件的不同如下所示:

下面開始詳細講述這兩種情況。

Topic 擴容
Kafka 提供了 Topic 擴容工具,假設一個 Topic(topic_test)只有一個 partition,這時候我們想把它擴容到兩個 Partition,可以通過下面兩個命令來實現:

這兩種方法的區別是:第二種方法直接指定了要擴容的 Partition 2 的副本需要分配到哪台機器上,這樣的話我們可以精確控制到哪些 Topic 放下哪些機器上。

無論是使用哪種方案,上面兩條命令產生的結果只有一個,將 Topic 各個 Partition 的副本寫入到 ZK 對應的節點上,這樣的話 /brokers/topics/topic_test 節點的內容就會發生變化, 監聽器就會被觸發 ,該監聽器的處理流程如下:

其 doHandleDataChange() 方法的處理流程如下:

下面我們看下 onNewPartitionCreation() 方法,其實現如下:

關於 Partition 的新建,總共分了以下四步:

經過上面幾個階段,一個 Partition 算是真正創建出來,可以正常進行讀寫工作了,當然上面只是講述了 Controller 端做的內容,Partition 副本所在節點對 LeaderAndIsr 請求會做更多的工作,這部分會在後面關於 LeaderAndIsr 請求的處理中只能夠詳細講述。

Topic 新建
Kafka 也提供了 Topic 創建的工具,假設我們要創建一個名叫 topic_test,Partition 數為2的 Topic,創建的命令如下:

跟前面的類似,方法二是可以精確控制新建 Topic 每個 Partition 副本所在位置,Topic 創建的本質上是在 /brokers/topics 下新建一個節點信息,並將 Topic 的分區詳情寫入進去,當 /brokers/topics 有了新增的 Topic 節點後,會觸發 TopicChangeListener 監聽器,其實現如下:

只要 /brokers/topics 下子節點信息有變化(topic 新增或者刪除),TopicChangeListener 都會被觸發,其 doHandleChildChange() 方法的處理流程如下:

接著看下 onNewTopicCreation() 方法實現

上述方法主要做了兩件事:

onNewPartitionCreation() 的實現在前面 Topic 擴容部分已經講述過,這里不再重復,最好參考前面流程圖來梳理 Topic 擴容和新建的整個過程。

Kafka Topic 刪除這部分的邏輯是一個單獨線程去做的,這個線程是在 Controller 啟動時初始化和啟動的。

TopicDeletionManager 初始化
TopicDeletionManager 啟動實現如下所示:

TopicDeletionManager 啟動時只是初始化了一個 DeleteTopicsThread 線程,並啟動該線程。TopicDeletionManager 這個類從名字上去看,它是 Topic 刪除的管理器,它是如何實現 Topic 刪除管理呢,這里先看下該類的幾個重要的成員變數:

前面一小節,簡單介紹了 TopicDeletionManager、DeleteTopicsThread 的啟動以及它們之間的關系,這里我們看下一個 Topic 被設置刪除後,其處理的整理流程,簡單做了一個小圖,如下所示:

這里先簡單講述上面的流程,當一個 Topic 設置為刪除後:

先看下 DeleteTopicsListener 的實現,如下:

其 doHandleChildChange() 的實現邏輯如下:

接下來,看下 Topic 刪除線程 DeleteTopicsThread 的實現,如下所示:

doWork() 方法處理邏輯如下:

先看下 onTopicDeletion() 方法,這是 Topic 最開始刪除時的實現,如下所示:

Topic 的刪除的真正實現方法還是在 startReplicaDeletion() 方法中,Topic 刪除時,會先調用 onPartitionDeletion() 方法刪除所有的 Partition,然後在 Partition 刪除時,執行 startReplicaDeletion() 方法刪除該 Partition 的副本,該方法的實現如下:

該方法的執行邏輯如下:

在將副本狀態從 OfflineReplica 轉移成 ReplicaDeletionStarted 時,會設置一個回調方法 (),該方法會將刪除成功的 Replica 設置為 ReplicaDeletionSuccessful 狀態,刪除失敗的 Replica 設置為 ReplicaDeletionIneligible 狀態(需要根據 StopReplica 請求處理的過程,看下哪些情況下 Replica 會刪除失敗,這個會在後面講解)。

下面看下這個方法 completeDeleteTopic(),當一個 Topic 的所有 Replica 都刪除成功時,即其狀態都在 ReplicaDeletionSuccessful 時,會調用這個方法,如下所示:

當一個 Topic 所有副本都刪除後,會進行如下處理:

至此,一個 Topic 算是真正刪除完成。

④ Kafka 設計詳解之隊列

在 上文 中我們介紹了 Kafka 的網路通信,本文打算詳細分析 Kafka 的核心 — 隊列 的設計和實現,來對 Kafka 進行更深一步的了解。

隊列是一種先進先出(FIFO)的數據結構,它是 Kafka 中最重要的部分,負責收集生產者生產的消息,並將這些消息傳遞給消費者。要實現一個隊列有多種方式,Kafka 作為一個消息隊列中間件,在設計隊列時主要要考慮兩個問題:

乍一看到這個問題,我們會想,內存的讀取速度遠快於磁碟,如果追求性能,內存也充足的話,當然是將生產者產生的消息數據寫到內存(比如用一個數組或者鏈表來存儲隊列數據),供消費者消費。真的是這樣嗎?
下面我們依次分析下寫內存和寫磁碟文件的優缺點,首先,內存的優點是讀寫速度非常快,但是,如果我們的目標是設計「大數據量」下的「高吞吐量」的消息隊列,會有以下幾個問題:

接下來我們來分析一下磁碟,寫磁碟文件方式存儲隊列數據的優點就是能規避上述內存的缺點,但其有很嚴重的缺點,就是讀寫速度慢,如果純依靠磁碟,那消息隊列肯定做不到「高吞吐量」這個目標。

分析了內存跟磁碟的優缺點,好像我們還是只能選寫內存,但我們忽視了磁碟的兩個情況:一是磁碟慢是慢在隨機讀寫,如果是順序讀寫,他的速度能達到 600MB/sec(RAID-5 磁碟陣列),並不慢,如果我們盡可能地將數據的讀寫設計成順序的,可以大大提升性能。二是 現代的操作系統會(盡可能地)將磁碟里的文件進行緩存

有了操作系統級別的文件緩存,那用磁碟存儲隊列數據的方式就變得有優勢了。首先,磁碟文件的數據會有文件緩存,所以不必擔心隨機讀寫的性能;其次,同樣是使用內存,磁碟文件使用的是操作系統級別的內存,相比於在 Java 內存堆中存儲隊列,它沒有 GC 問題,也沒有 Java 對象的額外內存開銷,更可以規避應用重啟後的內存 load 數據耗時的問題,而且,文件緩存是操作系統提供的,因為我們只要簡單的寫磁碟文件,系統復雜性大大降低。

因此,Kafka 直接使用磁碟來存儲消息隊列的數據。

剛才我們已經決定用磁碟文件來存儲隊列數據,那麼要如何選擇數據結構呢?一般情況下,如果需要查找數據並隨機訪問,我們會用 B+ 樹來存儲數據,但其時間復雜度是 O(log N),由於我們設計的是消息隊列,我們可以完全順序的寫收到的生產者消息,消費者消費時,只要記錄下消費者當前消費的位置,往後消費就可以了,這樣可以對文件盡可能的進行順序讀寫,同時,時間復雜度是O(1)。其實,這跟我們寫日誌的方式很像,每條日誌順序 append 到日誌文件。

之前我們已經確定採用直接順序寫磁碟文件的方式來存儲隊列數據,下面我們來剖析下具體的實現細節。

在 Kafka 中,用一個文件夾存儲一條消息隊列,成為一個 Log,每條消息隊列由多個文件組成,每個文件稱為一個 LogSegment,每當一個 LogSegment 的大小到達閾值,系統就會重新生成一個 LogSegment;當舊的 LogSegment 過期需要清理時(雖然磁碟空間相對於內存會寬裕很多,我們可以保存更長時間的消息數據,比如一周,以供消費者更靈活的使用,但還是需要定期清理太老的數據),系統會根據清理策略刪除這些文件。

現在我們知道一個隊列(Log)是由多個隊列段文件(LogSegment)組成的,那麼 Kafka 是如何將這些文件邏輯上連接從而組成一條有序隊列的呢?在生成每個隊列段文件時,Kafka 用該段的初始位移來對其命名,如在新建一個隊列時,會初始化第一個隊列段文件,那麼其文件名就是0,假設每個段的大小是固定值 L,那麼第二個段文件名就是 L,第 N 個就是 (N - 1)* L。這樣,我們就可以根據文件名對段文件進行排序,排序後的順序就是整個隊列的邏輯順序。

了解了隊列的基本實現,下面我們就來分析下隊列的核心操作—讀和寫。

寫操作發生在生產者向隊列生產消息時,在上篇文章講網路通信時我們已經說到,所有的客戶端請求會根據協議轉到一個 Handler 來具體處理,負責寫操作的 Handler 叫 ProcerHandler,整個寫請求的流程如下:

之前我們說過,如果是順序寫,由於省掉了磁頭定址的時間,磁碟的性能還是很高的,我們看到 Kakfa 隊列是以順序方式寫的,所以性能很高。但是,如果一台 Kafka 伺服器有很多個隊列,而硬碟的磁頭是有限的,所以還是得在不同的隊列直接來回切換定址,性能會有所下降。

隊列的讀操作發送在消費者消費隊列數據時,由於隊列是線性的,只需要記錄消費者上次消費到了哪裡(offset),接下去消費就好了。那麼首先會有一個問題,由誰來記消費者到底消費到哪裡了?

一般情況下,我們會想到讓服務端來記錄各個消費者當前的消費位置,當消費者來拉數據,根據記錄的消費位置和隊列的當前位置,要麼返回新的待消費數據,要麼返回空。讓服務端記錄消費位置,當遇到網路異常時會有一些問題,比如服務端將消息發給消費者後,如果網路異常消費者沒有收到消息,那麼這條消息就被「跳過」了,當然我們可以借鑒二階段提交的思想,服務端將消息發送給消費者後,標記狀態為「已發送」,等消費者消費成功後,返回一個 ack 給服務端,服務端再將其標記為「成功消費」。不過這樣設計還是會有一個問題,如果消費者沒有返回 ack 給服務端,此時這條消息可能在已經被消費也可能還沒被消費,服務端無從得知,只能根據人為策略跳過(可能會漏消息)或者重發(可能存在重復數據)。另一個問題是,如果有很多消費者,服務端需要記錄每條消息的每個消費者的消費狀態,這在大數據的場景下,非常消耗性能和內存。

Kafka 將每個消費者的消費狀態記錄在消費者本身(隔一段時間將最新消費狀態同步到 zookeeper),每次消費者要拉數據,就給服務端傳遞一個 offset,告訴服務端從隊列的哪個位置開始給我數據,以及一個參數 length,告訴服務端最多給我多大的數據(批量順序讀數據,更高性能),這樣就能使服務端的設計復雜度大大降低。當然這解決不了一致性的問題,不過消費者可以根據自己程序特點,更靈活地處理事務。

下面就來分析整個讀的流程:

分布式系統中不可避免的會遇到一致性問題,主要是兩塊:生產者與隊列服務端之間的一致性問題、消費者與隊列服務端之間的一致性問題,下面依次展開。

當生產者向服務端投遞消息時,可能會由於網路或者其他問題失敗,如果要保證一致性,需要生產者在失敗後重試,不過重試又會導致消息重復的問題,一個解決方案是每個消息給一個唯一的 id,通過服務端的主動去重來避免重復消息的問題,不過這一機制目前 Kafka 還未實現。目前 Kafka 提供配置,供用戶不同場景下選擇允許漏消息(失敗後不重試)還是允許重復消息(失敗後重試)。

由於在消費者里我們可以自己控制消費位置,就可以更靈活的進行個性化設計。如果我們在拉取到消息後,先增加 offset,然後再進行消息的後續處理,如果在消息還未處理完消費者就掛掉,就會存在消息遺漏的問題;如果我們在拉取到消息後,先進行消息處理,處理成功後再增加 offset,那麼如果消息處理一半消費者掛掉,會存在重復消息的問題。要做到完全一致,最好的辦法是將 offset 的存儲與消費者放一起,每消費一條數據就將 offset+1。

本文介紹了 Kafka 的隊列實現以及其讀寫過程。Kafka 認為操作系統級別的文件緩存比 Java 的堆內存更省空間和高效,如果生產者消費者之間比較「和諧」的話,大部分的讀寫操作都會落在文件緩存,且在順序讀寫的情況下,硬碟的速度並不慢,因此選擇直接寫磁碟文件的方式存儲隊列。在隊列的讀寫過程中,Kafka 盡可能地使用順序讀寫,並使用零拷貝來優化性能。最後,Kafka 讓消費者自己控制消費位置,提供了更加靈活的數據消費方式。

⑤ Kafaka入門(1)- Kafka簡介和安裝與啟動(mac)

Kafka是由Apache軟體基金會開發的一個開源流處理平台,由Scala和Java編寫。kafka 是一個高性能的消息隊列,也是一個分布式流處理平台。
kafka中文網
kafka官網

Procer :Procer即生產者,消息的產生者,是消息的入口。
kafka cluster
Broker :Broker是kafka實例,每個伺服器上有一個或多個kafka的實例,姑且認為每個broker對應一台伺服器。一個集群由多個broker組成,集群內的broker都有一個不重復的編號,如圖中的broker-0、broker-1等……
Topic :消息的主題,可以理解為消息的分類,kafka的數據就保存在topic。在每個broker上都可以創建多個topic。
Partition :Topic的分區,每個topic可以有多個分區,分區的作用是做負載,提高kafka的吞吐量。 同一個topic在不同的分區的數據是不重復的 ,partition的表現形式就是一個一個的文件夾!
Replication : 每一個分區都有多個副本 ,副本的作用是做備胎。當主分區(Leader)故障的時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中默認副本的最大數量是10個,且副本的數量不能大於Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分區也只可能存放一個副本(包括自己)。
Message :每一條發送的消息主體。
Consumer :消費者,即消息的消費方,是消息的出口。
Consumer Group :將多個消費組成一個消費者組。在kafka的設計中 同一個分區的數據只能被同一消費者組中的某一個消費者消費 。Partition 的分配問題,即確定哪個 Partition 由哪個 Consumer 來消費。Kafka 有兩種分配策略,一個是 RoundRobin,一個是 Range,默認為Range。
一個消費者組內也可以訂閱多個topic
多個消費組可以訂閱同一個topic 。

Zookeeper :kafka集群依賴zookeeper來保存集群的的元信息,來保證系統的可用性。

使用brew進行安裝,非常方便。

ZooKeeper是一個分布式的,開放源碼的 分布式應用程序協調服務 ,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟體,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。
kafka是基於zookeeper的,啟動kafka之前,需要先啟動zookeeper

查看啟動是否成功

啟動kafka

查看啟動是否成功

查看topic列表

新起一個終端,作為生產者,用於發送消息,每一行算一條消息,將消息發送到kafka伺服器

新起一個終端作為消費者,接收消息

服務關閉的順序是先kafka,然後zookeeper

再過半小時,你就能明白kafka的工作原理了
Kafka架構原理,也就這么回事!

⑥ apache kafka源碼怎麼編譯

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka是一個分布式的、可分區的(partitioned)、基於備份的(replicated)和commit-log存儲的服務.。它提供了類似於messaging system的特性,但是在設計實現上完全不同)。kafka是一種高吞吐量的分布式發布訂閱消息系統,它有如下特性:
(1)、通過O(1)的磁碟數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
(2)、高吞吐量:即使是非常普通的硬體kafka也可以支持每秒數十萬的消息。
(3)、支持通過kafka伺服器和消費機集群來分區消息。
(4)、支持Hadoop並行數據載入。
一、用Kafka裡面自帶的腳本進行編譯
下載好了Kafka源碼,裡面自帶了一個gradlew的腳本,我們可以利用這個編譯Kafka源碼:
1 # wget http://mirror.bit.e.cn/apache/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz
2 # tar -zxf kafka-0.8.1.1-src.tgz
3 # cd kafka-0.8.1.1-src
4 # ./gradlew releaseTarGz
運行上面的命令進行編譯將會出現以下的異常信息:
01 :core:signArchives FAILED
02
03 FAILURE: Build failed with an exception.
04
05 * What went wrong:
06 Execution failed for task ':core:signArchives'.
07 > Cannot perform signing task ':core:signArchives' because it
08 has no configured signatory
09
10 * Try:
11 Run with --stacktrace option to get the stack trace. Run with
12 --info or --debug option to get more log output.
13
14 BUILD FAILED
這是一個bug(https://issues.apache.org/jira/browse/KAFKA-1297),可以用下面的命令進行編譯
1 ./gradlew releaseTarGzAll -x signArchives
這時候將會編譯成功(在編譯的過程中將會出現很多的)。在編譯的過程中,我們也可以指定對應的Scala版本進行編譯:
1 ./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives
編譯完之後將會在core/build/distributions/裡面生成kafka_2.10-0.8.1.1.tgz文件,這個和從網上下載的一樣,可以直接用。
二、利用sbt進行編譯
我們同樣可以用sbt來編譯Kafka,步驟如下:
01 # git clone https://git-wip-us.apache.org/repos/asf/kafka.git
02 # cd kafka
03 # git checkout -b 0.8 remotes/origin/0.8
04 # ./sbt update
05 [info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)
06 [info] downloading http://repo1.maven.org/maven2/ant/ant/1.6.5/ant-1.6.5.jar ...
07 [info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)
08 [info] Done updating.
09 [info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...
10 [info] Done updating.
11 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
12 [info] Done updating.
13 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
14 [info] Done updating.
15 [success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM
16
17 # ./sbt package
18 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
19 Getting Scala 2.8.0 ...
20 :: retrieving :: org.scala-sbt#boot-scala
21 confs: [default]
22 3 artifacts copied, 0 already retrieved (14544kB/27ms)
23 [success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM
對於Kafka 0.8及以上版本還需要運行以下的命令:
01 # ./sbt assembly-package-dependency
02 [info] Loading project definition from /export1/spark/kafka/project
03 [warn] Multiple resolvers having different access mechanism configured with
04 same name 'sbt-plugin-releases'. To avoid conflict, Remove plicate project
05 resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
06 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
07 [warn] Credentials file /home/wyp/.m2/.credentials does not exist
08 [info] Including slf4j-api-1.7.2.jar
09 [info] Including metrics-annotation-2.2.0.jar
10 [info] Including scala-compiler.jar
11 [info] Including scala-library.jar
12 [info] Including slf4j-simple-1.6.4.jar
13 [info] Including metrics-core-2.2.0.jar
14 [info] Including snappy-java-1.0.4.1.jar
15 [info] Including zookeeper-3.3.4.jar
16 [info] Including log4j-1.2.15.jar
17 [info] Including zkclient-0.3.jar
18 [info] Including jopt-simple-3.2.jar
19 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
20 [warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
21 [warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE'
22 with strategy 'rename'
23 [warn] Merging 'LICENSE.txt' with strategy 'rename'
24 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
25 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
26 [warn] Strategy 'discard' was applied to a file
27 [warn] Strategy 'rename' was applied to 5 files
28 [success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM
當然,我們也可以在sbt裡面指定scala的版本:
01 <!--
02 User: 過往記憶
03 Date: 14-6-18
04 Time: 20:20
05 bolg: http://www.iteblog.com
06 本文地址:http://www.iteblog.com/archives/1044
07 過往記憶博客,專注於hadoop、hive、spark、shark、flume的技術博客,大量的干貨
08 過往記憶博客微信公共帳號:iteblog_hadoop
09 -->
10 sbt "++2.10.3 update"
11 sbt "++2.10.3 package"
12 sbt "++2.10.3 assembly-package-dependency"

⑦ 一文解密Kafka,Kafka源碼設計與實現原理剖析,真正的通俗易懂

Apache Kafka (簡稱Kafka )最早是由Linkedln開源出來的分布式消息系統,現在是Apache旗下的一個子項目,並且已經成為開冊、領域應用最廣泛的消息系統之 Kafka社區也非常活躍,從 版本開始, Kafka 的標語已經從「一個高吞吐量、分布式的消息系統」改為「一個分布式的流平台」
關於Kafka,我打算從入門開始講起,一直到它的底層實現邏輯個原理以及源碼,建議大家花點耐心,從頭開始看,相信會對你有所收獲。

作為 個流式數據平台,最重要的是要具備下面 個特點

消息系統:
消息系統 也叫作消息隊列)主要有兩種消息模型:隊列和發布訂Kafka使用消費組( consumer group )統 上面兩種消息模型 Kafka使用隊列模型時,它可以將處理 作為平均分配給消費組中的消費者成員

下面我們會從 個角度分析Kafka 的幾個基本概念,並嘗試解決下面 個問題

消息由生產者發布到 fk 集群後,會被消費者消費 消息的消費模型有兩種:推送模型( pu和拉取模型( pull 基於推送模型的消息系統,由消息代理記錄消費者的消費狀態 消息代理在將消息推送到消費者後 標記這條消息為已消費

但這種方式無法很好地保證消息的處理語義 比如,消息代理把消息發送出去後,當消費進程掛掉或者由於網路原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經 這條消息標記為自己消費了,但實際上這條消息並沒有被實際處理) 如果要保證消息的處理語義,消息代理發送完消息後,要設置狀態為「已發送」,只有收到消費者的確認請求後才更新為「已消費」,這就需要在消息代理中記錄所有消息的消費狀態,這種做法也是不可取的

Kafka每個主題的多個分區日誌分布式地存儲在Kafka集群上,同時為了故障容錯,每個分區都會以副本的方式復制到多個消息代理節點上 其中一個節點會作為主副本( Leader ),其 節點作為備份副本( Follower ,也叫作從副本)

主副本會負責所有的客戶端讀寫操作,備份副本僅僅從主副本同步數據 當主副本 IH 現在故障時,備份副本中的 副本會被選擇為新的主副本 因為每個分區的副本中只有主副本接受讀寫,所以每個服務端都會作為某些分區的主副本,以及另外一些分區的備份副本這樣Kafka集群的所有服務端整體上對客戶端是負載均衡的

消息系統通常由生產者「pro ucer 消費者( co sumer )和消息代理( broke 大部分組成,生產者會將消息寫入消息代理,消費者會從消息代理中讀取消息 對於消息代理而言,生產者和消費者都屬於客戶端:生產者和消費者會發送客戶端請求給服務端,服務端的處理分別是存儲消息和獲取消息,最後服務端返回響應結果給客戶端

新的生產者應用程序使用 af aP oce 對象代表 個生產者客戶端進程 生產者要發送消息,並不是直接發送給 務端 ,而是先在客戶端 消息放入隊列 然後 一個 息發送線程從隊列中消息,以 鹽的方式發送消息給服務端 Kafka的記 集器( Reco dACCUl'lUlato )負責緩存生產者客戶端產生的消息,發送線程( Sende )負責讀取 集器的批 過網路發送給服務端為了保證客戶端 絡請求 快速 應, Kafka 用選擇器( Selecto 絡連接 讀寫 理,使網路連接( Netwo kCl i.ent )處理客戶端 絡請求

追加消息到記錄收集器時按照分區進行分組,並放到batches集合中,每個分區的隊列都保存了將發送到這個分區對應節點上的 記錄,客戶端的發送線程可 只使用 Sende 線程迭 batches的每個分區,獲取分區對應的主劇本節點,取出分區對應的 列中的批記錄就可以發送消息了

消息發送線程有兩種消息發送方式 按照分區直接發送 按照分區的目標節點發迭 假設有兩台伺服器, 題有 個分區,那麼每台伺服器就有 個分區 ,消息發送線程迭代batches的每個分 接往分區的主副本節點發送消息,總共會有 個請求 所示,我 先按照分區的主副本節點進行分組, 屬於同 個節點的所有分區放在一起,總共只有兩個請求做法可以大大減少網路的開銷

消息系統由生產者 存儲系統和消費者組成 章分析了生產者發送消息給服務端的過程,本章分析消費者從服務端存儲系統讀取生產者寫入消息的過程 首先我 來了解消費者的 些基礎知識

作為分布式的消息系統, Kafka支持多個生產者和多個消費者,生產者可以將消息發布到集群中不同節點的不同分區上;「肖費者也可以消費集群中多個節點的多個分區上的消息 寫消息時,多個生產者可以 到同 個分區 讀消息時,如果多個消費者同時讀取 個分區,為了保證將日誌文件的不同數據分配給不同的消費者,需要採用加鎖 同步等方式,在分區級別的日誌文件上做些控制

相反,如果約定「同 個分區只可被 個消費者處理」,就不需要加鎖同步了,從而可提升消費者的處理能力 而且這也並不違反消息的處理語義:原先需要多個消費者處理,現在交給一個消費者處理也是可以的 3- 給出了 種最簡單的消息系統部署模式,生產者的數據源多種多樣,它們都統寫人Kafka集群 處理消息時有多個消費者分擔任務 ,這些消費者的處理邏輯都相同, 每個消費者處理的分區都不會重復

因為分區要被重新分配,分區的所有者都會發生變 ,所以在還沒有重新分配分區之前 所有消費者都要停止已有的拉取錢程 同時,分區分配給消費者都會在ZK中記錄所有者信息,所以也要先刪ZK上的節點數據 只有和分區相關的 所有者 拉取線程都釋放了,才可以開始分配分區

如果說在重新分配分區前沒有釋放這些信息,再平衡後就可能造成同 個分區被多個消費者所有的情況 比如分區Pl 原先歸消費者 所有,如果沒有釋放拉取錢程和ZK節點,再平衡後分區Pl 被分配給消費者 了,這樣消費者 和消費者 就共享了分區Pl ,而這顯然不符合 fka 中關於「一個分區只能被分配給 個消費者」的限制條件 執行再平衡操作的步驟如下

如果是協調者節點發生故障,服務端會有自己的故障容錯機制,選出管理消費組所有消費者的新協調者節,點消費者客戶端沒有權利做這個工作,它能做的只是等待一段時間,查詢服務端是否已經選出了新的協調節點如果消費者查到現在已經有管理協調者的協調節點,就會連接這個新協調節,哉由於這個協調節點是服務端新選出來的,所以每個消費者都應該重新連接協調節點

消費者重新加入消費組,在分配到分區的前後,都會對消費者的拉取工作產生影響 消費者發送「加入組請求」之前要停止拉取消息,在收到「加入組響應」中的分區之後要重新開始拉取消息時,為了能夠讓客戶端應用程序感知消費者管理的分區發生變化,在加入組前後,客戶端還可以設置自定義的「消費者再平衡監聽器」,以便對分區的變化做出合適的處理


⑧ 如何確定Kafka的分區數,key和consumer線程數,以及不消費問題解決

在Kafak中國社區的qq群中,這個問題被提及的比例是相當高的,這也是Kafka用戶最常碰到的問題之一。本文結合Kafka源碼試圖對該問題相關的因素進行探討。希望對大家有所幫助。
怎麼確定分區數?
「我應該選擇幾個分區?」——如果你在Kafka中國社區的群里,這樣的問題你會經常碰到的。不過有些遺憾的是,我們似乎並沒有很權威的答案能夠解答這樣的問題。其實這也不奇怪,畢竟這樣的問題通常都是沒有固定答案的。Kafka官網上標榜自己是"high-throughput distributed messaging system",即一個高吞吐量的分布式消息引擎。那麼怎麼達到高吞吐量呢?Kafka在底層摒棄了Java堆緩存機制,採用了操作系統級別的頁緩存,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO性能。但是,這只是一個方面,畢竟單機優化的能力是有上限的。如何通過水平擴展甚至是線性擴展來進一步提升吞吐量呢? Kafka就是使用了分區(partition),通過將topic的消息打散到多個分區並分布保存在不同的broker上實現了消息處理(不管是procer還是consumer)的高吞吐量。
Kafka的生產者和消費者都可以多線程地並行操作,而每個線程處理的是一個分區的數據。因此分區實際上是調優Kafka並行度的最小單元。對於procer而言,它實際上是用多個線程並發地向不同分區所在的broker發起Socket連接同時給這些分區發送消息;而consumer呢,同一個消費組內的所有consumer線程都被指定topic的某一個分區進行消費(具體如何確定consumer線程數目我們後面會詳細說明)。所以說,如果一個topic分區越多,理論上整個集群所能達到的吞吐量就越大。
但分區是否越多越好呢?顯然也不是,因為每個分區都有自己的開銷:
一、客戶端/伺服器端需要使用的內存就越多
先說說客戶端的情況。Kafka 0.8.2之後推出了Java版的全新的procer,這個procer有個參數batch.size,默認是16KB。它會為每個分區緩存消息,一旦滿了就打包將消息批量發出。看上去這是個能夠提升性能的設計。不過很顯然,因為這個參數是分區級別的,如果分區數越多,這部分緩存所需的內存佔用也會更多。假設你有10000個分區,按照默認設置,這部分緩存需要佔用約157MB的內存。而consumer端呢?我們拋開獲取數據所需的內存不說,只說線程的開銷。如果還是假設有10000個分區,同時consumer線程數要匹配分區數(大部分情況下是最佳的消費吞吐量配置)的話,那麼在consumer client就要創建10000個線程,也需要創建大約10000個Socket去獲取分區數據。這裡面的線程切換的開銷本身已經不容小覷了。
伺服器端的開銷也不小,如果閱讀Kafka源碼的話可以發現,伺服器端的很多組件都在內存中維護了分區級別的緩存,比如controller,FetcherManager等,因此分區數越多,這種緩存的成本越久越大。
二、文件句柄的開銷
每個分區在底層文件系統都有屬於自己的一個目錄。該目錄下通常會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會為每個broker都保存這兩個文件句柄(file handler)。很明顯,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit -n的限制。
三、降低高可用性
Kafka通過副本(replica)機制來保證高可用。具體做法就是為每個分區保存若干個副本(replica_factor指定副本數)。每個副本保存在不同的broker上。期中的一個副本充當leader 副本,負責處理procer和consumer請求。其他副本充當follower角色,由Kafka controller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分情況下可能只是幾毫秒級別。但如果你有10000個分區,10個broker,也就是說平均每個broker上有1000個分區。此時這個broker掛掉了,那麼zookeeper和controller需要立即對這1000個分區進行leader選舉。比起很少的分區leader選舉而言,這必然要花更長的時間,並且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。
說了這么多「廢話」,很多人肯定已經不耐煩了。那你說到底要怎麼確定分區數呢?答案就是:視情況而定。基本上你還是需要通過一系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對你意義不大,因為不同的硬體、軟體、負載情況測試出來的結果必然不一樣。我經常碰到的問題類似於,官網說每秒能到10MB,為什麼我的procer每秒才1MB? —— 且不說硬體條件,最後發現他使用的消息體有1KB,而官網的基準測試是用100B測出來的,因此根本沒有可比性。不過你依然可以遵循一定的步驟來嘗試確定分區數:創建一個只有1個分區的topic,然後測試這個topic的procer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然後假設總的目標吞吐量是Tt,那麼分區數 = Tt / max(Tp, Tc)
Tp表示procer的吞吐量。測試procer通常是很容易的,因為它的邏輯非常簡單,就是直接發送消息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關系更大, 因為Tc的值取決於你拿到消息之後執行什麼操作,因此Tc的測試通常也要麻煩一些。
另外,Kafka並不能真正地做到線性擴展(其實任何系統都不能),所以你在規劃你的分區數的時候最好多規劃一下,這樣未來擴展時候也更加方便。
消息-分區的分配
默認情況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions,如下圖所示:
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}

這就保證了相同key的消息一定會被路由到相同的分區。如果你沒有指定key,那麼Kafka是如何確定這條消息去往哪個分區的呢?
if(key == null) { // 如果沒有指定key
val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有沒有緩存的現成的分區Id
id match {
case Some(partitionId) =>
partitionId // 如果有的話直接使用這個分區Id就好了
case None => // 如果沒有的話,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分區的leader所在的broker
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 從中隨機挑一個
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下一次直接使用
partitionId
}
}

可以看出,Kafka幾乎就是隨機找一個分區發送無key的消息,然後把這個分區號加入到緩存中以備後面直接使用——當然了,Kafka本身也會清空該緩存(默認每10分鍾或每次請求topic元數據時)
如何設定consumer線程數
我個人的觀點,如果你的分區數是N,那麼最好線程數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的線程不會被分配到任何分區。讓我們來看看具體Kafka是如何分配的。
topic下的一個分區只能被同一個consumer group下的一個consumer線程來消費,但反之並不成立,即一個consumer線程可以消費多個分區的數據,比如Kafka提供的ConsoleConsumer,默認就只是一個線程來消費所有分區的數據。——其實ConsoleConsumer可以使用通配符的功能實現同時消費多個topic數據,但這和本文無關。
再討論分配策略之前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用於consumer程序調用實現數據的消費。其底層維護了一個阻塞隊列,所以在沒有新消息到來時,consumer是處於阻塞狀態的,表現出來的狀態就是consumer程序一直在等待新消息的到來。——你當然可以配置成帶超時的consumer,具體參看參數consumer.timeout.ms的用法。
下面說說 Kafka提供的兩種分配策略: range和roundrobin,由參數partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分區,P0 ~ P9,consumer線程數是3, C0 ~ C2,那麼每個線程都分配哪些分區呢?
C0 消費分區 0, 1, 2, 3
C1 消費分區 4, 5, 6
C2 消費分區 7, 8, 9
具體演算法就是:
val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每個consumer至少保證消費的分區數
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 還剩下多少個分區需要單獨分配給開頭的線程們
...
for (consumerThreadId <- consumerThreadIdSet) { // 對於每一個consumer線程
val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出該線程在所有線程中的位置,介於[0, n-1]
assert(myConsumerPosition >= 0)
// startPart 就是這個線程要消費的起始分區數
val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是這個線程總共要消費多少個分區
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
...
}

針對於這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart為10%3=1,說明每個線程至少保證3個分區,還剩下1個分區需要單獨分配給開頭的若干個線程。這就是為什麼C0消費4個分區,後面的2個線程每個消費3個分區,具體過程詳見下面的Debug截圖信息:
ctx.myTopicThreadIds

nPartsPerConsumer = 10 / 3 = 3
nConsumersWithExtraPart = 10 % 3 = 1

第一次:
myConsumerPosition = 1
startPart = 1 * 3 + min(1, 1) = 4 ---也就是從分區4開始讀
nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 讀取3個分區, 即4,5,6
第二次:
myConsumerPosition = 0
startPart = 3 * 0 + min(1, 0) =0 --- 從分區0開始讀
nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 讀取4個分區,即0,1,2,3
第三次:
myConsumerPosition = 2
startPart = 3 * 2 + min(2, 1) = 7 --- 從分區7開始讀
nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 讀取3個分區,即7, 8, 9
至此10個分區都已經分配完畢

說到這里,經常有個需求就是我想讓某個consumer線程消費指定的分區而不消費其他的分區。坦率來說,目前Kafka並沒有提供自定義分配策略。做到這點很難,但仔細想一想,也許我們期望Kafka做的事情太多了,畢竟它只是個消息引擎,在Kafka中加入消息消費的邏輯也許並不是Kafka該做的事情。

不消費問題
第一步:參看消費者的基本情況

查看mwbops系統,【Consumer監控】-->【對應的consumerId】

如果offset數字一直在動,說明一直在消費,說明不存在問題,return;
如果offset數字一直不動,看Owner是不是有值存在
如果Owner是空,說明消費端的程序已經跟Kafka斷開連接,應該排查消費端是否正常,return;
如果Owner不為空,就是有上圖上面的類似於 bennu_index_benuprdapp02-1444748505181-f558155a-0 的文字,繼續看下面內容
第二步:查看消費端的程序代碼
一般的消費代碼是這樣的

看看自己的消費代碼裡面,存不存在處理消息的時候出異常的情況
如果有,需要try-catch一下,其實不論有沒有異常,都用try-catch包一下最好,如下面代碼

return;
原因:如果在處理消息的時候有異常出現,又沒有進行處理,那麼while循環就會跳出,線程會結束,所以不會再去取消息,就是消費停止了。
第三步:查看消費端的配置
消費代碼中一般以以下方式創建Consumer

消費端有一個配置,叫 fetch.message.max.bytes,默認是1M,此時如果有消息大於1M,會發生停止消費的情況。
此時,在配置中增加 props.put("fetch.message.max.bytes", "10 * 1024 * 1024"); 即可
return;
原因:目前Kafka集群配置的運行最大的消息大小是10M,如果客戶端配置的運行接收的消息是1M,跟Kafka服務端配置的不一致,
則消息大於1M的情況下,消費端就無法消費,導致一直卡在這一條消息,現象就是消費停止。

⑨ 《ApacheKafka源碼剖析》pdf下載在線閱讀,求百度網盤雲資源

《Apache Kafka源碼剖析》(徐郡明)電子書網盤下載免費在線閱讀

資源鏈接:

鏈接:

提取碼:tmjo

書名:Apache Kafka源碼剖析

作者:徐郡明

豆瓣評分:8.4

出版社:電子工業出版社

出版年份:2017-5

頁數:604

內容簡介:

《Apache Kafka源碼剖析》以Kafka 0.10.0版本源碼為基礎,針對Kafka的架構設計到實現細節進行詳細闡述。《Apache Kafka源碼剖析》共5章,從Kafka的應用場景、源碼環境搭建開始逐步深入,不僅介紹Kafka的核心概念,而且對Kafka生產者、消費者、服務端的源碼進行深入的剖析,最後介紹Kafka常用的管理腳本實現,讓讀者不僅從宏觀設計上了解Kafka,而且能夠深入到Kafka的細節設計之中。在源碼分析的過程中,還穿插了筆者工作積累的經驗和對Kafka設計的理解,希望讀者可以舉一反三,不僅知其然,而且知其所以然。

《Apache Kafka源碼剖析》旨在為讀者閱讀Kafka源碼提供幫助和指導,讓讀者更加深入地了解Kafka的運行原理、設計理念,讓讀者在設計分布式系統時可以參考Kafka的優秀設計。《Apache Kafka源碼剖析》的內容對於讀者全面提升自己的技術能力有很大幫助。

⑩ Kafka Consumer Offset解析

Kafka __consumer_offsets是一個特殊的存儲元數據的Topic
數據格式可以想像成一個 KV 格式的消息,key 就是一個三元組:group.id+topic+分區號,而 value 就是 offset 的值。

查看方式:使用kafka自帶的讀取類
./bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 01 --bootstrap-server xxx:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --from-beginning --max-messages 30

一般情況下, 使用 OffsetsMessageFormatter 列印的格式可以概括為:
"[%s,%s,%d]::[OffsetMetadata[%d,%s],CommitTime %d,ExpirationTime %d]".format(group, topic, partition, offset, metadata, commitTimestamp, expireTimestamp)

數據內容:
[flink-payment-alert_query_time_1576066085229,payment-result-count,4]::NULL
[flink-payment-alert_query_time_1576066085229,payment-result-count,3]::NULL
[flink-payment-alert_query_time_1576066085229,payment-result-count,9]::NULL

另外一種是
[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939024066, expireTimestamp=None)
[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939028621, expireTimestamp=None)
[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939033680, expireTimestamp=None)

還有一種是
[ProcessEngineBusinessProcess,CasBusinessTopic,1]::[OffsetMetadata[99649027,NO_METADATA],CommitTime 1636930671854,ExpirationTime 1637017071854]
[ProcessEngineBusinessProcess,CasBusinessTopic,0]::[OffsetMetadata[99650360,NO_METADATA],CommitTime 1636930671854,ExpirationTime 1637017071854]
[ProcessEngineBusinessProcess,CasBusinessTopic,3]::[OffsetMetadata[99640798,NO_METADATA],CommitTime 1636930672471,ExpirationTime 1637017072471]

分別解釋一下:

在 Kafka 中有一個名為「delete-expired-group-metadata」的定時任務來負責清理過期的消費位移,這個定時任務的執行周期由參數 offsets.retention.check.interval.ms 控制,默認值為600000,即10分鍾。這和普通的topic的不太一樣

還有 metadata,一般情況下它的值要麼為 null 要麼為空字元串,OffsetsMessageFormatter 會把它展示為 NO_METADATA,否則就按實際值進行展示。

看一下源碼里這些類的結構
case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {

case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) {
override def toString = "OffsetMetadata[%d,%s]"
.format(offset,
if (metadata != null && metadata.length > 0) metadata else "NO_METADATA")
}

@Deprecated
public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1

另外0.11.0之後對應的數據格式版本是V2,這個版本的消息相比於v0和v1的版本而言改動很大,同時還參考了Protocol Buffer而引入了變長整型(Varints)和ZigZag編碼。

另外:
offset為什麼會有墓碑消息?
因為offset本身也會過期清理.受offsets.retention.minutes 這個配置的影響
看下官網介紹
After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period.
當group里的consumer全部下線後過offsets.retention.minutes 時間後offset就會被刪除
val OffsetsRetentionMinutes: Int = 7 * 24 * 60 // 默認7天
默認2.0之前是1天,2.0及以後是7天 這個官方真是..要麼就改為2天,結果直接改為7天,改動不可謂不大,而且active的group不會過期

附: https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days

另外active的group無法修改consumer offset?
Usually we do not allow committed offset changes while a group is active because we do not have a mechanism to notify the group of the change.
原因是無法通知到組成員consumer offset的變更

熱點內容
sqlserver注冊表清理 發布:2024-05-18 20:13:14 瀏覽:988
linux刪除連接 發布:2024-05-18 20:06:56 瀏覽:820
linux搭建雲伺服器平台 發布:2024-05-18 19:52:21 瀏覽:400
安卓怎麼關閉美易訂閱 發布:2024-05-18 19:29:16 瀏覽:642
蘋果手機配置代理伺服器怎麼開 發布:2024-05-18 19:29:07 瀏覽:229
伺服器屏蔽了別人的ip 發布:2024-05-18 19:10:09 瀏覽:619
怎麼獲取ins伺服器地址 發布:2024-05-18 19:10:03 瀏覽:30
仙方一般是什麼配置 發布:2024-05-18 18:53:43 瀏覽:159
黑莓安卓手機主題下載到哪裡 發布:2024-05-18 18:47:18 瀏覽:57
湯靖軒編程 發布:2024-05-18 18:46:04 瀏覽:533