当前位置: 首页 > 知识库问答 >
问题:

Kafka保留策略未按预期工作

缑桐
2023-03-14

我想为我们拥有的一些用例实现数据回放,为此,我需要使用Kafka保留策略(我使用的是连接,我需要窗口时间才能准确)。附言:我正在使用Kafka版本0.10.1.1

我将数据发送到主题中,如下所示:

 kafkaProducer.send(
                    new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r)
            );

我创建的主题如下:

Kafka主题--创建--zookeeper localhost:2181--复制因子1--分区1--主题myTopic
Kafka主题--zookeeper localhost--alter--主题myTopic--配置保留。ms=17280000Kafka主题--zookeeper localhost--alter--主题myTopic--配置段。ms=17280000

所以通过上面的设置,我应该将我的主题的保留时间设置为48小时。

我扩展TimestampExtractor以记录每条消息的实际时间。

public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class);
    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord) {
        LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp()));
        return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis();
    }
}

为了测试,我向我的主题发送了4条消息,我收到了这4条日志消息。

2017-02-28 10:23:39信息消费者记录墙时钟时间提取器:21-时间戳:1488295086292人类可读-2017年2月28日星期二10:18:06 EST 2017
2017-02-28 10:24:01信息消费者记录墙时钟时间提取器:21-时间戳:1483272000000人类可读-2017年1月1日07:00:00 EST
2017-02-28 10:26:11信息消费者记录墙时钟时间提取器:21-时间戳:1485820800000人类可读性-周一2017年1月30日19:00:00 EST 2017-02-28 10:27:22信息消费者记录墙时钟时间提取器:21-时间戳:1488295604411人类可读性-周二2017年2月28日10:26:44 EST

因此,根据Kafka的保留政策,我希望看到我的两条消息在5分钟后被清除/删除(第2条和第3条消息,因为它们是1月1日和1月30日的消息)。但是我试着花了一个小时来阅读我的主题,每次我阅读完我的主题,我就收到了所有的4条信息。

kafka avro控制台消费者——zookeeper本地主机:2181——从一开始——主题myTopic

我的Kafka配置如下:

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

我是做错了什么还是错过了什么?

共有1个答案

督弘化
2023-03-14

Kafka通过删除日志段来实现其保留策略。Kafka从不删除活动段,即它将在其中附加发送到分区的新消息的段。Kafka只删除旧片段。当新消息发送到分区时,Kafka将活动段滚动到旧段,并且

  • 带有新消息的活动段的大小将超过log.segment。字节,或
  • 活动段中第一条消息的时间戳早于log.roll。ms(默认为7天)

因此,在您的示例中,您必须在美国东部标准时间2017年2月2日28 10:18:06后等待7天,发送一条新消息,然后所有4条初始消息都将被删除。

 类似资料:
  • 我有一个Log4j2.xml定义为: 据我所知,当我的日志文件达到1MB时,它应该会滚动到新的日志文件。要滚动的文件数应为30。然而,如果你看看我下面的日志,有超过40个,所有最新的都接近30MB。当前日志文件MLMServices。日志记录了从2016年4月13日至今的条目。事实上,最后几个日志文件是MLMServices-2016-05-24-4。日志,MLMServices-2016-05-

  • 我搞不清这里出了什么问题。如有任何帮助,我们将不胜感激。

  • 我已经配置了log4j2.xml文件,application.log文件将被创建,它应该每天翻转。 但是在JVM中,applicatoin.log文件在10MB之后会翻转,如果翻转三次,第一个文件会被覆盖。也就是说我随时都application.logapplication-2020-10-16.log.zip. 为什么log4j2(v2.13)即使配置为每日,也会每10MB滚动一次文件?任何在l

  • 假设我有一个多代理(运行在同一主机上)的Kafka设置,其中有3个代理和50个主题,每个主题配置为有7个分区和3个复制因子。 我有50GB的内存要用于kafka,并确保kafka日志永远不会超过这个内存数量,因此我想配置我的保留策略以防止这种情况。 我已设置删除清理策略: 我应该如何配置上述参数,以便每7天删除一次数据,并确保如果需要,可以在较短的窗口中删除数据,这样我就不会耗尽内存?

  • 本文向大家介绍kafka 有几种数据保留的策略?相关面试题,主要包含被问及kafka 有几种数据保留的策略?时的应答技巧和注意事项,需要的朋友参考一下 kafka 有两种数据保存策略:按照过期时间保留和按照存储的消息大小保留。

  • 我正在使用spring Roo并希望访问Controller类中的一个bean,该类在ApplicationContext.xml中具有以下配置: 配置类本身是: 在我的Controller中,我认为一个简单的Autowired注释应该可以完成这项工作 在启动过程中,spring在setSkipWeeks方法中打印消息。不幸的是,每当我在控制器中调用config.getSkipWeeks()时,它