我们希望在读取消息表单kafka时实现并行性。因此我们想在flinkkafkaconsumer中指定分区号。它将从kafka中的所有分区读取消息,而不是特定的分区号。以下是示例代码:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "Message-Test-Consumers");
properties.setProperty("partition", "1"); //not sure about this syntax.
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("EventLog", new SimpleStringSchema(), properties);
请建议任何更好的选择来获得并行性。
我不相信有一种机制可以限制Flink从哪个分区读取数据。我也看不出这将如何帮助您实现并行读取分区的目标,而Flink无论如何都会这样做。
Flink Kafka源连接器并行读取所有可用分区。只需将kafka源连接器的并行度设置为所需的任何并行度,记住有效并行度不能超过分区数。这样,Flink的Kafka源连接器的每个实例都将从一个或多个分区读取。您还可以配置kafka使用者,以自动发现作业运行时可能创建的新分区。
大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置
我对Kafka和Spring Boot是一种新的体验,并试图使我的应用程序从主题的特定分区读取。 单厂代码 这也是我的消费者工厂配置 当我试图运行程序时,它给我一个错误 分区Single上的偏移量提交失败。偏移量308处的Attendance-0:协调器不知道此成员。 和警告 失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.p
我刚刚注意到,当我在分区中生成单个消息时,我的使用者不会收到它。只有在我在同一分区中生成了更多的消息之后,使用者才会收到它们。我的数设置为 1。 是否有其他一些配置可能会影响这里? 每个分区都有一个专用的消费者。 相关部件的使用者代码。我的使用者为 定义的不同主题启动多个线程。使用 https://github.com/mmustala/rdkafka-ruby 这是原始消费宝石的叉子。我添加了一
我有3个spring kafka消费者(同一组)从3个分区获得消息。我想检测其中一个使用者何时停止从一个分区读取(其他两个使用者继续从其他两个分区读取)。到目前为止,这种情况已经发生过两次,当检测到时,很容易通过重新启动所有消费者来修复,这将导致重新平衡。问题是在这两种情况下,早点知道会很好。所以我尝试使用ListenerContainerIdleEvent,如下所示- 这是我的测试结果- 当一个
我开始在我的项目中使用spring-integration-kafka,我可以生产和消费来自kafka的消息。但是现在,我想为特定的分区生成消息,并从特定的分区消费消息。 例如,我想向分区3生成消息,而消费将只接收来自分区3的消息。 到目前为止,我的主题有8个分区,我可以向特定的分区发送消息,但是我还没有找到配置消费者只接收来自特定分区的消息的方法。 因此,任何关于我应该如何配置消费者与sping
我们计划编写一个Kafka消费者(java),它读取Kafka队列以执行消息中的操作。