设为首页收藏本站

LUPA开源社区

 找回密码
 注册
文章 帖子 博客
LUPA开源社区 首页 业界资讯 技术文摘 查看内容

Go语言的分布式读写互斥

2015-5-6 21:56| 发布者: joejoe0332| 查看: 986| 评论: 0|原作者: BuN_Ny, OSC技术周刊, eason02|来自: oschina

摘要: Go语言默认的 sync.RWMutex 实现在多核环境中表现并不佳,因为所有的读者在进行原子增量操作时,会抢占相同的内存地址。该文探讨了一种 n-way RWMutex,也可以称为“大读者(big reader)”锁,它可以为每个 CPU 内核 ...

  Go语言默认的 sync.RWMutex 实现在多核环境中表现并不佳,因为所有的读者在进行原子增量操作时,会抢占相同的内存地址。该文探讨了一种 n-way RWMutex,也可以称为“大读者(big reader)”锁,它可以为每个 CPU 内核分配独立的 RWMutex。读者仅需在其核心中处理读锁,而写者则须依次处理所有锁。


查找当前 CPU

  读者使用 CPUID 指令来决定使用何种锁,该指令仅需返回当前活动 CPU 的 APICID,而不需要发出系统调用指令抑或改变运行时。这在 Intel 或 AMD 处理器上均是可以的;ARM 处理器则需要使用 CPU ID 寄存器 。 对于超过 256 个处理器的系统,必须使用 x2APIC, 另外除了 CPUID 还要用到带有EAX=0xb 的 EDX 寄存器。程序启动时,会构建(通过 CPU 亲和力系统调用) APICID 到 CPU 索引的映射, 该映射在处理器的整个生命周期中静态存在。由于 CPUID 指令的开销可能相当昂贵,goroutine 将只在其运行的内核中定期地更新状态结果。频繁更新可以减少内核锁阻塞,但同时也会导致花在加锁过程中的 CPUID 指令时间增加。


  陈旧的 CPU 信息。如果加上锁运行 goroutine 的 CPU 信息可能会是过时的 (goroutine 会转移到另一个核心)。在 reader 记住哪个是上锁的前提下,这只会影响性能,而不会影响准确性,当然,这样的转移也是不太可能的,就像操作系统内核尝试在同一个核心保持线程来改进缓存命中率一样。


性能

  这个模式的性能特征会被大量的参数所影响。特别是 CPUID 检测频率,readers 的数量,readers 和 writers 的比率,还有 readers 持有锁的时间,这些因素都非常重要。当在这个时间有且仅有一个 writer 活跃的时候,这个 writer 持有锁的时期不会影响 sync.RWMutex 和 DRWMutex 之间的性能差异。


  实验证明DRWMutex表现胜过多核系统,特别writer小于1%的时候,CPUID会在最多每10个锁之间被调用(这种变化取决于锁被持有的持续时间)。甚至在少核的情况下,DRWMutex也在普遍选择通过sync.Mutex使用sync.RWMutex的应用程序的情况下表现好过sync.RWMutex.


  下图显示核数量使用增加每10个的平均性能:


drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100


  错误条表示第25和第75个百分位。注意每第10核的下降;这是因为10个核组成一个运行标准检查系统的机器上的NUMA节点, 所以一旦增加一个NUMA节点,跨线程通信量变得更加宝贵。对于DRWMutex来说,由于对比sync.RWMutex更多的reader能够并行工作,所以性能也随之提升。


  查看go-nuts tread进一步讨论


cpu_amd64.s        

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include "textflag.h"
 
// func cpu() uint64
TEXT 路cpu(SB),NOSPLIT,$0-8
    MOVL    $0x01, AX // version information
    MOVL    $0x00, BX // any leaf will do
    MOVL    $0x00, CX // any subleaf will do
 
    // call CPUID
    BYTE $0x0f
    BYTE $0xa2
 
    SHRQ    $24, BX // logical cpu id is put in EBX[31-24]
    MOVQ    BX, ret+0(FP)
    RET

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package main
 
import (
    "flag"
    "fmt"
    "math/rand"
    "os"
    "runtime"
    "runtime/pprof"
    "sync"
    "syscall"
    "time"
    "unsafe"
)
 
func cpu() uint64 // implemented in cpu_amd64.s
 
var cpus map[uint64]int
 
