当前位置: 首页 > 知识库问答 >
问题:

在Beam Spark runner中注册Kryo序列化的自定义类

伊裕
2023-03-14

我已经看到Beam Spark runner使用BeamSparkRunnerRegistrator进行kryo注册。有没有办法注册自定义用户类?

共有3个答案

钱瑞
2023-03-14

如果您不想注册您的类,Kryo序列化仍然可以工作,但它必须与每个对象一起存储完整的类名,这是浪费。

江光明
2023-03-14

使用此自定义序列化程序创建您自己的Kryoynstrator

package Mypackage
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[A], new CustomASerializer())
}}

然后,使用注册者的完全限定名称添加关于它的配置条目,例如Mypackage。我的注册人:

val conf = new SparkConf()
conf.set("spark.kryo.registrator", "Mypackage.KryoRegistrator")

请参阅留档:数据序列化火花

万俟穆冉
2023-03-14

有一个方法可以做到这一点,但是首先,我可以问一下你为什么要这么做吗?

一般来说,Beam的Spark运行器使用Beam编码器来序列化用户数据。

我们目前有一个错误,其中缓存DStream正在使用Kryo进行序列化,如果用户类不可Kryo序列化,则失败。BEAM-2669。我们目前正在尝试解决此问题。

如果这是您面临的问题,您目前可以使用Kryo的注册器来解决此问题。这是您面临的问题吗?或者您有其他原因这样做,请告诉我。

在任何情况下,以下是您如何使用SparkContextOptions将自己的自定义JavaSparkContext实例提供给Beam的Spark运行程序

SparkConf conf = new SparkConf();
conf.set("spark.serializer", KryoSerializer.class.getName());
conf.set("spark.kryo.registrator", "my.custom.KryoRegistrator");

JavaSparkContext jsc = new JavaSparkContext(..., conf);

SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);

Pipeline p = Pipeline.create(options);

有关更多信息,请参阅:

Beam Spark runner文档

示例:ProvidedSparkContextTest。Java语言

 类似资料:
  • 我有一个类,它通过实现中的和方法来实现自定义Kryo序列化程序(请参见下面的示例)。如何用Spark注册此自定义序列化程序? 现在在Spark: 不幸的是,Spark没有给我注册自定义序列化程序的选项。你知道有没有办法做到这一点?

  • 我不明白注册一个类用于kryo序列化是什么意思。为了给出一些背景,这里的链接说 Kryo不支持所有可序列化类型,并且要求您预先注册将在程序中使用的类以获得最佳性能。 还是那句话,我不明白注册一个班级的意义是什么?序列化代码不是静态的吗?就是应用相同的逻辑序列化所有类型的对象。在“高级”中提到将要序列化的类会有什么帮助呢? 谢谢!

  • 我正在尝试将kryo序列化用于: 我一直收到以下错误,说某个类没有注册: 但是,我确实注册了它: 当我设置日志时。TRACE()我得到以下输出: 当跟踪记录器打印出它已注册时,为什么会说它未注册。我找不到关于这件事的任何有用的文件。以前有人经历过这种情况吗?如果有帮助,我正在运行Apache Spark v0.8.1

  • 我正在尝试配置序列化程序实例,以便在storm拓扑中使用。 非常感谢

  • 我试图创建会影响序列化值的自定义jackson注释。 意思是: 现在序列化对象X(10)将导致: 我怎样才能做到这一点?

  • 我想,问题出在深度嵌套的内部类上。我在登记上做错什么了吗?