我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。
为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。
在查看生成的主题时,每个消费应用程序创建了3个额外的主题:
下面是创建存储区的代码
public class ProgramMappingEventStoreFactory {
private static final Logger logger = Logger.getLogger(ProgramMappingEventStoreFactory.class.getName());
private final static String STORE_NAME = "program-mapping-store";
private final static String APPLICATION_NAME = "epg-mapping-catalog_program-mapping";
public static ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> newInstance(String kafkaBootstrapServerUrl,
String avroRegistryUrl,
String topic,
String storeDirectory)
{
Properties kafkaConfig = new KafkaConfigBuilder().withBootstrapServers(kafkaBootstrapServerUrl)
.withSchemaRegistryUrl(avroRegistryUrl)
.withApplicationId(createApplicationId(APPLICATION_NAME))
.withGroupId(UUID.randomUUID().toString())
.withClientId(UUID.randomUUID().toString())
.withDefaultKeySerdeClass(SpecificAvroSerde.class)
.withDefaultValueSerdeClass(SpecificAvroSerde.class)
.withStoreDirectory(storeDirectory)
.build();
StreamsBuilder streamBuilder = new StreamsBuilder();
bootstrapStore(streamBuilder, topic);
KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaConfig);
streams.start();
try {
return getStoreAndBlockUntilQueryable(STORE_NAME,
QueryableStoreTypes.keyValueStore(),
streams);
} catch (InterruptedException e) {
throw new IllegalStateException("Failed to create the LiveMediaPolicyIdStore", e);
}
}
private static <T> T getStoreAndBlockUntilQueryable(String storeName,
QueryableStoreType<T> queryableStoreType,
KafkaStreams streams)
throws InterruptedException
{
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
Thread.sleep(100);
}
}
}
private static void bootstrapStore(StreamsBuilder builder, String topic) {
KTable<ProgramMappingEventKey, ProgramMappingEvent> table = builder.table(topic);
table.groupBy((k, v) -> KeyValue.pair(k, v)).reduce((newValue, aggValue) -> newValue,
(newValue, aggValue) -> null,
Materialized.as(STORE_NAME));
}
private static String createApplicationId(String applicationName) {
try {
return String.format("%s-%s", applicationName, InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
logger.warning(() -> "Failed to find the hostname, generating a uique applicationId");
return String.format("%s-%s", applicationName, UUID.randomUUID());
}
}
}
如果要将相同的状态加载到多个实例中,应该在所有实例上使用globalktable
和唯一的application.id
(builder.globaltable()
)。
如果使用ktable
,则会对数据进行分区,迫使您对每个实例使用不同的application.id
。这可以被认为是一种反模式。
我也不确定为什么要groupby((k,v)->keyvalue.pair(k,v)).reduce()
--这会导致不必要的重新分区主题。
我有一个kafka流应用程序,它在其中使用stateStore(由RocksDB支持)。 stream thread所做的只是从kafka主题获取数据并将数据放入State Store。(还有其他线程从statestore读取数据并进行业务逻辑处理)。 我观察到它创造了一个新的Kafka主题“变化日志”,因为Statestore。 但我没有明白“变化”Kafka话题有什么用处? 为什么需要它(更改
我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R
我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它
问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人
我需要获取存储中的行数,存储在低级处理器API中维护。我看到,方法“近似数字条目()”可以在此存储中提供键值映射的近似计数。你能澄清一下准确度的%吗,这意味着如果商店里有100行,我们会得到95行作为近似计数吗?或者它有时会低于50行吗?只是想了解影响计数准确性的因素。 注意:假设流应用程序使用单个主题并在单个实例上运行。存储是通过低级处理器API访问的,不确定默认情况下是否应用了任何缓存。提交频
我要用Kafka流计算平均值。所以我做了一个有状态的操作,聚合,需要创建一个状态存储,但是这种情况不会发生。 这里是平均值的函数: 以下是例外情况: 问题是基本目录不存在,但我希望kafka流在必要时创建目录。 编辑 ----- 我注意到,如果我有1个处理器,使一个变量的平均值没有问题,但如果我有2个处理器是。 1个处理器的配置文件: 2个处理器的配置文件: 现在我启动处理器: 类型元组包含配置文