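I am running Kafka on a single node, and I want to see how the Kafka producer behaves when I shut down my Kafka broker and restart it a few seconds later. To test this, I created a Spring Boot application that sends 1000 Customer JSON objects and prints the offset of each object sent. The application works fine, but when I shut down the broker and restart it a few seconds later, the producer resumes sending normally from the latest offset. In my example, I took Kafka down when offset = 983 appeared in the console. When I started the broker again, Kafka resumed sending messages from offset = 984, but I found that 3 or 4 messages were lost, with this error: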
983
offset acked
2019-04-01 16:20:34.635 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:34.638 INFO 7656 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-2, groupId=jsa-group] Error sending fetch request (sessionId=1910675333, epoch=31) to node 0: org.apache.kafka.common.errors.DisconnectException.
2019-04-01 16:20:34.689 WARN 7656 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=jsa-group] Connection to node 0 could not be established. Broker may not be available.
2019-04-01 16:20:34.844 WARN 7656 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=jsa-group] Connection to node 0 could not be established. Broker may not be available.
2019-04-01 16:20:35.050 WARN 7656 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=jsa-group] Connection to node 0 could not be established. Broker may not be available.
2019-04-01 16:20:35.557 WARN 7656 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=jsa-group] Connection to node 0 could not be established. Broker may not be available.
.........
dali 36
2019-04-01 16:20:55.592 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:55.594 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:55.594 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:55.696 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:55.696 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
984
offset acked
985
offset acked
986
offset acked
987
offset acked
988
offset acked
989
offset acked
990
offset acked
991
offset acked
992
offset acked
993
offset acked
994
offset acked
995
offset acked
996
offset acked
997
offset acked
998
offset acked
999
offset acked
1000
offset acked
1001
offset acked
1002
offset acked
1003
offset acked
1004
offset acked
2019-04-01 16:20:55.799 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:55.803 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:55.803 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:55.904 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:55.904 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.006 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.010 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.011 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.111 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.111 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.213 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.215 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.215 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.317 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.317 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.419 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.422 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.422 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.523 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.523 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
dali 37
1005
offset acked
2019-04-01 16:20:56.625 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.628 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.628 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.729 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.729 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.831 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.834 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.834 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.936 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.936 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.037 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.040 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:57.040 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.140 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.141 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.245 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.252 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:57.252 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.358 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.358 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.460 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.466 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:57.467 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.567 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.568 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
dali 38
1006
offset acked
2019-04-01 16:20:57.671 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.681 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:57.681 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.785 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.786 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.889 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.896 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: The coordinator is loading and hence can't process requests.
2019-04-01 16:20:58.007 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: The coordinator is loading and hence can't process requests.
2019-04-01 16:20:58.125 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Revoking previously assigned partitions [jsa-kafka-topic-0]
2019-04-01 16:20:58.125 INFO 7656 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [jsa-kafka-topic-0]
2019-04-01 16:20:58.125 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] (Re-)joining group
2019-04-01 16:20:58.149 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Successfully joined group with generation 31
2019-04-01 16:20:58.149 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Setting newly assigned partitions [jsa-kafka-topic-0]
2019-04-01 16:20:58.154 INFO 7656 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [jsa-kafka-topic-0]
dali 39
1007
offset acked
dali 40
1008
offset acked
dali 41
1009
offset acked
dali 42
1010
offset acked
dali 43
1011
offset acked
//...continue to send normally
Why could the messages at offset=1004, offset=1005 and offset=1006, and the first message at offset=983 (sent when I shut down my broker), not be sent when I started my Kafka broker again?
// my main class
public static void main(String[] args) {
    SpringApplication.run(SpringKafkaSendConsumeJavaObjectApplication.class, args);
}

@Bean
ApplicationRunner run(CustomerRepository personRepository) {
    return args -> {
        List<Customer> list = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            list.add(new Customer("dali " + i, 25, "homme"));
        }
        // producer is the KafkaProducer service shown below (its declaration is omitted from the post)
        Runnable runnable = new MyRunnable(list, producer);
        runnable.run();
    };
}
This is my run method; I send one customer every second.
import java.util.List;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyRunnable implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyRunnable.class);
    private List<Customer> customers;
    private KafkaProducer kafkaProducer;

    public MyRunnable(List<Customer> customers, KafkaProducer kafkaProducer) {
        // defensive copy of the list passed in
        this.customers = customers.stream().collect(Collectors.toList());
        this.kafkaProducer = kafkaProducer;
    }

    @Override
    public void run() {
        customers.forEach(customer -> {
            System.out.println(customer.getName());
            kafkaProducer.send(customer);
            try {
                Thread.sleep(1000); // 1 second
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
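// this is my send method
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, Customer> kafkaTemplate;

    @Value("${jsa.kafka.topic}")
    String kafkaTopic = "jsa-test";

    @Async
    public void send(Customer customer) {
        //LOGGER.info("sending data= '{}' " , customer);
        ListenableFuture<SendResult<String, Customer>> listenableFuture = kafkaTemplate.send(kafkaTopic, customer);
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Customer>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // called when the send ultimately fails
                System.out.println(throwable.getMessage());
            }

            @Override
            public void onSuccess(final SendResult<String, Customer> message) {
                // called once the broker has acknowledged the record
                System.out.println(message.getRecordMetadata().offset());
                System.out.println("offset acked");
            }
        });
    }
}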
// this is my Kafka configuration in application.properties
#Kafka Cluster
jsa.kafka.bootstrap-servers=localhost:9092
#consumer group id
jsa.kafka.consumer.group-id=jsa-group
#topic name
jsa.kafka.topic=jsa-kafka-topic
#server port
server.port=9000
#reconnect.backoff.ms=10000
#Integer.MAX_VALUE
retries=2147483647
retry.backoff.ms=1000
#5 minutes
request.timeout.ms=305000
#Integer.MAX_VALUE
max.block.ms=2147483647
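The post does not show how these keys reach the producer. As a minimal sketch, assuming the producer is built from a hand-written configuration map (for example one passed to Spring's `DefaultKafkaProducerFactory`), the settings above would be placed in that map under the same Kafka client key names. The class and method names here are hypothetical and only illustrate the shape of such a map; the values are copied from the properties listed above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects the producer settings from the question
// into the kind of map a Kafka producer is configured with.
final class ProducerSettingsSketch {

    static Map<String, Object> producerSettings() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("retries", Integer.MAX_VALUE);       // retries=2147483647
        props.put("retry.backoff.ms", 1000);           // wait 1 second between retries
        props.put("request.timeout.ms", 305000);       // 5 minutes, as in the question
        props.put("max.block.ms", Integer.MAX_VALUE);  // max.block.ms=2147483647
        return props;
    }

    public static void main(String[] args) {
        // Print each setting so the effective values can be checked at startup.
        producerSettings().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Printing the effective map at startup is one way to confirm that the retry settings are really applied to the producer rather than only being present in the properties file.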
You either need to wait on the Future, or call flush().
KafkaProducer.flush() Javadocs:
/**
* Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
* greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
* of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
* A request is considered completed when it is successfully acknowledged
* according to the <code>acks</code> configuration you have specified or else it results in an error.
* <p>
* Other threads can continue sending records while one thread is blocked waiting for a flush call to complete,
* however no guarantee is made about the completion of records sent after the flush call begins.
* <p>
* This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call
* gives a convenient way to ensure all previously sent messages have actually completed.
* <p>
* This example shows how to consume from one Kafka topic and produce to another Kafka topic:
* <pre>
* {@code
* for(ConsumerRecord<String, String> record: consumer.poll(100))
* producer.send(new ProducerRecord("my-topic", record.key(), record.value());
* producer.flush();
* consumer.commit();
* }
* </pre>
*
* Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
* we need to set <code>retries=<large_number></code> in our config.
* </p>
* <p>
* Applications don't need to call this method for transactional producers, since the {@link #commitTransaction()} will
* flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
* calls made since the previous {@link #beginTransaction()} are completed before the commit.
* </p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
@Override
public void flush() {
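
The first option in the answer, waiting on the Future, can be sketched without a broker. This is only an illustration under stated assumptions: `fakeSend` is a hypothetical stand-in for an asynchronous send such as `kafkaTemplate.send(...)`, and the class name, the 10-second timeout, and the simulated failure are all made up for the example. The point it shows is that blocking on each future surfaces a failed send to the caller instead of letting it pass unnoticed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class WaitForSendSketch {

    // Hypothetical stand-in for an asynchronous send: completes with an offset,
    // or completes exceptionally when the simulated broker is down.
    static CompletableFuture<Long> fakeSend(long offset, boolean brokerUp) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (brokerUp) {
            future.complete(offset);
        } else {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        }
        return future;
    }

    // Blocks on every send and returns the records that were not acknowledged.
    // Sends with index >= failFromIndex are simulated as failing.
    static List<String> sendAll(List<String> records, int failFromIndex) {
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            CompletableFuture<Long> future = fakeSend(i, i < failFromIndex);
            try {
                long offset = future.get(10, TimeUnit.SECONDS);
                System.out.println(records.get(i) + " acked at offset " + offset);
            } catch (ExecutionException | TimeoutException e) {
                // The send failed or timed out: remember the record for a retry.
                failed.add(records.get(i));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report everything still unsent.
                Thread.currentThread().interrupt();
                failed.addAll(records.subList(i, records.size()));
                return failed;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> failed = sendAll(Arrays.asList("dali 0", "dali 1", "dali 2"), 2);
        System.out.println("not acknowledged: " + failed);
    }
}
```

In the application from the question, the equivalent step would be calling `get(...)` with a timeout on the future returned by `kafkaTemplate.send(...)`, or calling `flush()` after a batch of sends, before treating the records as delivered.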