Lab1

MapReduce的工作流程

01

整个Lab1的目标基本上就是实现这张图上的功能。

在已有的代码里已经提供好了一些map和reduce方法,我们要做的就是通过一个Coordinator来给多个Worker分配任务,每个Worker执行一个map或reduce任务。

那么该如何在Coordinator和Worker之间进行信息的交流?这里有很多种方法,因为这是在本地执行,所以可以利用共享内存、管道等进程间通信方式来实现。而对于跨机器的进程,可以使用RPC来远程调用方法,恰好Lab里提供了RPC的使用方法。

大致思路如下:

Coordinator里有GetTask和FinishedTask方法,Worker会一直循环通过RPC调用这两个方法来获取任务和通知任务完成,根据获取到的不同任务类型执行不同方法,直到所有任务做完。

Worker里有两个主要函数performMap和performReduce分别用来执行map和reduce任务。

由于需要知道任务的一些信息,所以我需要在GetTask和FinishedTask的参数类型中进行定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Worker通过调用GetTask方法向Coordinator获取任务
// RPC调用GetTask方法的请求参数
type GetTaskArgs struct{}

// RPC调用GetTask方法的返回值
type GetTaskReply struct {
TaskType TaskType // 任务类型
TaskNum int // 任务编号
MapFile string // map任务的输入文件名
NReduceTasks int // 需要告诉map任务有多少个reduce
NMapTasks int // 需要告诉reduce任务有多少个map
}

//FinishedTask用来通知Coordinator任务执行结束
// RPC调用FinishedTask方法的请求参数
type FinishedTaskArgs struct {
TaskType TaskType
TaskNum int
}

// RPC调用FinishedTask方法的返回值
type FinishedTaskReply struct{}

在go里RPC注册函数的规则是

1
func (t *Type) Method(args interface{}, reply interface{}) error {...}

所以两个方法大体如下

1
2
3
4
5
6
7
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
...
}

func (c *Coordinator) FinishedTask(args *FinishedTaskArgs, reply *FinishedTaskReply) error {
...
}

假如此时有一个Worker向Coordinator获取任务,Coordinator会遍历所有任务,当有任务未开始或者执行超时,Coordinator就会向当前这个Worker发布任务。

当Worker执行完该任务,它会通知Coordinator该任务已完成,并打上标记。

对于Coordinator,就需要维护所有任务的信息,它的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Coordinator struct {
MapFiles []string // 所有map任务的输入文件名
nMapTasks int // map任务数
nReduceTasks int // reduce任务数

mapTasksFinished []bool // 第i个map任务是否完成
mapTasksIssued []time.Time // 设置第i个map任务的起始时间
reduceTasksFinished []bool // 第i个reduce任务是否完成
reduceTasksIssued []time.Time // 设置第i个reduce任务的起始时间

mu sync.Mutex // 互斥锁,用来控制多个Worker同时来访问资源的安全性

isDone bool // 所有任务是否做完的标记
}

Lab中还有一些小细节,比如map任务执行完后数据存放位置。根据Lab的提示,可以将所有这些中间文件以"mr-X-Y"的形式命名,存为临时文件,X表示第几个map任务,Y表示第几个reduce任务。而reduce执行完后将输出文件以"mr-out-X"的形式命名,X表示第几个reduce任务。

贴一张完成后并通过所有测试的结果

01

Lab2

一些参考资料:

Raft官网:Raft Consensus Algorithm

Raft可视化网站:Raft (thesecretlivesofdata.com)

Lab2A

Lab2A需要实现的是Raft启动时的Leader Election,也就是选举阶段。

目标

在2A的测试代码中,主要分为以下几个测试目标:

  • 正常情况下能否选出一个leader以及保持一段时间后要保证leader及term不变
  • 将leader离线,查看剩下的server能否选出新的leader,并且旧的leader恢复正常后不影响新leader
  • 断开半数以上的服务器包括leader,剩下的server不会选举出新leader

分析

