当前位置: 首页 > 知识库问答 >
问题:

状态和计时器的问题-apache beam

岳奇逸
2023-03-14

我试图重新创建本博客文章中描述的apache beam管道的一个简单示例,该示例使用了状态和计时器。

以下是从博客中复制粘贴的Enrich DoFn:

public class Enrich extends DoFn<KV<String, String>, String> {
  private static final long serialVersionUID = 1L;

  private static final int MAX_BUFFER_SIZE = 2;

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferedEvents =
    StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState =
    StateSpecs.value();

  @TimerId("expiry")
  private final TimerSpec expirySpec = 
    TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
    ProcessContext context,
    BoundedWindow window,
    @StateId("buffer") BagState<String> bufferState,
    @StateId("count") ValueState<Integer> countState,
    @TimerId("expiry") Timer expiryTimer) {

      Duration allowedLateness = Duration.standardSeconds(10);

      expiryTimer.set(window.maxTimestamp().plus(allowedLateness));

      int count = firstNonNull(countState.read(), 0);
      count = count + 1;
      countState.write(count);
      bufferState.add(context.element().getValue());

      if (count >= MAX_BUFFER_SIZE) {
        for (String event : bufferState.read()) {
          context.output(enrichEvent(event));
        }
        bufferState.clear();
        countState.clear();
      }
    }

    @OnTimer("expiry")
    public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<String> bufferState) {

        if (!bufferState.isEmpty().read()) {
          for (String event : bufferState.read()) {
            context.output(enrichEvent(event));
          }
          bufferState.clear();
        }
    }

    public static String enrichEvent(String event) {
      return event + ": enriched";
    }

    public static int firstNonNull(Integer x, Integer y) {
      if (x == null) {
        return y;
      }
      return x;
    }
}

下面是我用来测试enrichdofn:

@RunWith(JUnit4.class)
public class EnrichTest {
  final Logger LOG = LoggerFactory.getLogger(EnrichTest.class);

  @Rule
  public TestPipeline p = TestPipeline.create();

  static final String record1 = "1";
  static final String record2 = "2";
  static final String record3 = "3";

  static final String key = "a key";

  static final String result1 = "1: enriched";
  static final String result2 = "2: enriched";
  static final String result3 = "3: enriched";

  @Test
  public void testSimple() throws Exception {
    Duration ALLOWED_LATENESS = Duration.standardSeconds(10);
    Duration WINDOW_DURATION = Duration.standardSeconds(10);
    Instant baseTime = new Instant(0L);
    KvCoder<String, String> coder = 
      KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class));

    TestStream<KV<String, String>> items = 
        TestStream
          .create(coder)
          .advanceWatermarkTo(baseTime)
          .addElements(
              TimestampedValue.of(
                KV.of(key, record1),
                baseTime.plus(Duration.standardSeconds(1))))
          .addElements(
              TimestampedValue.of(
                KV.of(key, record2),
                baseTime.plus(Duration.standardSeconds(0))))
          .advanceWatermarkTo(
              baseTime.plus(Duration.standardSeconds(11)))
          .addElements(
              TimestampedValue.of(
                KV.of(key, record3),
                baseTime.plus(Duration.standardSeconds(2))))
          .advanceWatermarkToInfinity();

    PCollection<String> results = 
        p.apply(items)
         .apply(new CreateWindows (WINDOW_DURATION, ALLOWED_LATENESS))
         .apply(ParDo.of(new Enrich()));

    PAssert
      .that(results)
      .inWindow(new IntervalWindow(baseTime, WINDOW_DURATION))
      .containsInAnyOrder(result1, result2, result3);

    p.run().waitUntilFinish();
  }
}

下面是我的窗口函数:

public class CreateWindows extends 
  PTransform<PCollection<KV<String, String>>,
             PCollection<KV<String, String>>> {

  private static final long serialVersionUID = 1L;
  private final Duration windowDuration;
  private final Duration allowedLateness;

  public CreateStringWindows(Duration windowDuration, Duration allowedLateness) {
    this.windowDuration = windowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, String>> expand(
    PCollection<KV<String, String>> items) {

    return items.apply("Aggregate fixed window",
      Window.<KV<String, String>>into(FixedWindows.of(windowDuration))
            .triggering(AfterWatermark.pastEndOfWindow())
            .discardingFiredPanes()
            .withAllowedLateness(allowedLateness));
  }
}
java.lang.IllegalStateException: TimestampCombiner moved element from 1970-01-01T00:00:19.999Z to earlier time 1970-01-01T00:00:09.999Z for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:10.000Z)

博客文章没有具体提到它使用的窗口化策略。这会是问题所在吗?我还尝试使用never.ever()作为窗口触发器,但得到相同的错误:

.triggering(Never.ever())
.discardingFiredPanes()
.withAllowedLateness(allowedLateness));

很抱歉这篇文章太长了,任何帮助都将不胜感激。

共有1个答案

戚正业
2023-03-14

最后,我编辑了onexpiry以使用context.outputWithTimestamp(encryEvent(event),window.maxTimestamp());而不是context.output(encryEvent(event));。这就解决了问题。

下面是更正的onexpiry方法。

    @OnTimer("expiry")
    public void onExpiry(
      OnTimerContext context, BoundedWindow window,
      @StateId("buffer") BagState<String> bufferState) {

        if (!bufferState.isEmpty().read()) {
          for (String event : bufferState.read()) {
            context.outputWithTimestamp(enrichEvent(event), window.maxTimestamp());
          }
          bufferState.clear();
        }
    }
 类似资料:
  • 所以我正在制作一个机器人,我想为“userinfo”命令制作一个很酷的小功能,我很少或几乎从来没有在机器人上看到过。 我目前被活动/游戏信息所困扰。我已经为用户的所有活动制作了一张地图,以防用户有多个活动。 例如:用户同时拥有自定义状态和正在收听Spotify或玩游戏。我已经设法制作了一个地图,上面写着用户正在玩的游戏,我想让自定义状态也显示出来。不幸的是,对于自定义状态,它只写“自定义状态”,而

  • 问题内容: 我已经使用timer_create()API实现了POSIX计时器,当计时器到期时,我将为其放置处理程序代码,这将生成SIGUSR1。现在的问题是,如果此程序收到另一个SIGUSR1,则将调用并捕获相同的信号处理程序。 有什么方法可以防止这种情况,以便处理程序可以捕获仅由计时器生成的信号? 问题答案: 这对您有用吗?(修改了手册页中示例中的代码。) 当捕捉到来自定时器的信号时,将显示。

  • 问题内容: 我正在尝试制作一个使用套接字编程和计时器来侦听客户端输入流的程序 但是每当计时器执行..它被挂起 请帮帮我 这是代码… 提前致谢 问题答案: 使程序成为多线程;一个线程在套接字上侦听,另一个线程处理GUI。使用SwingUtilities.invokeLater,只要网络线程接收到数据,就让GUI线程(“事件调度线程”)进行GUI更新。

  • 我正在将我的应用程序迁移到iOS 7。为了处理状态栏问题,我添加了以下代码 这在正常情况下工作正常。如果我正在更改方向(应用程序仅支持横向方向)或显示任何视图控制器并取消模型视图控制器,则我的视图控制器对齐方式将更改。状态栏再次与我的视图控制器重叠。这段代码根本不起作用。请指导我解决此状态栏问题。 案例2:这就是我展示视图控制器的方式 裁判: 提前谢谢。

  • 我觉得摇摆计时器有问题。我写了一些运行良好的代码,然后将其全部移动到我的新计算机上,但它很快就无法工作。我用这个方法编写了一个GUI类(基于JFrame): 该方法是从另一个类调用的。真是太夸张了。splash guy是一个只运行splash动画的JPanel类。它通过摆动计时器来实现。在splash类中,侦听器通过以下代码调整我的帧的内容: 现在,如果第一个块中的print语句被注释掉,我的程序

  • 我首先想到的是:我认为有点非常准确,给出了一个值,其中as请求一个参数;上面写着什么?不要认为<代码>摆动定时器<代码>是准确的吗? 2) 假设用计时一个单词需要x毫秒;如果我们在重复一项任务(突出显示一个单词,如卡拉OK应用程序),我会包含以下代码: 安静地工作很好,但我肯定我不能依靠这一点,因为时间,长度可能会改变!如何克服这个问题?因为几毫秒内的变化可能会给我带来灾难性的后果。 3) 同时,