当前位置: 首页 > 知识库问答 >
问题:

如何迭代Kafka Streams表的键值

申屠秦斩
2023-03-14

我不熟悉Kafka streams,我尝试通过keyValueStore迭代Kafka streams表中的项:

这个想法是使用KeyValueStore创建一个Ktable(我也尝试过globalKTable)。然后一个分离的线程负责读取KeyValueStore的内容,以便遍历每个键的最后一个值。

      val streamProperties: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getStringList("kafka.bootstrap.servers").toList.mkString(","))
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    import org.apache.kafka.streams.kstream.Materialized
    import org.apache.kafka.streams.state.KeyValueStore


    val globalTable = builder.table("test",
      Materialized
        .as[String, Array[Byte], KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]]("TestStore")
        .withCachingDisabled()
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.ByteArray())
    )

    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties)
    streams.start()

    val ex = new ScheduledThreadPoolExecutor(1)
    val task = new Runnable {
      def run() = {
        println("\n\n\n tick \n\n\n")
        try {
          val keyValueStore = streams.store(globalTable.queryableStoreName(), QueryableStoreTypes.keyValueStore())
          keyValueStore.all().toIterator.map { k =>
            print(k.key)
          }
        } catch {
          case _ => println("error")
        }
      }
    }
    val f = ex.scheduleAtFixedRate(task, 1, 10, TimeUnit.SECONDS)
  }
}

在线程中,即使我在主题“test”上生成项目,keyValueStore也会保持为空。

有什么我错过或不明白的吗?

共有1个答案

丁恩
2023-03-14

缺少的一件事是状态目录位置配置:

p.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp")

如果没有它,Kafka流不会抛出异常,但像全局KTables这样有状态的东西将无法默默地工作。

 类似资料:
  • 我有一本字典的格式是 我怎样才能通过这样的方法来遍历这本词典

  • 我有一本字典的格式是 我如何通过执行以下操作来遍历此词典

  • 问题内容: 我有一个像这样的ArrayList对象: 如何遍历列表?我想在TextView中显示值,该值来自ArrayList对象的数据。 问题答案: 最简单的方法是遍历的所有s ,然后遍历的所有键:

  • 问题内容: 使用Python 2.7。我有一本字典,其中以球队名称为键,对每支球队得分并允许的奔跑次数作为值列表: 我希望能够将字典输入一个函数并遍历每个团队(键)。 这是我正在使用的代码。现在,我只能逐队参加。我将如何遍历每个团队并为每个团队打印预期的win_percentage? 谢谢你的帮助。 问题答案: 您有几种选择可以遍历字典。 如果迭代字典本身(),则将迭代字典的键。当使用for循环进

  • 使用JSF 2.0,我需要显示一个表,其中每一行都包含一个打开弹出窗口的链接。我有两种型号:

  • 本文向大家介绍Lua 迭代表,包括了Lua 迭代表的使用技巧和注意事项,需要的朋友参考一下 示例 Lua标准库提供pairs了对表的键和值进行迭代的功能。使用进行迭代时pairs,即使表的键是numeric,也没有指定的遍历顺序。 对于使用数字键的表,Lua提供了一个ipairs功能。该ipairs函数将始终从table[1],table[2]等等进行迭代,直到nil找到第一个值。 请注意,ipa