memcache视频java_java实现memcache服务器的示例代码

黄和怡
2023-12-01

什么是Memcache?

Memcache集群环境下缓存解决方案

Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。

Memcache是danga的一个项目,最早是LiveJournal 服务的,最初为了加速 LiveJournal 访问速度而开发的,后来被很多大型的网站采用。

Memcached是以守护程序方式运行于一个或多个服务器中,随时会接收客户端的连接和操作

为什么会有Memcache和memcached两种名称?

其实Memcache是这个项目的名称,而memcached是它服务器端的主程序文件名,知道我的意思了吧。一个是项目名称,一个是主程序文件名,在网上看到了很多人不明白,于是混用了。

Memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度。Memcached由Danga Interactive开发,用于提升LiveJournal.com访问速度的。LJ每秒动态页面访问量几千次,用户700万。Memcached将数据库负载大幅度降低,更好的分配资源,更快速访问。

这篇文章将会涉及以下内容:

Java Socket多线程服务器

Java IO

Concurrency

Memcache特性和协议

Memcache

Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, API calls, or page rendering.

即内存缓存数据库,是一个键值对数据库。该数据库的存在是为了将从其他服务中获取的数据暂存在内存中,在重复访问时可以直接从命中的缓存中返回。既加快了访问速率,也减少了其他服务的负载。这里将实现一个单服务器版本的Memcache,并且支持多个客户端的同时连接。

客户端将与服务器建立telnet连接,然后按照Memcache协议与服务器缓存进行交互。这里实现的指令为get,set和del。先来看一下各个指令的格式

set

set属于存储指令,存储指令的特点时,第一行输入基本信息,第二行输入其对应的value值。

set [noreply]\r\n

\r\n

如果存储成功,将会返回STORED,如果指令中包含noreply属性,则服务器将不会返回信息。

该指令中每个域的内容如下:

key: 键

flags: 16位无符号整数,会在get时随键值对返回

exptime: 过期时间,以秒为单位

bytes:即将发送的value的长度

noreply:是否需要服务器响应,为可选属性

如果指令不符合标准,服务器将会返回ERROR。

get

get属于获取指令,该指令特点如下:

get *\r\n

它支持传入多个key的值,如果缓存命中了一个或者多个key,则会返回相应的数据,并以END作为结尾。如果没有命中,则返回的消息中不包含该key对应的值。格式如下:

VALUE \r\n

\r\n

VALUE \r\n

\r\n

END

del

删除指令,该指令格式如下:

del [noreply]\r\n

如果删除成功,则返回DELETED\r\n,否则返回NOT_FOUND。如果有noreply参数,则服务器不会返回响应。

JAVA SOCKET

JAVA SOCKET需要了解的只是包括TCP协议,套接字,以及IO流。这里就不详细赘述,可以参考我的这系列文章,也建议去阅读JAVA Network Programming。一书。

代码实现

这里贴图功能出了点问题,可以去文末我的项目地址查看类图。

这里采用了指令模式和工厂模式实现指令的创建和执行的解耦。指令工厂将会接收commandLine并且返回一个Command实例。每一个Command都拥有execute方法用来执行各自独特的操作。这里只贴上del指令的特殊实现。

/**

* 各种指令

* 目前支持get,set,delete

*

* 以及自定义的

* error,end

*/

public interface Command {

/**

* 执行指令

* @param reader

* @param writer

*/

void execute(Reader reader, Writer writer);

/**

* 获取指令的类型

* @return

*/

CommandType getType();

}

/**

* 指令工厂 单一实例

*/

public class CommandFactory {

private static CommandFactory commandFactory;

private static Cache memcache;

private CommandFactory(){}

public static CommandFactory getInstance(Cache cache) {

if (commandFactory == null) {

commandFactory = new CommandFactory();

memcache = cache;

}

return commandFactory;

}

/**

* 根据指令的类型获取Command

* @param commandLine

* @return

*/

public Command getCommand(String commandLine){

if (commandLine.matches("^set .*$")){

return new SetCommand(commandLine, memcache);

}else if (commandLine.matches("^get .*$")){

return new GetCommand(commandLine, memcache);

}else if (commandLine.matches("^del .*$")){

return new DeleteCommand(commandLine, memcache);

}else if (commandLine.matches("^end$")){

return new EndCommand(commandLine);

}else{

return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR);

}

}

}

/**

* 删除缓存指令

*/

public class DeleteCommand implements Command{

private final String command;

private final Cache cache;

private String key;

private boolean noReply;

public DeleteCommand(final String command, final Cache cache){

this.command = command;

this.cache = cache;

initCommand();

}

private void initCommand(){

if (this.command.contains("noreply")){

noReply = true;

}

String[] info = command.split(" ");

key = info[1];

}

@Override

public void execute(Reader reader, Writer writer) {

BufferedWriter bfw = (BufferedWriter) writer;

Item item = cache.delete(key);

if (!noReply){

try {

if (item == null){

bfw.write("NOT_FOUND\r\n");

}else {

bfw.write("DELETED\r\n");

}

bfw.flush();

} catch (IOException e) {

try {

bfw.write("ERROR\r\n");

bfw.flush();

} catch (IOException e1) {

e1.printStackTrace();

}

e.printStackTrace();

}

}

}

@Override

public CommandType getType() {

return CommandType.SEARCH;

}

}

