当前位置: 首页 > 知识库问答 >
问题:

kafka流应用程序从写入中分离读取

贺运良
2023-03-14

我对Kafka和Kafka流很陌生,所以请容忍我。我想知道我是否在正确的轨道上。

我正在给一个Kafka主题写信,试图通过rest服务访问数据。在访问原始数据之前,需要对其进行转换。

到目前为止,我拥有的是一个将原始数据写入主题的制作人。

1)现在我想要streams应用程序(应该是一个在容器中运行的jar),它可以将数据转换为我想要的形状。遵循这里的物化视图范式。

1的过度简化版本。)

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = 
    builder.stream("my-raw-data-topic");

    KafkaStreams streams = new KafkaStreams(builder,props);
    KTable<String, Long> t =  source.groupByKey().count("My-Table");
    streams.start();

2)和另一个streams应用程序(应该是一个在容器中运行的jar),该应用程序将KTable作为某种存储库保存,可以通过包装rest服务访问。

在这里,我坚持使用api的正确方法。访问和查询表的最小值是多少?我需要再次将转换拓扑分配给生成器吗?

KStreamBuilder builder = new KStreamBuilder();
KTable table = builder.table("My-Table"); //Casting?
KafkaStreams streams = new KafkaStreams(builder, props);

RestService service = new RestService(table); 
// Use the Table as Repository which is wrapped by a Rest-Service and gets updated reactivly 

现在这是伪代码

我走在正确的道路上吗?分开1.)和2.)有意义吗?这是使用流实现视图的缩进方式吗?对我来说,在我看到更多流量的情况下,通过容器独立扩展写入和读取会有好处。

在崩溃1)或2)时,如何处理KTable的重新填充。这是通过复制到流api来实现的,还是我需要通过代码来解决的。比如重置光标并回复事件?

共有1个答案

鲍永春
2023-03-14

几点意见:

在您的代码片段(1)中,您将构建器交给KafkaStreams构造函数后修改了拓扑:

KafkaStreams streams = new KafkaStreams(builder,props);
// don't modify builder anymore!

您不应该这样做,而是首先指定拓扑,然后创建KafkaStreams实例。

关于将您的应用程序一分为二。独立缩放这两个部分是有意义的。但一般来说很难说。但是,如果您同时吐出这两个部分,第一个需要将转换后的日期写入输出主题,第二个应该将此输出主题读取为表(builder.table("outout-topic-of-变换")以满足REST请求。

要访问KTable的存储,您需要通过提供的存储名称获取查询句柄:

ReadOnlyKeyValueStore keyValueStore =
streams.store("My-Table", QueryableStoreTypes.keyValueStore());

详见文件:

http://docs.confluent.io/current/streams/developer-guide.html#interactive-查询

 类似资料:
  • 我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?

  • 问题内容: 使用KTable时,当实例/使用者数等于分区数时,Kafka流不允许实例从特定主题的多个分区中读取。我尝试使用GlobalKTable实现此目的,但问题是数据将被覆盖,也无法对其应用聚合。 假设我有一个名为“ data_in”的主题,具有3个分区(P1,P2,P3)。当我运行Kafka流应用程序的3个实例(I1,I2,I3)时,我希望每个实例都从“ data_in”的所有分区中读取数据

  • GroupDataSource是Zebra提供的读写分离数据源,主要提供主从分离, 就近路由,端到端监控等功能。 如果你还不理解读写分离的概念,请参考Zebra读写分离介绍。 2.准备 2.1远程配置中心接入(zookeeper) (1)zookeeper配置文件 需要在项目中的src/main/resource目录下面添加zookeeper.properties文件,指定zookeeper的地址

  • 从1.r.58开始, 内置读写分离支持 ioc js形式的配置 NutDaoRunner新增了slaveDataSource属性 dataSource : { // master数据源 }, slaveDataSource : { // slave数据源,负责读 }, dao : { type : "org.

  • 在ReplicaSet副本集环境中,可以通过bugu-mongo设置读写分离。 默认情况下,写操作、读操作,全部都是在Primary上进行,如下图,这可能会导致Primary的负载比较高。 为了降低Primary的负载,可以设置成允许从Secondary读取数据,如下图: 慎重考虑 读写分离,看上去很美,但实际有如下问题需要考虑: 一般情况下,并不建议对ReplicaSet进行读写分离。因为,对于

  • ReplicaLoadBalanceAlgorithm SPI 名称 详细说明 ReplicaLoadBalanceAlgorithm 读库负载均衡算法 已知实现类 详细说明 RoundRobinReplicaLoadBalanceAlgorithm 基于轮询的读库负载均衡算法 RandomReplicaLoadBalanceAlgorithm 基于随机的读库负载均衡算法