当前位置: 首页 > 知识库问答 >
问题:

Kafka连接接收器到Cassandra::java.lang.VerifyError:错误的返回类型

柴彬
2023-03-14

我正在尝试设置Kafka Connect接收器,以便使用Datastax连接器将主题中的数据收集到Cassandra表中:https://downloads.Datastax.com/#AKC

运行一个直接在代理上运行的独立worker,运行Kafka 0.10.2.2-1:

name=dse-sink
connector.class=com.datastax.kafkaconnector.DseSinkConnector
tasks.max=1
datastax-java-driver.advanced.protocol.version = V4

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

plugin.path=/usr/share/java/kafka-connect-dse/kafka-connect-dse-1.2.1.jar

topics=connect-test
contactPoints=172.16.0.48
loadBalancing.localDc=datacenter1
port=9042

ignoreErrors=true

topic.connect-test.cdrs.test.mapping= kafkakey=key, value=value
topic.connect-test.cdrs.test.consistencyLevel=LOCAL_QUORUM

但我有以下错误:

2019-12-23 16:58:43,165] ERROR Task dse-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.VerifyError: Bad return type
Exception Details:
  Location:
    com/fasterxml/jackson/databind/cfg/MapperBuilder.streamFactory()Lcom/fasterxml/jackson/core/TokenStreamFactory; @7: areturn
  Reason:
    Type 'com/fasterxml/jackson/core/JsonFactory' (current frame, stack[0]) is not assignable to 'com/fasterxml/jackson/core/TokenStreamFactory' (from method signature)
  Current Frame:
    bci: @7
    flags: { }
    locals: { 'com/fasterxml/jackson/databind/cfg/MapperBuilder' }
    stack: { 'com/fasterxml/jackson/core/JsonFactory' }
  Bytecode:
    0x0000000: 2ab4 0002 b600 08b0                    

	at com.fasterxml.jackson.databind.json.JsonMapper.builder(JsonMapper.java:114)
	at com.datastax.dsbulk.commons.codecs.json.JsonCodecUtils.getObjectMapper(JsonCodecUtils.java:36)
	at com.datastax.kafkaconnector.codecs.CodecSettings.init(CodecSettings.java:131)
	at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$buildInstanceState$9(LifeCycleManager.java:423)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1625)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at com.datastax.kafkaconnector.state.LifeCycleManager.buildInstanceState(LifeCycleManager.java:457)
	at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:106)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	at com.datastax.kafkaconnector.state.LifeCycleManager.startTask(LifeCycleManager.java:101)
	at com.datastax.kafkaconnector.DseSinkTask.start(DseSinkTask.java:74)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:244)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

卡桑德拉或Kafka方面没有额外的错误。我在cassandra节点上看到活动连接,但没有任何东西到达密钥空间。

共有1个答案

澹台景山
2023-03-14

Imho这是由于使用带有BigDecimal数据的JSON内部转换器引起的问题(请参见相关的SO问题)。正如下面的博客文章所述,internal.key.converterinternal.value.converter自Kafka2.0以来就不推荐使用,因此不应该显式设置。是否可以注释掉所有internal.属性&重试?

附言。还要了解JSON+Decimal在Kafka2.4中的变化

 类似资料:
  • 我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时

  • 我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载 sink-quick start-MySQL . properties如下 我得到的错误是 Postgres jar文件已经在文件夹中。有人能提出建议吗?

  • 我得到了一个错误:- 线程“main”java.lang.nosuchmethoderror:com.datastax.driver.core.queryoptions.setrefreshnodeintervalmillis(I)lcom/datastax/driver/core/queryoptions;**在com.datastax.spark.connector.cql.defaultCo

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka

  • 下面是/etc/kafka/connect-MongoDB-source.properties中的MongoDB配置 但是低于误差 以独立模式运行连接器。 我在debezium-debezium-连接器-mongob-1.0.0/debezium-connector-mongodb-1.0.0.Final.jar 类路径的设置如下 使用插件路径,我看到它能够注册和加载所有必需的插件。 但最后还是同