我想使用amqp设置一个消费者,以便从特定队列读取。一些谷歌指出,这可以通过amqp_basic_get来实现,查看文档,实际的消息是通过amqp_read_消息检索的。我还发现了这个例子,我试图按照这个例子来实现基本的。然而,我无法从特定队列获取和读取消息。
我的场景是这样的:我有两个程序,通过发布和使用Rabbitmq服务器进行通信。在每个通道中,都声明了一个连接,有两个通道,一个用于消费,一个用于发布。信息流是这样的:程序A获取当前时间并发布到Rabbitmq。当收到这个消息时,程序B得到它自己的时间,将它的时间和收到的时间打包在一个消息中,并发布给Rabbitmq。程序A应该使用此消息。然而,我不能成功地从命名的队列中读取。
程序A(在c中,并使用amqp. c)执行如下:
... after creating the connection
//Create channels
amqp_channel_open_ok_t *res = amqp_channel_open(conn, channelIDPub);
assert(res != NULL);
amqp_channel_open_ok_t *res2 = amqp_channel_open(conn, channelIDSub);
assert(res2 != NULL);
//Declare exchange
exchange = "exchangeName";
exchangetype = "direct";
amqp_exchange_declare(conn, channelIDPub, amqp_cstring_bytes(exchange.c_str()),
amqp_cstring_bytes(exchangetype.c_str()), 0, 0, 0, 0,
amqp_empty_table);
...
throw_on_amqp_error(amqp_get_rpc_reply(conn), printText.c_str());
//Bind the exchange to the queue
const char* qname = "namedQueue";
amqp_bytes_t queue = amqp_bytes_malloc_dup(amqp_cstring_bytes(qname));
amqp_queue_declare_ok_t *r = amqp_queue_declare(
conn, channelIDSub, queue, 0, 0, 0, 0, amqp_empty_table);
throw_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
if (queue.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return;
}
amqp_queue_bind(conn, channelIDSub, queue, amqp_cstring_bytes(exchange.c_str()),
amqp_cstring_bytes(queueBindingKey.c_str()), amqp_empty_table);
throw_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, channelIDSub, queue, amqp_empty_bytes, 0, 0, 1,
amqp_empty_table);
throw_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
// ...
// In order to get a message from rabbitmq
amqp_rpc_reply_t res, res2;
amqp_message_t message;
amqp_boolean_t no_ack = false;
amqp_maybe_release_buffers(conn);
printf("were here, with queue name %s, on channel %d\n", queueName, channelIDSub);
amqp_time_t deadline;
struct timeval timeout = { 1 , 0 };//same timeout used in consume(json)
int time_rc = amqp_time_from_now(&deadline, &timeout);
assert(time_rc == AMQP_STATUS_OK);
do {
res = amqp_basic_get(conn, channelIDSub, amqp_cstring_bytes("namedQueue"), no_ack);
} while (res.reply_type == AMQP_RESPONSE_NORMAL &&
res.reply.id == AMQP_BASIC_GET_EMPTY_METHOD
&& amqp_time_has_past(deadline) == AMQP_STATUS_OK);
if (AMQP_RESPONSE_NORMAL != res.reply_type || AMQP_BASIC_GET_OK_METHOD != res.reply.id)
{
printf("amqp_basic_get error codes amqp_response_normal %d, amqp_basic_get_ok_method %d\n", res.reply_type, res.reply.id);
return false;
}
res2 = amqp_read_message(conn,channelID,&message,0);
printf("error %s\n", amqp_error_string2(res2.library_error));
printf("5:reply type %d\n", res2.reply_type);
if (AMQP_RESPONSE_NORMAL != res2.reply_type) {
printf("6:reply type %d\n", res2.reply_type);
return false;
}
payload = std::string(reinterpret_cast< char const * >(message.body.bytes), message.body.len);
printf("then were here\n %s", payload.c_str());
amqp_destroy_message(&message);
程序B(python)如下所示
#!/usr/bin/env python3
import pika
import json
from datetime import datetime, timezone
import time
import threading
cosimTime = 0.0
newData = False
lock = threading.Lock()
thread_stop = False
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
connectionPublish = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channelConsume = connection.channel()
channelPublish = connectionPublish.channel()
print("Declaring exchange")
channelConsume.exchange_declare(exchange='exchangeName', exchange_type='direct')
channelPublish.exchange_declare(exchange='exchangeName', exchange_type='direct')
print("Creating queue")
result = channelConsume.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
result2 = channelPublish.queue_declare(queue='namedQueue', exclusive=False, auto_delete=False)
channelConsume.queue_bind(exchange='exchangeName', queue=queue_name,
routing_key='fromB')
channelPublish.queue_bind(exchange='exchangeName', queue="namedQueue",
routing_key='toB')
print(' [*] Waiting for logs. To exit press CTRL+C')
def callbackConsume(ch, method, properties, body):
global newData, cosimTime
print("\nReceived [x] %r" % body)
#cosimTime = datetime.datetime.strptime(body, "%Y-%m-%dT%H:%M:%S.%f%z")
with lock:
newData = True
cosimTime = body.decode()
cosimTime = json.loads(cosimTime)
#print(cosimTime)
def publishRtime():
global newData
while not thread_stop:
if newData:
#if True:
with lock:
newData = False
msg = {}
msg['rtime'] = datetime.now(timezone.utc).astimezone().isoformat(timespec='milliseconds')
msg['cosimtime'] = cosimTime["simAtTime"]
print("\nSending [y] %s" % str(msg))
channelPublish.basic_publish(exchange='exchangeName',
routing_key='toB',
body=json.dumps(msg))
#time.sleep(1)
channelConsume.basic_consume(
queue=queue_name, on_message_callback=callbackConsume, auto_ack=True)
try:
thread = threading.Thread(target = publishRtime)
thread.start()
channelConsume.start_consuming()
except KeyboardInterrupt:
print("Exiting...")
channelConsume.stop_consuming()
thread_stop = True
connection.close()
程序A输出的是什么:
amqp_basic_get错误代码amqp_response_normal1amqp_basic_get_ok_method3932232
这是AMQP_BASIC_GET_EMPTY_方法的代码。
程序B获取数据,并连续发布。
如果我稍微修改B,只发布一个特定的字符串,那么amqp_basic_get似乎成功返回,但是它在代码AMQP_RESPONSE_LIBRARY_EXCEPTIONamqp_read_message失败。
知道如何让它工作吗?我错过了什么设置?
问题出在queue_declare
中,其中auto_delete
参数两边不匹配。
我可以创建10个线程。但问题是,当我试图以并行方式使用这些线程访问单个页面时。我也尝试过将私有静态PDFTextStripper实例放入同步块中。但我还是得到了以下例外: COSStream已被关闭,无法读取。也许它的附件已经被关闭? 试图打印前10页每页的第一个单词,但不起作用。这是我第一次尝试多线程和PDF阅读。任何帮助都将不胜感激。 如果我使用一个线程按顺序读取for循环中的每个页面,那么它
我正试图用IOS和firebase建立一个聊天室。当我在firebase控制台的模拟器中测试时,一切正常,但在IOS应用程序中不工作。 我用的结构是这样的 root/members/room id/auth。uid/userdata root/messages/room_id/messages/message 我用IOS创建了一个新房间 并为该房间添加另一名成员(目前使用硬编码uid): 然后像这
我的问题是演员永远不会关闭,所以我最终有一大堆演员: 但是,虽然我可以用“getContext().stop(getSelf())”关闭Actor,但将它作为一个单例使用似乎更有意义,因为Actor中没有保存状态。但是如何在不创建新的演员的情况下获得对这个演员的引用呢?
我今天刚开始Oozie,在运行Hive操作时注意到一个不一致的错误。当我运行相同的Oozie工作流时,有时它会成功,有时它会失败,因为配置单元操作出现了以下错误: 1)这是什么原因造成的?
我正在获取O并且从未进行成功的Hibernate连接测试,在学习了本教程“JasperReports with hibernate-module1”和JasperReports with hibernate-module2之后,弹出一个错误,说“Could not parse mapping document from resource com/report/mappings/department
我试图从__consumer_offsets主题中使用,因为这似乎是检索关于消费者的kafka度量(如消息滞后等)的最简单的方法。理想的方法是从jmx访问它,但希望先尝试一下,返回的消息似乎是加密的或不可读的。尝试添加stringDeserializer属性。有没有人对如何纠正这一点有什么建议?这里的提法也是重复的 重复的consumer_offset 没有帮助,因为它没有引用我的问题,即在Jav