当前位置: 首页 > 知识库问答 >
问题:

Kafka流内部数据管理

范鸿
2023-03-14

在我的公司,我们广泛使用Kafka,但出于容错的原因,我们一直使用关系数据库来存储几个中间转换和聚合的结果。现在我们正在探索Kafka流作为一种更自然的方法来做到这一点。通常,我们的需求很简单--其中一个例子是

  • 监听 ...
  • 的输入队列
  • 对每个记录执行一些高延迟操作(调用远程服务)
  • 如果在处理 时, 都已生成,那么我应该处理V3,因为V2已经过时了

为了实现这一点,我将主题作为ktable阅读。代码如下所示

KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> kTable = builder.table("input-topic");
kTable.toStream().foreach((K,V) -> client.post(V));
return builder;

这是预期的,但我不清楚Kafka是如何自动实现这一点的。我假设Kafka创造了内部主题来实现这一点,但我没有看到任何内部主题被创造出来。Javadoc的方法似乎证实了这一点。但后来我看到了这个官方页面,它似乎表明Kafka使用了一个单独的数据存储库,也就是RocksDB,以及一个changelog主题。

现在我很困惑,因为changelog主题是在什么情况下创建的。我的问题是

    null

共有1个答案

梁丘翔
2023-03-14

>

  • Kafka Streams存储本地状态。默认情况下使用rocksdb。然而,局部状态是短暂的。为了容错,对存储区的所有更新也被写入changelog主题中。这允许在出现故障或伸缩/扩展时重建和/或迁移存储区。对于您的特殊情况,没有创建changelog主题,因为ktable不是聚合的结果,而是直接从主题填充的--这只是一个优化。因为changelog主题将包含与输入主题完全相同的数据,所以Kafka Streams避免了数据重复,并在出现错误时使用输入主题作为changelog主题。

    我不知道你这个问题到底是什么意思。注意,RocksDB被认为是一个短暂的存储。默认情况下使用RocksDB,原因如下:为什么Apache Kafka Streams使用RocksDB,以及如何更改它?(例如,它允许保存大于主存的状态,因为它可能溢出到磁盘)。您可以用任何其他存储替换RocksDB。Kafka Streams还附带内存存储。(编辑)

    >

  • 这是正确的。您需要相应地提供应用程序,以便能够存储整体状态的局部碎片。这里有一个大小调整指南:https://docs.confluent.io/current/streams/sizing.html

    1. 如果启用日志压缩,则忽略保留时间设置。因此,是的,最新的更新将永远保持(或者直到写入value=null的墓碑消息)。注意,当在代理端执行压缩时,旧数据将被垃圾收集,因此,在还原时,只读取每个键的新版本--旧版本在压缩过程中被删除。如果您在一段时间后对某些数据不感兴趣,则需要在源主题中写入一个墓碑以使其工作。

  •  类似资料:
    • 有人能帮我理解在以下情况下会发生什么吗: 来自主题A的流在其上执行了一些不同的操作,这些操作会导致生成多个内部Kafka主题,例如:KSTREAM-REDUCE-00000000 14 KSTREAM-JOIN-0000000 358等。 它们在拓扑中显示为“消费者组名生成的名称” 话题A加入话题B。。。B必须重新输入才能与A加入内部主题“组重新输入B”。 如果我的拓扑结构发生变化,那么除非所有这

    • 我正在尝试设置一个安全的Kafka集群,但在ACL方面遇到了一些困难。 Kafka流的汇流安全指南(https://docs.Confluent.io/current/Streams/developer-guide/security.html)只说明必须将集群创建ACL交给主体...但它没有说任何关于如何实际处理内部话题的内容。 通过研究和实验,我确定(对于Kafka版本1.0.0): 通配符不能

    • 我使用beam SDK用python编写了一个Google数据流管道。有一些文档介绍了我如何在本地运行它,并设置runner标志以在数据流上运行它。 我现在正尝试将其自动部署到CI管道(bitbucket管道,但并不真正相关)。有关于如何“运行”管道的文档,但没有真正的“部署”管道。我测试过的命令如下: 这将运行作业,但因为它正在流式传输,所以永远不会返回。它还在内部管理打包并推送到存储桶。我知道

    • 现在还不清楚你是否能像在《水槽》中那样在Kafka中做一个扇出(复制)。 我想让Kafka将数据保存到HDFS或S3,并将该数据的副本发送到Storm进行实时处理。Storm集合/分析的输出将存储在Cassandra中。我看到一些实现将所有数据从Kafka流到Storm,然后从Storm输出两个。但是,我想消除Storm对原始数据存储的依赖。 这可能吗?您知道任何类似的文档/示例/实现吗? 还有,

    • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?

    • 我正在遵循入门指南[1],但是我已经从配置设置中删除了MySQL和analytics的内容,因为我不打算使用任何分析函数。但是,scdf服务后来崩溃了,因为没有配置数据源。 好的,所以似乎仍然需要在scdf-config-kafka.yml[2]中配置数据源(尽管从阅读文档来看,我认为它只用于分析内容)。 但为了什么?数据源用于持久化Kafka消息,还是在节点之间建立云流消息? 我找不到任何关于大