# 一、什么是 Raft算法

Raft适用于一个管理日志一致性的协议,相比于Paxos协议Raft更易于理解和去实现它。为了提高理解性,Raft将一致性算法分为了几个部分,包括领导选取leader selection、日志复制log replication、安全safety,并且使用了更强的一致性来减少了必须需要考虑的状态。

Raft算法将Server划分为3种状态,或者也可以称作角色:
【1】Leader:负责Client交互和log复制,同一时刻系统中最多存在1个。
【2】Follower:被动响应请求RPC,从不主动发起请求RPC
【3】Candidate:一种临时的角色,只存在于Leader的选举阶段,某个节点想要变成Leader,那么就发起投票请求,同时自己变成Candidate。如果选举成功,则变为Candidate,否则退回为Follower

状态或者说角色的流转如下:

Nacos

Raft中,问题分解为:领导选取、日志复制、安全成员变化

复制状态机通过复制日志来实现

日志:每台机器保存一份日志,日志来自于客户端的请求,包含一系列的命令
状态机:状态机会按顺序执行这些命令
一致性模型:分布式环境下,保证多机的日志是一致的,这样回放到状态机中的状态是一致的

# 二、Raft算法选主流程

Raft中有Term的概念,Term类比中国历史上的朝代更替,Raft算法将时间划分成为任意不同长度的任期term

选举流程:
1、Follower增加当前的term,转变为Candidate
2、Candidate投票给自己,并发送RequestVote RPC给集群中的其他服务器。
3、收到 RequestVote的服务器,在同一term中只会按照先到先得投票给至多一个Candidate。且只会投票给log至少和自身一样新的Candidate

Nacos

关于Raft更详细的描述,可以查看这里,从分布式一致性到共识机制(二)Raft算法

# 三、Nacos中的 CP一致性

Spring Cloud Alibaba Nacos1.0.0正式支持APCP两种一致性协议,其中CP一致性协议实现,是基于简化的RaftCP一致性。

如何实现 Raft算法: Nacos server在启动时,会通过RunningConfig.onApplicationEvent()方法调用RaftCore.init()方法。

启动选举

public static void init() throws Exception {

    Loggers.RAFT.info("initializing Raft sub-system");

    // 启动Notifier,轮询Datums,通知RaftListener
    executor.submit(notifier);

    // 获取Raft集群节点,更新到PeerSet中
    peers.add(NamingProxy.getServers());

    long start = System.currentTimeMillis();

    // 从磁盘加载Datum和term数据进行数据恢复
    RaftStore.load();

    Loggers.RAFT.info("cache loaded, peer count: {}, datum count: {}, current term: {}",
        peers.size(), datums.size(), peers.getTerm());

    while (true) {
        if (notifier.tasks.size() <= 0) {
            break;
        }
        Thread.sleep(1000L);
        System.out.println(notifier.tasks.size());
    }

    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));

    GlobalExecutor.register(new MasterElection()); // Leader选举
    GlobalExecutor.register1(new HeartBeat()); // Raft心跳
    GlobalExecutor.register(new AddressServerUpdater(), GlobalExecutor.ADDRESS_SERVER_UPDATE_INTERVAL_MS);

    if (peers.size() > 0) {
        if (lock.tryLock(INIT_LOCK_TIME_SECONDS, TimeUnit.SECONDS)) {
            initialized = true;
            lock.unlock();
        }
    } else {
        throw new Exception("peers is empty.");
    }

    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
        GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

init方法主要做了如下几件事:
【1】获取Raft集群节点peers.add(NamingProxy.getServers());
【2】Raft集群数据恢复RaftStore.load();
【3】Raft选举GlobalExecutor.register(new MasterElection());
【4】Raft心跳GlobalExecutor.register(new HeartBeat());
【5】Raft发布内容
【6】Raft保证内容一致性

选举流程: 其中,raft集群内部节点间是通过暴露的Restful接口,代码在RaftController中。RaftController控制器是Raft集群内部节点间通信使用的,具体的信息如下

POST HTTP://{ip:port}/v1/ns/raft/vote : 进行投票请求

POST HTTP://{ip:port}/v1/ns/raft/beat : Leader向Follower发送心跳信息

GET HTTP://{ip:port}/v1/ns/raft/peer : 获取该节点的RaftPeer信息

PUT HTTP://{ip:port}/v1/ns/raft/datum/reload : 重新加载某日志信息

POST HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据并存入

DELETE HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据删除操作

GET HTTP://{ip:port}/v1/ns/raft/datum : 获取该节点存储的数据信息

GET HTTP://{ip:port}/v1/ns/raft/state : 获取该节点的状态信息{UP or DOWN}

