1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
| func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
	// findRunnable looks for a goroutine for the current M to run. It tries a
	// fixed sequence of sources and returns as soon as one of them yields a g;
	// if none does, it parks the M with stopm() and starts over from top.
	//
	// Returns:
	//   gp          - the goroutine that was found.
	//   inheritTime - passed through from runqget / stealWork; false for every
	//                 other source.
	//   tryWakeP    - true only when gp is a GC worker obtained from
	//                 gcController.findRunnableGCWorker.
	//
	// NOTE(review): this listing looks abridged relative to the full runtime
	// source (e.g. no releasep() is visible before pidleput, yet acquirep is
	// called later) - confirm against the complete implementation.
	mp := getg().m
top:
	mp.clearAllpSnapshot()
	pp := mp.p.ptr()
	// Check this P's timers; pollUntil is carried forward and later bounds
	// how long the blocking netpoll in step 9 may wait (0 = no deadline).
	now, pollUntil, _ := pp.timers.check(0, nil)

	// 1. When gcBlackenEnabled is set (a GC is in progress), try to pick up a
	// GC worker to run.
	if gcBlackenEnabled != 0 {
		gp, tnow := gcController.findRunnableGCWorker(pp, now)
		if gp != nil {
			return gp, false, true
		}
		now = tnow
	}

	// 2. On every 61st scheduler tick, take one g from the global run queue
	// before looking at the local one.
	if pp.schedtick%61 == 0 && !sched.runq.empty() {
		lock(&sched.lock)
		gp := globrunqget()
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}

	// 3. Take a runnable g from p.runnext or p.runq.
	if gp, inheritTime := runqget(pp); gp != nil {
		return gp, inheritTime, false
	}

	// 4. Grab a batch of gs from the global sched.runq: run the first one and
	// put the rest on this P's local queue.
	if !sched.runq.empty() {
		lock(&sched.lock)
		gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)
		unlock(&sched.lock)
		if gp != nil {
			if runqputbatch(pp, &q); !q.empty() {
				throw("Couldn't put Gs into empty local runq")
			}
			return gp, false, false
		}
	}

	// 5. Poll the network without blocking (netpoll(0)) for gs that became
	// ready. sched.pollingNet is swapped to 1 around the call so only one
	// caller polls at a time.
	if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 && sched.pollingNet.Swap(1) == 0 {
		list, delta := netpoll(0)
		sched.pollingNet.Store(0)
		if !list.empty() {
			// Run the first ready g ourselves and inject the rest.
			gp := list.pop()
			injectglist(&list)
			netpollAdjustWaiters(delta)
			trace := traceAcquire()
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.ok() {
				trace.GoUnpark(gp, 0)
				traceRelease(trace)
			}
			return gp, false, false
		}
	}

	// 6. Try to steal work from other Ps via stealWork. This is attempted only
	// if this M is already spinning, or if twice the number of spinning Ms is
	// still below the number of non-idle Ps.
	if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
		if !mp.spinning {
			mp.becomeSpinning()
		}
		gp, inheritTime, tnow, w, newWork := stealWork(now)
		if gp != nil {
			return gp, inheritTime, false
		}
		if newWork {
			// stealWork reported new work; restart the search.
			goto top
		}
		now = tnow
		// Keep the earliest non-zero wake-up time as the poll deadline.
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			pollUntil = w
		}
	}

	// 7. Check for GC work once more: if mark work is available and an idle
	// mark worker slot can be claimed, run a background mark worker.
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) && gcController.addIdleMarkWorker() {
		node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
		if node != nil {
			pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
			gp := node.gp.ptr()
			casgstatus(gp, _Gwaiting, _Grunnable)
			return gp, false, false
		}
		// No worker available in the pool; give the slot back.
		gcController.removeIdleMarkWorker()
	}

	// 8. Re-check the global sched.runq, this time while holding sched.lock.
	lock(&sched.lock)
	if !sched.runq.empty() {
		gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)
		unlock(&sched.lock)
		if gp == nil {
			throw("global runq empty with non-zero runqsize")
		}
		if runqputbatch(pp, &q); !q.empty() {
			throw("Couldn't put Gs into empty local runq")
		}
		return gp, false, false
	}

	// 9. Put the P on the idle list, then call netpoll to wait for
	// network-related gs to become ready.
	now = pidleput(pp, now)
	unlock(&sched.lock)
	if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
		// Compute the netpoll timeout. With no timer deadline (pollUntil == 0)
		// pass -1; otherwise wait only for the time remaining until pollUntil,
		// clamped at 0 if the deadline has already passed.
		delay := int64(-1)
		if pollUntil != 0 {
			if now == 0 {
				now = nanotime()
			}
			delay = pollUntil - now
			if delay < 0 {
				delay = 0
			}
		}
		list, delta := netpoll(delay) // block until new work is available
		now = nanotime()
		sched.pollUntil.Store(0)
		sched.lastpoll.Store(now)
		lock(&sched.lock)
		pp, _ := pidleget(now)
		unlock(&sched.lock)
		if pp == nil {
			// No idle P could be obtained: hand the ready gs off via
			// injectglist and fall through to stopm below.
			injectglist(&list)
			netpollAdjustWaiters(delta)
		} else {
			acquirep(pp)
			if !list.empty() {
				// Run the first ready g ourselves and inject the rest.
				gp := list.pop()
				injectglist(&list)
				netpollAdjustWaiters(delta)
				casgstatus(gp, _Gwaiting, _Grunnable)
				return gp, false, false
			}
			goto top
		}
	}

	// Nothing to run: park this M, then search again once it is woken.
	stopm()
	goto top
}
|