当前位置: 首页 > 知识库问答 >
问题:

Google Cloud数据流和Cloud Functions错误-ModuleNotFoundError

通典
2023-03-14

我正在从GCP中的Cloud Function触发数据流作业。

嵌入云功能的代码

import apache_beam as beam
import argparse

PROJECT = 'projectName'
BUCKET='bucketName'
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT'
DATAFLOW_JOB_NAME = 'jobName'

def discard_incomplete(data):
    """Filters out records that don't have an information."""
    return len(data['abv']) > 0 and len(data['id']) > 0 and len(data['name']) > 0 and len(data['style']) > 0


def convert_types(data):
    """Converts string values to their appropriate type."""
    data['abv'] = float(data['abv']) if 'abv' in data else None
    data['id'] = int(data['id']) if 'id' in data else None
    data['name'] = str(data['name']) if 'name' in data else None
    data['style'] = str(data['style']) if 'style' in data else None
    data['ounces'] = float(data['ounces']) if 'ounces' in data else None
    return data

def del_unwanted_cols(data):
    """Delete the unwanted columns"""
    del data['ibu']
    del data['brewery_id']
    return data

def execute(event, context):
    argv = [
      '--project={0}'.format(PROJECT),
      '--job_name={0}'.format(DATAFLOW_JOB_NAME),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--region=us-central1',
      '--runner=DataflowRunner'
   ]

    p = beam.Pipeline(argv=argv)
    input = 'gs://{0}/beers.csv'.format(BUCKET)

    (p | 'ReadData' >> beam.io.ReadFromText(input, skip_header_lines =1)
       | 'SplitData' >> beam.Map(lambda x: x.split(','))
       | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]}) 
       | 'DeleteIncompleteData' >> beam.Filter(discard_incomplete)
       | 'ChangeDataType' >> beam.Map(convert_types)
       | 'DeleteUnwantedData' >> beam.Map(del_unwanted_cols)
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:sandeep_beer_test.beer_data'.format(PROJECT),
           schema=SCHEMA,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    p.run()
    

当执行Cloud函数时,数据流作业确实会被触发,但作业一直失败。当我检查作业日志时,我看到这条错误消息-ModuleNotFoundError:没有名为“google.cloud.functions”的模块

requirements.txt

apache-梁[gc p]

如果我在安装apache beam[gcp]后直接从Cloud shell运行它,则嵌入云函数中的python代码运行良好。

请分享您对如何克服丢失模块的数据流错误的意见。

谢谢

桑迪普

共有1个答案

孙玺
2023-03-14

这可能是因为你正在通过 --save_main_session

 类似资料:
  • 我试图用Beam sdk Version2.20.0在Python 3.7中构建一个Apache Beam管道,该管道成功地部署在Dataflow上,但似乎没有做任何事情。在工作日志中,我可以看到重复报告的以下错误消息 同步pod xxxxxxxxxxxx()时出错,跳过:启动容器工作日志失败 我已经尝试了我能尝试的一切,但这个错误是相当顽固的,我的管道看起来像这样。 我尝试使用sdk_locat

  • 问题内容: 我正在尝试使用post请求将用户保存到mongodb数据库,如下所示,但我收到错误bcrypt错误:需要数据和哈希参数。这是代码的非常简单的设置,但我不知道它有什么问题。models / users.js 路线/users.js 服务器正在运行,但是在使用邮递员chrome后显示请求错误,并且服务器停止工作,如图中所示。 问题答案: 错误来自方法。就您而言,您具有以下代码段: 我认为您

  • 我正在尝试为我的SpringCloudDataflow流创建一个自定义异常处理程序,以路由一些要重新排队的错误和其他要DLQ的错误。 为此,我使用了全局Spring集成“errorChannel”和基于异常类型的路由。 这是Spring集成错误路由器的代码: 错误路由器由每个流应用程序通过Spring Boot应用程序上的包扫描获取: 当它与本地 Spring Cloud Dataflow 服务器

  • BigQuery->ParDo->BigQuery 该表有~2B行,不到1TB。 运行了8个多小时后,作业失败,出现以下错误: 作业id为:2015-05-18_21_04_28-9907828662358367047 此外,即使作业失败,它仍然在图表上显示为成功。为什么?