read tcp :49560->:9092: i/o timeout under sarama kafka golang panic

袁枫涟
2023-12-01

We run into this problem all the time; even after the last fix, it still shows up during normal operation. Our scenario is file scanning, and file scanning is heavyweight processing with long consume times, so rebalances are triggered frequently.

Which brings us back to the original question. From first principles, operations and architecture are about engineering best practices, not about how new or old a technology is. The best practice is the best fit.

Kafka's consumer rebalance mechanism makes it a poor fit for long-running, logic-heavy processing. (It was originally built for shipping logs, with little or no per-message logic.)

 -----------------------------------------------------------------

Reproduced on 2022-04-19:

Parameters:

config.Consumer.Group.Session.Timeout = time.Second * 120

config.Consumer.Group.Heartbeat.Interval = time.Second * 20

Changed to:

config.Consumer.Group.Session.Timeout = time.Second * 30
config.Consumer.Group.Heartbeat.Interval = time.Second * 5

and the problem reproduces:
{"Level":"panic","Time":"2022-04-19T20:01:14.874530508+08:00","LoggerName":"","Message":
"Error from consumer: read tcp :33178-\u003e:9093: i/o timeout",
"Caller":{"Defined":true,"PC":8910019,"File":"/data/share//golang/cloudscan/pubsub/groupconsumer.go","Line":147,
"Function":"cloudscan/pubsub.(*GroupConsumer).StartConsume.func1"},"Stack":"cloudscan/pubsub.(*GroupConsumer).StartConsume.func1\n\t
/data/share//golang/cloudscan/pubsub/groupconsumer.go:147"

----------------------------------------------------------------

To cope with this, we raised Sarama's config.Consumer.Group.Session.Timeout to time.Second * 120, i.e. the session (heartbeat) timeout. But the network read timeout was still small: it defaults to 30 seconds, and in our file-scanning scenario, where consuming a message takes a long time, 30 seconds may not be enough to finish processing. The final configuration (a fuller sketch follows the snippet):

config.Consumer.Group.Session.Timeout = time.Second * 120
// c.Net.ReadTimeout (default 30 * time.Second) must be > config.Consumer.Group.Session.Timeout (120s)
// https://github.com/Shopify/sarama/issues/1422
config.Net.ReadTimeout = config.Consumer.Group.Session.Timeout + 30*time.Second
config.Consumer.Group.Heartbeat.Interval = time.Second * 20
config.Consumer.MaxProcessingTime = time.Minute * 10
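
For completeness, here is a minimal sketch of how these settings slot into a full sarama consumer-group setup. The broker address, group id, and Kafka version below are placeholder assumptions, not values from our deployment:

import (
	"time"

	"github.com/Shopify/sarama"
)

// newGroup wires the timeouts above into a consumer group.
func newGroup() (sarama.ConsumerGroup, error) {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // placeholder; match your cluster

	// Give slow consumers a long session before the broker evicts them.
	config.Consumer.Group.Session.Timeout = time.Second * 120
	config.Consumer.Group.Heartbeat.Interval = time.Second * 20
	config.Consumer.MaxProcessingTime = time.Minute * 10

	// Net.ReadTimeout must exceed Session.Timeout, otherwise the blocking
	// JoinGroup call hits the network timeout first and surfaces as "i/o timeout".
	config.Net.ReadTimeout = config.Consumer.Group.Session.Timeout + 30*time.Second

	// Deliver consumer errors on the Errors() channel instead of only logging them.
	config.Consumer.Return.Errors = true

	// Broker address and group id are placeholders.
	return sarama.NewConsumerGroup([]string{"localhost:9092"}, "file-scan-group", config)
}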

The logs then report:

Jan 29 13:41:29 csmeta[21871]: panic: Error from dir consumer: read tcp :50626->:9092: i/o timeout
Jan 29 13:40:05  csscand[27300]: panic: Error from file consumer: read tcp :49560->:9092: i/o timeout
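
The panic itself comes from our own error-watching goroutine (groupconsumer.go, not shown in this post). A hypothetical reconstruction of that pattern, assuming config.Consumer.Return.Errors = true and the standard library log package; panicking here is our choice, not something sarama forces:

// watchErrors drains the consumer group's error channel. Panicking on every
// error is what turns a transient "i/o timeout" into a process crash;
// logging and continuing is usually the safer choice.
func watchErrors(group sarama.ConsumerGroup) {
	go func() {
		for err := range group.Errors() {
			log.Panicf("Error from consumer: %v", err)
		}
	}()
}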

It looks like an I/O error, so the first instinct is to suspect the disk? Or the server side? That's a pit.

Look again: it's a TCP network error, so blame the network? Another pit.

In reality it is likely a Go context-related error, tied to sarama's consumer-group algorithm. The fault is on the client side.

Source: https://github.com/Shopify/sarama/blob/v1.19.0/consumer_group.go#L18

Other projects also hit this context-related "tcp i/o timeout" error:

https://github.com/jackc/pgx/issues/831

I am on the same situation described by @adw1n and @atombender. I can't gracefully handle a good amount of requests with context cancelled because pgx returns an "i/o timeout" which is not really a network timeout. The helper function mentioned by @jackc doesn't help in this situation

https://github.com/jackc/pgconn/issues/80

Context expiration is implemented by SetDeadline on the underlying net.Conn. This means that the error returned from a query can be the underlying net error instead of the context error. This is confusing for callers.

See jackc/pgx#831.
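
That pgconn comment describes the mechanism exactly. Below is a minimal, self-contained sketch (my own illustration, not code from any of these projects) of how a cancelled context can surface as an i/o timeout rather than context.Canceled: cancellation pushes the connection deadline into the past, so the blocked Read returns a net timeout error.

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"
)

func main() {
	// A listener that accepts but never writes, so the client's Read blocks.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	go func() {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		time.Sleep(2 * time.Second)
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-ctx.Done()
		// What drivers like pgconn do on context expiry: move the connection
		// deadline into the past so the blocked Read unblocks immediately.
		_ = conn.SetReadDeadline(time.Unix(1, 0))
	}()
	time.AfterFunc(100*time.Millisecond, cancel)

	buf := make([]byte, 1)
	_, err = conn.Read(buf) // blocks until the deadline fires
	var nerr net.Error
	if errors.As(err, &nerr) && nerr.Timeout() {
		// Prints a "read tcp ...: i/o timeout" net error, not context.Canceled.
		fmt.Println("read error:", err)
	}
}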

Related discussion on the sarama GitHub:

https://github.com/Shopify/sarama/issues/1192

Versions

Sarama Version: v1.19.0
Kafka Version: v2.0
Go Version: v1.11

Problem Description

Getting i/o timeout error from broker when using consumer group.

Error:          Received unexpected error read tcp <ip>:63133-><ip>:9092: i/o timeout

Error is returned from https://github.com/Shopify/sarama/blob/master/broker.go#L592

Note: same kafka cluster is working fine when not using consumer group

I experienced perhaps the same problem. One consumer instance using the ConsumerGroup API is fine, but when I tried to start another instance, the other instance got the same error and could not join the group.

Sarama v1.19.0
Kafka 1.1.0
Go v1.10.3

Solved the problem by watching ConsumerGroupSession.Context().Done() as well in ConsumeClaim.
The comment at https://github.com/Shopify/sarama/blob/v1.19.0/consumer_group.go#L18 is clear enough, but it seems that the example at https://godoc.org/github.com/Shopify/sarama#example-ConsumerGroup does not take rebalancing into consideration.

func (h MyHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	var msg SyncMessage
	for {
		select {
		case cMsg, ok := <-claim.Messages():
			if !ok {
				// Messages() is closed when the claim is revoked; exit promptly.
				return nil
			}
			if err := json.Unmarshal(cMsg.Value, &msg); err != nil {
				return err
			}
			// do something
			sess.MarkMessage(cMsg, "")
		case <-sess.Context().Done():
			// The session context is cancelled when a rebalance begins;
			// returning quickly leaves time for Cleanup() and the final offset commit.
			return nil
		}
	}
}
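
Note the two exit paths in that loop: the ok check catches claim.Messages() closing when the claim is revoked, and sess.Context().Done() catches the session being cancelled when a rebalance begins. Both let ConsumeClaim return within Config.Consumer.Group.Rebalance.Timeout, which is exactly what the interface documentation below asks for.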


type ConsumerGroup interface {
 // Consume joins a cluster of consumers for a given list of topics and
 // starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
 //
 // The life-cycle of a session is represented by the following steps:
 //
 // 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
 // and are assigned their "fair share" of partitions, aka 'claims'.
 // 2. Before processing starts, the handler's Setup() hook is called to notify the user
 // of the claims and allow any necessary preparation or alteration of state.
 // 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
 // in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
 // from concurrent reads/writes.
 // 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
 // parent context is cancelled or when a server-side rebalance cycle is initiated.
 // 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
 // to allow the user to perform any final tasks before a rebalance.
 // 6. Finally, marked offsets are committed one last time before claims are released.
 //
 // Please note that once a rebalance is triggered, sessions must be completed within
 // Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
 // as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
 // is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
 // commit failures.
 Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
  
 // Errors returns a read channel of errors that occurred during the consumer life-cycle.
 // By default, errors are logged and not returned over this channel.
 // If you want to implement any custom error handling, set your config's
 // Consumer.Return.Errors setting to true, and read from this channel.
 Errors() <-chan error
  
 // Close stops the ConsumerGroup and detaches any running sessions. It is required to call
 // this function before the object passes out of scope, as it will otherwise leak memory.
 Close() error
 }
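
Point 4 above is the key detail: every rebalance ends the session and makes Consume return, so the caller must invoke it in a loop. A minimal driver sketch (the function name and wiring are mine, but the loop itself is the standard sarama usage pattern):

// runGroup keeps the consumer in the group across rebalances.
func runGroup(ctx context.Context, client sarama.ConsumerGroup, topics []string, h sarama.ConsumerGroupHandler) error {
	for {
		// Consume blocks for the lifetime of one session and returns when
		// the session ends (rebalance) or on error.
		if err := client.Consume(ctx, topics, h); err != nil {
			return err // e.g. the "read tcp ... i/o timeout" seen above
		}
		if ctx.Err() != nil {
			return ctx.Err() // parent context cancelled: stop for good
		}
		// Otherwise a rebalance ended the session; loop to re-join the group.
	}
}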
