我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。
public Function<T1,T2> f() {
return d ->{}
}
在yml中,我有:
spring:
cloud:
steam:
function:
definition:f;
bindings:
f-in-0:
destination: input-topic
f-out-0:
destination: output-topic
如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?
不要使用函数,使用消费者
https://docs.spring.io/spring-cloud-stream/docs/3.2.2/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources
您可以通过命令分隔目标值
destination: output-topic,blah,foo
我有以下处理器bean方法签名: 相关物业: Spring Cloud Kafka Stream版本Hoxton. SR4(Spring-Cloud-stream-binder-kafka-stream: jar: 3.0.4. RELEASE),嵌入式Kafka版本2.5.0。 我正在使用嵌入式Kafka测试我的拓扑: 我的测试显示,来自的消息按预期到达并到达主题“out0”中,但“out1”主
问题内容: 我有一个logstash输入设置为 我需要将主题提供给Elasticsearch中的两个不同的索引。任何人都可以帮助我如何为此类任务设置输出。目前,我只能设置 我需要在同一elasticsearch例如两个指标说和,这将在未来对信息供给和 问题答案: 首先,您需要添加到输入中才能知道消息来自哪个主题 然后,您有两个选择,都涉及条件逻辑。首先是通过引入一个过滤器来根据主题名称添加正确的索
我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它
给定一个将消息发布到两个不同主题的Kafka流拓扑,是否可以保证在这两个分支中执行各个步骤的顺序,或者这些分支是完全分开并并行执行的? 在本例中,是否会在调用< code>mapTwo或向output-topic-two发布消息之前执行< code>mapOne并发布到output-topic-one?换句话说,能否保证在消息发布到output-topic-two之前完成< code>mapOne
Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志将流到主题和日志将流到主题。 我可以在流中调用,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?
我从教程中创建了示例Kafka Streams应用程序: 不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。 我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不