当前位置: 首页 > 知识库问答 >
问题:

Spring云流Kafka Binder的一个输入主题多个输出主题

唐昊焜
2023-03-14

我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。

public Function<T1,T2> f() {
  return d ->{}
}

在yml中,我有:

spring:
  cloud:
   steam:
    function:
      definition:f;
    bindings:
      f-in-0:
       destination: input-topic
      f-out-0:
       destination: output-topic

如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?

共有2个答案

韩喜
2023-03-14

不要使用函数,使用消费者

https://docs.spring.io/spring-cloud-stream/docs/3.2.2/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

窦弘义
2023-03-14

您可以通过命令分隔目标值

destination: output-topic,blah,foo
 类似资料:
  • 我有以下处理器bean方法签名: 相关物业: Spring Cloud Kafka Stream版本Hoxton. SR4(Spring-Cloud-stream-binder-kafka-stream: jar: 3.0.4. RELEASE),嵌入式Kafka版本2.5.0。 我正在使用嵌入式Kafka测试我的拓扑: 我的测试显示,来自的消息按预期到达并到达主题“out0”中,但“out1”主

  • 问题内容: 我有一个logstash输入设置为 我需要将主题提供给Elasticsearch中的两个不同的索引。任何人都可以帮助我如何为此类任务设置输出。目前,我只能设置 我需要在同一elasticsearch例如两个指标说和,这将在未来对信息供给和 问题答案: 首先,您需要添加到输入中才能知道消息来自哪个主题 然后,您有两个选择,都涉及条件逻辑。首先是通过引入一个过滤器来根据主题名称添加正确的索

  • 我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它

  • 给定一个将消息发布到两个不同主题的Kafka流拓扑,是否可以保证在这两个分支中执行各个步骤的顺序,或者这些分支是完全分开并并行执行的? 在本例中,是否会在调用< code>mapTwo或向output-topic-two发布消息之前执行< code>mapOne并发布到output-topic-one?换句话说,能否保证在消息发布到output-topic-two之前完成< code>mapOne

  • Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志将流到主题和日志将流到主题。 我可以在流中调用,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?

  • 我从教程中创建了示例Kafka Streams应用程序: 不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。 我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不