当前位置: 首页 > 知识库问答 >
问题:

使用Spring KafkaStreams组合来自两个主题的数据

葛威
2023-03-14

我正在研究一个解决方案,我必须将Kafka两个主题t1和t2的数据结合起来
t1将包含消息的前半部分,t2将包含消息的后半部分
例如,如果完整消息是“a1b1c1d1”和“a2b2c2d2”,那么
t1将有“a1b1”和“a2b2”
t2将有“c1d1”和“c2d2”
并且我必须对它们执行并集以生成“a1b1c1d1”和“a2b2c2d2”
,因为消息不会按顺序存储在KStream store中,并在它们变为有空<现在的问题是,这是一个好的解决方案吗?如果是,有一个例子。

@Component
public class MyViews {

    @Autowired
    public void buildProductView(StreamsBuilder sb) {
        sb.stream("products", Consumed.with(Serdes.Integer(), Serdes.serdeFrom(Product.class)))
                .map((k, v) -> new KeyValue<>(v.getId(), v))
                .toTable(Materialized.<Integer, Product, KeyValueStore<Bytes, byte[]>>as("products-view")
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(Serdes.serdeFrom(Product.class)));
    }

    @Autowired
    public void buildPriceView(StreamsBuilder sb) {
        sb.stream("prices", Consumed.with(Serdes.Integer(), Serdes.serdeFrom(Price.class)))
                .map((k, v) -> new KeyValue<>(v.getId(), v))
                .toTable(Materialized.<Integer, Price, KeyValueStore<Bytes, byte[]>>as("prices-view")
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(Serdes.serdeFrom(Price.class)));
    }

    @Data
    private class Product {
        Integer id;
        String item;
    }

    @Data
    private class Price {
        Integer id;
        Integer price;
    }

    @Data
    private class Order{
        Integer id;
        String item;
        Integer price;
    }
}

这就是我想要实现的
每当html" target="_blank">消息发布到“订单”主题时,我想检查“价格视图”中是否存在相应的ID,如果存在,那么我将组合它们生成“订单”对象。
类似地,当消息发布到“价格”主题时,我想检查“产品视图”

共有1个答案

袁单鹗
2023-03-14
@Component
public class MyViews {

    @Autowired
    public void buildProductView(StreamsBuilder sb) {
        final Serde<Product> productSerde = Serdes.serdeFrom(new JsonSerializer<>(), new ProductDeserializer());
        final Serde<Price> valueSerde = Serdes.serdeFrom(new JsonSerializer<>(), new PriceDeserializer());

        final KTable<Integer, Product> products = sb.stream("products", Consumed.with(Serdes.Integer(), productSerde)).selectKey((k, v) -> v.getId())
                .toTable(Materialized.<Integer, Product, KeyValueStore<Bytes, byte[]>>as("products-view")
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(productSerde)
                        .withCachingDisabled());

        final KTable<Integer, Price> prices = sb.stream("prices", Consumed.with(Serdes.Integer(), valueSerde))
                .map((k, v) -> new KeyValue<>(v.getId(), v))
                .toTable(Materialized.<Integer, Price, KeyValueStore<Bytes, byte[]>>as("prices-view")
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(valueSerde)
                        .withCachingDisabled());

        final KTable<Integer, Order> join = products.join(prices, Product::getId, (l, r) ->
                new Order(l.getId(), l.getItem(), r.getPrice())
        );

        join.toStream().foreach((x, y) -> System.out.println(y));

    }
    

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Product {
        Integer id;
        String item;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Price {
        Integer id;
        Integer price;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class Order {
        Integer id;
        String item;
        Integer price;
    }
}
 类似资料:
  • 我有两个jms消费者,每个都在不同的流中。我想使用另一个流来聚合这两条消息的消息。并且还需要保留相关ID,因为我需要分割有效负载并发送回消息。 我尝试将分散收集与两个入站 VM 一起使用,但收到以下错误: 由以下原因导致:组织.xml.sax.SAXParse 异常:cvc-complex-type.2.4.a:发现以元素“vm:入站终结点”开头的无效内容。“{”http://www.muleso

  • 当我在SBT之上运行时,我会得到一些异常/错误:

  • 我试图使用来自另一组主机[etcd]的事实来配置一组主机[节点]。这是我的主机文件 请注意,组[etcd]不是供应的目标,[nodes]是。但是提供[节点]需要了解[etcd]的事实。 当我运行这个playbook时,我得到控制台输出 使etcd事实对节点播放可用的惯用Ansible方法是什么?

  • 问题内容: 我试图在尝试对6参数函数进行任何复杂操作之前,遍历6参数函数的参数空间,以研究其数值行为,因此,我正在寻找一种有效的方法来执行此操作。 我的函数将6-dim numpy数组中给出的浮点值作为输入。我最初尝试做的是: 首先,我创建了一个函数,该函数接受2个数组并生成一个包含两个数组中值的所有组合的数组: 然后,我曾经将其应用于同一数组的m个副本: 最后,我这样评估我的功能: 这工作,但它

  • 问题内容: 我有两个像这样的数组: 我想结合这两个数组,使其不包含重复项,并保留其原始键。例如,输出应为: 我已经尝试过了,但是它正在更改其原始键: 有什么办法吗? 问题答案: 只需使用: 那应该解决。因为如果一个键出现多次(例如在您的示例中),则使用字符串键,因此一个键将覆盖具有相同名称的处理键。因为在您的情况下,它们两者都具有相同的值,但这无关紧要,并且还会删除重复项。 更新:我刚刚意识到,P

  • 寻找一种优雅的方式以特殊的方式合并两个散列数组: 如果名称关键字匹配,则结果必须包含< code>new_data的所有名称散列,仅包含< code>old_data的额外数据。 我的第一次尝试是这样的,但是它创建了一个额外的散列: