当前位置: 首页 > 面试题库 >

流之前BigQuery表被截断不起作用

柯天宇
2023-03-14
问题内容

我们正在使用BigQuery Python API进行一些分析。为此,我们创建了以下适配器:

def stream_data(self, table, data, schema, how=None):
    r = self.connector.tables().list(projectId=self._project_id,
                                     datasetId='lbanor').execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r['tables'] if
                    row['tableReference']['tableId'] == table]
    if table_exists:
        if how == 'WRITE_TRUNCATE':
            self.connector.tables().delete(projectId=self._project_id,
                                           datasetId='lbanor',
                                           tableId=table).execute()
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': self._project_id,
                    'datasetId': 'lbanor'
                },
                'schema': schema
            }
            self.connector.tables().insert(projectId=(
                                           self._project_id),
                                           datasetId='lbanor',
                                           body=body).execute()
    else:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': self._project_id,
                'datasetId': 'lbanor'
            },
            'schema': schema
        }
        self.connector.tables().insert(projectId=(
                                       self._project_id),
                                       datasetId='lbanor',
                                       body=body).execute()

    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.connector.tabledata().insertAll(projectId=(
                                         self._project_id),
                                         datasetId='lbanor',
                                         tableId=table,
                                               body=body).execute(num_retries=5)

哪里connector是构建对象。

其主要目的是将数据流式传输到给定的表。如果该表已经存在并且“如何”输入作为“
WRITE_TRUNCATE”传递,则该表首先被删除并再次创建。之后,继续进行数据流。

当不一次又一次删除表时,一切工作正常。

举例来说,这是结果,当我们没有模拟写截断选项(一运行该脚本for循环不断给你打电话stream_datahow=None):

[
  {
    "date": "2016-04-25",
    "unix_date": "1461606664981207",
    "init_cv_date": "2016-03-12",
    "end_cv_date": "2016-03-25",
    "days_trained": "56",
    "days_validated": "14",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.31729249914663893"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606599745107",
    "init_cv_date": "2016-03-06",
    "end_cv_date": "2016-03-25",
    "days_trained": "80",
    "days_validated": "20",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.32677143128667446"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606688950415",
    "init_cv_date": "2016-03-14",
    "end_cv_date": "2016-03-25",
    "days_trained": "48",
    "days_validated": "12",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.3129267723358932"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606707195122",
    "init_cv_date": "2016-03-16",
    "end_cv_date": "2016-03-25",
    "days_trained": "40",
    "days_validated": "10",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.310620987663015"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606622432947",
    "init_cv_date": "2016-03-08",
    "end_cv_date": "2016-03-25",
    "days_trained": "72",
    "days_validated": "18",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.32395802949369296"
  }
]

但是,当我们在输入how =“ WRITE_TRUNCATE”中使用相同的适配器时,其行为发生了变化,并且变得不可预测。

有时它可以正常工作,并且数据会保存到表中。但是有时,即使没有出现错误,也没有数据保存到表中。

尝试查询表时,不返回任何数据。它只返回“查询返回零结果”。

删除表,再次创建表并传输数据时出了点问题。我们在犯一些错误吗?

如果您需要更多信息,请告诉我。提前致谢!


问题答案:

参见Jordan Jordan的答案和SeanChen对的评论(均为BigQuery工程师)。

摘要是:

  • 重新创建或截断表时,“您需要在流式传输前等待2分钟以上,以避免数据丢失。

这样就可以解释为什么您会得到这种不确定的行为。



 类似资料:
  • 我有一个很奇怪的问题。 我有一个SSIS包,它将数据从Oracle源传输到OLE DB目标。转移工作很好。在传输之前,SQL任务应该截断目标。 我不知道为什么它不起作用。 希望你能帮我。

  • 问题内容: 我有两个清单: 我正在尝试将它们合并为一个列表,其中每个框架都与每种座椅颜色唯一匹配。 我想这样使用: 但是它会缩短到最小列表的长度。 我知道我可以使用以下方法遍历列表: 但我希望它变得更聪明,并使用python内置函数的功能。 有什么想法如何使用zip(或类似zip的东西)并避免被截断吗? 问题答案: 使用保持荏苒,直到 最长 序列已经用完。 这使用默认值(默认为)代替较短列表的缺失

  • 问题内容: 我试图在AngularJS中制作一个拦截器。我是AngularJS的新手,并找到了一些Interceptor的示例,但无法使其正常工作。 这里有我的app.js文件,其中包含所有相关代码。我还有一个控制器,该控制器调用REST api并返回JSONP。 首先,我声明模块,然后进行配置(定义拦截器)。现在它应该捕获所有请求并将其输出到控制台… 用app.factory创建拦截器是否错误?

  • 问题内容: 我正在尝试编写一个断言,以检查用户提供的大小是否为正值,如果不是,则使其为正,此语句位于类构造函数内部,该类构造函数采用大小值,然后生成array [size]。我写了下面的代码,我认为是正确的。 尽管我似乎从未评估过我的断言并继续执行程序,但会导致NegativeArraySize错误(我正在尝试避免) 我也尝试过 并且程序无法停止为负值。 我最近在Mac上运行Java时遇到了一些问

  • 问题内容: 我有一个控制器,可提供对信息的RESTful访问: 我遇到的问题是,如果我使用带有特殊字符的路径变量访问服务器,则会被截断。例如: http:// localhost:8080 / blah-server / blah / get / blah2010.08.19-02:25:47 参数blahName将为blah2010.08 但是,对request.getRequestURI()的

  • 命令用于截断表。 如果您截断表,表的所有行将永久删除。 语法: 示例: 我们有一个名为“”的表具有以下数据(创建表并插入数据): 现在,我们使用命令: 现在表已经被截断了,您可以使用SELECT命令验证它。 如下图所示 - 您可以看到表现在被截断。