当前位置: 首页 > 知识库问答 >
问题:

在Apache Beam中混合流式和非流式源时,转换节点AppliedTransform未按预期错误被DirectRunner替换

孔嘉茂
2023-03-14

当我声明一个有两个源(1 gcs和1 pubsub)的管道时,我得到一个错误,但仅限于Beam DirectRunner。有了谷歌数据流转轮,它工作得很好。我的管道有"流选项=True"

gcsEventsColl = p | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log") \
                  | 'convert to dict' >> beam.Map(lambda x: json.loads(x))
liveEventsColl = p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic") \
                   | 'convert to dict2' >> beam.Map(lambda x: json.loads(x))


input_rec = (gcsEventsColl, liveEventsColl) | 'flatten' >> beam.Flatten()

似乎DirectRunner为ReadFromText进行了一些不兼容的转换,但我不明白。

   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
    self._check_replacement(override)
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
    self.visit(ReplacementValidator())
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
    self._root_transform().visit(visitor, self, visited)
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)   [Previous line repeated 4 more times]
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
    visitor.visit_transform(self)
   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
    transform_node) RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey,
    _GroupByKeyOnly) was not replaced as expected.

我想它与此代码有关,但我不确定:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/direct_runner.py#L375

谢谢你的帮助

共有2个答案

禹德水
2023-03-14

这是由于错误导致的内部故障。此错误消息表示Python DirectRunner在尝试重写转换时损坏了管道图。

赫连开畅
2023-03-14

事实上,相同的代码适用于Java direct runner。也许这是Python runner的一个限制?

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> apply = p.apply("read from gcs", TextIO.read().from("gs://xxx/*.log"));
    PCollection<String> apply1 = p.apply("read from pubsub", PubsubIO.readMessages().fromTopic("projects/xxx/topics/input_topic")).apply("test", MapElements.via(new FormatAsPubSubMessage()));
    PCollectionList<String> pcs = PCollectionList.of(apply).and(apply1);
    PCollection<String> merged = pcs.apply("merge", Flatten.<String>pCollections());
    merged.apply("log elements", MapElements.via(new LogElement()));
    p.run().waitUntilFinish();
  }
 类似资料:
  • 在过去,我写过一些java程序,使用两个线程。第一个线程(生产者)从API(C库)读取数据,创建一个java对象,将该对象发送到另一个线程。C API正在传递一个事件流(无限)。线程使用LinkedBlockingQueue作为管道交换对象(put、poll)。第二个线程(使用者)正在处理对象。(我还发现,线程中的代码更易读。第一个线程处理C API的内容并生成适当的java对象,第二个线程不受C

  • 通过好奇心,有没有办法用Java Stream编写它?

  • Apache Camel:2.12.2,activemq:5.7 我们注意到,在下面的路由中,对于前100次交换,节流工作正常。此后,它不是每秒发送100次交换,而是每秒仅发送1次交换。现在,如果我们将timePeriodMillis设置为100,它似乎可以正常工作。注意,我们同时发送500个交换。

  • 我想做张桌子。我希望这些数字向左对齐。我不熟悉格式化,不知道为什么会出现错误。

  • 我使用https://github . com/confluent Inc/confluent-Kafka-python/blob/master/examples/avro _ producer . py中的示例代码将数据加载到主题中。我只做了一个更改,那就是我添加了“default”:为了模式兼容性,每个字段都为null。它加载得很好,因为我可以在http://localhost:9021/中看

  • 我有一个类似的数据集 我的星火密码是 我试图通过调用createOrReplaceTempView来替换people视图 但我得到如下错误 如何替换spark中的视图?