我想在Apache Flink 1.12中使用Kafka源作为有界数据源,我尝试过使用Flink Kafka消费者连接器,但它给了我以下原因
原因:java.lang.IllegalStateException:检测到一个未绑定的源,execution.runtime模式设置为BATCH。不允许此组合,请在org.apache.flink.util.Preconditions.check状态下将execution.runtime模式设置为STREAMING或AUTOMATIC(Preconditions.java:198)~[flink-core-1.12.0.jar: 1.12.0]
根据flink的最新文档,我们可以使用Kafka作为有界源,但没有提供如何实现这一点的示例,也没有提到这是继续使用此方法的最佳方式。
有人能帮我获得一些示例工作代码来实现这个用例吗
这里有一个例子:
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(...)
.setGroupId(...)
.setTopics(...)
.setDeserializer(...)
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"));
有关更多信息,请参阅javadocs。
我尝试执行Flink(1.12.1)批处理作业,步骤如下: null 资料来源: 谢了!
11.3 添加一个Footer记录 经常写文本文件时,在所有处理都已经完成,一个”footer” 记录必须附加到文件的末尾.这也可以通过使用由Spring Batch提供的FlatFileFooterCallback接口,FlatFileItemWriter的FlatFileFooterCallback(和与之对应的FlatFileHeaderCallback)是可选的属性. <bean id="
一些批处理任务可以使用spring batch现成的组件完全的组装.例如ItemReader和ItemWriter实现可配置覆盖范围广泛的场景,然而,对于大多数情况下,必须编写自定义代码。应用程序开发人员的主要API人口点是Tasklet,ItemReader ItemWriter和各种各样的监听器接口.最简单的批处理任务能够使用Spring BatchItemReader现成的输出,但通常情况下
我这么说是因为来自avro的POJO一代不是那么直截了当的。在此基础上,它需要maven插件和一个.avsc文件。 例如,我在我的Kafka制作人上创建了一个POJO,名为User: 我连载它,并发送到我的用户主题在Kafka。然后我有一个消费者,它本身有一个POJO用户,并反序列化消息。是空间问题吗?这样序列化和反序列化不是也更快吗?更不用说维护模式注册表的开销了。
假设我有两个经纪人。 我读到Kafka制作人创建的制作人线程等于经纪人的数量。在这种情况下,我将有两个内部线程。 假设我有5个主题,每秒只收到200条消息。Kafka如何进行批处理? 一批大小=30条消息。[topic1=5,topic2=10,topic3=3,topic4=10,topic5=2消息]这些是最重要的消息和相应的主题。 Kafka是如何执行批处理的?
我知道ConcurrentKafkaListenerContainerFactory具有属性“setBatchListener(true)”,但就我对Kafka文档的理解而言,这种类型的工厂配置仅与@KafKalistener注释一起使用。 任何帮助都将不胜感激。