Question:

GCP Dataflow runner error when deploying a pipeline that uses the beam-nuggets library: "Failed to read inputs in the data plane."

仉明知
2023-03-14

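I have been testing an Apache Beam pipeline in the Apache Beam notebooks provided by GCP, with a Kafka instance as input and BigQuery as output. The pipeline runs successfully with the Interactive runner, but when I deploy the same pipeline to the Dataflow runner it never seems to actually read from the Kafka topic that was defined. The logs show this error: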

Failed to read inputs in the data plane. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",

The implementation is based on this article.

Any ideas? The code is below:

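from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
from apache_beam.runners.interactive import interactive_runner
from beam_nuggets.io import kafkaio

# kafka_topic, ip_addr, options, table_spec, table_schema and the
# preprocess/model DoFn classes are defined elsewhere in the notebook.
kafka_config = {"topic": kafka_topic, "bootstrap_servers": ip_addr}

# p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)  # <- use for test
p = beam.Pipeline(DataflowRunner(), options=options)  # <- use for dataflow implementation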

notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config) 
preprocess = notifications | "Pre-process for model" >> beam.ParDo(preprocess()) 
model = preprocess | "format & predict" >> beam.ParDo(model())

newWrite = model | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)

Error message from the logs:

Failed to read inputs in the data plane.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
  status = StatusCode.UNAVAILABLE
  details = "DNS resolution failed"
  debug_error_string = "{"created":"@1595595923.509682344","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1595595923.509650517","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1595595923.509649070","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1595595923.509645878","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}"
>

And:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "DNS resolution failed" debug_error_string = "{"created":"@1594205651.745381243","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1594205651.745371624","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1594205651.745370349","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1594205651.745367499","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}" >
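
Both errors wrap the same chain of `referenced_errors`, and the innermost entry is the informative one: `unparseable host:port` with an empty `target_address`. Below is a minimal sketch for pulling that entry out of a `debug_error_string`, assuming the JSON text between the outer quotes has already been extracted from the log line. `innermost_grpc_error` is a hypothetical helper, not part of gRPC or Beam, and the sample is an abbreviated copy of the first log entry.

```python
import json


def innermost_grpc_error(debug_error_string):
    """Follow the first entry of each referenced_errors list to the deepest error."""
    error = json.loads(debug_error_string)
    while error.get("referenced_errors"):
        error = error["referenced_errors"][0]
    return error


# Abbreviated copy of the debug_error_string from the log above
# (the "file", "file_line" and most "created" fields are dropped for brevity).
debug_error_string = json.dumps({
    "created": "@1595595923.509682344",
    "description": "Failed to pick subchannel",
    "referenced_errors": [{
        "description": "Resolver transient failure",
        "referenced_errors": [{
            "description": "DNS resolution failed",
            "grpc_status": 14,
            "referenced_errors": [{
                "description": "unparseable host:port",
                "target_address": "",
            }],
        }],
    }],
})

root = innermost_grpc_error(debug_error_string)
print(root["description"], repr(root["target_address"]))
```

The helper only follows the first referenced error at each level, which is enough for the single-branch chains shown in these logs.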

Pipeline options:

Python sdk harness started with pipeline_options: {'streaming': True, 'project': 'example-project', 'job_name': 'beamapp-root-0727105627-001796', 'staging_location': 'example-staging-location', 'temp_location': 'example-staging-location', 'region': 'europe-west1', 'labels': ['goog-dataflow-notebook=2_23_0_dev'], 'subnetwork': 'example-subnetwork', 'experiments': ['use_fastavro', 'use_multiple_sdk_containers'], 'setup_file': '/root/notebook/workspace/setup.py', 'sdk_location': '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-2.23.0.dev0.tar.gz', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'job_port': '0', 'artifact_port': '0', 'expansion_port': '0'}

2 answers

韦阳辉
2023-03-14

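It seems this is not possible with my implementation plan, although multi-language pipelines look like a more viable route. I opened a ticket with Google support about this and, after some time investigating, got the following reply:

"… at the moment Python doesn't have any KafkaIO that works with DataflowRunner. You can use Java as a workaround. If you need Python for something in particular (TensorFlow or similar), one possibility is to send the messages from Kafka to a PubSub topic (via another pipeline that only reads from Kafka and publishes to PS, or via an external application)."

So feel free to take their advice, or you may be able to hack something together yourself. I simply revised my architecture to use PubSub instead of Kafka.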
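
The Pub/Sub route described above might look roughly like the following sketch. It is not the answerer's actual code: the helper names, the assumption that messages are UTF-8 JSON, and the omission of the question's `preprocess`/`model` steps are all illustrative choices. `table_spec` and `table_schema` stand for the same values used in the question.

```python
import json


def pubsub_topic_path(project, topic):
    """Build the fully qualified topic name that ReadFromPubSub expects."""
    return "projects/{}/topics/{}".format(project, topic)


def decode_message(payload):
    """Decode a Pub/Sub payload (bytes), assumed here to be UTF-8 JSON."""
    return json.loads(payload.decode("utf-8"))


def build_pipeline(pipeline, project, topic, table_spec, table_schema):
    """Attach a Pub/Sub -> BigQuery streaming graph to an existing Pipeline."""
    # Imported here so the helpers above can be tried without Beam installed.
    import apache_beam as beam

    return (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(
            topic=pubsub_topic_path(project, topic))
        | "Decode JSON" >> beam.Map(decode_message)
        | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))
```

With a pipeline created as in the question, this would be called as `build_pipeline(p, "example-project", "notifications", table_spec, table_schema)`, where the topic name `notifications` is a placeholder. A separate process still has to copy the Kafka messages into that topic, as the support reply notes.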

孟思远
2023-03-14

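As far as I know, the error "Failed to read inputs in the data plane ... status = StatusCode.UNAVAILABLE details = "DNS resolution failed"" may be an issue in the Python Beam SDK. Updating to Python Beam SDK 2.23.0 is recommended.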
