我对Kafka相对来说是新的,我试图在主题上发送消息后产生消费者。 单个生产者在不同的分区上发送200个msg。 我多次运行消费者脚本。
我在使用时遇到了困难,无法从开头或其他任何显式偏移量读取它。 为同一主题的使用者运行命令行工具,我确实看到带有选项的消息,否则它将挂起 我使用的是kafka-python 0.9.5,而代理运行的是Kafka8.2。不确定确切的问题是什么。 按照dpkp的建议设置_group_id=none_以模拟控制台使用者的行为。
我正在使用@Kafkalistener注释编写一个kafka使用者,我知道有一种方法可以使用ConcurrentKafkaListenerContainerFactory中的方法来增加来自不同分区的并发kafka使用者的数量 用于setconcurrency的Javadoc如下所示:- 运行的并发KafkaMessageListenerContainer的最大数目。来自同一分区内的消息将按顺序处理
为了使用Kafka通用地发布消息,我使用类名作为主题: 服务器属性(我从默认属性中唯一更改的内容): 注意:我还尝试了以下用户设置:
当我向主题“Test19”发送任何消息时,配置的ServiceActivator“ProcessMessage”方法将两条消息显示为配置的两个客户,但这里的问题是,在添加到消费者上下文之前,我需要为每个客户加载入站配置文件…否则,我只能在控制台中得到一条消息…是正确的方式还是我需要在这里改变什么? 谢了。
给定以下代码: 如果kafka消费者是Vert. x Kafka消费者,我希望 会发生在Reactive IO线程上。但是,它在Vert. x事件循环线程上执行。当我运行以下测试类时,相同的场景按照预期在IO线程上运行map方法。 是什么导致线程执行中出现这种差异?
我正在使用Apache Beam的kafkaIO阅读一个主题,该主题在Confluent schema Registry中有一个avro模式。我可以反序列化消息并写入文件。但最终我想写给BigQuery。我的管道无法推断架构。我如何提取/推断模式并将其附加到管道中的数据,以便我的下游进程(写入BigQuery)能够推断模式? 下面是我使用模式注册表url设置反序列化器的代码,以及我从Kafka读到
我试图使用Java API将发送到,因为从开始,他们建议使用Java API而不是Scala API来获得更好的性能。 http://kafka.apache.org/090/documentation.html#ProducerAPI 我的XML文件有大约,我可以使用什么XML API来读取XML文件并转换为字符串,然后将该字符串发送到Kafka主题。 或者,我可以将XML转换为数据并将其发送给
我正在尝试阅读使用apache Beam上的KafkaIO的多个kafka代理。偏移量管理的默认选项是kafka分区本身(不再使用来自kafka>0.9的zookeper)。使用此设置,当我重新启动作业/管道时,会出现重复和丢失记录的问题。 从我读到的内容来看,处理这一点的最佳方法是管理对外部数据存储的偏移量。用当前版本的apache beam和Kafkaio是否可以做到这一点?我现在使用的是2.
我使用的是Azure HDInsight的托管Apache Kafka解决方案,因为不幸的是Azure上没有托管汇流Kafka解决方案。是否可以运行汇合模式注册表并将其连接到HDInsight Apache Kafka集群的代理? 我希望只在单个VM上安装模式注册表,然后使用schema-registry.properties文件中的这一行,将其指向HDInsight集群的代理列表: kafkas
执行kafka客户端的生产者/消费者连接池有意义吗? kafka是否在内部维护已初始化并准备好使用的连接对象列表? 我们希望最小化连接创建的时间,这样在发送/接收消息时就不会有额外的开销。 目前,我们正在使用apache共享池库来保持连接。 任何帮助都将不胜感激。
我有一个简单的spring boot服务,它是按需调用的,并且从主题中消耗指定数量的消息。要使用的消息数作为参数传递。服务每隔30分钟呼叫一次。每个消息大小约为1.6KB。我每次都能收到1100或1200条信息。只有一个分区的主题,REST服务是唯一的使用者。以下是该服务的名称:http://example.com/messages?limit=2000 这是我在application.yml中的
使用kafka版本2.11-0.11.0.3发布10,000条消息(所有消息的总大小为10MB),将有2个消费者(具有相同的group-id)作为并行处理来消费该消息。在消费过程中,两个消费者都消费了相同的消息。 以下错误/警告是Kafka抛出的 警告:此成员将离开组,因为使用者轮询超时已过期。这意味着对poll()的后续调用之间的时间长于配置的max.poll.interval.ms,这通常意味