当前位置: 首页 > 知识库问答 >
问题:

Spring cloud kafka Stream Binder transactionIdPrefix性能大获成功

郑宏朗
2023-03-14

我们有一个Kafka过程,在这个过程中,我们消费来自一个主题的消息,然后进行一些充实,然后我们将消息发布到另一个主题。以下是事件

  • 消费者-消费信息

我正在使用Spring Cloud kafka活页夹版本3.0.0-RELEASE,事情进展顺利。最近我们引入了幂等生产者,并包含了transactionIdPrefix属性,我们观察到我们开始出现性能问题。以下是统计数据

  • 在transactionIdPrefix之前,系统需要大约20秒来处理具有30个并发用户的10k消息。
  • 在transactionIdPrefix之后,系统平均花费165秒处理30个并发用户的10k消息。

下面是我们的代码和配置。

@StreamListener("INPUT")
@SendTo("OUTPUT")
public void consumer(Message message){
Acknowledgement ack = messge.getHeaders().get(KafkaHeaders.ACKNOWLEDGEMENT,Acknowledgement.class))
try{
    String inputMessage = message.getPayload.toString();
    String enrichMessage = // Enrichment on inputMessage
    ack.acknowledgement()   
    return enrichMessage;
}catch( Exception exp){
    ack.acknowledgement();
    throw exp;  
}
}

配置是

  1. Spring。云流动Kafka。粘合剂交易transactionIdPrefix=TX-
  2. Spring。云流动Kafka。粘合剂交易制作人配置确认=全部
  3. Spring。云流动Kafka。粘合剂交易制作人配置重试次数=10次
  4. Spring。云流动Kafka。绑定。输入消费者autocommitofset=false
  5. Spring。云流动Kafka。绑定。输入消费者并发性=30
  6. Spring。云流动Kafka。绑定。输入消费者enableDlq=true
  7. Spring。云流动Kafka。绑定。输入消费者dlqName=错误。话题
  8. Spring。云流动Kafka。绑定。输入消费者自动提交错误=true
  9. Spring。云流动Kafka。绑定。输入消费者最大尝试=3
  10. Spring。云流动Kafka。粘合剂交易制作人配置使可能幂等性=真

我预计性能会有一点下降,但这看起来是一个巨大的性能差异。有没有人遇到过这样的问题,以及我们如何使用transactionIdPrefix属性提高性能的建议?

共有1个答案

齐向笛
2023-03-14

交易总是很昂贵;尤其是每次读取仅执行一次写入时。

使用功能模型Consumer)

public Consumer<List<Message>> () {
    messages -> {...}
}

这样,事务将适用于整个列表。

 类似资料:
  • 在我的ASP. net网站,我有一个连接到SQL服务器快速数据库。有时候我确实会犯很多错误,比如 系统。异常:超时已过期。从池中获取连接之前的超时时间。这可能是因为所有池连接都在使用中,并且达到了最大池大小。 搜索错误后,我发现可能是由于SQL Server连接未关闭。但是我已经正确地使用了SQL Server连接,并且正确地处理了它。我已使用using语句处理连接。在我的应用程序中,我在一天中的

  • 我正在对大小为50,000个元素的两个向量执行基于元素的操作,并且有不满意的性能问题(几秒钟)。是否存在明显的性能问题,例如使用不同的数据结构?

  • 这个练习很简单。首先,我们需要什么样的性能数据? CPU 使用情况: 它的负载如何? 哪些进程正在使用它? 内存使用情况: 使用了多少内存? 多少内存是空闲的? 多少内存用于缓存? 哪些进程消耗了它? 磁盘使用情况: 执行多少输入/输出操作? 由哪个进程? 网络使用情况: 传输了多少数据? 由哪个进程? 进程情况: 有多少进程? 他们在做什么 工作,还是等待什么? 如果在等待什么,它是什么呢?CP

  • 背景 APM 是应用性能监控的缩写。目前 APM 的主要功能着眼于分布式系统的性能诊断,其主要功能包括调用链展示,应用拓扑分析等。 Apache ShardingSphere 并不负责如何采集、存储以及展示应用性能监控的相关数据,而是将 SQL 解析与 SQL 执行这两块数据分片的最核心的相关信息发送至应用性能监控系统,并交由其处理。 换句话说,Apache ShardingSphere 仅负责产

  • 我一直在和来自Python的朱莉娅玩。我写了一些代码来解决这个难题。我在用蛮力的方法。代码在Python中运行需要一段时间(~30分钟),但在Julia中仍然需要大约3分钟。我确信这有点难看,因为我还在学习这门语言。我查看了优化器页面,在函数中输入了数据类型,并避免了全局变量。 与此相关的三个问题: > 肯定有更好的方法来做函数mymax。我找到了一些关于阿格马克斯的建议,但这给我带来了一个大麻烦