当前位置: 首页 > 知识库问答 >
问题:

触发后取消Kafka Streams上的标点符号

慕光霁
2023-03-14

我在转换器上创建了一个定时标点器,并将其定期运行(使用kafka v2.1.0)。每次我接受一个特定的密钥时,我都会创建一个这样的新密钥

        scheduled = context.schedule (Duration.ofMillis(scheduleTime),
             PunctuationType.WALL_CLOCK_TIME,new CustomPunctuator(context, customStateStoreName));

我的问题是,我创建的所有这些标点符号都经常运行,我找不到取消它们的方法。我在互联网上找到了一个片段来使用

    private Cancellable scheduled;

    @Override
    public void init(PorcessorContext processContext) {
        this.context = processorContext;
        scheduled = context.schedule(TimeUnit.SECONDS.toMillis(5), PunctuationType.WALL_CLOCK_TIME,
                                     this::punctuateCancel);
    }

    private void punctuateCancel(long timestamp) {
        scheduled.cancel();
    }

但不幸的是,这似乎只取消了最新创建的标点符号。

我编辑我的帖子只是为了进一步了解我的方法,以及这与沃兹尼亚的评论之间的关系。所以我的方法非常类似,只是使用一个映射,因为每个事件只需要一个标点或活动键,所以在我的Transformer类中,我启动

   private Map<String,Cancellable> scheduled  = new HashMap<>();

在我的转换方法中,我执行下面的代码

{
 final Cancellable cancelSched = scheduled.get(recordKey);

 // Every time I get a new event I cancel my previous Punctuator
 // and schedule a new one ( context.schedule a few lines later)

 if(cancelSched != null)
    cancelSched.cancel();

 // This is supposed to work like a closure by capturing the currentCancellable which in the next statement
 // is moved to the scheduled map. Scheduled map at any point will have the only active Punctuator for a
 // specific String as it is constantly renewed
 // Note: Previous registered punctuators have already been cancelled as it can be seen by the previous
 // statement (cancelSched.cancel();)

 Cancellable currentCancellable = context.schedule(Duration.ofMillis(scheduleTime), PunctuationType.WALL_CLOCK_TIME,
                new CustomPunctuator(context, recordKey ,()-> scheduled ));

 // Update Active Punctuators for a specific key.

 scheduled.put(recordKey,currentCancellable);   
}

我在标点符号方法上使用注册的回调来取消启动后的最后一个活动标点符号。它看起来很有效(虽然不确定),但感觉非常“hacky”,不是那种肯定可取的解决方案

那么,我如何取消标点符号或在触发后取消标点符号呢。有办法解决这个问题吗?

共有2个答案

丁业
2023-03-14

我也遇到过同样的情况,只是为了那些对scala感兴趣的人,我把它当成了自己的工作

val punctuation = new myPunctuation()
val scheduled:Cancellable=context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, punctuation)
punctuation.schedule=scheduled

班级

class myPunctuation() extends Punctuator{
      var schedule: Cancellable = _
      override def punctuate(timestamp: Long): Unit = {
      println("hello")
      schedule.cancel()
      }

    }

很有魅力

夏侯彬郁
2023-03-14

我认为你可以做以下一件事:

class CustomPunctuator implements Punctuator {
  final Cancellable schedule;

  public void punctuate(final long timestamp) {
    // business logic
    if (/* do cancel */) {
      schedule.cancel()
    }
  }
}

// registering a punctuation
{
  final CustomPunctuator punctuation = new CustomPunctuator();
  final Cancellable currentCancellable = context.schedule(
    Duration.ofMillis(scheduleTime),
    PunctuationType.WALL_CLOCK_TIME,
    punctuation);

  punctuation.schedule = currentCancellable;
}

这样,您就不需要维护HashMap,也不需要为每个customsparentor实例提供一种取消自身的方法。

 类似资料:
  • 我想知道对于quartz是否有一个简单的解决方案/hack来触发一个在集群中的每个节点上都被删除的作业。 我的情况:我的应用程序正在缓存一些东西,并且运行在一个没有分布式缓存的集群中。现在,我需要刷新作业触发的所有节点上的缓存。

  • 我是石英调度器的新手。我有一个批处理文件,它将需要3分钟运行。我需要运行这批每2分钟使用石英调度器。所以我每天安排3个小时。我的问题是我需要检查第一个触发器的状态,如果它不是完整的状态,我需要从这个工作出来。我需要继续我安排的下一个工作。说明:作业53触发器在上午11.30开始,下一个触发器在上午11.32开始,下一个触发器在上午11.34开始,我需要检查上午11.30的触发器状态,如果它不是co

  • 本文向大家介绍javascript触发模拟鼠标点击事件,包括了javascript触发模拟鼠标点击事件的使用技巧和注意事项,需要的朋友参考一下 事件触发器就是用来触发某个元素下的某个事件,IE下fireEvent方法,高级浏览器(chrome,firefox等)有dispatchEvent方法。 一般我们在元素上绑定事件后,是靠用户在这些元素上的鼠标行为来捕获或者触发事件的,或者自带的浏览器行为事

  • 我在谷歌标签管理器中使用点击触发器时遇到困难。 我想设置一个触发器来触发单击事件,只有当元素类包含"scrollto"时。 但问题是,即使“scrollto”类不是我单击的元素的一部分,它也会不断启动。 这里有一些截图,我希望能帮助你理解这个问题: 谢谢你的帮助,亚历克西斯

  • 问题内容: 有一个在我的可调整大小的标题列的应用程序。通常,当我将光标移到表标题上方以调整大小时,光标图标会更改为调整大小的箭头,例如<->。 但是在以下情况下情况有所不同。 在同一按钮操作中,在执行操作期间,我将光标设置为忙碌图标,并在完成操作后使用方法将其更改为默认光标。 有时,如果将光标移到调整大小的表标题上,则在执行按钮操作后,光标图标不会更改为调整大小箭头,光标也不会更改。 可以将其视为

  • 原则 中文语句的标点符号,均应该采取全角符号,这样可以保证视觉的一致。 如果整句为英文,则该句使用英文/半角标点。 句号、问号、叹号、逗号、顿号、分号和冒号不得出现在一行之首。 句号 中文语句中的结尾处应该用全角句号(。)。 句子末尾用括号加注时,句号应在括号之外。 错误:关于文件的输出,请参照第 1.3 节(见第 26 页。) 正确:关于文件的输出,请参照第 1.3 节(见第 26 页)。 逗号