我正在使用SpringKafka模板来生成消息。而且它生成消息的速度太慢了。生成 8 条消息大约需要 15000 分钟。
以下是我如何创建Kafka模板:
@Bean
public ProducerFactory<String, GenericRecord> highSpeedAvroProducerFactory(
@Qualifier("highSpeedProducerProperties") KafkaProperties properties) {
final Map<String, Object> kafkaPropertiesMap = properties.getKafkaPropertiesMap();
System.out.println(kafkaPropertiesMap);
kafkaPropertiesMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaPropertiesMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroGenericSerializer.class);
return new DefaultKafkaProducerFactory<>(kafkaPropertiesMap);
}
@Bean
public KafkaTemplate<String, GenericRecord> highSpeedAvroKafkaTemplate(
@Qualifier("highSpeedAvroProducerFactory") ProducerFactory<String, GenericRecord> highSpeedAvroProducerFactory) {
return new KafkaTemplate<>(highSpeedAvroProducerFactory);
}
以下是我如何使用模板发送消息:
@Async("servicingPlatformUpdateExecutor")
public void afterWrite(List<? extends Account> items) {
LOGGER.info("Batch start:{}",items.size());
for (Test test : items) {
if (test.isOmega()) {
ObjectKeyRecord objectKeyRecord = ObjectKeyRecord.newBuilder().setType("test").setId(test.getId()).build();
LOGGER.info("build start, {}",test.getId());
GenericRecord message = MessageUtils.buildEventRecord(
schemaService.findSchema(topicName)
.orElseThrow(() -> new OmegaException("SchemaNotFoundException", topicName)), objectKeyRecord, test);
LOGGER.info("build end, {}",account.getId());
LOGGER.info("send Started , {}",account.getId());
ListenableFuture<SendResult<String, GenericRecord>> future = highSpeedAvroKafkaTemplate.send(topicName, objectKeyRecord.toString(), message);
LOGGER.info("send Done , {}",test.getId());
future.addCallback(new KafkaProducerFutureCallback(kafkaSender, topicName, objectKeyRecord.toString(), message));
}
}
LOGGER.info("Batch end}");
}
生产者属性:
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [***VALID BROKERS****))]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 9223372036854775807
interceptor.classes = null
ssl.truststore.password = null
client.id = producer-1
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = all
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 2147483647
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 800000000
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 10
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
batch.size = 40000000
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = SASL_SSL
max.request.size = 1048576
value.serializer = class com.message.serialization.AvroGenericSerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 2
下面是显示调用kakpatemplate send方法需要几毫秒的日志:
2018-04-27 05:29:05.691 INFO - testservice - - UpdateExecutor-1 - com.test.testservice.adapter.batch.testsyncjob.UpdateWriteListener:70 - build start, 1
2018-04-27 05:29:05.691 INFO - testservice - - UpdateExecutor-1 - com.test.testservice.adapter.batch.testsyncjob.UpdateWriteListener:75 - build end, 1
2018-04-27 05:29:05.691 INFO - testservice - - UpdateExecutor-1 - com.test.testservice.adapter.batch.testsyncjob.UpdateWriteListener:76 - send Started , 1
2018-04-27 05:29:05.778 INFO - testservice - - UpdateExecutor-1 - com.test.testservice.adapter.batch.testsyncjob.UpdateWriteListener:79 - send Done , 1
2018-04-27 05:29:07.794 INFO - testservice - - kafka-producer-network-thread | producer-1 - com.test.testservice.adapter.batch.testsyncjob.KafkaProducerFutureCallback:38
关于如何提高发件人性能的任何建议将不胜感激
Spring Kakfa版本:1.2.3 .发布Kafka客户端:0.10.2.1
更新 1:
将Serializer更改为ByteArraySerializer,然后生成相同的。我仍然看到kafketate上的每个send方法调用都需要100到200毫秒
ObjectKeyRecord objectKeyRecord = ObjectKeyRecord.newBuilder().setType("test").setId(test.getId()).build();
GenericRecord message = MessageUtils.buildEventRecord(
schemaService.findSchema(testConversionTopicName)
.orElseThrow(() -> new TestException("SchemaNotFoundException", testTopicName)), objectKeyRecord, test);
byte[] messageBytes = serializer.serialize(testConversionTopicName,message);
LOGGER.info("send Started , {}",test.getId());
ListenableFuture<SendResult<String, byte[]>> future = highSpeedAvroKafkaTemplate.send(testConversionTopicName, objectKeyRecord.toString(), messageBytes);
LOGGER.info("send Done , {}",test.getId());
future.addCallback(new KafkaProducerFutureCallback(kafkaSender, testConversionTopicName, objectKeyRecord.toString(), message));
你是否介绍了你的申请?e、 g.使用YourKit。
我怀疑是Avro序列化程序;我能够在274毫秒内发送15000条1000字节的消息。
@SpringBootApplication
public class So50060086Application {
public static void main(String[] args) {
SpringApplication.run(So50060086Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
Thread.sleep(5_000);
String payload = new String(new byte[999]);
StopWatch watch = new StopWatch();
watch.start();
for (int i = 0; i < 15_000; i++) {
template.send("so50060086a", "" + i + payload);
}
watch.stop();
System.out.println(watch.prettyPrint());
};
}
@Bean
public NewTopic topic() {
return new NewTopic("so50060086a", 1, (short) 1);
}
}
和
StopWatch '': running time (millis) = 274
Dokuwiki主题/模板制作 之前也有写过,不过已经删掉了,因为提到的方法什么的都过时了,很多内容也没说清楚。看评论时发现这方面的东西还是有人需要的,于是决定重新写一遍。对于主题/模板的制作,除了官方文档之外,也可以在符合 Dokuwiki 标准的 starter 模板基础上进行修改,不少不错的模板都是基于它的。和官方文档类似,这里以 Starter 模板为例。 先安装 Starter 模板,并
之前也有写过,不过已经删掉了,因为提到的方法什么的都过时了,很多内容也没说清楚。看评论时发现这方面的东西还是有人需要的,于是决定重新写一遍。对于主题/模板的制作,除了官方文档之外,也可以在符合 Dokuwiki 标准的 starter 模板基础上进行修改,不少不错的模板都是基于它的。和官方文档类似,这里以 Starter 模板为例。 先安装 Starter 模板,并将 lib/tpl/starte
我正在使用一个与kafka集成的spring boot应用程序,我想实现一个endpoint来停止和启动kafka发布消息。消息由另一个endpoint以异步方式触发。 豆子卡
模板文件:User/password_reset.html <form class="form-horizontal js-ajax-form" action="{:U('user/login/doforgot_password')}" method="post"> <label class="control-label" for="input_email">注册邮箱</label>
模板文件:User/forgot_password.html <form class="form-horizontal js-ajax-form" action="{:U('user/login/doforgot_password')}" method="post"> <label class="control-label" for="input_email">注册邮箱</label>
我对kafka制作人有问题。实际上我正在使用Spring kafka,并通过KafkaTemboard leke发送消息: 问题是有时发送消息需要 4-20 秒。有很多消息需要 100 毫秒才能发送。所以我有几个问题: > < li> 消息大小和吞吐量之间是否有关联,这种关系是什么? 我应该首先检查什么,也许我没有很好地调整,任何方向?