我目前正在开发Kafka模块,我正在使用Kafka通信的spring-kafka
抽象。我能够集成生产者
Spring Boot测试类
//imports not mentioned due to brevity
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PaymentAccountUpdaterApplication.class,
webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class CardUpdaterMessagingIntegrationTest {
private final static String cardUpdateTopic = "TP.PRF.CARDEVENTS";
@Autowired
private ObjectMapper objectMapper;
@ClassRule
public static KafkaEmbedded kafkaEmbedded =
new KafkaEmbedded(1, false, cardUpdateTopic);
@Test
public void sampleTest() throws Exception {
Map<String, Object> consumerConfig =
KafkaTestUtils.consumerProps("test", "false", kafkaEmbedded);
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerConfig);
ContainerProperties containerProperties = new ContainerProperties(cardUpdateTopic);
containerProperties.setMessageListener(new SafeStringJsonMessageConverter());
KafkaMessageListenerContainer<String, String>
container = new KafkaMessageListenerContainer<>(cf, containerProperties);
BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) data -> {
System.out.println("Added to Queue: "+ data);
records.add(data);
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container, kafkaEmbedded.getPartitionsPerTopic());
Map<String, Object> producerConfig = KafkaTestUtils.senderProps(kafkaEmbedded.getBrokersAsString());
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> pf =
new DefaultKafkaProducerFactory<>(producerConfig);
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(pf);
String payload = objectMapper.writeValueAsString(accountWrapper());
kafkaTemplate.send(cardUpdateTopic, 0, payload);
ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received).has(partition(0));
}
@After
public void after() {
kafkaEmbedded.after();
}
private AccountWrapper accountWrapper() {
return AccountWrapper.builder()
.eventSource("PROFILE")
.eventName("INITIAL_LOAD_CARD")
.eventTime(LocalDateTime.now().toString())
.eventID("8730c547-02bd-45c0-857b-d90f859e886c")
.details(AccountDetail.builder()
.customerId("idArZ_K2IgE86DcPhv-uZw")
.vaultId("912A60928AD04F69F3877D5B422327EE")
.expiryDate("122019")
.build())
.build();
}
}
监听器类
@Service
public class ConsumerMessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMessageListener.class);
private ConsumerMessageProcessorService consumerMessageProcessorService;
public ConsumerMessageListener(ConsumerMessageProcessorService consumerMessageProcessorService) {
this.consumerMessageProcessorService = consumerMessageProcessorService;
}
@KafkaListener(id = "cardUpdateEventListener",
topics = "${kafka.consumer.cardupdates.topic}",
containerFactory = "kafkaJsonListenerContainerFactory")
public void processIncomingMessage(Payload<AccountWrapper,Object> payloadContainer,
Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
@Header(KafkaHeaders.OFFSET) String offset) {
try {
// business logic to process the message
consumerMessageProcessorService.processIncomingMessage(payloadContainer);
} catch (Exception e) {
LOGGER.error("Unhandled exception in card event message consumer. Discarding offset commit." +
"message:: {}, details:: {}", e.getMessage(), messageMetadataInfo);
throw e;
}
acknowledgment.acknowledge();
}
}
我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断言,我不想将其放在业务逻辑中以在生产级代码中进行断言。此外,消息处理器是异步的,因此,如何断言执行,还不确定。
任何帮助,谢谢。
正在正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。
联调可以使用该“不同”主题来断言侦听器按预期处理了它。
您还可以向测试用例中添加BeanPostProcessor,并将ConsumerMessageListener封装在代理中,以验证输入参数是否符合预期。
编辑
下面是一个在代理中包装侦听器的示例。。。
@SpringBootApplication
public class So53678801Application {
public static void main(String[] args) {
SpringApplication.run(So53678801Application.class, args);
}
@Bean
public MessageConverter converter() {
return new StringJsonMessageConverter();
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
@Component
class Listener {
@KafkaListener(id = "so53678801", topics = "so53678801")
public void processIncomingMessage(Foo payload,
Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
@Header(KafkaHeaders.OFFSET) String offset) {
System.out.println(payload);
// ...
acknowledgment.acknowledge();
}
}
和
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual
和
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { So53678801Application.class,
So53678801ApplicationTests.TestConfig.class})
public class So53678801ApplicationTests {
@ClassRule
public static EmbeddedKafkaRule embededKafka = new EmbeddedKafkaRule(1, false, "so53678801");
@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers",
embededKafka.getEmbeddedKafka().getBrokersAsString());
}
@Autowired
private KafkaTemplate<String, String> template;
@Autowired
private ListenerWrapper wrapper;
@Test
public void test() throws Exception {
this.template.send("so53678801", "{\"bar\":\"baz\"}");
assertThat(this.wrapper.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.wrapper.argsReceived[0]).isInstanceOf(Foo.class);
assertThat(((Foo) this.wrapper.argsReceived[0]).getBar()).isEqualTo("baz");
assertThat(this.wrapper.ackCalled).isTrue();
}
@Configuration
public static class TestConfig {
@Bean
public static ListenerWrapper bpp() { // BPPs have to be static
return new ListenerWrapper();
}
}
public static class ListenerWrapper implements BeanPostProcessor, Ordered {
private final CountDownLatch latch = new CountDownLatch(1);
private Object[] argsReceived;
private boolean ackCalled;
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof Listener) {
ProxyFactory pf = new ProxyFactory(bean);
pf.setProxyTargetClass(true); // unless the listener is on an interface
pf.addAdvice(interceptor());
return pf.getProxy();
}
return bean;
}
private MethodInterceptor interceptor() {
return invocation -> {
if (invocation.getMethod().getName().equals("processIncomingMessage")) {
Object[] args = invocation.getArguments();
this.argsReceived = Arrays.copyOf(args, args.length);
Acknowledgment ack = (Acknowledgment) args[1];
args[1] = (Acknowledgment) () -> {
this.ackCalled = true;
ack.acknowledge();
};
try {
return invocation.proceed();
}
finally {
this.latch.countDown();
}
}
else {
return invocation.proceed();
}
};
}
}
}
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统
一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,
我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github