当前位置: 首页 > 工具软件 > cache4j > 使用案例 >

两级缓存框架J2Cache的使用

仉宸
2023-12-01

title: 两级缓存框架J2Cache的使用 tags:

  • J2Cache
  • 缓存
  • Cache
  • Spring
  • Redis categories: j2cache date: 2017-03-25 18:18:53

说起cache总是想要唠两句buffer,至于cache和buffer的区别在此Cache 和 Buffer 都是缓存,主要区别是什么?

缓存在整个计算机生活中处处可见,比如网页静态资源缓存,DNS解析缓存, CPU缓存,maven仓库缓存等等

说回缓存,缓存最常见的几个特征如下:

  1. 命中率

  2. 最大容量

  3. 淘汰策略

    1. FIFO
    2. LRU
    3. LFU

对于缓存简单的分类方法:

  1. 本地cache
  2. 分布式cache

Java常用的本地Cache框架包括EhCache GuavaCache等

常用分布式Cache包括MemCached Redis等

通俗来说使用缓存的目的就是保存计算好的数据方便下次取用,降低下次取用的时间属于典型的用空间换时间的例子,符合计算机的局部性原理。最根本的说明是让数据贴近使用者。

对于本系统中采用Ehcache作为缓存框架。Ehcache作为常见的本地缓存框架也提供了对于分布式的支持,可以参考红薯的深入探讨在集群环境中使用 EhCache 缓存系统

系统中采用配置较为简单的rmi组播方式作为配置

    <cacheManagerPeerProviderFactory
            class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"
            properties="peerDiscovery=automatic, multicastGroupAddress=230.0.0.1,
    multicastGroupPort=4446, timeToLive=32"/>
复制代码

但是在生产环境中总是偶尔出现缓存同步延迟和失败的问题。对于组播来说ehcache的策略比较简单,同时多网卡或者复杂网络环境下的表现也并不好。分布式EHCACHE系统在缓存同步上存在着不小的缺陷

通常此种情况下会使用Redis作为分布式缓存是较好的选择。

但是分布式缓存随之带来的问题就是网络开销的加大。

有没有可以兼得的方案呢?

自然是有的O(∩_∩)O

两级缓存

对于应用同时使用本地缓存和分布式缓存。

  • 一级缓存
     我们称之为本地缓存,进程内缓存(L1)或者LocalCache。常用的方案仍然是采用Ehcache或者是Guava Cache。本次方案仍然采用Ehcache。
  • 二级缓存

我们称之为分布式缓存,集中式缓存(L2)或者RemoteCache。常用的方案仍然是采用Redis或者是MemCached。本次方案将采用Redis。

由于大量的缓存读取会导致 L2 的网络带宽成为整个系统的瓶颈,因此 L1 的目标是降低对 L2 的读取次数

1. 读取顺序  -> L1 -> L2 -> DB

2. 数据更新

    1 从数据库中读取最新数据,依次更新 L1_1 -> L2 ,广播清除某个缓存信息 
    2 接收到广播(手工清除缓存 & 一级缓存自动失效),从 L1_2 中清除指定的缓存信息

如上所述,在做数据更新的时候需要收到通知,基本方案可以使用jms方案(目前采用redis订阅消息)
复制代码

取数据流程

更新数据流程

数据过期监听

实现EhCache的notifyElementExpired方法,发送清除消息(存在一些问题)

目前使用开源中国红薯的作品J2Cache作为支撑。

目前系统中采用了spring缓存抽象(小提示一下,当cacheService内部调用方法时无法走到cache切面,即缓存失效 )

设置Spring cache的方法

    <cache:annotation-driven cache-manager="cacheManager"/>
    <!-- 声明cacheManager -->
    <bean id="cacheManager" class="org.nutz.j2cache.spring.SpringJ2CacheManager" depends-on="ehCacheManager"/>
    <bean id="cacheManagerFactory"
          class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
          p:configLocation="classpath:ehcache.xml"
          p:cacheManagerName="${ehcache.ehcache.name}"
          p:shared="true" />
     
    <!-- 声明cacheManager -->
    <bean id="ehCacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"
          p:cacheManager-ref="cacheManagerFactory" />
复制代码

默认来说开启了Spring cache注解将会走到此切面

    public class CacheInterceptor extends CacheAspectSupport implements MethodInterceptor, Serializable {
     
       private static class ThrowableWrapper extends RuntimeException {
          private final Throwable original;
     
          ThrowableWrapper(Throwable original) {
             this.original = original;
          }
       }
     
       public Object invoke(final MethodInvocation invocation) throws Throwable {
          Method method = invocation.getMethod();
     
          Invoker aopAllianceInvoker = new Invoker() {
             public Object invoke() {
                try {
                   return invocation.proceed();
                } catch (Throwable ex) {
                   throw new ThrowableWrapper(ex);
                }
             }
          };
     
          try {
             return execute(aopAllianceInvoker, invocation.getThis(), method, invocation.getArguments());
          } catch (ThrowableWrapper th) {
             throw th.original;
          }
       }
    }
