当前位置: 首页 > 知识库问答 >
问题:

无法获取和读取具有amqp_basic_get和amqp_read_message的单个消息

鱼意远
2023-03-14

我想使用amqp设置一个消费者,以便从特定队列读取。一些谷歌指出,这可以通过amqp_basic_get来实现,查看文档,实际的消息是通过amqp_read_消息检索的。我还发现了这个例子,我试图按照这个例子来实现基本的。然而,我无法从特定队列获取和读取消息。

我的场景是这样的:我有两个程序,通过发布和使用Rabbitmq服务器进行通信。在每个通道中,都声明了一个连接,有两个通道,一个用于消费,一个用于发布。信息流是这样的:程序A获取当前时间并发布到Rabbitmq。当收到这个消息时,程序B得到它自己的时间,将它的时间和收到的时间打包在一个消息中,并发布给Rabbitmq。程序A应该使用此消息。然而,我不能成功地从命名的队列中读取。

程序A(在c中,并使用amqp. c)执行如下:

                        ... after creating the connection
        //Create channels
        amqp_channel_open_ok_t *res = amqp_channel_open(conn, channelIDPub);
        assert(res != NULL);
        amqp_channel_open_ok_t *res2 = amqp_channel_open(conn, channelIDSub);
        assert(res2 != NULL);
    
        //Declare exchange
        exchange = "exchangeName";
        exchangetype = "direct";
        amqp_exchange_declare(conn, channelIDPub, amqp_cstring_bytes(exchange.c_str()),
                              amqp_cstring_bytes(exchangetype.c_str()), 0, 0, 0, 0,
                              amqp_empty_table);
                                          ...
        throw_on_amqp_error(amqp_get_rpc_reply(conn), printText.c_str());
    
        //Bind the exchange to the queue
        const char* qname = "namedQueue";
        amqp_bytes_t queue = amqp_bytes_malloc_dup(amqp_cstring_bytes(qname));
        amqp_queue_declare_ok_t *r = amqp_queue_declare(
                conn, channelIDSub, queue, 0, 0, 0, 0, amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
        
        if (queue.bytes == NULL) {
            fprintf(stderr, "Out of memory while copying queue name");
            return;
        }
    
        amqp_queue_bind(conn, channelIDSub, queue, amqp_cstring_bytes(exchange.c_str()),
                        amqp_cstring_bytes(queueBindingKey.c_str()), amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
    
        amqp_basic_consume(conn, channelIDSub, queue, amqp_empty_bytes, 0, 0, 1,
                           amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
    
                                           // ...
        // In order to get a message from rabbitmq
        amqp_rpc_reply_t res, res2;
        amqp_message_t message;
        amqp_boolean_t no_ack = false;
    
        amqp_maybe_release_buffers(conn);
        printf("were here, with queue name %s, on channel %d\n", queueName, channelIDSub);
    
        amqp_time_t deadline;
        struct timeval timeout = { 1 , 0 };//same timeout used in consume(json)
        int time_rc = amqp_time_from_now(&deadline, &timeout);
        assert(time_rc == AMQP_STATUS_OK);
    
        do {
            res = amqp_basic_get(conn, channelIDSub, amqp_cstring_bytes("namedQueue"), no_ack);
        } while (res.reply_type == AMQP_RESPONSE_NORMAL &&
                res.reply.id == AMQP_BASIC_GET_EMPTY_METHOD 
                && amqp_time_has_past(deadline) == AMQP_STATUS_OK);
    
        if (AMQP_RESPONSE_NORMAL != res.reply_type || AMQP_BASIC_GET_OK_METHOD != res.reply.id)
        {
            printf("amqp_basic_get error codes amqp_response_normal %d, amqp_basic_get_ok_method %d\n", res.reply_type, res.reply.id);
            return false;
        }
    
        res2 = amqp_read_message(conn,channelID,&message,0);
        printf("error %s\n", amqp_error_string2(res2.library_error));
        printf("5:reply type %d\n", res2.reply_type);
    
        if (AMQP_RESPONSE_NORMAL != res2.reply_type) {
            printf("6:reply type %d\n", res2.reply_type);
            return false;
        }
    
        payload = std::string(reinterpret_cast< char const * >(message.body.bytes), message.body.len);
    
        printf("then were here\n %s", payload.c_str());
        amqp_destroy_message(&message);

程序B(python)如下所示

        #!/usr/bin/env python3
        import pika
        import json
        from datetime import datetime, timezone
        import time
        import threading
    
        cosimTime = 0.0
        newData = False
        lock = threading.Lock()
        thread_stop = False
    
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        connectionPublish = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channelConsume = connection.channel()
        channelPublish = connectionPublish.channel()
    
        print("Declaring exchange")
        channelConsume.exchange_declare(exchange='exchangeName', exchange_type='direct')
        channelPublish.exchange_declare(exchange='exchangeName', exchange_type='direct')
    
        print("Creating queue")
        result = channelConsume.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue
        result2 = channelPublish.queue_declare(queue='namedQueue', exclusive=False, auto_delete=False)
    
        channelConsume.queue_bind(exchange='exchangeName', queue=queue_name,
                       routing_key='fromB')
    
        channelPublish.queue_bind(exchange='exchangeName', queue="namedQueue",
                       routing_key='toB')
    
        print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callbackConsume(ch, method, properties, body):
        global newData, cosimTime
        print("\nReceived [x] %r" % body)
        #cosimTime = datetime.datetime.strptime(body, "%Y-%m-%dT%H:%M:%S.%f%z")
        with lock:
            newData = True
            cosimTime = body.decode()
            cosimTime = json.loads(cosimTime)
            #print(cosimTime)
    
    def publishRtime():
        global newData
        while not thread_stop:
            if newData:
            #if True:
                with lock:
                    newData = False
                    msg = {}
                    
                    msg['rtime'] = datetime.now(timezone.utc).astimezone().isoformat(timespec='milliseconds')
    
                    msg['cosimtime'] = cosimTime["simAtTime"]
                    print("\nSending [y] %s" % str(msg))
                    channelPublish.basic_publish(exchange='exchangeName',
                                        routing_key='toB',
                                        body=json.dumps(msg))
                    #time.sleep(1)
    
    channelConsume.basic_consume(
        queue=queue_name, on_message_callback=callbackConsume, auto_ack=True)
    
    try:
        thread = threading.Thread(target = publishRtime)
        thread.start()
        channelConsume.start_consuming()
    except KeyboardInterrupt:
        print("Exiting...")
        channelConsume.stop_consuming()
        thread_stop = True
        connection.close()

程序A输出的是什么:

amqp_basic_get错误代码amqp_response_normal1amqp_basic_get_ok_method3932232

这是AMQP_BASIC_GET_EMPTY_方法的代码。

程序B获取数据,并连续发布。

如果我稍微修改B,只发布一个特定的字符串,那么amqp_basic_get似乎成功返回,但是它在代码AMQP_RESPONSE_LIBRARY_EXCEPTIONamqp_read_message失败。

知道如何让它工作吗?我错过了什么设置?

共有1个答案

苏宾鸿
2023-03-14

问题出在queue_declare中,其中auto_delete参数两边不匹配。

 类似资料:
  • 我可以创建10个线程。但问题是,当我试图以并行方式使用这些线程访问单个页面时。我也尝试过将私有静态PDFTextStripper实例放入同步块中。但我还是得到了以下例外: COSStream已被关闭,无法读取。也许它的附件已经被关闭? 试图打印前10页每页的第一个单词,但不起作用。这是我第一次尝试多线程和PDF阅读。任何帮助都将不胜感激。 如果我使用一个线程按顺序读取for循环中的每个页面,那么它

  • 我正试图用IOS和firebase建立一个聊天室。当我在firebase控制台的模拟器中测试时,一切正常,但在IOS应用程序中不工作。 我用的结构是这样的 root/members/room id/auth。uid/userdata root/messages/room_id/messages/message 我用IOS创建了一个新房间 并为该房间添加另一名成员(目前使用硬编码uid): 然后像这

  • 我的问题是演员永远不会关闭,所以我最终有一大堆演员: 但是,虽然我可以用“getContext().stop(getSelf())”关闭Actor,但将它作为一个单例使用似乎更有意义,因为Actor中没有保存状态。但是如何在不创建新的演员的情况下获得对这个演员的引用呢?

  • 我今天刚开始Oozie,在运行Hive操作时注意到一个不一致的错误。当我运行相同的Oozie工作流时,有时它会成功,有时它会失败,因为配置单元操作出现了以下错误: 1)这是什么原因造成的?

  • 我正在获取O并且从未进行成功的Hibernate连接测试,在学习了本教程“JasperReports with hibernate-module1”和JasperReports with hibernate-module2之后,弹出一个错误,说“Could not parse mapping document from resource com/report/mappings/department

  • 我试图从__consumer_offsets主题中使用,因为这似乎是检索关于消费者的kafka度量(如消息滞后等)的最简单的方法。理想的方法是从jmx访问它,但希望先尝试一下,返回的消息似乎是加密的或不可读的。尝试添加stringDeserializer属性。有没有人对如何纠正这一点有什么建议?这里的提法也是重复的 重复的consumer_offset 没有帮助,因为它没有引用我的问题,即在Jav