当前位置: 首页 > 知识库问答 >
问题:

Apache camel-K:ActiveMQ发射器实例化问题

佴阳曦
2023-03-14

我对Apache camel和camel-K都是新手。我正在构建一个Kubernetes容器化堆栈,其中ActiveMQ消息队列作为外部数据源的入站接口,InfluxDB作为数据存储,camel-K路由用于从AMQ到InfluxDB的消息路由。除了Camel-K路线,该系统运行良好。我通过分配给服务的静态IPendpoint向集群公开ActiveMQ的61616端口:

apiVersion: v1
kind: Service
metadata:
  name: activemq-external
spec:
  selector:
    app: activemq
  type: NodePort
  ports:
    - name: port-service-console
      port: 8161
      targetPort: 8161
      nodePort: 8161
      protocol: TCP
---
# Service definition for internal service with static IP (which can be used in camel-K integration "ActiveMQToInfluxDB.yaml")
# see https://kubernetes.io/docs/concepts/services-networking/service/#service-resource
apiVersion: v1
kind: Service
metadata:
  name: activemq-internal
spec:
#  type: NodePort
  ports:
    - name: port-internal
      port: 61616
      targetPort: 61616
#      nodePort: 61616
      protocol: TCP
---
apiVersion: v1
kind: Endpoints
metadata:
  name: activemq-internal
subsets:
  - addresses:
      - ip: 172.17.0.8
    ports:
      - port: 61616

因此,我希望集群中的其他POD可以通过172.17.0.8:616访问该端口。

我正在通过命令运行camel-K路由(不必担心对apache commons的依赖,我需要它来进行字符串操作)

kamel run kamel-integrations/ActiveMQToInfluxDbRoute.java -d mvn:org.apache.commons:commons-lang3:3.9 -d mvn:org.influxdb:influxdb-java:2.17 -d mvn:org.apache.activemq:activemq-camel:5.15.11 -d mvn:org.apache.camel:camel-core:2.25.0

集成的java代码如下所示:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.Message;
import org.apache.commons.lang3.StringUtils;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;

public class ActiveMQToInfluxDbRoute extends RouteBuilder {

    private static final String MESSAGE_HEADER = "InfluxTimestamp";

    @BindToRegistry
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        System.out.println("ActiveMQ Listener: STARTING...");
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://172.17.0.8:61616");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID("Influx Message Queue");
        connectionFactory.setConnectResponseTimeout(300);
        System.out.println("ActiveMQ Listener: STARTED");
        return connectionFactory;
    }

    @Override
    public void configure() throws Exception {

    String sourceString = "activemq:queue:cryring_db_inbound?brokerURL=tcp://172.17.0.8:61616";
    String targetString = "influxdb://influxDb?databaseName=my_database=true&retentionPolicy=default";

    from(sourceString) //
      .process(messagePayload -> {
           String manipulation stuff
      })//
     .to(targetString) //
     .onException(Exception.class) //
     .useOriginalMessage() //
     .handled(true) //
     .log("error") //
     .to("stream:out");
    }

java代码的执行因以下异常而失败:

Route(route1)[From[activemq:queue:my_database?brokerU... because of Failed to resolve endpoint: activemq://queue:my_database?brokerURL=tcp%3A%2F%2F172.17.0.8%3A61616 due to: java.lang.IllegalArgumentException: wrong number of arguments

我发现这篇帖子也有类似的问题:https://issues.apache.org/jira/browse/CAMEL-13145但我不知道该怎么办。

共有1个答案

薛浩言
2023-03-14

我能自己解决这个问题:

正如帖子中所说https://issues.apache.org/jira/browse/CAMEL-13145,以及我一开始不理解的依赖关系-d mvn:org。阿帕奇。activemq:activemq camel:5.15.11导致问题。如果我按如下方式运行camel-K,ActiveMQ问题就会消失:

kamel run kamel-integrations/ActiveMQToInfluxDbRoute.java -d mvn:org.apache.commons:commons-lang3:3.9 -d mvn:org.influxdb:influxdb-java:2.17 -d mvn:org.apache.camel:camel-activemq:3.0.1 -d mvn:org.apache.camel:camel-core:2.25.0
 类似资料:
  • 根据单一实例,系统不会向持有实例的任务启动任何其他活动。该活动始终是其任务的唯一成员;由该活动启动的任何活动都将在单独的任务中打开。 但是,当我从活动A(启动器活动)导航时- *没有添加旗帜。 为什么活动B推到活动A之上(因为活动具有启动模式:“singleInstance”),而不是创建新任务? 活动清单: TaskRecord{14ba4a25#18 A=com.example.nischay

  • 在最近的JDK升级后,我面临着Codenameone的问题。试图用多种方法解决它,但没有运气。

  • 嗨,我有下面的map-reduce代码,我试图通过它解析我的XML文件并在输出中创建一个CSV。 我还有一个名为Connect_Home的类,在这个类中,我使用JAXB解析数据,提取数据。但当我运行代码时,会出现以下错误:

  • 7.2.2 实例化容器 实例化Spring的IoC容器很简单。提供给ApplicationContext构造方法的一个或多个路径是表示实际资源的字符串,通过路径容器能从各种外部资源(比如本地文件系统)、Java的CLASSPATH等处加载配置元数据。 ApplicationContext context = new ClassPathXmlApplicationContext("services.

  • 我正在尝试向异步路由发送消息,但它不起作用。我刚刚在github上创建了一个项目来模拟这个问题

  • 我试图在聚合器完成后获得一个回复,但是我得到一个异常,我没有指定任何聚合器子项,但是当我指定一个。to()endpoint我没有收到聚合结果。。。这可能吗? 控制器: 聚合器: