我有以下处理器bean方法签名:
@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
return (inputStream1, intputStream2) -> {
intputStream2
.peek((k, v) -> {
log.debug(...);
});
return inputStream1
.mapValues(...)
.branch((k,v) -> true, (k,v) -> true);
};
}
相关物业:
spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
myStream-in-0:
destination: inp0
myStream-in-1:
destination: inp1
myStream-out-0:
destination: out0
myStream-out-1:
destination: out1
Spring Cloud Kafka Stream版本Hoxton. SR4(Spring-Cloud-stream-binder-kafka-stream: jar: 3.0.4. RELEASE),嵌入式Kafka版本2.5.0。
我正在使用嵌入式Kafka测试我的拓扑:
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
topics = {
"inp0", "inp1", "out0", "out1"
},
brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {
@Test
public void embeddedKafkaTest() throws IOException, InterruptedException {
Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");
this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");
latch = new CountDownLatch(1);
// ... publish ...
latch.await(15, TimeUnit.SECONDS);
ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
assertThat(out0.count(), is(greaterThanOrEqualTo(1)));
ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
assertThat(out1.count(), is(greaterThanOrEqualTo(1)));
}
private <K,V> Consumer<K, V> createConsumer(String groupName) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
}
我的测试显示,来自myStream
的消息按预期到达并到达主题“out0”中,但“out1”主题仍然为空,单元测试在第二次断言中失败。
我已经尝试了一些东西,但是看起来第二个输出主题的输出根本没有产生(第一个输出主题的输出产生得很好)。
你能看到我的设置有什么错误吗?
还有一件事:myStream bean方法定义中的return语句显示了一个编译器警告:
为varargs参数创建未检查的泛型数组
但是看起来这就是Spring Cloud Kafka Stream 3. x API要求定义返回类型的方式?
您将两个谓词传递给分支
方法,它们的计算结果总是true
。第一个谓词始终获胜,并向第一个输出绑定生成数据。分支方法调用在第一个谓词求值为true后退出。有关更多详细信息,请参阅javadoc。您应该使用不同的谓词(可能会检查键/值的某些条件)。如果第一个谓词失败,第二个谓词成功,那么您将看到生成到第二个输出主题的数据。
关于编译器警告,我认为您可以安全地忽略它,因为API将确保传递到分支
调用的谓词对象具有正确的类型。由于该方法的实现使用泛型varargs,因此会出现该异常。有关编译器警告的详细信息,请参阅此线程。
我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?
在本页中,您不能只进行一次输出 但我只需要使用Spring Cloud Stream Kafka活页夹进行一次输出 那我该怎么办? 一些文章说使用org.springframework.cloud.stream.function.StreamBridge但它不适合我 我让StreamBridge向Kafka发送主题,但Kafka不会为我的Spring Boot应用程序生成主题 这是我的applic
我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它
Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志将流到主题和日志将流到主题。 我可以在流中调用,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?
给定一个将消息发布到两个不同主题的Kafka流拓扑,是否可以保证在这两个分支中执行各个步骤的顺序,或者这些分支是完全分开并并行执行的? 在本例中,是否会在调用< code>mapTwo或向output-topic-two发布消息之前执行< code>mapOne并发布到output-topic-one?换句话说,能否保证在消息发布到output-topic-two之前完成< code>mapOne
问题内容: 我下面有以下代码示例。你可以在其中输入的命令,即回显结果。但是,先读后。其他输出流不起作用? 为什么会这样或我做错了什么?我的最终目标是创建一个线程计划任务,该任务定期执行对/ bash的命令,因此必须一前一后工作,而不能停止工作。我也一直在经历错误的任何想法? 谢谢。 问题答案: 首先,我建议更换生产线 与线 ProcessBuilder是Java 5中的新增功能,它使运行外部进程更