// determine mapping from APIC ID to CPU index by pinning the entire process to
// one core at the time, and seeing that its APIC ID is.
func init() {
    cpus = make(map[uint64]int)
 
    var aff uint64
    syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
 
    n := 0
    start := time.Now()
    var mask uint64 = 1
Outer:
    for {
        for (aff & mask) == 0 {
            mask <<= 1
            if mask == 0 || mask > aff {
                break Outer
            }
        }
 
        ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))
        if ret != 0 {
            panic(err.Error())
        }
 
        // what CPU do we have?
        <-time.After(1 * time.Millisecond)
        c := cpu()
 
        if oldn, ok := cpus[c]; ok {
            fmt.Println("cpu", n, "==", oldn, "-- both have CPUID", c)
        }
 
        cpus[c] = n
        mask <<= 1
        n++
    }
 
    fmt.Printf("%d/%d cpus found in %v: %v\n", len(cpus), runtime.NumCPU(), time.Now().Sub(start), cpus)
 
    ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
    if ret != 0 {
        panic(err.Error())
    }
}
 
type RWMutex2 []sync.RWMutex
 
func (mx RWMutex2) Lock() {
    for core := range mx {
        mx[core].Lock()
    }
}
 
func (mx RWMutex2) Unlock() {
    for core := range mx {
        mx[core].Unlock()
    }
}
 
func main() {
    cpuprofile := flag.Bool("cpuprofile"false"enable CPU profiling")
    locks := flag.Uint64("i", 10000, "Number of iterations to perform")
    write := flag.Float64("p", 0.0001, "Probability of write locks")
    wwork := flag.Int("w", 1, "Amount of work for each writer")
    rwork := flag.Int("r", 100, "Amount of work for each reader")
    readers := flag.Int("n", runtime.GOMAXPROCS(0), "Total number of readers")
    checkcpu := flag.Uint64("c", 100, "Update CPU estimate every n iterations")
    flag.Parse()
 
    var o *os.File
    if *cpuprofile {
        o, _ := os.Create("rw.out")
        pprof.StartCPUProfile(o)
    }
 
    readers_per_core := *readers / runtime.GOMAXPROCS(0)
 
    var wg sync.WaitGroup
 
    var mx1 sync.RWMutex
 
    start1 := time.Now()
    for n := 0; n < runtime.GOMAXPROCS(0); n++ {
        for r := 0; r < readers_per_core; r++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                r := rand.New(rand.NewSource(rand.Int63()))
                for n := uint64(0); n < *locks; n++ {
                    if r.Float64() < *write {
                        mx1.Lock()
                        x := 0
                        for i := 0; i < *wwork; i++ {
                            x++
                        }
                        _ = x
                        mx1.Unlock()
                    else {
                        mx1.RLock()
                        x := 0
                        for i := 0; i < *rwork; i++ {
                            x++
                        }
                        _ = x
                        mx1.RUnlock()
                    }
                }
            }()
        }
    }
    wg.Wait()
    end1 := time.Now()
 
    t1 := end1.Sub(start1)
    fmt.Println("mx1", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)
 
    if *cpuprofile {
        pprof.StopCPUProfile()
        o.Close()
 
        o, _ = os.Create("rw2.out")
        pprof.StartCPUProfile(o)
    }
 
    mx2 := make(RWMutex2, len(cpus))
 
    start2 := time.Now()
    for n := 0; n < runtime.GOMAXPROCS(0); n++ {
        for r := 0; r < readers_per_core; r++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                c := cpus[cpu()]
                r := rand.New(rand.NewSource(rand.Int63()))
                for n := uint64(0); n < *locks; n++ {
                    if *checkcpu != 0 && n%*checkcpu == 0 {
                        c = cpus[cpu()]
                    }
 
                    if r.Float64() < *write {
                        mx2.Lock()
                        x := 0
                        for i := 0; i < *wwork; i++ {
                            x++
                        }
                        _ = x
                        mx2.Unlock()
                    else {
                        mx2[c].RLock()
                        x := 0
                        for i := 0; i < *rwork; i++ {
                            x++
                        }
                        _ = x
                        mx2[c].RUnlock()
                    }
                }
            }()
        }
    }
    wg.Wait()
    end2 := time.Now()
 
    pprof.StopCPUProfile()
    o.Close()
 
    t2 := end2.Sub(start2)
    fmt.Println("mx2", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)
}





酷毙

雷人

鲜花

鸡蛋

漂亮
  • 快毕业了,没工作经验,
    找份工作好难啊?
    赶紧去人才芯片公司磨练吧!!

最新评论

关于LUPA|人才芯片工程|人才招聘|LUPA认证|LUPA教育|LUPA开源社区 ( 浙B2-20090187 浙公网安备 33010602006705号   

返回顶部