复制代码

在业务中添加对应注解到实现上,比如

    @Override
    @Cacheable(value = "remind", key = "#idOwnOrg+'CustomerLoseRemind'")
    public int getCustomerLoseNumber(String idOwnOrg)
复制代码

该方法当外部调用时会优先Check缓存中是否有值,如果没有值则执行业务逻辑否则返回缓存中的值(假如此结果返回null会怎么样?/(ㄒoㄒ)/~~)

当业务中执行某个逻辑时会对缓存中的值造成影响,可以自动清除Cache,比如

    @CacheEvict(value = "remind", allEntries = true)
    @Override
    public void handlerKHSRRemind(String idCustomer, String idOwnOrg)
复制代码

当然此处不妥会造成缓存集体失效,其实可以改造缓存失效的key,如果多个通过数组即可。

下一步优化可以精细化失效缓存key。

关于使用两级缓存Redis存储的实现为Hash可以自定义nameSpace此处为j2cache_test

对应hash的name为缓存的名称。hash的field为对应缓存的key。为了便于区分key的类型value为对象的fst序列化结果Java快速序列化框架FST

    protected byte[] getKeyName(Object key) {
        if (key instanceof Number)
            return ("I:" + key).getBytes();
        else if (key instanceof String || key instanceof StringBuilder || key instanceof StringBuffer)
            return ("S:" + key).getBytes();
        return ("O:" + key).getBytes();
    }
复制代码

应用app会订阅redis的消息收到如下格式的结果

    qixiaobo@qixiaobo.mac.pro:/Users/qixiaobo> redis-cli -h 192.168.1.116
                                           17-03-23 13:07
复制代码
    192.168.1.116:6379> SUBSCRIBE j2cache_channel_test
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "j2cache_channel_test"
    3) (integer) 1
    1) "message"
    2) "j2cache_channel_test"
    3) "\xd2\xe6F\x00\x01\x18\x00shiro-activeSessionCache&\x00\x00\x00\xfc$eaa302d1-ec27-434a-b0d1-0131a8095575"
    1) "message"
    2) "j2cache_channel_test"
    3) "\xd2\xe6F\x00\x01\x18\x00shiro-activeSessionCache&\x00\x00\x00\xfc$eaa302d1-ec27-434a-b0d1-0131a8095575"
    1) "message"
    2) "j2cache_channel_test"
    3) "\xd2\xe6F\x00\x01\x18\x00shiro-activeSessionCache&\x00\x00\x00\xfc$eaa302d1-ec27-434a-b0d1-0131a8095575"
    1) "message"
    2) "j2cache_channel_test"
    3) "\xd2\xe6F\x00\x01\x18\x00shiro-activeSessionCache&\x00\x00\x00\xfc$eaa302d1-ec27-434a-b0d1-0131a8095575"
    1) "message"
    2) "j2cache_channel_test"
    3) "\xd2\xe6F\x00\x01\x18\x00shiro-activeSessionCache&\x00\x00\x00\xfc$eaa302d1-ec27-434a-b0d1-0131a8095575"
    1) "message"
    2) "j2cache_channel_test"
    3) "\xd2\xe6F\x00\x01\x18\x00shiro-activeSessionCache&\x00\x00\x00\xfc$eaa302d1-ec27-434a-b0d1-0131a8095575"
复制代码

应用启动时开启线程订阅redis消息,并完成消息的处理

    /**
     * 初始化缓存通道并连接
     *
     * @param name : 缓存实例名称
     */
    private RedisCacheChannel(String name) throws CacheException {
        this.name = name;
        try {
            long ct = System.currentTimeMillis();
            CacheManager.initCacheProvider(this);
            redisCacheProxy = new RedisCacheProvider().getResource();
            thread_subscribe = new Thread(new Runnable() {
                @Override
                public void run() {
                    redisCacheProxy.subscribe(RedisCacheChannel.this, SafeEncoder.encode(channel));
                }
            });
     
            thread_subscribe.start();
     
            log.info("Connected to channel:" + this.name + ", time " + (System.currentTimeMillis() - ct) + " ms.");
     
        } catch (Exception e) {
            throw new CacheException(e);
        }
    }
复制代码

 类似资料: