我无法将来自topic3的响应与topic1上的请求关联起来,因为correlationid在中间主题中丢失了。如果我不使用中间主题(比如topic2),那么topic1将发送一个带有相关ID的消息,相应的响应将从Topic3接收。
任何意见/建议都大有帮助。
下面是示例代码:从我的API中,我发布了一个事务
public String postTransaction(String request,Map<String, String> headers) throws InterruptedException, ExecutionException {
ProducerRecord<String,String> record=new ProducerRecord<String,String>(topic1,"300",request);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,topic3.getBytes()));
RequestReplyFuture<String,String,String> sendAndReceive=kafkaTemplate.sendAndReceive(record);
SendResult<String,String> requestMessage=sendAndReceive.getSendFuture().get();
return sendAndReceive.get().value();
}
public void listen(Object request, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.CORRELATION_ID) byte[] coRlId) throws InterruptedException {
ProducerRecord<String,String> record=new ProducerRecord<String,String>("topic2","300",k.value());
record.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID,coRlId.get(0).getBytes()));
kafkaTemplate.send(record);
}
我刚测试了一下,对我来说很好...
@SpringBootApplication
public class So61152047Application {
public static void main(String[] args) {
SpringApplication.run(So61152047Application.class, args);
}
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Bean
public ReplyingKafkaTemplate<String, String, String> replyer(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
containerFactory.setReplyTemplate(kafkaTemplate(pf));
ConcurrentMessageListenerContainer<String, String> container = replyContainer(containerFactory);
ReplyingKafkaTemplate<String, String, String> replyer = new ReplyingKafkaTemplate<>(pf, container);
return replyer;
}
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic3");
container.getContainerProperties().setGroupId("three");
return container;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> rkt) {
return args -> {
ProducerRecord<String, String> pr = new ProducerRecord<>("topic1", "foo", "bar");
RequestReplyFuture<String, String, String> future = rkt.sendAndReceive(pr);
System.out.println(future.get(10, TimeUnit.SECONDS).value());
};
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("topic1").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic2").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("topic3").partitions(1).replicas(1).build();
}
@KafkaListener(id = "one", topics = "topic1")
public void listen1(String in,
@Header(KafkaHeaders.CORRELATION_ID) byte[] corrId) {
System.out.println(in);
ProducerRecord<String, String> pr = new ProducerRecord<>("topic2", in.toUpperCase());
pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, corrId));
this.kafkaTemplate.send(pr);
}
@KafkaListener(id = "two", topics = "topic2")
@SendTo("topic3")
public String listen2(String in) {
return in + in;
}
}
和
bar
BARBAR
您还可以传播reply-to标头...
@KafkaListener(id = "one", topics = "topic1")
public void listen1(String in,
@Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@Header(KafkaHeaders.CORRELATION_ID) byte[] corrId) {
System.out.println(in);
ProducerRecord<String, String> pr = new ProducerRecord<>("topic2", in.toUpperCase());
pr.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTo));
pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, corrId));
this.kafkaTemplate.send(pr);
}
@KafkaListener(id = "two", topics = "topic2")
@SendTo // ("topic3")
public String listen2(String in) {
return in + in;
}
public class CorrelatingProducerInterceptor implements ProducerInterceptor<String, Foo> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, Foo> onSend(ProducerRecord<String, Foo> record) {
Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
if (correlation != null) {
record.value().setCorrelation(correlation.value());
}
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
@SpringBootApplication
public class So61152047Application {
public static void main(String[] args) {
SpringApplication.run(So61152047Application.class, args);
}
@Autowired
private KafkaTemplate<String, Foo> kafkaTemplate;
@Bean
public ReplyingKafkaTemplate<String, Foo, Foo> replyer(ProducerFactory<String, Foo> pf,
ConcurrentKafkaListenerContainerFactory<String, Foo> containerFactory) {
containerFactory.setReplyTemplate(kafkaTemplate(pf));
ConcurrentMessageListenerContainer<String, Foo> container = replyContainer(containerFactory);
ReplyingKafkaTemplate<String, Foo, Foo> replyer = new ReplyingKafkaTemplate<>(pf, container);
return replyer;
}
@Bean
public ConcurrentMessageListenerContainer<String, Foo> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, Foo> containerFactory) {
ConcurrentMessageListenerContainer<String, Foo> container = containerFactory.createContainer("topic3");
container.getContainerProperties().setGroupId("three");
return container;
}
@Bean
public KafkaTemplate<String, Foo> kafkaTemplate(ProducerFactory<String, Foo> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Foo> rkt) {
return args -> {
ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic1", "foo", new Foo("bar"));
RequestReplyFuture<String, Foo, Foo> future = rkt.sendAndReceive(pr);
System.out.println(future.get(10, TimeUnit.SECONDS).value());
};
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("topic1").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic2").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("topic3").partitions(1).replicas(1).build();
}
@KafkaListener(id = "one", topics = "topic1")
public void listen1(Foo in) {
System.out.println(in);
in.setContent(in.getContent().toUpperCase());
ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic2", in);
this.kafkaTemplate.send(pr);
}
@KafkaListener(id = "two", topics = "topic2")
public void listen2(Foo in) {
ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic3", new Foo(in.getContent() + in.getContent()));
pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, in.getCorrelation()));
this.kafkaTemplate.send(pr);
}
}
class Foo {
String content;
byte[] correlation;
public Foo() {
}
public Foo(String content) {
this.content = content;
}
public String getContent() {
return this.content;
}
public void setContent(String content) {
this.content = content;
}
public byte[] getCorrelation() {
return this.correlation;
}
public void setCorrelation(byte[] correlation) {
this.correlation = correlation;
}
@Override
public String toString() {
return "Foo [content=" + this.content + "]";
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.interceptor.classes=com.example.demo.CorrelatingProducerInterceptor
Foo [content=bar]
Foo [content=BARBAR]
问题内容: 如果我需要按顺序调用3 http API,那么以下代码将是更好的选择: 问题答案: 使用像这样的延期。 如果您需要传递范围,则只需执行以下操作
我想要将kafka事务与存储库事务同步: 如果我能获得一个简单的kafka事务与存储库事务同步的示例和一个解释,我会真正帮助我。
本文向大家介绍全面解析iOS中同步请求、异步请求、GET请求、POST请求,包括了全面解析iOS中同步请求、异步请求、GET请求、POST请求的使用技巧和注意事项,需要的朋友参考一下 先给大家分别介绍下iOS中同步请求、异步请求、GET请求、POST所代表的意思,然后在逐一通过实例给大家介绍。 1、同步请求可以从因特网请求数据,一旦发送同步请求,程序将停止用户交互,直至服务器返回数据完成,才可以进
问题内容: 最新版本的Chrome是否已停止同步Ajax调用?进行同步Ajax调用时出现错误。我们更新到最新的Chrome版本-73.0.3683.103后,该问题突然出现。目前,同步调用在Firefox和IE上可以正常工作。我们正在错误以下 消息:“无法在’XMLHttpRequest’上执行’发送’:无法加载’Path …’:页面关闭时的同步XHR”。名称:“ NetworkError” 有人
我有一个网络应用程序,当用户提交请求时,我们向远程服务发送JMS消息,然后等待回复。(也有异步请求,我们为消息重放等设置了各种细节,所以我们更愿意坚持使用JMS而不是HTTP) 在如何使用JMS实现请求-响应?,ActiveMQ似乎不鼓励每个请求使用临时队列,也不鼓励在JMSCorrelationID上使用选择器的临时消费者,因为这会增加开销。 但是,如果我使用池消费者进行回复,我如何从回复消费者