我有一个Spring boot应用程序,我使用spring-Cloud-stream从一个kafka主题中消费,进行一些处理并发布到另一个kafka主题。该应用程序运行良好,我已经编写了运行良好的单元测试(使用TestBinder)。
我现在正试图用嵌入式Kafka编写一个集成测试,并测试端到端的功能。我在这里跟踪了样本https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests.java然而,为了编写测试,这不起作用-我无法收到关于输出主题的任何消息。
应用yml公司
spring:
cloud:
stream:
bindings:
incoming-message:
destination: ReadyForProcessing
content-type: application/json
group: ReadyForProcessingGroup
outgoing-message:
destination: TransactionSettled
content-type: application/json
变压器Binding.java
public interface TransformerBinding {
String INCOMING_MESSAGE = "incoming-message";
String OUTGOING_MESSAGE = "outgoing-message";
@Input(INCOMING_MESSAGE)
SubscribableChannel incomingMessage();
@Output(OUTGOING_MESSAGE)
MessageChannel outgoingMessage();
}
事件处理器。Java语言
@Service
@EnableBinding(TransformerBinding.class)
@Slf4j
@AllArgsConstructor
public class EventProcessor {
@Transformer(inputChannel = TransformerBinding.INCOMING_MESSAGE, outputChannel = TransformerBinding.OUTGOING_MESSAGE)
public TransactionSettledEvent transform(@Payload final ReadyForProcessingEvent readyForProcessingEvent) {
log.info("Event received in processor: {}", readyForProcessingEvent);
return TransactionSettledEvent.builder().transactionRef(readyForProcessingEvent.getTransactionRef()).status("Settled").build();
}
}
事件处理器测试。Java语言
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
+ "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class EventProcessorIT {
private static final String INPUT_TOPIC = "ReadyForProcessing";
private static final String OUTPUT_TOPIC = "TransactionSettled";
private static final String CONSUMER_GROUP = "TestConsumerGroup";
@Autowired
private ObjectMapper mapper;
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, INPUT_TOPIC, OUTPUT_TOPIC);
@BeforeClass
public static void setup() {
System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
}
@Test
public void testSendReceive() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
senderProps.put("key.serializer", StringSerializer.class);
senderProps.put("value.serializer", JsonSerializer.class);
DefaultKafkaProducerFactory<String, ReadyForProcessingEvent> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<String, ReadyForProcessingEvent> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic(INPUT_TOPIC);
template.sendDefault(ReadyForProcessingEvent.builder().transactionRef("123456").build());
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(CONSUMER_GROUP, "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
consumerProps.put("key.deserializer", StringDeserializer.class);
consumerProps.put("value.deserializer", JsonDeserializer.class);
DefaultKafkaConsumerFactory<String, TransactionSettledEvent> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, TransactionSettledEvent> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton(OUTPUT_TOPIC));
ConsumerRecords<String, TransactionSettledEvent> records = consumer.poll(0);
consumer.commitSync();
assertEquals("Only 1 record should be received as response", 1, records.count());
final TransactionSettledEvent transactionSettledEvent = this.mapper.convertValue(records.iterator().next().value(), TransactionSettledEvent.class);
assertEquals("Output event not as expected", "Settled", transactionSettledEvent.getStatus());
}
}
上面的测试失败了,因为我希望出现1条记录,但在输出主题中得到0条记录。
<代码>消费者记录
您需要等待订阅发生;0不会这样做;示例最多等待10秒。
然而,使用它更安全
embeddedKafkaRule()。getEmbeddedKafka()。ConsumerFromanEmbeddedTopic(…)
因为它使用
消费者平衡监听器
可靠地等待分配。
订阅后,您还可以使用
KafkaTestUtils.getSingleRecord(Consumer<K, V> consumer, String topic);
获取记录(如果您只需要一条记录,或获取记录(…)
否则)。
我正在尝试为SpringCloudAzure服务总线队列流绑定器配置错误通道,但未成功。我已通过启用错误通道 并尝试定义一个: 我还尝试了和。显然,我在这里遗漏了一些东西,但我找不到一个有效的例子。 编辑:我正在使用以下供应商bean: 根据绑定命名约定,绑定的名称将为。我可以看到消息确实发送到了(另一侧有一个消费者)。 编辑和解决方案: 看来我的环境出了问题,重建后一切都如期进行,正如Garry
试图开发一个Spring云应用程序,使用kafka Kafka使用的配置是: 运行应用程序,我可以看到这些配置被选中 问题是以下错误消息: 如何配置这个“AdminClient”并将正确的主机/ip信息传递给它?查看了Spring Cloud Stream Kafka活页夹参考指南,但找不到答案。
我有一个需要建模为流应用程序的服务(该服务目前为相同的用例提供同步和异步api)。我正在探索是否可以将实际处理建模为流应用程序。由于处理阶段保持不变,真正的区别在于服务接口(http与消息传递协议)<在这种背景下,我正在探索spring cloud stream,因为它似乎在抽象绑定器和处理器。但要支持同步api,我需要以下内容:- 同步Api==Http Binder 将有2个可部署的很好,处理
我正在使用Edgware版本中的Spring Cloud Stream binder发送Kafka消息。我也在使用Spring Sleuth和Zipkin。 Spring使用自定义类将标头嵌入到Kafka消息中。这会给一些必须处理此自定义解码的消息的非Spring消费者带来问题。 我的问题是:有没有办法为Spring配置消息头的自定义编码器/解码器(例如普通JSON)?或者可能使用Kafka标题?
我试图弄清楚如何在Spring
我用以下组件构建了一个spring boot kinesis消费者: Spring boot(版本-2.1.2.Release) Spring cloud(version-greenwich.release) Spring cloud stream kinesis绑定器(版本-1.1.0.发行版) 假设我有3个消费者实例部署到PCF(通过在MANIFEST.YML文件中将instances属性设置