当前位置: 首页 > 知识库问答 >
问题:

Kafka表(Ktable)中是否有基于值检索键的功能?或者是否有任何基于键和值检索数据的方法

蒋典
2023-03-14

我正在做一个关于kafka流和KTable的poc。我想知道是否有任何方法在kafka中存储数据(键-值对或键-对象对),或者通过流、KTable、状态存储,这样我就可以检索基于键和值的数据库。我创建了一个基于topic的kstream,在该kstream上推送了一些消息,并使用wordcountalgo在kstream上创建的ktable中填充了值。类似这样的事情:

StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"),Serdes.String(), customerSerde)
                .withLoggingEnabled(new HashMap<>());
streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde)).to("customer-to-ktable-topic",Produced.with(Serdes.String(), customerSerde));
KTable<String, Customer> customerKTable = streamsBuilder.table("customer-to-ktable-topic", Consumed.with(Serdes.String(), customerSerde),Materialized.as(customerStateStore.name()));

我无法获取基于值的记录。

https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/ktable.html在kafka文档中只有get(字符串键)函数可用。然而,我正在探索是否可以通过其他方式实现这一点?

共有1个答案

怀洛华
2023-03-14

您的CustomerStateStore是一个键值存储,如您所述,您只能基于键进行查询。

一个建议是在IN流上工作,以便将值(或值的一部分)用作存储中的键。您可以使用map()方法来完成此操作。这样做的目的可能是:

Original IN msg: key1 - value1

Would generate 2 entries in the store:
    key1 - value1
    value1 - key1 (or whatever depending on your usecase)

这样做,您将能够查询value1上的存储,因为它是一个键。(如果在in主题中,不同键的值相同,请小心。)

但是你必须小心你的输入主题的分区:一个给定的值可能会出现在你的主题的几个分区中,然后出现在你的KS应用程序的不同实例中。

 类似资料:
  • 问题内容: 我记得在Oracle中可以基于函数进行索引,例如。 MySQL支持吗?如果没有,还有其他选择吗? 问题答案: 不,不是一般意义上的,我什至不认为5.6(首次编写此答案时的最新版本)具有此功能。值得注意的是,8.0.13及更高版本现在支持功能索引,使您无需下面所述的触发方法即可实现所需的功能。 有关更多详细信息,请参见https://dev.mysql.com/doc/refman/8.

  • 有一个名为“矩阵”的哈希图,它有很多键。每个键的值都是一个具有自己值的ArrayList。考虑到这一点,我无法找到一种方法来测试ArrayList值中是否有特定值,因为如果我将字符串参数传递给HashMap的方法“.containsValue()”,该方法将找到ArrayList对象,测试将为false。因此,我必须做一些相当疯狂的事情,就像我在例子中所做的那样。正如您所看到的,没有像“getAr

  • 问题内容: 我有两个表,一个叫,一个叫。 这个想法是,客户表包含核心客户数据,并且可以根据应用程序的用途对应用程序进行自定义以支持其他属性。 有以下3列: 我是否可以检索整行,如果指定了其他任何属性,如果没有,则默认为NULL?我正在使用以下查询,但只有两个属性都存在于customer_attributes表中时,它才有效。 在这种情况下,我感兴趣的两个属性称为“ wedding_date”和“

  • 问题内容: 我有 我想按 位置 获得 职位, 而不是关键 职位 。 我不想使用迭代。 还有其他方法可以基于索引获取Value吗? 问题答案: 您无法获得基于索引的值,只是那样行不通。一种解决方法是根据您的值创建一个新列表,然后根据索引获取值。

  • 问题内容: 我敢肯定有一种明显的方法可以做到这一点,但现在还不能想到任何光滑的东西。 基本上不是引发异常,而是要获取或查看pandas索引中是否存在值。 我现在工作的是以下内容 问题答案: 这应该可以解决问题

  • 我想过滤掉列表中所有列的值为零的行。 这是我尝试过的, 这对于小型数据集很好,但是如果col_list很长,则会出现以下错误。 ava.lang.stackoverflowerrorat org.apache.spark.sql.catalyst.analysis.resolvelambdavariables.org$Apache$spark$sql$catalys$analysis$resolv