当前位置: 首页 > 知识库问答 >
问题:

Spring的云流Kafka流活页夹3。x:在多个输出绑定的情况下,没有输出到第二个输出主题

杜良骏
2023-03-14

我有以下处理器bean方法签名:

@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
    return (inputStream1, intputStream2) -> {

        intputStream2
            .peek((k, v) -> {
                log.debug(...);
            });

        return inputStream1
            .mapValues(...)
            .branch((k,v) -> true, (k,v) -> true);

    };
}

相关物业:

spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
  myStream-in-0:
    destination: inp0
  myStream-in-1:
    destination: inp1
  myStream-out-0:
    destination: out0
  myStream-out-1:
    destination: out1

Spring Cloud Kafka Stream版本Hoxton. SR4(Spring-Cloud-stream-binder-kafka-stream: jar: 3.0.4. RELEASE),嵌入式Kafka版本2.5.0。

我正在使用嵌入式Kafka测试我的拓扑:

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
            "inp0", "inp1", "out0", "out1"
        },
        brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {

    @Test
    public void embeddedKafkaTest() throws IOException, InterruptedException {
        Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
        Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");

        this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
        this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");

        latch = new CountDownLatch(1);
        // ... publish ...
        latch.await(15, TimeUnit.SECONDS);

        ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
        assertThat(out0.count(), is(greaterThanOrEqualTo(1)));

        ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
        assertThat(out1.count(), is(greaterThanOrEqualTo(1)));

    }

private <K,V> Consumer<K, V> createConsumer(String groupName) {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
}

我的测试显示,来自myStream的消息按预期到达并到达主题“out0”中,但“out1”主题仍然为空,单元测试在第二次断言中失败。

我已经尝试了一些东西,但是看起来第二个输出主题的输出根本没有产生(第一个输出主题的输出产生得很好)。

你能看到我的设置有什么错误吗?

还有一件事:myStream bean方法定义中的return语句显示了一个编译器警告:

为varargs参数创建未检查的泛型数组

但是看起来这就是Spring Cloud Kafka Stream 3. x API要求定义返回类型的方式?

共有1个答案

陶琦
2023-03-14

您将两个谓词传递给分支方法,它们的计算结果总是true。第一个谓词始终获胜,并向第一个输出绑定生成数据。分支方法调用在第一个谓词求值为true后退出。有关更多详细信息,请参阅javadoc。您应该使用不同的谓词(可能会检查键/值的某些条件)。如果第一个谓词失败,第二个谓词成功,那么您将看到生成到第二个输出主题的数据。

关于编译器警告,我认为您可以安全地忽略它,因为API将确保传递到分支调用的谓词对象具有正确的类型。由于该方法的实现使用泛型varargs,因此会出现该异常。有关编译器警告的详细信息,请参阅此线程

 类似资料:
  • 我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?

  • 在本页中,您不能只进行一次输出 但我只需要使用Spring Cloud Stream Kafka活页夹进行一次输出 那我该怎么办? 一些文章说使用org.springframework.cloud.stream.function.StreamBridge但它不适合我 我让StreamBridge向Kafka发送主题,但Kafka不会为我的Spring Boot应用程序生成主题 这是我的applic

  • 我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它

  • Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志将流到主题和日志将流到主题。 我可以在流中调用,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?

  • 给定一个将消息发布到两个不同主题的Kafka流拓扑,是否可以保证在这两个分支中执行各个步骤的顺序,或者这些分支是完全分开并并行执行的? 在本例中,是否会在调用< code>mapTwo或向output-topic-two发布消息之前执行< code>mapOne并发布到output-topic-one?换句话说,能否保证在消息发布到output-topic-two之前完成< code>mapOne

  • 问题内容: 我下面有以下代码示例。你可以在其中输入的命令,即回显结果。但是,先读后。其他输出流不起作用? 为什么会这样或我做错了什么?我的最终目标是创建一个线程计划任务,该任务定期执行对/ bash的命令,因此必须一前一后工作,而不能停止工作。我也一直在经历错误的任何想法? 谢谢。 问题答案: 首先,我建议更换生产线 与线 ProcessBuilder是Java 5中的新增功能,它使运行外部进程更