当前位置: 首页 > 工具软件 > AMI通知 > 使用案例 >

ICE异步程序设计-----AMI/AMD

黎承颜
2023-12-01

1 简介

AMI


异步方法调用(AMI) 这个术语描述的是客户端的异步编程模型支持。
如果你使用AMI 发出远地调用,在Ice run time 等待答复的同时,发出调用的线程不会阻塞。相反,发出调用的线程可以继续进行各种活动,当答复最终到达时, Ice run time 会通知应用。通知是通过回调发给应用提供的编程语言对象的

AMD

一个服务器在同一时刻所能支持的同步请求数受到Ice run time 的服务器线程池的尺寸限制(参见15.3 节)。如果所有线程都在忙于分派长时间运行的操作,那么就没有线程可用于处理新的请求,客户就会经验到不可接受的无响应状态。异步方法分派(AMD) 是AMI 的服务器端等价物 ,能够解决这个可伸缩性问题
在使用AMD 时,服务器可以接收一个请求,然后挂起其处理,以尽快释放分派线程。当处理恢复、结果已得出时,服务器要使用Ice runtime 提供的回调对象,显式地发送响应。

使用AMD时,客户端如果需要等待返回值,那就一直等待,否则就继续往下执行。
为什么使用AMD?
用实际的术语说, AMD 操作通常会把请求数据(也就是,回调对象和操作参数)放入队列 ,供应用的某个线程(或线程池)随后处理用。这样,服务器就使分派线程的使用率降到了最低限度,能够高效地支持数千并发客户。
另外, AMD 还可用于需要在完成了客户的请求之后继续进行处理的操作。为了使客户的延迟降到最低限度,操作在返回结果后,仍留在分派线程中,继续用分派线程执行其他工作。
AMD解决的主要是提高服务端的负荷能力。


区别:

(1)AMI使得客户端可以异步,AMD使得服务端可以异步。

(2)使用AMI服务端代码不需要修改  ,使用AMD客户端代码不需要修改。

(3)对同步模型的支持

AMI 的语言映射仍然允许应用使用同步调用模型:如果为某个操作指定AMI 元数据,用于同步调用的代理方法会完整地得以保留;同时,还会生成一个额外的代理方法,用以支持异步调用。但是, AMD 操作的语言映射不允许我们的实现同时使用两种分派模型。如果你指定了AMD 元数据,用于同步分派的方法就会被用于异步分派的方法取代。


2 使用元数据修改代码生成

程序员如果想要使用异步模型(AMI、AMD,或两者都使用),需要给Slice 定义批注上元数据(4.17 节)。程序员可以在两个层面上指定这种元数据:接口或类的层面,或单个操作的层面。如果你是在为一个接口或类进行指定,那么为它的所有操作生成的代码都将会有异步支持。而如果只有某些操作需要异步支持,那么你可以只为这些操作指定元数据,从而使生成的代码的数据降到最低限度。

考虑下面的Slice 定义:
["ami"] interface I {
bool isValid();
float computeRate();
};
interface J {
["amd"] void startProcess();
["ami", "amd"] int endProcess();
};
在这个例子中,为接口I 的所有代理方法生成的代码都将具有同步和异步调用支持。在接口J 中, startProcess 操作使用的是异步分派,而endProcess 操作则支持异步的调用和分派。

3 使用AMI

3.1  代码映射

C++ 代码生成器为每个AMI 操作生成以下代码:

(1)一个抽象的回调类, Ice run time 用它来通知应用,操作已完成。类名是按这样的模式取的   AMI_class_op。

这个类提供两个方法:

void ice_response(<params>);

void ice_exception(const Ice::Exception &);

(2) 一个额外的代理方法
其名字是操作在映射后的名字,加上_async。这个方法的返回类型是void,第一个参数是一个智能指针,指向上面描述的回调类的一个实例。其他的参数由操作的各个in 参数组成,次序是声明时的次序。

