只有一个 rdk:broker-1 线程(PID 17)的 CPU 占用率接近 100%,其它线程都正常;另一个 rdk:broker-1 线程的 PID 为 18。正常情况下两个 rdk:broker-1 线程的 PID 分别为 16 和 17,问题发生时则为 17 和 18,怀疑是线程退出后又重新创建了两个 rdk:broker-1 线程,但尚未确认。
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
17 root 20 0 861236 13168 7956 R 99.9 0.3 132271:38 rdk:broker-1 // rd_kafka_set_thread_sysname("rdk:broker%" PRId32, rkb->rkb_nodeid);
librdkafka-0.11.6
[root@sanyue ~]# pstack 17
Thread 1 (process 17):
#0 cnd_timedwait_abs (cnd=cnd@entry=0x109b438, mtx=mtx@entry=0x109b410, tspec=tspec@entry=0x7f30833e4180) at tinycthread_extra.c:100
#1 0x000000000067dfe1 in rd_kafka_q_pop_serve (rkq=0x109b410, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:373
#2 0x000000000067e090 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#3 0x0000000000665b2f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x109ab50, timeout_ms=<optimized out>) at rdkafka_broker.c:2510
#4 0x0000000000665be4 in rd_kafka_broker_serve (rkb=rkb@entry=0x109ab50, abs_timeout=abs_timeout@entry=9539396929093) at rdkafka_broker.c:2532
#5 0x00000000006660b7 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x109ab50, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2617
#6 0x00000000006676f6 in rd_kafka_broker_thread_main (arg=arg@entry=0x109ab50) at rdkafka_broker.c:3571 // rd_kafka_broker_ua_idle(rkb, RD_POLL_INFINITE);
#7 0x00000000006b3ce7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:583
#8 0x00007f3084fbfea5 in start_thread () from /lib64/libpthread.so.0
#9 0x00007f3084ce88dd in clone () from /lib64/libc.so.6
[root@sanyue ~]# pstack 17
Thread 1 (process 17):
#0 cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:470
#1 0x00000000006b423d in cnd_timedwait_abs (cnd=cnd@entry=0x109b438, mtx=mtx@entry=0x109b410, tspec=tspec@entry=0x7f30833e4180) at tinycthread_extra.c:100
#2 0x000000000067dfe1 in rd_kafka_q_pop_serve (rkq=0x109b410, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:373
#3 0x000000000067e090 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#4 0x0000000000665b2f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x109ab50, timeout_ms=<optimized out>) at rdkafka_broker.c:2510
#5 0x0000000000665be4 in rd_kafka_broker_serve (rkb=rkb@entry=0x109ab50, abs_timeout=abs_timeout@entry=9539396929093) at rdkafka_broker.c:2532
#6 0x00000000006660b7 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x109ab50, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2617
#7 0x00000000006676f6 in rd_kafka_broker_thread_main (arg=arg@entry=0x109ab50) at rdkafka_broker.c:3571
#8 0x00000000006b3ce7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:583
#9 0x00007f3084fbfea5 in start_thread () from /lib64/libpthread.so.0
#10 0x00007f3084ce88dd in clone () from /lib64/libc.so.6
[root@sanyue ~]# pstack 17
Thread 1 (process 17):
#0 rd_kafka_q_pop_serve (rkq=0x109b410, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:347
#1 0x000000000067e090 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#2 0x0000000000665b2f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x109ab50, timeout_ms=<optimized out>) at rdkafka_broker.c:2510
#3 0x0000000000665be4 in rd_kafka_broker_serve (rkb=rkb@entry=0x109ab50, abs_timeout=abs_timeout@entry=9539396929093) at rdkafka_broker.c:2532
#4 0x00000000006660b7 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x109ab50, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2617
#5 0x00000000006676f6 in rd_kafka_broker_thread_main (arg=arg@entry=0x109ab50) at rdkafka_broker.c:3571
#6 0x00000000006b3ce7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:583
#7 0x00007f3084fbfea5 in start_thread () from /lib64/libpthread.so.0
#8 0x00007f3084ce88dd in clone () from /lib64/libc.so.6
[root@sanyue ~]# pstack 17
Thread 1 (process 17):
#0 0x000000000056e910 in pthread_cond_timedwait@plt ()
#1 0x00000000006b3e89 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:469
#2 0x00000000006b423d in cnd_timedwait_abs (cnd=cnd@entry=0x109b438, mtx=mtx@entry=0x109b410, tspec=tspec@entry=0x7f30833e4180) at tinycthread_extra.c:100
#3 0x000000000067dfe1 in rd_kafka_q_pop_serve (rkq=0x109b410, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:373
#4 0x000000000067e090 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#5 0x0000000000665b2f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x109ab50, timeout_ms=<optimized out>) at rdkafka_broker.c:2510
#6 0x0000000000665be4 in rd_kafka_broker_serve (rkb=rkb@entry=0x109ab50, abs_timeout=abs_timeout@entry=9539396929093) at rdkafka_broker.c:2532
#7 0x00000000006660b7 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x109ab50, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2617
#8 0x00000000006676f6 in rd_kafka_broker_thread_main (arg=arg@entry=0x109ab50) at rdkafka_broker.c:3571
#9 0x00000000006b3ce7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:583
#10 0x00007f3084fbfea5 in start_thread () from /lib64/libpthread.so.0
#11 0x00007f3084ce88dd in clone () from /lib64/libc.so.6
(gdb) bt
#0 0x00000000006b3e95 in cnd_timedwait (cond=<optimized out>, mtx=<optimized out>, ts=<optimized out>) at tinycthread.c:474
#1 0x00000000006b423d in cnd_timedwait_abs (cnd=cnd@entry=0x109b438, mtx=mtx@entry=0x109b410, tspec=tspec@entry=0x7f30833e4180) at tinycthread_extra.c:100
(gdb)
(gdb) p *(struct timespec*)0x7f30833e4180
$1 = {tv_sec = 1669498570, tv_nsec = 1000000000}
(gdb)
(gdb) p *(cnd_t*)0x109b438
$2 = {__data = {__lock = 0, __futex = 19079890, __total_seq = 9539945, __wakeup_seq = 9539945, __woken_seq = 9539945, __mutex = 0x109b410, __nwaiters = 0, __broadcast_seq = 0},
__size = "\000\000\000\000\322\"#\001i\221\221\000\000\000\000\000i\221\221\000\000\000\000\000i\221\221\000\000\000\000\000\020\264\t\001", '\000' <repeats 11 times>,
__align = 81947503561277440}
(gdb)
(gdb) p *(mtx_t*)0x109b410
$3 = {__data = {__lock = 1, __count = 0, __owner = 17, __nusers = 1, __kind = 0, __spins = 0, __elision = 0, __list = {__prev = 0x0, __next = 0x0}},
__size = "\001\000\000\000\000\000\000\000\021\000\000\000\001", '\000' <repeats 26 times>, __align = 1}
(gdb) n
347 in rdkafka_queue.c // while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && !(rko = rd_kafka_op_filter(rkq, rko, version)));
(gdb) n
373 in rdkafka_queue.c // if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, &timeout_tspec) == thrd_timedout) { mtx_unlock(&rkq->rkq_lock); return NULL; }
(gdb) n
347 in rdkafka_queue.c
(gdb)
373 in rdkafka_queue.c
/**
 * Pop one op from \p rkq by delegating to rd_kafka_q_pop_serve() with
 * cb_type RD_KAFKA_Q_CB_RETURN and no callback/opaque.
 *
 * This is frame "rd_kafka_q_pop ... at rdkafka_queue.c:399" in the stack
 * traces above.
 *
 * NOTE(review): this excerpt takes `rd_ts_t timeout_us`, whereas the 0.11.6
 * stack traces show the parameter as `timeout_ms`; the excerpt appears to be
 * copied from a newer librdkafka release - confirm against the deployed
 * version.
 */
