我们正在使用sping-kafka-test-2.2.8-RELEASE。当我使用模板发送消息时,它会正确触发侦听器,但我无法在consumer.poll.中获取消息内容。如果我实例化KafkaTem板而不在类属性中“连接”它,并基于生产者工厂实例化它,它发送消息,但不触发@KafkaListener,只有在我在@Test method中设置消息监听器时才能工作。我需要触发kafka监听器,并意识到下一个主题将被调用(成功主题时,执行没有错误,和错误主题监听器抛出一个异常)和消息内容。
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "tp-in-gco-mao-notasfiscais" })
public class InvoicingServiceTest {
@Autowired
private NFKafkaListener nfKafkaListener;
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "tp-in-gco-mao-
notasfiscais");
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Autowired
private KafkaTemplate<Object, Object> template;
@BeforeClass
public static void setup() {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
"spring.kafka.bootstrap-servers");
}
@Test
public void testTemplate() throws Exception {
NFServiceTest nfServiceTest = spy(new NFServiceTest());
nfKafkaListener.setNfServiceClient(nfServiceTest);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("teste9", "false", broker.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, InvoiceDeserializer.class);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
DefaultKafkaConsumerFactory<Integer, Object> cf = new DefaultKafkaConsumerFactory<Integer, Object>(
consumerProps);
Consumer<Integer, Object> consumer = cf.createConsumer();
broker.getEmbeddedKafka().consumeFromAnEmbeddedTopic(consumer, "tp-in-gco-mao-notasfiscais");
ZfifNfMao zf = new ZfifNfMao();
zf.setItItensnf(new Zfietb011());
Zfietb011 zfietb011 = new Zfietb011();
Zfie011 zfie011 = new Zfie011();
zfie011.setMatkl("TESTE");
zfietb011.getItem().add(zfie011);
zf.setItItensnf(zfietb011);
template.send("tp-in-gco-mao-notasfiscais", zf);
List<ConsumerRecord<Integer, Object>> received = new ArrayList<>();
int n = 0;
while (received.size() < 1 && n++ < 10) {
ConsumerRecords<Integer, Object> records1 = consumer.poll(Duration.ofSeconds(10));
//records1 is always empty
if (!records1.isEmpty()) {
records1.forEach(rec -> received.add(rec));
}
}
assertThat(received).extracting(rec -> {
ZfifNfMao zfifNfMaoRdesponse = (ZfifNfMao) rec.value();
return zfifNfMaoRdesponse.getItItensnf().getItem().get(0).getMatkl();
}).contains("TESTE");
broker.getEmbeddedKafka().getKafkaServers().forEach(b -> b.shutdown());
broker.getEmbeddedKafka().getKafkaServers().forEach(b -> b.awaitShutdown());
consumer.close();
}
public static class NFServiceTest implements INFServiceClient {
CountDownLatch latch = new CountDownLatch(1);
@Override
public ZfifNfMaoResponse enviarSap(ZfifNfMao zfifNfMao) {
ZfifNfMaoResponse zfifNfMaoResponse = new ZfifNfMaoResponse();
zfifNfMaoResponse.setItItensnf(new Zfietb011());
Zfietb011 zfietb011 = new Zfietb011();
Zfie011 zfie011 = new Zfie011();
zfie011.setMatkl("TESTE");
zfietb011.getItem().add(zfie011);
zfifNfMaoResponse.setItItensnf(zfietb011);
return zfifNfMaoResponse;
}
}
}
我遵循了你的建议,但它一直在触发听众,但consumer.poll没有捕捉到主题内容。
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "tp-in-gco-mao-notasfiscais" })
public class InvoicingServiceTest {
@Autowired
private NFKafkaListener nfKafkaListener;
@Autowired
public EmbeddedKafkaBroker broker;
@Autowired
private KafkaTemplate<Object, Object> template;
@BeforeClass
public static void setup() {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
"spring.kafka.bootstrap-servers");
}
@Test
public void testTemplate() throws Exception {
NFServiceTest nfServiceTest = spy(new NFServiceTest());
nfKafkaListener.setNfServiceClient(nfServiceTest);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("teste9", "false", broker);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, InvoiceDeserializer.class);
DefaultKafkaConsumerFactory<Integer, Object> cf = new DefaultKafkaConsumerFactory<Integer, Object>(
consumerProps);
Consumer<Integer, Object> consumer = cf.createConsumer();
broker.consumeFromAnEmbeddedTopic(consumer, "tp-in-gco-mao-notasfiscais");
ZfifNfMao zf = new ZfifNfMao();
zf.setItItensnf(new Zfietb011());
Zfietb011 zfietb011 = new Zfietb011();
Zfie011 zfie011 = new Zfie011();
zfie011.setMatkl("TESTE");
zfietb011.getItem().add(zfie011);
zf.setItItensnf(zfietb011);
template.send("tp-in-gco-mao-notasfiscais", zf);
List<ConsumerRecord<Integer, Object>> received = new ArrayList<>();
int n = 0;
while (received.size() < 1 && n++ < 10) {
ConsumerRecords<Integer, Object> records1 = consumer.poll(Duration.ofSeconds(10));
//records1 is always empty
if (!records1.isEmpty()) {
records1.forEach(rec -> received.add(rec));
}
}
assertThat(received).extracting(rec -> {
ZfifNfMao zfifNfMaoRdesponse = (ZfifNfMao) rec.value();
return zfifNfMaoRdesponse.getItItensnf().getItem().get(0).getMatkl();
}).contains("TESTE");
broker.getKafkaServers().forEach(b -> b.shutdown());
broker.getKafkaServers().forEach(b -> b.awaitShutdown());
consumer.close();
}
public static class NFServiceTest implements INFServiceClient {
CountDownLatch latch = new CountDownLatch(1);
@Override
public ZfifNfMaoResponse enviarSap(ZfifNfMao zfifNfMao) {
ZfifNfMaoResponse zfifNfMaoResponse = new ZfifNfMaoResponse();
zfifNfMaoResponse.setItItensnf(new Zfietb011());
Zfietb011 zfietb011 = new Zfietb011();
Zfie011 zfie011 = new Zfie011();
zfie011.setMatkl("TESTE");
zfietb011.getItem().add(zfie011);
zfifNfMaoResponse.setItItensnf(zfietb011);
return zfifNfMaoResponse;
}
}
}
你有两个经纪人;一个由@EmbeddedKafka
创建,另一个由@ClassRule
创建。
使用一个或另一个;最好是@EmbeddedKafka
和简单的@Autowired
代理实例。
我猜消费者正在听不同的经纪人;您可以通过查看消费者配置发布的INFO日志来确认这一点。
我是Apache Kafka的新手,能够从发送方发送消息(以JSON格式),但不能在消费者服务中消费。 有人能帮我吗?
我有课: 配置类:公共类RabbitConfiguration{ 听众: a仅启动应用程序有错误 2017-08-08 12:58:02.128警告5024---[cTaskExecutor-1]S.A.R.L.ConditionalRejectingErrorHandler:Rabbit消息侦听器执行失败。 原因:org.SpringFramework.Messaging.Handler.Ann
我一直在Kafka消费者方面面临下面的异常。令人惊讶的是,这个问题不一致,旧版本的代码(具有完全相同的配置,但有一些新的不相关功能)按预期工作。有人能帮助确定是什么导致了这种情况吗? 我的应用程序使用以下内容: 自定义侦听器类com。我的公司。听众。Kafka巴奇列斯特纳 附加查询:即使设置了,异常堆栈跟踪仍然包含我省略的完整有效负载。知道为什么吗? 提前感谢! 更新: KafkaBatchLis
这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。
我需要在一定的持续时间后将消息发送给MessageListener,所以有没有任何方法可以使用SpringAMQP实现。 如。Producer生成消息并将消息发送到RabbitMQ Q,该消息立即被侦听器接收到,我想延迟消费者端接收到的消息,比如说在一些配置参数(比如1000ms)之后