I have defined a custom state store that is used by a custom Transformer (see the reference and code below).
https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
public class KafkaStream {

    public static void main(String[] args) {
        // houseSerde is a Serde<House> defined elsewhere in the application
        StateStoreSupplier houseStore = Stores.create("HOUSE")
                .withKeys(Serdes.String())
                .withValues(houseSerde)
                .persistent()
                .build();

        KStreamBuilder kstreamBuilder = new KStreamBuilder();
        kstreamBuilder.addStateStore(houseStore);
        // ...
        KStream<String, String> testStream = kstreamBuilder.stream(Serdes.String(), Serdes.String(), "test");
        testStream.transform(HouseDetail::new, houseStore.name());
        // ...
    }
}
class HouseDetail implements Transformer<String, String, KeyValue<String, House>> {

    private KeyValueStore<String, House> usageStore;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.usageStore = (KeyValueStore<String, House>) context.getStateStore("HOUSE");
    }
    // ...
}
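For context, a minimal sketch of how such a topology is typically configured and started with the pre-1.0 API used above. The application.id value is only an assumption inferred from the changelog topic name, since Kafka Streams names changelogs as <application.id>-<store name>-changelog; the broker and ZooKeeper addresses are placeholders.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

// Inside main(), after the topology above has been built:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test_01");            // assumed from "test_01-HOUSE-changelog"
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker address
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");  // only needed on the old 0.10.0.x releases

KafkaStreams streams = new KafkaStreams(kstreamBuilder, props);
streams.start();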
I get the following exception. I am not sure why the internal topic "test_01-HOUSE-changelog" is created with a single partition and a single replica instead of matching the two partitions of the source topic "test". What am I missing?
[2018-05-14 23:38:09,391] ERROR stream-thread [StreamThread-1] Failed to create an active task 0_1: (org.apache.kafka.streams.processor.internals.StreamThread:666)
org.apache.kafka.streams.errors.StreamsException: task [0_1] Store HOUSE's change log (test_01-HOUSE-changelog) does not contain partition 1
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
$ ./kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
Topic:test PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1002,1001,1003
Topic: test Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1001,1003
$ ./kafka-topics.sh --zookeeper localhost:2181 --topic test_01-HOUSE-changelog --describe
Topic:test_01-HOUSE-changelog PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test_01-HOUSE-changelog Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Exception after disabling automatic topic creation:
[2018-05-17 14:25:41,114] ERROR stream-thread [StreamThread-1] Failed to create an active task 0_0: (org.apache.kafka.streams.processor.internals.StreamThread:666)
org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: test_01-HOUSE-changelog
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Kafka Streams does not automatically change the number of partitions if the topic already exists with a single partition. From the information you provided it is not clear why the topic was created with only one partition in the first place. One possibility is that your input topic had only one partition when you started the application for the first time, and you added the second partition to the input topic later on.
You need to clean up the application with the application reset tool as described in the documentation (note that this is a two-step process): https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html
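For reference, a sketch of the two steps, using the application id and topic names from the output above and a placeholder broker address (check the exact flags against your Kafka version):

# Step 1: run the reset tool to clear internal topics and committed offsets
./kafka-streams-application-reset.sh --application-id test_01 \
    --input-topics test \
    --bootstrap-servers localhost:9092

// Step 2: wipe the local state directory before restarting the application
KafkaStreams streams = new KafkaStreams(kstreamBuilder, props);
streams.cleanUp();   // must be called before start() (or after close())
streams.start();

If broker-side topic auto-creation stays disabled, one option is to pre-create the changelog topic with the same partition count as the input topic, for example: ./kafka-topics.sh --zookeeper localhost:2181 --create --topic test_01-HOUSE-changelog --partitions 2 --replication-factor 3 --config cleanup.policy=compact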
Is a global state store in Kafka Streams always backed by a single partition, or can the changelog topic of a global state store have more than one partition? I could not find any clear documentation on this.