当前位置: 首页 > 知识库问答 >
问题:

使用Spring Kafka ReplyingKafka模板与Kafka Streams应用程序

陶腾
2023-03-14

我想有一个客户端应用程序与请求/响应语义学调用另一个应用程序,这是一个Kafka流应用程序。

我的客户端应用程序基于此示例(基本上没有变化)。我需要从客户端接收消息的应用程序是Kafka Streams应用程序。但是包含相关id的消息头丢失。

Kafka Streams应用程序是一个简单的拓扑结构,用于测试此。。。

    @Bean
    public KafkaStreams stream(KafkaStreamsConfiguration kafkaStreamsConfiguration) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream(REQUEST_TOPIC_NAME)
                .groupByKey()
                .count()
                .toStream()
                .mapValues((ValueMapper<Long, String>)String::valueOf)
                .to(REPLY_TOPIC_NAME);

        return new KafkaStreams(builder.build(), kafkaStreamsConfiguration.asProperties());
    }

对于这个POC,我保持它的简单性,让客户机和服务器“同意”主题名称(kRequestskReplies)。所以在这一点上,我只想得到要识别并返回的相关id。

我现在看到的是

2019-10-01 10:55:38.792  WARN 76830 --- [TaskScheduler-1] o.s.k.r.ReplyingKafkaTemplate            : Reply timed out for: ProducerRecord(topic=kRequests, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [107, 82, 101, 112, 108, 105, 101, 115]), RecordHeader(key = kafka_correlationId, value = [101, -4, -35, 41, -127, -66, 69, 37, -117, -127, -95, -92, 38, 79, 73, 127])], isReadOnly = true), key=null, value=foo21074, timestamp=null) with correlationId: [135564972083657938538225367552235620735]
2019-10-01 10:55:38.792 ERROR 76830 --- [TaskScheduler-1] org.KRequestingApplication  : Reply timed out

org.springframework.kafka.KafkaException: Reply timed out
    at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$0(ReplyingKafkaTemplate.java:257) ~[spring-kafka-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_211]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]

在超时时间内,回复主题上没有具有匹配相关性id的消息。似乎至少使用Kafka Streams DSL无法支持回复KafkaTemplate

共有1个答案

叶展
2023-03-14

你的场景没有意义;您的KStream正在对多个输入进行分组;请求/回复为1请求1回复。

这个很好用。。。

@SpringBootApplication
@EnableKafkaStreams
public class So58193901Application {

    private static final String REQUEST_TOPIC_NAME = "requests";

    private static final String REPLY_TOPIC_NAME = "replies";

    public static void main(String[] args) {
        SpringApplication.run(So58193901Application.class, args);
    }

    @Bean
    public KStream<byte[], byte[]> stream(StreamsBuilder builder) {
        KStream<byte[], byte[]> stream = builder.stream(REQUEST_TOPIC_NAME);
        stream
                .mapValues(val -> new String(val).toUpperCase().getBytes())
                .to(REPLY_TOPIC_NAME);
        return stream;
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(REQUEST_TOPIC_NAME).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name(REPLY_TOPIC_NAME).partitions(1).replicas(1).build();
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> template(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        ConcurrentMessageListenerContainer<String, String> replyContainer = factory.createContainer(REPLY_TOPIC_NAME);
        return new ReplyingKafkaTemplate<>(pf, replyContainer);
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            System.out.println(template.sendAndReceive(new ProducerRecord<>(REQUEST_TOPIC_NAME, "bar", "foo"))
                    .get(10, TimeUnit.SECONDS).value());
        };
    }

}
 类似资料:
  • 英文原文:http://emberjs.com/guides/application/the-application-template/ 应用模板是应用启动的时候默认渲染的模板。 你应该把你的header、footer和其他装饰性的内容放在应用模板里面。另外,应用模版中至少需要一个{{outlet}}占位符,以便路由能根据当前的URL将适当的模版渲染进来。 下面是一个应用模板的例子: 1 2 3

  • 介绍 TensorFlow template application表示通用的TensorFlow应用代码,用户可以直接使用这些模板而不需要编写TensorFlow应用代码。 用户训练数据一般都是稠密的CSV格式,或稀疏的LIBSVM格式或图片,这些数据都可以转成TFRecords,模型本身则可以使用代码生成,通过不同的超参数组合可以实现通过的TensorFlow应用,用户直接下载模板或者使用Xi

  • 问题内容: 我了解C 中模板的方面与Java和C#中的泛型不同。C#是一种形式,Java使用类型擦除,C 使用鸭子类型,等等。C 模板可以做很多事情,而Java和C#泛型则做不到(例如,模板专业化)。但是 Java泛型可以做很多事情,而C#和C ++则做不到(例如,使泛型族的有界类型参数成为现实 ),而 C#泛型可以做的很多事情Java和C 不能做(例如运行时通用反射)。 [编辑:显然Java泛型

  • 问题内容: 是否有Flask或Jinja2配置标记/扩展名,可在呈现模板后自动最小化HTML输出? 问题答案: 找到了一种更好的方法。你可以使用以下方法缩小所有页面:

  • 我的机器使用Rails 5。因为一些业务,我必须创建新的rails 4应用程序。我使用以下语法生成rails 4应用程序: 然而,新的应用程序结构是为rails 5构建的。我的问题是:我如何使用rails 5来生成rails 4结构应用程序。 谢谢

  • 模板提供了一种简便的方式,将展现逻辑从控制器和业务逻辑中分离出来。 一般来说,模板包含应用程序的 HTML 代码,但也可以使用其他的格式,例如 XML 。 模板通常也被称为「视图」, 而它是 模型-视图-控制器 (MVC) 软件架构模式第二个元素的 一部份 。