当前位置: 首页 > 知识库问答 >
问题:

如何使用Apache Flink 1.12和DataStream API批处理模式将Kafka添加为有界源

孟安民
2023-03-14

我想在Apache Flink 1.12中使用Kafka源作为有界数据源,我尝试过使用Flink Kafka消费者连接器,但它给了我以下原因

原因:java.lang.IllegalStateException:检测到一个未绑定的源,execution.runtime模式设置为BATCH。不允许此组合,请在org.apache.flink.util.Preconditions.check状态下将execution.runtime模式设置为STREAMING或AUTOMATIC(Preconditions.java:198)~[flink-core-1.12.0.jar: 1.12.0]

根据flink的最新文档,我们可以使用Kafka作为有界源,但没有提供如何实现这一点的示例,也没有提到这是继续使用此方法的最佳方式。

有人能帮我获得一些示例工作代码来实现这个用例吗

共有1个答案

南门棋
2023-03-14

这里有一个例子:

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(...)
        .setGroupId(...)
        .setTopics(...)
        .setDeserializer(...)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .build();

env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"));

有关更多信息,请参阅javadocs。

 类似资料:
  • 我尝试执行Flink(1.12.1)批处理作业,步骤如下: null 资料来源: 谢了!

  • 11.3 添加一个Footer记录 经常写文本文件时,在所有处理都已经完成,一个”footer” 记录必须附加到文件的末尾.这也可以通过使用由Spring Batch提供的FlatFileFooterCallback接口,FlatFileItemWriter的FlatFileFooterCallback(和与之对应的FlatFileHeaderCallback)是可选的属性. <bean id="

  • 一些批处理任务可以使用spring batch现成的组件完全的组装.例如ItemReader和ItemWriter实现可配置覆盖范围广泛的场景,然而,对于大多数情况下,必须编写自定义代码。应用程序开发人员的主要API人口点是Tasklet,ItemReader ItemWriter和各种各样的监听器接口.最简单的批处理任务能够使用Spring BatchItemReader现成的输出,但通常情况下

  • 我这么说是因为来自avro的POJO一代不是那么直截了当的。在此基础上,它需要maven插件和一个.avsc文件。 例如,我在我的Kafka制作人上创建了一个POJO,名为User: 我连载它,并发送到我的用户主题在Kafka。然后我有一个消费者,它本身有一个POJO用户,并反序列化消息。是空间问题吗?这样序列化和反序列化不是也更快吗?更不用说维护模式注册表的开销了。

  • 假设我有两个经纪人。 我读到Kafka制作人创建的制作人线程等于经纪人的数量。在这种情况下,我将有两个内部线程。 假设我有5个主题,每秒只收到200条消息。Kafka如何进行批处理? 一批大小=30条消息。[topic1=5,topic2=10,topic3=3,topic4=10,topic5=2消息]这些是最重要的消息和相应的主题。 Kafka是如何执行批处理的?

  • 我知道ConcurrentKafkaListenerContainerFactory具有属性“setBatchListener(true)”,但就我对Kafka文档的理解而言,这种类型的工厂配置仅与@KafKalistener注释一起使用。 任何帮助都将不胜感激。