当前位置: 首页 > 知识库问答 >
问题:

如何从ValueTransformer中的标点或实例向下游转发事件?

郎翔
2023-03-14

在KafkaStream中,当实现ValueTransformer或ValueTransformerWithKey时,在transform()调用时,我会安排一个新的标点器。当执行标点器的标点()方法时,我希望它使用上下文实例向下游转发事件。然而,作为DSL拓扑的一部分,上下文实例似乎没有定义。

有没有关于如何使用变压器的线索?

在处理器中使用相同的逻辑,实现其工作的底层处理器拓扑。

在ValueTransformerWithKey中:

@Override 
    public Event transform(final String key, final Event event) { 
        this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
        return null;
}

在MyPunctuator中:

private class MytPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;

    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }

    @Override
    public void punctuate(final long timestamp) {
        context.forward(key, AlertEvent.builder().withSource(event).build());
        context.commit();
    }
}

执行时

myStream
    .groupByKey(Serialized.with(Serdes.String(), Event.serde()))
    .reduce((k, v) -> v)
    .transformValues(() -> valueTransformerWithKey)
    .toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));

我希望标点器产生的报警事件一旦过期,就会被转发到报警主题。

相反,我得到了以下例外:ProcessorContext.forward()不支持。

共有1个答案

壤驷安和
2023-03-14

和往常一样,我在javadoc中找到了关于ValueTransformerWithKey接口的答案:https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

请注意,在转换中不允许使用ProcessorContext.forward(Object, Object)或ProcessorContext.forward(Object, Object, To),这将导致异常。

然而,实现Transformer接口允许使用上下文。前进()。谢谢@Matthias J.Sax

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html

如果多个输出记录应转发到下游ProcessorContext。转发(对象,对象)和ProcessorContext。可以使用forward(Object,Object,To)。若记录不应转发到下游,则transform可以返回null。

 类似资料:
  • 本文向大家介绍Java 转型(向上或向下转型)详解及简单实例,包括了Java 转型(向上或向下转型)详解及简单实例的使用技巧和注意事项,需要的朋友参考一下 在Java编程中经常碰到类型转换,对象类型转换主要包括向上转型和向下转型。 向上转型 我们在现实中常常这样说:这个人会唱歌。在这里,我们并不关心这个人是黑人还是白人,是成人还是小孩,也就是说我们更倾向于使用抽象概念“人”。再例如,麻雀是鸟类的一

  • 在React本机AppState库中: iOS有三种状态背景->非活动->活动 Android只有背景->活动 当一个Android应用完全后台化时,主要活动从onPause->onStop 当有系统通知(例如在应用程序购买)时,它将转到onPause 当应用程序从后台到前台时,我需要运行一些代码 onstop->onResume 如果应用程序由于系统通知 onpause->onResume而短暂

  • 本文向大家介绍MySql游标的使用实例,包括了MySql游标的使用实例的使用技巧和注意事项,需要的朋友参考一下 mysql游标使用的整个过程为: 1.创建游标 2.打开游标 3.使用游标 4.关闭游标 实例代码如下所示:

  • 本文向大家介绍Android下hook点击事件的示例,包括了Android下hook点击事件的示例的使用技巧和注意事项,需要的朋友参考一下 Hook是一种思想,也就是将原来的事件,替换到我们自己的事件,方便我们做一些切入处理。目的是不修改原来的代码,同时也避免遗漏的N多类里面处理。 最近需要在现有的app中设置统计埋点。去业务代码里埋的话似乎耦合度太高。所以决定使用hook的方法对事件进行埋点处理

  • 问题内容: 我正在开发JavaFX项目,并且需要类似于表征 “按下并按住”事件的东西 。但是应该将其映射为一个,因为我在Linux上的触摸事件遇到了麻烦。例如,在Ubuntu中,它不会响应触摸事件。 请让我知道,如果你对如何触发一个任何想法 每当 “按住” 在Linux上出现? 问题答案: 只需将a 用作“保持”的计时器即可。如果按下鼠标,则将其启动;如果释放或拖动,则将其停止。