python 实时获取kafka消费队列信息示例详解
作者:xiaoming0018
这篇文章主要介绍了python实时获取kafka消费队列信息,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
安装 pykafka
pip install pykafka
一、消费kafka消息
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
from vpn_data_handler import handler_data
bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'
class KConsumer(object):
"""kafka 消费者; 动态传参,非配置文件传入;
kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
"""
_encode = "UTF-8"
def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
""" 初始化kafka的消费者;
1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
Args:
topics: str; kafka 的消费主题;
bootstrap_server: list; kafka 的消费者地址;
group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
"""
if bootstrap_server is None:
bootstrap_server = bootstrap_servers
self.client = KafkaClient(hosts=bootstrap_server)
# 选择要消费的topic
vpn_topic = self.client.topics[topics]
self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
consumer_timeout_ms=200,
auto_commit_enable=True,# 自动提交偏移量
auto_offset_reset=OffsetType.LATEST) #LATEST 获取当前偏移量最新消息 EARLIEST从头开始获取信息
def recv(self):
"""
接收消费中的数据
Returns:
"""
return self.consumer
def main():
"""
kafka消费队列入口
:param topic:
:return:
"""
obj = KConsumer(topics="topics_name")
while True:
for message in obj.recv():
data = eval(message.value.decode('utf-8'))
handler_data(data)
if __name__ == '__main__':
main()二、生产者推送消息
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts="10.XX0.XX0.XX4:9092") # 可接受多个client
# 查看所有的topic
# print(client.topics)
topic = client.topics['test_78'] # 选择一个topic
message = "test message2 test message2"
with topic.get_sync_producer() as producer:
producer.produce(bytes(message, encoding='utf8')) #python3需要编码
print(message)到此这篇关于python 实时获取kafka消费队列信息的文章就介绍到这了,更多相关python kafka消费队列信息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
