TAO的实时事件服务
我们已探研了如何使用TAO的COS事件服务来接收更新过的股票的价格,但是如果我们并不关心所有的股票又怎么样呢?一个方法是使用多个事件通道,每个通道承载不同的消息容量。例如,每个事件通道仅携带股票其中的一部分。在本节中,我们将探讨另一个广案,即使用TAO实事事件服务来为我们执行过滤。TAO的实时事件服务可做许多其它事情,像保存具有优先级的点对点,使用多播来节省网络资源,产生超时和间隔的事件,以及它可以与TAO的调度服务协作用于分析您系统的可调度性。
在本例中,我们将会使用我们前一章用过的相同的数据结构,也就是事件将是相同的。TAO的实时事件服务可被配置用于以类型安全的方式携带事件,或你可以使用自定义的编码来发送非在事件中非IDL结构体,但第一点它可以像COS 事件服务一样简单的使用。
作为消费者连接是很相似的。一些基类和签名变化了,但它基本上具有相同的思想:先让我们定义消费者对象:
class Stock_Consumer : public POA_RtecEventComm::PushConsumer { public: Stock_Consumer (); void push (const RtecEventComm::EventSet& data); void disconnect_push_consumer (void); // details omitted };
注意的是我们接收一个事件集合代替单个事件。事件通道可以使用这个特征来把多个事件放入队列和把它们放进一个单个操作。首先我们需要从any中抽取事件数据:
void
Stock_Consumer::push (const RtecEventComm::EventSet &data)
{
for (CORBA::ULong i = 0; i != data.length (); ++i) {
RtecEventComm::Event &e = data[i];
Quoter::Event *event;
if ((e.data.any_value >>= event) == 0)
continue; // Invalid event
注意事件有多个结构,它们有区分明显的事件头和事件数据体,并且事件数据体比一个any还多。事件头用于提供过滤,事件数据体字段可配置为在编译时携带无论哪种您想要的IDL结构体。现在我们可以输出新的股票价格了:
std::cout << "The new price for one stock in /"" << event->full_name.in () << "/" (" << event->symbol.in () << ") is " << event->price << std::endl; }
我们也必须实现断开连接的回调:
void Stock_Consumer::disconnect_push_consumer (void) { this->supplier_proxy_ = CosEventChannelAdmin::ProxyPushSupplier::_nil (); }
As with the COS Event Channel we can voluntarily disconnect, too:
与COS 事件通道一样我们也可以自愿断开连接。
void Stock_Consumer::disconnect () { // Do not receive any more events... this->supplier_proxy_->disconnect_push_supplier (); }
连接到实时事件通道与连接到正规的事件通道非常的相似。只有一点不同,就是我们必须指定为想要接收的事件。这一点使用相当复杂的IDL结构来描述,但是TAO提供了一个帮助类来生产它。我们将假定我们使用命名服务或其它相似的服务来获取一个事件服务的引用:
CORBA::Object_var tmp = naming_context->resolve (name); RtecEventChannelAdmin::EventChannel_var event_channel = RtecEventChannelAdmin::EventChannel::_narrow (tmp);
现在我们用事件通道来获取消费都连接使用的工厂:
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = event_channel->for_consumers ();
再用工厂找回代理:
void Stock_Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin) { this->supplier_proxy_ = consumer_admin->obtain_push_supplier ();
现在我们列出我们想接收的事件。我们用简单的算法来给事件类型赋给每支股票:
CORBA::ULong rhat_event_type = (int('R') << 24) | (int('H') << 16) | (int('A') << 8) | int('T'); CORBA::ULong aaaa_event_type = (int('A') << 24) | (int('A') << 16) | (int('A') << 8) | int('A');
然后我们创建订阅:
ACE_ConsumerQOS_Factory subscriptions; subscriptions.insert_type (rhat_event_type, 0); subscriptions.insert_type (aaaa_event_type, 0);
和连接到代理:
RtecEventComm::PushConsumer_var myself = this->_this (); this->supplier_proxy_->connect_push_consumer ( myself.in (), subscriptions.get_ConsumerQOS ()); }
As with the COS Event Channel example we will make our implementation of the Modify_Stock
interface generate events whenever the price changes:
与COS事件通道示例一样无论什么时候只要股价变化我们都将创建Modify_Stock接口的实现来创建事件。
class Quoter_Modify_Stock_i : public POA_Quoter::Modify_Stock { public: Quoter_Modify_Stock_i (const char *symbol, const char *full_name, CORBA::Double price); void set_price (CORBA::Double new_price); void disconnect_push_supplier (void); private: Quoter::Event data_; RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; POA_RtecEventComm::PushSupplier_tie < Quoter_Stock_i > supplier_personality_; };
set_price()方法的实现非常相似。第一我们存储新的价格:
void Quoter_Stock_i::set_price (CORBA::Double new_price) { this->data_.price = new_price;
下一步我们准备事件。这时我们必须创建一个序列,但我们仅有在里面有一个元素:
RtecEventComm::EventSet event (1); event.length (1);
We set the event type based on the stock symbol:
RtecEventComm::Event &e = event[0]; const char *symbol = this->data_.symbol; e.header.type = ((int(symbol[0]) << 24) | (int(symbol[1]) << 16) | (int(symbol[2]) << 8) | int(symbol[3])); e.header.source = 1;
在本示例中没有使用事件源,但它必须是非0的。现在我们可以设置事件数据体:
e.data.any_value <<= this->data_;
and send the event to the event channel:
this->consumer_proxy_->push (event); }
作为在COS事件通道的情形下,我们需要提一个供者特征与它连接。我们获得事件服务的访问方法,例如使用命名服务:
CORBA::Object_var tmp = naming_context->resolve (name); RtecEventChannelAdmin::EventChannel_var event_channel = RtecEventChannelAdmin::EventChannel::_narrow (tmp);
接下来我们用事件通到来获得提供者连接使用的工厂:
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = event_channel->for_suppliers ();
和用工厂获得一个代理:
this->consumer_proxy_ = supplier_admin->obtain_push_consumer ();
我们构建发布器因此事件通可以在基于他们通用的事件之的消费者和提供者之间进行匹配:
const char *symbol = this->data_.symbol; CORBA::ULong type = ((int(symbol[0]) << 24) | (int(symbol[1]) << 16) | (int(symbol[2]) << 8) | int(symbol[3])); CORBA::ULong source = 1; ACE_SupplierQOS_Factory publications; publications.insert_type (type, source, 0, 1);
最后我们连接到消费者代理上:
RtecEventComm::PushSupplier_var supplier = this->supplier_personality_._this (); this->consumer_proxy_->connect_push_supplier (supplier);
最后我们实现断开连接的回调:
void Quoter_Stock_i::disconnect_push_supplier (void) { // Forget about the consumer it is not there anymore this->consumer_proxy_ = RtecEventChannelAdmin::ProxyPushConsumer::_nil (); }
实现接收股价更新事件的消费者,
已提供了头文件 header file , 一起的还有client.cpp. 还提供了这些文件Quoter.idl, Makefile, Stock_i.h, Stock_i.cpp, Stock_Factory_i.h, Stock_Factory_i.cpp, 和server.cpp.
用您的方案与 Stock_Consumer.cpp比较。
要测试您的变化您需要运行三个程序,先要运行TAO的命名服务:
$ $TAO_ROOT/orbsvcs/Naming_Service/Naming_Service
然后运行TAO的实时事件服务
$ $TAO_ROOT/orbsvcs/Event_Service/Event_Service
再运行您的客户端:
$ client AAAA CCCC
最后运行服务端:
$ server MSFT BBBB CCCC < stock_list.txt
Here is the stock_list.txt file.
向上面那样配置,但是这次运行多个客户端和服务端:
$ client AAAA MSFT $ client PPPP $ server AAAA < stock_list1.txt $ server QQQQ < stock_list2.txt
客户端能接收来来自两个服务端的所有事件吗?
这是 stock_list1.txt 和stock_list2.txt 文件。