當前位置:首頁 » 操作系統 » eclipsekafka源碼

eclipsekafka源碼

發布時間: 2023-03-26 11:44:32

❶ apache kafka源碼怎麼編譯

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka是一個分布式的、可分區的(partitioned)、基於備份的(replicated)和commit-log存儲的服務.。它提供了類似於messaging system的特性,但是在設計實現上完全不同)。kafka是一種高吞吐量的分布式發布訂閱消息系統,它有如下特性:
(1)、通過O(1)的磁碟數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
(2)、高吞吐量:即使是非常普通的硬體kafka也可以支持每秒數十萬的消息。
(3)、支持通過kafka伺服器和消費機集群來分區消息。
(4)、支持Hadoop並行數據載入。
一、用Kafka裡面自帶的腳本進行編譯
下載好了Kafka源碼,裡面自帶了一個gradlew的腳本,我們可以利用這個編譯Kafka源碼:
1 # wget http://mirror.bit.e.cn/apache/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz
2 # tar -zxf kafka-0.8.1.1-src.tgz
3 # cd kafka-0.8.1.1-src
4 # ./gradlew releaseTarGz
運行上面的命令進行編譯將會出現以下的異常信息:
01 :core:signArchives FAILED
02
03 FAILURE: Build failed with an exception.
04
05 * What went wrong:
06 Execution failed for task ':core:signArchives'.
07 > Cannot perform signing task ':core:signArchives' because it
08 has no configured signatory
09
10 * Try:
11 Run with --stacktrace option to get the stack trace. Run with
12 --info or --debug option to get more log output.
13
14 BUILD FAILED
這是一個bug(https://issues.apache.org/jira/browse/KAFKA-1297),可以用下面的命令進行編譯
1 ./gradlew releaseTarGzAll -x signArchives
這時候將會編譯成功(在編譯的過程中將會出現很多的)。在編譯的過程中,我們也可以指定對應的Scala版本進行編譯:
1 ./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives
編譯完之後將會在core/build/distributions/裡面生成kafka_2.10-0.8.1.1.tgz文件,這個和從網上下載的一樣,可以直接用。
二、利用sbt進行編譯
我們同樣可以用sbt來編譯Kafka,步驟如下:
01 # git clone https://git-wip-us.apache.org/repos/asf/kafka.git
02 # cd kafka
03 # git checkout -b 0.8 remotes/origin/0.8
04 # ./sbt update
05 [info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)
06 [info] downloading http://repo1.maven.org/maven2/ant/ant/1.6.5/ant-1.6.5.jar ...
07 [info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)
08 [info] Done updating.
09 [info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...
10 [info] Done updating.
11 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
12 [info] Done updating.
13 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
14 [info] Done updating.
15 [success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM
16
17 # ./sbt package
18 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
19 Getting Scala 2.8.0 ...
20 :: retrieving :: org.scala-sbt#boot-scala
21 confs: [default]
22 3 artifacts copied, 0 already retrieved (14544kB/27ms)
23 [success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM
對於Kafka 0.8及以上版本還需要運行以下的命令:
01 # ./sbt assembly-package-dependency
02 [info] Loading project definition from /export1/spark/kafka/project
03 [warn] Multiple resolvers having different access mechanism configured with
04 same name 'sbt-plugin-releases'. To avoid conflict, Remove plicate project
05 resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
06 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
07 [warn] Credentials file /home/wyp/.m2/.credentials does not exist
08 [info] Including slf4j-api-1.7.2.jar
09 [info] Including metrics-annotation-2.2.0.jar
10 [info] Including scala-compiler.jar
11 [info] Including scala-library.jar
12 [info] Including slf4j-simple-1.6.4.jar
13 [info] Including metrics-core-2.2.0.jar
14 [info] Including snappy-java-1.0.4.1.jar
15 [info] Including zookeeper-3.3.4.jar
16 [info] Including log4j-1.2.15.jar
17 [info] Including zkclient-0.3.jar
18 [info] Including jopt-simple-3.2.jar
19 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
20 [warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
21 [warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE'
22 with strategy 'rename'
23 [warn] Merging 'LICENSE.txt' with strategy 'rename'
24 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
25 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
26 [warn] Strategy 'discard' was applied to a file
27 [warn] Strategy 'rename' was applied to 5 files
28 [success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM
當然,我們也可以在sbt裡面指定scala的版本:
01 <!--
02 User: 過往記憶
03 Date: 14-6-18
04 Time: 20:20
05 bolg: http://www.iteblog.com
06 本文地址:http://www.iteblog.com/archives/1044
07 過往記憶博客,專注於hadoop、hive、spark、shark、flume的技術博客,大量的干貨
08 過往記憶博客微信公共帳號:iteblog_hadoop
09 -->
10 sbt "++2.10.3 update"
11 sbt "++2.10.3 package"
12 sbt "++2.10.3 assembly-package-dependency"

❷ Kafka 源碼解析之 Topic 的新建/擴容/刪除

[TOC]
本篇接著講述 Controller 的功能方面的內容,在 Kafka 中,一個 Topic 的新建、擴容或者刪除都是由 Controller 來操作的,本篇文章也是主要聚焦在 Topic 的操作處理上(新建、擴容、刪除),實際上 Topic 的創建在 Kafka 源碼解析之 topic 創建過程(三) 中已經講述過了,本篇與前面不同的是,本篇主要是從 Controller 角度來講述,而且是把新建、擴容、刪除這三個 Topic 級別的操作放在一起做一個總結。

這里把 Topic 新建與擴容放在一起講解,主要是因為無論 Topic 是新建還是擴容,在 Kafka 內部其實都是 Partition 的新建,底層的實現機制是一樣的,Topic 的新建與擴容的整體流程如下圖所示:

Topic 新建與擴容觸發條件的不同如下所示:

下面開始詳細講述這兩種情況。

Topic 擴容
Kafka 提供了 Topic 擴容工具,假設一個 Topic(topic_test)只有一個 partition,這時候我們想把它擴容到兩個 Partition,可以通過下面兩個命令來實現:

這兩種方法的區別是:第二種方法直接指定了要擴容的 Partition 2 的副本需要分配到哪台機器上,這樣的話我們可以精確控制到哪些 Topic 放下哪些機器上。

無論是使用哪種方案,上面兩條命令產生的結果只有一個,將 Topic 各個 Partition 的副本寫入到 ZK 對應的節點上,這樣的話 /brokers/topics/topic_test 節點的內容就會發生變化, 監聽器就會被觸發 ,該監聽器的處理流程如下:

其 doHandleDataChange() 方法的處理流程如下:

下面我們看下 onNewPartitionCreation() 方法,其實現如下:

關於 Partition 的新建,總共分了以下四步:

經過上面幾個階段,一個 Partition 算是真正創建出來,可以正常進行讀寫工作了,當然上面只是講述了 Controller 端做的內容,Partition 副本所在節點對 LeaderAndIsr 請求會做更多的工作,這部分會在後面關於 LeaderAndIsr 請求的處理中只能夠詳細講述。

Topic 新建
Kafka 也提供了 Topic 創建的工具,假設我們要創建一個名叫 topic_test,Partition 數為2的 Topic,創建的命令如下:

跟前面的類似,方法二是可以精確控制新建 Topic 每個 Partition 副本所在位置,Topic 創建的本質上是在 /brokers/topics 下新建一個節點信息,並將 Topic 的分區詳情寫入進去,當 /brokers/topics 有了新增的 Topic 節點後,會觸發 TopicChangeListener 監聽器,其實現如下:

只要 /brokers/topics 下子節點信息有變化(topic 新增或者刪除),TopicChangeListener 都會被觸發,其 doHandleChildChange() 方法的處理流程如下:

接著看下 onNewTopicCreation() 方法實現

上述方法主要做了兩件事:

onNewPartitionCreation() 的實現在前面 Topic 擴容部分已經講述過,這里不再重復,最好參考前面流程圖來梳理 Topic 擴容和新建的整個過程。

Kafka Topic 刪除這部分的邏輯是一個單獨線程去做的,這個線程是在 Controller 啟動時初始化和啟動的。

TopicDeletionManager 初始化
TopicDeletionManager 啟動實現如下所示:

TopicDeletionManager 啟動時只是初始化了一個 DeleteTopicsThread 線程,並啟動該線程。TopicDeletionManager 這個類從名字上去看,它是 Topic 刪除的管理器,它是如何實現 Topic 刪除管理呢,這里先看下該類的幾個重要的成員變數:

前面一小節,簡單介紹了 TopicDeletionManager、DeleteTopicsThread 的啟動以及它們之間的關系,這里我們看下一個 Topic 被設置刪除後,其處理的整理流程,簡單做了一個小圖,如下所示:

這里先簡單講述上面的流程,當一個 Topic 設置為刪除後:

先看下 DeleteTopicsListener 的實現,如下:

其 doHandleChildChange() 的實現邏輯如下:

接下來,看下 Topic 刪除線程 DeleteTopicsThread 的實現,如下所示:

doWork() 方法處理邏輯如下:

先看下 onTopicDeletion() 方法,這是 Topic 最開始刪除時的實現,如下所示:

Topic 的刪除的真正實現方法還是在 startReplicaDeletion() 方法中,Topic 刪除時,會先調用 onPartitionDeletion() 方法刪除所有的 Partition,然後在 Partition 刪除時,執行 startReplicaDeletion() 方法刪除該 Partition 的副本,該方法的實現如下:

該方法的執行邏輯如下:

在將副本狀態從 OfflineReplica 轉移成 ReplicaDeletionStarted 時,會設置一個回調方法 (),該方法會將刪除成功的 Replica 設置為 ReplicaDeletionSuccessful 狀態,刪除失敗的 Replica 設置為 ReplicaDeletionIneligible 狀態(需要根據 StopReplica 請求處理的過程,看下哪些情況下 Replica 會刪除失敗,這個會在後面講解)。

下面看下這個方法 completeDeleteTopic(),當一個 Topic 的所有 Replica 都刪除成功時,即其狀態都在 ReplicaDeletionSuccessful 時,會調用這個方法,如下所示:

當一個 Topic 所有副本都刪除後,會進行如下處理:

至此,一個 Topic 算是真正刪除完成。

❸ 《ApacheKafka源碼剖析》pdf下載在線閱讀,求百度網盤雲資源

《Apache Kafka源碼剖析》(徐郡明)電子書網盤下載免費在線閱讀

資源鏈接:

鏈接:

提取碼:tmjo

書名:Apache Kafka源碼剖析

作者:徐郡明

豆瓣評分:8.4

出版社:電子工業出版社

出版年份:2017-5

頁數:604

內容簡介:

《Apache Kafka源碼剖析》以Kafka 0.10.0版本源碼為基礎,針對Kafka的架構設計到實現細節進行詳細闡述。《Apache Kafka源碼剖析》共5章,從Kafka的應用場景、源碼環境搭建開始逐步深入,不僅介紹Kafka的核心概念,而且對Kafka生產者、消費者、服務端的源碼進行深入的剖析,最後介紹Kafka常用的管理腳本實現,讓讀者不僅從宏觀設計上了解Kafka,而且能夠深入到Kafka的細節設計之中。在源碼分析的過程中,還穿插了筆者工作積累的經驗和對Kafka設計的理解,希望讀者可以舉一反三,不僅知其然,而且知其所以然。

《Apache Kafka源碼剖析》旨在為讀者閱讀Kafka源碼提供幫助和指導,讓讀者更加深入地了解Kafka的運行原理、設計理念,讓讀者在設計分布式系統時可以參考Kafka的優秀設計。《Apache Kafka源碼剖析》的內容對於讀者全面提升自己的技術能力有很大幫助。

❹ Kafka 源碼解析之 Consumer 兩種 commit 機制和 partition 分配機制

先看下兩種不同的 commit 機制,一種是同步 commit,一種是非同步 commit,既然其作用都是 offset commit,應該不難猜到它們底層使用介面都是一樣的

同步 commit

同步 commit 的實現方式,client.poll() 方法會阻塞直到這個request 完成或超時才會返回。

非同步 commit

而對於非同步的 commit,最後調用的都是 doCommitOffsetsAsync 方法,其具體實現如下:

在非同步 commit 中,可以添加相應的回調函數,如果 request 處理成功或處理失敗,ConsumerCoordinator 會通過 () 方法喚醒相應的回調函數。

關鍵區別在於future是否會get,同步提交就是future會get.

consumer 提供的兩種不同 partition 分配策略,可以通過 partition.assignment.strategy 參數進行配置,默認情況下使用的是 org.apache.kafka.clients.consumer.RangeAssignor,Kafka 中提供另一種 partition 的分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor

用戶可以自定義相應的 partition 分配機制,只需要繼承這個 AbstractPartitionAssignor 抽象類即可。

AbstractPartitionAssignor

AbstractPartitionAssignor 有一個抽象方法,如下所示:

assign() 這個方法,有兩個參數:

RangeAssignor 和 RoundRobinAssignor 通過這個方法 assign() 的實現,來進行相應的 partition 分配。

直接看一下這個方法的實現:

假設 topic 的 partition 數為 numPartitionsForTopic,group 中訂閱這個 topic 的 member 數為 consumersForTopic.size(),首先需要算出兩個值:

分配的陪巧規則是:對於剩下的那些 partition 分配到前 consumersWithExtraPartition 個 consumer 上,也就是前 consumersWithExtraPartition 個 consumer 獲得 topic-partition 列表會比後面多一個。

在上述的程序中,舉了一個例子,假設有一個 topic 有 7 個 partition,group 有5個 consumer,這個5個 consumer 都訂閱這個 topic,那麼 range 的分配方式如下:

而如果 group 中有 consumer 沒有訂閱這個 topic,那麼這個 consumer 將不會參與分配。下面再舉個例子,將有兩個 topic,一個 partition 有5個,一個孝銀 partition 有7個,group 有5個 consumer,但是只有前3個訂閱第一個 topic,而另一個 topic 是所有 consumer 都訂閱了,那麼其分配結果如下:

這個是 roundrobin 的實現,其實現方法如下:

roundrobin 的實現原則,簡單來說就是:列出所有 topic-partition 和列出所有的 consumer member,然後開始分配,一輪之後繼續下一輪,假設有有一個 topic,它有7個 partition,group 有3個 consumer 都訂閱了這個 topic,那麼其分配方式為:

對於多個 topic 的訂閱,將有兩個 topic,一個 partition 有5個,一個 partition 有7個,group 有5個 consumer,但是蘆慎鍵只有前3個訂閱第一個 topic,而另一個 topic 是所有 consumer 都訂閱了,那麼其分配結果如下:

roundrobin 分配方式與 range 的分配方式還是略有不同。

❺ kafka中的key有啥作用

kafka源碼ProcerRecord.java類的注釋說明了key的作用,注釋如下:

A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.
一個k/v對被發送到kafka。這包含被發送記錄的主題名字,一個可選的分區編號,一個可選的key和value。

If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.
如果一個有效的partition屬性數值被指定,那麼在發送記鍵喊帶錄時partition屬性數值就會被應用。如果沒有partition屬性數值被指定,而一個key屬性被聲明的話,一個partition會通過key的hash而被選中。如果既沒有key也滲蔽沒有partition屬性數值被聲明稿蘆,那麼一個partition將會被分配以輪詢的方式。

The record also has an associated timestamp. If the user did not provide a timestamp, the procer will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for the topic.
record也有一個關聯的時間戳。如果用戶未提供一個時間戳,procer 將會通過當前的時間標記此record。時間戳最終會被kafka應用,其依賴時間戳類型來配置主題。

If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime},the timestamp in the procer record will be used by the broker。
如果主題是配置用的CREATE_TIME ,在procer記錄中的時間戳將會被broker應用。

If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime}, the timestamp in the procer record will be overwritten by the broker with the broker local time when it appends the message to its log.
如果主題被配置用的LogAppendTime,當broker添加消息到它的日誌中的時候,procer記錄中的時間戳將會被broker覆蓋掉,覆蓋成以broker本地的時間。

In either of the cases above, the timestamp that has actually been used will be returned to user in {@link RecordMetadata}
對於以上兩者之一,確實已被應用的時間戳將會在RecordMetadata中返回給用戶。

❻ 使用IDEA 打開、調試kafka源碼

kafka的啟動類是隱激:
core/src/main/scala/kafka/Kafka.scala

我嘗試在本地運晌譽行,死灶謹襪活跑不起來,報錯如下。網上也沒有找到靠譜的解決辦法。

嘗試本地運行失敗後,又嘗試了遠程調試的方式。

❼ kafka在本地eclipse上跑不起來,配置是不是漏掉什麼了

1、wmqtt.jar和mqttv3.jar有什麼區別? ...
2、MQTT本身只是個協議枝孝,而wmqtt.jar和mqttv ...
3、我看git裡面提供的Android推送Demo都唯搭野是使指喊用MQTT+ActiveMQ的,這兩個是如何結合到...

❽ 小記一次Kafka集群響應慢問題追查

某一天業務來找我,反映發數據到某一個Kafka集群特別慢。
並且他們提供了一份自己的測試結果,結果顯示發送數據到Kafka集群A的平均響應延遲在10ms以內,但是發送到Kafka集群B的平均響應延遲卻達到了2000ms+。
這種問題一般是比較頭疼的,首先,我們Kafka集群都有監控和報警,通過查看可用性、流量變化、Kafka日誌等方式,都沒有發現任何異樣;其次,響應慢也有可能和用戶的使用方式和測試方法有關系。
因此第一步,我決定驗證一下問題的存在。

在 kafka/bin 目錄中,kafka提供了一個寫請求性能測試腳本 kafka-procer-perf-test.sh 。
這個腳本會運行kafka中的 kafka.perf.ProcerPerformance 類,發送消息到kafka並輸出CSV報告。
測試命令如下:

kafka/bin/kafka-procer-perf-test.sh --broker-list ${BROKER_LIST} --topics perf-test-topic --show-detailed-stats --messages 10000 --csv-reporter-enabled --metrics-dir ./perf-report

通過分析生成的報告,發現確實有一台節點的響應比較慢:

可以看到P999分布已經達到了1300ms左右,這顯然是不正常的,但是原因是什麼呢?

既然日誌沒有問題,那隻能看一下jstack信息了:

如上發現jstack中有非常奇怪的信息,很多kafka-request-handler線程都處於阻塞狀態。
這里簡單解釋一下kafka的處理請求線程模型,引用一篇講 Kafka NIO網路通信的文章 中的圖來說明:

如圖,kafka採用了Java NIO中的selector模型。一個Acceptor線程負責接受請求,多個Processor線程負責處理請求。但實際上Processor線程只是把請求封裝成kafka request,然後丟到RequestChannel中(當然也負責讀取response並返回,這里不展開)。真正執行請求的是KafkaRequestHandler,即jstack中的kafka-request-handler線程。
所以當kafka-request-handler線程出現大量阻塞的時候,會極大逗悉兆地影響整個節點的響應效率。

關於Java線程中的BLOCKED狀態,可以直接看一下Java doc說明:

可見kafka-request-handler線程是因為搶鎖而發生了阻塞。我們根據jstack信息中的 kafka.cluster.Partition.appendMessagesToLeader 定位到了源碼:

可以看到這個方法確實是同步的,同步對象是leaderIsrUpdateLock。由於leaderIsrUpdateLock是 kafka.cluster.Partition 的成員變數,也就是說只有在寫同一個topic partition的時候,才會發生互斥等待。
所以發生上面問題的原因陸悉只可能是某個topic有大量的寫請求,而且這個topic的partition數量不多,導致並發不足。
於是大量該topic的ProceRequest佔用了kafka-request-handler線程池,但是這些線程之間互相搶鎖,執行效率比較低,從而導致其他topic的請求無法及時被處理。

通過分析日誌和查看監控流量,定位到集群中某個topic的ProceRequest請求的QPS佔了整個集群的山租80%以上。
通過查看該topic監控指標中的單位時間內的消息條目數(MessagesInPerSec)和單位時間內的發送請求數(ProceRequestPerSec),可以計算出該Topic平均不到10條消息就會觸發一次kafka寫請求;再考慮到partition數量,推測應該是業務採用了kafka procer的同步模式,每條消息都觸發一次kafka寫請求。
解決方法有兩種:

當然,增加topic partition數量也能在一定程度上緩解問題,因為不同partition之間的寫請求是不互斥的,但這種方式更像是治標不治本,掩蓋了根本問題。

合理地發送網路請求在分布式系統中非常重要,為了提高效率,通常在權衡時效性和吞吐的情況下,以「聚少為多」的方式發送批量的請求。過多的小請求不僅會降低吞吐,還可能會壓垮後端的服務。
當然,作為服務提供方,需要通過多租戶、限流等方式,避免不正常使用的場景把服務壓垮。

❾ 一文解密Kafka,Kafka源碼設計與實現原理剖析,真正的通俗易懂

Apache Kafka (簡稱Kafka )最早是由Linkedln開源出來的分布式消息系統,現在是Apache旗下的一個子項目,並且已經成為開冊、領域應用最廣泛的消息系統之 Kafka社區也非常活躍,從 版本開始, Kafka 的標語已經從「一個高吞吐量、分布式的消息系統」改為「一個分布式的流平台」
關於Kafka,我打算從入門開始講起,一直到它的底層實現邏輯個原理以及源碼,建議大家花點耐心,從頭開始看,相信會對你有所收獲。

作為 個流式數據平台,最重要的是要具備下面 個特點

消息系統:
消息系統 也叫作消息隊列)主要有兩種消息模型:隊列和發布訂Kafka使用消費組( consumer group )統 上面兩種消息模型 Kafka使用隊列模型時,它可以將處理 作為平均分配給消費組中的消費者成員

下面我們會從 個角度分析Kafka 的幾個基本概念,並嘗試解決下面 個問題

消息由生產者發布到 fk 集群後,會被消費者消費 消息的消費模型有兩種:推送模型( pu和拉取模型( pull 基於推送模型的消息系統,由消息代理記錄消費者的消費狀態 消息代理在將消息推送到消費者後 標記這條消息為已消費

但這種方式無法很好地保證消息的處理語義 比如,消息代理把消息發送出去後,當消費進程掛掉或者由於網路原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經 這條消息標記為自己消費了,但實際上這條消息並沒有被實際處理) 如果要保證消息的處理語義,消息代理發送完消息後,要設置狀態為「已發送」,只有收到消費者的確認請求後才更新為「已消費」,這就需要在消息代理中記錄所有消息的消費狀態,這種做法也是不可取的

Kafka每個主題的多個分區日誌分布式地存儲在Kafka集群上,同時為了故障容錯,每個分區都會以副本的方式復制到多個消息代理節點上 其中一個節點會作為主副本( Leader ),其 節點作為備份副本( Follower ,也叫作從副本)

主副本會負責所有的客戶端讀寫操作,備份副本僅僅從主副本同步數據 當主副本 IH 現在故障時,備份副本中的 副本會被選擇為新的主副本 因為每個分區的副本中只有主副本接受讀寫,所以每個服務端都會作為某些分區的主副本,以及另外一些分區的備份副本這樣Kafka集群的所有服務端整體上對客戶端是負載均衡的

消息系統通常由生產者「pro ucer 消費者( co sumer )和消息代理( broke 大部分組成,生產者會將消息寫入消息代理,消費者會從消息代理中讀取消息 對於消息代理而言,生產者和消費者都屬於客戶端:生產者和消費者會發送客戶端請求給服務端,服務端的處理分別是存儲消息和獲取消息,最後服務端返回響應結果給客戶端

新的生產者應用程序使用 af aP oce 對象代表 個生產者客戶端進程 生產者要發送消息,並不是直接發送給 務端 ,而是先在客戶端 消息放入隊列 然後 一個 息發送線程從隊列中消息,以 鹽的方式發送消息給服務端 Kafka的記 集器( Reco dACCUl'lUlato )負責緩存生產者客戶端產生的消息,發送線程( Sende )負責讀取 集器的批 過網路發送給服務端為了保證客戶端 絡請求 快速 應, Kafka 用選擇器( Selecto 絡連接 讀寫 理,使網路連接( Netwo kCl i.ent )處理客戶端 絡請求

追加消息到記錄收集器時按照分區進行分組,並放到batches集合中,每個分區的隊列都保存了將發送到這個分區對應節點上的 記錄,客戶端的發送線程可 只使用 Sende 線程迭 batches的每個分區,獲取分區對應的主劇本節點,取出分區對應的 列中的批記錄就可以發送消息了

消息發送線程有兩種消息發送方式 按照分區直接發送 按照分區的目標節點發迭 假設有兩台伺服器, 題有 個分區,那麼每台伺服器就有 個分區 ,消息發送線程迭代batches的每個分 接往分區的主副本節點發送消息,總共會有 個請求 所示,我 先按照分區的主副本節點進行分組, 屬於同 個節點的所有分區放在一起,總共只有兩個請求做法可以大大減少網路的開銷

消息系統由生產者 存儲系統和消費者組成 章分析了生產者發送消息給服務端的過程,本章分析消費者從服務端存儲系統讀取生產者寫入消息的過程 首先我 來了解消費者的 些基礎知識

作為分布式的消息系統, Kafka支持多個生產者和多個消費者,生產者可以將消息發布到集群中不同節點的不同分區上;「肖費者也可以消費集群中多個節點的多個分區上的消息 寫消息時,多個生產者可以 到同 個分區 讀消息時,如果多個消費者同時讀取 個分區,為了保證將日誌文件的不同數據分配給不同的消費者,需要採用加鎖 同步等方式,在分區級別的日誌文件上做些控制

相反,如果約定「同 個分區只可被 個消費者處理」,就不需要加鎖同步了,從而可提升消費者的處理能力 而且這也並不違反消息的處理語義:原先需要多個消費者處理,現在交給一個消費者處理也是可以的 3- 給出了 種最簡單的消息系統部署模式,生產者的數據源多種多樣,它們都統寫人Kafka集群 處理消息時有多個消費者分擔任務 ,這些消費者的處理邏輯都相同, 每個消費者處理的分區都不會重復

因為分區要被重新分配,分區的所有者都會發生變 ,所以在還沒有重新分配分區之前 所有消費者都要停止已有的拉取錢程 同時,分區分配給消費者都會在ZK中記錄所有者信息,所以也要先刪ZK上的節點數據 只有和分區相關的 所有者 拉取線程都釋放了,才可以開始分配分區

如果說在重新分配分區前沒有釋放這些信息,再平衡後就可能造成同 個分區被多個消費者所有的情況 比如分區Pl 原先歸消費者 所有,如果沒有釋放拉取錢程和ZK節點,再平衡後分區Pl 被分配給消費者 了,這樣消費者 和消費者 就共享了分區Pl ,而這顯然不符合 fka 中關於「一個分區只能被分配給 個消費者」的限制條件 執行再平衡操作的步驟如下

如果是協調者節點發生故障,服務端會有自己的故障容錯機制,選出管理消費組所有消費者的新協調者節,點消費者客戶端沒有權利做這個工作,它能做的只是等待一段時間,查詢服務端是否已經選出了新的協調節點如果消費者查到現在已經有管理協調者的協調節點,就會連接這個新協調節,哉由於這個協調節點是服務端新選出來的,所以每個消費者都應該重新連接協調節點

消費者重新加入消費組,在分配到分區的前後,都會對消費者的拉取工作產生影響 消費者發送「加入組請求」之前要停止拉取消息,在收到「加入組響應」中的分區之後要重新開始拉取消息時,為了能夠讓客戶端應用程序感知消費者管理的分區發生變化,在加入組前後,客戶端還可以設置自定義的「消費者再平衡監聽器」,以便對分區的變化做出合適的處理


❿ 如何保證kafka 的消息機制 ack-fail 源碼跟蹤

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka布式、區(partitioned)、基於備份(replicated)commit-log存儲服務.提供類似於messaging system特性,設計實現完全同)kafka種高吞吐量布式發布訂閱消息系統特性:
(1)、通O(1)磁碟數據結構提供消息持久化種結構於即使數TB消息存儲能夠保持間穩定性能
(2)、高吞吐量:即使非普通硬體kafka支持每秒數十萬消息
(3)、支持通kafka伺服器消費機集群區消息
(4)、支持Hadoop並行數據載入
、用Kafka面自帶腳本進行編譯
載Kafka源碼面自帶gradlew腳本我利用編譯Kafka源碼:
1 # wget
2 # tar -zxf kafka-0.8.1.1-src.tgz
3 # cd kafka-0.8.1.1-src
4 # ./gradlew releaseTarGz
運行面命令進行編譯現異信息:
01 :core:signArchives FAILED
02
03 FAILURE: Build failed with an exception.
04
05 * What went wrong:
06 Execution failed for task ':core:signArchives'.
07 > Cannot perform signing task ':core:signArchives' because it
08 has no configured signatory
09
10 * Try:
11 Run with --stacktrace option to get the stack trace. Run with
12 --info or --debug option to get more log output.
13
14 BUILD FAILED
bug()用面命令進行編譯
1 ./gradlew releaseTarGzAll -x signArchives
候編譯功(編譯程現)編譯程我指定應Scala版本進行編譯:
1 ./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives
編譯完core/build/distributions/面kafka_2.10-0.8.1.1.tgz文件網載直接用
二、利用sbt進行編譯
我同用sbt編譯Kafka步驟:
01 # git clone
02 # cd kafka
03 # git checkout -b 0.8 remotes/origin/0.8
04 # ./sbt update
05 [info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)
06 [info] downloading ...
07 [info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)
08 [info] Done updating.
09 [info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...
10 [info] Done updating.
11 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
12 [info] Done updating.
13 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
14 [info] Done updating.
15 [success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM
16
17 # ./sbt package
18 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
19 Getting Scala 2.8.0 ...
20 :: retrieving :: org.scala-sbt#boot-scala
21 confs: [default]
22 3 artifacts copied, 0 already retrieved (14544kB/27ms)
23 [success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM
於Kafka 0.8及版本需要運行命令:
01 # ./sbt assembly-package-dependency
02 [info] Loading project definition from /export1/spark/kafka/project
03 [warn] Multiple resolvers having different access mechanism configured with
04 same name 'sbt-plugin-releases'. To avoid conflict, Remove plicate project
05 resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
06 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
07 [warn] Credentials file /home/wyp/.m2/.credentials does not exist
08 [info] Including slf4j-api-1.7.2.jar
09 [info] Including metrics-annotation-2.2.0.jar
10 [info] Including scala-compiler.jar
11 [info] Including scala-library.jar
12 [info] Including slf4j-simple-1.6.4.jar
13 [info] Including metrics-core-2.2.0.jar
14 [info] Including snappy-java-1.0.4.1.jar
15 [info] Including zookeeper-3.3.4.jar
16 [info] Including log4j-1.2.15.jar
17 [info] Including zkclient-0.3.jar
18 [info] Including jopt-simple-3.2.jar
19 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
20 [warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
21 [warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE'
22 with strategy 'rename'
23 [warn] Merging 'LICENSE.txt' with strategy 'rename'
24 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
25 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
26 [warn] Strategy 'discard' was applied to a file
27 [warn] Strategy 'rename' was applied to 5 files
28 [success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM
我sbt面指定scala版本:
01 <!--
02 User: 往記憶
03 Date: 14-6-18
04 Time: 20:20
05 bolg:
06 本文址:/archives/1044
07 往記憶博客專注於hadoop、hive、spark、shark、flume技術博客量干貨
08 往記憶博客微信公共帳號:iteblog_hadoop
09 -->
10 sbt "++2.10.3 update"
11 sbt "++2.10.3 package"
12 sbt "++2.10.3 assembly-package-dependency"

熱點內容
javaapi源碼 發布:2024-04-25 07:51:15 瀏覽:605
怎麼在伺服器執行jmeter腳本 發布:2024-04-25 07:35:25 瀏覽:396
域名訪問https 發布:2024-04-25 07:16:56 瀏覽:414
javaie亂碼 發布:2024-04-25 07:07:15 瀏覽:602
php開發微信支付 發布:2024-04-25 06:57:38 瀏覽:317
上傳視頻最快 發布:2024-04-25 06:42:59 瀏覽:14
允許更新預編譯站點 發布:2024-04-25 06:32:53 瀏覽:679
如何獲取已經連上的網路密碼 發布:2024-04-25 06:27:48 瀏覽:466
python打開界面 發布:2024-04-25 06:27:44 瀏覽:667
java數組重復 發布:2024-04-25 06:27:40 瀏覽:827