當前位置:首頁 » 密碼管理 » python訪問kafka

python訪問kafka

發布時間: 2022-07-05 01:26:49

Ⅰ 如何利用pykafka遠程消費 zookeeper+kafka集群 python腳本

#從kafka消費
#consumer_area = topic_area.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)

#從ZOOKEEPER消費
consumer_area = topic_area.get_balanced_consumer(
consumer_group=b'zs_download_04', # 自己命令
auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情況下,設置此變數,表示從最新的開始取
#auto_offset_reset=OffsetType.EARLIEST,
#reset_offset_on_start=True,
auto_commit_enable=True,
#auto_commit_interval_ms=1,
zookeeper_connect=ZK_LIST
)

Ⅱ 如何在kafka-python和confluent-kafka之間做出選擇

kafka-python:蠻荒的西部
kafka-python是最受歡迎的Kafka Python客戶端。我們過去使用時從未出現過任何問題,在我的《敏捷數據科學2.0》一書中我也用過它。然而在最近這個項目中,它卻出現了一個嚴重的問題。我們發現,當以文檔化的方式使用KafkaConsumer、Consumer迭代式地從消息隊列中獲取消息時,最終到達主題topic的由Consumer攜帶的消息通常會丟失。我們通過控制台Consumer的分析驗證了這一點。
需要更詳細說明的是,kafka-python和KafkaConsumer是與一個由SSL保護的Kafka服務(如Aiven Kafka)一同使用的,如下面這樣:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)

for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...

當以這樣的推薦方式使用時,KafkaConsumer會丟失消息。但有一個變通方案,就是保留所有消息。這個方案是Kafka服務提供商Aiven support提供給我們的。它看起來像這樣:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...

雖然這個變通方案可能有用,但README中的方法會丟棄消息使我對其失去興趣。所以我找到了一個替代方案。
confluent-kafka:企業支持
發現coufluent-kafka Python模塊時,我感到無比驚喜。它既能做librdkafka的外封裝,又非常小巧。librdkafka是一個用C語言寫的kafka庫,它是Go和.NET的基礎。更重要的是,它由Confluent公司支持。我愛開源,但是當「由非正式社區擁有或支持」這種方式效果不行的時候,或許該考慮給替代方案印上公章、即該由某個公司擁有或支持了。不過,我們並未購買商業支持。我們知道有人會維護這個庫的軟體質量,而且可以選擇買或不買商業支持,這一點真是太棒了。
用confluent-kafka替換kafka-python非常簡單。confluent-kafka使用poll方法,它類似於上面提到的訪問kafka-python的變通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())

kafka_consumer.close()

現在我們能收到所有消息了。我並不是說kafka-python工具不好,我相信社區會對它的問題做出反應並解決。但從現在開始,我會一直堅持使用confluent-kafka。
開源治理
開源是強大的,但是涉及到復雜的「大數據」和NoSQL工具時,通常需要有一家大公司在背後推動工具的開發。這樣你就知道,如果那個公司可以使用工具,那麼該工具應該擁有很好的基本功能。它的出現可能是非正式的,就像某公司發布類似FOSS的項目一樣,但也可能是正式的,就像某公司為工具提供商業支持一樣。當然,從另一個角度來看,如果一家與開源社區作對的公司負責開發某個工具,你便失去了控制權。你的意見可能無關緊要,除非你是付費客戶。
理想情況是採取開源治理,就像Apache基金會一樣,還有就是增加可用的商業支持選項。這對互聯網上大部分的免費軟體來說根本不可能。限制自己只使用那些公司蓋章批准後的工具將非常限制你的自由。這對於一些商店可能是正確選擇,但對於我們不是。我喜歡工具測試,如果工具很小,而且只專心做一件事,我就會使用它。
信任開源
對於更大型的工具,以上決策評估過程更為復雜。通常,我會看一下提交問題和貢獻者的數量,以及最後一次commit的日期。我可能會問朋友某個工具的情況,有時也會在推特上問。當你進行嗅探檢查後從Github選擇了一個項目,即說明你信任社區可以產出好的工具。對於大多數工具來說,這是沒問題的。
但信任社區可能存在問題。對於某個特定的工具,可能並沒有充分的理由讓你信任社區可以產出好的軟體。社區在目標、經驗和開源項目的投入時間方面各不相同。選擇工具時保持審慎態度十分重要,不要讓理想蒙蔽了判斷。

Ⅲ kafka python topic 多少數據

您好,希望以下回答能幫助您 我只想說還是換個系統比較好,win7裝tornado特別容易出問題,XP就可以 如您還有疑問可繼續追問。

Ⅳ 如何使用python 連接kafka 並獲取數據

連接 kafka 的庫有兩種類型,一種是直接連接 kafka 的,存儲 offset 的事情要自己在客戶端完成。還有一種是先連接 zookeeper 然後再通過 zookeeper 獲取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 來協調。
我現在使用 samsa 這個 highlevel 庫
Procer示例
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')

** Consumer示例 **
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:
print msg

Tip
consumer 必需在 procer 向 kafka 的 topic 裡面提交數據後才能連接,否則會出錯。
在 Kafka 中一個 consumer 需要指定 groupname , groue 中保存著 offset 等信息,新開啟一個 group 會從 offset 0 的位置重新開始獲取日誌。
kafka 的配置參數中有個 partition ,默認是 1 ,這個會對數據進行分區,如果多個 consumer 想連接同個 group 就必需要增加 partition , partition 只能大於 consumer 的數量,否則多出來的 consumer 將無法獲取到數據。

Ⅳ confluent kafka python怎麼實時獲取數據

使用kafkapython讀取實時數據小例子 使用kafkapython讀取實時數據小例子 from kafka import KafkaConsumer from kafka.client import KafkaClient imp

熱點內容
sql應用領域 發布:2024-04-19 18:42:56 瀏覽:36
訪問外網伺服器加速軟體 發布:2024-04-19 17:48:45 瀏覽:696
加密軟體對比 發布:2024-04-19 17:27:05 瀏覽:367
保密管理系統怎麼連接伺服器 發布:2024-04-19 17:26:59 瀏覽:18
廣州社保卡密碼激活在哪裡辦 發布:2024-04-19 17:21:18 瀏覽:368
編譯器和操作系統有關系嗎 發布:2024-04-19 17:20:28 瀏覽:274
數學公式編譯器下載 發布:2024-04-19 17:02:52 瀏覽:987
網頁無法緩存視頻 發布:2024-04-19 16:56:44 瀏覽:615
演算法紅 發布:2024-04-19 16:44:42 瀏覽:625
海量數據存儲與處理 發布:2024-04-19 16:33:46 瀏覽:542