rd_kafka_op_t *
rd_kafka_q_pop(rd_kafka_q_t *rkq, rd_ts_t timeout_us, int32_t version) {
return rd_kafka_q_pop_serve(rkq, timeout_us, version,
RD_KAFKA_Q_CB_RETURN, NULL, NULL);
}
/**
 * Serve q like rd_kafka_q_serve() until an op is found that can be returned
 * as an event to the application.
 *
 * @returns the first event:able op, or NULL on timeout.
 *
 * Locality: any thread
 *
 * NOTE(review): abridged excerpt. The declaration of `rko`, the queue
 * locking and the path that returns a found op are omitted here, and the
 * signature (`int timeout_ms`) does not match the body (`timeout_us`), so
 * the lines seem to be pasted from two different library versions - confirm
 * against the deployed 0.11.6 source.
 */
rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms,
int32_t version,
rd_kafka_q_cb_type_t cb_type,
rd_kafka_q_serve_cb_t *callback,
void *opaque) {
struct timespec timeout_tspec;
rd_timeout_init_timespec_us(&timeout_tspec, timeout_us);
while (1) { // The busy loop happens here: the gdb `n` steps alternate between line 347 and line 373, i.e. cnd_timedwait_abs() neither blocks nor reports a timeout.
while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && !(rko = rd_kafka_op_filter(rkq, rko, version))) // line 347
;
// cnd_timedwait_abs() has no effect here; underneath, it is the
// pthread_cond_timedwait() call that does not wait.
// The gdb dump of timeout_tspec shows tv_nsec = 1000000000, which is outside
// the valid range [0, 999999999]. POSIX specifies that
// pthread_cond_timedwait() fails with EINVAL for such an abstime, so the
// call returns immediately. Presumably that error is not reported as
// thrd_timedout (the mapping is not visible in this excerpt), so the branch
// below is skipped and the loop spins with the same absolute timeout.
// NOTE(review): this suggests the timespec initialisation does not
// normalise tv_nsec == 1000000000 - verify in the deployed version.
// While the thread was being traced with gdb it left this loop and CPU usage
// returned to normal, which ended the investigation; consumption from Kafka
// kept working throughout the incident.
if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, &timeout_tspec) == thrd_timedout) { // line 373
mtx_unlock(&rkq->rkq_lock);
return NULL;
}
}
}
// tinycthread_extra.c:94
// Wait on \p cnd with an absolute timeout \p tspec.
// tv_sec doubles as a sentinel: RD_POLL_INFINITE waits without a timeout,
// RD_POLL_NOWAIT reports a timeout immediately; any other value is passed
// through to cnd_timedwait(). tv_nsec is not inspected here.
// In the captured case tv_sec = 1669498570, so neither sentinel branch is
// taken and the call reaches cnd_timedwait() (frame at
// tinycthread_extra.c:100 in the stack traces).
int cnd_timedwait_abs(cnd_t *cnd, mtx_t *mtx, const struct timespec *tspec) {
if (tspec->tv_sec == RD_POLL_INFINITE) // -1
return cnd_wait(cnd, mtx);
else if (tspec->tv_sec == RD_POLL_NOWAIT) // 0
return thrd_timedout;
// On Linux this ends up calling pthread_cond_timedwait().
return cnd_timedwait(cnd, mtx, tspec); // The wait appears to have no effect here; the dumped tspec has tv_nsec = 1000000000, which POSIX treats as invalid (EINVAL).
}
// tinycthread wrapper around pthread_cond_timedwait().
// NOTE(review): heavily abridged excerpt - the declaration of `ret` and the
// translation of `ret` into a thrd_* result are omitted, so how a failure
// such as EINVAL is reported to the caller cannot be seen here; check
// tinycthread.c around lines 469-474.
int cnd_timedwait(cnd_t *cond, mtx_t *mtx, const struct timespec *ts)
{
ret = pthread_cond_timedwait(cond, mtx, ts); // line 469
}
/**
 * @brief Serve broker ops.
 *
 * Pops ops from rkb->rkb_ops and hands each one to
 * rd_kafka_broker_op_serve(). Only the first pop uses the caller's timeout;
 * once an op has been served the timeout is set to RD_POLL_NOWAIT for the
 * following pops. The loop ends when the pop returns NULL or when
 * rd_kafka_broker_op_serve() returns non-zero.
 *
 * @param rkb         broker whose op queue is served
 * @param timeout_us  timeout passed to the first rd_kafka_q_pop() call
 *
 * @returns the number of ops served
 *
 * NOTE(review): the 0.11.6 stack traces show this function with a
 * `timeout_ms` parameter (rdkafka_broker.c:2510); this excerpt with
 * `rd_ts_t timeout_us` appears to be from a newer release - confirm.
 */
static RD_WARN_UNUSED_RESULT int
rd_kafka_broker_ops_serve(rd_kafka_broker_t *rkb, rd_ts_t timeout_us) {
rd_kafka_op_t *rko;
int cnt = 0;
/* cnt is incremented for every popped op, before it is served. */
while ((rko = rd_kafka_q_pop(rkb->rkb_ops, timeout_us, 0)) &&
(cnt++, !rd_kafka_broker_op_serve(rkb, rko)))
timeout_us = RD_POLL_NOWAIT;
return cnt;
}
**注:**新版本已无该函数
/**
 * Idle function for unassigned brokers
 * If \p timeout_ms is not RD_POLL_INFINITE the serve loop will be exited
 * regardless of state after this long (approximately).
 *
 * @param rkb         broker to serve
 * @param timeout_ms  maximum time to stay in the serve loop; replaced by 1
 *                    when the client instance is terminating, and by
 *                    rkb->rkb_blocking_max_ms when RD_POLL_INFINITE is passed
 *
 * This is frame "rd_kafka_broker_ua_idle ... at rdkafka_broker.c:2617" in the
 * stack traces, entered with timeout_ms = -1; the gdb dump of *rkb shows
 * rkb_blocking_max_ms = 1000.
 */
static void rd_kafka_broker_ua_idle (rd_kafka_broker_t *rkb, int timeout_ms) {
/* Remember the state on entry; the loop below ends once it changes. */
int initial_state = rkb->rkb_state;
rd_ts_t abs_timeout;
if (rd_kafka_terminating(rkb->rkb_rk))
timeout_ms = 1;
else if (timeout_ms == RD_POLL_INFINITE)
timeout_ms = rkb->rkb_blocking_max_ms;
/* Computed once before the loop; every iteration serves against the same
 * abs_timeout. */
abs_timeout = rd_timeout_init(timeout_ms);
/* Since ua_idle is used during connection setup
 * in state ..BROKER_STATE_CONNECT we only run this loop
 * as long as the state remains the same as the initial, on a state
 * change - most likely to UP, a correct serve() function
 * should be used instead.
 * Regardless of constraints (terminating, timeouts), poll at
 * least once. The state will not have changed on the first iteration.
 */
do {
rd_kafka_broker_toppars_serve(rkb);
rd_kafka_broker_serve(rkb, abs_timeout);
} while (!rd_kafka_broker_terminating(rkb) &&
(int)rkb->rkb_state == initial_state &&
!rd_timeout_expired(rd_timeout_remains(abs_timeout)));
}
/* Broker thread entry point (frame "rd_kafka_broker_thread_main ... at
 * rdkafka_broker.c:3571" in the stack traces).
 * NOTE(review): abridged and truncated excerpt - the enclosing loop/switch
 * around the `case` label and the end of the function are not shown.
 * Visible behaviour: names the thread from rkb_name, sets the system thread
 * name to "rdk:broker<nodeid>" (hence "rdk:broker-1" for nodeid -1 in `top`),
 * then, in state UP, picks the serve function by node id and client type. */
