我遵循了http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/我可以从avro控制台向cassandra插入数据。现在我正在尝试将其扩展到使用水槽,我的机器中设置了水槽,它将拾取日志文件并将其推送到kafka,尝试将我的数据插入cassandra数据库。在文本文件中,我正在放置数据
{“id”: 1, “created”: “2016-05-06 13:53:00”, “product”: “OP-DAX-P-20150201-95.7”, “price”: 94.2}
{“id”: 2, “created”: “2016-05-06 13:54:00”, “product”: “OP-DAX-C-20150201-100”, “price”: 99.5}
{“id”: 3, “created”: “2016-05-06 13:55:00”, “product”: “FU-DATAMOUNTAINEER-20150201-100”, “price”: 10000}
{“id”: 4, “创建”: “2016-05-06 13:56:00”, “产品”: “FU-KOSPI-C-20150201-100”, “价格”: 150}
Flume正在收集数据并将其推送给kafka。
在cassandra水槽,我面临着一个错误,
错误任务cassandra-sink-orders-0引发了一个未捕获且不可恢复的异常(org . Apache . Kafka . connect . runtime . worker Task:142)org . Apache . Kafka . connect . errors . data异常:无法将数据反序列化到Avro:at io . confluent . connect . Avro . Avro . converter . to connect data(Avro converter . Java:109)at org . Apache . Kafka . connect . runtime . workersinktask . convert messages(workersinktask .[2016-09-28 15:47:00,951]错误任务正在被终止,在手动重启之前不会恢复(org . Apache . Kafka . connect . runtime . worker Task:143)[2016-09-28 15:47:00,951]信息停止Cassandra sink。(com . data mountaineer . stream reactor . connect . cassandra . sink . cassandras ink task:79)[2016-09-28 15:47:00,952]关闭Cassandra驱动程序会话和集群的信息。(com . data mountaineer . stream reactor . connect . Cassandra . sink . cassandrajsonwriter:165)
我正在使用的模式
./confluent/bin/kafka-avro-console-producer \--broker-list localhost:9092 \--topic orders-topic \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'
flume的配置:Flume-kafka.conf.properties
agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink
agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576
agent.sources.spoolDirSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = orders-topic
agent.sinks.kafkaSink.brokerList = localhost:9092
agent.sinks.kafkaSink.channel = memoryChannel
agent.sinks.kafkaSink.batchSize = 20
有没有人可以帮我,如何修复这个错误?
一般来说,如果您有一个未知的魔术字节,这意味着您的Kafka客户端和服务器版本不兼容。检查以确保您的Cassandra接收器版本已使用Kafka客户端库构建,其版本小于或等于您的代理。
我使用的是spring数据cassandra,需要使用jpa映射一个字段,在cassandra中,该字段的类型为
我目前在cassandra中有一个名为macrecord的表,类似于以下内容: 在这种情况下,我想不出其他解决方案,只有在macadd值重复的情况下删除整行,然后插入具有更新时间戳的新行。 是否有更好的解决方案在macadd值重复时更新时间戳,或者在我的原始表中只有macadd是主键的范围内查询时间戳值的替代方法。
我用Spring Data Cassandra 2.2.1开发了一个新的应用程序,想在Cassandra 2.1.9服务器上运行它(旧的,我知道)。但是我们得到了错误 Spring数据卡桑德拉手册声称Spring数据2.2.1至少需要卡桑德拉2.1,所以这应该有效,但它没有。我们包含的唯一特定于卡桑德拉的依赖项是 我怎样才能让这个工作?
我有这个代码: 我得到以下异常: 所有主机尝试查询失败(已尝试:/127.0.0.1:9042(com.datastax.driver.core.TransportException:[/127.0.0.1:9042]无法连接)),堆栈跟踪:com.datastax.driver.core.exceptions.NoHostAvailableException:所有主机尝试查询失败(已尝试:/12
我有很多事件的行数据。这些事件共享一些公共标识符,但也具有事件所特有的数据。 既然Cassandra不允许在没有索引的字段上使用where子句,那么为每个键创建一行是否有优势,或者将json数据存储在blob中是否更容易? 例子 表有一个按事件类型分区的键(事件类型,时间戳) event_type 可能有< code > ts:12345 page _ uri:" ABC 1234 " user
我正在使用卡桑德拉2.1.5和卡桑德拉爪哇驱动程序2.0.10。当我从卡桑德拉表获取数据时,我面临以下异常。 com . datas tax . driver . core . exceptions . nohostavailableexception:在com . datas tax . driver . core . exceptions . nohostavailableexception