Question:

Flink consuming messages from Kafka in Scala cannot flatMap

宋景福
2023-03-14

I am trying to follow this example: https://blog.knoldus.com/a-quick-demo-kafka-to-flink-to-cassandra/ . I want to parse my ShippingOrder JSON messages from Kafka into objects and then group them by some attribute, but I get an error at the flatMap step.

My sbt file:

import Dependencies._

scalaVersion := "2.13.4"
version := "0.1.0-SNAPSHOT"
organization := "com.example"
organizationName := "example"

lazy val root = (project in file("."))
  .settings(
    name := "KafkaTest",
    libraryDependencies += scalaTest % Test,
    libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.12" % "1.12.1" % "provided",
    libraryDependencies += "org.apache.flink" % "flink-connector-kafka_2.12" % "1.12.1",
    libraryDependencies += "org.apache.flink" % "flink-clients_2.12" % "1.12.1",
    libraryDependencies += "org.json4s" %% "json4s-native" % "3.6.10",
  )

My main file:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.json4s.native.JsonMethods
import java.util.Properties

object Kafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    implicit lazy val formats = org.json4s.DefaultFormats
    properties.setProperty("bootstrap.servers", "broker:9092")
    properties.setProperty("group.id", "Flink")
    implicit val typeInfo = TypeInformation.of(classOf[(String)])
    implicit val typeInfo_2 = TypeInformation.of(classOf[(String, Int)])
    implicit val typeInfo_3 = TypeInformation.of(classOf[(org.json4s.JsonAST.JValue)])
    implicit val typeInfo_4 = TypeInformation.of(classOf[(ShippingOrder)])

    val consumer = new FlinkKafkaConsumer[String]("ShippingOrders", new SimpleStringSchema(), properties)
    consumer.setCommitOffsetsOnCheckpoints(true)
    consumer.setStartFromEarliest()
    val stream = env.addSource(consumer)
      .flatMap(JsonMethods.parse(_).toOption)
      .map(_.extract[ShippingOrder])

    stream.print()
    env.execute("Flink Kafka Example")
  }
}

My order object:

import scala.tools.nsc.doc.model.Trait

class ShippingOrder(
                     Old: Data,
                     New: Data,
                   )

class Data(
            ID: String,
            Action: String,
            ClientID: Int,
            Data: Trait,
            ToLocation: Location,
            ToName: String,
            ToPhone: String,
            Log: List[Log],
            IsPartialReturn: Boolean,
            Items: List[Item],
          )

class Log(
           Reason: String,
           ReasonCode: String,
           Status: String,
           //           UpdatedDate: java.sql.Date,
         )

class Item(
            Code: String,
            Name: String,
            Quantity: Int,
          )

class Location(
                //                Coordinates: Trait,
                Type: String,

              )

The error when running this job:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/usr/local/Cellar/apache-flink/1.12.1/libexec/lib/flink-dist_2.12-1.12.1.jar) to field java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID 391088d1b7233806d15cd10da73f8660

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 391088d1b7233806d15cd10da73f8660)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:360)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:248)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1058)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 391088d1b7233806d15cd10da73f8660)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
        at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:746)
        at Kafka$.main(Kafka.scala:34)
        at Kafka.main(Kafka.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 391088d1b7233806d15cd10da73f8660)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
        ... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
        at jdk.internal.reflect.GeneratedMethodAccessor109.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodError: 'scala.collection.immutable.List scala.collection.immutable.List.map(scala.Function1)'
        at org.json4s.ParserUtil$Buffer.substring(ParserUtil.scala:139)
        at org.json4s.ParserUtil$.unquote(ParserUtil.scala:98)
        at org.json4s.native.JsonParser$Parser.parseString$1(JsonParser.scala:243)
        at org.json4s.native.JsonParser$Parser.nextToken(JsonParser.scala:282)
        at org.json4s.native.JsonParser$.$anonfun$astParser$1(JsonParser.scala:188)
        at org.json4s.native.JsonParser$.$anonfun$astParser$1$adapted(JsonParser.scala:145)
        at org.json4s.native.JsonParser$.parse(JsonParser.scala:133)
        at org.json4s.native.JsonParser$.parse(JsonParser.scala:71)
        at org.json4s.native.JsonMethods.parse(JsonMethods.scala:10)
        at org.json4s.native.JsonMethods.parse$(JsonMethods.scala:9)
        at org.json4s.native.JsonMethods$.parse(JsonMethods.scala:63)
        at Kafka$.$anonfun$main$1(Kafka.scala:30)
        at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:681)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)

I don't understand this error. Please explain it and help me fix it.

1 answer

柯景龙
2023-03-14

I'm fairly sure this happens because you are not building a fat jar, so the Scala library is missing on the cluster. You should probably look into how to create a fat jar with sbt.
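
A minimal sketch of what that could look like with the sbt-assembly plugin (the plugin version, the choice of which Flink artifacts to mark as provided, and the merge strategy are illustrative assumptions, not taken from the original post):

// project/plugins.sbt -- add the sbt-assembly plugin (version is illustrative)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")

// build.sbt -- keep the Flink runtime artifacts "provided" (the cluster already ships them),
// but bundle the Kafka connector and json4s into the fat jar.
lazy val root = (project in file("."))
  .settings(
    name := "KafkaTest",
    libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.12" % "1.12.1" % "provided",
    libraryDependencies += "org.apache.flink" % "flink-clients_2.12"         % "1.12.1" % "provided",
    libraryDependencies += "org.apache.flink" % "flink-connector-kafka_2.12" % "1.12.1",
    libraryDependencies += "org.json4s"      %% "json4s-native"              % "3.6.10",
    // Resolve duplicate files when merging dependency jars into one artifact.
    assembly / assemblyMergeStrategy := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case _                             => MergeStrategy.first
    }
  )

Running sbt assembly then produces a single jar under target/scala-*/ that you can submit with flink run; by default sbt-assembly also bundles the Scala library into that jar, which is what the answer suggests is missing on the cluster.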
