当前位置: 首页 > 知识库问答 >
问题:

ConFluent Kafka Python库为批量msg配置生产者

颛孙安康
2023-03-14

我需要设置Kafka生产商发送500 msg在一批不是由味精味精,但批量进口味精。我查过了https://github.com/dpkp/kafka-python/issues/479并尝试了制作人。发送消息(主题,*消息)但失败并出现错误:

>       producer.send_messages('event_connector_mt', *load_entries)
E       AttributeError: 'cimpl.Producer' object has no attribute 'send_messages'

我也试着像《代码》制作人一样通过考试。制作(主题,*消息)失败:

       producer.produce('event_connector_mt', *load_entries)
E       TypeError: function takes at most 8 arguments (501 given)

因此,我挖掘了更多信息,发现我必须在producer配置中将类型设置为async和batch。大小要大于默认值,但当我尝试配置时,如:

from confluent_kafka import Consumer, Producer       

producer = Producer(**{'bootstrap.servers': KAFKA_BROKERS,
                       'queue.buffering.max.messages': 1000000,
                       'batch.num.messages': 500,
                       'batch.size': 19999,
                       'producer.type': 'async'
                       })

失败:

E       KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "producer.type""}

批处理也有同样的错误。你能告诉我在哪里以及如何设置异步和批量大小,或者以任何其他方式将批量MSG传递给Kafka 0.9.3.1吗

共有2个答案

徐昕
2023-03-14

你正在混淆名为“汇合-Kafka-蟒蛇”的汇合蟒蛇客户端

http://docs.confluent.io/3.2.1/clients/confluent-kafka-python/

使用“Kafkapython”客户端

https://github.com/dpkp/kafka-python

这是两个具有不同API的不同客户端。

鄂伟兆
2023-03-14

默认情况下,所有生成器都是异步的。Producer.type和batch.size不支持底层库的配置。

因此,请使用可用配置批处理。num.messages或message。最大字节数。

 类似资料:
  • 我使用的是Kafka producer客户端,我的项目中没有任何log4j配置。 在运行时,程序打印了大量的Kafka调试日志,这是我不想要的。

  • 只要遵循默认设置,如http、localhost和8200,存储库endpoint和tls_disable=1关闭SSL,一切都可以正常工作。然而,对于任何实际环境来说,这些都不是实用的设置,而且很少有任何地方的例子对此有所帮助。有人能提供一个可行的例子吗? 我已经成功地设置了启用TLS的保险库。我已经成功地设置了一个配置服务器,该服务器使用自签名证书进行连接。我甚至可以将一个秘密值注入配置服务器

  • 考虑到以下同步Kafka制作人 请帮助我理解请求之间的区别。暂停。ms和max block。ms producer配置。是否包括所有重试的最长时间?还是每次重试都有自己的超时?

  • 问题内容: 我将docker用于laravel项目的开发和生产。我用于开发和生产的dockerfile略有不同。例如,我在开发环境中将本地目录安装到docker容器中,因此我不需要为代码中的每个更改进行docker构建。 由于挂载目录仅在运行docker容器时可用,因此无法在开发文件中将诸如“ composer install”或“ npm install”之类的命令放入dockerfile中。

  • 我使用docker为laravel项目开发和生产。我有一个稍微不同的dockerfile用于开发和生产。例如,我在开发环境中将本地目录装载到docker容器中,这样我就不需要为代码中的每个更改进行docker构建。 由于挂载目录只有在运行docker容器时才可用,因此我无法将“composer install”或“npm install”之类的命令放入dockerfile中进行开发。 目前,我正在

  • 在Spring批处理分区中,PartitionHandler的与分区器返回的ExecutionContexts数量之间的关系有点混乱。例如,MultiResourcePartitioner声明它忽略gridSize,但是文档没有解释何时/为什么可以这样做。 例如,假设我有一个,我希望在不同的并行步骤中重用它,并且我将它的大小设置为20。如果我使用网格大小为5的TaskExecutorPartiti