我的使用者绑定到匿名使用者组,而不是我指定的使用者组。
spring:
cloud:
stream:
html" target="_blank">kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
zkNodes: localhost
defaultZkPort: 2181
bindings:
inEvent:
group: eventin
destination: event
outEvent:
group: eventout
destination: processevent
我的春靴应用
@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = "inEvent")
public void getEvent(Event event){
System.out.println(event.name);
}
}
我的输入输出通道接口
public interface EventStream {
@Input("inEvent")
SubscribableChannel inEvent();
@Output("outEvent")
MessageChannel outEvent();
}
我的控制台日志--
:在3.233秒内启动ConsumerApplication(JVM运行于4.004):[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]发现组协调器Singh:9092(ID:2147483647 RACK:null):[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]撤消以前分配的分区[]:撤消的分区:[]:[使用者Clientid=Consumer-3,Groupid=Anonymous.[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]成功加入具有第1代的组:[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]设置新分配的分区[inEvent-0]:[使用者Client=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]将分区inEvent-0的偏移量重置为:分配的分区:[inEvent-0]
group
属性不能在Kafka
树中。必须是这样的:
我的使用者绑定到匿名使用者组,而不是我指定的使用者组。
spring:
cloud:
stream:
bindings:
inEvent:
group: eventin
destination: event
请参阅文档中的更多信息:http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.1.release/single/spring-cloud-stream.html#consumer-groups
组
是一个公共属性,因此它与绑定器实现无关。Kafka
是针对Apache Kafka特定的属性,在绑定器实现级别上公开。
我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。
我试图改变生产者和消费者配置的顺序,但没有帮助。 编辑:我已经添加了完整的application.yml。当我第一次引导服务时,这个主题在Kafka中是不存在的。 它感觉在生产者和消费者配置之间有冲突,我认为它说有3个分区的原因是消费者中的并发性是3,所以它首先创建有3个分区的主题,然后当它移动到生产者配置时,它不调整分区计数。
当我只打开一次处理时,我会得到以下错误。注意:我们的应用程序非常安全,我们只允许Kafka用户和消费者访问他们明确需要的资源。 只有一次处理kafka流是否在所有流任务中使用每个流任务的消费者组而不是消费者组?
我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群
我们希望获得Kafka消费群体指标(例如,节流和字节率)。 我们已经使用以下工具完成了此操作: Kafka消费者Java应用程序的JMX Mbean CLI实用程序: bin/kafka-consumer-groups.sh--描述--组group_name--bootstrap-serverlocalhost: port . 问题:这可以通过使用一些Java库以编程方式完成吗? 到目前为止,我们