https://github.com/real-logic/aeron
高效且可靠的UDP单播,UDP多播和进程间通讯(IPC)消息传输。支持Java,.NET,C和C++客户端。所有客户端都能跨机器进行消息交互,或者在同一机器上进行进程间通讯,非常高效。消息流可以通过Archive模块进行持久化,以供回放。Aeron Cluster模块提供基于Raft一致性算法的容错能力。Agrona提供SBE序列化功能。
功能:UDP单播和多播,IPC
模块:
Archive:持久化
Aeron Cluster:集群
Agrona:SBE序列化
1. 启动MediaDriver
2. 将Aeron实例连接到MediaDriver
3. 在Aeron上添加一个指定channel和streamId的publication。publication只有和至少一个subscriber连接时,才能pub消息,否则result返回Publication.NOT_CONNECTED。
MediaDriver可以独立运行,同时处理多个进程;也可以在嵌入式在应用中启动。
嵌入式MediaDriver需要通过系统属性配置或者通过context进行以下配置,然后传给MediaDriver:
LogBuffer目录:context.aeronDirectoryName(),用于缓存需要发送的消息
ThreadMode:MediaDriver的运行线程模型
启动代码:
final MediaDriver driver = MediaDriver.launch();
or
final MediaDriver driver = MediaDriver.launchEmbedded();
Aeron客户端需要与一个运行中的MediaDriver协同操作,来创建Publication,Subscription和housekeeping。为了使Aeron与MediaDriver建立连接,Aeron需要在connect方法中指定MediaDriver的LogBuffer目录,如果不指定,则使用默认路径。
如果使用嵌入式MediaDriver,且目录随机生成,可以使用以下代码来指定Aeron的连接目录:
final MediaDriver driver = MediaDriver.launchEmbedded();
Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(driver.aeronDirectoryName()))
不同事件下,可能触发不同的事件处理器,这里处理器在Aeron context中进行配置,有ClientConductor线程进行调用。
常见的处理器包括:
errorHandler
availableImageHandler
unavailableImageHandler
在发送和接收消息时,需要使用到directBuffer来进行消息访问和修改,以达到最高效。
两个重要接口为:
DirectBuffer
MutableDirectBuffer
sub在特定的channe和stream上进行消息监听,它会汇集相同channe和stream上的0-n个images。images通过sessionId进行标识。
final Aeron aeron = Aeron.connect(new Aeron.Context());
final Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID);
addSubscription方法会阻塞,直到MediaDriver ack或者超时。
(1)Polling
Sub通过poll或者Image的poll方法控制着何时将数据传给FragmentHandler,并且将轮询操作代理给了匹配的Image对象。每次poll完,可以通过idleStrategy进行idle。
这里可以直到,数据先进入image(接收到消息的缓存区),然后通过轮询调用poll进行数据拉取,将数据从image缓冲区拉取指定数量的消息交给FragmentHandler进行处理。
MediaDriver --> image --> FragmentHandler
(2)FragmentHandler
数据从image实例中被传输到FragmentHandler后,通过回调函数获取message,回调函数的入参包括:buffer, offset, length, header。
(3)Message Reassembly
pub在发送数据时,会将大的消息分割成数据帧。sub为了将这个数据帧重组起来,然后再发送给FragmentHandler,可以通过组合FragmentAssembler形成一条处理链来完成。
虽然消息重组对延时影响非常小,但也不是完全没有负面影响,如果希望超低延时,那么将消息大小限制到MTU大小是一个很好的方案。
消息允许的最大长度是term长度1/8的最小值,也就是16MB。消息如果体如果大于该值,就需要使用应用程序级的块协议来进行分块处理。分块处理具有较好的故障恢复性能,以及较好的流同步性能。
(4)Advanced Polling
有时,我们希望更多的控制Subscription/Image的消息拉取机制,例如,如果想并行的将消息流进行存档,那么可以调用Image的blockPoll或者rawPoll方法。
还可以控制消息轮询行为,Image或者Subscription的controlledPoll方法接受一个ControlledFragmentHandler,ControlledFragmentHandler用于指定在消息片段被处理后应该执行的操作,这些操作可以是:
ABORT:当前的轮询操作不会导致position前移
BREAK:将当前的postion作为当前fragment的结束点进行提交
COMMIT:继续处理,当时将当前的postion作为当前fragment的结束点进行提交,以便进行flow control
CONTINUE:按照标准FragmentHandler继续处理
通过指定channel和stream进行消息发送的应用程序。
final Aeron aeron = Aeron.connect(new Aeron.Context());
final Publication publication = aeron.addPublication(CHANNEL, STREAM_ID);
addPublication方法会阻塞,直到MediaDriver ack或者超时。
(1)Handing Back Pressure
Aeron针对publisher具有内置的背压功能,超过流控限制时,将禁止再pub消息。
当调用Publication.offer方法时,如果返回正数,则表示消息已经被发送;但是如果返回复数,则表示消息没有被加入队列来进行pub。返回值常量含义如下:
NOT_CONNECTED:表示没有sub和pub建立连接
BACK_PRESSURE:表示由于来自sub端的背压导致消息未被发送,可以重试
ADMIN_ACTION:表示由于管理行为导致消息未被发送,例如log rotation,可以重试
CLOSED:pub被关闭而导致的消息未发送
MAX_POSITION_EXCEEDED:pub达到了由term-length限定的最大position。这可能是设置了较小的term-length导致的,pub的最大position是2^31 * term-length。
处理背压的方式包括(不是所有的处理方式):
1. 重试直到成功,spin or idle
2. 忽略失败,并继续pub后续消息
3. 重试直到成功或者超时
4. 异步重试。周期性重试,但不进行idle,而是处理其他工作。
需要1-3个线程。
MediaDriver有3个重要的Agent:
1. Conductor: 响应客户端请求,houseKeeping,丢失检测,发送NAK,翻转buffer等;
2. Sender:将消息从publisher传送到网络媒介;
3. Receiver:将网络媒介中的消息传送给subcriber。
aeron.threading.mode可以取如下值:
INVOKER: 不创建线程,客户端使用MediaDriver.Context.driverAgentInvoker直接调用
SHARED:三个agent共用一个线程,总共就一个线程
SHARED_NETWORK:Sender和Receiver使用同一个线程,Conductor使用一个线程
DEDICATED:每个agent都使用一个单独的线程,一共三个线程
用于配置agent在没有任务要处理时如何工作,有两种重要的策略:
1. BusySpinIdleStrategy:在agent空闲时进行空转,这可能导致CPU达100%;
2. BackOffIdleStrategy:在agent空闲时采用spinning, yielding, and parking的回退策略,对CPU比较友好,但只空闲非常短的时间时,对活动的响应较小。
对于这两种策略,有一对MediaDriver:
1. MediaDriver:默认,使用BackOffIdleStrategy
2. LowLatencyMediaDriver:Conductor使用BusySpinIdleStrategy,Sender and Receiver使用
NoOpIdleStrategy,默认使用
DEDICATED线程模型。
Aeron应用程序的大部分线程需求都由应用程序控制。但是,每个Aeron实例都有一个名为ClientConductor的后台线程,它负责处理housekeeping,并与媒体驱动程序命令交互。这个线程可以由应用程序通过设置Aeron.context.threadfactory()或让Aeron自定义自己的线程来控制。但是这个线程不需要一个单独的CPU来进行处理。
Subcriber需要不断的轮询并将消息发送给应用程序,为了达到低延时和高吞吐,通常使用BusySpinIdleStrategy
,而且使用单独的CPU进行处理。
Maximum Transmission Unit,用于设置组播保温的最大长度
MediaDriver的配置参数aeron.mtu.length控制数据帧的MTU长度。该值在注册期间传递给Aeron客户机。因此,应用程序不必关心媒体驱动程序使用的MTU值,而是使用相同的值。
超过接口MTU的MTU值将导致IP分片数据报,这可能会增加消息丢失的可能性。如果要在接口MTU的基础上增加MTU,需要先考虑各种增加接口MTU的方法,以便进行准备。
MTU是Aeron发送作为一个单独数据帧的最大消息,MTU长度对套接字缓冲区大小也有影响。
Aeron通常作为client端与MediaDriver进行交互,这些交互是通过各种buffer来完成的,默认情况下,java.io.tmpdir
or /dev/shm/作为buffer的文件路径,我们可以通过aeron.dir来指定对应的路径。
term buffer的最大长度是1GB,term buffer的大小决定了pub可能比sub领先的程度。
流控临界值设置的越大,会有更大的吞吐量。
3.