根据论文中的描述,每个server有三个状态:Follower、Candidate、Leader,初始时都为Follower。server会有一个随机的选举超时时间,如果当一个server选举超时,那么他就会从Follower转变成Candidate,并将自己的Term+1,也就是表明开启一个新的任期。只有当Candidate收到超过半数的投票,它才会变成Leader。

因为2A仅涉及选举过程,所以我们只考虑server的以下几个状态

1
2
3
4
5
6
7
type Raft struct {
state State // 该Raft peer状态
currentTerm int // 服务器最后知道的任期号(服务启动时,初始化为0,单调递增)
votedFor int // 当前任期内投票信息(每一个任期开始时为-1)
lastReceive time.Time // folloer上一次收到心跳检测的时间
...
}

为了检测选举超时,我们可以开启一个线程,通过不断死循环,每次循环中会sleep随机时间(随机时间使得能够在一个比较快的时间内选出Leader),接着让该server中的最近收到心跳检测的时间和sleep前保存的一个时间相比,如果途中没收到任何消息,该server就进行选举。

该线程大致如下

1
2
3
4
5
6
7
8
9
10
func (rf* Raft) ticker() {
for {
startTime := time.Now()
time.Sleep(rand)
...
if rf.state != leader && rf.lastReceive.Before(startTime) {
go rf.StartElection()
}
}
}

在选举过程中,首先将server转变成Candidate,只要是转变成Candidate就说明要开启新任期,那么currentTerm就会+1,votedFor会投给自己,同时刷新超时时间。然后去遍历每一个server,给他们发送投票请求。只要超过半数投票,就会转换成Leader,并通知其他server不需要选举了。

而对于Follower,假如我当前收到了一个投票请求,只有当请求的Candidate任期不小于我,且该任期内没有投过票,我才会给它投票。如果我的任期比它大,那么Candidate会转变成Follower并将任期更新成我的任期,同时重置其他状态,退出选举。

索取投票流程大致如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (rf *Raft) StartElection() {
rf.ConvertToCandidate() // 开启选举,转变成Candidate
votes := 1 // 给自己投一票
...
for server := 0; server < len(rf.peers); server++ {
if server == rf.me {
continue
}
...
go func(server int) { // 开启一个线程进行投票请求
rf.sendRequestVote(...) // 向server发送投票请求
// 发现有比该Candidate更大的任期,转变成Follower并更新Term,重新等待选举超时
if reply.Term > rf.currentTerm {
rf.ConvertToFollower(reply.Term) // 其他人Term比我大,我变成Follower
return
}
if reply.VoteGranted {
votes++
// 只要超过半数投票,就成为Leader,并通知其他server
if rf.state == Candidate && votes*2 > len(rf.peers) {
rf.ConvertToLeader()
go rf.SendHeartbeat() // 开启心跳检测
}
}
}(server)
}
}

因为Leader会定时发送心跳检测,我们可以将心跳检测另开一个线程,类似于刚刚的ticker()。需要注意的是发送心跳检测的时间间隔必须远小于投票超时时间,否则会发生一个Leader还没发送心跳检测就会产生新的Candidate开启选举,当前这个Leader就失效了。

对于断线后重连的Leader,因为在Raft协议中只有最大的Term才是真Leader,所以当真Leader发送心跳检测时,如果发现有Term比我小的,那么就会将它Term更新并置为Follower。还有一种情况是旧Leader先发送了心跳检测,那么如果server收到后发现发送者的Term比该server小,server会返回一个false。

因为代码中涉及到很多多线程,所以在读写数据时最好都要加上锁,在我的代码中全部是使用的它已经给定的互斥锁,没有考虑锁的粒度,因此可能会比较慢。在不考虑各种优化的情况下成功通过了2A的测试。

01

02

Lab2B

lab2B需要实现Raft的日志同步阶段,细节非常的多。

目标