POST HTTP://{ip:port}/v1/ns/raft/datum/commit : Follower节点接收Leader传来得到数据存入操作

DELETE HTTP://{ip:port}/v1/ns/raft/datum : Follower节点接收Leader传来的数据删除操作

GET HTTP://{ip:port}/v1/ns/raft/leader : 获取当前集群的Leader节点信息

GET HTTP://{ip:port}/v1/ns/raft/listeners : 获取当前Raft集群的所有事件监听者
RaftPeerSet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

心跳机制: Raft中使用心跳机制来触发Leader选举。心跳定时任务是在GlobalExecutor中,通过GlobalExecutor.register(new HeartBeat())注册心跳定时任务,具体操作包括:
【1】重置Leader节点的heart timeoutelection timeout
【2】sendBeat()发送心跳包

public class HeartBeat implements Runnable {
        @Override
        public void run() {
            try {

                if (!peers.isReady()) {
                    return;
                }

                RaftPeer local = peers.local();
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {
                    return;
                }

                local.resetHeartbeatDue();

                sendBeat();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
            }

        }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

简单说明了下Nacos中的Raft一致性实现,更详细的流程,可以下载源码,查看RaftCore进行了解。源码可以通过以下地址检出:

git clone https://github.com/alibaba/nacos.git
1
Nacos

# 四、Nacos AP 实现

AP协议: Distro协议Distro是阿里巴巴的私有协议,目前流行的Nacos服务管理框架就采用了Distro协议。Distro协议被定位为临时数据的一致性协议 :该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话, 该会话只要存在,数据就不会丢失 。

Distro协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。

Distro协议具有以下特点:
【1】专门为了注册中心而创造出的协议;
【2】客户端与服务端有两个重要的交互,服务注册心跳发送
【3】客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一次心跳,心跳包需要带上注册服务的全部信息,在客户端看来,服务端节点对等,所以请求的节点是随机的;
【4】客户端请求失败则换一个节点重新发送请求;
【5】服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”(注册、心跳、下线等)请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点处理;
【6】每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为健康节点;
【7】服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请求当做注册请求来处理;
【8】服务端如果长时间未收到客户端心跳,则下线该服务;
【9】负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返回,后台异步地将数据同步给其他节点;
【10】节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。

Distro协议服务端节点发现使用寻址机制来实现服务端节点的管理。在Nacos中,寻址模式有三种:

Nacos

【1】单机模式: StandaloneMemberLookup
【2】文件模式: FileConfigMemberLookup利用监控cluster.conf文件的变动实现节点的管理。核心代码如下:

Nacos

【3】服务器模式: AddressServerMemberLookup使用地址服务器存储节点信息,服务端节点定时拉取信息进行管理

Nacos

核心代码:

Nacos

初始全量同步: Distro协议节点启动时会从其他节点全量同步数据。在Nacos中,整体流程如下:

Nacos

【1】启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方法加载数据
【2】调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据
【3】从namingProxy代理获取所有的数据data
 ● 构造http请求,调用httpGet方法从指定的server获取数据
 ● 从获取的结果result中获取数据bytes
【4】处理数据processData
 ● 从data反序列化出datumMap
 ● 把数据存储到dataStore,也就是本地缓存dataMap
 ● 监听器不包括key,就创建一个空的service,并且绑定监听器
【5】监听器listener执行成功后,就更新data store

核心代码如下:

Nacos Nacos Nacos Nacos

增量同步: 新增数据使用异步广播同步:
【1】DistroProtocol使用sync()方法接收增量数据
【2】向其他节点发布广播任务:调用distroTaskEngineHolder发布延迟任务
【3】调用DistroDelayTaskProcessor.process()方法进行任务投递:将延迟任务转换为异步变更任务
【4】执行变更任务DistroSyncChangeTask.run()方法:向指定节点发送消息
 ● 调用DistroHttpAgent.syncData()方法发送数据
 ● 调用NamingProxy.syncData()方法发送数据
【5】异常任务调用handleFailedTask()方法进行处理
 ● 调用DistroFailedTaskHandler处理失败任务
 ● 调用DistroHttpCombinedKeyTaskFailedHandler将失败任务重新投递成延迟任务

核心代码如下:

Nacos Nacos Nacos Nacos

# 五、总结

Distro协议是阿里的私有协议,但是对外开源框架只有Nacos。所有我们只能从Nacos中一窥Distro协议。Distro协议是一个比较简单的最终一致性协议。整体由节点寻址、数据全量同步、异步增量同步、定时上报client所有信息、心跳探活其他节点等组成

本文中的Nacos源码版本为Nacos 1.3.2 ,属于优化过的源码,抽象出一致性协议抽象接口,和JRaft共用节点寻址模式。

(adsbygoogle = window.adsbygoogle || []).push({});