当前位置: 首页 > 知识库问答 >
问题:

使用Camel-Kafka的ProducerTemplate sendBody()方法时,无法为终结点创建生产者

明越
2023-03-14

我正在测试使用Apache Camel在Kafka(0.8.2.1)上发送消息的simple producer。我在Camel中使用java DSL创建了endpoint。

CamelContext ctx =new DefaultCamelContext();
PropertiesComponent properties=new PropertiesComponent();

properties.setLocation("com/camel/test/props.properties");
ctx.addComponent("properties",properties);

final String uri= "kafka://{{kafka.host}}?topic={{topic}}&zookeeperHost={{zookeeperHost}}&zookeeperPort={{zookeeperPort}}";
String uriParams = "&metadata.broker.list={{metadata.broker.list}";

ctx.addRoutes(new RouteBuilder() {
    public void configure() { //
        from(uri+"&groupId={{groupId}}")
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println(exchange.getIn().getBody());
            }
        })
        ;
    }
});


ctx.start();

ProducerTemplate tmp = ctx.createProducerTemplate();
tmp.sendBody(ctx.getEndpoint(uri), "my test is working");// Error occurs here

现在我想使用Apache Camel提供的ProducerTempalte在kafka上发送消息。但我在运行该程序时得到以下错误注意:Zookeeper和Kafka已启动并可以使用Kafka控制台生成/消费消息。

Exception in thread "main" org.apache.camel.FailedToCreateProducerException: Failed to create Producer for endpoint: Endpoint[kafka://localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181]. Reason: java.lang.NullPointerException
    at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:407)
    at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:220)
    at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:343)
    at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:184)
    at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:124)
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:137)
    at com.camel.test.CamelTest.main(CamelTest.java:45)
Caused by: java.lang.NullPointerException
    at java.util.Hashtable.put(Hashtable.java:514)
    at org.apache.camel.component.kafka.KafkaProducer.getProps(KafkaProducer.java:54)
    at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:61)
    at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
    at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:2869)
    at org.apache.camel.impl.DefaultCamelContext.doAddService(DefaultCamelContext.java:1097)
    at org.apache.camel.impl.DefaultCamelContext.addService(DefaultCamelContext.java:1058)
    at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:405)
    ... 6 more

我猜这些属性不是为生产者设置的,而是不知道如何在生产者模板中设置。

共有1个答案

邹修真
2023-03-14

uri应该将broker列表作为服务器名(不要责怪我没有创建这个组件的语法)。

final String uri= "kafka://{{metadata.broker.list}}?topic={{topic}}&zookeeperHost={{zookeeperHost}}&zookeeperPort={{zookeeperPort}}";
 类似资料:
  • 我正在尝试向外部SOAP服务发送SOAP消息。我需要做的是用SOAP消息包装一个xml字符串,并添加一些SOAP头以进行身份验证。现在我正在使用: 这是可行的,但我还需要包括soap头。有效负载模式只发送带有主体的soap信封。我曾尝试使用拦截器注入头,但它被忽略了。 有可能有一个带有有效负载模式的肥皂标题部分吗?如果没有,完成我正在尝试做的事情的最简单方法是什么? 谢谢

  • 使用protobuf版本2.6.1(我通过自制软件安装) 我想跑 我一直收到这个错误。 我的go路径中安装了protoc gen go。还有其他人有这个问题吗?

  • 我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次 我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolat

  • 我还更改了zookeeper中的zoo.cfg。 和server.properties。 我看了所有的教程,并做了完全相同的方法。还有乌斯金Kafka开放式动物园管理员。 3)创建主题 .\bin\windows\kafka-topics.bat--create-zookeeper localhost:2181-replication-factor 1-partitions 1-topic hel

  • 我正在使用apache camel,希望有多条路由。路线如下。

  • kafka-python(1.0.0)在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer和 /usr/bin/kafka-console-consumer正常工作。 Python应用程序过去也运行良好,但是在动物园管理员重新启动后,它不再能够连接。 我使用文档中的裸露骨骼示例: 我收到这个错误: 单步通过( /usr/lib/python2.6/site-