static int rd_kafka_broker_thread_main(void *arg) {
rd_kafka_broker_t *rkb = arg;
rd_kafka_t *rk = rkb->rkb_rk;
rd_kafka_set_thread_name("%s", rkb->rkb_name);
rd_kafka_set_thread_sysname("rdk:broker%" PRId32, rkb->rkb_nodeid); // pthread_setname_np
rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BROKER);
case RD_KAFKA_BROKER_STATE_UP:
/* The gdb dump of *rkb shows rkb_nodeid = -1 and state UP, and the stack
 * traces show this branch being taken; presumably RD_KAFKA_NODEID_UA is -1
 * - confirm. */
if (rkb->rkb_nodeid == RD_KAFKA_NODEID_UA)
rd_kafka_broker_ua_idle(rkb, RD_POLL_INFINITE); // line 3571
else if (rk->rk_type == RD_KAFKA_PRODUCER)
rd_kafka_broker_producer_serve(rkb);
else if (rk->rk_type == RD_KAFKA_CONSUMER)
rd_kafka_broker_consumer_serve(rkb);
(gdb) p *rkb
$2 = {rkb_link = {tqe_next = 0x0, tqe_prev = 0x0}, rkb_nodeid = -1, rkb_rsal = 0x0, rkb_ts_rsal_last = 0, rkb_addr_last = 0x0, rkb_transport = 0x0, rkb_corrid = 0, rkb_connid = 0,
rkb_ops = 0x109b410, rkb_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __nusers = 0, __kind = 0, __spins = 0, __elision = 0, __list = {__prev = 0x0, __next = 0x0}},
__size = '\000' <repeats 39 times>, __align = 0}, rkb_blocking_max_ms = 1000, rkb_toppars = {tqh_first = 0x0, tqh_last = 0x109abc8}, rkb_toppar_cnt = 0, rkb_active_toppars = {
cqh_first = 0x109abe0, cqh_last = 0x109abe0}, rkb_active_toppar_cnt = 0, rkb_active_toppar_next = 0x0, rkb_cgrp = 0x0, rkb_ts_fetch_backoff = 0, rkb_fetching = 0,
rkb_state = RD_KAFKA_BROKER_STATE_UP, rkb_ts_state = 19190729, rkb_timeout_scan_intvl = {ri_ts_last = 9539395929092, ri_fixed = 0, ri_backoff = 0}, rkb_blocking_request_cnt = {val = 0},
rkb_features = 0, rkb_ApiVersions = 0x0, rkb_ApiVersions_cnt = 0, rkb_ApiVersion_fail_intvl = {ri_ts_last = 0, ri_fixed = 1200000000, ri_backoff = 0}, rkb_source = RD_KAFKA_INTERNAL, rkb_c = {
tx_bytes = {val = 0}, tx = {val = 0}, tx_err = {val = 0}, tx_retries = {val = 0}, req_timeouts = {val = 0}, rx_bytes = {val = 0}, rx = {val = 0}, rx_err = {val = 0}, rx_corrid_err = {
val = 0}, rx_partial = {val = 0}, zbuf_grow = {val = 0}, buf_grow = {val = 0}, wakeups = {val = 0}}, rkb_req_timeouts = 0, rkb_ts_tx_last = 0, rkb_ts_metadata_poll = 320587665,
rkb_metadata_fast_poll_cnt = 0, rkb_thread = 139846337062656, rkb_refcnt = {val = 2}, rkb_rk = 0x1099620, rkb_recv_buf = 0x0, rkb_max_inflight = 0, rkb_outbufs = {rkbq_bufs = {
tqh_first = 0x0, tqh_last = 0x109ad20}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_waitresps = {rkbq_bufs = {tqh_first = 0x0, tqh_last = 0x109ad38}, rkbq_cnt = {val = 0},
rkbq_msg_cnt = {val = 0}}, rkb_retrybufs = {rkbq_bufs = {tqh_first = 0x0, tqh_last = 0x109ad50}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_avg_int_latency = {ra_v = {maxv = 0,
minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __nusers = 0, __kind = 0, __spins = 0, __elision = 0, __list = {__prev = 0x0,
__next = 0x0}}, __size = '\000' <repeats 39 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0,
p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_outbuf_latency = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__data = {__lock = 0,
__count = 0, __owner = 0, __nusers = 0, __kind = 0, __spins = 0, __elision = 0, __list = {__prev = 0x0, __next = 0x0}}, __size = '\000' <repeats 39 times>, __align = 0}, ra_enabled = 0,
ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_rtt = {ra_v = {maxv = 0,
minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __nusers = 0, __kind = 0, __spins = 0, __elision = 0, __list = {__prev = 0x0,
__next = 0x0}}, __size = '\000' <repeats 39 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0,
p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_throttle = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__data = {__lock = 0,
__count = 0, __owner = 0, __nusers = 0, __kind = 0, __spins = 0, __elision = 0, __list = {__prev = 0x0, __next = 0x0}}, __size = '\000' <repeats 39 times>, __align = 0}, ra_enabled = 0,
ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}},
rkb_name = ":0/internal", '\000' <repeats 116 times>, rkb_nodename = ":0", '\000' <repeats 125 times>, rkb_port = 0, rkb_origname = 0x109b3d0 "", rkb_logname = 0x109b3f0 ":0/internal",
rkb_logname_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __nusers = 0, __kind = 0, __spins = 0, __elision = 0, __list = {__prev = 0x0, __next = 0x0}},
__size = '\000' <repeats 39 times>, __align = 0}, rkb_wakeup_fd = {5, 6}, rkb_toppar_wakeup_fd = -1, rkb_connect_intvl = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0},
rkb_proto = RD_KAFKA_PROTO_PLAINTEXT, rkb_down_reported = 0, rkb_err = {msg = '\000' <repeats 511 times>, err = 0}}
socket.blocking.max.ms(对应上面 gdb 输出中的 rkb_blocking_max_ms = 1000):broker 在 socket 操作时的最大阻塞时长(单位:毫秒)。值越低响应越快,但 CPU 使用率会略有升高(Maximum time a broker socket operation may block. A lower value improves responsiveness at the expense of slightly higher CPU usage. **Deprecated**)。
rdkafka broker
寄存器 | 说明 |
---|---|
rdi | 第 1 个参数 |
rsi | 第 2 个参数 |
rdx | 第 3 个参数或第 2 个返回值 |
rcx | 第 4 个参数 |
r8 | 第 5 个参数 |
r9 | 第 6 个参数,更多的参数需通过栈(stack)传递 |
rax | 第 1 个返回值 |
rsp | 栈顶寄存器,相当于 32 位的 esp 寄存器 |
rbp | 栈基址寄存器,存放函数的栈帧起始地址 |