如何在Kafka中发送同步消息
实现这一点的一种方法是设置properties参数max.in。航班请求。每连接=1
。
但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。
蒂洛提出的答案是正确的。总的来说,你建议使用max.In。航班请求。每connection=1用于启用仍然重试,但不会丢失消息顺序。它不适合有同步制作人。
正如Thilo所建议的,您可以调用Future#get
来阻止,直到发送完成。然而,您可能会遇到一些性能问题,因为当生产者队列中有batch.size
元素时,当大小为buffer.memory
的缓冲区已满或在<--plhd--2之后,生产者开始发送/>
毫秒。
如果有有限的线程推送到kafka,则必须等待max.block。ms
每次发送您的信息。因此,在某些情况下,您更喜欢使用:
// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue to spare queuing time
producer.flush();
// throw error when kafka is unreachable
future.get(10, TimeUnit.SECONDS);
生产者API从发送返回一个Future
。您可以调用Future#get
来阻止,直到发送完成。
参见Javadocs中的示例:
如果要模拟简单的阻塞调用,可以立即调用get()方法:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record =
new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
主要内容:1 invokeOneway单向发送,1.1 invokeOnewayImpl单向调用,2 sendMessageSync同步发送,2.1 invokeSync同步调用,3 sendMessageAsync异步发送消息,3.1 invokeAsync异步调用,3.2 onExceptionImpl异常处理,4 NettyClientHandler处理服务端消息,4.1 processResponseCommand处理响应,基于RocketMQ release-4.9.3,深入的介绍了P
我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送
我遇到了以下关于从生产者同步发送。我知道上下文生产者中的异步机制 在此将来调用get()将阻塞,直到相关请求完成,然后返回记录的元数据或引发发送记录时发生的任何异常。 什么是真正的意思相关联的请求完成,我是相当这不是指完整的请求,但在什么程度上这个短语是指?直到经纪人?直到生产者等使用的缓冲区...? 当ack=all与同步生产者和异步生产者一起使用时,它有什么不同?两个场景都被阻塞以进行确认?
我目前正在探索Kafka,作为一个简单问题的初学者。 将有一个生产者向一个主题推送消息,但将有n个spark应用程序的消费者从kafka发送消息并插入到数据库中(每个消费者插入到不同的表中)。 是否有可能消费者会不同步(例如消费者的某些部分会停机很长一段时间),然后一个或多个消费者不会处理消息并插入到表中? 假设代码总是正确的,在按摩数据时不会出现异常。重要的是每条消息只处理一次。 我的问题是,K
我正在将SpringWebSocket 4.2.4与sockjs和stomp一起使用,并试图在异步任务中从服务器向所有订阅者发送消息,但运气不佳 我的班级是: 但是订阅者没有得到消息 有什么帮助吗?我做错了什么:( *编辑* 我的消息代理: 当我订阅时: 谢谢 **编辑2:** 谢谢你帮我解决这个问题:)
如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!