当前位置: 首页 > 知识库问答 >
问题:

使用Apache Beam的数据流sdk写入BigTable时捕获NullPointerException

贺雅健
2023-03-14

我正在使用Apache的Beamsdk version0.2.0-孵化-SNAPSHOT并尝试使用Dataflow运行器将数据拉至bigtable。不幸的是,当我使用BigTableIO. Write作为我的接收器时,我在执行我的数据流管道时得到了NullPointerException。已经检查了我的BigtableOptions并且参数很好,根据我的需要。

基本上,我创建并在我的管道的某个点上完成了编写PCollection的步骤

final BigtableOptions.Builder optionsBuilder =
    new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID"))
        .setInstanceId(System.getProperty("BT_INSTANCE_ID"));

// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>> 
// to write to bigtable

// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>>
modifiedHits.apply("writting to big table", BigtableIO.write()
    .withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME")));

p.run();

在执行管道时,我得到了NullPointerException,它精确地指向了公共void processElement(ProcessContext c)方法中的BigtableIO类:

(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532)

我检查过这个方法在bigtable上写之前正在处理所有元素,但不确定为什么我在执行这个管道的时候会遇到这样的异常。根据下面的代码,此方法使用bigtableWriter属性来处理每个c.element(),但我甚至无法设置断点来调试正好是null的地方。对于如何解决这个问题,有什么建议吗?

@ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    checkForFailures();
    Futures.addCallback(
        bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
    ++recordsWritten;
  }

谢谢。

共有1个答案

于意智
2023-03-14

我查找了作业及其类路径,如果我没有弄错的话,看起来您使用的是beam SDK java{core,io}的0.3.0-Cubating-SNAPSHOT版本,但是google cloud dataflow java的0.2.0-Cubating-SNAPSHOT版本。

我认为问题就在于此-您必须使用相同的版本(更多详细信息:0.3.0版中的BigtableIO使用设置和拆卸方法,但runner 0.2.0尚不支持)。

 类似资料:
  • 我发现这个带有数据流的示例https://github.com/GoogleCloudPlatform/cloud-bigtable-examples/blob/master/java/dataflow-connector-examples/src/main/java/com/google/cloud/bigtable/dataflow/example/HelloWorldWrite.java 然

  • 据Beam网站报道, 通常,对管道代码执行本地单元测试比调试管道的远程执行更快更简单。 出于这个原因,我想对写到Bigtable的Beam/DataFlow应用程序使用测试驱动开发。 但是,在Beam测试文档之后,我遇到了一个僵局--Passert并不有用,因为输出PCollection包含org.apache.hadoop.hbase.client.Put对象,这些对象不重写equals方法。

  • 我正在开发一个物联网应用程序,需要从PubSub主题读取流数据。我想使用Google云数据流SDK读取这些数据。我正在使用Java 1.8 我正在使用谷歌云平台的试用版。当我使用PubSubIO时。Read方法读取流数据时,我在日志文件中发现错误,我的项目没有足够的CPU配额来运行应用程序。 所以我想使用谷歌云数据流SDK读取流数据。 请有人告诉我在哪里可以找到使用Google Cloud Dat

  • 这参考了阿帕奇光束 SDK 版本 2.2.0。 我正在尝试使用<code>AfterPane.elementCountAtLeast(…) 使用 GCP 数据流 2.0 PubSub 到 GCS 作为参考,以下是我尝试过的方法: 其中< code>stringMessages是从Avro编码的发布订阅中读取的PCollection。上游发生了一些解包,将事件转换成字符串,但没有合并/划分/分组,只

  • 我正在开发kafka-stream api。基本上Kafka-stream从源主题获取数据,并在应用一些过滤器后将其写回目标kafka主题。 使用的依存关系: 下面是相同的代码。: { ... 这是我的应用程序架构: 生产者API(源主题中的生产者)= 我想要的是,当流将数据写入目标主题时,我想要捕获事件,无论它是否成功。 有没有办法捕捉到回调?谢谢

  • 我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/