当前位置: 首页 > 知识库问答 >
问题:

Flink Kafka生产者元素乱序

周奇
2023-03-14

我试图使用FlinkKafkaProducer010生成元素,但是当我打开消费者控制台窗口时,元素似乎出现了故障。

我使用kafka-topics.bat创建了主题

消费者是使用:kafka console consumer创建的。蝙蝠——zookeeper本地主机:2181——主题mytopic

我使用的Kafka制作人代码是:

public static void main(String[] args) throws Exception {
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    if(parameterTool.getNumberOfParameters() < 2) {
        System.out.println("Missing parameters!");
        System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
        return;
    }

    StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

    DataStream<String> messageStream = env.addSource(getSourceFunction());

    FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
    messageStream.addSink(producer);
    env.execute("Kafka Producer");
}

public static SourceFunction<String> getSourceFunction() {
    return new SourceFunction<String>() {
        private static final long serialVersionUID = 6369260225318862378L;
        public boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) {
            int counter = 0;
            while (this.running && counter < 500) {
                String data = "item " + Integer.toString(counter);
                ctx.collect(data);

                counter++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    };
}

当我查看Kafka日志文件时,我看到一个. log文件,其中的元素也是无序的。元素的顺序会跳转大约10个值。在我的用例中,拥有正确的顺序是必不可少的。我一直在搜索如何确保元素按顺序到达,但到目前为止没有任何运气。我是否错过了修复排序的东西?

提前感谢您的帮助!

共有1个答案

申昌勋
2023-03-14

我猜你在使用并行

 类似资料:
  • 大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?

  • 问题 你想打乱数组中的元素。 解决方案 Fisher-Yates shuffle 是一种高效、公正的方式来让数组中的元素随机化。这是一个相当简单的方法:在列表的结尾处开始,用一个随机元素交换最后一个元素列表中的最后一个元素。继续下一个并重复操作,直到你到达列表的起始端,最终列表中所有的元素都已打乱。这 [ Fisher-Yates shuffle Visualization ]( http://b

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f