I'm using Kafka with Spring Boot.

Kafka producer class:
@Service
public class MyKafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Send a message asynchronously and log the outcome in a callback
    public void sendMessage(String topicName, String message) {
        LOGGER.debug("topic={} message={}", topicName, message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }

        });
    }
}
Kafka configuration:
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093
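One thing worth checking (this is my assumption about how Spring Boot binds these keys, not something stated in the original post): only the dashed keys above (retries, batch-size, acks, buffer-memory) have dedicated Spring Boot bindings; raw dotted producer properties such as request.timeout.ms, linger.ms and max.block.ms are, as far as I know, only picked up when placed under the generic spring.kafka.producer.properties.* map, roughly like this:

```properties
# Hedged sketch: raw Kafka producer keys moved under the generic
# properties map so Spring Boot actually passes them to the producer.
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.properties.request.timeout.ms=30000
spring.kafka.producer.properties.linger.ms=10
spring.kafka.producer.properties.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093
```

If the dotted keys were being ignored, the producer would have been running with its defaults for those settings, which can explain a mismatch between the configuration file and the ProducerConfig values logged below.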
Problem:

I have a topic with 5 partitions, let's say my-topic.

What happens is that I get the success log (i.e. the message was reported as successfully sent to Kafka), but the offset of none of the partitions of topic my-topic increases.

As you can see above, I added logging in onSuccess and onFailure. What I expect is that when Kafka fails to deliver the message I should get an error, but in this case I don't receive any error message.

This behaviour occurs at a ratio of roughly 100:5 (i.e. after about every 100 messages successfully sent to Kafka).
Edit 1: Adding the Kafka producer logs for a success case (i.e. the message was successfully received on the consumer side):
ProducerConfig - logAll:180] ProducerConfig values:
acks = 0
batch.size = 1000
block.on.buffer.full = false
bootstrap.servers = [10.20.1.19:9092, 10.20.1.20:9093, 10.20.1.26:9094]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 10
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2017-10-24 14:30:09, [INFO] [karma-unified-notification-manager - ProducerConfig - logAll:180] ProducerConfig values:
acks = 0
batch.size = 1000
block.on.buffer.full = false
bootstrap.servers = [10.20.1.19:9092, 10.20.1.20:9093, 10.20.1.26:9094]
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 10
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
It doesn't show an error because you have set spring.kafka.producer.acks to 0. Set it to 1 and your callback will work. Then you can check whether the offset is actually increasing.

Your code works fine for me...
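To spell out why this fixes it: with acks=0 the producer never waits for a broker response, so the future completes as soon as the record is written to the client-side buffer, and onFailure has almost nothing to report. Raising acks makes the broker's response (and therefore real delivery errors) flow back into the callback. A minimal sketch of the change (the retries value here is illustrative, not from the original post):

```properties
# acks=1: the partition leader must acknowledge the write, so send
# errors now surface in onFailure instead of being silently dropped.
spring.kafka.producer.acks=1
# acks=all would additionally wait for all in-sync replicas (strongest
# guarantee, higher latency); retries lets transient failures recover.
spring.kafka.producer.retries=3
```

The trade-off is throughput and latency versus delivery guarantees: acks=0 is fire-and-forget, acks=1 detects most failures, acks=all protects against leader loss.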
@SpringBootApplication
public class So46892185Application {

    private static final Logger LOGGER = LoggerFactory.getLogger(So46892185Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So46892185Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            for (int i = 0; i < 10; i++) {
                send(template, "foo" + i);
            }
        };
    }

    public void send(KafkaTemplate<String, String> template, String message) {
        ListenableFuture<SendResult<String, String>> result = template.send(topic().name(), message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.info("sent message='{}' to partition={} with offset={}",
                        message,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error("Ex : " + ex.getMessage());
            }

        });
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so46892185-3", 5, (short) 1);
    }
}
Result:
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo3' to partition=1 with offset=0
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo8' to partition=1 with offset=1
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo1' to partition=2 with offset=0
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo6' to partition=2 with offset=1
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo0' to partition=0 with offset=0
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo5' to partition=0 with offset=1
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo4' to partition=3 with offset=0
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo9' to partition=3 with offset=1
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo2' to partition=4 with offset=0
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application
: sent message='foo7' to partition=4 with offset=1