Question:

Failed to deserialize Avro record - Apache Flink SQL CLI

嵇丰
2023-03-14

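I am publishing Avro-serialized data to a Kafka topic and then trying to create a Flink table from that topic through the SQL CLI. I am able to create the table, but after running a SQL SELECT statement I cannot see the topic data. However, I can deserialize and print the published data with a simple Kafka consumer. The SQL CLI reports this error: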

Flink SQL> SELECT * FROM test_flink2;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ArrayIndexOutOfBoundsException: Index -3 out of bounds for length 2

Table creation

Flink SQL> CREATE TABLE test_flink2 (
> `name` STRING,
> `address` STRING)
> WITH (
> 'connector' = 'kafka',
> 'topic' = 'test_flink2',
> 'scan.startup.mode' = 'earliest-offset',
> 'properties.bootstrap.servers' = 'krypton04.psc:9092',
> 'format' = 'avro');
[INFO] Table has been created.

Table definition

Flink SQL> DESC test_flink2;
+---------+--------+------+-----+--------+-----------+
|    name |   type | null | key | extras | watermark |
+---------+--------+------+-----+--------+-----------+
|    name | STRING | true |     |        |           |
| address | STRING | true |     |        |           |
+---------+--------+------+-----+--------+-----------+
2 rows in set

Avro schema

{
  "name": "MyClass",
  "type": "record",
  "namespace": "myns",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "address",
      "type": "string"
    }
  ]
}

Message value (the message key is None)

    value = {'name' : 'vikram',
            'address' : 'hyd'}

I keep sending the same message value to the topic test_flink2 with a simple Kafka producer.
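
For reference, a minimal sketch of how this value is laid out in plain Avro binary under the schema above. Instead of an Avro library it hand-codes the two rules that matter here: a string is written as its byte length (a zigzag varint) followed by the UTF-8 bytes, and a record is just its fields in schema order, with no field names and no header. The helper names encode_long, encode_string and encode_my_class are hypothetical, written only for illustration.

```python
def encode_long(n: int) -> bytes:
    """Encode an integer as an Avro zigzag varint (used for int/long values and lengths).

    Assumes n fits in a signed 64-bit long.
    """
    z = (n << 1) ^ (n >> 63)  # zigzag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while True:
        low7 = z & 0x7F
        z >>= 7
        if z:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: zigzag varint byte length, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_my_class(record: dict) -> bytes:
    """Hypothetical encoder for myns.MyClass: fields written back to back in schema order."""
    return encode_string(record["name"]) + encode_string(record["address"])


value = {'name': 'vikram', 'address': 'hyd'}
print(encode_my_class(value).hex(" "))  # 0c 76 69 6b 72 61 6d 06 68 79 64
```

Nothing in these bytes identifies the schema, so a reader has to be given the schema separately.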

Kafka topic description

Topic: reddyvel_test_flink2 PartitionCount: 1   ReplicationFactor: 3    Configs: 
    Topic: reddyvel_test_flink2 Partition: 0    Leader: 1   Replicas: 1,4,5 Isr: 1,4,5

Full error log

From flink-*-sql-client-*.log:

2021-02-05 09:10:01,351 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
        at org.apache.flink.table.client.gateway.local.result.CollectStreamResult.lambda$startRetrieval$0(CollectStreamResult.java:96) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:840) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
        at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?]
        at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 583e8c2eb20eb8d8bdedba04673bb297)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 583e8c2eb20eb8d8bdedba04673bb297)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at jdk.internal.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[?:?]
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[?:?]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[?:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -3 out of bounds for length 2
        at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
        at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[?:?]
        at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:137) ~[?:?]
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101) ~[?:?]
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[?:?]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[?:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]

I can't figure out what is causing the problem.

Thanks.

1 answer

督灿
2023-03-14

You are sending the messages with the Confluent Kafka Python API.

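So on the Flink side you have to use Flink's Confluent Avro deserializer. In Flink SQL (1.12, the version in your log) that is 'format' = 'avro-confluent' together with 'avro-confluent.schema-registry.url' pointing at your Schema Registry, instead of 'format' = 'avro'.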

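Your error occurs because you are trying to read the data with the plain Avro format. Messages written by the Confluent serializer are not plain Avro: each value starts with a 5-byte header (a magic byte followed by a 4-byte Schema Registry ID) in front of the Avro payload. The plain 'avro' format does not expect that header, so the decoder gets out of step with the data and reads an invalid union branch index (ResolvingDecoder.readIndex in the trace), which surfaces as the ArrayIndexOutOfBoundsException.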
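
A minimal sketch of that difference, assuming the producer used Confluent's Avro serializer as described above. The Confluent wire format puts a magic byte 0x00 and a 4-byte big-endian schema ID in front of the plain Avro payload; split_confluent_frame and read_zigzag_long are hypothetical helper names written for illustration, and the schema ID 1 in the example is made up. Avro stores union branch indexes as zigzag varints, so a misaligned byte such as 0x05 decodes to -3, the kind of negative index seen in the error. Which bytes actually produced -3 in this trace cannot be told from the log alone.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed Kafka value into (schema_id, plain Avro payload)."""
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("value is not in the Confluent wire format")
    (schema_id,) = struct.unpack(">I", message[1:5])  # 4-byte big-endian ID
    return schema_id, message[5:]


def read_zigzag_long(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode one Avro zigzag varint starting at buf[pos]; return (value, next position)."""
    raw = 0
    shift = 0
    while True:
        b = buf[pos]
        pos += 1
        raw |= (b & 0x7F) << shift
        if not b & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


# Hypothetical framed value: magic byte, schema ID 1, then the plain Avro record.
framed = b"\x00\x00\x00\x00\x01" + b"\x0cvikram\x06hyd"
schema_id, payload = split_confluent_frame(framed)
print(schema_id)  # 1

# Once the header is stripped, the payload decodes as plain Avro again.
length, pos = read_zigzag_long(payload)
print(payload[pos:pos + length].decode("utf-8"))  # vikram

# A stray byte read where a union branch index is expected can come out negative.
print(read_zigzag_long(b"\x05"))  # (-3, 1)
```

In the Flink job, the 'avro-confluent' format does this header handling for you and uses the schema ID to fetch the writer schema from Schema Registry; the sketch only shows why the plain 'avro' format trips over the header.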

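 Similar questions:
  • I'm new to Avro and Kafka, and I've spent several days trying to send serialized data to a Kafka topic... without success. Let me explain what I'm trying to achieve: on the producer side, I receive data via SOAP and send the content to a Kafka topic. I'm using CXF to generate POJOs from the WSDL, and I've written the corresponding schema. What I'm trying to do is serialize the objects unmarshalled by CXF and send them to my Kafka topic. In most examples found on the web, Avro records are created using a known schema

  • I'm trying to deserialize Avro messages from Kafka into POJOs generated from an Avro schema. I'm using KafkaAvroDeserializer for this conversion. I am able to

  • I set up a Confluent Platform on AWS. My source is MySQL, and I connected it to Kafka Connect with the Debezium connector. The source data format is JSON. In KSQL I then created a derived topic, converting the JSON topic to AVRO so the data can be sunk to MySQL with the JDBC connector. I used the following queries: Derived topic: I tried sinking the JSON messages directly to MySQL, but that failed because the connector needs a schema, so JSON with schema or Avr

  • I'm currently using Avro 1.8.0 to serialize/deserialize objects, but I'm running into problems, specifically with java.util.Map objects. Other object types work fine. Sample code here - In the deserialize method I try to get the schema based on the input data, but Avro throws an error - Thanks a lot.

  • After adding to the DTO object, I want to send a list to the server. From … When sending the object to the controller, it throws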