我正在使用Apache的Beam
sdk version0.2.0-孵化-SNAPSHOT
并尝试使用Dataflow
运行器将数据拉至bigtable。不幸的是,当我使用BigTableIO. Write
作为我的接收器时,我在执行我的数据流管道时得到了NullPointerException
。已经检查了我的BigtableOptions
并且参数很好,根据我的需要。
基本上,我创建并在我的管道的某个点上完成了编写PCollection的步骤
final BigtableOptions.Builder optionsBuilder =
new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID"))
.setInstanceId(System.getProperty("BT_INSTANCE_ID"));
// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>>
// to write to bigtable
// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>>
modifiedHits.apply("writting to big table", BigtableIO.write()
.withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME")));
p.run();
在执行管道时,我得到了NullPointerException,它精确地指向了公共void processElement(ProcessContext c)方法中的BigtableIO类:
(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532)
我检查过这个方法在bigtable上写之前正在处理所有元素,但不确定为什么我在执行这个管道的时候会遇到这样的异常。根据下面的代码,此方法使用bigtableWriter属性来处理每个c.element()
,但我甚至无法设置断点来调试正好是null的地方。对于如何解决这个问题,有什么建议吗?
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
checkForFailures();
Futures.addCallback(
bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
++recordsWritten;
}
谢谢。
我查找了作业及其类路径,如果我没有弄错的话,看起来您使用的是beam SDK java{core,io}的0.3.0-Cubating-SNAPSHOT版本,但是google cloud dataflow java的0.2.0-Cubating-SNAPSHOT版本。
我认为问题就在于此-您必须使用相同的版本(更多详细信息:0.3.0版中的BigtableIO使用设置和拆卸方法,但runner 0.2.0尚不支持)。
我发现这个带有数据流的示例https://github.com/GoogleCloudPlatform/cloud-bigtable-examples/blob/master/java/dataflow-connector-examples/src/main/java/com/google/cloud/bigtable/dataflow/example/HelloWorldWrite.java 然
据Beam网站报道, 通常,对管道代码执行本地单元测试比调试管道的远程执行更快更简单。 出于这个原因,我想对写到Bigtable的Beam/DataFlow应用程序使用测试驱动开发。 但是,在Beam测试文档之后,我遇到了一个僵局--Passert并不有用,因为输出PCollection包含org.apache.hadoop.hbase.client.Put对象,这些对象不重写equals方法。
我正在开发一个物联网应用程序,需要从PubSub主题读取流数据。我想使用Google云数据流SDK读取这些数据。我正在使用Java 1.8 我正在使用谷歌云平台的试用版。当我使用PubSubIO时。Read方法读取流数据时,我在日志文件中发现错误,我的项目没有足够的CPU配额来运行应用程序。 所以我想使用谷歌云数据流SDK读取流数据。 请有人告诉我在哪里可以找到使用Google Cloud Dat
这参考了阿帕奇光束 SDK 版本 2.2.0。 我正在尝试使用<code>AfterPane.elementCountAtLeast(…) 使用 GCP 数据流 2.0 PubSub 到 GCS 作为参考,以下是我尝试过的方法: 其中< code>stringMessages是从Avro编码的发布订阅中读取的PCollection。上游发生了一些解包,将事件转换成字符串,但没有合并/划分/分组,只
我正在开发kafka-stream api。基本上Kafka-stream从源主题获取数据,并在应用一些过滤器后将其写回目标kafka主题。 使用的依存关系: 下面是相同的代码。: { ... 这是我的应用程序架构: 生产者API(源主题中的生产者)= 我想要的是,当流将数据写入目标主题时,我想要捕获事件,无论它是否成功。 有没有办法捕捉到回调?谢谢
我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/