kafka存儲圖片
『壹』 深入理解kafka(五)日誌存儲
5.1文件目錄布局
根目錄下有以下5個checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint
分區目錄下有以下目錄: 0000xxx.index(偏移量為64位長整形,長度固定為20位), 0000xxx.log, 0000xxx.timeindex.
還有可能包含.deleted .cleaned .swap等臨時文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint
5.2日誌格式演變
5.2.1 v0版本
kafka0.10.0之前
RECORD_OVERHEAD包括offset(8B)和message size(4B)
RECORD包括:
crc32(4B):crc32校驗值
magic(1B):消息版本號0
attributes(1B):消息屬性。低3位表示壓縮類型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)
key length(4B):表示消息的key的長度。-1代表null
key: 可選
value length(4B):實際消息體的長度。-1代表null
value: 消息體。可以為空,如墓碑消息
5.2.2 v1版本
kafka0.10.0-0.11.0
比v0多了timestamp(8B)欄位,表示消息的時間戳
attributes的第4位也被利用起來,0表示timestamp的類型為CreateTime,1表示timestamp的類型為LogAppendTime
timestamp類型由broker端參數log.message.timestamp.type來配置,默認為CreateTime,即採用生產者創建的時間戳
5.2.3 消息壓縮
保證端到端的壓縮,服務端配置compression.type,默認為"procer",表示保留生產者使用的壓縮方式,還可以配置為"gzip","snappy","lz4"
多條消息壓縮至value欄位,以提高壓縮率
5.2.4 變長欄位
變長整形(Varints):每一個位元組都有一個位於最高位的msb位(most significant bit),除了最後一個位元組為1,其餘都為0,位元組倒序排列
為了使編碼更加高效,Varints使用ZigZag編碼:sint32對應 (n<<1)^(n>>31) sint64對應 (n<<1)^(n>>63)
5.2.5 v2版本
Record Batch
first offset:
length:
partition leader epoch:
magic:固定為2
attributes:兩個位元組。低3位表示壓縮格式,第4位表示時間戳類型,第5位表示事務(0-非事務1-事務),第6位控制消息(0-非控制1控制)
first timestamp:
max timestamp:
procer id:
procer epoch:
first sequence:
records count:
v2版本的消息去掉了crc欄位,另外增加了length(消息總長度)、timestamp delta(時間戳增量)、offset delta(位移增量)和headers信息,並且棄用了attributes
Record
length:
attributes:棄用,但仍占據1B
timestamp delta:
offset delta:
headers:
5.3日誌索引
稀疏索引(sparse index):每當寫入一定量(broker端參數log.index.interval.bytes指定,默認為4096B),偏移量索引文件和時間索引文件分別對應一個索引項
日誌段切分策略:
1.大小超過broker端參數log.segment.bytes配置的值,默認為1073741824(1GB)
2.當前日誌段消息的最大時間戳與當前系統的時間戳差值大於log.roll.ms或者log.roll.hours,ms優先順序高,默認log.roll.hours=168(7天)
3.索引文件或者時間戳索引文件的大小大於log.index.size.max.bytes配置的值,默認為10485760(10MB)
4.偏移量差值(offset-baseOffset)>Integer.MAX_VALUE
5.3.1 偏移量索引
每個索引項佔用8個位元組,分為兩個部分:1.relativeOffset相對偏移量(4B) 2.position物理地址(4B)
使用kafka-mp-log.sh腳本來解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:
bin/kafka-mp-log.sh --files /tmp/kafka-logs/topicId-0/00……00.index
如果broker端參數log.index.size.max.bytes不是8的倍數,內部會自動轉換為8的倍數
5.3.2 時間戳索引
每個索引項佔用12個位元組,分為兩個部分:1.timestamp當前日誌分段的最大時間戳(12B) 2.relativeOffset時間戳對應的相對偏移量(4B)
如果broker端參數log.index.size.max.bytes不是12的倍數,內部會自動轉換為12的倍數
5.4日誌清理
日誌清理策略可以控制到主題級別
5.4.1 日誌刪除
broker端參數log.cleanup.policy設置為delete(默認為delete)
檢測周期broker端參數log.retention.check.interval.ms=300000(默認5分鍾)
1.基於時間
broker端參數log.retention.hours,log.retention.minutes,log.retention.ms,優先順序ms>minutes>hours
刪除時先增加.delete後綴,延遲刪除根據file.delete.delay.ms(默認60000)配置
2.基於日誌大小
日誌總大小為broker端參數log.retention.bytes(默認為-1,表示無窮大)
日誌段大小為broker端參數log.segment.bytes(默認為1073741824,1GB)
3.基於日誌起始偏移量
DeleteRecordRequest請求
1.KafkaAdminClient的deleteRecord()
2.kafka-delete-record.sh腳本
5.4.2 日誌壓縮
broker端參數log.cleanup.policy設置為compact,且log.cleaner.enable設置為true(默認為true)
5.5磁碟存儲
相關測試:一個由6塊7200r/min的RAID-5陣列組成的磁碟簇的線性寫入600MB/s,隨機寫入100KB/s,隨機內存寫入400MB/s,線性內存3.6GB/s
5.5.1 頁緩存
linux操作系統的vm.dirty_background_ratio參數用來指定臟頁數量達到系統的百分比之後就觸發pdflush/flush/kdmflush,一般小於10,不建議為0
vm.dirty_ratio表示臟頁百分比之後刷盤,但是阻塞新IO請求
kafka同樣提供同步刷盤及間斷性強制刷盤(fsync)功能,可以通過log.flush.interval.messages、log.flush.interval.ms等參數來控制
kafka不建議使用swap分區,vm.swappiness參數上限為100,下限為0,建議設置為1
5.5.2 磁碟I/O流程
一般磁碟IO的場景有以下4種:
1.用戶調用標准C庫進行IO操作,數據流為:應用程序Buffer->C庫標准IOBuffer->文件系統也緩存->通過具體文件系統到磁碟
2.用戶調用文件IO,數據流為:應用程序Buffer->文件系統也緩存->通過具體文件系統到磁碟
3.用戶打開文件時使用O_DIRECT,繞過頁緩存直接讀寫磁碟
4.用戶使用類似dd工具,並使用direct參數,繞過系統cache與文件系統直接讀寫磁碟
Linux系統中IO調度策略有4種:
1.NOOP:no operation
2.CFQ
3.DEADLINE
4.ANTICIPATORY
5.5.3 零拷貝
指數據直接從磁碟文件復制到網卡設備中,不需要經應用程序
對linux而言依賴於底層的sendfile()
對java而言,FileChannal.transferTo()的底層實現就是sendfile()
『貳』 kafka的原理是什麼
在Kafka中的每一條消息都有一個topic。一般來說在我們應用中產生不同類型的數據,都可以設置不同的主題。一個主題一般會有多個消息的訂閱者,當生產者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生產者寫入的新消息。
kafka為每個主題維護了分布式的分區(partition)日誌文件,每個partition在kafka存儲層面是append log。
任何發布到此partition的消息都會被追加到log文件的尾部,在分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,也就是我們的offset,offset是一個long型的數字,通過這個offset可以確定一條在該partition下的唯一消息。在partition下面是保證了有序性,但是在topic下面沒有保證有序性。
(2)kafka存儲圖片擴展閱讀
procer選擇一個topic,生產消息,消息會通過分配策略append到某個partition末尾。
consumer選擇一個topic,通過id指定從哪個位置開始消費消息。消費完成之後保留id,下次可以從這個位置開始繼續消費,也可以從其他任意位置開始消費。
保證了消息不變性,為並發消費提供了線程安全的保證。每個 consumer都保留自己的offset,互相之間不幹擾,不存在線程安全問題。
消息訪問的並行高效性。每個topic中的消息被組織成多個partition,partition均勻分配到集群server中。生產、消費消息的時候,會被路由到指定partition,減少競爭,增加了程序的並行能力。
『叄』 怎麼設置kafka topic數據存儲時間
1、Kafka創建topic命令很簡單,一條命令足矣:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test 。