目录
简而言之,序列化就是一种处理对象流的机制,即将对象的内容流化,将数据转化成字节流,以便存储在文件中或用于在网络中传输,当然用的最多的肯定就是网络传输,RPC在实现数据传递时便依赖序列化层,反序列化就是相反的过程。在选择序列化协议时,往往有如下几个指标可供参考:
JDK默认就为我们提供了序列化,不管你用没用过Kyro、hessian或Protobuf等各种热门高效的序列化技术,你肯定用过JDK默认的序列化实现,该方式只需要在对应的实体类上实现Serializable接口即可将该类标识为可被序列化,一个简单的demo如下:
public void test0 () throws Exception {
// 需实现Serializable接口
User user = new User("123", "jdks");
// 将对象写入到文件中
ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream("user.txt"));
objectOutputStream.writeObject(user);
objectOutputStream.close();
// 将对象从文件中读出来
ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream("user.txt"));
User newUser = (User) inputStream.readObject();
inputStream.close();
assert "jdks".equals(newUser.getName());
}
容易用的模型通常性能不是太好,JDK序列化就属于这个,同时该协议在传递的对象中还包含了元数据信息,占用空间较大。但是由于这是java自带的,因此简单、方便,且无需第三方依赖
Kryo 是一个快速序列化/反序列化工具,其使用了字节码生成机制(底层依赖了 ASM 库),因此具有比较好的运行速度。
Kryo 序列化出来的结果,是其自定义的、独有的一种格式,不再是 JSON 或者其他现有的通用格式;而且,其序列化出来的结果是二进制的(即 byte[];而 JSON 本质上是字符串 String);二进制数据显然体积更小,序列化、反序列化时的速度也更快。
Kryo 一般只用来进行序列化(然后作为缓存,或者落地到存储设备之中)、反序列化,而不用于在多个系统、甚至多种语言间进行数据交换 —— 目前 kryo 也只有 java 实现。
像Redis这样的存储工具,是可以安全的存储二进制数据,所以一般项目中可使用Kryo来替代JDK序列化进行存储。
引入Maven依赖:
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>
需要注意的是,由于kryo使用了较高版本的asm,可能会与业务现有依赖的asm产生冲突,这是一个比较常见的问题。只需将依赖改成:
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>4.0.2</version>
</dependency>
首先看下使用Kryo进行序列化的一个案例,如下:
public void test1() throws Exception {
Kryo kryo = new Kryo();
User user = new User("123", "kryo");
Output output = new Output(new FileOutputStream("userKryo.txt"));
kryo.writeObject(output, user);
output.close();
Input input = new Input(new FileInputStream("userKryo.txt"));
User newUser = kryo.readObject(input, User.class);
input.close();
assert "kryo".equals(newUser.getName());
}
可以看到与JDK序列化过程其实非常类似,整个过程也很清楚。
Kryo共支持三种读写方式,如果知道class字节码,并且对象不为空,便可直接使用入门案例中的方式:writeObject/readObject,如果对象可能为空,Kryo也提供了另外一种方式:writeObjectOrNull/readObjectOrNull,当然Kryo也支持将字节码的信息直接存放到序列化结果中,在反序列化时自行读取字节码信息:writeClassAndObject/readClassAndObject,此时反序列化后的是个obj对象,需要判断使用。针对泛型对象的反序列化,Kryo的解析就方便很多了,例如List<User>,看下面一个案例:
public void test4() throws Exception {
Kryo kryo = new Kryo();
List<User> list = Lists.newArrayList(new User("123", "kryoR"));
Output output = new Output(new FileOutputStream("userKryo3.txt"));
kryo.writeObject(output, list);
output.close();
Input input = new Input(new FileInputStream("userKryo3.txt"));
// 使用Kryo在反序列化自定义对象的list时无需像有些json工具一样透传泛型参数,因为Kryo在序列化结果里记录了泛型参数的实际类型的信息,反序列化时会根据这些信息来实例化对象
List newList = kryo.readObject(input, ArrayList.class);
input.close();
assert newList.get(0) instanceof User;
}
上面的案例中这一行代码:List newList = kryo.readObject(input, ArrayList.class); 如果ArrayList.class换成List.class,则会得到如下异常:
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): java.util.List
因为Kryo不支持包含无参构造器类的反序列化,若尝试反序列化一个不包含无参构造器的类就会得到如上异常,当然为每一个类都增加无参构造器是每一个程序员都应当遵守的编程规范。
还有一个重要的点就是Kryo 不支持 Bean 中增删字段。如果使用 Kryo 序列化了一个类,存入了 Redis,然后对类进行了修改,会导致反序列化的异常。当然我们可以catch住异常,并清除该缓存,然后返回”缓存未命中“信息给上层调用者。
当Kryo在序列化一个对象时,默认需要将类的全限定名称写入,将类名一同写入序列化数据中是比较低效的,所以Kryo支持通过类注册进行优化:
kryo.register(SomeClassA.class);
kryo.register(SomeClassB.class);
kryo.register(SomeClassC.class);
在类注册时会给每一个class一个int类型的id值相关联,日后序列化和反序列化时都以该id值来替换类名,这显然比一大串的类名称要高效,但同时也要求反序列化时候的id必须与序列化过程保持一致,这意味着id与class的关联不能变,也就是注册的顺序非常重要,但是它有个很大的弊端,它不能保证同一个class每次注册的号码都相同,只与注册的顺序有关,这意味着不同的机器或同一个机器在重启前后都有可能拥有不同的编号,这会导致反序列化产生问题,所以在分布式项目中该问题就会被暴露出来,之前项目中也遇到过一次,反序列化后得到的对象一直为null,因此在Kryo中注册行为默认是关闭的,如果分布式项目非要用,可以在注册时指定id值,这样的话注册顺序就无关紧要了。
可以混合使用注册和未注册的类,默认使用id 0-9 注册所有基本类型,基本类包装器,String 和 void。所以要小心此范围内的注册覆盖的情况。
当 Kryo#setRegistrationRequired 设置为true时,可在遇到任何未注册的类时抛出异常,这能阻止应用程序使用类名字符串来序列化。
Kryo默认是线程不安全的,有两种解决方式,一个是通过Threadlocal的方式为某个线程存储一个实例:
private static final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {
protected Kryo initialValue() {
Kryo kryo = new Kryo();
// 这里可以增加一系列配置信息
return kryo;
}
};
另外一种是通过KryoPool的方式,该方式在性能上也好于ThreadLocal:
public KryoPool createPool() {
return new KryoPool.Builder(() -> {
Kryo kryo = new Kryo();
// 此处也可以进行一系列配置,可通过实现KryoFactory接口来满足动态注册,抽象该类
return kryo;
}).softReferences().build();
}
这是对循环引用的支持,可以有效防止栈内存溢出,kryo默认会打开这个属性。当你确定不会有循环引用发生的时候,可以通过kryo.setReferences(false);关闭循环引用检测,从而提高一些性能,但不是很推荐
以10000个测试对象为例:
Kryo序列化消耗的时间181
Kryo反序列化消耗的时间223
JDK序列化消耗的时间458
JDK反序列化消耗的时间563
@Test
public void test5() throws Exception{
long time = System.currentTimeMillis();
Kryo kryo = new Kryo();
Output output = new Output(new FileOutputStream("kryoPerformance.txt"));
Map<String, String> map = new HashMap<>();
map.put("key", "value");
for (int i = 0;i < 10000; i++) {
kryo.writeObject(output, new User(String.valueOf(i), "test", false, Lists.newArrayList(String.valueOf(i)), map));
}
output.close();
System.out.println("Kryo序列化消耗的时间" + (System.currentTimeMillis() - time));
}
@Test
public void test6() throws Exception{
long time = System.currentTimeMillis();
Kryo kryo = new Kryo();
Input input = new Input(new FileInputStream("kryoPerformance.txt"));
User user = null;
try {
while (null != (user = kryo.readObject(input, User.class))) {
}
} catch (KryoException e) {
}
input.close();
System.out.println("Kryo反序列化消耗的时间" + (System.currentTimeMillis() - time));
}
@Test
public void test7() throws Exception{
long time = System.currentTimeMillis();
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("JDKPerformance.txt"));
Map<String, String> map = new HashMap<>();
map.put("key", "value");
for (int i = 0;i < 10000; i++) {
oos.writeObject(new User(String.valueOf(i), "test", false, Lists.newArrayList(String.valueOf(i)), map));
}
oos.close();
System.out.println("JDK序列化消耗的时间" + (System.currentTimeMillis() - time));
}
@Test
public void test8() throws Exception{
long time = System.currentTimeMillis();
ObjectInputStream ois = new ObjectInputStream(new FileInputStream("JDKPerformance.txt"));
User user = null;
try {
while (null != (user = (User) ois.readObject())) {
}
} catch (EOFException e) {
}
System.out.println("JDK反序列化消耗的时间" + (System.currentTimeMillis() - time));
}
由于上面的User对象存在循环引用且参数较多,经过测试在1000个对象时,jdk的速度会快一点,在普通对象上而言,Kryo还是比JDK速度要快的多,也更紧凑,如果提前注册将在程序中所使用的类的话,性能会更好一点。
但实际上Kryo经常被用于Redis中序列化自定义对象时,以替代JDK的序列化方式,以RedisTemplate为例,value的序列化方式默认即为JDK序列化,但是其性能确实比不上Kryo
@Bean
public RedisTemplate<String, Serializable> jdkRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
return redisTemplate;
}
那么如果改用Kryo序列化的话,势必要先自定义一个Kryo序列化类,如下:
public class KryoSerializer implements RedisSerializer<Object> {
private KryoPool kryoPool;
private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);
public KryoSerializer() {
kryoPool = new KryoPool.Builder(Kryo::new).softReferences().build();
}
@Override
public byte[] serialize(Object data) throws SerializationException {
byte[] result = new byte[0];
if (null == data)
return result;
Kryo kryo = kryoPool.borrow();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
// 这里采用默认的缓冲字节数组大小即可,若指定的过大效率会非常慢
Output output = new Output(bos);
kryo.writeClassAndObject(output, data);
output.close();
// 释放当前实例
kryoPool.release(kryo);
result = bos.toByteArray();
try {
bos.close();
} catch (IOException e) {
logger.error("Close IO error:{}", e);
}
return result;
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
Object result = null;
if (null != bytes && bytes.length > 0) {
Kryo kryo = kryoPool.borrow();
Input input = new Input(bytes);
result = kryo.readClassAndObject(input);
kryoPool.release(kryo);
input.close();
}
return result;
}
}
然后设值到对应的redisTemplate实例中:
@Bean
public RedisTemplate<String, Object> kryoRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new KryoSerializer());
return redisTemplate;
}
然后测试了下循环往Redis中设值,两者所要消耗的时间,由于循环了5000次,意味着需要向Redis发送5000次命令,因此效率其实都很低,第三种方式采用了管道即5000次命令一次性告诉Redis,但Redis并不支持事物,因此并不能保证全部成功,示例代码如下:
if (type == 0) {
// jdk序列化方式
long start = System.currentTimeMillis();
for (int i = 0; i < 5000; i++) {
String key = String.valueOf(i);
String keyName = "jdk:user" + key;
User user = new User(key, keyName, i, "18888888888", "塞外", "jdk@163.com");
jdkRedisTemplate.opsForValue().set(keyName, user);
}
log.info("Serializable 序列化方式耗时:" + (System.currentTimeMillis() - start));
} else if (type == 1) {
// kryo序列化方式
long start = System.currentTimeMillis();
for (int i = 0; i < 5000; i++) {
String key = String.valueOf(i);
String keyName = "kryo:user" + key;
User user = new User(key, keyName, i, "18888888888", "塞外", "kryo@163.com");
kryoRedisTemplate.opsForValue().set(keyName, user);
}
log.info("Kryo 序列化方式耗时:" + (System.currentTimeMillis() - start));
} else if (type == 2) {
// 使用管道方式批量增加
long start = System.currentTimeMillis();
List<Object> result = kryoRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
// 打开管道
connection.openPipeline();
// 然后给本次管道内添加要一次执行的多条命令
KryoSerializer kryoSerializer = new KryoSerializer();
for (int i = 5000; i < 10001; i++) {
String key = String.valueOf(i);
String keyName = "kryo:user" + key;
User user = new User(key, keyName, i, "18888888888", "塞外", "kryo@163.com");
connection.set(keyName.getBytes(), kryoSerializer.serialize(user));
}
// 管道不需要手动关闭,否则拿不到返回值
return null;
}
});
// 可以对结果集进行获取 result
log.info("Kryo 批量序列化方式耗时:" + (System.currentTimeMillis() - start));
}
耗时如下:
Serializable 序列化方式耗时:8619
Kryo 序列化方式耗时:4478
批量序列化方式耗时:197
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.51</version>
</dependency>
public byte[] hessianSerialize(Object data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Hessian2Output out = new Hessian2Output(bos);
out.writeObject(data);
out.flush();
return bos.toByteArray();
}
public <T> T hessianDeserialize(byte[] bytes, Class<T> clz) throws IOException {
Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(bytes));
return (T) input.readObject(clz);
}
hessian序列化的实现机制是着重于数据,附带简单的类型信息的方法。支持跨语言,序列化后字节数适中,API易用。是国内主流RPC框架Dubbo、motan 的默认序列化协议,由于也没怎么用过就简单略过。
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.48</version>
</dependency>
public byte[] jsonSerialize(Object data) throws IOException{
SerializeWriter out = new SerializeWriter();
JSONSerializer serializer = new JSONSerializer(out);
// 注意补充对枚举类型的特殊处理
serializer.config(SerializerFeature.WriteEnumUsingToString, true);
// 额外补充类名可以在反序列化时获得更丰富的信息
serializer.config(SerializerFeature.WriteClassName, true);
serializer.write(data);
return out.toBytes("UTF-8");
}
public <T> T jsonDeserialize(byte[] data, Class<T> tClass) throws IOException {
return JSON.parseObject(new String(data), tClass);
}
作为一个json工具,被拉到序列化方案中似乎有点不妥,但新浪开源的motan RPC框架除了支持hessian之外,还支持了Fastjson的序列化,因此也可以将其作为一个跨语言序列化的简易实现方案。