...
1
2
3
4
5 package poll
6
7 import (
8 "errors"
9 "io"
10 "sync"
11 "time"
12 )
13
// FD is the descriptor state used by this package's Read, Write, Close and
// deadline methods.
type FD struct {
	// fdmu is the descriptor mutex; Close calls its increfAndClose method.
	// NOTE(review): readLock/writeLock are defined elsewhere and presumably
	// operate on this field as well — confirm.
	fdmu fdMutex

	// Destroy, if non-nil, is invoked by destroy.
	Destroy func()

	// Deadline state. setDeadlineImpl and the read timer callback hold rmu
	// while they access rtimer, rtimedout and raio; wmu plays the same role
	// for wtimer, wtimedout and waio.
	rmu       sync.Mutex
	wmu       sync.Mutex
	raio      *asyncIO    // read started by Read, if any; cancelled when the read deadline fires
	waio      *asyncIO    // write started by Write, if any; cancelled when the write deadline fires
	rtimer    *time.Timer // pending read deadline timer, nil when none is armed
	wtimer    *time.Timer // pending write deadline timer, nil when none is armed
	rtimedout bool        // set to true once the read deadline has been reached
	wtimedout bool        // set to true once the write deadline has been reached

	// isFile is passed to errClosing when Close finds the descriptor
	// already closing. Nothing in the code shown here assigns it.
	isFile bool
}
36
37
38
39
40 func (fd *FD) destroy() error {
41 if fd.Destroy != nil {
42 fd.Destroy()
43 }
44 return nil
45 }
46
47
48
49 func (fd *FD) Close() error {
50 if !fd.fdmu.increfAndClose() {
51 return errClosing(fd.isFile)
52 }
53 return nil
54 }
55
56
57 func (fd *FD) Read(fn func([]byte) (int, error), b []byte) (int, error) {
58 if err := fd.readLock(); err != nil {
59 return 0, err
60 }
61 defer fd.readUnlock()
62 if len(b) == 0 {
63 return 0, nil
64 }
65 fd.rmu.Lock()
66 if fd.rtimedout {
67 fd.rmu.Unlock()
68 return 0, ErrDeadlineExceeded
69 }
70 fd.raio = newAsyncIO(fn, b)
71 fd.rmu.Unlock()
72 n, err := fd.raio.Wait()
73 fd.raio = nil
74 if isHangup(err) {
75 err = io.EOF
76 }
77 if isInterrupted(err) {
78 err = ErrDeadlineExceeded
79 }
80 return n, err
81 }
82
83
84 func (fd *FD) Write(fn func([]byte) (int, error), b []byte) (int, error) {
85 if err := fd.writeLock(); err != nil {
86 return 0, err
87 }
88 defer fd.writeUnlock()
89 fd.wmu.Lock()
90 if fd.wtimedout {
91 fd.wmu.Unlock()
92 return 0, ErrDeadlineExceeded
93 }
94 fd.waio = newAsyncIO(fn, b)
95 fd.wmu.Unlock()
96 n, err := fd.waio.Wait()
97 fd.waio = nil
98 if isInterrupted(err) {
99 err = ErrDeadlineExceeded
100 }
101 return n, err
102 }
103
104
105 func (fd *FD) SetDeadline(t time.Time) error {
106 return setDeadlineImpl(fd, t, 'r'+'w')
107 }
108
109
110 func (fd *FD) SetReadDeadline(t time.Time) error {
111 return setDeadlineImpl(fd, t, 'r')
112 }
113
114
115 func (fd *FD) SetWriteDeadline(t time.Time) error {
116 return setDeadlineImpl(fd, t, 'w')
117 }
118
119 func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
120 d := t.Sub(time.Now())
121 if mode == 'r' || mode == 'r'+'w' {
122 fd.rmu.Lock()
123 defer fd.rmu.Unlock()
124 if fd.rtimer != nil {
125 fd.rtimer.Stop()
126 fd.rtimer = nil
127 }
128 fd.rtimedout = false
129 }
130 if mode == 'w' || mode == 'r'+'w' {
131 fd.wmu.Lock()
132 defer fd.wmu.Unlock()
133 if fd.wtimer != nil {
134 fd.wtimer.Stop()
135 fd.wtimer = nil
136 }
137 fd.wtimedout = false
138 }
139 if !t.IsZero() && d > 0 {
140
141 if mode == 'r' || mode == 'r'+'w' {
142 var timer *time.Timer
143 timer = time.AfterFunc(d, func() {
144 fd.rmu.Lock()
145 defer fd.rmu.Unlock()
146 if fd.rtimer != timer {
147
148 return
149 }
150 fd.rtimedout = true
151 if fd.raio != nil {
152 fd.raio.Cancel()
153 }
154 })
155 fd.rtimer = timer
156 }
157 if mode == 'w' || mode == 'r'+'w' {
158 var timer *time.Timer
159 timer = time.AfterFunc(d, func() {
160 fd.wmu.Lock()
161 defer fd.wmu.Unlock()
162 if fd.wtimer != timer {
163
164 return
165 }
166 fd.wtimedout = true
167 if fd.waio != nil {
168 fd.waio.Cancel()
169 }
170 })
171 fd.wtimer = timer
172 }
173 }
174 if !t.IsZero() && d <= 0 {
175
176 if mode == 'r' || mode == 'r'+'w' {
177 fd.rtimedout = true
178 if fd.raio != nil {
179 fd.raio.Cancel()
180 }
181 }
182 if mode == 'w' || mode == 'r'+'w' {
183 fd.wtimedout = true
184 if fd.waio != nil {
185 fd.waio.Cancel()
186 }
187 }
188 }
189 return nil
190 }
191
192
193
194
195 func (fd *FD) ReadLock() error {
196 return fd.readLock()
197 }
198
199
200 func (fd *FD) ReadUnlock() {
201 fd.readUnlock()
202 }
203
204 func isHangup(err error) bool {
205 return err != nil && stringsHasSuffix(err.Error(), "Hangup")
206 }
207
208 func isInterrupted(err error) bool {
209 return err != nil && stringsHasSuffix(err.Error(), "interrupted")
210 }
211
212
213
// IsPollDescriptor reports whether fd is a descriptor used by the poller.
// This implementation ignores its argument and always answers false.
func IsPollDescriptor(fd uintptr) bool {
	const isPollFD = false
	return isPollFD
}
217
218
219
220 func (fd *FD) RawControl(f func(uintptr)) error {
221 return errors.New("not implemented")
222 }
223
224
225 func (fd *FD) RawRead(f func(uintptr) bool) error {
226 return errors.New("not implemented")
227 }
228
229
230 func (fd *FD) RawWrite(f func(uintptr) bool) error {
231 return errors.New("not implemented")
232 }
233
View as plain text