Avro的模式主要由JSON对象来表示,Avro支持8种基本类型(Primitive Type)和6种复杂类型(Complex Type:records、enums、arrays、maps、unions 和fixed),基本类型可以由JSON字符串来表示。
Avro支持两种序列化编码方式:二进制编码和JSON编码,使用二进制编码会高效序列化,并且序列化后得到的结果会比较小。
基本类型:
{
"namespace": "cn.slimsmart.avro.demo",
"protocol": "messageProtocol",
"doc": "This is a message.",
"name": "Message",
"types": [
{"name":"message", "type":"record",
"fields":[
{"name":"name", "type":"string"},
{"name":"type", "type":"int"},
{"name":"price", "type":"double"},
{"name":"valid", "type":"boolean"},
{"name":"content", "type":"string"}
]
}
],
"messages": {
"sendMessage":{
"doc" : "message test",
"request" :[{"name":"message","type":"message" }],
"response" :"message"
}
}
}
2.序列化
Avro有两种序列化编码:binary和JSON。
3.rpc通信实现
Avro的RPC实现不需要定义服务接口,但需要从.avpr文件中解析协议,协议中定义了消息结构和消息服务。message.avpr中定义了一个类型叫message,定义了一个服务叫sendMessage。
添加依赖jar:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>1.7.7</version>
</dependency>
1)协议解析工具类
Utils.java
package cn.slimsmart.avro.demo;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import org.apache.avro.Protocol;
public class Utils {
public static Protocol getProtocol() {
Protocol protocol = null;
try {
URL url = Utils.class.getClassLoader().getResource("message.avpr");
protocol = Protocol.parse(new File(url.getPath()));
} catch (IOException e) {
e.printStackTrace();
}
return protocol;
}
}
2)服务端
package cn.slimsmart.avro.demo;
import org.apache.avro.Protocol;
import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.generic.GenericResponder;
public class Server extends GenericResponder {
private Protocol protocol = null;
private int port;
public Server(Protocol protocol, int port) {
super(protocol);
this.protocol = protocol;
this.port = port;
}
@Override
public Object respond(Message message, Object request) throws Exception {
GenericRecord req = (GenericRecord) request;
GenericRecord reMessage = null;
if (message.getName().equals("sendMessage")) {
GenericRecord msg = (GenericRecord)req.get("message");
System.out.print("接收到数据:");
System.out.println(msg);
//取得返回值的类型
reMessage = new GenericData.Record(protocol.getType("message"));
//直接构造回复
reMessage.put("name", "苹果");
reMessage.put("type", 100);
reMessage.put("price", 4.6);
reMessage.put("valid", true);
reMessage.put("content", "最新上架货物");
}
return reMessage;
}
public void run() {
try {
HttpServer server = new HttpServer(this, port);
server.start();
server.join();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Server(Utils.getProtocol(), 9090).run();
}
}
3)客户端
package cn.slimsmart.avro.demo;
import java.net.URL;
import org.apache.avro.Protocol;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.generic.GenericRequestor;
public class Client {
private Protocol protocol = null;
private String host = null;
private int port = 0;
private int count = 0;
public Client(Protocol protocol, String host, int port, int count) {
this.protocol = protocol;
this.host = host;
this.port = port;
this.count = count;
}
public long sendMessage() throws Exception {
GenericRecord requestData = new GenericData.Record(protocol.getType("message"));
requestData.put("name", "香梨");
requestData.put("type", 36);
requestData.put("price", 5.6);
requestData.put("valid", true);
requestData.put("content", "价钱便宜");
// 初始化请求数据
GenericRecord request = new GenericData.Record(protocol.getMessages().get("sendMessage").getRequest());
request.put("message", requestData);
Transceiver t = new HttpTransceiver(new URL("http://" + host + ":" + port));
GenericRequestor requestor = new GenericRequestor(protocol, t);
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
Object result = requestor.request("sendMessage", request);
if (result instanceof GenericData.Record) {
GenericData.Record record = (GenericData.Record) result;
System.out.println(record);
}
}
long end = System.currentTimeMillis();
System.out.println((end - start)+"ms");
return end - start;
}
public long run() {
long res = 0;
try {
res = sendMessage();
} catch (Exception e) {
e.printStackTrace();
}
return res;
}
public static void main(String[] args) throws Exception {
new Client(Utils.getProtocol(), "127.0.0.1", 9090, 5).run();
}
}
Avro IPC提供了如下几种服务端和客户端实现:
1.基于jetty的http实现
HttpServer 和HttpTransceiver
2.基于netty的实现
NettyServer和NettyTransceiver
3.基于TCP的实现
SocketServer和SocketTransceiver
4.基于UDP的实现
DatagramServer和DatagramTransceiver
5.基于加密的TCP实现
SaslSocketServer和SaslSocketTransceiver