当前位置: 首页 > 知识库问答 >
问题:

在所有Kafka消费者结束后完成Flink程序

谷梁英毅
2023-03-14

我在这里设置了一个最小的示例,其中有N个Kakfa主题的N个流(在下面的示例中为100个)。

我想在每个流看到“EndofStream”消息时完成它。当所有流都完成时,我希望Flink程序能够顺利完成
当parallelism设置为1时,这是正确的,但通常不会发生。

从另一个问题来看,似乎并非Kafka消费群体的所有线索都结束了。

其他人建议抛出异常。但是,程序将在第一个异常时终止,并且不会等待所有流完成。

我还添加了一个最小的python程序,将消息添加到Kafka主题中,以实现可复制性。请填写<代码>

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String outputPath = "file://" + System.getProperty("user.dir") + "/out/output";

        Properties kafkaProps = null;
        kafkaProps = new Properties();
        String brokers = "<IP>:<PORT>";
        kafkaProps.setProperty("bootstrap.servers", brokers);
        kafkaProps.setProperty("auto.offset.reset", "earliest");


        ArrayList<FlinkKafkaConsumer<String>> consumersList = new ArrayList<FlinkKafkaConsumer<String>>();
        ArrayList<DataStream<String>> streamList = new ArrayList<DataStream<String>>();

        for (int i = 0; i < 100; i++) {
            consumersList.add(new FlinkKafkaConsumer<String>(Integer.toString(i),
                        new SimpleStringSchema() {
                            @Override
                            public boolean isEndOfStream(String nextElement) {
                                if (nextElement.contains("EndofStream")) {
                                    // throw new RuntimeException("End of Stream");       
                                    return true;
                                } else { 
                                    return false;
                                }
                            }
                        }
                        , kafkaProps));
            consumersList.get(i).setStartFromEarliest();
            streamList.add(env.addSource(consumersList.get(i)));
            streamList.get(i).writeAsText(outputPath + Integer.toString(i), WriteMode.OVERWRITE);
        }

        // execute program
        env.execute("Flink Streaming Java API Skeleton");

Python 3程序

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='<IP>:<PORT>')

for i in range(100): # Channel Number
    for j in range(100): # Message Number
        message = "Message: " + str(j) + " going on channel: " + str(i)
        producer.send(str(i), str.encode(message))

    message = "EndofStream on channel: " + str(i)
    producer.send(str(i), str.encode(message))


producer.flush()

更改这一行:stream List.add(env.addSource(消费者ist.get(i)));stream List.add(env.addSource(消费者ist.get(i)). setParallelism(1));也可以完成这项工作,但Flink会将所有消费者放置到同一台物理机器上。

我希望消费者也能得到分配。

flink-conf.yaml

parallelism.default: 2
cluster.evenly-spread-out-slots: true

最后一招将每个主题编写在单独的文件中并使用文件作为源而不是kafka消费者。
最终目标是测试flink处理某些程序的某些工作负载需要多少时间。

共有1个答案

汪臻
2023-03-14

使用FlinkKafkaConsumerBase中的cancel方法,该方法是FlinkKafkaConsumer的父类。

从接口复制的公共无效取消()描述:SourceFunction取消源。大多数源在SourceFunction.run(SourceContext)方法中都会有一个while循环。实现需要确保在调用此方法后源将跳出该循环。典型的模式是在此方法中设置一个“易失性布尔值”标志为false。该标志在循环条件中被检查。

当源被取消时,执行线程也将被中断(通过Thread.interrupt())。中断严格发生在调用此方法之后,因此任何中断处理程序都可以依赖于此方法已完成的事实。最好将此方法更改的任何标志设置为“易失性”,以保证此方法的效果对任何中断处理程序的可见性。

指定方式:在接口SourceFunction中取消

你是对的。必须使用SimpleStringSchema。这是基于这个答案https://stackoverflow.com/a/44247452/2096986.看看这个例子。首先,我发送了我们看到的字符串Flink代码,该代码也在集群中工作,Kafka消费者使用该消息。然后我发送shutdowndddddddd,这对完成流也没有影响。最后,我发送了SHUTDOWN,流作业完成了。请参阅程序下面的日志。

package org.sense.flink.examples.stream.kafka;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumerQuery {

    public KafkaConsumerQuery() throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer(java.util.regex.Pattern.compile("test"),
                new MySimpleStringSchema(), properties);

        DataStream<String> stream = env.addSource(myConsumer);
        stream.print();

        System.out.println("Execution plan >>>\n" + env.getExecutionPlan());
        env.execute(KafkaConsumerQuery.class.getSimpleName());
    }

    private static class MySimpleStringSchema extends SimpleStringSchema {
        private static final long serialVersionUID = 1L;
        private final String SHUTDOWN = "SHUTDOWN";

        @Override
        public String deserialize(byte[] message) {

            return super.deserialize(message);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            if (SHUTDOWN.equalsIgnoreCase(nextElement)) {
                return true;
            }
            return super.isEndOfStream(nextElement);
        }
    }

    public static void main(String[] args) throws Exception {
        new KafkaConsumerQuery();
    }
}

日志:

2020-07-02 16:39:59,025 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-8, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
3> Flink code we saw also works in a cluster. To run this code in a cluster
3> SHUTDOWNDDDDDDD
2020-07-02 16:40:27,973 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) switched from RUNNING to FINISHED.
2020-07-02 16:40:27,973 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e).
2020-07-02 16:40:27,974 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) [FINISHED]
2020-07-02 16:40:27,975 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out (3/4) 5f47c2b3f55c5eb558484d49fb1fcf0e.
2020-07-02 16:40:27,979 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) switched from RUNNING to FINISHED.
 类似资料:
  • 我在Flink的工作中使用Kafka资料来源的信息流,一次阅读50个主题,如下所示: 然后有一些运算符,如:过滤器- 我能获得的最大吞吐量是每秒10k到20k条记录,考虑到源发布了数十万个事件,这相当低,我可以清楚地看到消费者落后于生产者。我甚至试着移除水槽和其他操作员,以确保没有背压,但它仍然是一样的。我正在将我的应用程序部署到Amazon Kinesis data analytics,并尝试了

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我们有一个应用程序,消费者读取一条消息,线程执行许多操作,包括在生成另一主题的消息之前访问数据库。在线程上消耗和生成消息之间的时间可能需要几分钟。一旦生成了指向新主题的消息,就会进行提交,以表明我们已经完成了对消费者队列消息的处理。自动提交因此被禁用。 我正在使用高级消费者,我注意到的是zoowatch和kafka会话超时,因为我们在消费者队列上做任何事情之前需要太长时间,所以kafka每次线程返

  • 我有一个生产者-消费者模式的多线程任务。可能有许多生产者和一个消费者。我使用ArrayBlockingQueue作为共享资源。 Producer类中的run()方法: Consumer类中的run()方法: main()方法: 现在,当队列为空时,我有消费者结束条件。但是可能会有一段时间队列变成空的,但是一些生产者线程仍然在工作。所以我只需要在完成所有生产者线程之后才完成消费者线程(但它们的数量事

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我做了一个poc,其中我使用spark流从Kafka读取数据。但我们的组织要么使用ApacheFlink,要么使用Kafka消费者从ApacheKafka读取数据,作为标准流程。所以我需要用Kafka消费者或ApacheFlink替换Kafka流媒体。在我的应用程序用例中,我需要从kafka读取数据,过滤json数据并将字段放入cassandra中,因此建议使用kafka consumer而不是f