当前位置: 首页 > 知识库问答 >
问题:

在带有自定义通道绑定的Spring-Cloud-Stream测试中使用嵌入式Kafka

范哲
2023-03-14

我有一个Spring boot应用程序,我使用spring-Cloud-stream从一个kafka主题中消费,进行一些处理并发布到另一个kafka主题。该应用程序运行良好,我已经编写了运行良好的单元测试(使用TestBinder)。

我现在正试图用嵌入式Kafka编写一个集成测试,并测试端到端的功能。我在这里跟踪了样本https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests.java然而,为了编写测试,这不起作用-我无法收到关于输出主题的任何消息。

应用yml公司

spring:
  cloud:
    stream:
      bindings:
        incoming-message:
          destination: ReadyForProcessing
          content-type: application/json
          group: ReadyForProcessingGroup
        outgoing-message:
          destination: TransactionSettled
          content-type: application/json

变压器Binding.java

public interface TransformerBinding {

    String INCOMING_MESSAGE = "incoming-message";

    String OUTGOING_MESSAGE = "outgoing-message";

    @Input(INCOMING_MESSAGE)
    SubscribableChannel incomingMessage();

    @Output(OUTGOING_MESSAGE)
    MessageChannel outgoingMessage();

}

事件处理器。Java语言

@Service
@EnableBinding(TransformerBinding.class)
@Slf4j
@AllArgsConstructor
public class EventProcessor {

    @Transformer(inputChannel = TransformerBinding.INCOMING_MESSAGE, outputChannel = TransformerBinding.OUTGOING_MESSAGE)
    public TransactionSettledEvent transform(@Payload final ReadyForProcessingEvent readyForProcessingEvent) {
        log.info("Event received in processor: {}", readyForProcessingEvent);


        return TransactionSettledEvent.builder().transactionRef(readyForProcessingEvent.getTransactionRef()).status("Settled").build();
    }

}

事件处理器测试。Java语言

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
        + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class EventProcessorIT {

    private static final String INPUT_TOPIC = "ReadyForProcessing";
    private static final String OUTPUT_TOPIC = "TransactionSettled";
    private static final String CONSUMER_GROUP = "TestConsumerGroup";

    @Autowired
    private ObjectMapper mapper;

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, INPUT_TOPIC, OUTPUT_TOPIC);

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @Test
    public void testSendReceive() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        senderProps.put("key.serializer", StringSerializer.class);
        senderProps.put("value.serializer", JsonSerializer.class);
        DefaultKafkaProducerFactory<String, ReadyForProcessingEvent> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, ReadyForProcessingEvent> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic(INPUT_TOPIC);
        template.sendDefault(ReadyForProcessingEvent.builder().transactionRef("123456").build());

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(CONSUMER_GROUP, "false", embeddedKafka.getEmbeddedKafka());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", JsonDeserializer.class);
        DefaultKafkaConsumerFactory<String, TransactionSettledEvent> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

        Consumer<String, TransactionSettledEvent> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton(OUTPUT_TOPIC));
        ConsumerRecords<String, TransactionSettledEvent> records = consumer.poll(0);
        consumer.commitSync();

        assertEquals("Only 1 record should be received as response", 1, records.count());
        final TransactionSettledEvent transactionSettledEvent = this.mapper.convertValue(records.iterator().next().value(), TransactionSettledEvent.class);
        assertEquals("Output event not as expected", "Settled", transactionSettledEvent.getStatus());
    }

}

上面的测试失败了,因为我希望出现1条记录,但在输出主题中得到0条记录。

共有1个答案

逑何平
2023-03-14

<代码>消费者记录

您需要等待订阅发生;0不会这样做;示例最多等待10秒。

然而,使用它更安全

embeddedKafkaRule()。getEmbeddedKafka()。ConsumerFromanEmbeddedTopic(…)

因为它使用消费者平衡监听器可靠地等待分配。

订阅后,您还可以使用

KafkaTestUtils.getSingleRecord(Consumer<K, V> consumer, String topic);

获取记录(如果您只需要一条记录,或获取记录(…) 否则)。

 类似资料:
  • 我正在尝试为SpringCloudAzure服务总线队列流绑定器配置错误通道,但未成功。我已通过启用错误通道 并尝试定义一个: 我还尝试了和。显然,我在这里遗漏了一些东西,但我找不到一个有效的例子。 编辑:我正在使用以下供应商bean: 根据绑定命名约定,绑定的名称将为。我可以看到消息确实发送到了(另一侧有一个消费者)。 编辑和解决方案: 看来我的环境出了问题,重建后一切都如期进行,正如Garry

  • 试图开发一个Spring云应用程序,使用kafka Kafka使用的配置是: 运行应用程序,我可以看到这些配置被选中 问题是以下错误消息: 如何配置这个“AdminClient”并将正确的主机/ip信息传递给它?查看了Spring Cloud Stream Kafka活页夹参考指南,但找不到答案。

  • 我有一个需要建模为流应用程序的服务(该服务目前为相同的用例提供同步和异步api)。我正在探索是否可以将实际处理建模为流应用程序。由于处理阶段保持不变,真正的区别在于服务接口(http与消息传递协议)<在这种背景下,我正在探索spring cloud stream,因为它似乎在抽象绑定器和处理器。但要支持同步api,我需要以下内容:- 同步Api==Http Binder 将有2个可部署的很好,处理

  • 我正在使用Edgware版本中的Spring Cloud Stream binder发送Kafka消息。我也在使用Spring Sleuth和Zipkin。 Spring使用自定义类将标头嵌入到Kafka消息中。这会给一些必须处理此自定义解码的消息的非Spring消费者带来问题。 我的问题是:有没有办法为Spring配置消息头的自定义编码器/解码器(例如普通JSON)?或者可能使用Kafka标题?

  • 我试图弄清楚如何在Spring

  • 我用以下组件构建了一个spring boot kinesis消费者: Spring boot(版本-2.1.2.Release) Spring cloud(version-greenwich.release) Spring cloud stream kinesis绑定器(版本-1.1.0.发行版) 假设我有3个消费者实例部署到PCF(通过在MANIFEST.YML文件中将instances属性设置