在启动应用程序时,Kafka流出现了奇怪的错误
java.lang.IllegalArgumentException: Illegal base64 character 7b
at java.base/java.util.Base64$Decoder.decode0(Base64.java:743)
at java.base/java.util.Base64$Decoder.decode(Base64.java:535)
at java.base/java.util.Base64$Decoder.decode(Base64.java:558)
at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:985)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:303)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:265)
at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:71)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
结果,关于失败流的错误:error KafkaStreams-stream client[xxx]所有流线程都已死亡。实例将处于错误状态,应该关闭
根据
org内部的代码。阿帕奇。Kafka。溪流。加工机内部。StreamTask
,由于解码时间戳元数据时出错而发生故障(StreamTask.decodeTimestamp()
)。它发生在prod上,不能在舞台上复制。这些错误的根本原因是什么?
额外信息:我们的应用程序使用Kafka流,并使用相同的
应用程序使用来自多个Kafka代理的消息。id和状态。dir
(实际上,我们从一个代理切换到另一个代理,但在一段时间内,我们连接到两个代理,所以我们有两个Kafka流,每个代理一个)。据我所知,消费者群体生活在代理端(所以不应该是问题),但state dir生活在客户端。可能由于使用了相同的状态而出现了一些竞争条件。两条Kafka流的dir
?这可能是根本原因吗?
我们使用
kafka-stream
v.2.4.0
、kafka-客户端
v.2.4.0
、Kafka Broker v.1.1.1
,具有以下配置:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
commit.interval.ms: 5000
num.stream.threads: 1
auto.offset.reset: latest
最后,我们弄清楚了一些消费者组损坏元数据的根本原因。是我们的内部监控工具之一(用pykafka
编写)损坏了暂时不活跃的消费者组的元数据。元数据未被破坏,并且包含以下无效数据:{"consumer_id": "", "host name":"uniting-xxx"}
。为了了解我们在消费者元数据中到底有什么,我们可以使用以下代码:
Map<String, Object> config = Map.of( "group.id", "...", "bootstrap.servers", "...");
String topicName = "...";
Consumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
Set<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
.collect(Collectors.toSet());
kafkaConsumer.committed(topicPartitions).forEach((key, value) ->
System.out.println("Partition: " + key + " metadata: " + (value != null ? value.metadata() : null)));
修复已损坏的元数据的几个选项:
>
最新
或最早
偏移重置策略,您可能会丢失或重复消息。因此,对于某些情况,此选项可能是不可接受的手动覆盖元数据(时间戳根据StreamTask.decodeTimestamp()
中的逻辑编码):
地图
我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自Kafka0.11.0编写的Kafka主题,该主题没有嵌入时间戳。在网上读了一些书之后,我明白了我可以通过在自定义类中扩展类并将其传递到中来解决这个问题。 我是这样做的- 我基于github上的这段代码 但是,当我运行
我们正在使用使用STREAM_TIME标点符号的自定义转换器。当我记录通过转换函数发送的消息时,来自context.timestamp()的流时间显示如预期的那样——基于使用时间戳提取器派生的数据的合理日期。 现在——在过去的某个时候,我们收到了一些恶意消息,将流时间提前到2036年。我们现在已经阻止了这些上游,重新启动了Kafka河。 当流启动时,标点符号会在受影响任务的启动时运行,但会显示20
我们如何在最初的Eureka注册之前将引导时间戳放入环境中,这样应用程序就不会在没有时间戳的情况下出现在Eureka中?
我在一个Kafka主题“原始数据”中获取CSV,目标是通过在另一个主题“数据”中发送具有正确时间戳(每行不同)的每行来转换它们。 null 我想通过直接设置时间戳来删除这个“内部”主题的使用,但我找不到一个方法(时间戳提取器只在消耗时间使用)。 我在文档中偶然发现了这一行: 请注意,通过调用#forward()时显式地为输出记录分配时间戳,可以在处理器API中更改description默认行为。
我正在使用FluentD(V.12最后一个稳定版本)向Kafka发送消息。但是FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1。因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达Kafka时的时间点。 我真正感兴趣的时间戳是由fluentd在消息中发送的: “时间戳”:“1507885936”,“主机”:“V.X.Y
我正在使用spark结构流发送记录到一个Kafka主题。kafka主题是用config- 这样做使得目标Kafka主题记录具有与原始记录相同的时间戳。 我的Kafka流代码: