Source file
src/runtime/chan.go
Documentation: runtime
1
2
3
4
5 package runtime
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import (
21 "internal/abi"
22 "runtime/internal/atomic"
23 "runtime/internal/math"
24 "unsafe"
25 )
26
// Constants used by the channel implementation in this file.
27 const (
// maxAlign is the alignment bound for the channel header: makechan throws if
// hchanSize is not a multiple of it or if an element type's alignment is larger.
28 maxAlign = 8
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
// maxAlign (the second term adds the padding needed to reach that multiple).
29 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
// debugChan gates the diagnostic print calls in makechan, chansend and chanrecv.
30 debugChan = false
31 )
32
// hchan is the runtime representation of a channel, as allocated by makechan.
33 type hchan struct {
// qcount is the number of elements currently stored in the buffer; it is
// incremented by a buffered send and decremented by a buffered receive.
34 qcount uint
// dataqsiz is the buffer capacity in elements (the size passed to makechan);
// the code treats 0 as an unbuffered channel.
35 dataqsiz uint
// buf points to the element buffer that chanbuf indexes into.
36 buf unsafe.Pointer
// elemsize is the size of one element, taken from the element type in makechan.
37 elemsize uint16
// closed is set to 1 by closechan and is 0 while the channel is open.
38 closed uint32
// elemtype is the element type, used for typed copies and clears.
39 elemtype *_type
// sendx is the buffer index the next buffered send writes to.
40 sendx uint
// recvx is the buffer index the next buffered receive reads from.
41 recvx uint
// recvq holds receivers parked by chanrecv.
42 recvq waitq
// sendq holds senders parked by chansend.
43 sendq waitq
44
45
46
47
48
49
50
// lock is held by chansend, chanrecv and closechan while they modify the
// channel; the non-blocking fast paths read some fields without taking it.
51 lock mutex
52 }
53
// waitq is a doubly linked queue of sudogs (linked through their next/prev
// fields) that enqueue appends to and dequeue removes from the front of.
54 type waitq struct {
// first is the head of the queue, or nil when the queue is empty.
55 first *sudog
// last is the tail of the queue, or nil when the queue is empty.
56 last *sudog
57 }
58
59
60 func reflect_makechan(t *chantype, size int) *hchan {
61 return makechan(t, size)
62 }
63
64 func makechan64(t *chantype, size int64) *hchan {
65 if int64(int(size)) != size {
66 panic(plainError("makechan: size out of range"))
67 }
68
69 return makechan(t, int(size))
70 }
71
72 func makechan(t *chantype, size int) *hchan {
73 elem := t.Elem
74
75
76 if elem.Size_ >= 1<<16 {
77 throw("makechan: invalid channel element type")
78 }
79 if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
80 throw("makechan: bad alignment")
81 }
82
83 mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
84 if overflow || mem > maxAlloc-hchanSize || size < 0 {
85 panic(plainError("makechan: size out of range"))
86 }
87
88
89
90
91
92 var c *hchan
93 switch {
94 case mem == 0:
95
96 c = (*hchan)(mallocgc(hchanSize, nil, true))
97
98 c.buf = c.raceaddr()
99 case elem.PtrBytes == 0:
100
101
102 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
103 c.buf = add(unsafe.Pointer(c), hchanSize)
104 default:
105
106 c = new(hchan)
107 c.buf = mallocgc(mem, elem, true)
108 }
109
110 c.elemsize = uint16(elem.Size_)
111 c.elemtype = elem
112 c.dataqsiz = uint(size)
113 lockInit(&c.lock, lockRankHchan)
114
115 if debugChan {
116 print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
117 }
118 return c
119 }
120
121
122 func chanbuf(c *hchan, i uint) unsafe.Pointer {
123 return add(c.buf, uintptr(i)*uintptr(c.elemsize))
124 }
125
126
127
128
129
130 func full(c *hchan) bool {
131
132
133 if c.dataqsiz == 0 {
134
135 return c.recvq.first == nil
136 }
137
138 return c.qcount == c.dataqsiz
139 }
140
141
142
143
144 func chansend1(c *hchan, elem unsafe.Pointer) {
145 chansend(c, elem, true, getcallerpc())
146 }
147
148
// chansend sends the value at ep on channel c and reports whether the send
// took place.
//
// With block == false it returns false instead of parking when the send
// cannot complete immediately. With block == true it parks the calling
// goroutine until a receiver takes the value or the channel is closed.
// Sending on a closed channel panics with "send on closed channel". On a nil
// channel a blocking send parks forever and a non-blocking send returns false.
// callerpc is used only for race-detector reporting.
160 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// Nil channel: fail immediately if non-blocking, otherwise park with no
// unlock function; execution is not expected to continue past gopark.
161 if c == nil {
162 if !block {
163 return false
164 }
165 gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
166 throw("unreachable")
167 }
168
169 if debugChan {
170 print("chansend: chan=", c, "\n")
171 }
172
173 if raceenabled {
174 racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
175 }
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// Fast path without the lock: a non-blocking send on a channel that looks
// open and full fails right away.
193 if !block && c.closed == 0 && full(c) {
194 return false
195 }
196
// t0 is the start timestamp, recorded only when block profiling is enabled.
197 var t0 int64
198 if blockprofilerate > 0 {
199 t0 = cputicks()
200 }
201
202 lock(&c.lock)
203
// Re-check closed under the lock; release the lock before panicking.
204 if c.closed != 0 {
205 unlock(&c.lock)
206 panic(plainError("send on closed channel"))
207 }
208
// A receiver is already parked: hand the value to it directly. send calls
// the supplied closure to release the lock.
209 if sg := c.recvq.dequeue(); sg != nil {
210
211
212 send(c, sg, ep, func() { unlock(&c.lock) }, 3)
213 return true
214 }
215
// There is free space in the buffer: copy the value into slot sendx and
// advance sendx with wrap-around.
216 if c.qcount < c.dataqsiz {
217
218 qp := chanbuf(c, c.sendx)
219 if raceenabled {
220 racenotify(c, c.sendx, nil)
221 }
222 typedmemmove(c.elemtype, qp, ep)
223 c.sendx++
224 if c.sendx == c.dataqsiz {
225 c.sendx = 0
226 }
227 c.qcount++
228 unlock(&c.lock)
229 return true
230 }
231
// Nothing can be done without waiting; a non-blocking send gives up here.
232 if !block {
233 unlock(&c.lock)
234 return false
235 }
236
237
// Blocking path: describe this send in a sudog, queue it on sendq and park.
238 gp := getg()
239 mysg := acquireSudog()
// releasetime == -1 asks the waker to stamp the wake-up time (wakers replace
// any nonzero releasetime with cputicks()); it is requested only when t0 was
// recorded.
240 mysg.releasetime = 0
241 if t0 != 0 {
242 mysg.releasetime = -1
243 }
244
245
246 mysg.elem = ep
247 mysg.waitlink = nil
248 mysg.g = gp
249 mysg.isSelect = false
250 mysg.c = c
251 gp.waiting = mysg
252 gp.param = nil
253 c.sendq.enqueue(mysg)
254
255
256
257
// parkingOnChan is set before parking; chanparkcommit clears it and releases
// c.lock as part of the park.
258 gp.parkingOnChan.Store(true)
259 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
260
261
262
263
// ep was published through mysg.elem; KeepAlive is called on it once the
// park has returned (presumably so the value stays live until a receiver has
// copied it — confirm).
264 KeepAlive(ep)
265
266
// Woken up: verify and clear the waiting state, then release the sudog.
267 if mysg != gp.waiting {
268 throw("G waiting list is corrupted")
269 }
270 gp.waiting = nil
271 gp.activeStackChans = false
// success is set to true by recv (value delivered) and to false by closechan.
272 closed := !mysg.success
273 gp.param = nil
274 if mysg.releasetime > 0 {
275 blockevent(mysg.releasetime-t0, 2)
276 }
277 mysg.c = nil
278 releaseSudog(mysg)
// A wake-up without success is only valid if the channel really was closed.
279 if closed {
280 if c.closed == 0 {
281 throw("chansend: spurious wakeup")
282 }
283 panic(plainError("send on closed channel"))
284 }
285 return true
286 }
287
288
289
290
291
292
293
// send delivers the value at ep directly to the parked receiver sg on channel
// c and makes the receiver's goroutine runnable.
//
// unlockf is called to release the channel lock (chansend passes a closure
// that unlocks c.lock), so c is expected to be locked on entry. skip is
// forwarded to goready as skip+1.
294 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
295 if raceenabled {
296 if c.dataqsiz == 0 {
297 racesync(c, sg)
298 } else {
299
300
301
// Buffered channel under the race detector: notify on slot recvx for both
// this goroutine and the receiver, then advance recvx (with wrap-around) and
// set sendx to the same index.
302 racenotify(c, c.recvx, nil)
303 racenotify(c, c.recvx, sg)
304 c.recvx++
305 if c.recvx == c.dataqsiz {
306 c.recvx = 0
307 }
308 c.sendx = c.recvx
309 }
310 }
// Copy the value into the receiver's destination if it supplied one, and
// clear sg.elem afterwards.
311 if sg.elem != nil {
312 sendDirect(c.elemtype, sg, ep)
313 sg.elem = nil
314 }
315 gp := sg.g
316 unlockf()
// Mark the operation as successful and stamp the wake-up time if the
// receiver requested it (nonzero releasetime) before readying it.
317 gp.param = unsafe.Pointer(sg)
318 sg.success = true
319 if sg.releasetime != 0 {
320 sg.releasetime = cputicks()
321 }
322 goready(gp, skip+1)
323 }
324
325
326
327
328
329
330
331
332
333
334
335 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
336
337
338
339
340
341 dst := sg.elem
342 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
343
344
345 memmove(dst, src, t.Size_)
346 }
347
348 func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
349
350
351
352 src := sg.elem
353 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
354 memmove(dst, src, t.Size_)
355 }
356
// closechan closes channel c and wakes every goroutine parked on it.
//
// It panics with "close of nil channel" if c is nil and with
// "close of closed channel" if c is already closed. Parked receivers and
// senders are woken with success == false.
357 func closechan(c *hchan) {
358 if c == nil {
359 panic(plainError("close of nil channel"))
360 }
361
362 lock(&c.lock)
// Closing twice is an error; release the lock before panicking.
363 if c.closed != 0 {
364 unlock(&c.lock)
365 panic(plainError("close of closed channel"))
366 }
367
368 if raceenabled {
369 callerpc := getcallerpc()
370 racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
371 racerelease(c.raceaddr())
372 }
373
374 c.closed = 1
375
// glist collects the goroutines to wake; they are readied only after the
// lock has been released.
376 var glist gList
377
378
// Drain all parked receivers: clear their destination (if any) and mark the
// receive as unsuccessful.
379 for {
380 sg := c.recvq.dequeue()
381 if sg == nil {
382 break
383 }
384 if sg.elem != nil {
385 typedmemclr(c.elemtype, sg.elem)
386 sg.elem = nil
387 }
388 if sg.releasetime != 0 {
389 sg.releasetime = cputicks()
390 }
391 gp := sg.g
392 gp.param = unsafe.Pointer(sg)
393 sg.success = false
394 if raceenabled {
395 raceacquireg(gp, c.raceaddr())
396 }
397 glist.push(gp)
398 }
399
400
// Drain all parked senders; chansend panics when it wakes with
// success == false.
401 for {
402 sg := c.sendq.dequeue()
403 if sg == nil {
404 break
405 }
406 sg.elem = nil
407 if sg.releasetime != 0 {
408 sg.releasetime = cputicks()
409 }
410 gp := sg.g
411 gp.param = unsafe.Pointer(sg)
412 sg.success = false
413 if raceenabled {
414 raceacquireg(gp, c.raceaddr())
415 }
416 glist.push(gp)
417 }
418 unlock(&c.lock)
419
420
// With the lock dropped, make every collected goroutine runnable.
421 for !glist.empty() {
422 gp := glist.pop()
423 gp.schedlink = 0
424 goready(gp, 3)
425 }
426 }
427
428
429
430 func empty(c *hchan) bool {
431
432 if c.dataqsiz == 0 {
433 return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
434 }
435 return atomic.Loaduint(&c.qcount) == 0
436 }
437
438
439
440
441 func chanrecv1(c *hchan, elem unsafe.Pointer) {
442 chanrecv(c, elem, true)
443 }
444
445
446 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
447 _, received = chanrecv(c, elem, true)
448 return
449 }
450
451
452
453
454
455
456
// chanrecv receives a value from channel c and, if ep is non-nil, stores it
// at ep.
//
// Results:
//   - (false, false): non-blocking receive that could not proceed (including
//     a nil channel).
//   - (true, false): the channel is closed and empty; *ep is zeroed.
//   - (true, true): a value was received.
//
// With block == true the caller parks until a sender or closechan wakes it,
// and the second result is the success flag set by the waker. A blocking
// receive on a nil channel parks forever.
457 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
458
459
460
461 if debugChan {
462 print("chanrecv: chan=", c, "\n")
463 }
464
// Nil channel: return the zero results if non-blocking, otherwise park with
// no unlock function; execution is not expected to continue past gopark.
465 if c == nil {
466 if !block {
467 return
468 }
469 gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
470 throw("unreachable")
471 }
472
473
// Fast path without the lock for a non-blocking receive on a channel that
// looks empty.
474 if !block && empty(c) {
475
476
477
478
479
480
481
482
483
// Empty and still open: nothing to receive.
484 if atomic.Load(&c.closed) == 0 {
485
486
487
488
489 return
490 }
491
492
493
// The channel was observed closed; check emptiness again, since the first
// check happened before the closed load. If still empty, report a closed
// receive and zero the destination.
494 if empty(c) {
495
496 if raceenabled {
497 raceacquire(c.raceaddr())
498 }
499 if ep != nil {
500 typedmemclr(c.elemtype, ep)
501 }
502 return true, false
503 }
504 }
505
// t0 is the start timestamp, recorded only when block profiling is enabled.
506 var t0 int64
507 if blockprofilerate > 0 {
508 t0 = cputicks()
509 }
510
511 lock(&c.lock)
512
513 if c.closed != 0 {
// Closed with no buffered data: zero the destination and report a closed
// receive. A closed channel that still has buffered data falls through to
// the buffer path below.
514 if c.qcount == 0 {
515 if raceenabled {
516 raceacquire(c.raceaddr())
517 }
518 unlock(&c.lock)
519 if ep != nil {
520 typedmemclr(c.elemtype, ep)
521 }
522 return true, false
523 }
524
525 } else {
526
// Open channel with a parked sender: recv takes the value (see recv for the
// buffered/unbuffered handling) and releases the lock via the closure.
527 if sg := c.sendq.dequeue(); sg != nil {
528
529
530
531
532 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
533 return true, true
534 }
535 }
536
// Buffered data is available: copy slot recvx out (if ep is non-nil), clear
// the slot and advance recvx with wrap-around.
537 if c.qcount > 0 {
538
539 qp := chanbuf(c, c.recvx)
540 if raceenabled {
541 racenotify(c, c.recvx, nil)
542 }
543 if ep != nil {
544 typedmemmove(c.elemtype, ep, qp)
545 }
546 typedmemclr(c.elemtype, qp)
547 c.recvx++
548 if c.recvx == c.dataqsiz {
549 c.recvx = 0
550 }
551 c.qcount--
552 unlock(&c.lock)
553 return true, true
554 }
555
// Nothing available; a non-blocking receive gives up here.
556 if !block {
557 unlock(&c.lock)
558 return false, false
559 }
560
561
// Blocking path: describe this receive in a sudog, queue it on recvq and park.
562 gp := getg()
563 mysg := acquireSudog()
// releasetime == -1 asks the waker to stamp the wake-up time; it is requested
// only when t0 was recorded.
564 mysg.releasetime = 0
565 if t0 != 0 {
566 mysg.releasetime = -1
567 }
568
569
570 mysg.elem = ep
571 mysg.waitlink = nil
572 gp.waiting = mysg
573 mysg.g = gp
574 mysg.isSelect = false
575 mysg.c = c
576 gp.param = nil
577 c.recvq.enqueue(mysg)
578
579
580
581
// parkingOnChan is set before parking; chanparkcommit clears it and releases
// c.lock as part of the park.
582 gp.parkingOnChan.Store(true)
583 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
584
585
// Woken up: verify and clear the waiting state, then release the sudog.
586 if mysg != gp.waiting {
587 throw("G waiting list is corrupted")
588 }
589 gp.waiting = nil
590 gp.activeStackChans = false
591 if mysg.releasetime > 0 {
592 blockevent(mysg.releasetime-t0, 2)
593 }
// success is true when send delivered a value and false when closechan woke us.
594 success := mysg.success
595 gp.param = nil
596 mysg.c = nil
597 releaseSudog(mysg)
598 return true, success
599 }
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
// recv completes a receive on channel c against the parked sender sg and
// makes the sender's goroutine runnable. The received value is stored at ep
// when ep is non-nil.
//
// unlockf is called to release the channel lock (chanrecv passes a closure
// that unlocks c.lock), so c is expected to be locked on entry. skip is
// forwarded to goready as skip+1.
615 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
616 if c.dataqsiz == 0 {
617 if raceenabled {
618 racesync(c, sg)
619 }
// Unbuffered: copy straight from the sender's value to the destination.
620 if ep != nil {
621
622 recvDirect(c.elemtype, sg, ep)
623 }
624 } else {
625
626
627
628
// Buffered: take the element at slot recvx for the receiver, then store the
// sender's value into that same slot. Afterwards recvx advances (with
// wrap-around) and sendx is set to the same index.
// NOTE(review): this presumably relies on the buffer being full whenever a
// sender is parked — confirm.
629 qp := chanbuf(c, c.recvx)
630 if raceenabled {
631 racenotify(c, c.recvx, nil)
632 racenotify(c, c.recvx, sg)
633 }
634
635 if ep != nil {
636 typedmemmove(c.elemtype, ep, qp)
637 }
638
639 typedmemmove(c.elemtype, qp, sg.elem)
640 c.recvx++
641 if c.recvx == c.dataqsiz {
642 c.recvx = 0
643 }
644 c.sendx = c.recvx
645 }
646 sg.elem = nil
647 gp := sg.g
648 unlockf()
// Mark the operation as successful and stamp the wake-up time if the sender
// requested it (nonzero releasetime) before readying it.
649 gp.param = unsafe.Pointer(sg)
650 sg.success = true
651 if sg.releasetime != 0 {
652 sg.releasetime = cputicks()
653 }
654 goready(gp, skip+1)
655 }
656
// chanparkcommit is the function chansend and chanrecv pass to gopark,
// together with a pointer to the channel's lock as chanLock. It flags gp as
// having active stack channels, clears the parkingOnChan flag the caller set
// before parking, releases the channel lock and returns true.
// NOTE(review): the order of these three steps looks deliberate (flag updates
// before the unlock) — do not reorder without checking the stack-copying code.
657 func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
658
659
660
661
662
663 gp.activeStackChans = true
664
665
666
667 gp.parkingOnChan.Store(false)
668
669
670
671
672
// chanLock is the *mutex of the channel being parked on.
673 unlock((*mutex)(chanLock))
674 return true
675 }
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
694 return chansend(c, elem, false, getcallerpc())
695 }
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
714 return chanrecv(c, elem, false)
715 }
716
717
718 func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
719 return chansend(c, elem, !nb, getcallerpc())
720 }
721
722
723 func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
724 return chanrecv(c, elem, !nb)
725 }
726
727
728 func reflect_chanlen(c *hchan) int {
729 if c == nil {
730 return 0
731 }
732 return int(c.qcount)
733 }
734
735
736 func reflectlite_chanlen(c *hchan) int {
737 if c == nil {
738 return 0
739 }
740 return int(c.qcount)
741 }
742
743
744 func reflect_chancap(c *hchan) int {
745 if c == nil {
746 return 0
747 }
748 return int(c.dataqsiz)
749 }
750
751
// reflect_chanclose closes c by calling closechan, so it panics in the same
// cases (nil channel, already-closed channel).
// NOTE(review): presumably reached from package reflect — confirm.
752 func reflect_chanclose(c *hchan) {
753 closechan(c)
754 }
755
756 func (q *waitq) enqueue(sgp *sudog) {
757 sgp.next = nil
758 x := q.last
759 if x == nil {
760 sgp.prev = nil
761 q.first = sgp
762 q.last = sgp
763 return
764 }
765 sgp.prev = x
766 x.next = sgp
767 q.last = sgp
768 }
769
770 func (q *waitq) dequeue() *sudog {
771 for {
772 sgp := q.first
773 if sgp == nil {
774 return nil
775 }
776 y := sgp.next
777 if y == nil {
778 q.first = nil
779 q.last = nil
780 } else {
781 y.prev = nil
782 q.first = y
783 sgp.next = nil
784 }
785
786
787
788
789
790
791
792
793
794 if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
795 continue
796 }
797
798 return sgp
799 }
800 }
801
802 func (c *hchan) raceaddr() unsafe.Pointer {
803
804
805
806
807
808 return unsafe.Pointer(&c.buf)
809 }
810
811 func racesync(c *hchan, sg *sudog) {
812 racerelease(chanbuf(c, 0))
813 raceacquireg(sg.g, chanbuf(c, 0))
814 racereleaseg(sg.g, chanbuf(c, 0))
815 raceacquire(chanbuf(c, 0))
816 }
817
818
819
820
821 func racenotify(c *hchan, idx uint, sg *sudog) {
822
823
824
825
826
827
828
829 qp := chanbuf(c, idx)
830
831
832
833
834
835
836 if c.elemsize == 0 {
837 if sg == nil {
838 raceacquire(qp)
839 racerelease(qp)
840 } else {
841 raceacquireg(sg.g, qp)
842 racereleaseg(sg.g, qp)
843 }
844 } else {
845 if sg == nil {
846 racereleaseacquire(qp)
847 } else {
848 racereleaseacquireg(sg.g, qp)
849 }
850 }
851 }
852
View as plain text