什么是Memcache?
Memcache集群环境下缓存解决方案
Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。
Memcache是danga的一个项目,最早是LiveJournal 服务的,最初为了加速 LiveJournal 访问速度而开发的,后来被很多大型的网站采用。
Memcached是以守护程序方式运行于一个或多个服务器中,随时会接收客户端的连接和操作
为什么会有Memcache和memcached两种名称?
其实Memcache是这个项目的名称,而memcached是它服务器端的主程序文件名,知道我的意思了吧。一个是项目名称,一个是主程序文件名,在网上看到了很多人不明白,于是混用了。
Memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度。Memcached由Danga Interactive开发,用于提升LiveJournal.com访问速度的。LJ每秒动态页面访问量几千次,用户700万。Memcached将数据库负载大幅度降低,更好的分配资源,更快速访问。
这篇文章将会涉及以下内容:
Memcache
Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, API calls, or page rendering.
即内存缓存数据库,是一个键值对数据库。该数据库的存在是为了将从其他服务中获取的数据暂存在内存中,在重复访问时可以直接从命中的缓存中返回。既加快了访问速率,也减少了其他服务的负载。这里将实现一个单服务器版本的Memcache,并且支持多个客户端的同时连接。
客户端将与服务器建立telnet连接,然后按照Memcache协议与服务器缓存进行交互。这里实现的指令为get,set和del。先来看一下各个指令的格式
set
set属于存储指令,存储指令的特点时,第一行输入基本信息,第二行输入其对应的value值。
set <key> <flags> <exptime> <bytes> [noreply]\r\n
<value>\r\n
如果存储成功,将会返回STORED,如果指令中包含noreply属性,则服务器将不会返回信息。
该指令中每个域的内容如下:
如果指令不符合标准,服务器将会返回ERROR。
get
get属于获取指令,该指令特点如下:
get <key>*\r\n
它支持传入多个key的值,如果缓存命中了一个或者多个key,则会返回相应的数据,并以END作为结尾。如果没有命中,则返回的消息中不包含该key对应的值。格式如下:
VALUE <key> <flags> <bytes>\r\n <data block>\r\n VALUE <key> <flags> <bytes>\r\n <data block>\r\n END del
删除指令,该指令格式如下:
del <key> [noreply]\r\n
如果删除成功,则返回DELETED\r\n,否则返回NOT_FOUND。如果有noreply参数,则服务器不会返回响应。
JAVA SOCKET
JAVA SOCKET需要了解的只是包括TCP协议,套接字,以及IO流。这里就不详细赘述,可以参考我的这系列文章,也建议去阅读JAVA Network Programming。一书。
代码实现
这里贴图功能出了点问题,可以去文末我的项目地址查看类图。
这里采用了指令模式和工厂模式实现指令的创建和执行的解耦。指令工厂将会接收commandLine并且返回一个Command实例。每一个Command都拥有execute方法用来执行各自独特的操作。这里只贴上del指令的特殊实现。
/** * 各种指令 * 目前支持get,set,delete * * 以及自定义的 * error,end */ public interface Command { /** * 执行指令 * @param reader * @param writer */ void execute(Reader reader, Writer writer); /** * 获取指令的类型 * @return */ CommandType getType(); }
/** * 指令工厂 单一实例 */ public class CommandFactory { private static CommandFactory commandFactory; private static Cache<Item> memcache; private CommandFactory(){} public static CommandFactory getInstance(Cache<Item> cache) { if (commandFactory == null) { commandFactory = new CommandFactory(); memcache = cache; } return commandFactory; } /** * 根据指令的类型获取Command * @param commandLine * @return */ public Command getCommand(String commandLine){ if (commandLine.matches("^set .*$")){ return new SetCommand(commandLine, memcache); }else if (commandLine.matches("^get .*$")){ return new GetCommand(commandLine, memcache); }else if (commandLine.matches("^del .*$")){ return new DeleteCommand(commandLine, memcache); }else if (commandLine.matches("^end$")){ return new EndCommand(commandLine); }else{ return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR); } } }
/** * 删除缓存指令 */ public class DeleteCommand implements Command{ private final String command; private final Cache<Item> cache; private String key; private boolean noReply; public DeleteCommand(final String command, final Cache<Item> cache){ this.command = command; this.cache = cache; initCommand(); } private void initCommand(){ if (this.command.contains("noreply")){ noReply = true; } String[] info = command.split(" "); key = info[1]; } @Override public void execute(Reader reader, Writer writer) { BufferedWriter bfw = (BufferedWriter) writer; Item item = cache.delete(key); if (!noReply){ try { if (item == null){ bfw.write("NOT_FOUND\r\n"); }else { bfw.write("DELETED\r\n"); } bfw.flush(); } catch (IOException e) { try { bfw.write("ERROR\r\n"); bfw.flush(); } catch (IOException e1) { e1.printStackTrace(); } e.printStackTrace(); } } } @Override public CommandType getType() { return CommandType.SEARCH; } }
然后是实现内存服务器,为了支持先进先出功能,这里使用了LinkedTreeMap作为底层实现,并且重写了removeOldest方法。同时还使用CacheManager的后台线程及时清除过期的缓存条目。
public class Memcache implements Cache<Item>{ private Logger logger = Logger.getLogger(Memcache.class.getName()); //利用LinkedHashMap实现LRU private static LinkedHashMap<String, Item> cache; private final int maxSize; //负载因子 private final float DEFAULT_LOAD_FACTOR = 0.75f; public Memcache(final int maxSize){ this.maxSize = maxSize; //确保cache不会在达到maxSize之后自动扩容 int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1; this.cache = new LinkedHashMap<String, Item>(capacity, DEFAULT_LOAD_FACTOR, true){ @Override protected boolean removeEldestEntry(Map.Entry<String,Item> eldest) { if (size() > maxSize){ logger.info("缓存数量已经达到上限,会删除最近最少使用的条目"); } return size() > maxSize; } }; //实现同步访问 Collections.synchronizedMap(cache); } public synchronized boolean isFull(){ return cache.size() >= maxSize; } @Override public Item get(String key) { Item item = cache.get(key); if (item == null){ logger.info("缓存中key:" + key + "不存在"); return null; }else if(item!=null && item.isExpired()){ //如果缓存过期则删除并返回null logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + "已经失效"); cache.remove(key); return null; } logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + " 剩余有效时间" + item.remainTime()); return item; } @Override public void set(String key, Item value) { logger.info("向缓存中写入key:" + key + " value:" + value); cache.put(key, value); } @Override public Item delete(String key) { logger.info("从缓存中删除key:" + key); return cache.remove(key); } @Override public int size(){ return cache.size(); } @Override public int capacity() { return maxSize; } @Override public Iterator<Map.Entry<String, Item>> iterator() { return cache.entrySet().iterator(); } }
/** * 缓存管理器 * 后台线程 * 将cache中过期的缓存删除 */ public class CacheManager implements Runnable { private Logger logger = Logger.getLogger(CacheManager.class.getName()); //缓存 public Cache<Item> cache; public CacheManager(Cache<Item> cache){ this.cache = cache; } @Override public void run() { while (true){ Iterator<Map.Entry<String, Item>> itemIterator = cache.iterator(); while (itemIterator.hasNext()){ Map.Entry<String, Item> entry = itemIterator.next(); Item item = entry.getValue(); if(item.isExpired()){ logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已经过期,从数据库中删除"); itemIterator.remove(); } } try { //每隔5秒钟再运行该后台程序 TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } }
最后是实现一个多线程的Socket服务器,这里就是将ServerSocket绑定到一个接口,并且将accept到的Socket交给额外的线程处理。
/** * 服务器 */ public class IOServer implements Server { private boolean stop; //端口号 private final int port; //服务器线程 private ServerSocket serverSocket; private final Logger logger = Logger.getLogger(IOServer.class.getName()); //线程池,线程容量为maxConnection private final ExecutorService executorService; private final Cache<Item> cache; public IOServer(int port, int maxConnection, Cache<Item> cache){ if (maxConnection<=0) throw new IllegalArgumentException("支持的最大连接数量必须为正整数"); this.port = port; executorService = Executors.newFixedThreadPool(maxConnection); this.cache = cache; } @Override public void start() { try { serverSocket = new ServerSocket(port); logger.info("服务器在端口"+port+"上启动"); while (true){ try { Socket socket = serverSocket.accept(); logger.info("收到"+socket.getLocalAddress()+"的连接"); executorService.submit(new SocketHandler(socket, cache)); } catch (IOException e) { e.printStackTrace(); } } } catch (IOException e) { logger.log(Level.WARNING, "服务器即将关闭..."); e.printStackTrace(); } finally { executorService.shutdown(); shutDown(); } } /** * 服务器是否仍在运行 * @return */ public boolean isRunning() { return !serverSocket.isClosed(); } /** * 停止服务器 */ public void shutDown(){ try { if (serverSocket!=null){ serverSocket.close(); } } catch (IOException e) { e.printStackTrace(); } } }
/** * 处理各个客户端的连接 * 在获得end指令后关闭连接s */ public class SocketHandler implements Runnable{ private static Logger logger = Logger.getLogger(SocketHandler.class.getName()); private final Socket socket; private final Cache<Item> cache; private boolean finish; public SocketHandler(Socket s, Cache<Item> cache){ this.socket = s; this.cache = cache; } @Override public void run() { try { //获取socket输入流 final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //获取socket输出流 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); CommandFactory commandFactory = CommandFactory.getInstance(cache); while (!finish){ final String commandLine = reader.readLine(); logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine); if (commandLine == null || commandLine.trim().isEmpty()) { continue; } //使用指令工厂获取指令实例 final Command command = commandFactory.getCommand(commandLine); command.execute(reader, writer); if (command.getType() == CommandType.END){ logger.info("请求关闭连接"); finish = true; } } } catch (IOException e) { e.printStackTrace(); logger.info("关闭来自" + socket.getLocalAddress() + "的连接"); } finally { try { if (socket != null){ socket.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
项目地址请戳这里,如果觉得还不错的话,希望能给个星哈><
参考资料
memcached官网
memcache协议
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
本文向大家介绍go语言实现聊天服务器的示例代码,包括了go语言实现聊天服务器的示例代码的使用技巧和注意事项,需要的朋友参考一下 看了两天 go 语言,是时候练练手了。 go 的 routine(例程) 和 chan(通道) 简直是神器,实现多线程(在 go 里准确的来说是 多例程)简直不要太轻松。 于是动手码了一个傻瓜版的黑框聊天器。 server 端: 监听 TCP 连接;支持自定义客户端命令;
本文向大家介绍Java 实现 web服务器的简单实例,包括了Java 实现 web服务器的简单实例的使用技巧和注意事项,需要的朋友参考一下 Java 实现 web服务器的简单实例 实例代码: 感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
本文向大家介绍Spring Boot 实现Restful webservice服务端示例代码,包括了Spring Boot 实现Restful webservice服务端示例代码的使用技巧和注意事项,需要的朋友参考一下 1.Spring Boot configurations 2.Spring Boot Application 3.Rest Controller 4.Aspect 5.Contro
本文向大家介绍django-crontab实现服务端的定时任务的示例代码,包括了django-crontab实现服务端的定时任务的示例代码的使用技巧和注意事项,需要的朋友参考一下 安装 在Django项目中使用 settings.py 配置任务 官方demo 上述demo参数说明 参数1:定时 例如47 11 * * * 表示每天的11时47分执行 参数2:方法的python模块路径,如果执行d
本文向大家介绍Springcloud实现服务多版本控制的示例代码,包括了Springcloud实现服务多版本控制的示例代码的使用技巧和注意事项,需要的朋友参考一下 需求 小程序新版本上线需要审核,如果有接口新版本返回内容发生了变化,后端直接上线会导致旧版本报错,不上线审核又通不过。 之前是通过写新接口来兼容,但是这样会有很多兼容代码或者冗余代码,开发也不容易能想到这一点,经常直接修改了旧接口,于是
本文向大家介绍实战node静态文件服务器的示例代码,包括了实战node静态文件服务器的示例代码的使用技巧和注意事项,需要的朋友参考一下 本篇文章主要介绍了实战node静态文件服务器的示例,分享给大家,具体如下: 支持功能: 读取静态文件 访问目录可以自动寻找下面的index.html文件, 如果没有index.html则列出文件列表 MIME类型支持 缓存支持/控制 支持gzip压缩 Range支