当前位置: 首页 > 知识库问答 >
问题:

Apache camel与Kafka的集成

竺和洽
2023-03-14

我试图测试camel与kafka的集成,如这里所述

以下是我的代码

public class KafkaTest {

    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("kafka:test?zkConnect=localhost:2181&metadataBrokerList=localhost:9092")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println(exchange.getIn().getBody());
                    }
                })
                .end();
            }
        });

        context.start();
        while (true) {

        }
    }
}

然而,我得到了以下错误

Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route(route1)[[From[kafka:test?zkConnect=localhost:2181&... because of Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. 

Unknown parameters=[{metadataBrokerList=localhost:9092, zkConnect=localhost:2181}]

请提出可能遗漏的内容。

共有2个答案

董同
2023-03-14

使用endpoint类?

比如:

 public static KafkaEndpoint endpoint(String host, String port, String topic, String offset, String groupId) {
        String endpointUri = "kafka://" + host + ":" + port;
        KafkaEndpoint endpoint = new DefaultCamelContext().getEndpoint(endpointUri, KafkaEndpoint.class);
        endpoint.getConfiguration().setTopic(topic);
        endpoint.getConfiguration().setKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
        endpoint.getConfiguration().setValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
        endpoint.getConfiguration().setAutoOffsetReset(offset);
        endpoint.getConfiguration().setGroupId(groupId);
        return endpoint;
    }

PollingConsumer consumer = endpoint.createPollingConsumer();

new RouteBuilder() {
            public void configure() {
                from(endpoint)
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println(exchange.getIn().getBody());
                    }
                })
                .end();
            }
        }
鲁烨熠
2023-03-14

您应该使用正式文档中指定的正确参数名称。

from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181")

github上的wiki中描述了您所指的版本,该版本是为Apache提供的,此后有所更改。

 类似资料:
  • 在Spring MVC项目中,我试图通过Spring Websockets将使用过的Kafka数据发送到前端(JavaScript)。 为了建立服务器和客户端之间的通信,我有以下内容。 客户端(app.js) 服务器(KafkaController.java) 要使用来自特定Kafka主题的数据,我使用@KafkaListener注释如下: 我有一个适当的Kafkanconfig类,包含所有必要的

  • 我正在尝试连接陋居到AWS MSKKafka。我一直收到下面的消息。我能够连接到MSK从相同的EC2实例以下步骤。但是陋居是不能连接。我们需要指定truststore,我不能在陋居中设置它。如有任何帮助,不胜感激。

  • 我正试图通过带有SASL配置的spring集成模块连接到kafka服务器,但出现错误 JAVAlang.IllegalArgumentException:在JAAS配置中找不到“KafkaClient”条目。系统属性的java。安全auth。登录。未设置“配置” 但当我构建简单的消费者和民意调查消息时,一切都很好。有人能告诉我如何关闭JAAS授权或通过它正确连接吗。这是我的Kafka。java和S

  • 我们有kafka集群,包含3个kafka代理节点和3个zookeepers服务器 Kafka版本- 10.1 ( hortonworks) 根据我的理解,因为所有的元数据都位于zookeeper服务器上,kafka代理正在使用这些数据(kafka通过端口2181与zookeeper服务器对话) 我只是想知道是否每台kafka机器都与集群中的其他kafka交谈,或者kafka可能只在动物园管理员服务

  • 我正在尝试向异步路由发送消息,但它不起作用。我刚刚在github上创建了一个项目来模拟这个问题

  • 我试图在Spring Integration中定义一个简单的消息流,它从一个通道读取消息,然后将消息转储到Kafka队列中。为此,我使用了spring集成kafka。问题是我得到了一个无法解读的错误。 以下是我的XML配置: 当我通过Spring启动运行我的应用程序时,我得到这个异常: 创建名称为org.springframework.integration.kafka.outbound.Kafk