Question:

Flink 1.11 consumer fails to propagate watermarks where Flink 1.12 succeeds

张溪叠
2023-03-14

I'm seeing some strange behavior. I wrote some Flink processors using Flink 1.12 and tried to get them running on Amazon EMR. However, Amazon EMR currently only supports Flink 1.11.2, and when I downgraded, watermarks inexplicably stopped propagating.

The topic has only a single partition, and parallelism is set to 1. Is there something I'm missing here? I feel like I'm going a bit crazy.

Here is the output from Flink 1.12:

Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=86400000 watermark=-9223372036854775808] test message
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=864000000 watermark=0] test message
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=8640000000 watermark=777600000] test message
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=86400000000 watermark=8553600000] test message
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=86313600000] test message
Emitting watermark 9223372036768375807

Here is the output from Flink 1.11. Note that the source still assigns timestamps and emits watermarks, but the downstream ProcessFunction now sees timestamp=0 and watermark=-9223372036854775808 (Long.MIN_VALUE):

Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 9223372036768375807

And here is the integration test that exposes it:

package mytest;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;

import java.nio.file.Files;
import java.nio.file.Paths;

import java.text.SimpleDateFormat;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;

import kafka.utils.MockTime;
import kafka.utils.TestUtils;

import kafka.zk.EmbeddedZookeeper;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

import org.junit.*;

public class FailTest {
    public Properties getKafkaConsumerProperties() {
        Properties result = new Properties();
        result.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        result.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        result.put("group.id", "0");
        result.put("enable.auto.commit", "true");
        result.put("auto.commit.interval.ms", "1000");
        result.put("session.timeout.ms", "30000");
        return result;
    }

    public Properties getProducerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("bootstrap.servers", "localhost:9092");
        result.put("compression.type", "none");
        return result;
    }

    public Properties getServerProperties(int port) {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("broker.id", "0");
        result.put("num.network.threads", "3");
        result.put("num.io.threads", "8");
        result.put("socket.send.buffer.bytes", "102400");
        result.put("socket.recv.buffer.bytes", "102400");
        result.put("num.partitions", "1");
        result.put("offset.topic.replication.factor", "1");
        result.put("transaction.state.log.replication.factor", "1");
        result.put("transaction.state.log.min.isr", "1");
        result.put("log.retention.hours", "168");
        result.put("log.segment.bytes", "1073741824");
        result.put("log.retention.check.interval.ms", "300000");
        result.put("zookeeper.connect", "localhost:" + port);
        result.put("zookeeper.connection.timeout.ms", "18000");
        result.put("group.initial.rebalance.delay.ms", "0");

        // Pick a fresh log.dirs path for each run so the embedded broker
        // never reuses state left over from a previous test run.
        String path = "target/kafka-logs/run.";
        int index = 0;
        while (!Files.notExists(Paths.get(path + String.valueOf(index)))) {
            index += 1;
        }
        result.put("log.dirs", path + String.valueOf(index));
        return result;
    }

    public void printTopics(AdminClient admin, String inputTopic) throws Exception {
        Map<String, TopicDescription> topics = admin.describeTopics(Arrays.asList(inputTopic)).all().get();
        for (Map.Entry<String, TopicDescription> topic : topics.entrySet()) {
            System.out.printf("Topic:%s partitions=%d\n", topic.getValue().name(), topic.getValue().partitions().size());
            System.out.println(topic.getValue().toString());
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(1)
                .setNumberTaskManagers(1)
                .build());

    @Test
    public void testFail() throws Exception {
        StringSerializer stringSerializer = new StringSerializer();
        StringDeserializer stringDeserializer = new StringDeserializer();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        EmbeddedZookeeper zooKeeper = new EmbeddedZookeeper();
        KafkaServer server = TestUtils.createServer(new KafkaConfig(getServerProperties(zooKeeper.port())), new MockTime());
        AdminClient admin = AdminClient.create(getProducerProperties());

        String inputTopic = "input";

        Map<String, String> configs = new HashMap<>();
        int partitions = 1;
        short replication = 1;

        CreateTopicsResult result = admin.createTopics(Arrays.asList(
            new NewTopic(inputTopic, partitions, replication).configs(configs)
        ));
        result.all().get();

        printTopics(admin, inputTopic);

        // Send a few test records with explicit record timestamps
        // (1, 10, 100, and 1000 days, plus Long.MAX_VALUE)
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(getProducerProperties(), stringSerializer, stringSerializer);
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(10).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(100).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1000).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Long.MAX_VALUE, "0", "test message"));
        producer.flush();
        producer.close();

        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), getKafkaConsumerProperties());
        source.setStartFromEarliest();
        source.assignTimestampsAndWatermarks(
            new WatermarkStrategy<String>() {
                @Override
                public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                    return new TimestampAssigner<String>() {
                        @Override
                        public long extractTimestamp(String event, long recordTimestamp) {
                            System.out.printf("Assigning timestamp %d\n", recordTimestamp);
                            return recordTimestamp;
                        }
                    };
                }

                @Override
                public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new WatermarkGenerator<String>() {
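                        // Punctuated-style generation: remember the highest
                        // watermark emitted so far and emit a new one whenever
                        // (event timestamp - 1 day) advances past it.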
                        public long latestWatermark = Long.MIN_VALUE;

                        @Override
                        public void onEvent(String event, long timestamp, WatermarkOutput output) {
                            long eventWatermark = timestamp - Time.days(1).toMilliseconds();
                            if (eventWatermark > latestWatermark) {
                                System.out.printf("Emitting watermark %d\n", eventWatermark);
                                output.emitWatermark(new Watermark(eventWatermark));
                                latestWatermark = eventWatermark;
                            }
                        }

                        @Override
                        public void onPeriodicEmit(WatermarkOutput output) {
                        }
                    };
                }
            });

        env.addSource(source)
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    System.out.printf("Source ");
                    if (ctx != null) {
                        TimerService srv = ctx.timerService();
                        Long timestampLong = ctx.timestamp();
                        long timestamp = 0;
                        if (timestampLong != null) {
                            timestamp = timestampLong;
                        }
                        long watermark = 0;
                        if (srv != null) {
                            watermark = srv.currentWatermark();
                        }
                        System.out.printf("[timestamp=%d watermark=%d] ", timestamp, watermark);
                    }

                    System.out.println(value);
                    out.collect(value);
                }
            });

        System.out.println(env.getExecutionPlan());
        JobClient client = null;
        try {
            client = env.executeAsync("Fail Test");
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }

        printTopics(admin, inputTopic);

        TimeUnit.SECONDS.sleep(5);
        client.cancel().get(5, TimeUnit.SECONDS);

        try {
            server.shutdown();
            zooKeeper.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
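As an aside, the hand-rolled punctuated strategy above can also be expressed with Flink's built-in bounded-out-of-orderness strategy, which is available in both 1.11 and 1.12. A minimal sketch (note the built-in generator emits watermarks from onPeriodicEmit rather than on every event, so the log output would differ):

import java.time.Duration;

WatermarkStrategy<String> strategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofDays(1))
    .withTimestampAssigner((event, recordTimestamp) -> recordTimestamp);
source.assignTimestampsAndWatermarks(strategy);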

1 answer in total

长孙翔
2023-03-14

It turns out that Flink 1.12 changed the default TimeCharacteristic to EventTime and deprecated the whole TimeCharacteristic mechanism. On Flink 1.11 the default is still ProcessingTime, so the timestamps and watermarks assigned by the source are simply ignored downstream. To downgrade to Flink 1.11, you therefore have to configure the StreamExecutionEnvironment with the following statement:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
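
A minimal sketch of where this goes in the test above, assuming the enum is imported from org.apache.flink.streaming.api.TimeCharacteristic:

import org.apache.flink.streaming.api.TimeCharacteristic;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Flink 1.11 defaults to ProcessingTime, so the timestamps and watermarks
// assigned by the source are ignored unless we opt in to event time.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);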