...
Source file
src/runtime/netpoll_kqueue.go
Documentation: runtime
1
2
3
4
5
6
7 package runtime
8
9
10
11 import (
12 "internal/goarch"
13 "runtime/internal/atomic"
14 "unsafe"
15 )
16
17 var (
18 kq int32 = -1
19
20 netpollBreakRd, netpollBreakWr uintptr
21
22 netpollWakeSig atomic.Uint32
23 )
24
25 func netpollinit() {
26 kq = kqueue()
27 if kq < 0 {
28 println("runtime: kqueue failed with", -kq)
29 throw("runtime: netpollinit failed")
30 }
31 closeonexec(kq)
32 r, w, errno := nonblockingPipe()
33 if errno != 0 {
34 println("runtime: pipe failed with", -errno)
35 throw("runtime: pipe failed")
36 }
37 ev := keventt{
38 filter: _EVFILT_READ,
39 flags: _EV_ADD,
40 }
41 *(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
42 n := kevent(kq, &ev, 1, nil, 0, nil)
43 if n < 0 {
44 println("runtime: kevent failed with", -n)
45 throw("runtime: kevent failed")
46 }
47 netpollBreakRd = uintptr(r)
48 netpollBreakWr = uintptr(w)
49 }
50
51 func netpollIsPollDescriptor(fd uintptr) bool {
52 return fd == uintptr(kq) || fd == netpollBreakRd || fd == netpollBreakWr
53 }
54
55 func netpollopen(fd uintptr, pd *pollDesc) int32 {
56
57
58
59 var ev [2]keventt
60 *(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
61 ev[0].filter = _EVFILT_READ
62 ev[0].flags = _EV_ADD | _EV_CLEAR
63 ev[0].fflags = 0
64 ev[0].data = 0
65
66 if goarch.PtrSize == 4 {
67
68
69
70
71 ev[0].udata = (*byte)(unsafe.Pointer(pd))
72 } else {
73 tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
74 ev[0].udata = (*byte)(unsafe.Pointer(uintptr(tp)))
75 }
76 ev[1] = ev[0]
77 ev[1].filter = _EVFILT_WRITE
78 n := kevent(kq, &ev[0], 2, nil, 0, nil)
79 if n < 0 {
80 return -n
81 }
82 return 0
83 }
84
85 func netpollclose(fd uintptr) int32 {
86
87
88 return 0
89 }
90
91 func netpollarm(pd *pollDesc, mode int) {
92 throw("runtime: unused")
93 }
94
95
96 func netpollBreak() {
97
98 if !netpollWakeSig.CompareAndSwap(0, 1) {
99 return
100 }
101
102 for {
103 var b byte
104 n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
105 if n == 1 || n == -_EAGAIN {
106 break
107 }
108 if n == -_EINTR {
109 continue
110 }
111 println("runtime: netpollBreak write failed with", -n)
112 throw("runtime: netpollBreak write failed")
113 }
114 }
115
116
117
118
119
120
121 func netpoll(delay int64) (gList, int32) {
122 if kq == -1 {
123 return gList{}, 0
124 }
125 var tp *timespec
126 var ts timespec
127 if delay < 0 {
128 tp = nil
129 } else if delay == 0 {
130 tp = &ts
131 } else {
132 ts.setNsec(delay)
133 if ts.tv_sec > 1e6 {
134
135 ts.tv_sec = 1e6
136 }
137 tp = &ts
138 }
139 var events [64]keventt
140 retry:
141 n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
142 if n < 0 {
143
144
145
146 if n != -_EINTR && n != -_ETIMEDOUT {
147 println("runtime: kevent on fd", kq, "failed with", -n)
148 throw("runtime: netpoll failed")
149 }
150
151
152 if delay > 0 {
153 return gList{}, 0
154 }
155 goto retry
156 }
157 var toRun gList
158 delta := int32(0)
159 for i := 0; i < int(n); i++ {
160 ev := &events[i]
161
162 if uintptr(ev.ident) == netpollBreakRd {
163 if ev.filter != _EVFILT_READ {
164 println("runtime: netpoll: break fd ready for", ev.filter)
165 throw("runtime: netpoll: break fd ready for something unexpected")
166 }
167 if delay != 0 {
168
169
170
171 var tmp [16]byte
172 read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
173 netpollWakeSig.Store(0)
174 }
175 continue
176 }
177
178 var mode int32
179 switch ev.filter {
180 case _EVFILT_READ:
181 mode += 'r'
182
183
184
185
186
187
188
189
190
191
192 if ev.flags&_EV_EOF != 0 {
193 mode += 'w'
194 }
195 case _EVFILT_WRITE:
196 mode += 'w'
197 }
198 if mode != 0 {
199 var pd *pollDesc
200 var tag uintptr
201 if goarch.PtrSize == 4 {
202
203
204 pd = (*pollDesc)(unsafe.Pointer(ev.udata))
205 tag = 0
206 } else {
207 tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))
208 pd = (*pollDesc)(tp.pointer())
209 tag = tp.tag()
210 if pd.fdseq.Load() != tag {
211 continue
212 }
213 }
214 pd.setEventErr(ev.flags == _EV_ERROR, tag)
215 delta += netpollready(&toRun, pd, mode)
216 }
217 }
218 return toRun, delta
219 }
220
View as plain text