当前位置: 首页 > 知识库问答 >
问题:

使用Apache Beam编写通用记录时的Avro“not open”异常

羊舌和安
2023-03-14

我使用avroio. WriteCustomTypeTogenericRecords() 在流数据流作业中向GCS写入一般记录。在最初的几分钟里,一切似乎都很好,然而,大约10分钟后,作业开始抛出以下错误:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.AvroRuntimeException: not open
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:183)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
        org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
        org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
        org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
        org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
        org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:133)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.AvroRuntimeException: not open
        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
        org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:72)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:181)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
        org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
        org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
        org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
        org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
        org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:133)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: not open
        org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
        org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
        org.apache.beam.sdk.io.AvroSink$AvroWriter.write(AvroSink.java:123)
        org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
        org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
        org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)

不过,数据流作业继续正常运行。为了给出关于流作业的一些背景知识:它从pub/sub中提取消息,创建一个5分钟的固定窗口,触发10,000条消息(以先到者为准),处理这些消息,最后写入GCP bucket,根据消息的类型使用.to(new AvroEventDynamicDestinations(avroBaseDir,schemaView))将每个特定类型的消息发送到特定的文件夹。

更新1:查看这个错误的时间戳,它似乎是以10秒的间隔出现的,所以每分钟6秒。

共有1个答案

黎腾
2023-03-14

我也有同样的例外。我的问题来自错误的模式,确切地说是空模式(模式注册表找不到)

 类似资料:
  • 我试图从< code > generic record (< code > Avro 1 . 8 . 2 )中检索字段的值 上述代码段的输出是 从一般记录中获取模式{"type":"record "," name":"customer_address "," namespace ":" customer _ address . avro "," fields":[{"name":"datetime

  • 更新:spark avro软件包已更新以支持此场景。https://github.com/databricks/spark-avro/releases/tag/v3.1.0 我有一个AVRO文件,它是由我无法控制的第三方创建的,我需要使用spark进行处理。AVRO模式是一个记录,其中一个字段是混合联合类型: 这是不支持的火花avro阅读器: 除了上面列出的类型之外,它还支持读取三种类型的联合类型

  • 我有以下:Source-Kafka topic(trans)Channel-memory Sink-Hdfs(avro _ event) kafka主题trans中的数据是使用c#生产者编写的,并且有数千条avro记录。当我运行我的水槽消费者时,它开始将数据下沉到hdfs。问题是数据的格式是:模式数据模式数据 而不是: 模式数据数据 我猜这是因为flume需要一个带有{header} {body}

  • 我有一个奇怪的错误:我有一个使用Hibernate的注释类。一切都很好,直到我开始这样介绍日志: 产生的错误是: [错误]原因:org。冬眠MappingException:无法确定组织的类型。slf4j。Logger,位于table:container,用于列:[org.hibernate.mapping.Column(Logger)] 根据我的理解,在这种情况下,静态场应该是合适的。那么为什么

  • 我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/

  • 问题内容: 我目前正在Visual Studio中编写一些软件,以使用SQL分析来自Access数据库的大量数据。我有编写一个新的计算变量的代码,但在将数据写回到Access所花费的时间上却很挣扎。 我目前正在使用一些vb com代码与以2002/3可比模式运行的Access数据库进行通信。以下是我当前的代码,该代码在循环中运行一个函数以写入数据库。 这是功能: 我目前正在尝试为每个参数插入〜45

  • 我正在写一个脚本,连接到N主机通过SSH...查询第三方系统和提取数据,然后显示所有收集的数据在一定的格式。 我希望将脚本执行的所有操作以及在控制台和日志文件中遇到的任何异常记录下来,这样用户就可以看到脚本运行时发生了什么(如果有人使用了Ansible,那么就像我们在运行playbooks时在控制台和日志中得到的输出一样) 预期产出 null 请给出建议,如果可能的话,用一个使用该技术的示例脚本。

  • 问题内容: 我想检查特定背景文件中的错误,但是标准错误流由前台程序控制,并且问题中文件中的错误未显示。不过,我可以使用该模块并将输出写入文件。我想知道如何使用它来记录所有异常,错误及其回溯。 问题答案: 记录程序中引发的 任何 异常可能是一个坏主意,因为Python还将异常用于正常控制流。 因此,您应该只记录 未捕获的 异常。一旦有了异常对象,就可以使用记录器的方法轻松地执行此操作。 要处理所有未