例如,假定我们定义了下面这个操作:
interface I {
["ami"] int foo(short s, out long l);
};
下面是为操作foo 生成的回调类:
class AMI_I_foo : public ... {
public:
virtual void ice_response(Ice::Int, Ice::Long) = 0;
virtual void ice_exception(const Ice::Exception &) = 0;
};
typedef IceUtil::Handle<AMI_I_foo> AMI_I_fooPtr;
下面是为操作foo 的异步调用生成的 代理方法:
void foo_async(const AMI_I_fooPtr &, Ice::Short);


3.2 一个使用AMI的例子

为了演示Ice 中的AMI 的用法,让我们定义一个简单的计算引擎的
Slice 接口
sequence<float> Row;
sequence<Row> Grid;
exception RangeError {};
interface Model {
["ami"] Grid interpolate(Grid data, float factor)
throws RangeError;
};


C++ 客户
我们首先必须定义我们的回调实现类,它派生自生成的
AMI_Model_interpolate 类:
class AMI_Model_interpolateI : public AMI_Model_interpolate {
public:
virtual void ice_response(const Grid & result)
{
cout << "received the grid" << endl;
// ... postprocessing ...
}
virtual void ice_exception(const Ice::Exception & ex)
{
try {
ex.ice_throw();
} catch (const RangeError & e) {
cerr << "interpolate failed: range error" << endl;
} catch (const Ice::LocalException & e) {
cerr << "interpolate failed: " << e << endl;
}
}
};

调用interpolate 的代码同样直截了当:
ModelPrx model =  ModelPrx::checkedCast(base);
AMI_Model_interpolatePtr cb = new AMI_Model_interpolateI;
Grid grid;
initializeGrid(grid);
model->interpolate_async(cb, grid, 0.5);
在获取了Model 对象的代理之后,客户实例化一个回调对象,初始化一个栅格,然后调用interpolate 的异步版本。当Ice run time 接收到对这个请求的响应时,会调用客户提供的回调对象。

3.3 并发问题
在Ice 中,异步调用由客户线程池提供支持 ( 第15 章),池中的线程主要负责处理答复消息。理解下列与异步调用相关的并发问题很重要:
  • 一个回调对象不能同时用于多个调用。需要聚合来自多个答复的信息的应用可以创建一个单独的对象,让回调对象对它进行委托。
  • 对回调对象的调用来自Ice run time 的客户线程池中的线程,因此,如果在答复到达的同时,应用可能要与回调对象进行交互,就有可能需要进行同步。
  • 客户线程池中的线程的数目决定了,同时可以为多少异步调用发出回调。客户线程池的缺省尺寸是一,意味着对回调对象的调用是序列化的。如果线程池的尺寸增大了,而同一回调对象被用于多个调用,应用就可能需要进行同步。

4 使用AMD

异步分派方法的型构与AMI 方法的类似:返回类型是void,参数由一个回调对象、以及操作的in 参数组成。在AMI 中,回调对象是由应用提供的,而在AMD 中,回调对象是由Ice run time 提供的,同时它还提供了一些方法,用于返回操作的结果,或报告异常。我们的实现不需要在分派方法返回之前调用回调对象;回调对象可以在任何时候,由任何线程调用,但只能被调用一次。


4.1 代码映射

C++ 代码生成器为每个AMD 操作生成以下代码:
(1)一个抽象的回调类.

实现用它来通知Ice run time,操作已完成。

类名是按这样的模式取的:AMD_class_op。

类的方法:

void ice_response(<params>);

void ice_exception(const Ice::Exception &);服务器可以用这个版本的ice_exception 报告用户异常或本地异常。

void ice_exception(const std::exception &);服务器可以用这个版本的ice_exception 报告标准的异常。
void ice_exception()             服务器可以用这个版本的ice_exception 报告未知异常。

(2)分派方法.

其名字有后缀_async。

