当前位置: 首页 > 知识库问答 >
问题:

Flink KafkaSource从该主题读取所有消息

尉迟国发
2023-03-14

我的目标是使用Flink KafkaSource阅读来自Kafka主题的所有消息。我尝试用批处理和流模式执行。问题如下:当我将env.setParallelism设置为高于2时,我必须使用包含bug的接收器。于是,我设置了例如:< code > streamexecutionenvironment . setparallelism(1);

我想使用的Kafka主题包含3个分区。这是我的代码片段:

KafkaSourceBuilder<Request> builder = KafkaSource.builder();
    builder.setBootstrapServers(kafkaBrokers);
    builder.setProperty("partition.discovery.interval.ms", "10000");
    builder.setTopics(topic);
    builder.setGroupId(groupId);
    builder.setBounded(OffsetsInitializer.latest());
    builder.setStartingOffsets(OffsetsInitializer.earliest());
    builder.setDeserializer(KafkaRecordDeserializationSchema.of(deserializer));

DataStreamSource<Request> streamSource = streamExecutionEnvironment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    streamSource.map(new MyMapper())
            .addSink(new Sink(props)).setParallelism(3) //by this setting I expected to have exactly 3 consumers - per partition/split, but it doesn't work when I do not set anything as well
            .name("Flink " + context.getJobDetail().getKey());

这段代码应该在将被对接的Spring Boot应用程序中运行,我配置了一个定期执行的quartz作业,streamExecutionEnvironment是本地环境:StreamExecution环境env=StreamExecutIONEnvironmen.createLocalEnvironmend()

此时,主题内已经有超过1000万条消息。当作业执行时,我可以在日志中看到:

    [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=rrequest_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-2
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-0
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-1

然后他们总共消耗了大约100万条消息并停止了消费,对于所有3条消息,我可以看到:

[ -> Map (1/1)#0] o.a.f.c.b.s.reader.fetcher.SplitFetcher  : Finished reading from splits [request-1]

因此,他们并没有完全理解这个话题,只是其中的一部分。当石英作业重新触发时,它再次开始从OffsetsInitializer读取。在earlish()中,它们使用重复的消息,也使用新消息,不仅是新添加到主题中的消息,还有一些在前一次执行期间未使用的消息。

我还尝试重新命名消费群体,以消除消费者在前一次消费后promise的情况下的补偿问题。

我的问题是 - 我如何配置数据流以完全阅读主题。我的问题与 setParallelism(1) 设置或一般的并行性、使用者组配置或其他任何东西有什么关系?请给我任何关于解决问题的建议。


共有1个答案

张亦
2023-03-14

问题与

builder.setBounded(OffsetsInitializer.latest());

此行告诉 Kafka 将消息读取到作业开始时看到的最后一个偏移量。然后,它将停止消耗更多消息。

 类似资料:
  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;

  • 我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决

  • 我正在构建一个由亚马逊服务提供支持的警报系统。 我每天将一个文件放到S3上,它生成一个lambda函数(我们称之为生成器函数)来处理该文件。 Generator基于此文件构建警报并将多条消息发布到SNS主题(让我们称之为发件箱)-由Generator计算的每个收件人一条消息。 我在发件箱中订阅了第二个lambda函数(我们称之为Courier),它应该接收每条消息并对其进行处理。 发电机代码: 以

  • 我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。

  • 问题内容: 我有一个简单的Java生产者,如下所示 我正在尝试读取以下数据 但是消费者没有阅读来自kafka的任何消息。如果我在下面添加以下内容 然后,消费者开始阅读该主题。但是,每当使用者重新启动时,它都会从我不希望的主题开头读取消息。如果我在启动Consumer时添加以下配置 然后它从主题读取消息,但是如果使用者在处理所有消息之前重新启动,则它不会读取未处理的消息。 有人可以让我知道出了什么问