当前位置: 首页 > 知识库问答 >
问题:

如何利用kafka流应用提高从kafka读到kafka转发的性能

宗鸿博
2023-03-14

我有一个带有1.0.0 Kafka stream API的Kafka stream应用程序。我有单个代理0.10.2.0kafka和单个分区的单个主题。除producer Request.Timeout.ms外,所有可配置参数都相同。我配置了producer request.timeout.ms(使用5分钟的时间)来修复Kafka Streams程序在产生问题时抛出异常。

有时context forward用更多的时间向kafka发送更少的记录,有时context forward用更少的时间向kafka发送更多的记录。

我试图通过增加max.poll.records和poll.ms值来提高阅读性能。但没有运气。

如何在阅读转发的同时提高性能?Kafka民意调查和前进将如何工作?哪些参数有助于提高性能?

acks = 1
batch.size = 16384
buffer.memory = 33554432
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 240000
retries = 10
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
transaction.timeout.ms = 60000
transactional.id = null
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
internal.leave.group.on.close = false
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 10000
metadata.max.age.ms = 300000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
application.server =
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
key.serde = null
metadata.max.age.ms = 300000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 1000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

共有1个答案

邢财
2023-03-14

您可以通过控制键和增加主题的分区数在您的操作中引入并行性。

以上将增加并行处理的Kafka流的数量。这可以通过增加Kafka streams应用程序的线程数来处理

 类似资料:
  • 我们运行在apache kafka 0.10.0. x和Spring 3. x上,不能使用Spring kafka,因为它支持Spring框架版本4. x。 因此,我们使用原生的Kafka Producer API来生成消息。 现在我关心的是我的制片人的表现。问题是我相信有人打电话给是真正连接到Kafka broker,然后将消息放入缓冲区,然后尝试发送,然后可能会调用。 现在,KafkaProd

  • 我想用kafka流实现请求-响应模式,我使用spring boot kafka,其中添加了一些数据作为报头,命名为关联id,但是当kafka流API处理请求消息时,报头数据会丢失,无法发送到响应主题!我该怎么解决,还是用另一种方法??

  • 我第一次在kafka中使用Node,使用Kafka-Node。使用消息需要调用外部API,这甚至可能需要一秒钟的时间来响应。我希望克服我的消费者的突然失败,这样,如果一个消费者失败了,另一个将替换它的消费者将收到相同的消息,即它的工作没有完成。 我正在使用Kafka0.10并尝试使用ConsumerGroup。 我想到了在options中设置,并且只在消息的工作完成后提交消息(就像我以前对一些Ja

  • 我写了Kafka流应用程序,我想把它部署在Kafka集群上。因此,我构建了一个jar文件,并使用以下命令运行它:

  • 我尝试使用Kafka流将一个带有String/JSON消息的主题转换为另一个作为Avro消息的主题。 并得到如下所示的异常: 这是正确的做法吗?我对Kafka溪流和阿夫罗是新来的

  • 主要内容:推特,LinkedIn,Netflix公司,Mozilla, OracleKafka支持许多最好的工业应用。 在本章中,我们将简要介绍一些Kafka最显着的应用。 推特 Twitter是一种在线社交网络服务,提供发送和接收用户推文的平台。 注册用户可以阅读和发布推文,但未注册的用户只能阅读推文。 Twitter使用Storm-Kafka作为其流处理基础设施的一部分。 LinkedIn 在LinkedIn上使用Apache Kafka来获取活动流数据和运营指标。 Kafk