当前位置: 首页 > 知识库问答 >
问题:

汇合和卡桑德拉:获取数据异常:无法将数据反序列化到Avro,未知的神奇字节

丁长卿
2023-03-14

我遵循了http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/我可以从avro控制台向cassandra插入数据。现在我正在尝试将其扩展到使用水槽,我的机器中设置了水槽,它将拾取日志文件并将其推送到kafka,尝试将我的数据插入cassandra数据库。在文本文件中,我正在放置数据

{“id”: 1, “created”: “2016-05-06 13:53:00”, “product”: “OP-DAX-P-20150201-95.7”, “price”: 94.2}

{“id”: 2, “created”: “2016-05-06 13:54:00”, “product”: “OP-DAX-C-20150201-100”, “price”: 99.5}

{“id”: 3, “created”: “2016-05-06 13:55:00”, “product”: “FU-DATAMOUNTAINEER-20150201-100”, “price”: 10000}

{“id”: 4, “创建”: “2016-05-06 13:56:00”, “产品”: “FU-KOSPI-C-20150201-100”, “价格”: 150}

Flume正在收集数据并将其推送给kafka。

在cassandra水槽,我面临着一个错误,

错误任务cassandra-sink-orders-0引发了一个未捕获且不可恢复的异常(org . Apache . Kafka . connect . runtime . worker Task:142)org . Apache . Kafka . connect . errors . data异常:无法将数据反序列化到Avro:at io . confluent . connect . Avro . Avro . converter . to connect data(Avro converter . Java:109)at org . Apache . Kafka . connect . runtime . workersinktask . convert messages(workersinktask .[2016-09-28 15:47:00,951]错误任务正在被终止,在手动重启之前不会恢复(org . Apache . Kafka . connect . runtime . worker Task:143)[2016-09-28 15:47:00,951]信息停止Cassandra sink。(com . data mountaineer . stream reactor . connect . cassandra . sink . cassandras ink task:79)[2016-09-28 15:47:00,952]关闭Cassandra驱动程序会话和集群的信息。(com . data mountaineer . stream reactor . connect . Cassandra . sink . cassandrajsonwriter:165)

我正在使用的模式

 ./confluent/bin/kafka-avro-console-producer \--broker-list localhost:9092 \--topic orders-topic \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'

flume的配置:Flume-kafka.conf.properties

agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink


agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576

agent.sources.spoolDirSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory

agent.channels.memoryChannel.capacity = 1000

 agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
 agent.sinks.kafkaSink.topic = orders-topic
 agent.sinks.kafkaSink.brokerList = localhost:9092
 agent.sinks.kafkaSink.channel = memoryChannel
 agent.sinks.kafkaSink.batchSize = 20

有没有人可以帮我,如何修复这个错误?

共有1个答案

皇甫宇定
2023-03-14

一般来说,如果您有一个未知的魔术字节,这意味着您的Kafka客户端和服务器版本不兼容。检查以确保您的Cassandra接收器版本已使用Kafka客户端库构建,其版本小于或等于您的代理。

 类似资料:
  • 我使用的是spring数据cassandra,需要使用jpa映射一个字段,在cassandra中,该字段的类型为

  • 我目前在cassandra中有一个名为macrecord的表,类似于以下内容: 在这种情况下,我想不出其他解决方案,只有在macadd值重复的情况下删除整行,然后插入具有更新时间戳的新行。 是否有更好的解决方案在macadd值重复时更新时间戳,或者在我的原始表中只有macadd是主键的范围内查询时间戳值的替代方法。

  • 我用Spring Data Cassandra 2.2.1开发了一个新的应用程序,想在Cassandra 2.1.9服务器上运行它(旧的,我知道)。但是我们得到了错误 Spring数据卡桑德拉手册声称Spring数据2.2.1至少需要卡桑德拉2.1,所以这应该有效,但它没有。我们包含的唯一特定于卡桑德拉的依赖项是 我怎样才能让这个工作?

  • 我有这个代码: 我得到以下异常: 所有主机尝试查询失败(已尝试:/127.0.0.1:9042(com.datastax.driver.core.TransportException:[/127.0.0.1:9042]无法连接)),堆栈跟踪:com.datastax.driver.core.exceptions.NoHostAvailableException:所有主机尝试查询失败(已尝试:/12

  • 我有很多事件的行数据。这些事件共享一些公共标识符,但也具有事件所特有的数据。 既然Cassandra不允许在没有索引的字段上使用where子句,那么为每个键创建一行是否有优势,或者将json数据存储在blob中是否更容易? 例子 表有一个按事件类型分区的键(事件类型,时间戳) event_type 可能有< code > ts:12345 page _ uri:" ABC 1234 " user

  • 有人可以帮助我了解如何使用POJO类插入卡桑德拉UDT数据吗? 我创建了一个POJO类来映射Cassandra的表,并为Cassandra UDT创建了另一个类,但是当我插入映射Cassandra表的主POJO类时,它无法识别另一个POJO类(映射Cassandra的UDT)。我还在每个类和每个类对象上编写了注释。 这是我的一个POJO类:- 另一个POJO类:-