当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams-具有字段状态和任务/线程模型的转换器

公胤运
2023-03-14

我有一个带有状态存储的转换器,它使用标点符号来操作所述状态存储。重复几次标点符号之后,操作可能已经完成,所以我想取消标点符号——但只针对实际完成分区各自状态存储上操作的任务。尚未完成的任务的标点操作应继续运行。为此,我的transformer保留了对schedule()返回的可取消项的引用。

据我所知,每个任务总是有自己的隔离Transex实例,每个任务都有自己的隔离计划标点符号()在该实例中(?)

但是,由于这实际上是状态,但不是在stateStore中,我不确定这有多安全。例如,是否有某些场景可以跨任务共享一个转换器实例(因此绝对不能将状态保留在StateStores之外)?


public class CoolTransformer implements Transformer {

  private KeyValueStore stateStore;
  private Cancellable taskPunctuate; // <----- Will this lead to conflicts between tasks?


  public void init(ProcessorContext context) {
    this.store =  context.getStateStore(...);
    this.taskPunctuate = context.schedule(Duration.ofMillis(...), PunctuationType.WALL_CLOCK_TIME, this::scheduledOperation);
    
  }

  private void scheduledOperation(long l) {
    stateStore.get(...)
    // do stuff...

    if (done) {
      this.taskPunctuate.cancel(); // <----- Will this lead to conflicts between tasks?
    }
  }

  public KeyValue transform(key, value) {
    // do stuff
    stateStore.put(key, value)  
  }

  public void close() {
    taskPunctuate.cancel();
  }
}


共有1个答案

顾文昌
2023-03-14

您也许可以查看TransformerProviier,特别是TransformProviier#get(),这将确保我们在应该保持独立的时候创建新的变压器。此外,变形金刚不应共享对象,因此请注意您的Cempable taskPunctuate。如果违反了这些情况中的任何一种,您应该会看到诸如org.apache.kafka.streams.errors.StreamsException:当前节点未知ConvoltModificationExceptionInstanceAlreadyExistsException之类的错误。

 类似资料:
  • 我只看到两种方法: 将所有字段设置为受保护(或包可见性) 为所有字段生成50个getter 创建50个委托方法来获取这些字段的状态,而不是整个对象的状态 在正常情况下,我会选择最后一个(尽管我真的不想仅仅出于测试的原因而改变界面)。但是在我的类中编写50个新方法只是为了测试真的有意义吗?在这种情况下,将字段设置为受保护的不是更好吗,因为有这么多方法,类会变得不清楚? 注意。反射是我想使用的最后一个

  • 在我的React JS项目中,我正在处理。我已经通过了使用进行私有路由和身份验证的示例。 https://reacttraining.com/react-router/web/example/auth-workflow 根据这个留档,他们创建了一个作为无状态组件。 但我的要求是将其转换为有状态的React组件,因为我想将我的组件连接到redux存储。 这是我的代码。 无状态组件 我将这个组件转换成

  • 用来管理任务的所有信息都被保存在一个特别的段中,任务状态段(TSS)。图7-1 显示了80386的TSS的格式(另一种类型用来执行80286任务,参看第13章) TSS 状态段由两部分组成: 1、 动态部分,处理器在每次任务切换时会设置这些字段值: 通用寄存器(EAX, ECX, EDX, EBX, ESP, EBP, ESI, EDI)。 段寄存器(ES,CS,SS,DS,FS,GS) 状态寄存

  • 问题内容: 我正在开发一个多租户应用程序,其中一些用户可以定义自己的数据字段(通过管理员)以收集表单中的其他数据并报告数据。后一点使得JSONField不是一个很好的选择,所以我有以下解决方案: 请注意,CustomDataField如何具有Site的ForeignKey-每个Site将具有一组不同的自定义数据字段,但是使用相同的数据库。然后可以将各种具体的数据字段定义为: 这导致以下用途: 但这

  • 在使用CPU的高负载应用程序上进行线程转储,我看到很多线程处于这种状态: “ajp executor threads-XXXXXX”prio=10 tid=0x00002b04b8b33801 nid=0x5327可运行[0x0000000000000000]java。lang.Thread。状态:可运行 对我来说,真正奇怪的是根本没有堆栈跟踪,ajp线程的总数高于配置的最大线程数(见下文) 当应

  • 多任务是一个操作系统可以同时执行多个程序的能力。基本上,操作系统使用一个硬件时钟为同时执行的每个程序配置「时间片段」。如果时间片段够小,并且机器也没有由于太多的程序而超出负荷时,那么在使用者看来,所有的这些程序似乎在同时执行着。 多任务并不是什么新的东西。在大型计算机上,多任务是必然的。这些大型主机通常有几十甚至几百个终端机和它连结,而每个终端机使用者都应该感觉到他或者她独占了整个计算机。另外,大