然后是实现内存服务器,为了支持先进先出功能,这里使用了LinkedTreeMap作为底层实现,并且重写了removeOldest方法。同时还使用CacheManager的后台线程及时清除过期的缓存条目。

public class Memcache implements Cache{

private Logger logger = Logger.getLogger(Memcache.class.getName());

//利用LinkedHashMap实现LRU

private static LinkedHashMap cache;

private final int maxSize;

//负载因子

private final float DEFAULT_LOAD_FACTOR = 0.75f;

public Memcache(final int maxSize){

this.maxSize = maxSize;

//确保cache不会在达到maxSize之后自动扩容

int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1;

this.cache = new LinkedHashMap(capacity, DEFAULT_LOAD_FACTOR, true){

@Override

protected boolean removeEldestEntry(Map.Entry eldest) {

if (size() > maxSize){

logger.info("缓存数量已经达到上限,会删除最近最少使用的条目");

}

return size() > maxSize;

}

};

//实现同步访问

Collections.synchronizedMap(cache);

}

public synchronized boolean isFull(){

return cache.size() >= maxSize;

}

@Override

public Item get(String key) {

Item item = cache.get(key);

if (item == null){

logger.info("缓存中key:" + key + "不存在");

return null;

}else if(item!=null && item.isExpired()){ //如果缓存过期则删除并返回null

logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + "已经失效");

cache.remove(key);

return null;

}

logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + " 剩余有效时间" + item.remainTime());

return item;

}

@Override

public void set(String key, Item value) {

logger.info("向缓存中写入key:" + key + " value:" + value);

cache.put(key, value);

}

@Override

public Item delete(String key) {

logger.info("从缓存中删除key:" + key);

return cache.remove(key);

}

@Override

public int size(){

return cache.size();

}

@Override

public int capacity() {

return maxSize;

}

@Override

public Iterator> iterator() {

return cache.entrySet().iterator();

}

}

/**

* 缓存管理器

* 后台线程

* 将cache中过期的缓存删除

*/

public class CacheManager implements Runnable {

private Logger logger = Logger.getLogger(CacheManager.class.getName());

//缓存

public Cache cache;

public CacheManager(Cache cache){

this.cache = cache;

}

@Override

public void run() {

while (true){

Iterator> itemIterator = cache.iterator();

while (itemIterator.hasNext()){

Map.Entry entry = itemIterator.next();

Item item = entry.getValue();

if(item.isExpired()){

logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已经过期,从数据库中删除");

itemIterator.remove();

}

}

try {

//每隔5秒钟再运行该后台程序

TimeUnit.SECONDS.sleep(5);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

最后是实现一个多线程的Socket服务器,这里就是将ServerSocket绑定到一个接口,并且将accept到的Socket交给额外的线程处理。

/**

* 服务器

*/

public class IOServer implements Server {

private boolean stop;

//端口号

private final int port;

//服务器线程

private ServerSocket serverSocket;

private final Logger logger = Logger.getLogger(IOServer.class.getName());

//线程池,线程容量为maxConnection

private final ExecutorService executorService;

private final Cache cache;

public IOServer(int port, int maxConnection, Cache cache){

if (maxConnection<=0) throw new IllegalArgumentException("支持的最大连接数量必须为正整数");

this.port = port;

executorService = Executors.newFixedThreadPool(maxConnection);

this.cache = cache;

}

@Override

public void start() {

try {

serverSocket = new ServerSocket(port);

logger.info("服务器在端口"+port+"上启动");

while (true){

try {

Socket socket = serverSocket.accept();

logger.info("收到"+socket.getLocalAddress()+"的连接");

executorService.submit(new SocketHandler(socket, cache));

} catch (IOException e) {

e.printStackTrace();

}

}

} catch (IOException e) {

logger.log(Level.WARNING, "服务器即将关闭...");

e.printStackTrace();

} finally {

executorService.shutdown();

shutDown();

}

}

/**

* 服务器是否仍在运行

* @return

*/

public boolean isRunning() {

return !serverSocket.isClosed();

}

/**

* 停止服务器

*/

public void shutDown(){

try {

if (serverSocket!=null){

serverSocket.close();

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

/**

* 处理各个客户端的连接

* 在获得end指令后关闭连接s

*/

public class SocketHandler implements Runnable{

private static Logger logger = Logger.getLogger(SocketHandler.class.getName());

private final Socket socket;

private final Cache cache;

private boolean finish;

public SocketHandler(Socket s, Cache cache){

this.socket = s;

this.cache = cache;

}

@Override

public void run() {

try {

//获取socket输入流

final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

//获取socket输出流

final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

CommandFactory commandFactory = CommandFactory.getInstance(cache);

while (!finish){

final String commandLine = reader.readLine();

logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine);

if (commandLine == null || commandLine.trim().isEmpty()) {

continue;

}

//使用指令工厂获取指令实例

final Command command = commandFactory.getCommand(commandLine);

command.execute(reader, writer);

if (command.getType() == CommandType.END){

logger.info("请求关闭连接");

finish = true;

}

}

} catch (IOException e) {

e.printStackTrace();

logger.info("关闭来自" + socket.getLocalAddress() + "的连接");

} finally {

try {

if (socket != null){

socket.close();

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

项目地址请戳这里,如果觉得还不错的话,希望能给个星哈><

参考资料

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

 类似资料: