当前位置: 首页 > 知识库问答 >
问题:

如何在GCP数据流中读取CombineFn函数的日志消息?

太叔逸春
2023-03-14

我正在创建一个Apache Beam streaming处理管道,以在GCP Dataflow中运行。我有许多扩展DoFn和combinefn的转换。在DoFn中,使用数据流作业详细信息中的日志窗口,日志可以很好地可视化。但是,不显示来自CombineFn转换的日志。

import java.io.Serializable;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AverageSpv extends CombineFn<String, AverageSpv.Accum, String> {
    private static final Logger LOG = LoggerFactory.getLogger(AverageSpv.class);

    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
        @Nullable String id;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, String input) {
        LOG.info("Add input: id {}, input);

        accumulator.id = input;

        return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
        LOG.info("Merging accumulator");

        Accum merged = createAccumulator();
        for (Accum accumulator : accumulators) {
            merged.id = accumulator.id;            
        }

        return merged;
    }

    @Override
    public VehicleSpeedPerSegmentInfo extractOutput(Accum accumulator) {
        LOG.info("Extracting accumulator");

        LOG.info("Extract output: id {}", acummulator.id);

        return acummulator.id;
    }
}

共有1个答案

咸臻
2023-03-14

Apache Beam CombineFn操作在数据流中跨几个步骤执行。(具体地说,在将所有结果洗牌到单个键之前会进行大量的预合并,然后在后续的post-GBK步骤中将所有上游结果合并到最终结果中。)事实上,图中没有一个单独的执行“步骤”对应于原始的合并步骤,这可能是阻止找到日志的原因。

这是一个bug,应该修复。如前所述,一个解决方法是查看管道中的所有日志。

 类似资料:
  • 徒步旅行例子 我们首先创建到应用程序的三个连接(内存池、共识和查询)(在本例中本地运行 kvstore)。 I[10-04|13:54:27.364] Starting multiAppConn module=proxy impl=multiAppConn I[10-04|13:54:27.366] Starting localClient

  • 我正在linux 64bit中构建没有glibc的简单应用程序。但是我不知道如何获取参数。 我谷歌了一下,发现RDI是argc,RSI是argv。但它不起作用。 当_start函数开始使用gdb时,我看到了寄存器,但RDI和RSI都是0x0。我还用最简单的汇编应用程序进行了测试,但结果是一样的。RDI和RSI是0x0。我认为即使我没有向程序传递参数,argc也不应该是0x0。 这是我尝试过的C代码

  • 使用标准的GCP提供的存储/文本文件来发布Sub数据流模板,但是尽管我已经设置了#workernodes eq 1,但是对于下游组件来说,处理的消息吞吐量“太高”。 在 Pub/Sub 中的消息事件上运行的 Cloud 函数会命中 GCP 配额,并且使用 CloudRun,我在开始时收到一堆 500、429 和 503 个错误(由于步进突发率)。 有没有办法控制数据流的处理速率?需要获得更软/更慢

  • 我尝试在Spring Cloud数据流UI上查看任务日志。但是,我得到的消息是:当我使用RESTendpoint:localhost:9393/dashboard/#tasks/executions/33时,由于任务实例没有运行,因此无法检索日志 当我第一次启动任务时,任务id是33,日志显示在UI上。但是当我再次重新启动相同的任务(任务id为34)时,显示的是id 34的日志,而id 33的日志

  • 这是我的一个简单代码片段,它试图从使用者读取Avro泛型记录: 正如您所看到的,我可以记录模式,但不知道如何获取其数据值。 而在此对象中的架构和数据如下所示: {schema:{“type”:“record”,“name”:“user”,“namespace”:“confluent.kafka.example.avrospecific”,“fields”:[{“name”:“name”,“type