我正在与Kafka和阿帕奇·Flink合作。我正在尝试使用apache Flink中的一个kafka主题中的记录(这些记录是avro格式的)。下面是我正在尝试的一段代码。 使用自定义反序列化器来反序列化主题中的avro记录。 我发送到主题“test-topic”的数据的Avro模式如下所示。 我正在使用的自定义反序列化器如下所示。 我的flink应用程序就是这样写的。 我得到的输出是{“name”
我在Flink的工作中使用Kafka资料来源的信息流,一次阅读50个主题,如下所示: 然后有一些运算符,如:过滤器- 我能获得的最大吞吐量是每秒10k到20k条记录,考虑到源发布了数十万个事件,这相当低,我可以清楚地看到消费者落后于生产者。我甚至试着移除水槽和其他操作员,以确保没有背压,但它仍然是一样的。我正在将我的应用程序部署到Amazon Kinesis data analytics,并尝试了
我们观察到,其中一位消费者多次试图从Kafka主题中选取事件。我们在消费者应用程序方面有以下内容。spring.kafka.consumer.enable auto commit=false
大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置
所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递
我正在做一个Kafka的消费者计划。最近我们在PROD环境下进行了部署。在那里,我们面临以下问题: 我的理解是,当组协调器不可用并被重新发现时,心跳间隔(根据文档为3秒)过期,消费者被踢出组。这是正确的吗?。如果是这样的话,应该为这个工作做些什么呢?。如果我错了,请帮助我理解这个问题,并建议您有任何想法,以解决这个问题。如果需要,我可以分享代码。
在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费
我是Apache Camel的新手,我试图在一个简单的项目中理解和使用轮询消费者EIP,但我感到有点迷茫…谁能帮我解释一下,甚至用一个小的工作例子。 如有任何帮助,我们将不胜感激
我有一个spring boot后端,我想为它实现一个SSEendpoint。我想使用基于Xamarin表单的应用程序使用这个endpoint。 我设法为双方实现了一些例子,但是,我没有在应用程序上收到任何消息。 对于后端部分,我实现了以下示例: 注意:我特意以达到默认30秒超时的方式实现它。使用postman调用此方法,它将加载所述30秒并同时显示所有已发送的消息: 在我的应用程序部分,我使用了S
嗨,我正在将kafka升级到.9,并将kafka consumer升级到与.9一起发布的新java consumer。在升级时,我使用的是现有的主题,步骤只是停止.8 kafka并开始指向相同log.dirs的.9 kafka。在消费者端,我使用的是相同的组名和主题名,但是新的消费者从主题中的起始位置再次使用消息。我已经把他们交了。我正在添加auto.offset.reset=最早。 任何想法为什
我重新安装了Apache Kafka 0.10.1.0。 在使用Producer/Consumer Java示例时,我无法知道Consumer示例中的group.id参数。 让我知道如何解决这个问题。 下面是我使用的消费者示例: 在为consumer运行命令后,我可以看到Producer发布的消息(在控制台上)。但无法看到来自java程序的消息 bin\windows\kafka-console-
数据消费 Talos的高级Consumer为用户解决了数据消费的很多问题,其中一点就是“有记忆”的消费,保证用户在启动Consumer的时候能从“上次消费过”的地方开始消费; Commit Offset:TalosConsumer运行过程中,会不定时的对用户已经消费过的数据进行commit,我们叫做‘commit offset’,含义就是提交到server端记录已经消费的offset,请注意Com
我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多
更新: 根据答案,下面的配置在绑定器级别工作。
我试图有多个消费者的Kafka主题的多个分区与相同的groupId,这将帮助我扩大消费的消息。 根据Kafka的文件,它说: 如何让多个消费者拥有相同的消费者groupId,以实现负载平衡?