异步分派方法,它的第一个参数就是由ICE实现的回调类AMD_class_op ,在这个方法里,我们要两种方案:

  1. 直接做具体工作,完成后在末尾调用回调类的ice_response方法告知客户端已完成。这种方案就和之前普通版的服务端一样,是同步执行的。
  2. 把 回调类和请求所需要的参数放入一个指定的位置,再由其它线程取出执行和通知客户端。这种方案就是异步分派方法,具体实现时还可以有多种方式,如使用命令模 式把参数和具体操作直接封装成一个对象放入队列,然后由另一线程(或线程池)取出执行。

这个方法的返回类型是void,第一个参数是一个智能指针,指向上面描述的回调类的一个实例。其他的参数由操作的各个in 参数组成,次序是声明时的次序。

例如,假定我们定义了下面这个操作:
interface I {
["amd"] int foo(short s, out long l);
};
下面是为操作foo 生成的回调类:

class AMD_I_foo : public ... {
public:
void ice_response(Ice::Int, Ice::Long);
void ice_exception(const Ice::Exception &);
void ice_exception(const std::exception &);
void ice_exception();
};
下面是为操作foo 的异步调用生成的分派方法:
void foo_async(const AMD_I_fooPtr &, Ice::Short);

4.2 例子

sequence<float> Row;
sequence<Row> Grid;
exception RangeError {};
interface Model {
["ami", "amd"] Grid interpolate(Grid data, float factor)
throws RangeError;
};

服务端的servant(servant就是服务端实际工作的代码)
我们的servant 类派生自Model,并且提供了interpolate_async
方法的定义:
class ModelI : virtual public Model,
virtual public IceUtil::Mutex {
public:
virtual void interpolate_async(const AMD_Model_interpolatePtr &,const Grid &,Ice::Float,const Ice::Current &);
private:
std::list<JobPtr> _jobs;
};
interpolate_async 的实现使用了同步来在一个Job 中安全地记录回调对象,并把它放进一个队列中:

void ModelI::interpolate_async(const AMD_Model_interpolatePtr & cb,const Grid & data,Ice::Float factor,const Ice::Current & current)
{
IceUtil::Mutex::Lock sync(*this);
JobPtr job = new Job(cb, data, factor);
_jobs.push_back(job);
}
在把信息放进队列之后,该操作把控制返回给Ice run time,使分派线程能够去处理另外的请求。一个应用线程从队列中移除下一个Job,并调用
execute 来进行插值。下面是Job 的定义:
class Job : public IceUtil::Shared {
public:
Job(const AMD_Model_interpolatePtr &,const Grid &,Ice::Float);
void execute();

private:
bool interpolateGrid();

AMD_Model_interpolatePtr _cb;
Grid _grid;
Ice::Float _factor;
};
typedef IceUtil::Handle<Job> JobPtr;
execute 的实现使用interpolateGrid ( 没有给出) 来完成计算工
作:
Job::Job(const AMD_Model_interpolatePtr & cb,const Grid & grid,Ice::Float factor) :_cb(cb), _grid(grid), _factor(factor)
{
}
void Job::execute()
{
if(!interpolateGrid()) {
_cb->ice_exception(RangeError());
return;
}
_cb->ice_response(_grid);

}
如果interpolateGrid 返回false, ice_exception 就会被调用,表明发生了范围错误。在调用了ice_exception 之后,使用return 语句是必要的,因为ice_exception 并没有抛出异常;它仅仅是整编了参数,并把它发送给客户。如果插值成功, ice_response 会被调用,把修改后的栅格返回给客户。


一个完整的例子

(1)Slice定义(hello.ice)

interface Hello

{

   ["ami", "amd"] idempotent void sayHello(int delay)

        throws RequestCanceledException;

   void shutdown();

};

(2)服务端声明文件代码:(helloI.h)

#include <Hello.h>

#include <WorkQueue.h>

class HelloI : virtual publicDemo::Hello

{

public:

   HelloI(const WorkQueuePtr&);

   //分派函数

   virtual void sayHello_async(const Demo::AMD_Hello_sayHelloPtr&, int,const   Ice::Current&);

   virtual void shutdown(const Ice::Current&);

private:

   WorkQueuePtr _workQueue;

};

