当前位置: 首页 > 工具软件 > See-KafKa > 使用案例 >

python kafka消费者超时参数_Python - Kafka:消费者失败(Python - Kafka: consumer failing)

吕博耘
2023-12-01

I have a simple Producer-Consumer setup: 1 producer(as a thread) and 2 consumers(as 2 processes). The run method of producer:

def run(self):

producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,

api_version=(0, 10))

while not self.stop_event.is_set():

self.logger.info("Checking for new changes")

self.check_for_new_changes(producer)

self.logger.info("Sleeping for {minutes}

minutes...".format(minutes=self.time_to_sleep / 60))

time.sleep(self.time_to_sleep)

producer.close()

Basically it checks for changes, sends messages if new changes found and then goes to sleep for 5 minutes.

The run method:

def run(self):

if self.group_id:

consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers,

consumer_timeout_ms=1000,

api_version=(0, 10),

group_id=self.group_id)

else:

consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers,

consumer_timeout_ms=1000,

api_version=(0, 10))

consumer.subscribe(['new_change'])

while not self.stop_event.is_set():

for msg in consumer:

self.logger.info("New message:\n{msg}".format(msg=msg))

self.process_new_change(json.loads(msg.value))

if self.stop_event.is_set():

consumer.close()

return

consumer.close()

It seems to work fine but after running for a while I get these messages in the coordinator log:

[2017-12-17 02:06:40,639] INFO [GroupCoordinator 0]: Member kafka-python-1.3.5-f5cdcad3-bc1a-4623-a42b-f5de5e8bded1 in group meta_data_consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-12-17 02:06:40,659] INFO [GroupCoordinator 0]: Preparing to rebalance group meta_data_consumer with old generation 15 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)

[2017-12-17 02:06:40,659] INFO [GroupCoordinator 0]: Group meta_data_consumer with generation 16 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)

[2017-12-17 02:06:41,784] INFO [GroupCoordinator 0]: Member kafka-python-1.3.5-bdea8ce3-922f-4ee1-9959-13341e1730f5 in group failures_consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-12-17 02:06:41,785] INFO [GroupCoordinator 0]: Preparing to rebalance group failures_consumer with old generation 9 (__consumer_offsets-35) (kafka.coordinator.group.GroupCoordinator)

[2017-12-17 02:06:41,785] INFO [GroupCoordinator 0]: Group failures_consumer with generation 10 is now empty (__consumer_offsets-35) (kafka.coordinator.group.GroupCoordinator)

This kills my consumers and they stop running. I don't see any exceptions or errors in the consumer logs.

What might cause them to fail?

 类似资料: