Java手写Raft篇:核心流程与并发模型优化
前言
这是一篇基于Java实现raft分布式KV存储系统的博客。本项目基于 https://github.com/stateIs0/lu-raft-kv 该开源项目进行的二次开发。主要按照Raft协议的生命周期(选主、日志复制、安全性、一致性读)进行了改造。本篇博客记录了我的改造思路,以及一些值得注意的地方。Raft 算法作为分布式一致性协议的标准解法,以其清晰的模块化设计著称。本文将基于我如何改造原开源项目,深入剖析其中的核心机制。从选主逻辑、日志复制的细节,到解决“幽灵复现”问题的 No-Op 日志,再到利用 CompletableFuture 对并发模型的重构,以此记录构建高一致性分布式存储系统的思考过程。
一、 选举机制 (Leader Election)
选主是集群启动或 Leader 宕机后的首要任务。在此阶段,不仅要保证票数过半,更要严格校验节点资格。
1. 状态流转与拉票
当 Follower 的心跳倒计时结束(在一定范围内随机时间,我选择的是跟最大选举时间到它的两倍这个范围,避免同时存在多个候选人导致效率下降)仍未收到 Leader 消息时,将触发选举流程:
- 自增任期:将
term + 1,并将状态流转为 Candidate。 - 给自己投票:先投自己一票,避免 split vote。
- 广播请求:向所有节点发起拉票请求,请求中必须携带
lastLogTerm 和 lastLogIndex,以便其他节点校验“日志是否足够新”。
2. 投票方逻辑与并发控制
节点收到投票请求时的处理逻辑非常关键:
- Term 更新:如果请求中的 Term 严格大于自身,需将之前的投票记录(votedFor)置空,防止因持有旧锁导致无法投票,这样也尽可能的保证在一个投票间隔中能选出Leader。
- 加锁防重:整个投票判断过程需要上锁,避免高并发下的重复投票。
- 资格校验:只有当请求者的 Term >= 自身 Term,且日志比自己新(Log Completeness),且自己在本任期未投过票时,才予以投票。
- 收尾:更新本地 Term 和投票信息,释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public RvoteResult requestVote(RvoteParam param) { try { RvoteResult.Builder builder = RvoteResult.newBuilder(); if (!voteLock.tryLock()) { return builder.term(node.getCurrentTerm()).voteGranted(false).build(); }
if (param.getTerm() < node.getCurrentTerm()) { return builder.term(node.getCurrentTerm()).voteGranted(false).build(); }
if (param.getTerm() > node.getCurrentTerm()) { node.setCurrentTerm(param.getTerm()); node.setVotedFor(null); node.status = NodeStatus.FOLLOWER; }
LOGGER.info("node {} current vote for [{}], param candidateId : {}", node.peerSet.getSelf(), node.getVotedFor(), param.getCandidateId());
if ((StringUtil.isNullOrEmpty(node.getVotedFor()) || node.getVotedFor().equals(param.getCandidateId()))) {
if (node.getLogModule().getLast() != null) { if (node.getLogModule().getLast().getTerm() > param.getLastLogTerm()) { log.info("节点{} 拒绝给 候选人 {} 投票,因为日志的term值更大",node.peerSet.getSelf(),param.getCandidateId()); return RvoteResult.fail(); } if (node.getLogModule().getLastIndex() > param.getLastLogIndex()) { log.info("节点{} 拒绝给 候选人 {} 投票,因为日志的Index更大",node.peerSet.getSelf(),param.getCandidateId()); return RvoteResult.fail(); } }
node.status = NodeStatus.FOLLOWER; node.peerSet.setLeader(new Peer(param.getCandidateId())); node.setCurrentTerm(param.getTerm()); node.setVotedFor(param.getCandidateId()); log.warn("节点 {} 在第 {} 轮中 , 投给了 {} 节点",node.peerSet.getSelf(),node.currentTerm,node.getVotedFor()); return builder.term(node.currentTerm).voteGranted(true).build(); }
return builder.term(node.currentTerm).voteGranted(false).build();
} finally { voteLock.unlock(); } }
|
3. 异步并发模型的优化
在实现拉票请求时,我对并发模型进行了重构:
- 旧方案:
线程池 + Future + CountDownLatch。Future.get() 会阻塞调用线程,且 CountDownLatch 代码冗余。 - 新方案:
CompletableFuture + 线程池。- 使用
List<CompletableFuture<>> 保存所有 RPC 请求。 - 利用
CompletableFuture.allOf(...) 实现多线程的非阻塞同步。 - 配合
AtomicInteger 记录获胜票数,线程安全且代码更优雅。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| if (status == NodeStatus.LEADER) { return; }
long current = System.currentTimeMillis();
long myElectionTime = electionTime + ThreadLocalRandom.current().nextInt(10000); if (current - preElectionTime < myElectionTime) { return; } status = NodeStatus.CANDIDATE; log.error("node {} will become CANDIDATE and start election leader, current term : [{}], LastEntry : [{}]", peerSet.getSelf(), currentTerm, logModule.getLast());
preElectionTime = System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(200) + 150;
currentTerm = currentTerm + 1;
votedFor = peerSet.getSelf().getAddr();
List<Peer> peers = peerSet.getPeersWithOutSelf();
ArrayList<CompletableFuture<RvoteResult>> futureList = new ArrayList<>();
AtomicInteger NumberOfVotesInFavor = new AtomicInteger(0); log.info("peerList size : {}, peer list content : {}", peers.size(), peers);
for (Peer peer : peers) {
futureList.add(CompletableFuture.supplyAsync(()->{
long lastTerm = 0L; LogEntry last = logModule.getLast(); if (last != null) { lastTerm = last.getTerm(); }
RvoteParam param = RvoteParam.builder(). term(currentTerm). candidateId(peerSet.getSelf().getAddr()). lastLogIndex(LongConvert.convert(logModule.getLastIndex())). lastLogTerm(lastTerm). build();
Request request = Request.builder() .cmd(Request.R_VOTE) .obj(param) .url(peer.getAddr()) .build();
try { return getRpcClient().<RvoteResult>send(request,500); } catch (RaftRemotingException e) { log.error("ElectionTask RPC Fail , URL : " + request.getUrl()); return null; } },RaftThreadPool.giveMePool()) .whenComplete((result,throwable) ->{ if(result == null){ return; } boolean isVoteGranted = result.isVoteGranted(); if (isVoteGranted) { NumberOfVotesInFavor.incrementAndGet(); } else { long resTerm = result.getTerm(); if (resTerm >= currentTerm) { currentTerm = resTerm; } } })); }
try { CompletableFuture[] futureArray = futureList.toArray(new CompletableFuture[0]); CompletableFuture.allOf(futureArray).get(2, TimeUnit.SECONDS); } catch (Exception e) { log.error("CompletableFuture.allOf Exception error.", e); }
int success = NumberOfVotesInFavor.get(); log.warn("node {} maybe become leader , success count = {} , status : {}", peerSet.getSelf(), success, NodeStatus.Enum.value(status));
if (status == NodeStatus.FOLLOWER) { log.warn("节点 {} 目前不是候选人 ,无法成为领导者", peerSet.getSelf().getAddr()); return; }
if (success >= (peers.size()+1) / 2) { log.warn("node {} become leader 他的选票有 {} 票", peerSet.getSelf(),success+1); status = NodeStatus.LEADER; peerSet.setLeader(peerSet.getSelf()); votedFor = ""; becomeLeaderToDoThing(); } else { votedFor = ""; log.warn("节点 {} 在 {} 轮落选,他获得的选票有 {} 票",peerSet.getSelf(),currentTerm,success+1); }
preElectionTime = System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(200) + 150;
|
二、 日志复制与心跳 (Log Replication & Heartbeat)
Leader 确立后,进入正常的日志同步阶段。
1. 心跳与日志同步的合并
为了简化逻辑,我将“同步日志”与“发送心跳”合并处理:
- Leader 周期性遍历所有 Follower。
- 判断同步需求:检查
Leader.lastIndex >= Follower.nextIndex。- 如果满足,说明有新日志,发送从
nextIndex 开始的日志条目。 - 如果不满足,说明日志已经同步,仅发送空的心跳包保活(根据心跳结果,判断自己是否还能继续当leader)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| class HeartBeatTask implements Runnable {
@Override public void run() {
if (status != NodeStatus.LEADER) { return; }
long current = System.currentTimeMillis(); if (current - preHeartBeatTime < heartBeatTick) { return; } log.info("=========== NextIndex ============="); for (Peer peer : peerSet.getPeersWithOutSelf()) { log.info("Peer {} nextIndex={}", peer.getAddr(), nextIndexs.get(peer)); }
preHeartBeatTime = System.currentTimeMillis();
List<CompletableFuture<Boolean>> futureList = new ArrayList<>(); List<Boolean> resultList = Collections.synchronizedList(new ArrayList<>());
for (Peer peer : peerSet.getPeersWithOutSelf()) { Long nextIndex = nextIndexs.get(peer); if(logModule.getLastIndex() >= nextIndex){ CompletableFuture<Boolean> future = CompletableFuture.supplyAsync( () -> replication(peer), RaftThreadPool.giveMePool() ).exceptionally( ex ->{ log.error("{} 节点发送日志失败", peer.getAddr()); return false;} ).whenComplete((result, throwable) -> { if (result != null) { resultList.add(result); } if (throwable != null) { log.error("{} 节点发送日志失败", peer.getAddr()); } }); futureList.add(future); }else { AentryParam param = AentryParam.builder() .entries(null) .leaderId(peerSet.getSelf().getAddr()) .serverId(peer.getAddr()) .term(currentTerm) .leaderCommit(commitIndex) .build();
Request request = new Request( Request.A_ENTRIES, param, peer.getAddr());
CompletableFuture.supplyAsync(()-> getRpcClient().<AentryResult>send(request,450),RaftThreadPool.giveMePool()) .exceptionally( ex -> null).whenComplete((result, throwable)->{ if(result == null){ log.error("无法收到{} 节点的心跳回应,该节点可能已经宕机",peer.getAddr()); return; } long term = result.getTerm(); if (term > currentTerm) { log.error("self will become follower, he's term : {}, my term : {}", term, currentTerm); currentTerm = term; votedFor = ""; status = NodeStatus.FOLLOWER; } }); } } try { CompletableFuture[] futureArray = futureList.toArray(new CompletableFuture[0]); CompletableFuture.allOf(futureArray).get(2, TimeUnit.SECONDS); } catch (Exception e) { log.error("CompletableFuture.allOf Exception error.", e); }
List<Long> matchIndexList = new ArrayList<>(matchIndexs.values()); int median = 0; if (matchIndexList.size() >= 2) { Collections.sort(matchIndexList); median = matchIndexList.size() / 2; } Long N = matchIndexList.get(median); if (N > commitIndex) { LogEntry entry = logModule.read(N); if (entry != null && entry.getTerm() == currentTerm) { commitIndex = N; RaftThreadPool.execute(()->{ commitLock.lock(); try { long myCommitIndex = commitIndex; for(long i = lastApplied + 1; i <= myCommitIndex; i++){ getStateMachine().apply(logModule.read(i)); log.info("异步应用日志成功, logEntry info : {}", logModule.read(i)); } lastApplied = myCommitIndex; } catch (Exception e) { log.error("leader 节点 {} 日志应用到状态机失败",peerSet.getSelf()); } finally { commitLock.unlock(); } }); } } } }
|
2. 批量发送与回溯
- 批量优化:发送日志时,会把从
nextIndex 到 Leader lastIndex 的所有日志打包发送。Follower 匹配成功后可批量写入,极大提升同步吞吐。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| AentryParam aentryParam = AentryParam.builder().build(); aentryParam.setTerm(currentTerm); aentryParam.setServerId(peer.getAddr()); aentryParam.setLeaderId(peerSet.getSelf().getAddr());
aentryParam.setLeaderCommit(commitIndex);
Long nextIndex = nextIndexs.get(peer); Long myNewIndex = logModule.getLastIndex(); LinkedList<LogEntry> logEntries = new LinkedList<>(); if (myNewIndex >= nextIndex) { for (long i = nextIndex; i <= myNewIndex; i++) { LogEntry l = logModule.read(i); if (l != null) { logEntries.add(l); } } }
LogEntry preLog = getPreLog(logEntries.getFirst()); aentryParam.setPreLogTerm(preLog.getTerm()); aentryParam.setPrevLogIndex(preLog.getIndex());
aentryParam.setEntries(logEntries.toArray(new LogEntry[0]));
Request request = Request.builder() .cmd(Request.A_ENTRIES) .obj(aentryParam) .url(peer.getAddr()) .build();
|
getPreLog() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
| private LogEntry getPreLog(LogEntry logEntry) {
LogEntry entry;
if(logEntry.getIndex() == 0){ log.warn("已经匹配到底了,需要从头开始覆盖"); entry = LogEntry.builder().index(-1L).term(-1).command(null).build(); }else { entry = logModule.read(logEntry.getIndex() - 1); }
return entry; }
|
- 失败回溯:发送请求会携带
prevLogTerm 和 prevLogIndex。如果 Follower 返回失败(日志不匹配或 Leader Term 过旧):- 如果是Leader Term 过旧,那就降级为Follower。
- 如果不是因为Term过旧,Leader 就将该 Follower 的
nextIndex 减一。 - 下次心跳时重试前一条日志,直到找到匹配点(MatchIndex)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| try { AentryResult result = getRpcClient().send(request,450); if (result == null) { return false; } if (result.isSuccess()) { log.info("append follower entry success , follower=[{}], entry=[{}]", peer, aentryParam.getEntries()); nextIndexs.put(peer, myNewIndex+1); matchIndexs.put(peer, myNewIndex); return true; } else if (!result.isSuccess()) { if (result.getTerm() > currentTerm) { log.warn("follower [{}] term [{}] than more self, and my term = [{}], so, I will become follower", peer, result.getTerm(), currentTerm); currentTerm = result.getTerm(); status = NodeStatus.FOLLOWER; } else { nextIndexs.put(peer, nextIndex - 1); log.warn("follower {} nextIndex not match, will reduce nextIndex and retry RPC append, nextIndex : [{}]", peer.getAddr(), nextIndex); } return false; } } catch (Exception e) { log.warn("当前接受节点 {} 已宕机", peer.getAddr()); return false; } return false;
|
3. Follower 的处理逻辑
Follower 在接收端的处理也做了细致的锁管理:
- 日志写入:收到日志后严格校验匹配性。匹配成功则加锁写入本地日志,之后根据leader 的
commitIndex跟自己的 lastIndex 更新自己 commitIndex,异步应用到状态机,并返回成功。 - 心跳处理:收到心跳时,若 Leader Term 小于自己,则拒绝并告知对方退位。若 Term 合法,重置选举超时,转为 Follower(因为自己之前可能是候选人,或者分区之后又合区的老leader)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| public AentryResult appendEntries(AentryParam param) { AentryResult result = AentryResult.fail(); try { if (!appendLock.tryLock()) { return result; }
result.setTerm(node.getCurrentTerm()); if (param.getTerm() < node.getCurrentTerm()) { return result; }
node.preHeartBeatTime = System.currentTimeMillis(); node.preElectionTime = System.currentTimeMillis(); node.peerSet.setLeader(new Peer(param.getLeaderId())); if (param.getTerm() >= node.getCurrentTerm()) { LOGGER.debug("node {} become FOLLOWER, currentTerm : {}, param Term : {}, param serverId = {}", node.peerSet.getSelf(), node.currentTerm, param.getTerm(), param.getServerId()); node.status = NodeStatus.FOLLOWER; } node.setCurrentTerm(param.getTerm());
if (param.getEntries() == null || param.getEntries().length == 0) { LOGGER.info("node {} append heartbeat success , he's term : {}, my term : {}", param.getLeaderId(), param.getTerm(), node.getCurrentTerm());
if (param.getLeaderCommit() > node.getCommitIndex()) { long newCommitIndex = Math.min(param.getLeaderCommit(), node.getLogModule().getLastIndex()); node.setCommitIndex(newCommitIndex); submitToStateMachineAsync(); } return AentryResult.newBuilder().term(node.getCurrentTerm()).success(true).build(); }
if (param.getPrevLogIndex() != -1) { LogEntry logEntry = node.getLogModule().read(param.getPrevLogIndex()); if (logEntry == null || logEntry.getTerm() != param.getPreLogTerm()) { return result; } }
long nextLogIndex = param.getPrevLogIndex() + 1; LogEntry existLog = node.getLogModule().read(nextLogIndex); if (existLog != null) { if (existLog.getTerm() != param.getEntries()[0].getTerm()) { node.getLogModule().removeOnStartIndex(nextLogIndex); } else { result.setSuccess(true); result.setTerm(node.getCurrentTerm()); return result; } }
for (LogEntry entry : param.getEntries()) { node.getLogModule().write(entry); } result.setSuccess(true); result.setTerm(node.getCurrentTerm());
if (param.getLeaderCommit() > node.getCommitIndex()) { long newCommitIndex = Math.min(param.getLeaderCommit(), node.getLogModule().getLastIndex()); node.setCommitIndex(newCommitIndex); submitToStateMachineAsync(); }
node.status = NodeStatus.FOLLOWER; return result;
} finally { if (appendLock.isHeldByCurrentThread()) { appendLock.unlock(); } } }
|
- 异步应用 (Async Apply):
- 心跳包中包含 Leader 的
commitIndex。 - Follower 取
min(leaderCommit, lastLogIndex) 作为本地提交点。 - 优化点:写入本地日志后,或者更新 commitIndex 后,采用异步方式将日志应用(Apply)到状态机,避免阻塞 IO 线程,提升效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| private void submitToStateMachineAsync() { commitExecutor.submit(() -> { commitLock.lock(); try { long nextCommit = node.getLastApplied() + 1; long commitIndex = node.getCommitIndex();
if (nextCommit > commitIndex) { LOGGER.debug("暂无需要提交的日志,lastApplied={}, commitIndex={}", nextCommit-1, commitIndex); return; }
LOGGER.info("开始异步提交日志,从索引{}到{}", nextCommit, commitIndex); while (nextCommit <= commitIndex) { LogEntry logEntry = node.getLogModule().read(nextCommit); if (logEntry != null) { try { node.stateMachine.apply(logEntry); node.setLastApplied(nextCommit); LOGGER.info("异步提交日志成功,索引={}", nextCommit); } catch (Exception e) { LOGGER.error("异步提交日志到状态机失败,索引={}", nextCommit, e); break; } } else { LOGGER.warn("日志不存在,索引={}", nextCommit); break; } nextCommit++; } } finally { commitLock.unlock(); } }); }
|
三、 CommitIndex 的计算与安全性
如何确定一条日志可以被安全提交?
1. 基于中位数的 CommitIndex 计算
不再简单依赖单次 RPC 的成功率,而是基于全局视图:
- 在每次心跳/日志发送完成(
CompletableFuture.allOf 结束)后,统计所有节点的 matchIndex。 - 求中位数:通过排序或统计算法,找出超过半数节点都拥有的最大索引值。
- 任期检查:Raft 规定只能提交当前任期的日志。因此,计算出的索引对应的日志 term 必须等于
currentTerm 才能更新 commitIndex。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| try { CompletableFuture[] futureArray = futureList.toArray(new CompletableFuture[0]); CompletableFuture.allOf(futureArray).get(2, TimeUnit.SECONDS); } catch (Exception e) { log.error("CompletableFuture.allOf Exception error.", e); }
List<Long> matchIndexList = new ArrayList<>(matchIndexs.values());
int median = 0; if (matchIndexList.size() >= 2) { Collections.sort(matchIndexList); median = matchIndexList.size() / 2; } Long N = matchIndexList.get(median); if (N > commitIndex) { LogEntry entry = logModule.read(N); if (entry != null && entry.getTerm() == currentTerm) { commitIndex = N; RaftThreadPool.execute(()->{ commitLock.lock(); try { long myCommitIndex = commitIndex; for(long i = lastApplied + 1; i <= myCommitIndex; i++){ getStateMachine().apply(logModule.read(i)); log.info("异步应用日志成功, logEntry info : {}", logModule.read(i)); } lastApplied = myCommitIndex; } catch (Exception e) { log.error("leader 节点 {} 日志应用到状态机失败",peerSet.getSelf()); } finally { commitLock.unlock(); } }); } }
|
2. 解决“幽灵复现”:No-Op Log
针对“只能提交当前任期日志”导致旧日志可能无法及时提交的问题(如果允许提交可能会导致同一个位置的日志重复提交),采用了插入空日志策略:
- 新 Leader 上任后,立即append一条空日志(No-Op Log)。
- 一旦这条当前任期的空日志被提交,根据日志连续性原则,之前所有未提交的旧日志也会被间接提交。
- 初始化:新 Leader 将所有 Follower 的
nextIndex 初始化为 lastIndex + 1, matchIndex 初始化为 0。利用后续的心跳回溯机制自动对其。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private void becomeLeaderToDoThing() { nextIndexs = new ConcurrentHashMap<>(); matchIndexs = new ConcurrentHashMap<>(); for (Peer peer : peerSet.getPeersWithOutSelf()) { nextIndexs.put(peer, logModule.getLastIndex() + 1); matchIndexs.put(peer, 0L); }
LogEntry logEntry = LogEntry.builder() .command(null) .term(currentTerm) .build();
logModule.write(logEntry); log.info("write logModule success, logEntry info : {}, log index : {}", logEntry, logEntry.getIndex()); }
|
四、 线性一致性读 (Linearizable Read)
为了防止读取到旧数据(Stale Read),实现了 Follower Read 和 Leader Read。
- Leader Read:Leader 收到读请求后,不能直接返回。需先广播一轮心跳(同样使用
CompletableFuture 机制),确认自己仍持有过半数选票(未发生脑裂)。确认成功后,等待状态机应用到 commitIndex 后返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| private boolean isStillLeader(){ AtomicInteger Agree = new AtomicInteger(0); List<CompletableFuture<AentryResult>> futureList = new ArrayList<>();
for (Peer peer : peerSet.getPeersWithOutSelf()) { AentryParam param = AentryParam.builder() .entries(null) .leaderId(peerSet.getSelf().getAddr()) .serverId(peer.getAddr()) .term(currentTerm) .leaderCommit(commitIndex) .build();
Request request = new Request( Request.A_ENTRIES, param, peer.getAddr()); futureList.add(CompletableFuture.supplyAsync(()-> getRpcClient().<AentryResult>send(request,300) ,RaftThreadPool.giveMePool()).exceptionally(ex -> null) .whenComplete((result,throwable)->{ if(result == null){ log.error("无法收到 {} 节点的心跳回应,该节点可能已经宕机",peer.getAddr()); }else { long term = result.getTerm(); if(term > currentTerm){ log.error("收到更大的term来自节点{} 当前节点{}目前已不适合当leader",peer.getAddr(),peerSet.getSelf().getAddr()); currentTerm = term; votedFor = ""; status = NodeStatus.FOLLOWER; }else{ if(result.isSuccess()){ log.info("成功收到来自节点{}的心跳回复且合法",peer.getAddr()); Agree.incrementAndGet(); } } } })); }
try { CompletableFuture[] futureArray = futureList.toArray(new CompletableFuture[0]); CompletableFuture.allOf(futureArray).get(2, TimeUnit.SECONDS); } catch (Exception e) { log.error("CompletableFuture.allOf Exception error.", e); }
if(status!=NodeStatus.LEADER){ return false; } int agreeNum = Agree.get(); if(agreeNum >= (peerSet.getPeersWithOutSelf().size()+1)/2){ return true; } return false; }
|
- Follower Read:
- Follower 收到读请求,向 Leader 发送
ReadIndex RPC。 - Leader 收到后,执行上述“确认领导权”流程,返回当前的
commitIndex。 - Follower 拿到
commitIndex 后,等待本地 appliedIndex >= commitIndex,再将数据返回给客户端。
响应客户端请求的接口代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
| public ClientKVAck handlerClientRequest(ClientKVReq request) {
log.warn("handlerClientRequest handler {} operation, and key : [{}], value : [{}]", ClientKVReq.Type.value(request.getType()), request.getKey(), request.getValue());
if (status != NodeStatus.LEADER && request.getType() == ClientKVReq.PUT) { log.warn("I not am leader , only invoke redirect write method, leader addr : {}, my addr : {}", peerSet.getLeader(), peerSet.getSelf().getAddr()); return redirect(request); }
if (request.getType() == ClientKVReq.GET && status == NodeStatus.LEADER) {
boolean isLeader = isStillLeader();
if(!isLeader){ return ClientKVAck.fail(); }
if(lastApplied != commitIndex){ commitLock.lock(); try { for(long i = lastApplied + 1; i <= commitIndex; i++){ getStateMachine().apply(logModule.read(i)); log.info("异步应用日志成功, logEntry info : {}", logModule.read(i)); } lastApplied = commitIndex; } catch (Exception e) { log.error("leader 节点 {} 日志应用到状态机失败",peerSet.getSelf()); } finally { commitLock.unlock(); } } LogEntry logEntry = stateMachine.get(request.getKey()); if (logEntry != null) { return new ClientKVAck(logEntry); } return new ClientKVAck(null); }
if (request.getType() == ClientKVReq.GET && status == NodeStatus.FOLLOWER){ Request request1 = new Request( Request.GET_READ_INDEX, null, peerSet.getLeader().getAddr());
GetCommitIndexResult result; try { result = getRpcClient().send(request1,300); } catch (Exception e) { log.error("无法与leader {} 取得连续,无法返回读请求",peerSet.getLeader().getAddr()); return ClientKVAck.fail(); }
if(!result.isSuccess()){ return ClientKVAck.fail(); }
long readIndex = result.getReadIndex();
long start = System.currentTimeMillis(); commitLock.lock(); try { while(System.currentTimeMillis() - start <= 2*1000){ long lastIndex = logModule.getLastIndex(); long shouldCommit = Math.min(readIndex,lastIndex); for(long i = lastApplied + 1; i <= shouldCommit; i++){ getStateMachine().apply(logModule.read(i)); log.info("应用日志成功, logEntry info : {}", logModule.read(i)); } lastApplied = shouldCommit; if(lastApplied == readIndex){ break; } try { commitLock.unlock(); Thread.sleep(20); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("同步 ReadIndex 时线程被中断"); return ClientKVAck.fail(); } finally { commitLock.lock(); } } } catch (Exception e) { return ClientKVAck.fail(); } finally { commitLock.unlock(); }
if(lastApplied!=readIndex){ return ClientKVAck.fail(); } LogEntry logEntry = stateMachine.get(request.getKey()); if (logEntry != null) { return new ClientKVAck(logEntry); } return new ClientKVAck(null); }
if(request.getType() == ClientKVReq.GET && status == NodeStatus.CANDIDATE){ return ClientKVAck.fail(); }
LogEntry logEntry = LogEntry.builder() .command(Command.builder(). key(request.getKey()). value(request.getValue()). build()) .term(currentTerm) .build();
logModule.write(logEntry); log.info("write logModule success, logEntry info : {}, log index : {}", logEntry, logEntry.getIndex()); return ClientKVAck.ok(); }
|
五、 原项目的一些问题
- 原项目只有在有新日志来的时候才会进行日志同步,没有新日志来的时候不会进行日志同步,但是一个落后的节点恢复之后,应该让他同步日志,这样就不会同步及时,于是我改造成了每次心跳时进行日志同步。
- 原项目在多个异步线程的同步(比如拉票请求,使用多线程进行发送拉票,但必须都完成之后才开始统计)上使用了 线程池+future+countDownLatch 使用future.get()获得结果,然后使用countDownLatch.await()等待所有线程完成,但是future.get()这个方法会阻塞线程,导致无法进行下一步操作。于是我改造成了CompletableFuture+线程池的方式实现。具体见前文中的代码。
- 原项目把lastApplied初始化为0,我认为并不合理,因为这个就默认0位置的日志已经应用了。所以应该改为-1。(我看了一下原项目并没有说明0位置的日志是垃圾日志,并且写入日志也都是从0开始,所以0位置是有用的,不应该跳过)。
- 原项目没有对RPC请求进行超时限制。这样在投票中极有可能发生如下情况:一个节点发起拉票请求,但有一个节点宕机,其他节点都投了赞成票。这个宕机节点会导致RPC请求一直处于等待状态,在最后统计得票的时候,会在多个异步任务同步这里超时(一般设为3秒)。但就是在这个时候,其他节点也极有可能发起新一轮的拉票请求,这样本来第一次发送拉票的节点,本应该在他那一轮变成leader但是在3秒之后,虽然拿到了多数票数,但是自己状态是follower(这里参考前文写的节点处理拉票请求),于是失败。这样就会导致多轮无效投票,导致效率低。于是设置一个RPC超时时间(500ms)保证能快速结算票数,让其他节点发起新一轮投票的概率变小。(其实原项目在投票的时候跟我的实现有一定出入,他并未对新一轮投票作出管理(我在当新一轮投票来临时,把自己得投票人置为空,并且把自己变成follower),只有自己变成候选人,然后落选之后才会把投票人清空,我觉得这样效率不高)
- 针对第5点的补充说明,原项目只有自己变成候选人,然后落选之后才会把投票人清空。这样做的话,在一个周期内如果没选出来领导人,那就必须等到下一个周期(因为重置投票人会在一个周期后,而不是一个新的term)。而我的改进,让一个节点收到更大的term之后会让出自己的位置,并且释放投票人,这样在每次新的term中每个节点都可以进行投票。这样在一个选举周期内大概率可以成功选举。我这种方案的随机时间设置范围最好为[electiontimeout, 2 * electiontimeout - 1],避免冲突。 (其实我这个思路跟6.824 lab2思路一样)
- 原项目并未实现Follower Read。我的具体实现见前文
五、 总结
以下是我的一些心得 共12条
- 投票时,如果遇到更新的term则把之前的投票人置空,防止无法投票(投票时也需要上锁,避免重复投票),然后判断自己没有投过票,当前发送请求的候选人是否有资格,即持有比自己更新的日志,如果都满足这个followe节点就可以投票给该节点。并更新相应的信息,释放锁。
- Follower收到日志时,判断日志是否匹配,不匹配返回失败,匹配则写入本地。写入本地之后,根据leader的commitIndex异步应用日志,提升效率。但都需要上锁(写入与提交都需要)
- follower收到心跳时,判断leader的term是否小于自己,如果小于返回失败,并且告诉leader自己的term,让他退位。如果大于等于自己,则把自己的状态变为follower(因为节点的状态可能是候选人,或者分区之后又加入的老leader)。然后重置投票倒计时。然后每次心跳还会带有leader的commitIndex,把这个跟自己最后的日志索引取最小值,然后异步应用日志到这个位置,跟leader保持同步。
- 在learder中把同步日志到followe操作 跟 心跳放在一起,即每次判断不同的follower是否需要同步日志,如果不需要则发送心跳。通过判断自己的lastIndex>=每个follower的nextIndex(对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一))。如果是则发送nextIndex对应的日志。
- 在发送日志阶段,每次发送会带上indexIndex的前一条日志的term和index用于日志匹配。如果follower拒收,那就是日志不匹配(也有可能是leader的term小了,那么leader就应该退为follower),那么就把他的nextIndex 减一。这样下次心跳时就会发送前一个日志给他。直到成功,更新他对应的nextIndex以及matchIndex(对于每一个服务器,已经复制给他的日志的最高索引值)
- 合并发送,每次发送日志时,会把nextindex对应日志一直到自己的lastIndex日志一起发送,这样follower一旦匹配就可以批量写入新日志,快速同步。
- 当一个节点的投票倒计时结束后,还没收到leader的心跳。自己则会把自己的term+1,把自己变成候选人,投自己一票,然后向所有其他节点发起拉票,这个请求会携带自己的term,最后一个日志的term,index。通过这个其他节点才能判断你是否有资格。
- 把实现发起拉票操作的 线程池+Future+CountDownLatch 改成了 CompletableFuture+线程池。因为Future的get方法会阻塞调用,性能不如直接使用CompletableFuture的whenComplete。并且使用一个List去保存这些对象,使用CompletableFuture.allOf方法保持多个异步线程的同步,从而不使用CountDownLatch进行同步。使用AtomicInteger记录投票成功数量,保证安全性。
- 在确定Leader的commitIndex时,不再使用每次同步的结果来判断(如果大部分成功就提交)。因为现在是在心跳时同步日志,给每个follower同步的不一定是同一个索引的日志。而是在每次心跳(日志)发送完成之后(跟8一样必须在发送都完成之后),使用每个follower的matchIndex 来判断leade的commitIndex。转化为已知每个节点的最大匹配数,求超过一半的匹配索引(其实就是求中位数即可)。得到对应的commitIndex。判断当前commitIndex的日志是否是自己当前任期的,否则不能提交(防止幽灵复现问题,即重复提交同一个位置的日志),但是可以通过提交当前任期的日志顺带一并提交之前的,但是如果当前任期一直没新日志难道迟迟不提交吗?
- 插入空日志,插入空日志就是解决9中提到的幽灵复现问题。每次新leader上任就插入一条空日志,这样就可以实现顺带提交之前任期未提交的日志。也可以及时提交,不依赖于新的日志。新上任的leader初始化所有的 nextIndex 值为自己的最后一条日志的 index + 1,这样下次心跳如果不匹配就会跟第5步一样。
- Leader发送心跳也是使用的CompletableFuture但是这时就不用同步了,根据follower的返回值中的term是否大于自己的term来判断自己是否还能继续当,否则变成follower。
- 实现线性一致性的follower read。如果是leader收到读请求,先广播请求来判断当前自己是否还是leader(具体实现还是跟拉票类似,CompletableFuture+线程池),如果收到过半数的成功回复,则保证将日志应用至commitIndex的位置之后返回读取请求。如果失败则不响应或者返回失败。如果是follower收到了此次读请求,那么发送一个请求当前最新commitIndex(readIndex)的RPC给leader,leader收到请求也会先判断自己是否还是leader,之后返回当前的commitIndex。Follower拿到之后即保证应用至commitIndex位置之后返回读请求,否则返回失败。