Source file
src/runtime/netpoll.go
Documentation: runtime
1
2
3
4
5
6
7 package runtime
8
9 import (
10 "runtime/internal/atomic"
11 "runtime/internal/sys"
12 "unsafe"
13 )
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Error codes returned by the poll_runtime_* entry points in this file
// (see netpollcheckerr). The numeric values are spelled out in the
// trailing comments because callers receive them as plain ints.
const (
	pollNoError        = iota // 0: no error
	pollErrClosing            // 1: the descriptor is being closed
	pollErrTimeout            // 2: the relevant deadline has expired
	pollErrNotPollable        // 3: an error event was reported for the descriptor
)
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Sentinel values stored in pollDesc.rg and pollDesc.wg. Any value
// above pdWait is treated by netpollunblock as a pointer to a parked g.
const (
	pdNil   uintptr = iota // 0: nobody waiting, no pending notification
	pdReady                // 1: a notification is pending
	pdWait                 // 2: a goroutine has announced it is about to park
)
69
// pollBlockSize is the number of bytes pollCache.alloc aims to request
// per refill; it is divided by the size of a pollDesc to get the count.
const pollBlockSize = 4 << 10
71
72
73
74
75 type pollDesc struct {
76 _ sys.NotInHeap
77 link *pollDesc
78 fd uintptr
79 fdseq atomic.Uintptr
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 atomicInfo atomic.Uint32
97
98
99
100 rg atomic.Uintptr
101 wg atomic.Uintptr
102
103 lock mutex
104 closing bool
105 user uint32
106 rseq uintptr
107 rt timer
108 rd int64
109 wseq uintptr
110 wt timer
111 wd int64
112 self *pollDesc
113 }
114
115
116
117
118
// pollInfo is the decoded view of the bits stored in pollDesc.atomicInfo.
type pollInfo uint32

// Bit assignments inside a pollInfo word.
const (
	pollClosing = 1 << iota
	pollEventErr
	pollExpiredReadDeadline
	pollExpiredWriteDeadline
	pollFDSeq
)

// Width and mask of the descriptor sequence number kept alongside the flags.
const (
	pollFDSeqBits = 20
	pollFDSeqMask = 1<<pollFDSeqBits - 1
)

// closing reports whether the pollClosing bit is set.
func (i pollInfo) closing() bool {
	return i&pollClosing == pollClosing
}

// eventErr reports whether the pollEventErr bit is set.
func (i pollInfo) eventErr() bool {
	return i&pollEventErr == pollEventErr
}

// expiredReadDeadline reports whether the pollExpiredReadDeadline bit is set.
func (i pollInfo) expiredReadDeadline() bool {
	return i&pollExpiredReadDeadline == pollExpiredReadDeadline
}

// expiredWriteDeadline reports whether the pollExpiredWriteDeadline bit is set.
func (i pollInfo) expiredWriteDeadline() bool {
	return i&pollExpiredWriteDeadline == pollExpiredWriteDeadline
}
138
139
140 func (pd *pollDesc) info() pollInfo {
141 return pollInfo(pd.atomicInfo.Load())
142 }
143
144
145
146
147
148
149
150
151 func (pd *pollDesc) publishInfo() {
152 var info uint32
153 if pd.closing {
154 info |= pollClosing
155 }
156 if pd.rd < 0 {
157 info |= pollExpiredReadDeadline
158 }
159 if pd.wd < 0 {
160 info |= pollExpiredWriteDeadline
161 }
162 info |= uint32(pd.fdseq.Load()&pollFDSeqMask) << pollFDSeq
163
164
165 x := pd.atomicInfo.Load()
166 for !pd.atomicInfo.CompareAndSwap(x, (x&pollEventErr)|info) {
167 x = pd.atomicInfo.Load()
168 }
169 }
170
171
172
173
174 func (pd *pollDesc) setEventErr(b bool, seq uintptr) {
175 mSeq := uint32(seq & pollFDSeqMask)
176 x := pd.atomicInfo.Load()
177 xSeq := (x >> pollFDSeq) & pollFDSeqMask
178 if seq != 0 && xSeq != mSeq {
179 return
180 }
181 for (x&pollEventErr != 0) != b && !pd.atomicInfo.CompareAndSwap(x, x^pollEventErr) {
182 x = pd.atomicInfo.Load()
183 xSeq := (x >> pollFDSeq) & pollFDSeqMask
184 if seq != 0 && xSeq != mSeq {
185 return
186 }
187 }
188 }
189
// pollCache is a free list of pollDesc values.
// lock guards first; first is the head of a singly linked list chained
// through pollDesc.link (see the alloc and free methods).
190 type pollCache struct {
191 lock mutex
192 first *pollDesc
193
194
195
196
197
198 }
199
200 var (
201 netpollInitLock mutex
202 netpollInited atomic.Uint32
203
204 pollcache pollCache
205 netpollWaiters atomic.Uint32
206 )
207
208
// poll_runtime_pollServerInit makes sure the network poller has been
// initialised by delegating to netpollGenericInit.
209 func poll_runtime_pollServerInit() {
210 netpollGenericInit()
211 }
212
213 func netpollGenericInit() {
214 if netpollInited.Load() == 0 {
215 lockInit(&netpollInitLock, lockRankNetpollInit)
216 lock(&netpollInitLock)
217 if netpollInited.Load() == 0 {
218 netpollinit()
219 netpollInited.Store(1)
220 }
221 unlock(&netpollInitLock)
222 }
223 }
224
225 func netpollinited() bool {
226 return netpollInited.Load() != 0
227 }
228
229
230
231
232
233 func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
234 return netpollIsPollDescriptor(fd)
235 }
236
237
238 func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
239 pd := pollcache.alloc()
240 lock(&pd.lock)
241 wg := pd.wg.Load()
242 if wg != pdNil && wg != pdReady {
243 throw("runtime: blocked write on free polldesc")
244 }
245 rg := pd.rg.Load()
246 if rg != pdNil && rg != pdReady {
247 throw("runtime: blocked read on free polldesc")
248 }
249 pd.fd = fd
250 if pd.fdseq.Load() == 0 {
251
252 pd.fdseq.Store(1)
253 }
254 pd.closing = false
255 pd.setEventErr(false, 0)
256 pd.rseq++
257 pd.rg.Store(pdNil)
258 pd.rd = 0
259 pd.wseq++
260 pd.wg.Store(pdNil)
261 pd.wd = 0
262 pd.self = pd
263 pd.publishInfo()
264 unlock(&pd.lock)
265
266 errno := netpollopen(fd, pd)
267 if errno != 0 {
268 pollcache.free(pd)
269 return nil, int(errno)
270 }
271 return pd, 0
272 }
273
274
275 func poll_runtime_pollClose(pd *pollDesc) {
276 if !pd.closing {
277 throw("runtime: close polldesc w/o unblock")
278 }
279 wg := pd.wg.Load()
280 if wg != pdNil && wg != pdReady {
281 throw("runtime: blocked write on closing polldesc")
282 }
283 rg := pd.rg.Load()
284 if rg != pdNil && rg != pdReady {
285 throw("runtime: blocked read on closing polldesc")
286 }
287 netpollclose(pd.fd)
288 pollcache.free(pd)
289 }
290
291 func (c *pollCache) free(pd *pollDesc) {
292
293
294 lock(&pd.lock)
295
296
297
298 fdseq := pd.fdseq.Load()
299 fdseq = (fdseq + 1) & (1<<taggedPointerBits - 1)
300 pd.fdseq.Store(fdseq)
301
302 pd.publishInfo()
303
304 unlock(&pd.lock)
305
306 lock(&c.lock)
307 pd.link = c.first
308 c.first = pd
309 unlock(&c.lock)
310 }
311
312
313
314
315
316
317 func poll_runtime_pollReset(pd *pollDesc, mode int) int {
318 errcode := netpollcheckerr(pd, int32(mode))
319 if errcode != pollNoError {
320 return errcode
321 }
322 if mode == 'r' {
323 pd.rg.Store(pdNil)
324 } else if mode == 'w' {
325 pd.wg.Store(pdNil)
326 }
327 return pollNoError
328 }
329
330
331
332
333
334
335
336 func poll_runtime_pollWait(pd *pollDesc, mode int) int {
337 errcode := netpollcheckerr(pd, int32(mode))
338 if errcode != pollNoError {
339 return errcode
340 }
341
342 if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
343 netpollarm(pd, mode)
344 }
345 for !netpollblock(pd, int32(mode), false) {
346 errcode = netpollcheckerr(pd, int32(mode))
347 if errcode != pollNoError {
348 return errcode
349 }
350
351
352
353 }
354 return pollNoError
355 }
356
357
358 func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
359
360
361 for !netpollblock(pd, int32(mode), true) {
362 }
363 }
364
365
366 func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
367 lock(&pd.lock)
368 if pd.closing {
369 unlock(&pd.lock)
370 return
371 }
372 rd0, wd0 := pd.rd, pd.wd
373 combo0 := rd0 > 0 && rd0 == wd0
374 if d > 0 {
375 d += nanotime()
376 if d <= 0 {
377
378
379 d = 1<<63 - 1
380 }
381 }
382 if mode == 'r' || mode == 'r'+'w' {
383 pd.rd = d
384 }
385 if mode == 'w' || mode == 'r'+'w' {
386 pd.wd = d
387 }
388 pd.publishInfo()
389 combo := pd.rd > 0 && pd.rd == pd.wd
390 rtf := netpollReadDeadline
391 if combo {
392 rtf = netpollDeadline
393 }
394 if pd.rt.f == nil {
395 if pd.rd > 0 {
396 pd.rt.f = rtf
397
398
399
400 pd.rt.arg = pd.makeArg()
401 pd.rt.seq = pd.rseq
402 resettimer(&pd.rt, pd.rd)
403 }
404 } else if pd.rd != rd0 || combo != combo0 {
405 pd.rseq++
406 if pd.rd > 0 {
407 modtimer(&pd.rt, pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
408 } else {
409 deltimer(&pd.rt)
410 pd.rt.f = nil
411 }
412 }
413 if pd.wt.f == nil {
414 if pd.wd > 0 && !combo {
415 pd.wt.f = netpollWriteDeadline
416 pd.wt.arg = pd.makeArg()
417 pd.wt.seq = pd.wseq
418 resettimer(&pd.wt, pd.wd)
419 }
420 } else if pd.wd != wd0 || combo != combo0 {
421 pd.wseq++
422 if pd.wd > 0 && !combo {
423 modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
424 } else {
425 deltimer(&pd.wt)
426 pd.wt.f = nil
427 }
428 }
429
430
431 delta := int32(0)
432 var rg, wg *g
433 if pd.rd < 0 {
434 rg = netpollunblock(pd, 'r', false, &delta)
435 }
436 if pd.wd < 0 {
437 wg = netpollunblock(pd, 'w', false, &delta)
438 }
439 unlock(&pd.lock)
440 if rg != nil {
441 netpollgoready(rg, 3)
442 }
443 if wg != nil {
444 netpollgoready(wg, 3)
445 }
446 netpollAdjustWaiters(delta)
447 }
448
449
450 func poll_runtime_pollUnblock(pd *pollDesc) {
451 lock(&pd.lock)
452 if pd.closing {
453 throw("runtime: unblock on closing polldesc")
454 }
455 pd.closing = true
456 pd.rseq++
457 pd.wseq++
458 var rg, wg *g
459 pd.publishInfo()
460 delta := int32(0)
461 rg = netpollunblock(pd, 'r', false, &delta)
462 wg = netpollunblock(pd, 'w', false, &delta)
463 if pd.rt.f != nil {
464 deltimer(&pd.rt)
465 pd.rt.f = nil
466 }
467 if pd.wt.f != nil {
468 deltimer(&pd.wt)
469 pd.wt.f = nil
470 }
471 unlock(&pd.lock)
472 if rg != nil {
473 netpollgoready(rg, 3)
474 }
475 if wg != nil {
476 netpollgoready(wg, 3)
477 }
478 netpollAdjustWaiters(delta)
479 }
480
481
482
483
484
485
486
487
488
489
490
491
492 func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
493 delta := int32(0)
494 var rg, wg *g
495 if mode == 'r' || mode == 'r'+'w' {
496 rg = netpollunblock(pd, 'r', true, &delta)
497 }
498 if mode == 'w' || mode == 'r'+'w' {
499 wg = netpollunblock(pd, 'w', true, &delta)
500 }
501 if rg != nil {
502 toRun.push(rg)
503 }
504 if wg != nil {
505 toRun.push(wg)
506 }
507 return delta
508 }
509
510 func netpollcheckerr(pd *pollDesc, mode int32) int {
511 info := pd.info()
512 if info.closing() {
513 return pollErrClosing
514 }
515 if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
516 return pollErrTimeout
517 }
518
519
520
521 if mode == 'r' && info.eventErr() {
522 return pollErrNotPollable
523 }
524 return pollNoError
525 }
526
527 func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
528 r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
529 if r {
530
531
532
533 netpollAdjustWaiters(1)
534 }
535 return r
536 }
537
538 func netpollgoready(gp *g, traceskip int) {
539 goready(gp, traceskip+1)
540 }
541
542
543
544
545
546 func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
547 gpp := &pd.rg
548 if mode == 'w' {
549 gpp = &pd.wg
550 }
551
552
553 for {
554
555 if gpp.CompareAndSwap(pdReady, pdNil) {
556 return true
557 }
558 if gpp.CompareAndSwap(pdNil, pdWait) {
559 break
560 }
561
562
563
564 if v := gpp.Load(); v != pdReady && v != pdNil {
565 throw("runtime: double wait")
566 }
567 }
568
569
570
571
572 if waitio || netpollcheckerr(pd, mode) == pollNoError {
573 gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
574 }
575
576 old := gpp.Swap(pdNil)
577 if old > pdWait {
578 throw("runtime: corrupted polldesc")
579 }
580 return old == pdReady
581 }
582
583
584
585
586
587
588
589 func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
590 gpp := &pd.rg
591 if mode == 'w' {
592 gpp = &pd.wg
593 }
594
595 for {
596 old := gpp.Load()
597 if old == pdReady {
598 return nil
599 }
600 if old == pdNil && !ioready {
601
602
603 return nil
604 }
605 new := pdNil
606 if ioready {
607 new = pdReady
608 }
609 if gpp.CompareAndSwap(old, new) {
610 if old == pdWait {
611 old = pdNil
612 } else if old != pdNil {
613 *delta -= 1
614 }
615 return (*g)(unsafe.Pointer(old))
616 }
617 }
618 }
619
620 func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
621 lock(&pd.lock)
622
623
624 currentSeq := pd.rseq
625 if !read {
626 currentSeq = pd.wseq
627 }
628 if seq != currentSeq {
629
630 unlock(&pd.lock)
631 return
632 }
633 delta := int32(0)
634 var rg *g
635 if read {
636 if pd.rd <= 0 || pd.rt.f == nil {
637 throw("runtime: inconsistent read deadline")
638 }
639 pd.rd = -1
640 pd.publishInfo()
641 rg = netpollunblock(pd, 'r', false, &delta)
642 }
643 var wg *g
644 if write {
645 if pd.wd <= 0 || pd.wt.f == nil && !read {
646 throw("runtime: inconsistent write deadline")
647 }
648 pd.wd = -1
649 pd.publishInfo()
650 wg = netpollunblock(pd, 'w', false, &delta)
651 }
652 unlock(&pd.lock)
653 if rg != nil {
654 netpollgoready(rg, 0)
655 }
656 if wg != nil {
657 netpollgoready(wg, 0)
658 }
659 netpollAdjustWaiters(delta)
660 }
661
662 func netpollDeadline(arg any, seq uintptr) {
663 netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
664 }
665
666 func netpollReadDeadline(arg any, seq uintptr) {
667 netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
668 }
669
670 func netpollWriteDeadline(arg any, seq uintptr) {
671 netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
672 }
673
674
675 func netpollAnyWaiters() bool {
676 return netpollWaiters.Load() > 0
677 }
678
679
680 func netpollAdjustWaiters(delta int32) {
681 if delta != 0 {
682 netpollWaiters.Add(delta)
683 }
684 }
685
686 func (c *pollCache) alloc() *pollDesc {
687 lock(&c.lock)
688 if c.first == nil {
689 const pdSize = unsafe.Sizeof(pollDesc{})
690 n := pollBlockSize / pdSize
691 if n == 0 {
692 n = 1
693 }
694
695
696 mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
697 for i := uintptr(0); i < n; i++ {
698 pd := (*pollDesc)(add(mem, i*pdSize))
699 pd.link = c.first
700 c.first = pd
701 }
702 }
703 pd := c.first
704 c.first = pd.link
705 lockInit(&pd.lock, lockRankPollDesc)
706 unlock(&c.lock)
707 return pd
708 }
709
710
711
712
713
714
715 func (pd *pollDesc) makeArg() (i any) {
716 x := (*eface)(unsafe.Pointer(&i))
717 x._type = pdType
718 x.data = unsafe.Pointer(&pd.self)
719 return
720 }
721
722 var (
723 pdEface any = (*pollDesc)(nil)
724 pdType *_type = efaceOf(&pdEface)._type
725 )
726
View as plain text