当前位置: 首页 > 知识库问答 >
问题:

Spring集成JavaDSL HTTP在超时错误内没有收到回复

梁丘赞
2023-03-14

我正在使用Spring集成5.0.6。我已经浏览了它的文档并创建了以下代码,这些代码监听HTTPendpoint并发布到kafka主题。

一切都很好,我也收到了主题信息。但在HTTP客户端没有发送回复,它给出“在超时内没有收到回复”。

如何在下面的代码中向超文本传输协议调用者发送回复:

@Bean
public DirectChannel replyChannel() {
    return new DirectChannel();
}

@Bean(name = "restInputFlow")
public IntegrationFlow send() {
    return IntegrationFlows
            .from(Http.inboundGateway("/push").requestMapping(m -> m.methods(HttpMethod.POST))
                    .requestPayloadType(String.class).replyChannel(replyChannel()))
            .transform(new Transformer())
            .handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
            .enrichHeaders(
                    c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
            .get();
}

private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
            ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {

        return Kafka.outboundChannelAdapter(producerFactory)
                .messageKey("key").headerMapper(mapper())
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
    }

谢谢你的帮助。

共有1个答案

嵇财
2023-03-14

您使用单向Kafka.outboundChannelAdapter(producerFactory)的问题。这只是为了“发送和忘记”。

如果您有兴趣生成一些后续过程,或者需要回复HTTP请求,则应考虑使用:

/**
 * The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
 * method specific implementation to allow the use of the 'subflow' subscriber capability.
 * @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
 * {@link PublishSubscribeSpec} options including 'subflow' definition.
 * @return the current {@link IntegrationFlowDefinition}.
 */
public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {

在流定义中,您的第一个订阅者实际上是Kafka。outboundChannelAdapter(producerFactory)第二个可以是前面提到的。如果您不做更多操作,最后一个将把结果发送到replyChannel头中,因此将到达HTTP响应。

在这个发布-订阅场景中,您应该记住,第二个订阅者的负载将与您尝试发送给Kafka的负载相同。

 类似资料:
  • 我有一个如下的集成,我从rest控制器调用这个方法,但回复超时并没有像我预期的那样工作。 我期望的是:如果在我给出的回复超时时间内没有响应,则返回timeout作为对客户端的响应。 对于通道配置中的超时持续时间,是否需要执行一些操作? 谢谢。

  • 我正在通过绑定到不同Webshpere MQ的入站和出站原子和JMS使用带有JTA支持的Spring集成。流程如下: JMS入站通道适配器收到消息 一些转变 输出队列的JMS出站通道适配器 发生错误时,收到消息 异常类型路由器将未处理的错误路由到自定义重新抛出服务,并将处理的错误路由到接收者列表路由器,该路由器将它们发送到2个错误队列 我的问题是,即使消息到达errorChannel下游(在已处理

  • 您可以帮助进行spring集成配置吗。 收到消息后,我收到: 我的代码: 我有类似的JmsInboudGateway配置,用于其他没有.requestChannel和.replyChannel的队列。 如果不是注入请求通道bean,而是用文本名称声明它,得到了这个 以及更多的文字宣传问题。

  • 使用Boot 2.2.2和Integration 5.2.2——当一条XML消息源于且未能解组(即,它不是XML)时,消息将按预期的方式进行。但是,当消息来自JMS时,通过相同的通道路由,并且无法解组,它不会被路由到,消息会回滚到JMS。在这之后,我陷入了一个没完没了的循环,只为同一条消息。 我从正确的终极方式迁移JMS事件到Spring与Spring Boot的集成,遵循了这个例子。是否有一些我

  • 我对Spring集成的设想是: 使用自定义协议(大小和内容)发送数据的十个生产者 我必须解码这个自定义协议,然后处理结果。 所以我尝试了很多配置,目前最好的配置如下: 序列化类为: 我使用此代码来测试服务器: 当我用一个线程执行此操作时,如果我尝试执行多个线程,则效果很好,如: spring集成服务器卡住了,我有以下警告: 而且它不工作,服务器无法接收消息。 我错在哪里?非常感谢。 编辑 我这样修

  • 我使用Jmeter对一个包括web套接字连接的应用程序进行负载测试。 当尝试使用单读取采样器读取帧中的数据时,得到错误响应代码:无响应响应消息:读取超时,未收到响应。 有人能帮我解决这个问题吗? 线程名称:密苏里州TestEnv 1-1样例开始时间:2019-10-09 10:40:43 IST加载时间:1000连接时间:0延迟时间:0大小以字节为单位:0发送字节:0头大小以字节为单位:0主体大小