当前位置: 首页 > 知识库问答 >
问题:

Kafka流:在左联接期间不初始化状态存储

鲜于谦
2023-03-14

我试着加入两个Kafka的话题。一个是KStream,另一个是Ktable。左联接抱怨处理器的状态存储不存在。我确实查看了kafka、GitHub和其他地方的许多代码示例,其中StateStore不是由KStream客户机代码显式创建的。请告知以下代码中缺少什么。

应用程序流与users表保持连接,以发出app和user一起的记录。应用程序的所有者是用户。

版本:1.1.0

  public void process() {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
    config.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Common.KAFKA_SOCKET);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomSerdes.applicationSerde);
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
    config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

    // User properties: userid, username
    KTable<String, User> users = new StreamsBuilder().table(TOPIC_USERS,
        Consumed.with(Serdes.String(), CustomSerdes.serdeFor(User.class)));

    StreamsBuilder builder = new StreamsBuilder();
    // Application properties: id, name
    KStream<String, Application> stream = builder.stream(TOPIC_APPLICATIONS);

    stream.
        map((appId, app) -> KeyValue.pair(app.getOwnerId(), app.getAppId()))
        .leftJoin(users, (app, user) -> "a:" + app + " u:" + user.getUserName())
        .to(OUTPUT_TOPIC);

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    StreamsManager.startAndHandleShutdown(streams);
  }
Exception in thread "main" org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore topic-users-STATE-STORE-0000000000 is not added yet.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:797)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:817)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:805)
    at com.test.streams.users.AppWithUserConsumerMain.process(AppWithUserConsumerMain.java:50)

共有1个答案

殳飞扬
2023-03-14

为了能够使用join,联接的两个部分 ;(在您的示例中为KStream 和KTable) 应从相同的 ;StreamsBuilder创建,因此它们将属于相同的拓扑。

在您的示例中,创建了两个StreamsBuilder,结果,KStream 和KTable不属于同一拓扑。

 类似资料:
  • 我正在开发我的第一个 Swing 应用程序,现在提出了一个难题:在静态初始化期间或开始实际执行后执行引导和资源初始化。我是什么意思...我有单例: 因此,方法如下所示 或者,也许我在启动后手动初始化资源,然后运行它。逻辑上正确的方式是什么?

  • 本文向大家介绍react-native 初始化状态,包括了react-native 初始化状态的使用技巧和注意事项,需要的朋友参考一下 示例 您应该像这样在组件的构造函数内部初始化状态: 使用setState可以更新视图。

  • 我在试图通过Kafka流实现以下目标时遇到了一些困难: 在应用程序启动时,(压缩的)主题α被加载到键值StateStore中 Kafka流从另一个主题中消费,使用上面的映射(get),并最终在主题alpha中生成一个新记录 结果是,即使拖缆重新启动,内存中的映射也应与底层主题对齐 我的方法如下: 装载机Treamer(store): : ...但是我得到的是: 试图获取存储处理程序时。 你知道如何

  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将

  • 我有一个EJB无状态会话Bean。我有以下要求: 这个无状态EJB应该在启动时初始化 初始化代码应该对数据库进行事务性访问 问题是: @Startup仅适用于@Singleton EJB @PostConstruct注释(至少在WebSphere上)在这一点上没有事务性上下文,所以初始化代码在这里爆炸! 可能的解决方案? 使用JavaEE定时器,但它似乎是为周期性执行而设计的。我只想在零点执行一次