Lab2B主要有8个测试(整理自2020 Spring 6.824 Lab2B: Raft Log Replication笔记 - 知乎 (zhihu.com) ):

  • 简单提交一个log,检查各个Raft server关于该log有没有达成协议
  • 检查在没有断联的情况下有没有重发、多发
  • 测试少数节点失败重连时的系统情况
  • 测试多数节点失败重连时的系统情况
  • 检查并发向Raft准备同步的日志里提交的情况
  • 老leader断联后收到一堆log,新leader也收到一堆log,重连老leader,检查是否正常
  • 检查日志同步能力,不断断联不同leader,再一下子连接回来,考察复杂网络情况下Raft能不能保证数据同步稳定性
  • 检验整个集群的commit需要的时间和RPC次数,以及没网的情况下leader和term是否有变化

分析

首先我们得要改进Lab2A的选举,因为有了日志,leader在获取选票时要把另外两个参数考虑进去,所以最终的RequestVoteArgs为

1
2
3
4
5
6
7
type RequestVoteArgs struct {
Term int // Candidate任期号
CandidateId int // 请求投票的Candidate id

LastLogIndex int // Candidate最新日志条目的索引值
LastLogTerm int // Candidate最新日志条目对应的任期号
}

收到该RPC请求的server也需要多比较LastLogIndex和LastLogTerm来判断是否可以给该Candidate投票。

在Lab2A的基础上,首先比较的是自己最后一个log的Term和选举人的Term,如果比我大或者一样大且最后一个log的索引号不小于我,我才会给他投票,其他情况均不投票。

然后需要改进的是leader发送心跳检测的内容。一个是在Raft这个类中多了几个变量

1
2
3
4
5
6
7
8
type Raft struct {
...
commitIndex int // 已知被提交的最大日志条目的索引值(初始化为0,单调递增)
lastApplied int // 被状态机执行的最大日志条目的索引值(初始化为0,单调递增)
nextIndex []int // 对于每一个server,需要发给它的下一个日志条目的索引(初始化为leader上一条日志的索引值+1)
matchIndex []int // 对于每一个server,已复制到该server的日志条目的最高索引值(初始化为0,单调递增)
...
}

另外,在之前我们只考虑了无log发送的情况,现在多了log后,AppendEntryArgs为

1
2
3
4
5
6
7
8
9
type AppendEntryArgs struct {
Term int // leader任期号
LeaderId int // leader id,为了其他Client能重定向到leader

PrevLogIndex int // 当前日志之前的日志的索引值
PrevLogTerm int // 当前日志之前的日志的leader任期号
Entries []LogEntry // 将要存储的日志条目
LeaderCommit int // leader提交的日志条目索引值
}

LeaderCommit参数主要是为了让server的commitIndex和leader进行同步,说明这之前的log已经被大多数server复制,已经可以准备提交到上层去执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (rf *Raft) AppendEntry {
...
// 如果leader的commitIndex比当前server大,说明这之间的日志已被大多数server复制
// 我已经可以在ApplyLog方法中提交这些操作了
if args.LeaderCommit > rf.commitIndex {
if args.LeaderCommit < len(rf.log)-1 {
rf.commitIndex = args.LeaderCommit
} else {
rf.commitIndex = len(rf.log) - 1
}
}
...
}

PrevLogIndex为leader的nextIndex[server]-1,PrevLogTerm是该index上日志的Term。如果说两个参数中有任意一个没和server的对上,server会返回一个false,leader就会将nextIndex[server]–,直到log为同一个term以及index相同。这里有个优化,就是在返回的参数中给定server的冲突的term第一次出现的下标,只有加上这个优化才能通过第七个测试,虽然论文上说该优化在实际中用处不大。

1
2
3
4
5
6
7
8
9
10
11
12
func (rf *Raft) AppendEntry {
...
if 任期不匹配 {
for i := 1; i <= args.PrevLogIndex; i++ {
if rf.log[i].Term == reply.ConflictTerm {
reply.ConflictIndex = i
break
}
}
}
...
}

在leader的心跳检测方法中就需要加上对应的措施来应对不匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (rf *Raft) sendHeartbeat {
...
if reply.Successful {
...
} else {
if reply.ConflictTerm > 0 {
index := 0
// 找到leader对应的该任期出现的位置
for i := reply.ConflictIndex; i >= 1; i-- {
if rf.log[i].Term == reply.ConflictTerm {
index = i
break
}
}
if index > 0 { // 存在相同的term
rf.nextIndex[server] = index + 1
} else { // 不存在相同term
rf.nextIndex[server] = reply.ConflictIndex
}
} else {
rf.nextIndex[server] = reply.ConflictIndex
}
}
...
}

2B主要的坑点是多了很多的index或term这样的参数,判断时需要分成好几类,很容易出错,以及在强化2A选举过程时有很多代码进行了重构,导致有一些细节丢失,2B的第七个测试老是出现选举不出leader的问题,到最后才发现是在某处忘记将身份改成follower。

基本上2B的代码都是照着论文中的Figure2的描述完成的,同时参考了网上很多份讲解,才彻底搞懂了几个参数之间的关系。主要的时间还是花费在debug上,有时候可能会因为数据忘记上锁导致data race。

03

04

Lab2C

lab2C需要实现Raft的可持久化阶段。

目标

整理自(MIT 6.824-Lab 2 学习记录_LLLSoul的博客-CSDN博客

  • 节点宕机恢复验证持久化正确性
  • 验证网络分区故障的情况下持久化数据的正确性
  • 验证Leader宕机能否正确回复日志(除了所有类型节点都要在append、vote那里要持久化,Leader还有其它的地方需要持久化)
  • 测试paper中图8的错误情况,避免直接提交以前term的日志,其实只要知道怎么做就行了,在apply前加个判断term是否为最新的条件
  • 模拟不可靠网络的情况
  • 基于不可靠网络的图8测试
  • 并发测试

分析

先要完成persist和readPersist两个函数的内容,这两部分比较简单,框架中已经给了足够的提示。

难的主要是在测试,虽然2B测了上百次基本上能够通过,但是到了2C中,Figure 8和Figure 8(unreliable)这两个测试刚开始一直会出现apply error或者是fail to agreement,在网上找相关失败原因后发现可能的原因是集群内同步效率低,或者是一些细节没处理好。

接着在代码中发现没有处理Figure 8的情况,刚开始之去判断了大多数节点的commitIndex是否大于leader的commitIndex,没有比较最后一个log的term和currentTerm是否相同,这就或导致Figure 8中的情况发生。也就是说leader重连后需要先收到一个最新的log才能对之前的log进行提交。

05

06

Lab2D

lab2D需要实现Raft的日志压缩阶段。

目标

本阶段的目标主要是测试日志压缩,类似于2B和2C,会模拟很多网络故障或机器故障,然后在此基础上测试日志是否能够同步,并需要在到达一定时候修剪日志。

分析

之所以需要压缩日志是因为不可能让服务器一直让log添加下去,否则迟早有一天会爆满。

在最后一个阶段,需要修改一些之前的代码,比如一些下标。在前几个lab中,我都是直接用从0开始的下标当作真实下标来用,但加入日志压缩后,会出现下标和日志内容对不上,因为前面的日志已经被我删掉了,我得重新计算日志的下标。因为需要保留快照的信息,也就是lastIncludedIndex和lastIncludedTerm,可以让第0个日志为快照的内容(之前第0个日志一直为空日志)。并在每个日志中加上Index信息,这样子我们就可以用Index-lastIncludedIndex来计算下标。

本阶段需要实现Snapshot()和一个InstallSnapshot RPC。Snapshot()是上层对Raft的调用,如果Raft收到一个Snapshot(),就说明上层需要压缩日志,所以部分的代码不是很难

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (rf *Raft) Snapshot(index int, snapshot []byte) {
...

// server当前已存储的快照
lastIncludedIndex := rf.log[0].Index
// 如果已经存在不用加或者是超过commitIndex不合法
if index <= lastIncludedIndex || index > rf.commitIndex {
return
}

// 截取从index开始的日志
// 需要注意这里不能够直接rf.log = rf.log[index-lastIncludedIndex:]
// 使用上面这种的话,底层引用不会减少,就让压缩日志失去意义
rf.log = append([]LogEntry{}, rf.log[index-lastIncludedIndex:]...)
rf.snapshot = snapshot
if index > rf.commitIndex {
rf.commitIndex = index
}
if index > rf.lastApplied {
rf.lastApplied = index
}
rf.persist()
}

对于InstallSnapshot RPC,我们需要定义参数和返回参数,在论文中已经给出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type InstallSnapshotArgs struct {
Term int // leader的term
LeaderId int // follower重定向
LastIncludedIndex int // 快照替换了该索引之前包括该索引的所有日志
LastIncludedTerm int // LastIncludedIndex的任期
Data []byte // 快照分块从offset开始的字节流
// 下面两个参数在该lab中不需要用到
// Offset int
// Done bool
}

type InstallSnapshotReply struct {
Term int // follower的term
}

而InstallSnapshot()函数类似与发送心跳的AppendEntry(),是leader对folloer调用,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
...
// 一些term的判断,类似于AppendEntry
...

lastIncludedIndex, lastIndex := rf.log[0].Index, rf.log[len(rf.log)-1].Index

// 已经含有该快照
if args.LastIncludedIndex <= lastIncludedIndex {
return
}

if args.LastIncludedIndex < lastIndex {
// log中存在leader发来的快照对应的index
rf.log = append([]LogEntry{}, rf.log[args.LastIncludedIndex-lastIncludedIndex:]...)
} else {
// 不存在,那么我就需要删除所有日志,并在最开始补齐一个快照
rf.log = []LogEntry{{
Term: args.LastIncludedTerm,
Index: args.LastIncludedIndex,
Command: nil,
}}
}
rf.snapshot = args.Data
rf.persist()

// 更新commitIndex和lastApplied
if args.LastIncludedIndex > rf.commitIndex {
rf.commitIndex = args.LastIncludedIndex
}
if args.LastIncludedIndex > rf.lastApplied {
rf.lastApplied = args.LastIncludedIndex
}
...
}

