当前位置: 首页 > 知识库问答 >
问题:

数据流DoFn将不使用PipelineOptions序列化

锺离森
2023-03-14

我试图将PipelineOptions接口传递给dataflow DoFn,以便DoFn可以配置一些它需要重新实例化的不可序列化的东西,但是当我告诉dataflow保存我的PipelineOptions子类的实例时,它似乎无法序列化DoFn。我需要对Options接口做什么才能使其正确序列化吗?

public interface Options 
extends BigtableOptions, BigtableScanOptions, OfflineModuleOptions, Serializable {...}

DoFn定义

public class RunEventGeneratorsDoFn extends DoFn<...,...> {
    private OfflinePipelineRunner.Options options;
....
}

选项未标记时的序列化异常瞬态

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [my DoFn]
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.<init>(ParDo.java:720)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.of(ParDo.java:678)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:596)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:563)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:558)
    at [dofn instantiation line]
Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
    ... 7 more

共有1个答案

吕德业
2023-03-14

实际的管道选项对象不应作为字段包含在特定的dofnptransform中。相反,传入要访问的特定选项的值。

有关更多上下文“如何在Beam2.0中的复合PTransform中获取PipelineOptions?”,请参见此问题。

 类似资料:
  • SDK: Google Cloud Dataflow SDK forJava2.1.0 Class: DoFnTester method: setOutputTags https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/DoFnTester.html Google Cloud

  • 我想把DStream发送到Kafka,但它仍然不起作用。 以下是一些错误信息: 16/10/31 14:44:15错误StreamingContext:错误启动上下文,将其标记为停止java.io.NotSerializableException:DStream检查点已启用,但DStreams及其功能不可序列化spider.app.job.MeetMonitor序列化堆栈:-对象不可序列化(类:s

  • 我正在创建一个简单的Kafka Streaming应用程序。我的Producer正在为一个主题生成protobuf序列化消息,我在Kafka Streaming应用程序中使用该主题来处理消费者消息。我正在尝试使用在我的应用程序中。yml文件。我发现以下错误。 错误: 我的配置文件: 在日志中打印实际反序列化消息的处理方法: 请帮我解决这个问题。如何在Kafka Stream中使用protobuf反

  • 我正在学习使用可拆分DOFN。我预计我的工作将分配给500名员工,但Dataflow只运行了1或2名员工。我是否错误地理解或实现了可拆分DoFn? 我的beam版本是2.16.0

  • 问题内容: 我的问题如下: 我需要根据汽车租赁实例中的信息填充“汽车”表。 我需要创建一个主键“ car_id”,但仅针对出租表中的不同车牌。 我正在创建带有序列的car_id。 我尝试了以下代码,但收到错误消息: 尽管这将起作用(没有不同的车牌): (第一行带有注释,因此我可以立即看到要输出的值) 所以!有谁知道我怎么能做?A)获取上面的代码以使用DISTINCT或B)找到一种方法来获取序列的M

  • 我有一个反应式核心WebClient要发布到给定的endpoint。有效负载是对象的流量,内容类型是application/stream json flux jobFlux=flux。只是(新工作); 在服务器端,我尝试了Spring控制器样式和Spring Web反应式FunctionHandler,以使用流量负载处理上述调用的负载。 当实例化一个新对象时,域类创建和标识: 仓库目前只是一个存根