Source file
src/runtime/select.go
Documentation: runtime
1
2
3
4
5 package runtime
6
7
8
9 import (
10 "internal/abi"
11 "unsafe"
12 )
13
// debugSelect gates the diagnostic print calls and the lock-order sanity
// check inside selectgo. It is a constant, so that code only runs if this
// declaration is edited to true.
const debugSelect = false
15
16
17
18
19 type scase struct {
20 c *hchan
21 elem unsafe.Pointer
22 }
23
24 var (
25 chansendpc = abi.FuncPCABIInternal(chansend)
26 chanrecvpc = abi.FuncPCABIInternal(chanrecv)
27 )
28
29 func selectsetpc(pc *uintptr) {
30 *pc = getcallerpc()
31 }
32
33 func sellock(scases []scase, lockorder []uint16) {
34 var c *hchan
35 for _, o := range lockorder {
36 c0 := scases[o].c
37 if c0 != c {
38 c = c0
39 lock(&c.lock)
40 }
41 }
42 }
43
44 func selunlock(scases []scase, lockorder []uint16) {
45
46
47
48
49
50
51
52
53 for i := len(lockorder) - 1; i >= 0; i-- {
54 c := scases[lockorder[i]].c
55 if i > 0 && c == scases[lockorder[i-1]].c {
56 continue
57 }
58 unlock(&c.lock)
59 }
60 }
61
// selparkcommit is the callback selectgo hands to gopark when a select has
// to block. It sets gp.activeStackChans, clears gp.parkingOnChan, and then
// releases the lock of every channel referenced by the sudogs linked on
// gp.waiting. The second parameter is unused. It always returns true.
62 func selparkcommit(gp *g, _ unsafe.Pointer) bool {
63
64
65
66
67
// NOTE(review): presumably this flag tells the stack-shrinking code that
// sudogs referring to this goroutine's stack are queued on channels — confirm
// against the stack code; selectgo resets it to false right after gopark returns.
68 gp.activeStackChans = true
69
70
71
// selectgo set parkingOnChan to true immediately before calling gopark;
// activeStackChans is already set above, so the flag is cleared here.
72 gp.parkingOnChan.Store(false)
73
74
75
76
77
78
79
80
81
82
// Walk the sudog list that selectgo linked up in lock order. lastc tracks the
// channel of the sudog visited most recently; it is unlocked only when the
// walk reaches a sudog for a different channel (or the end of the list), so a
// run of sudogs on the same channel produces a single unlock. Each sudog's c
// and waitlink are read while that sudog's own channel has not been unlocked yet.
83 var lastc *hchan
84 for sg := gp.waiting; sg != nil; sg = sg.waitlink {
85 if sg.c != lastc && lastc != nil {
86
87
88
89
90
91
// sg is on a new channel, so the run of sudogs for lastc is over.
92 unlock(&lastc.lock)
93 }
94 lastc = sg.c
95 }
// Release the channel of the final run, if the list was not empty.
96 if lastc != nil {
97 unlock(&lastc.lock)
98 }
99 return true
100 }
101
102 func block() {
103 gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1)
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// selectgo implements the select statement.
//
// cas0 points to an array of nsends+nrecvs scases: the first nsends entries
// are send cases and the remaining nrecvs entries are receive cases. order0
// points to scratch space for 2*(nsends+nrecvs) uint16 values, used for the
// poll order and the lock order. pc0 may be nil; when it is non-nil and the
// race detector is enabled it points to one PC per case. block says whether
// the goroutine may park when no case is immediately ready.
//
// It returns the index of the chosen scase, or -1 when block is false and no
// case was ready. The boolean result is meaningful for receive cases: it is
// true when a value was received and false when the channel was closed.
// A send case chosen on a closed channel panics with "send on closed channel".
121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
122 if debugSelect {
123 print("select: cas0=", cas0, "\n")
124 }
125
126
127
// Reinterpret the raw pointers as pointers to large fixed-size arrays so they
// can be sliced below. The array bound limits a select to 1<<16 cases.
128 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
129 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
130
// The first ncases entries of order1 hold the poll order; the following
// ncases entries hold the lock order.
131 ncases := nsends + nrecvs
132 scases := cas1[:ncases:ncases]
133 pollorder := order1[:ncases:ncases]
134 lockorder := order1[ncases:][:ncases:ncases]
135
136
137
138
139
// pcs stays nil unless the race detector is on and the caller supplied pc0;
// casePC then reports 0 for every case.
140 var pcs []uintptr
141 if raceenabled && pc0 != nil {
142 pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
143 pcs = pc1[:ncases:ncases]
144 }
145 casePC := func(casi int) uintptr {
146 if pcs == nil {
147 return 0
148 }
149 return pcs[casi]
150 }
151
// t0 is the start timestamp for the blockevent reported at retc. It is only
// taken when block profiling is on, and stays 0 otherwise.
152 var t0 int64
153 if blockprofilerate > 0 {
154 t0 = cputicks()
155 }
156
157
158
159
160
161
162
163
164
165
// Build pollorder as a shuffled list of the cases that have a channel. Each
// new case is inserted at index j drawn by cheaprandn(norder+1) (presumably
// uniform in [0, norder] — confirm), and the entry previously at j moves to
// the end.
166 norder := 0
167 for i := range scases {
168 cas := &scases[i]
169
170
// Cases with a nil channel are left out of both orders; their elem pointer
// is cleared.
171 if cas.c == nil {
172 cas.elem = nil
173 continue
174 }
175
176 j := cheaprandn(uint32(norder + 1))
177 pollorder[norder] = pollorder[j]
178 pollorder[j] = uint16(i)
179 norder++
180 }
181 pollorder = pollorder[:norder]
182 lockorder = lockorder[:norder]
183
184
185
// Sort the cases by channel sortkey into lockorder with an in-place heap
// sort. First phase: insert the cases, taken in poll order, into a max-heap
// by sifting each one up from position i.
186 for i := range lockorder {
187 j := i
188
189 c := scases[pollorder[i]].c
190 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
191 k := (j - 1) / 2
192 lockorder[j] = lockorder[k]
193 j = k
194 }
195 lockorder[j] = pollorder[i]
196 }
// Second phase: repeatedly move the heap maximum to slot i, then sift the
// displaced entry o down through the remaining heap of size i.
197 for i := len(lockorder) - 1; i >= 0; i-- {
198 o := lockorder[i]
199 c := scases[o].c
200 lockorder[i] = lockorder[0]
201 j := 0
202 for {
203 k := j*2 + 1
204 if k >= i {
205 break
206 }
// Pick the larger of the two children.
207 if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
208 k++
209 }
210 if c.sortkey() < scases[lockorder[k]].c.sortkey() {
211 lockorder[j] = lockorder[k]
212 j = k
213 continue
214 }
215 break
216 }
217 lockorder[j] = o
218 }
219
// Debug-only check that lockorder is in non-decreasing sortkey order.
220 if debugSelect {
221 for i := 0; i+1 < len(lockorder); i++ {
222 if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
223 print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
224 throw("select: broken sort")
225 }
226 }
227 }
228
229
// Lock every channel involved in the select, in lock order.
230 sellock(scases, lockorder)
231
// These are declared up front because the goto statements below may not jump
// over variable declarations.
232 var (
233 gp *g
234 sg *sudog
235 c *hchan
236 k *scase
237 sglist *sudog
238 sgnext *sudog
239 qp unsafe.Pointer
240 nextp **sudog
241 )
242
243
// Pass 1: visit the cases in poll order and take the first one that can
// proceed right now.
244 var casi int
245 var cas *scase
246 var caseSuccess bool
247 var caseReleaseTime int64 = -1
248 var recvOK bool
249 for _, casei := range pollorder {
250 casi = int(casei)
251 cas = &scases[casi]
252 c = cas.c
253
254 if casi >= nsends {
// Receive case: prefer a waiting sender, then buffered data, then a closed
// channel.
255 sg = c.sendq.dequeue()
256 if sg != nil {
257 goto recv
258 }
259 if c.qcount > 0 {
260 goto bufrecv
261 }
262 if c.closed != 0 {
263 goto rclose
264 }
265 } else {
// Send case: a closed channel panics; otherwise prefer a waiting receiver,
// then free buffer space.
266 if raceenabled {
267 racereadpc(c.raceaddr(), casePC(casi), chansendpc)
268 }
269 if c.closed != 0 {
270 goto sclose
271 }
272 sg = c.recvq.dequeue()
273 if sg != nil {
274 goto send
275 }
276 if c.qcount < c.dataqsiz {
277 goto bufsend
278 }
279 }
280 }
281
// Nothing was ready. A non-blocking select gives up here and reports -1.
282 if !block {
283 selunlock(scases, lockorder)
284 casi = -1
285 goto retc
286 }
287
288
// Pass 2: queue one sudog per case on its channel, and chain the sudogs
// together through waitlink, starting at gp.waiting, in lock order.
289 gp = getg()
290 if gp.waiting != nil {
291 throw("gp.waiting != nil")
292 }
293 nextp = &gp.waiting
294 for _, casei := range lockorder {
295 casi = int(casei)
296 cas = &scases[casi]
297 c = cas.c
298 sg := acquireSudog()
299 sg.g = gp
300 sg.isSelect = true
301
302
303 sg.elem = cas.elem
// releasetime is -1 when a block-profiling start time was taken.
// NOTE(review): presumably -1 asks the waker to fill in the release time — confirm.
304 sg.releasetime = 0
305 if t0 != 0 {
306 sg.releasetime = -1
307 }
308 sg.c = c
309
310 *nextp = sg
311 nextp = &sg.waitlink
312
313 if casi < nsends {
314 c.sendq.enqueue(sg)
315 } else {
316 c.recvq.enqueue(sg)
317 }
318 }
319
320
// Park until another goroutine wakes us. gp.param is cleared first and read
// back after the wakeup.
321 gp.param = nil
322
323
324
325
// parkingOnChan is set before parking; selparkcommit clears it and unlocks
// all the channels as part of gopark.
326 gp.parkingOnChan.Store(true)
327 gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
328 gp.activeStackChans = false
329
// selparkcommit released the channel locks; take them again before touching
// the queues.
330 sellock(scases, lockorder)
331
// NOTE(review): selectDone is presumably set by whichever goroutine claimed
// this select for wakeup — confirm; it is reset here for the next use.
332 gp.selectDone.Store(0)
// gp.param was cleared before parking, so a non-nil value here was stored
// while parked. It is compared against each queued sudog below to find the
// case that fired.
333 sg = (*sudog)(gp.param)
334 gp.param = nil
335
336
337
338
339
// Pass 3: remove our sudogs from the channels that did not fire and record
// which case did. The sudogs were chained in lock order, so the list is
// walked in step with lockorder.
340 casi = -1
341 cas = nil
342 caseSuccess = false
343 sglist = gp.waiting
344
// Clear the per-select fields on every sudog before detaching the list from
// gp.waiting.
345 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
346 sg1.isSelect = false
347 sg1.elem = nil
348 sg1.c = nil
349 }
350 gp.waiting = nil
351
352 for _, casei := range lockorder {
353 k = &scases[casei]
354 if sg == sglist {
355
// This is the sudog reported through gp.param: its case is the chosen one.
// It is not dequeued here.
356 casi = int(casei)
357 cas = k
358 caseSuccess = sglist.success
359 if sglist.releasetime > 0 {
360 caseReleaseTime = sglist.releasetime
361 }
362 } else {
// Every other sudog is taken off its channel's send or receive queue.
363 c = k.c
364 if int(casei) < nsends {
365 c.sendq.dequeueSudoG(sglist)
366 } else {
367 c.recvq.dequeueSudoG(sglist)
368 }
369 }
// Unlink the sudog from the chain and release it, remembering the next one first.
370 sgnext = sglist.waitlink
371 sglist.waitlink = nil
372 releaseSudog(sglist)
373 sglist = sgnext
374 }
375
// No sudog matched gp.param, so no case can be reported.
376 if cas == nil {
377 throw("selectgo: bad wakeup")
378 }
379
380 c = cas.c
381
382 if debugSelect {
383 print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
384 }
385
// A send woken without success panics via sclose; for a receive, the
// sudog's success flag becomes the second result.
386 if casi < nsends {
387 if !caseSuccess {
388 goto sclose
389 }
390 } else {
391 recvOK = caseSuccess
392 }
393
// Inform the race detector, msan and asan about the access to cas.elem:
// a read for sends, a write for receives with a non-nil destination.
394 if raceenabled {
395 if casi < nsends {
396 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
397 } else if cas.elem != nil {
398 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
399 }
400 }
401 if msanenabled {
402 if casi < nsends {
403 msanread(cas.elem, c.elemtype.Size_)
404 } else if cas.elem != nil {
405 msanwrite(cas.elem, c.elemtype.Size_)
406 }
407 }
408 if asanenabled {
409 if casi < nsends {
410 asanread(cas.elem, c.elemtype.Size_)
411 } else if cas.elem != nil {
412 asanwrite(cas.elem, c.elemtype.Size_)
413 }
414 }
415
416 selunlock(scases, lockorder)
417 goto retc
418
// bufrecv: the channel buffer holds data. Copy the slot at recvx into
// cas.elem (when non-nil), clear the slot, advance recvx with wraparound and
// decrement qcount.
419 bufrecv:
420
421 if raceenabled {
422 if cas.elem != nil {
423 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
424 }
425 racenotify(c, c.recvx, nil)
426 }
427 if msanenabled && cas.elem != nil {
428 msanwrite(cas.elem, c.elemtype.Size_)
429 }
430 if asanenabled && cas.elem != nil {
431 asanwrite(cas.elem, c.elemtype.Size_)
432 }
433 recvOK = true
434 qp = chanbuf(c, c.recvx)
435 if cas.elem != nil {
436 typedmemmove(c.elemtype, cas.elem, qp)
437 }
438 typedmemclr(c.elemtype, qp)
439 c.recvx++
440 if c.recvx == c.dataqsiz {
441 c.recvx = 0
442 }
443 c.qcount--
444 selunlock(scases, lockorder)
445 goto retc
446
// bufsend: the channel buffer has room. Copy cas.elem into the slot at sendx,
// advance sendx with wraparound and increment qcount.
447 bufsend:
448
449 if raceenabled {
450 racenotify(c, c.sendx, nil)
451 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
452 }
453 if msanenabled {
454 msanread(cas.elem, c.elemtype.Size_)
455 }
456 if asanenabled {
457 asanread(cas.elem, c.elemtype.Size_)
458 }
459 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
460 c.sendx++
461 if c.sendx == c.dataqsiz {
462 c.sendx = 0
463 }
464 c.qcount++
465 selunlock(scases, lockorder)
466 goto retc
467
// recv: a sender (sg) was waiting on the channel. The recv helper is given a
// callback that unlocks all the select's channels; selectgo does not call
// selunlock itself on this path.
468 recv:
469
470 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
471 if debugSelect {
472 print("syncrecv: cas0=", cas0, " c=", c, "\n")
473 }
474 recvOK = true
475 goto retc
476
// rclose: receive from a closed channel with nothing left to deliver. The
// locks are dropped first, then the destination (if any) is cleared and
// false is reported as the second result.
477 rclose:
478
479 selunlock(scases, lockorder)
480 recvOK = false
481 if cas.elem != nil {
482 typedmemclr(c.elemtype, cas.elem)
483 }
484 if raceenabled {
485 raceacquire(c.raceaddr())
486 }
487 goto retc
488
// send: a receiver (sg) was waiting on the channel. The send helper is given
// a callback that unlocks all the select's channels.
489 send:
490
491 if raceenabled {
492 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
493 }
494 if msanenabled {
495 msanread(cas.elem, c.elemtype.Size_)
496 }
497 if asanenabled {
498 asanread(cas.elem, c.elemtype.Size_)
499 }
500 send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
501 if debugSelect {
502 print("syncsend: cas0=", cas0, " c=", c, "\n")
503 }
504 goto retc
505
// retc: common return path. caseReleaseTime is positive only when the select
// parked and the chosen sudog carried a positive releasetime; in that case
// the time since t0 is reported as a block event.
506 retc:
507 if caseReleaseTime > 0 {
508 blockevent(caseReleaseTime-t0, 1)
509 }
510 return casi, recvOK
511
// sclose: a send case hit a closed channel. Drop the locks before panicking.
512 sclose:
513
514 selunlock(scases, lockorder)
515 panic(plainError("send on closed channel"))
516 }
517
518 func (c *hchan) sortkey() uintptr {
519 return uintptr(unsafe.Pointer(c))
520 }
521
522
523
524 type runtimeSelect struct {
525 dir selectDir
526 typ unsafe.Pointer
527 ch *hchan
528 val unsafe.Pointer
529 }
530
531
// selectDir identifies the kind of a runtimeSelect case.
type selectDir int

