我在一个视频教程中看到,当制作人发布消息时,Kafka Broker支持3种类型的确认。
0-开火并忘记
1-领导确认
2-确认所有经纪人
我正在使用Kafka的Java API发布消息。这是必须为每个使用服务器的代理设置的吗。每个经纪人的特定属性,还是必须由制作人设定?如果必须由制作人设置,请解释如何使用Java API设置。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class KafkaProducerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
try{
for(int i=0;i<150;i++) {
RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
System.out.println(" Offset = " + ack.offset());
System.out.println(" Partition = " + ack.partition());
}
} catch (Exception ex){
ex.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}
我认为你应该了解acks属性实际做了什么,看看幕后。如果可以的话,你会看到这个属性是由生产者配置的。
例如,你不能丢失任何信息,比如审计日志。下面是我们如何启动producer配置的代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("acks", "all"); //We are using acks=all in order to get the strongest guarantee we can.
props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "5");
这是一个微小但强大的变化,它对消息是否会到达有重大影响。
这张图片来自《Kafka在行动》一书,该书对acks
property的描述更加清晰:
它是生产者属性,设置与代码中的其他属性类似:
properties.put("acks","all");
可以在此处找到所有可配置生产者属性的列表。
您可能还想查看broker(或topic)属性min.insync。与此生产者配置相关的副本
。
假设我有多个设备。每个设备都有不同类型的传感器。现在我要把每个传感器的每个设备的数据发送给Kafka。但我对Kafka的主题感到困惑。用于处理此实时数据 null 情况2:向一个主题发送数据 设备1(传感器A,B,C),设备2(传感器A,B,C)...设备....->主题 > 这不是数据瓶颈吗。因为它将表现为队列,来自某个传感器的数据将在队列中落后,并且不会被实时处理。 设备1 ->传感器A-TO
问题内容: 使用以下代码,我发送Elasticsearch文档以进行索引。我尝试将基本对象转换为JSON并通过生产者发送。但是,每条消息(从控制台检查)都附加了乱码,例如 - t。{“ productId”:2455 出站配置 有什么线索吗? 使用的插件:Spring Extension Kafka 问题答案: 我今天遇到了这个问题,可以通过在生产者配置中设置正确的value-serializer
我有一个服务器a,在服务器a中,我安装了kafka并启动了kafka和Zookeeper。我还创建了一个主题作为my_topic。现在我有一个应用程序B运行在服务器B中,应用程序B有一些数据,我想把这些数据推送到服务器A中的my_topic。我是否也需要在服务器B中安装kafka并在服务器B中创建一个生产者?如果是,如何将来自服务器B的消息推送到服务器A中的主题?介质是什么?
我正在尝试为Kafka制作人创建一个简单的捆绑包,在apache Karaf版本4.0.3中。 这是我的Java代码 我已经在pom.xml中明确声明了各自的依赖关系 我也部署了那个kafka客户端包。 但在启动生成器时,我看到以下第一次尝试时的异常。 然后连续这个。。。 有没有人对捆绑包提出类似的问题?
我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点
我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。 我使用的是Spring Boot KafkaTemplate。 我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于 https://docs.spring.io/spring-kafka/reference/html/#using-K