当前位置: 首页 > 知识库问答 >
问题:

kafka-骆驼文件读写错误

燕鸿文
2023-03-14

我试图将Apache骆驼与Kafka集成在一起,并编写了一个示例程序来读取文件并写入Kafka主题。但是我这样做的时候出错了。我可以用相反的方式从Kafka主题读取并写入文件。

组织。阿帕奇。Kafka。常见的错误。SerializationException:无法转换类org的值。阿帕奇。骆驼组成部分文件将GenericFile发送到类组织。阿帕奇。Kafka。常见的序列化。在值中指定了StringSerializer。序列化程序[#0-file://C:\share\input]KafkaProducer WARN No message key或partition key set[#0-file://C:\share\input]GenericFileOnCompletion WARN回滚文件策略:org。阿帕奇。骆驼组成部分文件策略GenericFileRenameProcessStrategy@7127845b对于文件:GenericFile[C:\share\input\file.txt][#0-file://C:\share\input]DefaultErrorHandler错误传递失败(交换ID:ID-L8-CWBL462-49953-1480494317350-0-21上的MessageId:ID-L8-CWBL462-49953-1480494317350-0-22)。尝试交付后筋疲力尽:1捕获:组织。阿帕奇。Kafka。常见的错误。SerializationException:无法转换类org的值。阿帕奇。骆驼组成部分文件将GenericFile发送到类组织。阿帕奇。Kafka。常见的序列化。在值中指定了StringSerializer。序列化程序

密码

@ContextName(“myCdiCamelContext”)公共类MyRoutes扩展了RouteBuilder{

 @Inject
 @Uri("file:C:\\share\\input?fileName=file.txt&noop=true")
 private Endpoint inputEndpoint;

 @Inject
 @Uri("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
 private Endpoint resultEndpoint;



@Override
public void configure() throws Exception {
    from(inputEndpoint)
         .to(resultEndpoint);
}

}

共有1个答案

夏侯鹏
2023-03-14

添加新处理器后,它对我有效

public void configure() throws Exception {
        from(inputEndpoint).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                exchange.getIn().setBody(exchange.getIn().getBody(),String.class);
                exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
                exchange.getIn().setHeader(KafkaConstants.KEY, "1");
            }
        })
             .to(resultEndpoint);
    }
 类似资料:
  • 我有一个Spring Boot2.25.1应用程序,它使用Camel 2.25.1与camel-kafka,一切都正常工作…在我的Kafka消费者中,我需要添加该功能以按需暂停消费,因此我升级到camel 3.18.1,以便我可以使用可暂停功能。升级到3.18.1后,我收到错误FileNotes与类文件TimeoutAwareAggregationStategy.class. 当我打开camel-

  • 我最近注意到Camel现在有自己的Kafka组件,所以我决定给它一个旋转。 我决定尝试一个很好的简单文件->kafka主题如下...

  • 我刚接触骆驼,需要一些指导。我需要从S3存储桶中读取一些文件。结构是这样的。 当一个特定的excel文件被放入传入的/xls文件夹(比如file1.xls)时,我需要拾取所有文件,进行一些处理并将它们放入具有相同目录结构的已处理文件夹中。 我需要使用什么组件?我试着阅读留档,但有点难以弄清楚我需要什么组件。我知道我会使用camel-aws-s3插件,但那里没有很多例子。

  • 我有一个Camel/SpringBoot应用程序,它从GraphQLendpoint检索数据,将数据存储在内存数据库(2个表)中,通过运行SQL查询提取CSV文件,然后将文件上传到FTP服务器。由于将提取约350k条记录,我使用SQLs outputType=StreamList、splitter和stream:file。整个路线如下所示: 提取数据时不会出现任何问题,并使用记录创建CSV文件。但

  • 我刚刚开始研究apache camel(使用蓝图路线),我已经被卡住了。 我需要处理一组不同格式的csv文件。我有5个文件,foo_X_X指定csv文件的类型,文件有日期戳。这些文件可能很大,所以一旦写入所有文件,就会写入一个“完成”文件。完成的文件名为foo_trigger_20160110.csv. 我在文件中看到了doneFileName选项,但它只支持静态名称(我在文件名中有一个日期),或

  • 我正在使用骆驼beanio组件对文件内部的数据进行封送和解封。