当前位置: 首页 > 知识库问答 >
问题:

Flink在《Kafka》中克服坏消息:“毒药消息”

太叔灿
2023-03-14

我第一次试着让它工作,所以请容忍我。我正在尝试学习Kafka的检查点设置和处理“错误”消息,在不丢失状态的情况下重新启动。

用例:使用检查点。从Kafka那里读取一个整数流,保持一个连续的和。如果读到“坏”Kafka消息,请重新启动应用程序,跳过“坏”消息,保持状态。我的流看起来像这样:

set1,5
set1,7
set1,foobar
set1,6

我希望我的应用程序保留它看到的整数的运行总和,如果它崩溃而不丢失状态,则重新启动,因此应用程序行为/运行总和将是:
5、
12、
应用程序崩溃并重新启动、读取检查点
18等。

但是,我发现当我的应用程序重新启动时,它一直在读取错误的“fobar”消息并且无法通过它。下面的源代码。当我尝试将“fobar”解析为整数时,映射器会爆炸。如何修改应用程序以绕过“毒药”消息?

    env.enableCheckpointing(1000L);   


   env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L); 
        env.getCheckpointConfig().setCheckpointTimeout(10000); 
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
        env.setStateBackend(new 
        FsStateBackend("hdfs://mymachine:9000/flink/checkpoints")); 

        Properties properties = new Properties(); 
        properties.setProperty("bootstrap.servers", BROKERS); 
        properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST); 
        properties.setProperty("group.id", "consumerGroup1"); 

        FlinkKafkaConsumer08 kafkaConsumer = new FlinkKafkaConsumer08<>(topicName, 
        new SimpleStringSchema(), properties); 
        DataStream<String> messageStream = env.addSource(kafkaConsumer); 

        DataStream<Tuple2&lt;String,Integer>> sums = messageStream 
          .map(new NumberMapper()) 
          .keyBy(0) 
          .sum(1);  
          sums.print(); 


                private static class NumberMapper implements 
        MapFunction<String,Tuple2<String,Integer>> { 
                        public Tuple2<String,Integer> map(String input) throws Exception { 
                                return parseData(input); 
                        } 

                        private Tuple2<String,Integer> parseData(String record) { 

                                String[] tokens = record.toLowerCase().split(","); 

                                // Get Key 
                                String key = tokens[0]; 

                                // Get Integer Value 
                                String integerValue = tokens[1]; 
                                System.out.println("Trying to Parse=" + integerValue); 
                                Integer value = Integer.parseInt(integerValue); 

                                // Build Tuple
                                return new Tuple2<String,Integer>(key, value); 
                        } 

                } 

共有1个答案

苏涛
2023-03-14

您可以将NumberMapper更改为FlatMap并过滤掉无效元素:

java prettyprint-override">private static class NumberMapper implements FlatMapFunction<String, Tuple2<String, Integer>> { 
        public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception { 
                Optional<Tuple2<String, Integer>> optionalResult = parseData(input); 

                optionalResult.ifPresent(collector::collect);
        } 

        private Optional<Tuple2<String, Integer>> parseData(String record) { 

                String[] tokens = record.toLowerCase().split(","); 

                // Get Key 
                String key = tokens[0]; 

                // Get Integer Value 
                String integerValue = tokens[1]; 

                try {
                        Integer value = Integer.parseInt(integerValue); 
                        // Build Tuple
                        return Optional.of(Tuple2.of(key, value)); 
                } catch (NumberFormatException e) {
                        return Optional.empty();
                }
        } 
} 
 类似资料:
  • 我试图在WebLogic10r3服务器上调试一个web应用程序主机。该应用程序通过Java消息驱动bean接收来自外部IBM JMS队列(classname:)的输入。 我写了一个小的测试应用程序来连接到队列并发送测试消息。目前的问题是测试消息会生成异常,并且不知何故它会被放回队列中,并一次又一次地循环。这会产生大量异常,使日志不可读。 代码概述了JMS生产者:

  • 我们在kafka中使用Ktabke进行聚合,它非常基本的用途,并参考了kafka文件。 使用Kafka的Streams API处理坏消息的KStream参考 我的用例非常简单,对于任何类型的异常,只需移到错误主题并移到不同的消息

  • 我已将flinkkafkaconsumer作为源添加到我的streamexecutionenvironment中。我想在特定时间内没有收到新消息时关闭/阻止flink使用数据(类似于kafka polltime)。目前它正在无限期运行,并阻止执行移动到下一步(验证消息)。请建议是否有任何解决方法。 注意:我从反序列化中尝试了endofstream,但它无法工作,因为流实际上是不确定的。 提前谢谢。

  • 我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • Kafka过去在我自己的电脑上工作得很好。我正在另一台电脑上工作,上面写着 为目录C:\tmp\kafka logs(kafka.server.LogDirFailureChannel)java中的\uu consumer\u offset-41创建日志时出错。木卫一。IOException:映射在sun失败。尼奥。总经理。Kafka地图(FileChannelImpl.java:940)。日志抽