...

Source file src/runtime/netpoll_kqueue.go

Documentation: runtime

     1  // Copyright 2013 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  //go:build darwin || dragonfly || freebsd || netbsd || openbsd
     6  
     7  package runtime
     8  
     9  // Integrated network poller (kqueue-based implementation).
    10  
    11  import (
    12  	"internal/goarch"
    13  	"runtime/internal/atomic"
    14  	"unsafe"
    15  )
    16  
    17  var (
    18  	kq int32 = -1
    19  
    20  	netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
    21  
    22  	netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
    23  )
    24  
    25  func netpollinit() {
    26  	kq = kqueue()
    27  	if kq < 0 {
    28  		println("runtime: kqueue failed with", -kq)
    29  		throw("runtime: netpollinit failed")
    30  	}
    31  	closeonexec(kq)
    32  	r, w, errno := nonblockingPipe()
    33  	if errno != 0 {
    34  		println("runtime: pipe failed with", -errno)
    35  		throw("runtime: pipe failed")
    36  	}
    37  	ev := keventt{
    38  		filter: _EVFILT_READ,
    39  		flags:  _EV_ADD,
    40  	}
    41  	*(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
    42  	n := kevent(kq, &ev, 1, nil, 0, nil)
    43  	if n < 0 {
    44  		println("runtime: kevent failed with", -n)
    45  		throw("runtime: kevent failed")
    46  	}
    47  	netpollBreakRd = uintptr(r)
    48  	netpollBreakWr = uintptr(w)
    49  }
    50  
    51  func netpollIsPollDescriptor(fd uintptr) bool {
    52  	return fd == uintptr(kq) || fd == netpollBreakRd || fd == netpollBreakWr
    53  }
    54  
    55  func netpollopen(fd uintptr, pd *pollDesc) int32 {
    56  	// Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
    57  	// for the whole fd lifetime. The notifications are automatically unregistered
    58  	// when fd is closed.
    59  	var ev [2]keventt
    60  	*(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
    61  	ev[0].filter = _EVFILT_READ
    62  	ev[0].flags = _EV_ADD | _EV_CLEAR
    63  	ev[0].fflags = 0
    64  	ev[0].data = 0
    65  
    66  	if goarch.PtrSize == 4 {
    67  		// We only have a pointer-sized field to store into,
    68  		// so on a 32-bit system we get no sequence protection.
    69  		// TODO(iant): If we notice any problems we could at least
    70  		// steal the low-order 2 bits for a tiny sequence number.
    71  		ev[0].udata = (*byte)(unsafe.Pointer(pd))
    72  	} else {
    73  		tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
    74  		ev[0].udata = (*byte)(unsafe.Pointer(uintptr(tp)))
    75  	}
    76  	ev[1] = ev[0]
    77  	ev[1].filter = _EVFILT_WRITE
    78  	n := kevent(kq, &ev[0], 2, nil, 0, nil)
    79  	if n < 0 {
    80  		return -n
    81  	}
    82  	return 0
    83  }
    84  
    85  func netpollclose(fd uintptr) int32 {
    86  	// Don't need to unregister because calling close()
    87  	// on fd will remove any kevents that reference the descriptor.
    88  	return 0
    89  }
    90  
    91  func netpollarm(pd *pollDesc, mode int) {
    92  	throw("runtime: unused")
    93  }
    94  
    95  // netpollBreak interrupts a kevent.
    96  func netpollBreak() {
    97  	// Failing to cas indicates there is an in-flight wakeup, so we're done here.
    98  	if !netpollWakeSig.CompareAndSwap(0, 1) {
    99  		return
   100  	}
   101  
   102  	for {
   103  		var b byte
   104  		n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
   105  		if n == 1 || n == -_EAGAIN {
   106  			break
   107  		}
   108  		if n == -_EINTR {
   109  			continue
   110  		}
   111  		println("runtime: netpollBreak write failed with", -n)
   112  		throw("runtime: netpollBreak write failed")
   113  	}
   114  }
   115  
   116  // netpoll checks for ready network connections.
   117  // Returns list of goroutines that become runnable.
   118  // delay < 0: blocks indefinitely
   119  // delay == 0: does not block, just polls
   120  // delay > 0: block for up to that many nanoseconds
   121  func netpoll(delay int64) (gList, int32) {
   122  	if kq == -1 {
   123  		return gList{}, 0
   124  	}
   125  	var tp *timespec
   126  	var ts timespec
   127  	if delay < 0 {
   128  		tp = nil
   129  	} else if delay == 0 {
   130  		tp = &ts
   131  	} else {
   132  		ts.setNsec(delay)
   133  		if ts.tv_sec > 1e6 {
   134  			// Darwin returns EINVAL if the sleep time is too long.
   135  			ts.tv_sec = 1e6
   136  		}
   137  		tp = &ts
   138  	}
   139  	var events [64]keventt
   140  retry:
   141  	n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
   142  	if n < 0 {
   143  		// Ignore the ETIMEDOUT error for now, but try to dive deep and
   144  		// figure out what really happened with n == ETIMEOUT,
   145  		// see https://go.dev/issue/59679 for details.
   146  		if n != -_EINTR && n != -_ETIMEDOUT {
   147  			println("runtime: kevent on fd", kq, "failed with", -n)
   148  			throw("runtime: netpoll failed")
   149  		}
   150  		// If a timed sleep was interrupted, just return to
   151  		// recalculate how long we should sleep now.
   152  		if delay > 0 {
   153  			return gList{}, 0
   154  		}
   155  		goto retry
   156  	}
   157  	var toRun gList
   158  	delta := int32(0)
   159  	for i := 0; i < int(n); i++ {
   160  		ev := &events[i]
   161  
   162  		if uintptr(ev.ident) == netpollBreakRd {
   163  			if ev.filter != _EVFILT_READ {
   164  				println("runtime: netpoll: break fd ready for", ev.filter)
   165  				throw("runtime: netpoll: break fd ready for something unexpected")
   166  			}
   167  			if delay != 0 {
   168  				// netpollBreak could be picked up by a
   169  				// nonblocking poll. Only read the byte
   170  				// if blocking.
   171  				var tmp [16]byte
   172  				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
   173  				netpollWakeSig.Store(0)
   174  			}
   175  			continue
   176  		}
   177  
   178  		var mode int32
   179  		switch ev.filter {
   180  		case _EVFILT_READ:
   181  			mode += 'r'
   182  
   183  			// On some systems when the read end of a pipe
   184  			// is closed the write end will not get a
   185  			// _EVFILT_WRITE event, but will get a
   186  			// _EVFILT_READ event with EV_EOF set.
   187  			// Note that setting 'w' here just means that we
   188  			// will wake up a goroutine waiting to write;
   189  			// that goroutine will try the write again,
   190  			// and the appropriate thing will happen based
   191  			// on what that write returns (success, EPIPE, EAGAIN).
   192  			if ev.flags&_EV_EOF != 0 {
   193  				mode += 'w'
   194  			}
   195  		case _EVFILT_WRITE:
   196  			mode += 'w'
   197  		}
   198  		if mode != 0 {
   199  			var pd *pollDesc
   200  			var tag uintptr
   201  			if goarch.PtrSize == 4 {
   202  				// No sequence protection on 32-bit systems.
   203  				// See netpollopen for details.
   204  				pd = (*pollDesc)(unsafe.Pointer(ev.udata))
   205  				tag = 0
   206  			} else {
   207  				tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))
   208  				pd = (*pollDesc)(tp.pointer())
   209  				tag = tp.tag()
   210  				if pd.fdseq.Load() != tag {
   211  					continue
   212  				}
   213  			}
   214  			pd.setEventErr(ev.flags == _EV_ERROR, tag)
   215  			delta += netpollready(&toRun, pd, mode)
   216  		}
   217  	}
   218  	return toRun, delta
   219  }
   220  

View as plain text