当前位置: 首页 > 知识库问答 >
问题:

骆驼Kafka积分

柯镜
2023-03-14

我最近注意到Camel现在有自己的Kafka组件,所以我决定给它一个旋转。

我决定尝试一个很好的简单文件->kafka主题如下...

<route>
        <from uri="file:///tmp/input" />
        <setHeader headerName="kafka.PARTITION_KEY">
            <constant>Test</constant>
        </setHeader>
        <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" />
</route>
    java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)
String msg = exchange.getIn().getBody(String.class);
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
producer.send(data);

共有1个答案

秦琦
2023-03-14

啊,没关系,我们走了...希望这对其他人有帮助,您必须在选项中设置序列化程序。

<route>
            <from uri="file:///tmp/input" />
            <setHeader headerName="kafka.PARTITION_KEY">
                <constant>Test</constant>
            </setHeader>
            <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" />
</route>
 类似资料:
  • 我有一个Spring Boot2.25.1应用程序,它使用Camel 2.25.1与camel-kafka,一切都正常工作…在我的Kafka消费者中,我需要添加该功能以按需暂停消费,因此我升级到camel 3.18.1,以便我可以使用可暂停功能。升级到3.18.1后,我收到错误FileNotes与类文件TimeoutAwareAggregationStategy.class. 当我打开camel-

  • 我一直在尝试为Spring引导Kafka骆驼Avro消费者寻找示例代码,但没有运气。我在以下URL找到了Spring Camel Kafka消费者和生产者示例: https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/ 我的具体问题是,一旦我的bean从Avro模式创建,并且我有了POJO类,我如何将上面的c

  • 我试图将Apache骆驼与Kafka集成在一起,并编写了一个示例程序来读取文件并写入Kafka主题。但是我这样做的时候出错了。我可以用相反的方式从Kafka主题读取并写入文件。 组织。阿帕奇。Kafka。常见的错误。SerializationException:无法转换类org的值。阿帕奇。骆驼组成部分文件将GenericFile发送到类组织。阿帕奇。Kafka。常见的序列化。在值中指定了Stri

  • 我正在使用带有Apache骆驼的Spring Boot。我正在从控制器调用路由。一旦路由完成,控制就会返回控制器。我正在VerifyLimitProcess和批准限制处理器中生成响应。如果我没有在路由中提供窃听配置,控制器会按预期检索标头和正文。但如果我在路由中引入窃听,控制器会将标头和正文接收为null。如果有人指出我需要做什么,以便我可以在选择语句中引入两个处理器的窃听配置,即VerifyLi

  • 我有一条小路线,我想使用自定义的重新传递策略来重复向endpoint发送消息,但这种行为非常奇怪。看起来,重新交付政策只是在重复一个错误。我试图将所有交换发送到路由的开头,但策略不起作用,因为每次都在创建: 我做错了什么?当错误发生时,我想以间隔重复我的请求。我的骆驼版本是2.6 日志:

  • 我正在遵循位于Camel MyBatis Integration guide的安装指南。我使用的是Service Mix 5.0.1。我使用了安装spring mybatis的功能,它支持3.2.4。释放我的SqlMapConfig文件只包含有关TypeHandler和TypeAlias的信息。 当我开启服务混合,然后启动我的应用程序,我收到以下堆栈跟踪: ...还有50个 我的Bean定义如下: