当前位置: 首页 > 知识库问答 >
问题:

具有不同键/值的Spring Cloud Stream中的多个输出绑定(又名分支)

卫逸春
2023-03-14

我知道Kafka Streams允许基于指定的谓词将数据分发到多个主题,并且Kafka Streams绑定器使用@StreamListenerFunctional Binding方法支持这一点。

...
// return type KStream<?, WordCount>[]

Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

return input.
    ... 
    branch(isEnglish, isFrench, isSpanish);

我想知道在返回数据之前如何转换其中一个分支的键或值。假设我希望其中一个分支具有不同于其他分支的密钥类型。

Predicate<Key, Value> isOne = (k, v) -> v.important.equals("1");
Predicate<Key, Value> isTwo =  (k, v) -> v.important.equals("2");

KStream<Key, Value>[] branches = input.branch(isOne, isTwo);

KStream<String, Value> one = branches[0].selectKey((k, v) -> v.importantValue);

我想用这两个流创建一个新的kstream<?,value>[]数组,但由于泛型数组创建错误而无法创建。

我知道这是可能的,从下面的文档摘录中可以看出,可以为每个分支的生产者指定不同的键/值serdes。

spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde

感谢你的帮助.

共有1个答案

丁翰海
2023-03-14

一个选择是创建一个边主题。然后,发送到该副主题的与wordcount不同的每条记录。仍然是wordcount的记录保留在分支的主题中。

我基于使用KStream的Spring cloud streams示例创建了这个代码示例。它不起作用是因为这个想法起作用了。我有一个模拟示例,它使用不同的order对象,并向错误主题发送错误的order

return input -> {
            KStream<?, WordCount> intermediateStream = input
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                    .groupBy((key, value) -> value)
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(6)))
                    .count(Materialized.as("WordCounts-1"))
                    .toStream()
                    .map((key, value) -> new KeyValue<>(null,
                            new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));

            // here we return to the SIDE_TOPIC records with JsonSerde
            intermediateStream
                    .filter((k, v) -> `create another filter`)
                    .map((k, v) -> `transform only this stream`)
                    .to(SIDE_TOPIC, Produced.with(CustomSerdes.String(), new JsonSerde(....)));

            // here we keep using the branch serializer.
            intermediateStream.branch(isEnglish, isFrench, isSpanish);
        }

当您使用带有错误或空值的数据时,此用例是一种常见的方法,您希望将这些数据发送到一个附带主题,即:错误主题。然后您仍然可以保存那些事件以便将来分析它们。

 类似资料:
  • 我今天面试了,我的面试官问我如何在HashMap中存储具有相同键的多个值?她给了我这个例子—— 我在如何使用HashMap时给了她以下解决方案: 整数是字符串的长度,ArrayList将存储该特定长度的字符串。 面试官说这是使用HashMap的一种方式,但还有另一种方式我不需要ArrayList或任何其他数据结构。在面试期间,我无法想出任何解决方案,现在在谷歌搜索了足够多之后,我仍然一无所获。有人

  • 嗨,我写了一个mapreduce作业,它一般解析XML文件。我能够解析XML文件并正确生成所有键值对。我有6个不同的键和相应的值。所以我并行运行6个不同的减速器。 现在我面临的问题是,reducer将两个不同的键 - 值对放在同一个文件中,其余4个键值放在单个文件中。因此,简而言之,在化简器输出的6个文件中,我得到了4个具有单键值对的文件和1个具有两个键值对的文件以及1个没有任何内容的文件。 我尝

  • 问题内容: 我们是否可以用一个键和两个值实现HashMap。就像HashMap一样? 还请告诉我(如果没有办法)通过其他任何方法来实现三个值的存储(以一个为键)的方法,对我有帮助吗? 问题答案: 你可以: 使用具有列表作为值的地图。 创建一个新的包装器类,并将该包装器的实例放置在地图中。。 使用类似类的元组(节省创建许多包装器)。。 并排使用多个地图。 例子 1.使用列表作为值进行映射 这种方法的

  • 问题内容: 是否可以获取多个具有相同名称的输入,然后从PHP访问它们?想法是这样的:我有一个表格,可以输入不确定数量的物理地址以及其他信息。如果我只是在几个条目中为每个字段赋予相同的名称,然后通过post提交该数据,PHP是否可以访问它? 举例来说,我在一个名为“ xyz”的页面上有五个输入,我想使用PHP访问它们。我可以做些什么: 如果是这样,那将使我的生活变得轻松十倍,因为我可以通过一个表单发

  • 问题内容: 我有一个这样的架构层次结构: 一个文件夹中的所有文件都具有相同的命名空间。 现在,我想将名称空间映射到特定的Java包(我不能更改名称空间)。 我找到了将架构绑定到程序包的解决方案。但是然后我必须为每个xsd文件创建一个条目: 有没有一种方法可以直接定义名称空间和程序包名称之间的绑定? 另一种方法是在maven中定义包: 但是然后我必须为每个文件夹创建一个执行,这并不是我真正想要的。

  • 问题内容: 我是Java图形和线程的新手,我正在尝试制作一个游戏(特别是Pong)。这个想法是两个人可以在同一个键盘上玩(即,有两个通过不同键控制的拨盘)。目前,两个玩家都无法同时移动其球拍。 有针对这个的解决方法吗?单独的线程是答案吗? 如果可能的话,我希望这些拨片能够同时(至少在表面上)移动。 更新:似乎使用存储按下的键是最好的选择。我已经做到了(并且可以正常工作),但是我想知道是否其中的任何