当前位置: 首页 > 知识库问答 >
问题:

使用Kafka流将状态持久化到Kafka

濮阳
2023-03-14

我试着把我的头缠绕在Kafka的溪流和一些根本的问题,我似乎无法解决,我自己。我理解ktable和Kafka状态存储的概念,但我很难决定如何实现它。我还在使用Spring Cloud Streams,这在此基础上增加了另一个层次的复杂性。

我的用例:

@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
    return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}

public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
    // Does stuff
}

一些有状态规则如下所示:

[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]

我当前的实现将所有这些信息存储在内存中,以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会坚持到Kafka的州立商店。

我不确定做这件事的最好办法。我知道有一种方法可以创建自定义状态存储,从而允许更高级别的灵活性。我不确定KafkaDSL是否支持这一点。

共有1个答案

韦星文
2023-03-14

从您给出的描述来看,我相信这个用例仍然可以在Kafka流中使用DSL来实现。上面显示的代码不跟踪任何状态。在您的拓扑中,您需要通过跟踪规则的计数来添加状态,并将它们存储在状态存储区中。则只需要在计数达到阈值时发送输出规则。下面是这个作为伪代码背后的大致思路。显然,您必须对其进行调整,以满足您的用例的特定规范。

@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
    return input -> input
                     .mapValues(this::analyze)
                     .filter((host, evaluation) -> evaluation != null)
                     ...
                     .groupByKey(...)
                     .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                     .count(Materialized.as("rules"))
                     .filter((key, value) -> value > 4)
                     .toStream()
                    ....
}
 类似资料:
  • 我在试图通过Kafka流实现以下目标时遇到了一些困难: 在应用程序启动时,(压缩的)主题α被加载到键值StateStore中 Kafka流从另一个主题中消费,使用上面的映射(get),并最终在主题alpha中生成一个新记录 结果是,即使拖缆重新启动,内存中的映射也应与底层主题对齐 我的方法如下: 装载机Treamer(store): : ...但是我得到的是: 试图获取存储处理程序时。 你知道如何

  • 我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知

  • 我有一个Kafka主题,它有多个消费者群体。我需要主题上的消息在其持续时间到期时不被删除,如果它们尚未被所有消费者组读取 是否可以在持续时间之外设置其他持久性规则?我需要这些信息始终停留在一个主题上,如果它们从未被使用过 如果邮件未被使用且其持续时间已过期,是否可以“刷新”该邮件的超时?

  • 什么是 Volume Volume 就是在一个或者多个容器里有特殊用途的目录。它绕过了容器内部的文件系统为持久化数据、共享数据提供了下面这些有用的特性: 容器可以通过把数据写在 Volume 上来实现数据持久化 Volume 可以在不同的容器之间共享和重用数据 容器数据的备份、恢复和迁移都可以通过 Volume 实现 通过 Volume 实现多容器共享数据,从而实现应用的横向扩展 在 DaoClo

  • 1. 前言 本节课和大家聊聊持久化对象的 3 种状态。通过本节课程,你将了解到: 持久化对象的 3 种状态; 什么是对象持久化能力。 2. 持久化对象的状态 程序运行期间的数据都是存储在内存中。内存具有临时性。程序结束、计算机挂机…… 内存中的数据将不复存在。 重要的数据,需要使用持久化技术将数据保存到永久性设备上。Hibernate 能够通过 PO(持久化对象) 将数据持久化到数据库。 Hibe

  • 本文向大家介绍vuex实现数据状态持久化,包括了vuex实现数据状态持久化的使用技巧和注意事项,需要的朋友参考一下 用过vuex的肯定会有这样一个痛点,就是刷新以后vuex里面存储的state就会被浏览器释放掉,因为我们的state都是存储在内存中的。 所以我们通过 vuex-persistedstate这个插件,来实现将数据存储到本地 用法很简单 1、 2、 以上这篇vuex实现数据状态持久化就