(3)服务端实现文件代码(helloI.cpp)

HelloI::HelloI(constWorkQueuePtr& workQueue) :

   _workQueue(workQueue)

{

}

void

HelloI::sayHello_async(

constDemo::AMD_Hello_sayHelloPtr& cb,

 int delay, const Ice::Current&)

{

   if(delay == 0)

   {

        cout << "Hello World!"<< endl;

        cb->ice_response();

   }

   else

   {

        _workQueue->add(cb, delay);

   }

}


void

HelloI::shutdown(constIce::Current& curr)

{

    cout << "Shutting down..."<< endl;

    _workQueue->destroy();

   curr.adapter->getCommunicator()->shutdown();

}


int

AsyncServer::run(int argc, char*argv[])

{

   if(argc > 1)

   {

        cerr << appName() <<": too many arguments" << endl;

        return EXIT_FAILURE;

   }

   callbackOnInterrupt();

   Ice::ObjectAdapterPtr adapter =communicator()->createObjectAdapter("Hello");

   _workQueue = new WorkQueue();

    Demo::HelloPtr hello = newHelloI(_workQueue); //定义服务端servrant

   adapter->add(hello, communicator()->stringToIdentity(“hello”));//serverant加入适配器

    // 启动工作队列

   _workQueue->start();

   adapter->activate(); //

   communicator()->waitForShutdown();

   _workQueue->getThreadControl().join();//等待工作队列停止

   return EXIT_SUCCESS;

}

class WorkQueue : publicIceUtil::Thread

{

public:

   WorkQueue();

   virtual void run();

   //将回调对象加入队列

   void add(const Demo::AMD_Hello_sayHelloPtr&, int);

   void destroy();

private:

   //存储回调对象以及调用参数的的结构体

   struct CallbackEntry

   {

        Demo::AMD_Hello_sayHelloPtr cb;

        int delay;

   };

   IceUtil::Monitor<IceUtil::Mutex> _monitor;

   std::list<CallbackEntry> _callbacks;

   bool _done;

};

typedefIceUtil::Handle<WorkQueue> WorkQueuePtr;


VoidWorkQueue::run()

{

   IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);

   while(!_done)

   {

        if(_callbacks.size() == 0)

       {

            _monitor.wait();

       }

        if(_callbacks.size() != 0)

       {

            //从队列中取下一个回调对象

  CallbackEntry entry = _callbacks.front();

  //等待delay

  _monitor.timedWait(IceUtil::Time::milliSeconds(entry.delay));

            if(!_done)

           {

      _callbacks.pop_front();

                //…处理业务逻辑 Do Something here

               entry.cb->ice_response();

           }

       }

   }


void WorkQueue::add(const Demo::AMD_Hello_sayHelloPtr&cb,int delay)

{

   IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);

   if(!_done)

   {

       CallbackEntry entry;

       entry.cb =cb;

       entry.delay = delay;

        if(_callbacks.size() == 0)

       {

            _monitor.notify();

       }

        _callbacks.push_back(entry);

   }

   else

   {

  //发送异常通知

 cb->ice_exception(Demo::RequestCanceledException());

   }

}



5 总结 

同步的远地调用是对本地的方法调用的自然扩展,它利用了程序员的面向对象编程经验,使初学分布式应用开发的程序员的学习曲线平缓下来。但是,同步调用的阻塞本质使得有些应用任务的实现变得更为困难,甚至不可能,因此, Ice 提供了一个直截了当的接口,你可以用这个接口来访问Ice 的异步设施。如果使用异步方法调用,发出调用的线程可以调用一个操作,然后马上就重获控制,不用阻塞起来等待操作完成。当Ice run time 收到结果时,它会通过回调通知应用。与此类似,异步方法分派允许servant 在任何时候发送操作的结果,而不一定要在操作实现中发送。通过把费时的请求放在队列中,后面再进行处理, servant 可以改善可伸缩性,并节省线程资源。


注:本文内容主要来源于Ice官方文档

 类似资料: