当前位置: 首页 > 知识库问答 >
问题:

无法在Camel HTTP组件中配置“Keep Alive”

羊舌诚
2023-03-14

我在HTTP组件的正确设置方面遇到了一些问题。目前,一个微服务从提供者那里提取JSON内容,对其进行处理,并将其发送到下一个服务以进行进一步的处理。主要问题是这个微服务创建了大量CLOSE_WAIT套接字连接。我知道“KEEP-ALIVE”的整个概念应该保持连接打开,直到我关闭它,但服务器可能会出于某种原因断开连接并创建这个CLOSE_WAIT套接字。

我创建了一个用于调试/测试目的的小服务,它可以调用Google,但即使是这个连接,在我关闭程序之前也会保持打开状态。我尝试了许多不同的解决方案:

  • . setHeader(“连接”,常量(“关闭”))
  • -Dhttp.keepAlive=false as VM参数
  • 从Camel-Http切换到Camel-Http4
  • httpClient.soTimeout=500(Camel-HTTP),httpClient.socketTimeout=500和ConnectionTimeToLive=500(Camel-HTTP4)
  • . setHeader(“连接”,简单(“Keep-Alive”))和. setHeader(“Keep-Alive”,简单(“timeout=10”))(Camel-HTTP4)
  • 通过调试DefaultConnectionKeepAliveStrategy的响应从-1(永不结束)设置为Camel-HTTP4中的特定值-这可以工作,但我无法注入自己的策略。

但我没有成功。也许你们中的一个人可以帮我:

  • 我如何告诉骆驼HTTP在经过特定时间后应该关闭连接?例如,该服务每小时都会从内容提供商那里获取信息。3-4小时后,HttpComponent应在拉后关闭连接,并在下一次拉时重新打开。目前,每个连接都会放回多线程HttpConnectionManager,套接字仍然打开

谢谢大家的帮助

共有3个答案

司空凌
2023-03-14

首先,罗曼·沃特纳,你的回答和你对发现这个问题的全心投入帮了我很大的忙。我已经和CLOSE_抗争了2天了,你的回答很有帮助。以下是我所做的。在我的CamelConfiguration类中添加了以下代码,它在启动时实质上篡改了CamelContext。

    HttpComponent http4 = camelContext.getComponent("https4", HttpComponent.class);
    http4.setHttpClientConfigurer(new HttpClientConfigurer() {

        @Override
        public void configureHttpClient(HttpClientBuilder builder) {
            builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
        }
    });

工作起来很有魅力。

郑俊弼
2023-03-14

如果空闲连接在配置时间内空闲,则可以通过关闭空闲连接来完成。您可以通过为Camel Http组件配置空闲连接超时来实现同样的目的。Camel Http提供了这样做的接口。

转换org.apache.camel.component.http4. HttpComponent到PoolingHttpClientConnectionManager

        PoolingHttpClientConnectionManager poolingClientConnectionManager = (PoolingHttpClientConnectionManager) httpComponent
                .getClientConnectionManager();

        poolingClientConnectionManager.closeIdleConnections(5000, TimeUnit.MILLISECONDS);

访问这里[http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.html#closeIdleConnections(long,java.util.concurrent.TimeUnit)]

东方乐
2023-03-14

不幸的是,在应用程序最终关闭之前,没有一个建议的答案解决了我这边的CLOSE_WAIT连接状态。

我用以下测试用例重现了这个问题:

public class HttpInvokationTest extends CamelSpringTestSupport {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @EndpointInject(uri = "mock:success")
  private MockEndpoint successEndpoint;
  @EndpointInject(uri = "mock:failure")
  private MockEndpoint failureEndpoint;

  @Override
  protected AbstractApplicationContext createApplicationContext() {
    return new AnnotationConfigApplicationContext(ContextConfig.class);
  }

  @Configuration
  @Import(HttpClientSpringTestConfig.class)
  public static class ContextConfig extends CamelConfiguration {

    @Override
    public List<RouteBuilder> routes() {
      List<RouteBuilder> routes = new ArrayList<>(1);
      routes.add(new RouteBuilder() {
        @Override
        public void configure() {
          from("direct:start")
            .log(LoggingLevel.INFO, LOG, CONFIDENTIAL, "Invoking external URL: ${header[ERPEL_URL]}")
            .setHeader("Connection", constant("close"))
            .recipientList(header("TEST_URL"))
            .log(LoggingLevel.DEBUG, "HTTP response code: ${header["+Exchange.HTTP_RESPONSE_CODE+"]}")
            .bean(CopyBodyToHeaders.class)
            .choice()
              .when(header(Exchange.HTTP_RESPONSE_CODE).isGreaterThanOrEqualTo(300))
                .to("mock:failure")
              .otherwise()
                .to("mock:success");
        }
      });
      return routes;
    }
  }

  @Test
  public void testHttpInvocation() throws Exception {
    successEndpoint.expectedMessageCount(1);
    failureEndpoint.expectedMessageCount(0);

    ProducerTemplate template = context.createProducerTemplate();

    template.sendBodyAndHeader("direct:start", null, "TEST_URL", "http4://meta.stackoverflow.com");

    successEndpoint.assertIsSatisfied();
    failureEndpoint.assertIsSatisfied();

    Exchange exchange = successEndpoint.getExchanges().get(0);
    Map<String, Object> headers = exchange.getIn().getHeaders();
    String body = exchange.getIn().getBody(String.class);
    for (String key : headers.keySet()) {
      LOG.info("Header: {} -> {}", key, headers.get(key));
    }
    LOG.info("Body: {}", body);

    Thread.sleep(120000);
  }
}

并发出netstat-ab-p tcp|grep151.101.129.69请求,其中IP是meta.stackoverflow.com之一。

这给出了如下回答:

tcp4       0      0  192.168.0.10.52183     151.101.129.69.https   ESTABLISHED      37562       2118
tcp4       0      0  192.168.0.10.52182     151.101.129.69.http    ESTABLISHED        885        523

就在调用之后

tcp4       0      0  192.168.0.10.52183     151.101.129.69.https   CLOSE_WAIT       37562       2118
tcp4       0      0  192.168.0.10.52182     151.101.129.69.http    CLOSE_WAIT         885        523

响应,直到应用程序因连接而关闭:保持活动(keep alive),即使配置如下:

java prettyprint-override">@Configuration
@EnableConfigurationProperties(HttpClientSettings.class)
public class HttpClientSpringTestConfig {

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @Resource
  private HttpClientSettings httpClientSettings;

  @Resource
  private CamelContext camelContext;

  private SocketConfig httpClientSocketConfig() {
    /*
      socket timeout:
      Monitors the time passed between two consecutive incoming messages over the connection and
      raises a SocketTimeoutException if no message was received within the given timeout interval
     */
    LOG.info("Creating a SocketConfig with a socket timeout of {} seconds", httpClientSettings.getSoTimeout());
    return SocketConfig.custom()
        .setSoTimeout(httpClientSettings.getSoTimeout() * 1000)
        .setSoKeepAlive(false)
        .setSoReuseAddress(false)
        .build();
  }

  private RequestConfig httpClientRequestConfig() {
    /*
      connection timeout:
      The time span the application will wait for a connection to get established. If the connection
      is not established within the given amount of time a ConnectionTimeoutException will be raised.
     */
    LOG.info("Creating a RequestConfig with a socket timeout of {} seconds and a connection timeout of {} seconds",
             httpClientSettings.getSoTimeout(), httpClientSettings.getConTimeout());
    return RequestConfig.custom()
        .setConnectTimeout(httpClientSettings.getConTimeout() * 1000)
        .setSocketTimeout(httpClientSettings.getSoTimeout() * 1000)
        .build();
  }

  @Bean(name = "httpClientConfigurer")
  public HttpClientConfigurer httpConfiguration() {
    ConnectionKeepAliveStrategy myStrategy = new ConnectionKeepAliveStrategy() {
      @Override
      public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        return 5 * 1000;
      }
    };

    PoolingHttpClientConnectionManager conMgr =
        new PoolingHttpClientConnectionManager();
    conMgr.closeIdleConnections(5, TimeUnit.SECONDS);

    return builder -> builder.setDefaultSocketConfig(httpClientSocketConfig())
                             .setDefaultRequestConfig(httpClientRequestConfig())
                             .setConnectionTimeToLive(5, TimeUnit.SECONDS)
                             .setKeepAliveStrategy(myStrategy)
                             .setConnectionManager(conMgr);
  }

  @PostConstruct
  public void init() {
    LOG.debug("Initializing HTTP clients");
    HttpComponent httpComponent = camelContext.getComponent("http4", HttpComponent.class);
    httpComponent.setHttpClientConfigurer(httpConfiguration());
    HttpComponent httpsComponent = camelContext.getComponent("https4", HttpComponent.class);
    httpsComponent.setHttpClientConfigurer(httpConfiguration());
  }
}

或者直接在各自的HttpComponent上定义设置。

在检查HttpClient代码中各自提出的方法时,很明显这些方法是单次操作,而不是HttpClient内部每隔几毫秒检查一次的配置。

PoolighttpClientConnectionManager进一步声明:

旧连接的处理在版本4.4中更改。以前,代码默认情况下会在重新使用之前检查每个连接。现在,代码仅在自上次使用连接以来经过的时间超过已设置的超时时检查连接。默认超时设置为2000ms

这仅在尝试重用连接时才会发生,这对于连接池是有意义的,尤其是在通过同一连接交换多个消息的情况下。对于单次调用,它应该更像Connection:关闭可能在一段时间内不会重用该连接,使连接处于打开或半关闭状态,因为没有进一步尝试从该连接读取,因此认识到自己可以关闭该连接。

我注意到,我已经解决了这样一个问题,而回来与传统的HttpClients,并开始移植此解决方案的骆驼,这是相当容易的。

该解决方案基本上包括向服务注册HttpClients,然后定期(在我的情况下为5秒)调用closeExpiredConnections()closeIdleConnections(...)

此逻辑保存在单例枚举中,因为这实际上是在几个应用程序使用的库中,每个应用程序都在自己的JVM中运行。

/**
 * This singleton monitor will check every few seconds for idle and stale connections and perform
 * a cleanup on the connections using the registered connection managers.
 */
public enum IdleConnectionMonitor {

  INSTANCE;

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** The execution service which runs the cleanup every 5 seconds **/
  private ScheduledExecutorService executorService =
      Executors.newScheduledThreadPool(1, new NamingThreadFactory());
  /** The actual thread which performs the monitoring **/
  private IdleConnectionMonitorThread monitorThread = new IdleConnectionMonitorThread();

  IdleConnectionMonitor() {
    // execute the thread every 5 seconds till the application is shutdown (or the shutdown method
    // is invoked)
    executorService.scheduleAtFixedRate(monitorThread, 5, 5, TimeUnit.SECONDS);
  }

  /**
   * Registers a {@link HttpClientConnectionManager} to monitor for stale connections
   */
  public void registerConnectionManager(HttpClientConnectionManager connMgr) {
    monitorThread.registerConnectionManager(connMgr);
  }

  /**
   * Request to stop the monitoring for stale HTTP connections.
   */
  public void shutdown() {
    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
        LOG.warn("Connection monitor shutdown not finished after 3 seconds!");
      }
    } catch (InterruptedException iEx) {
      LOG.warn("Execution service was interrupted while waiting for graceful shutdown");
    }
  }

  /**
   * Upon invocation, the list of registered connection managers will be iterated through and if a
   * referenced object is still reachable {@link HttpClientConnectionManager#closeExpiredConnections()}
   * and {@link HttpClientConnectionManager#closeIdleConnections(long, TimeUnit)} will be invoked
   * in order to cleanup stale connections.
   * <p/>
   * This runnable implementation holds a weakly referable list of {@link
   * HttpClientConnectionManager} objects. If a connection manager is only reachable by {@link
   * WeakReference}s or {@link PhantomReference}s it gets eligible for garbage collection and thus
   * may return null values. If this is the case, the connection manager will be removed from the
   * internal list of registered connection managers to monitor.
   */
  private static class IdleConnectionMonitorThread implements Runnable {

    // we store only weak-references to connection managers in the list, as the lifetime of the
    // thread may extend the lifespan of a connection manager and thus allowing the garbage
    // collector to collect unused objects as soon as possible
    private List<WeakReference<HttpClientConnectionManager>> registeredConnectionManagers =
        Collections.synchronizedList(new ArrayList<>());

    @Override
    public void run() {

      LOG.trace("Executing connection cleanup");
      Iterator<WeakReference<HttpClientConnectionManager>> conMgrs =
          registeredConnectionManagers.iterator();
      while (conMgrs.hasNext()) {
        WeakReference<HttpClientConnectionManager> weakConMgr = conMgrs.next();
        HttpClientConnectionManager conMgr = weakConMgr.get();
        if (conMgr != null) {
          LOG.trace("Found connection manager: {}", conMgr);
          conMgr.closeExpiredConnections();
          conMgr.closeIdleConnections(30, TimeUnit.SECONDS);
        } else {
          conMgrs.remove();
        }
      }
    }

    void registerConnectionManager(HttpClientConnectionManager connMgr) {
      registeredConnectionManagers.add(new WeakReference<>(connMgr));
    }
  }

  private static class NamingThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setName("Connection Manager Monitor");
      return t;
    }
  }
}

如前所述,此单例服务生成一个自己的线程,该线程每5秒调用上述两个方法。这些调用负责关闭在一定时间内未使用或在规定时间内处于IDLE状态的连接。

为了简化此服务,可以使用EventNotifierSupport,以便在监视器线程关闭后让Camel负责关闭它。

/**
 * This Camel service with take care of the lifecycle management of {@link IdleConnectionMonitor} 
 * and invoke {@link IdleConnectionMonitor#shutdown()} once Camel is closing down in order to stop
 * listening for stale connetions.
 */
public class IdleConnectionMonitorService extends EventNotifierSupport {

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private IdleConnectionMonitor connectionMonitor;

  @Override
  public void notify(EventObject event) {
    if (event instanceof CamelContextStartedEvent) {
      LOG.info("Start listening for closable HTTP connections");
      connectionMonitor = IdleConnectionMonitor.INSTANCE;
    } else if (event instanceof CamelContextStoppingEvent){
      LOG.info("Shutting down listener for open HTTP connections");
      connectionMonitor.shutdown();
    }
  }

  @Override
  public boolean isEnabled(EventObject event) {
    return event instanceof CamelContextStartedEvent || event instanceof CamelContextStoppingEvent;
  }

  public IdleConnectionMonitor getConnectionMonitor() {
    return this.connectionMonitor;
  }
}

为了利用该服务,HttpClient Camel内部使用的连接管理器需要向该服务注册,这在下面的代码块中完成:

private void registerHttpClientConnectionManager(HttpClientConnectionManager conMgr) {
  if (!getIdleConnectionMonitorService().isPresent()) {
    // register the service with Camel so that on a shutdown the monitoring thread will be stopped
    camelContext.getManagementStrategy().addEventNotifier(new IdleConnectionMonitorService());
  }
  IdleConnectionMonitor.INSTANCE.registerConnectionManager(conMgr);
}

private Optional<IdleConnectionMonitorService> getIdleConnectionMonitorService() {
  for (EventNotifier eventNotifier : camelContext.getManagementStrategy().getEventNotifiers()) {
    if (eventNotifier instanceof IdleConnectionMonitorService) {
      return Optional.of((IdleConnectionMonitorService) eventNotifier);
    }
  }
  return Optional.empty();
}

最后但并非最不重要的是,在我的例子中,在HttpClientSpringTestConfig中定义的连接管理器需要经过引入的寄存器函数

PoolingHttpClientConnectionManager conMgr = new PoolingHttpClientConnectionManager();
registerHttpClientConnectionManager(conMgr);

这可能不是最漂亮的解决方案,但它确实关闭了我机器上半封闭的连接。

@编辑

我刚刚了解到,您可以使用NoConnectionReuseStrategy,它将连接状态更改为TIME_WAIT而不是CLOSE_WAIT,因此会在很短的时间后删除连接。不幸的是,请求仍然带有Connection:保持活力标头。此策略将为每个请求创建一个新连接,即如果您有301 Moded Permanally重定向响应,则重定向将发生在新连接上。

http://code>bean需要更改为以下内容才能使用上述策略:

@Bean(name = "httpClientConfigurer")
public HttpClientConfigurer httpConfiguration() {
    return builder -> builder.setDefaultSocketConfig(socketConfig)
        .setDefaultRequestConfig(requestConfig)
        .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
}
 类似资料:
  • 我在正确设置HTTP组件方面遇到了一些麻烦。目前,微服务从提供者提取JSON内容,对其进行处理,并将其发送到下一个服务以进行进一步的处理。主要问题是这个微服务创建了大量的CLOSE_WAIT套接字连接。我理解“keep-alive”的整个概念将保持连接打开直到我关闭它,但服务器可能会因为某些原因丢弃连接并创建这个CLOSE_WAIT套接字。 我已经创建了一个用于调试/测试目的的小服务,它向Goog

  • 我正在尝试开始将Jetty与Camel一起使用。我已将依赖项添加到我的pom: 我的CamelContext初始化如下: 当我尝试启动我的服务时,该服务具有定义为以下endpoint的路由: 我得到一个例外: 关于如何设置码头组件的文档最多也缺乏。我发现了一个邮件列表条目,其中说JettyHttpComponent自Camel 2.15以来一直是抽象的,现在该组件必须使用JettyHttpComp

  • 问题内容: 我正在尝试在在线考试中实现STRUTS SPRING和HIBERNATE INTEGRATION。使用apache tomcat 7.0.42在Eclipse Kepler中运行项目时,抛出以下错误 在控制台日志中,出现以下内容, struts.xml 请帮我朋友。我不知道为什么会出现。无论如何在此先感谢… !!! 问题答案: 我认为您缺少“ struts2-spring-plugin

  • 我很难用Spring安装发电机。我确信我的pom文件中有一个问题,询问我的打印跟踪是指我的数据源的问题和我目前没有使用的Hibernate的问题。我最终希望完成的是从我的数据库中打印出信息。感谢任何帮助。 配置类: Pom文件: 打印跟踪:

  • > WLP jpa级别:JPA-2.0 Spring版本:3.1.4.发行版 EntityManagerFactory bean:class=“org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean”

  • 问题内容: 我正在使用Spark 1.6(Cloudera 5.8.2),并尝试了以下方法来配置ORC属性。但这不会影响输出。 以下是我尝试过的代码段。 除此之外,我还尝试了在hive-site.xml和hiveContext对象中设置的这些属性。 hive –orcfiledump在输出中确认未应用配置。以下是Orcfiledump代码段。 问题答案: 您在这里犯了两个不同的错误。我不怪你 我去