我试图遵循示例:https://blog.knoldus.com/a-quick-demo-kafka-to-flink-to-cassandra/我试图从kafka解析我的Shippingorder JSON消息并将其解析为对象。然后按一些属性对其进行分组,但在平面图步骤时出现错误。
我的sbt文件:
import Dependencies._
scalaVersion := "2.13.4"
version := "0.1.0-SNAPSHOT"
organization := "com.example"
organizationName := "example"
lazy val root = (project in file("."))
.settings(
name := "KafkaTest",
libraryDependencies += scalaTest % Test,
libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.12" % "1.12.1" % "provided",
libraryDependencies += "org.apache.flink" % "flink-connector-kafka_2.12" % "1.12.1",
libraryDependencies += "org.apache.flink" % "flink-clients_2.12" % "1.12.1",
libraryDependencies += "org.json4s" %% "json4s-native" % "3.6.10",
)
我的主文件。
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.json4s.native.JsonMethods
import java.util.Properties
object Kafka {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
implicit lazy val formats = org.json4s.DefaultFormats
properties.setProperty("bootstrap.servers", "broker:9092")
properties.setProperty("group.id", "Flink")
implicit val typeInfo = TypeInformation.of(classOf[(String)])
implicit val typeInfo_2 = TypeInformation.of(classOf[(String, Int)])
implicit val typeInfo_3 = TypeInformation.of(classOf[(org.json4s.JsonAST.JValue)])
implicit val typeInfo_4 = TypeInformation.of(classOf[(ShippingOrder)])
val consumer = new FlinkKafkaConsumer[String]("ShippingOrders", new SimpleStringSchema(), properties)
consumer.setCommitOffsetsOnCheckpoints(true)
consumer.setStartFromEarliest()
val stream = env.addSource(consumer)
.flatMap(JsonMethods.parse(_).toOption)
.map(_.extract[ShippingOrder])
stream.print()
env.execute("Flink Kafka Example")
}
}
我的订单对象
import scala.tools.nsc.doc.model.Trait
class ShippingOrder(
Old: Data,
New: Data,
)
class Data(
ID: String,
Action: String,
ClientID: Int,
Data: Trait,
ToLocation: Location,
ToName: String,
ToPhone: String,
Log: List[Log],
IsPartialReturn: Boolean,
Items: List[Item],
)
class Log(
Reason: String,
ReasonCode: String,
Status: String,
// UpdatedDate: java.sql.Date,
)
class Item(
Code: String,
Name: String,
Quantity: Int,
)
class Location(
// Coordinates: Trait,
Type: String,
)
运行此作业时出错
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/usr/local/Cellar/apache-flink/1.12.1/libexec/lib/flink-dist_2.12-1.12.1.jar) to field java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID 391088d1b7233806d15cd10da73f8660
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 391088d1b7233806d15cd10da73f8660)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:360)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:248)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1058)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 391088d1b7233806d15cd10da73f8660)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:746)
at Kafka$.main(Kafka.scala:34)
at Kafka.main(Kafka.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 391088d1b7233806d15cd10da73f8660)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at jdk.internal.reflect.GeneratedMethodAccessor109.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodError: 'scala.collection.immutable.List scala.collection.immutable.List.map(scala.Function1)'
at org.json4s.ParserUtil$Buffer.substring(ParserUtil.scala:139)
at org.json4s.ParserUtil$.unquote(ParserUtil.scala:98)
at org.json4s.native.JsonParser$Parser.parseString$1(JsonParser.scala:243)
at org.json4s.native.JsonParser$Parser.nextToken(JsonParser.scala:282)
at org.json4s.native.JsonParser$.$anonfun$astParser$1(JsonParser.scala:188)
at org.json4s.native.JsonParser$.$anonfun$astParser$1$adapted(JsonParser.scala:145)
at org.json4s.native.JsonParser$.parse(JsonParser.scala:133)
at org.json4s.native.JsonParser$.parse(JsonParser.scala:71)
at org.json4s.native.JsonMethods.parse(JsonMethods.scala:10)
at org.json4s.native.JsonMethods.parse$(JsonMethods.scala:9)
at org.json4s.native.JsonMethods$.parse(JsonMethods.scala:63)
at Kafka$.$anonfun$main$1(Kafka.scala:30)
at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:681)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
我不知道这个错误。请解释并帮助我解决这个问题。
我很确定这是因为您没有使用fat jar,因此集群中缺少scala。您可能应该看看这里,了解如何在SBT中创建胖jar。
我正在使用Apache Flink中的Kafka连接器来访问合流Kafka提供的流。 除了模式注册表url
D: \软件\Kafka\Kafka2.10-0.10.0.1\bin\windows 我使用上面的命令来消费消息,有什么我错过的吗?帮助我: 这个 那些是生产者和消费者......
我已将flinkkafkaconsumer作为源添加到我的streamexecutionenvironment中。我想在特定时间内没有收到新消息时关闭/阻止flink使用数据(类似于kafka polltime)。目前它正在无限期运行,并阻止执行移动到下一步(验证消息)。请建议是否有任何解决方法。 注意:我从反序列化中尝试了endofstream,但它无法工作,因为流实际上是不确定的。 提前谢谢。
我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。 有没有简单的方法来进行设置? 因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。
我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1
我必须记录消费者在SpringKafka中花费的时间。由于kafkaListener方法对每条消息都执行,因此在那里放置一个记录器是行不通的。此外,有时一些信息会丢失,而不是被消费者消费掉。我应该把记录器放在哪里,以找出消费者启动后的弹性时间。使用者不会退出或关闭,其轮询将无限期进行