我编写了一个C(Rabbitmq-c)工作应用程序,它使用由Python脚本(pika)发布的队列。
我有以下奇怪的行为,我似乎无法解决:
知道会发生什么吗?
我已经尝试确保每个消费者都有自己的渠道(这有必要吗?)但还是一样的行为。。。
以下是消费者(工作者)的代码:
conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
"/",
0,
131072,
0,
AMQP_SASL_METHOD_PLAIN,
"guest",
"guest");
if (amqp_channel_open(conn, chan) == NULL)
LOG_ERR(" [!] Failed to open amqp channel!\n");
if ((q = amqp_queue_declare(conn,
chan,
amqp_cstring_bytes("ranges"),
0,
0,
0,
0,
amqp_empty_table)) == NULL)
LOG_ERR(" [!] Failed to declare queue!\n");
LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);
amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
while(1) {
amqp_maybe_release_buffers(conn);
amqp_consume_message(conn, &e, NULL, 0);
{
int n;
amqp_frame_t f;
unsigned char buf[8];
unsigned char *pbuf = buf;
amqp_simple_wait_frame(conn, &f); // METHOD frame
amqp_simple_wait_frame(conn, &f); // HEADER frame
n = f.payload.properties.body_size;
if (n != sizeof(range_buf))
LOG_ERR(" [!] Invalid message size!");
while (n) {
amqp_simple_wait_frame(conn, &f); // BODY frame
memcpy(pbuf,
f.payload.body_fragment.bytes,
f.payload.body_fragment.len);
n -= f.payload.body_fragment.len;
pbuf += f.payload.body_fragment.len;
}
// do something with buf
LOG_INFO(" [x] Message recevied from queue\n");
}
amqp_destroy_envelope(&e);
amqp_maybe_release_buffers(conn);
}
这可能对你有帮助
消息确认
完成一项任务可能需要几秒钟的时间。您可能想知道,如果其中一个消费者开始了一项长期任务,但只完成了部分任务就去世了,会发生什么情况。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它就会立即将其从内存中删除。在这种情况下,如果您杀死一个工人,我们将丢失它刚刚处理的消息。我们还将丢失发送给此特定工作进程但尚未处理的所有消息。
但我们不想失去任何任务。如果一名工人死亡,我们希望将任务交付给另一名工人。
为了确保消息不会丢失,RabbitMQ支持消息确认。消费者会发回一个确认(knowledgement),告诉RabbitMQ已经收到并处理了一条特定的消息,RabbitMQ可以自由删除它。
如果消费者死亡而未发送ack,RabbitMQ将理解消息未被完全处理,并将其重新传递给另一个消费者。这样,即使工人偶尔死亡,也可以确保不会丢失任何信息。
没有任何消息超时;RabbitMQ将仅在工作连接断开时重新传递消息。即使处理一条消息需要非常非常长的时间,也没关系。
默认情况下打开消息确认。
这里的问题很可能是您的消费者在启动时预取了所有消息。这是RabbitMQ的默认行为,但您可以减少消费者预取的消息数量,以便更好地将工作负载分配给多个工作人员。
这仅仅意味着一个或多个消费者将接收所有消息,而不会为新消费者留下任何消息。
如果将qos应用于消费者,并将预取限制为10条消息。消费者只会排队等待前10条消息,而新消费者可以填补空缺。
您正在寻找的实现此功能称为amqp\u basic\u qos,此外,您可以在此处阅读有关消费者预取的更多信息。
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新
我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(
我们运行一个集群工作线程应用程序,该应用程序依赖于 Kafka 使用高级消费者 API 使用消息。群集中的所有节点共享同一个使用者组。现在我们想要的是将该逻辑的一部分迁移到 Kafka 流处理器 API。这里的方法是什么?如果分配了相同的 groupId/clientId,流拓扑是否会与现有使用者就消息进行斗争?我们应该分配不同的 groupId/clientId 吗?流式传输拓扑?说“组”。 “
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者