Exception in thread "main" java.lang.RuntimeException: Failed to create a workflow job: Invalid JSON payload received. Unknown token. { 8r W ^ at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.run(DataflowPipelineRunner.java:219) at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.run(BlockingDataflowPipelineRunner.java:96) at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.run(BlockingDataflowPipelineRunner.java:47) at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145) at snippet.WordCount.main(WordCount.java:165) Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request { "code" : 400, "errors" : [ { "domain" : "global", "message" : "Invalid JSON payload received. Unknown token.\n\u001F \b\u0000\u0000\u0000\u0000\u0000\u0000\u0000 \t{ 8r\u0000 W\n^", "reason" : "badRequest" } ], "message" : "Invalid JSON payload received. Unknown token.\n\u001F \b\u0000\u0000\u0000\u0000\u0000\u0000\u0000 \t{ 8r\u0000 W\n^", "status" : "INVALID_ARGUMENT" }
为了调试这个问题,我们希望验证所发出的请求是否有效,并找到JSON有效负载的无效部分。为此,我们将:
增加日志记录的冗长性
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
public class MyDataflowProgram {
public static void main(String[] args) {
ConsoleHandler consoleHandler = new ConsoleHandler();
consoleHandler.setLevel(Level.ALL);
Logger googleApiLogger = Logger.getLogger("com.google.api");
googleApiLogger.setLevel(Level.ALL);
googleApiLogger.setUseParentHandlers(false);
googleApiLogger.addHandler(consoleHandler);
... Pipeline Construction ...
}
在重新执行数据流作业期间,您需要查找与提交数据流作业相关的日志。这些日志将包含后跟响应的HTTP请求,如下所示:
POST https://dataflow.googleapis.com/v1b3/projects/$GCP_PROJECT_NAME/jobs
Accept-Encoding: gzip
... Additional HTTP headers ...
... JSON request payload for creation ...
{"environment":{"clusterManagerApiService":"compute.googleapis.com","dataset":"bigquery.googleapis.com/cloud_dataflow","sdkPipelineOptions": ...
-------------- RESPONSE --------------
HTTP/1.1 200 OK
... Additional HTTP headers ...
... JSON response payload ...
您对请求有效负载感兴趣,因为您得到的错误表明它是问题的根源。
验证JSON有效负载
使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:
我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以
每个人都试着用https://console.developers.google.com/project/_/mc/template/hadoop? Spark对我来说安装正确,我可以SSH进入hadoop worker或master,Spark安装在/home/hadoop/Spark install/ 我可以使用spark python shell在云存储中读取文件 lines=sc.text
当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?
我正在尝试使用airflow的DataflowPythonOperator计划数据流作业。这是我的dag操作员: gcp_conn_id已设置,可以正常工作。错误显示数据流失败,返回代码为1。完整日志如下所示。 gcp_dataflow_hook.py似乎有问题,除了这个没有更多的信息。有没有办法解决这个问题,有没有DataflowPython算子的任何例子?)到目前为止,我找不到任何使用案例)
我试图从一个数据流作业中运行两个分离的管道,类似于下面的问题: 一个数据流作业中的并行管道 如果我们使用单个p.run()使用单个数据流作业运行两个分离的管道,如下所示: 我认为它将在一个数据流作业中启动两个独立的管道,但它会创建两个包吗?它会在两个不同的工人上运行吗?