当前位置: 首页 > 知识库问答 >
问题:

将Kafka Streams代码迁移到Spring Cloud Stream?

纪秋月
2023-03-14

Spring云流是否支持下面的Kafka流应用程序。下面是Kafka示例应用程序摘录中的代码。感谢您的任何反馈或支持。

        ...
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        KStream<String, Purchase> purchaseKStream = streamsBuilder.stream.....
        KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
        patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
        patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
        purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
        purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));

        // adding State to processor
        String rewardsStateStoreName = "rewardsPointsStore";
        RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
        KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
        StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
        streamsBuilder.addStateStore(storeBuilder);
        KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
        KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream
                .transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName), rewardsStateStoreName);
        statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
        statefulRewardAccumulator.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), getProperties());

共有1个答案

楚知
2023-03-14

假设您使用的是Spring Cloud StreamHorsham(3.0.0)版本,以下伪代码应该可以工作。我没有测试此代码,但这应该适用于目标的正确配置等。请查看文档。

@SpringBootApplicaiton
public class SampleApp {

  public static void main(String[] args) {
   SpringApplication.run(SampleApp.class, args);
  }

   @Bean
   public Function<KStream<String, Purchase>, KStream<String, RewardAccumulator>> process() {
     return purchaseKStream -> {
           purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
           KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
           patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
           patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
                purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));

           KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
           KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(()
                statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));

           return statefulRewardAccumulator;
       }        
   }

   @Bean
   public StoreBuilder storeBuilder() {
     String rewardsStateStoreName = "rewardsPointsStore";
     RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
     KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
     StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
     return storeBuilder;
   }

}
 类似资料:
  • 问题内容: 我正在将Java代码库迁移到纯Scala,并且只能使用这一段代码。我有一个IntervalMap的实现,即一个数据结构,可让您有效地将范围映射到,和操作全部所在的位置(与IntervalTree或SegmentTree略有不同)。 这段代码使用Java,并且在迁移到Scala时遇到了两个大问题: Scala没有-我决定使用(奇怪的Scala有但没有)存储密钥并将值存储在辅助中来解决它。

  • 问题内容: 我正在从elasticsearch1.4.3迁移到2.4,并替换了elasticsearch文档中引用的一段代码,而其他参考则需要替换andFilter? 码:- 问题答案: 您可以这样做:

  • 我升级为不和谐。JSV12,但它破坏了我现有的v11代码。下面是一些导致错误的示例: 如何将代码迁移到Discord。JSV12并修复这些错误?在哪里可以看到v12引入的突破性更改?

  • Pillow 是对 PIL 的功能增加,想要在 Pillow 下运行 PIL 的代码,只需要: 把这个: import Image 修改成: from PIL import Image 注意,:py:mod:`_imaging` 模块已经被移除,现在可以这样导入:: from PIL.Image import core as _imaging 另外,图像插件导入机制已经改变。Pi

  • 问题内容: 我正在从弹性搜索1.4.3迁移到2.4,并且替换了弹性搜索文档中引用的一段代码,而其他参考则需要替换andFilter? 码:- 问题答案: 您可以这样做:

  • 在我编写的一个应用程序中,我有一个从Core-Data解析大量数据并将其显示到图形中的过程。在进行此处理时,我最终还将数据写入CSV文件。我创建了一个名为CSVLine的单独类,它有助于创建CSV文件。 对于我的140k测试用例,记录了我的Objective-C代码需要大约12秒才能运行。将类“迁移”到swift后,现在需要280-360秒才能运行。显然我做了一些可怕的事情。 使用仪器,我能够识别