// The zero value is deliberately not a valid direction; the named values
// start at 1.
//
// NOTE(review): these numbers are presumably shared with the code that
// builds runtimeSelect values outside this file — confirm before changing.
const (
	selectSend    selectDir = iota + 1 // case Chan <- Send
	selectRecv                         // case <-Chan:
	selectDefault                      // default
)
540
541
542 func reflect_rselect(cases []runtimeSelect) (int, bool) {
543 if len(cases) == 0 {
544 block()
545 }
546 sel := make([]scase, len(cases))
547 orig := make([]int, len(cases))
548 nsends, nrecvs := 0, 0
549 dflt := -1
550 for i, rc := range cases {
551 var j int
552 switch rc.dir {
553 case selectDefault:
554 dflt = i
555 continue
556 case selectSend:
557 j = nsends
558 nsends++
559 case selectRecv:
560 nrecvs++
561 j = len(cases) - nrecvs
562 }
563
564 sel[j] = scase{c: rc.ch, elem: rc.val}
565 orig[j] = i
566 }
567
568
569 if nsends+nrecvs == 0 {
570 return dflt, false
571 }
572
573
574 if nsends+nrecvs < len(cases) {
575 copy(sel[nsends:], sel[len(cases)-nrecvs:])
576 copy(orig[nsends:], orig[len(cases)-nrecvs:])
577 }
578
579 order := make([]uint16, 2*(nsends+nrecvs))
580 var pc0 *uintptr
581 if raceenabled {
582 pcs := make([]uintptr, nsends+nrecvs)
583 for i := range pcs {
584 selectsetpc(&pcs[i])
585 }
586 pc0 = &pcs[0]
587 }
588
589 chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
590
591
592 if chosen < 0 {
593 chosen = dflt
594 } else {
595 chosen = orig[chosen]
596 }
597 return chosen, recvOK
598 }
599
600 func (q *waitq) dequeueSudoG(sgp *sudog) {
601 x := sgp.prev
602 y := sgp.next
603 if x != nil {
604 if y != nil {
605
606 x.next = y
607 y.prev = x
608 sgp.next = nil
609 sgp.prev = nil
610 return
611 }
612
613 x.next = nil
614 q.last = x
615 sgp.prev = nil
616 return
617 }
618 if y != nil {
619
620 y.prev = nil
621 q.first = y
622 sgp.next = nil
623 return
624 }
625
626
627
628 if q.first == sgp {
629 q.first = nil
630 q.last = nil
631 }
632 }
633
View as plain text