在leader的发送心跳前还需要判断follower的nextIndex是否大于我的lastIncludedIndex,如果大于,就发送普通的心跳检测,否则发送快照。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (rf *Raft) sendHeartbeat() {
...
for server := rf.peers {
lastIncludedIndex := rf.log[0].Index

if rf.nextIndex[server] > lastIncludedIndex {
...
// 普通心跳检测
rf.sendAppendEntry(...)
} else {
...
// 快照
rf.sendInstallSnapshot(...)
}
}
...
}

07

08

最后是一张lab2总的测试结果

09

Lab3

Lab3A

在Lab2中已经实现了Raft,Lab3主要是在Raft的基础上实现一个具有容错机制的存储服务器,需要满足线性一致性。Raft为上层保证了共识性,但对于客户端和服务端来说还不是线性一致,所以Lab3中主要就是要在客户端和服务端实现线性一致性的语义。

目标

需要完成client和server端的逻辑。

在测试中,会让client快速发送指令给server,server需要对这些指令实现线性一致。

从一个client到多个client,以及模拟了网络损坏的情况,或者是server宕机的情况。

分析

比较容易完成的是client端的代码,作为客户端,我只需要发送我的命令给server,如果失败了重复发送直到成功。

因为client是通过RPC发送的,所以先要对RPC的两个参数进行定义

1
2
3
4
5
6
7
8
9
10
11
12
13
type PutAppendArgs struct {
Key string // 键
Value string // 值
Op string // 请求类型

// 用于server确认客户端的最新请求,防止重复执行
ClientId int64 // 客户端id
Seq int64 // 请求id
}

type PutAppendReply struct {
Err Err
}

