我第一次试着让它工作,所以请容忍我。我正在尝试学习Kafka的检查点设置和处理“错误”消息,在不丢失状态的情况下重新启动。
用例:使用检查点。从Kafka那里读取一个整数流,保持一个连续的和。如果读到“坏”Kafka消息,请重新启动应用程序,跳过“坏”消息,保持状态。我的流看起来像这样:
set1,5
set1,7
set1,foobar
set1,6
我希望我的应用程序保留它看到的整数的运行总和,如果它崩溃而不丢失状态,则重新启动,因此应用程序行为/运行总和将是:
5、
12、
应用程序崩溃并重新启动、读取检查点
18等。
但是,我发现当我的应用程序重新启动时,它一直在读取错误的“fobar”消息并且无法通过它。下面的源代码。当我尝试将“fobar”解析为整数时,映射器会爆炸。如何修改应用程序以绕过“毒药”消息?
env.enableCheckpointing(1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
env.getCheckpointConfig().setCheckpointTimeout(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new
FsStateBackend("hdfs://mymachine:9000/flink/checkpoints"));
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BROKERS);
properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
properties.setProperty("group.id", "consumerGroup1");
FlinkKafkaConsumer08 kafkaConsumer = new FlinkKafkaConsumer08<>(topicName,
new SimpleStringSchema(), properties);
DataStream<String> messageStream = env.addSource(kafkaConsumer);
DataStream<Tuple2<String,Integer>> sums = messageStream
.map(new NumberMapper())
.keyBy(0)
.sum(1);
sums.print();
private static class NumberMapper implements
MapFunction<String,Tuple2<String,Integer>> {
public Tuple2<String,Integer> map(String input) throws Exception {
return parseData(input);
}
private Tuple2<String,Integer> parseData(String record) {
String[] tokens = record.toLowerCase().split(",");
// Get Key
String key = tokens[0];
// Get Integer Value
String integerValue = tokens[1];
System.out.println("Trying to Parse=" + integerValue);
Integer value = Integer.parseInt(integerValue);
// Build Tuple
return new Tuple2<String,Integer>(key, value);
}
}
您可以将NumberMapper
更改为FlatMap
并过滤掉无效元素:
java prettyprint-override">private static class NumberMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
Optional<Tuple2<String, Integer>> optionalResult = parseData(input);
optionalResult.ifPresent(collector::collect);
}
private Optional<Tuple2<String, Integer>> parseData(String record) {
String[] tokens = record.toLowerCase().split(",");
// Get Key
String key = tokens[0];
// Get Integer Value
String integerValue = tokens[1];
try {
Integer value = Integer.parseInt(integerValue);
// Build Tuple
return Optional.of(Tuple2.of(key, value));
} catch (NumberFormatException e) {
return Optional.empty();
}
}
}
我试图在WebLogic10r3服务器上调试一个web应用程序主机。该应用程序通过Java消息驱动bean接收来自外部IBM JMS队列(classname:)的输入。 我写了一个小的测试应用程序来连接到队列并发送测试消息。目前的问题是测试消息会生成异常,并且不知何故它会被放回队列中,并一次又一次地循环。这会产生大量异常,使日志不可读。 代码概述了JMS生产者:
我们在kafka中使用Ktabke进行聚合,它非常基本的用途,并参考了kafka文件。 使用Kafka的Streams API处理坏消息的KStream参考 我的用例非常简单,对于任何类型的异常,只需移到错误主题并移到不同的消息
我已将flinkkafkaconsumer作为源添加到我的streamexecutionenvironment中。我想在特定时间内没有收到新消息时关闭/阻止flink使用数据(类似于kafka polltime)。目前它正在无限期运行,并阻止执行移动到下一步(验证消息)。请建议是否有任何解决方法。 注意:我从反序列化中尝试了endofstream,但它无法工作,因为流实际上是不确定的。 提前谢谢。
我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
Kafka过去在我自己的电脑上工作得很好。我正在另一台电脑上工作,上面写着 为目录C:\tmp\kafka logs(kafka.server.LogDirFailureChannel)java中的\uu consumer\u offset-41创建日志时出错。木卫一。IOException:映射在sun失败。尼奥。总经理。Kafka地图(FileChannelImpl.java:940)。日志抽