我不熟悉Kafka streams,我尝试通过keyValueStore迭代Kafka streams表中的项:
这个想法是使用KeyValueStore创建一个Ktable(我也尝试过globalKTable)。然后一个分离的线程负责读取KeyValueStore的内容,以便遍历每个键的最后一个值。
val streamProperties: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getStringList("kafka.bootstrap.servers").toList.mkString(","))
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p
}
val builder: StreamsBuilder = new StreamsBuilder()
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore
val globalTable = builder.table("test",
Materialized
.as[String, Array[Byte], KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]]("TestStore")
.withCachingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.ByteArray())
)
val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties)
streams.start()
val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
def run() = {
println("\n\n\n tick \n\n\n")
try {
val keyValueStore = streams.store(globalTable.queryableStoreName(), QueryableStoreTypes.keyValueStore())
keyValueStore.all().toIterator.map { k =>
print(k.key)
}
} catch {
case _ => println("error")
}
}
}
val f = ex.scheduleAtFixedRate(task, 1, 10, TimeUnit.SECONDS)
}
}
在线程中,即使我在主题“test”上生成项目,keyValueStore也会保持为空。
有什么我错过或不明白的吗?
缺少的一件事是状态目录位置配置:
p.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp")
如果没有它,Kafka流不会抛出异常,但像全局KTables这样有状态的东西将无法默默地工作。
我有一本字典的格式是 我怎样才能通过这样的方法来遍历这本词典
我有一本字典的格式是 我如何通过执行以下操作来遍历此词典
问题内容: 我有一个像这样的ArrayList对象: 如何遍历列表?我想在TextView中显示值,该值来自ArrayList对象的数据。 问题答案: 最简单的方法是遍历的所有s ,然后遍历的所有键:
问题内容: 使用Python 2.7。我有一本字典,其中以球队名称为键,对每支球队得分并允许的奔跑次数作为值列表: 我希望能够将字典输入一个函数并遍历每个团队(键)。 这是我正在使用的代码。现在,我只能逐队参加。我将如何遍历每个团队并为每个团队打印预期的win_percentage? 谢谢你的帮助。 问题答案: 您有几种选择可以遍历字典。 如果迭代字典本身(),则将迭代字典的键。当使用for循环进
使用JSF 2.0,我需要显示一个表,其中每一行都包含一个打开弹出窗口的链接。我有两种型号:
本文向大家介绍Lua 迭代表,包括了Lua 迭代表的使用技巧和注意事项,需要的朋友参考一下 示例 Lua标准库提供pairs了对表的键和值进行迭代的功能。使用进行迭代时pairs,即使表的键是numeric,也没有指定的遍历顺序。 对于使用数字键的表,Lua提供了一个ipairs功能。该ipairs函数将始终从table[1],table[2]等等进行迭代,直到nil找到第一个值。 请注意,ipa