当前位置: 首页 > 知识库问答 >
问题:

提交云数据流作业时出错

鲁宏爽
2023-03-14
Exception in thread "main" java.lang.RuntimeException: Failed to create a workflow job: Invalid JSON payload received. Unknown token.
       { 8r W
^
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.run(DataflowPipelineRunner.java:219)
    at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.run(BlockingDataflowPipelineRunner.java:96)
    at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.run(BlockingDataflowPipelineRunner.java:47)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145)
    at snippet.WordCount.main(WordCount.java:165)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "Invalid JSON payload received. Unknown token.\n\u001F \b\u0000\u0000\u0000\u0000\u0000\u0000\u0000  \t{ 8r\u0000 W\n^",
    "reason" : "badRequest"
  } ],
  "message" : "Invalid JSON payload received. Unknown token.\n\u001F \b\u0000\u0000\u0000\u0000\u0000\u0000\u0000  \t{ 8r\u0000 W\n^",
  "status" : "INVALID_ARGUMENT"
}

共有1个答案

扈韬
2023-03-14

为了调试这个问题,我们希望验证所发出的请求是否有效,并找到JSON有效负载的无效部分。为此,我们将:

  1. 增加日志记录的冗长
  2. 重新运行应用程序并捕获日志
  3. 在表示JSON有效负载的日志中找到相关部分
  4. 验证JSON有效负载

增加日志记录的冗长性

import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
public class MyDataflowProgram {
  public static void main(String[] args) {
    ConsoleHandler consoleHandler = new ConsoleHandler();
    consoleHandler.setLevel(Level.ALL);
    Logger googleApiLogger = Logger.getLogger("com.google.api");
    googleApiLogger.setLevel(Level.ALL);
    googleApiLogger.setUseParentHandlers(false);
    googleApiLogger.addHandler(consoleHandler);
    ... Pipeline Construction ...
}

在重新执行数据流作业期间,您需要查找与提交数据流作业相关的日志。这些日志将包含后跟响应的HTTP请求,如下所示:

POST https://dataflow.googleapis.com/v1b3/projects/$GCP_PROJECT_NAME/jobs
Accept-Encoding: gzip
... Additional HTTP headers ...
... JSON request payload for creation ...
{"environment":{"clusterManagerApiService":"compute.googleapis.com","dataset":"bigquery.googleapis.com/cloud_dataflow","sdkPipelineOptions": ...

-------------- RESPONSE --------------
HTTP/1.1 200 OK
... Additional HTTP headers ...
... JSON response payload ...

您对请求有效负载感兴趣,因为您得到的错误表明它是问题的根源。

验证JSON有效负载

 类似资料:
  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:

  • 我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以

  • 每个人都试着用https://console.developers.google.com/project/_/mc/template/hadoop? Spark对我来说安装正确,我可以SSH进入hadoop worker或master,Spark安装在/home/hadoop/Spark install/ 我可以使用spark python shell在云存储中读取文件 lines=sc.text

  • 当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?

  • 我正在尝试使用airflow的DataflowPythonOperator计划数据流作业。这是我的dag操作员: gcp_conn_id已设置,可以正常工作。错误显示数据流失败,返回代码为1。完整日志如下所示。 gcp_dataflow_hook.py似乎有问题,除了这个没有更多的信息。有没有办法解决这个问题,有没有DataflowPython算子的任何例子?)到目前为止,我找不到任何使用案例)

  • 我试图从一个数据流作业中运行两个分离的管道,类似于下面的问题: 一个数据流作业中的并行管道 如果我们使用单个p.run()使用单个数据流作业运行两个分离的管道,如下所示: 我认为它将在一个数据流作业中启动两个独立的管道,但它会创建两个包吗?它会在两个不同的工人上运行吗?