我是新的Kafka和尝试实现一个简单的生产者,发送数据到一个主题。如果主题不存在,我希望将sutiation作为异常处理。
private Producer<UUID, Object> producer = createProducer();
private static Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"mybootstrapserveraddress");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ADAPTER");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
UUIDSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
1000);
props.put(ProducerConfig.RETRIES_CONFIG,
1);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
1000);
return new KafkaProducer<>(props);
}
public void send(Event event, String topic){
try {
UUID key = UUID.randomUUID();
producer.send(new ProducerRecord<>(topic, key , event), (rm, ex) -> {
if (ex != null) {
log.warn("Error sending message with key {}\n{}", new Object[]{key, ex.getMessage()});
} else {
log.info( "Partition for key-value {} is {}", new Object[]{key, rm.partition()});
}
});
} catch (Exception e) {
log.error("Failed to send message ",e);
} finally {
producer.flush();
}
}
但是,如果主题不存在,则继续轮询消息。将忽略ProducerConfig的超时和重试。
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 6 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 7 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 8 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
有办法解决这个问题吗?
当主题不存在时,获取元数据的重试应该在默认情况下60秒后结束,在结束时引发超时异常。与之相关的生产者配置参数是max.block.ms
(默认值60000)。据我所知,没有比减少这种超时或使用AdminClient(这是您不想做的事情)更早获得反馈的方法了。
我指定“kafka_auto_create_topics_enable:'false'”是因为我想手工创建主题,所以我进入第一个broker容器并键入以下内容: ./kafka-topics.sh--创建--zookeeper 172.19.0.2:2181--主题test1--分区4--复制-因子3 看起来一切都很好: ./kafka-topics.sh--list--zookeeper 172
假设我有多个设备。每个设备都有不同类型的传感器。现在我要把每个传感器的每个设备的数据发送给Kafka。但我对Kafka的主题感到困惑。用于处理此实时数据 null 情况2:向一个主题发送数据 设备1(传感器A,B,C),设备2(传感器A,B,C)...设备....->主题 > 这不是数据瓶颈吗。因为它将表现为队列,来自某个传感器的数据将在队列中落后,并且不会被实时处理。 设备1 ->传感器A-TO
我想使用spring cloud stream framework创建一个kafkaendpoint,它将有一个http post api到。如何动态更改属性 我可以使用实现来实现上述功能,但不知道是否有可能在Spring中开发此功能。
下面给出的kafka producer程序不是通过Eclipse在Windows中运行的,而是在Unix平台上运行的(即,当我在承载kafka代理的Unix中运行它时,它工作正常)。windows不支持Kafka制作人吗?但是,我可以从windows计算机ping ip地址。请帮忙。 这是我得到的异常错误。 log4j:WARN找不到记录器的附加程序(kafka.utils.VerifiableP
我正在处理xml,我需要每条记录发送一条消息,当我收到最后一条记录时,我关闭了kafka生产者,这里的问题是kafka生产者的发送方法是异步的,因此,有时当我关闭生产者时,它会拖曳我在某个地方读到过,我可以让制片人敞开心扉。我的问题是:这意味着什么,或者是否有更好的解决方案。 -编辑- 想象以下场景: 我们阅读标签并创建kafka生产者 对于每个元素,我们读取其属性,生成一个json对象并使用se
我们有一个带有三个代理(节点ID 0、1、2)的kafka集群和一个带有三个节点的zookeeper设置。