当前位置: 首页 > 编程笔记 >

python3连接kafka模块pykafka生产者简单封装代码

严狐若
2023-03-14
本文向大家介绍python3连接kafka模块pykafka生产者简单封装代码,包括了python3连接kafka模块pykafka生产者简单封装代码的使用技巧和注意事项,需要的朋友参考一下

1.1安装模块

pip install pykafka

1.2基本使用

# -* coding:utf8 *- 
from pykafka import KafkaClient 
host = 'IP:9092, IP:9092, IP:9092'
client = KafkaClient(hosts = host) 
# 生产者 
topicdocu = client.topics['my-topic'] 
producer = topicdocu.get_producer() 
for i in range(100): 
 print i 
 producer.produce('test message ' + str(i ** 2)) 
producer.stop()

1.3简单封装

class KafkaProduct():

 def __init__(self,hosts,topic):
  """
  初始化实例
  :param hosts: 连接地址
  :param topic:
  """
  self.__client = KafkaClient(hosts=hosts)
  self.__topic = self.__client.topics[topic.encode()]

 def __set_topic(self, topic):
  self.__topic = self.__client.topics[topic.encode()]

 def set_topic(self, topic):
  """
  设置topic
  :param topic:
  :return:
  """
  self.__set_topic(topic)

 def get_topics(self):
  """
  获取当前所有topic
  :return:
  """
  return self.__client.topics

 def get_topic(self):
  """
  获取当前topic
  :return:
  """
  return self.__topic

 def Producer(self):
  """
  生产者对象
  :return:
  """
  with self.__topic.get_producer(delivery_reports=True) as producer:
   next_data = ''
   while True:
    if next_data:
     producer.produce(str(next_data).encode())
    next_data = yield True

 def send_data(self,datas):
  """
  发送数据
  :param datas:需要传入的可迭代对象
  :return:
  """
  c = self.Producer()
  next(c)
  for i in datas:
   c.send(i)

if __name__ == '__main__':

hosts = "1.2.3.4:9999,2.3.4.5:9090" #连接hosts
topic = "test_523"
K = KafkaProduct(hosts=hosts, topic=topic) #
#K.set_topic("test") #切换设置新的topic
K.get_topic() #获取当前设置的topic
#K.get_topics() #获取所有topic
data = range(10000) #要发送的可迭代对象
K.send_data(data)

以上这篇python3连接kafka模块pykafka生产者简单封装代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持小牛知识库。

 类似资料:
  • 主要内容:KafkaProducer API,生产者API,配置设置,SimpleProducer应用程序,简单的消费者实例,SimpleConsumer应用程序在这一节中将创建一个使用Java客户端发布和使用消息的应用程序。 Kafka生产者客户端由以下API组成。 KafkaProducer API 下面来了解Kafka生产者API。 KafkaProducer API的核心部分是类。 类提供了一个选项,用于将Kafka代理的构造函数与以下方法连接起来。 类提供方法来异步发送消息到主题。 的

  • 我有一组Kafka代理实例作为集群运行。我有一个客户正在生产数据给Kafka: 当我们使用tcpdump进行监控时,我可以看到只有到broker1和broker2的连接被建立,而对于broker3,没有来自我的生产者的连接。我有一个只有一个分区的单一主题。 我的问题是: > 为什么在我的情况下,我无法连接到broker3?或者至少我的网络监控没有显示我的制作人与broker3建立了连接? 如果我能

  • kafka-python(1.0.0)在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer和 /usr/bin/kafka-console-consumer正常工作。 Python应用程序过去也运行良好,但是在动物园管理员重新启动后,它不再能够连接。 我使用文档中的裸露骨骼示例: 我收到这个错误: 单步通过( /usr/lib/python2.6/site-

  • 我们目前在HDF(Hortonworks Dataflow)3.3.1上,它捆绑了Kafka 2.0.0,并且正在尝试使用分布式模式下的Kafka Connect,以推出一个Google Cloud PubSub接收器连接器。我们正在计划将一些元数据发回到Kafka主题中,并且需要将一个Kafka生产者集成到Sink任务Java代码的flush()函数中。 这是否会对Kafka Connect向K

  • 模块的定义 模块是自动运行在严格模式下并且没有办法退出运行的JavaScript代码。 模块可以是函数、数据、类,需要指定导出的模块名,才能被其他模块访问。 //数据模块 const obj = {a: 1} //函数模块 const sum = (a, b) => { return a + b } //类模块 class My

  • 我为Kafka建立了一个docker形象(Wurstmeister/Kafka-Docker)。在docker容器中,我可以使用内置的shell脚本创建主题、生成消息和使用消息。现在,我使用https://github.com/mapr-demos/kafka-sample-programs托管的代码从我的主机连接到kafka broker。在构建和运行程序之后,什么都没有发生,程序就会堆积起来。