当前位置: 首页 > 知识库问答 >
问题:

Apache Beam with Dataflow-从BigQuery读取时为空指针

柯栋
2023-03-14

我正在使用apache束编写的google数据流上运行一项作业,该数据流从BigQuery表和文件中读取。转换数据并将其写入其他BigQuery表。作业“通常”会成功,但有时我在从大查询表读取时随机获得nullpoer异常,而我的作业失败:

(288abb7678892196): java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.split(BigQuerySourceBase.java:98)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.splitAndValidate(WorkerCustomSources.java:261)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplitTyped(WorkerCustomSources.java:209)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplitWithApiLimit(WorkerCustomSources.java:184)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplit(WorkerCustomSources.java:161)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSourceOperationExecutor.execute(WorkerCustomSourceOperationExecutor.java:47)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:341)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:297)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

我不知道这与什么有关。当我清除临时目录并重新加载模板时,作业再次通过。

我阅读BQ的方式很简单:

BigQueryIO.read().fromQuery()

我将非常感谢任何帮助。

任何人

共有3个答案

袁轶
2023-03-14

好的,让我再详细一点。

  • 作业作为模板上传并在谷歌数据流上运行
  • 这项工作通常是成功的——这就是为什么我怀疑实际代码是否有问题。异常来自源代码,看起来像:bqServices。getDatasetService(bqOptions)在BigQuerySourceBase中返回null
  • 是的,我提供实际查询

下面是我的工作。正如您所看到的,此运行已成功。它处理了从BQ导出的200多万行,从csv文件导出的150万行,并将800k写回BigQuery(数字是正确的)。这项工作基本上按预期工作(当它工作时)。左上角(读取事务)是查询BQ的步骤。这一步骤有时会无缘无故地失败。

成功运行-Beam DAG

以下是在BQ源上使用Nullpointer失败时的同一作业。

运行失败-光束DAG

我不确定代码片段在这种情况下会有多大帮助,但这是执行查询的一部分:

PCollection<Transaction> transactions = p.apply("Read Transactions", BigQueryIO.read().fromQuery(createTransactionQuery(options)))
                                        .apply("Map to Transaction", MapElements.via(new TableRowToTransactionFn()));

    PCollection<KV<String, Transaction>> transactionsPerMtn = 
            transactions.apply("Filter Transactions Without MTN", Filter.by(t -> t.transactionMtn != null))
                        .apply("Map Transactions to MTN key", MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Transaction.class)))
                                    .via(t -> KV.of(t.transactionMtn, t)));

下面是获取查询的方法:

private ValueProvider<String> createTransactionQuery(TmsPipelineOptions options) {
    return NestedValueProvider.of(options.getInputTransactionTable(), table -> {
        StringBuilder sb = new StringBuilder();
        sb.append(
                "SELECT transaction_id, transaction_mtn, transaction_folio_number, transaction_payer_folio_number FROM ");
        sb.append(table);
        return sb.toString();
    });
}

我相信大查询源中存在某种错误,导致了这样的问题。我只是无法确定是什么导致了这种情况,因为它是随机发生的。就像我写的,上次我遇到它时,我刚刚清除了gcs上的temp dir并重新上传了我的模板(没有任何代码更改),然后工作又开始工作了。

麻宾白
2023-03-14

我也遇到了这个问题,在深入研究之后,发现该限制在2.2.0版中已被删除。然而,它尚未正式发布。您可以查看JIRA项目中此版本的进度(似乎只剩下一个问题)。

但是如果你现在想使用它,你可以自己编译它,这并不难。只需从他们的github镜像中签出源代码,签出标签v2.2.0-RC4,然后运行mvn清洁安装。然后只需在pom.xml中修改您的项目依赖项以指向version2.2.0即可。

从2.2.0开始,如果您想使用BigQueryIO作为模板,您将需要使用TemplateCompartity()调用

BigQueryIO
    .readTableRows() // read() has been deprecated in 2.2.0
    .withTemplateCompatibility() // You need to add this
    .fromQuery(options.getInputQuery())

我目前正在为我的项目使用2.2.0,到目前为止效果很好。

韩晋
2023-03-14

我最终在google issuetracker中添加了bug。在与google员工进行了长时间的交谈和调查后,发现将模板用于读取BigQuery的数据流批处理作业是没有意义的,因为您只能执行一次。

引用:“对于BigQuery批处理管道,模板只能执行一次,因为BigQuery作业ID是在模板创建时设置的。这个限制将在SDK2的未来版本中删除,但当我不能说。创建模板:https://cloud.google.com/dataflow/docs/templates/creating-templates#pipeline-io-and-runtime-parameters”

如果错误比NullpointerException更清楚,那就更好了。

无论如何,我希望这对将来的人有所帮助。

如果有人对整个对话感兴趣,问题是:https://issuetracker.google.com/issues/63124894

 类似资料:
  • 当第一个单元格为空时,获取空指针异常。我已经搜索了很多博客,但找不到任何解决方案,请任何人帮助我与代码。

  • 我知道为什么会出现错误:这是因为我想读取数据的表(Htest)为null。 我检查了我的excel,有一张正确名称为“htest”的表。 我还检查了工作簿中的工作表数。它返回工作簿的正确页数 我不知道为什么工作簿中的工作表是可用的,但代码返回null??我错过了什么?有人有同样的问题吗?或者你能给我一个提示来使用它吗? 谢谢你。 错误是:

  • 我试图在DataFlow中读取bigquery数据集。它找不到我指定的bigquery数据集/表。 我的datalab vm、gcs bucket和bigquery dataset都位于欧洲西部2。 出于某种原因,它正在“美国”位置搜索数据集。 搜索了文档,但无法找到为什么会发生这种情况的答案。 HttpError:HttpError访问https://www.googleapis.com/big

  • 我使用的是spring boot 1.4, 当使用@SpringBootTest注释进行集成测试时,它会给出一个空指针。 对于主类: 然后在我的控制器中: HelloService 但在处理请求时,它会告诉helloService NullPointException。 我错过了什么?

  • 我正在将数据从BigQuery读入dataproc spark集群。如果在我的例子中BigQuery表中的数据最初是从GCS加载的,那么如果BigQuery connector for dataproc(newAPIHadoopRDD)首先将数据下载到Google云存储桶中,那么直接从GCS读取数据是否更好?这两种方法有什么利弊吗?

  • 问题内容: 我有一个数据库,其中列中没有值(所以是),但是我无法在vb.net中处理它。我尝试使用以下代码: 以及: 与: 但是显然这是行不通的,因为在此方法无法获取值之后,我在语句上出现了异常。我想您会通过阅读代码本身来弄清楚我对程序的要求。 问题答案: 定义了DbDataReader基础对象的IsDBNull方法来处理这种情况。 当然,如果reader.Read()返回false(意味着没有更