2个,主线程和Sender线程。主线程负责创建消息,然后通过分区器、序列化器、拦截器作用之后缓存到累加器RecordAccumulator中。Sender线程负责将RecordAccumulator中消息发送到kafka中.
#
本文向大家介绍Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?相关面试题,主要包含被问及Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?时的应答技巧和注意事项,需要的朋友参考一下
我用的是。NET客户端(生产者和消费者)的Apache Kafka. 我试图增加可以发送的最大消息的大小。我阅读了配置手册和一些关于我的问题的帖子: Kafka:发送15MB信息 https://github.com/confluentinc/kafka-rest/issues/208 我看到应该为生产者设置属性“max.request.size”。因此,我做了以下工作: 但当我运行Produce
我是Vert.x的新手,对Kafka来说相对较新。 如何设置我的Vert.x KafkaProducer来导出Prometheus指标? 目前,我可以启用Prometheus度量,如<code>vertx_http_server_request_bytes_max</code>并通过Web服务器查看它们: 在使用Vert. x之前,我可以使用Apache KafkaProducer并绑定Kafka
假设我有两个经纪人。 我读到Kafka制作人创建的制作人线程等于经纪人的数量。在这种情况下,我将有两个内部线程。 假设我有5个主题,每秒只收到200条消息。Kafka如何进行批处理? 一批大小=30条消息。[topic1=5,topic2=10,topic3=3,topic4=10,topic5=2消息]这些是最重要的消息和相应的主题。 Kafka是如何执行批处理的?
我有一个循环缓冲区(数组/先进先出),一个消费者和一个生产者。生产者将随机数放入数组中,消费者获取第一个数字并检查它是否是相对素数。 我的代码工作正常,我认为它工作正常,但我想改进它。我不太确定我的“空运行”方法。我应该在其他地方做异常处理吗?改变“无限循环”?不应更改方法签名(它们是预定义的)。 我会很高兴每一个改进代码的建议。(不在乎知名度(公开,...),还有静态的东西,我刚刚把它们放在一个
我发现maven repo中有几个Kafka。 阿帕奇的maven回购协议中有两个Kafka。https://mvnrepository.com/artifact/org.apache.kafka/kafka https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients 它们都可以从kafka服务器生成Mesg并消耗msg。 我