然后是client的PutAppend()方法(Get()方法同理)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (ck *Clerk) PutAppend(key string, value string, op string) {
// 有新的操作让seq+1
ck.seq++
args := PutAppendArgs{
Key: key,
Value: value,
Op: op,
ClientId: ck.clientId,
Seq: ck.seq,
}

for {
reply := PutAppendReply{}
ok := ck.servers[ck.lastLeader].Call("KVServer.PutAppend", &args, &reply)
if ok {
if reply.Err == OK {
return
} else if reply.Err == ErrWrongLeader {
ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
}
} else {
ck.lastLeader = (ck.lastLeader + 1) % len(ck.servers)
}
time.Sleep(1 * time.Millisecond)
}
}

在server中,会收到来自client端的RPC请求,server需要将请求的操作通过Raft的Start()方法交给Raft层,并等待Raft层达成共识,接着Raft层会通过applyCh管道,发送已经达成一致的操作,最后server才能够执行该条操作。

这里参考了MIT 6.824: Distributed Systems- 实现Raft Lab3A | 鱼儿的博客 (yuerblog.cc) 的做法。

server首先保存关于该操作的上下文,并给每个操作一个管道,主要是为了能够快速处理多个操作。然后server利用select等待操作的一致,最后将该上下文删除并返回结果,所以大致如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
op := Op{
Op: args.Op,
Key: args.Key,
Value: args.Value,
ClientId: args.ClientId,
Seq: args.Seq,
}
index, term, isleader := kv.rf.Start(op)

// raft不是leader直接返回,因为只有leader才能执行
if !isleader {
reply.Err = ErrWrongLeader
return
}

op.Index = index
op.Term = term

// 保存操作
opContext := &OpContext{
op: &op,
committed: make(chan struct{}),
}

// select等待reqMap[index].committed,即等待raft层提交该操作
kv.reqMap[index] = opContext

select {
case <-opContext.committed:
if opContext.leaderChanged {
reply.Err = ErrWrongLeader
} else {
reply.Err = OK
}
case <-time.After(2 * time.Second): // 超时
reply.Err = ErrWrongLeader
}

if context, err := kv.reqMap[index]; err {
if context == opContext {
delete(kv.reqMap, index)
}
}
}

因为raft层随时会提交操作,所以server需要另开一个线程监听applyCh,一但有操作提交,server就可以开始判断该操作是否过时之类的,如果都合法,则执行该操作,然后给reqMap[index]中的管道发送信息,告诉他该操作已经执行完毕。

apply大致如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (kv *KVServer) apply() {
for !kv.killed() {
msg := <-kv.applyCh
op := msg.Command.(Op)
index := msg.CommandIndex
term := msg.CommandTerm

// raft层提交了index的操作
reqOp, exsistOp := kv.reqMap[index]
lastSeq, exsistSeq := kv.seqMap[op.ClientId]

// 是一个新的操作
if !exsistSeq || op.Seq > lastSeq {
// 更新client的最新操作序号
kv.seqMap[op.ClientId] = op.Seq
if op.Op == "Put" {
kv.kvTable[op.Key] = op.Value
} else if op.Op == "Append" {
kv.kvTable[op.Key] += op.Value
}
}

// 存在一个正在等待完成的操作
if exsistOp {
if term != reqOp.op.Term {
reqOp.leaderChanged = true
}

if op.Op == "Get" {
reqOp.value, reqOp.ok = kv.kvTable[op.Key]
if !reqOp.ok {
reqOp.value = ""
}
}
// 给PutAppend中的select中的committed发送通知
close(reqOp.committed)
}
}
}

还需要注意的一点是,Lab3A中有个测试是ops complete fast enough。在这个测试中,Test会发送1000个PutAppend请求,要求请求速度快于33.3333ms/op。我们需要对Raft层进行一些修改,当Raft的Start()方法一收到来自server的command,就要立即对其他follower发送心跳检测,以便快速达到一致性,可能还需要适当调慢心跳检测发送的速度,因为如果发送速度太快,可能会导致多个线程中锁的争抢,从而拖慢整个程序的速度。如果还是速度慢,可以试试运行时不加-race编译选项。

01