raft-java
Raft implementation library for Java.
参考自Raft论文和Raft作者的开源实现LogCabin。
项目Fork自wenweihu86/raft-java,修正了原项目com.baidu.brpc.exceptions.RpcException: serviceInterface must not be set repeatedly, please use another RpcClient异常问题,根据自己的理解增加了基于节点优先级的 Leader 选举方案和并发写入方案。
支持的功能
leader选举
基于节点优先级的 Leader 选举
并发写入
日志复制
snapshot
集群成员动态更变
脚本启动
在本地单机上部署一套3实例的raft集群,执行如下脚本:
cd raft-java-example && sh deploy.sh
该脚本会在raft-java-example/env目录部署三个实例example1、example2、example3;
同时会创建一个client目录,用于测试raft集群读写功能。
部署成功后,测试写操作,通过如下脚本:
cd env/client
./bin/run_client.sh "list://127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" hello world
测试读操作命令:
./bin/run_client.sh "list://127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" hello
IDE 启动
启动三个节点
top.aprilyolies.raft.example.server.ServerMain
启动参数
./data1 127.0.0.1:8051:1,127.0.0.1:8052:2,127.0.0.1:8053:3 127.0.0.1:8051:1
./data2 127.0.0.1:8051:1,127.0.0.1:8052:2,127.0.0.1:8053:3 127.0.0.1:8052:2
./data3 127.0.0.1:8051:1,127.0.0.1:8052:2,127.0.0.1:8053:3 127.0.0.1:8053:3
单线程客户端写入数据
top.aprilyolies.raft.example.client.ClientMain
启动参数
list://127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053 hello world
单线程客户端读取数据
top.aprilyolies.raft.example.client.ClientMain
启动参数
list://127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053 hello
多线程客户端读写数据
top.aprilyolies.raft.example.client.ConcurrentClientMain
启动参数
list://127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053
使用方法
下面介绍如何在代码中使用raft-java依赖库来实现一套分布式存储系统。
配置依赖
com.github.wenweihu86.raft
raft-java-core
1.8.0
定义数据写入和读取接口
message SetRequest {
string key = 1;
string value = 2;
}
message SetResponse {
bool success = 1;
}
message GetRequest {
string key = 1;
}
message GetResponse {
string value = 1;
}
public interface ExampleService {
Example.SetResponse set(Example.SetRequest request);
Example.GetResponse get(Example.GetRequest request);
}
服务端使用方法
实现状态机StateMachine接口实现类
// 该接口三个方法主要是给Raft内部调用
public interface StateMachine {
/**
* 对状态机中数据进行snapshot,每个节点本地定时调用
* @param snapshotDir snapshot数据输出目录
*/
void writeSnapshot(String snapshotDir);
/**
* 读取snapshot到状态机,节点启动时调用
* @param snapshotDir snapshot数据目录
*/
void readSnapshot(String snapshotDir);
/**
* 将数据应用到状态机
* @param dataBytes 数据二进制
*/
void apply(byte[] dataBytes);
}
实现数据写入和读取接口
// ExampleService实现类中需要包含以下成员
private RaftNode raftNode;
private ExampleStateMachine stateMachine;
// 数据写入主要逻辑
byte[] data = request.toByteArray();
// 数据同步写入raft集群
boolean success = raftNode.replicate(data, Raft.EntryType.ENTRY_TYPE_DATA);
Example.SetResponse response = Example.SetResponse.newBuilder().setSuccess(success).build();
// 数据读取主要逻辑,由具体应用状态机实现
Example.GetResponse response = stateMachine.get(request);
服务端启动逻辑
// 初始化RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());
// 应用状态机
ExampleStateMachine stateMachine = new ExampleStateMachine();
// 设置Raft选项,比如:
RaftOptions.snapshotMinLogSize = 10 * 1024;
RaftOptions.snapshotPeriodSeconds = 30;
RaftOptions.maxSegmentFileSize = 1024 * 1024;
// 初始化RaftNode
RaftNode raftNode = new RaftNode(serverList, localServer, stateMachine);
// 注册Raft节点之间相互调用的服务
RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode);
server.registerService(raftConsensusService);
// 注册给Client调用的Raft服务
RaftClientService raftClientService = new RaftClientServiceImpl(raftNode);
server.registerService(raftClientService);
// 注册应用自己提供的服务
ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine);
server.registerService(exampleService);
// 启动RPCServer,初始化Raft节点
server.start();
raftNode.init();