Spring云流是否支持下面的Kafka流应用程序。下面是Kafka示例应用程序摘录中的代码。感谢您的任何反馈或支持。
...
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Purchase> purchaseKStream = streamsBuilder.stream.....
KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));
// adding State to processor
String rewardsStateStoreName = "rewardsPointsStore";
RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
streamsBuilder.addStateStore(storeBuilder);
KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream
.transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName), rewardsStateStoreName);
statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
statefulRewardAccumulator.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), getProperties());
假设您使用的是Spring Cloud StreamHorsham(3.0.0)
版本,以下伪代码应该可以工作。我没有测试此代码,但这应该适用于目标的正确配置等。请查看文档。
@SpringBootApplicaiton
public class SampleApp {
public static void main(String[] args) {
SpringApplication.run(SampleApp.class, args);
}
@Bean
public Function<KStream<String, Purchase>, KStream<String, RewardAccumulator>> process() {
return purchaseKStream -> {
purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));
KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(()
statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
return statefulRewardAccumulator;
}
}
@Bean
public StoreBuilder storeBuilder() {
String rewardsStateStoreName = "rewardsPointsStore";
RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
return storeBuilder;
}
}
问题内容: 我正在将Java代码库迁移到纯Scala,并且只能使用这一段代码。我有一个IntervalMap的实现,即一个数据结构,可让您有效地将范围映射到,和操作全部所在的位置(与IntervalTree或SegmentTree略有不同)。 这段代码使用Java,并且在迁移到Scala时遇到了两个大问题: Scala没有-我决定使用(奇怪的Scala有但没有)存储密钥并将值存储在辅助中来解决它。
问题内容: 我正在从elasticsearch1.4.3迁移到2.4,并替换了elasticsearch文档中引用的一段代码,而其他参考则需要替换andFilter? 码:- 问题答案: 您可以这样做:
我升级为不和谐。JSV12,但它破坏了我现有的v11代码。下面是一些导致错误的示例: 如何将代码迁移到Discord。JSV12并修复这些错误?在哪里可以看到v12引入的突破性更改?
Pillow 是对 PIL 的功能增加,想要在 Pillow 下运行 PIL 的代码,只需要: 把这个: import Image 修改成: from PIL import Image 注意,:py:mod:`_imaging` 模块已经被移除,现在可以这样导入:: from PIL.Image import core as _imaging 另外,图像插件导入机制已经改变。Pi
问题内容: 我正在从弹性搜索1.4.3迁移到2.4,并且替换了弹性搜索文档中引用的一段代码,而其他参考则需要替换andFilter? 码:- 问题答案: 您可以这样做:
在我编写的一个应用程序中,我有一个从Core-Data解析大量数据并将其显示到图形中的过程。在进行此处理时,我最终还将数据写入CSV文件。我创建了一个名为CSVLine的单独类,它有助于创建CSV文件。 对于我的140k测试用例,记录了我的Objective-C代码需要大约12秒才能运行。将类“迁移”到swift后,现在需要280-360秒才能运行。显然我做了一些可怕的事情。 使用仪器,我能够识别