1
2
3
4
5
6
7 package quic
8
9 import (
10 "context"
11 "sync"
12 "sync/atomic"
13 "time"
14 )
15
// streamsState holds a connection's stream bookkeeping: the table of known
// streams, the stream-count limits, connection-level flow control, and the
// rings of streams that have frames waiting to be sent.
type streamsState struct {
	// queue holds streams created by frames from the peer (see streamForFrame)
	// until they are handed out by AcceptStream.
	queue queue[*Stream]

	// streams maps stream IDs to the streams known to the connection.
	//
	// An entry may carry a nil *Stream: when a frame arrives for a
	// peer-initiated stream, streamForFrame also inserts empty entries for
	// every not-yet-opened lower-numbered stream of the same initiator and
	// type. The value type is maybeStream so that readers remember to check
	// for nil.
	streams map[streamID]maybeStream

	// Stream-count limits, indexed by stream type.
	// localLimit is consulted when this endpoint opens a stream
	// (newLocalStream); remoteLimit is consulted when the peer opens one
	// (streamForFrame).
	localLimit  [streamTypeCount]localStreamLimits
	remoteLimit [streamTypeCount]remoteStreamLimits

	// Initial send windows given to newly created streams.
	// peerInitialMaxStreamDataRemote is indexed by stream type and used for
	// streams opened by this endpoint; peerInitialMaxStreamDataBidiLocal is
	// used for bidirectional streams opened by the peer.
	// NOTE(review): presumably filled in from the peer's transport
	// parameters; the assignment is not visible in this section.
	peerInitialMaxStreamDataRemote    [streamTypeCount]int64
	peerInitialMaxStreamDataBidiLocal int64

	// Connection-level flow control state. appendStreamFrames checks
	// outflow.avail() before sending from queueData.
	// NOTE(review): inflow is presumably the receive-side counterpart,
	// set up by inflowInit; confirm against its definition.
	inflow  connInflow
	outflow connOutflow

	// Send scheduling.
	//
	// needSend is set whenever maybeQueueStreamForSend queues a stream and is
	// cleared by appendStreamFrames once both rings are empty, letting the
	// send path skip taking sendMu when nothing is queued.
	// sendMu is held while queueMeta and queueData are read or modified.
	// queueMeta is drained first and without regard to connection-level
	// quota; queueData is only serviced while outflow.avail() is nonzero.
	needSend  atomic.Bool
	sendMu    sync.Mutex
	queueMeta streamRing
	queueData streamRing
}
47
48
// maybeStream wraps a *Stream that may be nil.
// It is the value type of streamsState.streams, where a nil s marks a stream
// ID that has an entry in the table but no Stream object.
type maybeStream struct {
	s *Stream
}
52
53 func (c *Conn) streamsInit() {
54 c.streams.streams = make(map[streamID]maybeStream)
55 c.streams.queue = newQueue[*Stream]()
56 c.streams.localLimit[bidiStream].init()
57 c.streams.localLimit[uniStream].init()
58 c.streams.remoteLimit[bidiStream].init(c.config.maxBidiRemoteStreams())
59 c.streams.remoteLimit[uniStream].init(c.config.maxUniRemoteStreams())
60 c.inflowInit()
61 }
62
63 func (c *Conn) streamsCleanup() {
64 c.streams.queue.close(errConnClosed)
65 c.streams.localLimit[bidiStream].connHasClosed()
66 c.streams.localLimit[uniStream].connHasClosed()
67 for _, s := range c.streams.streams {
68 if s.s != nil {
69 s.s.connHasClosed()
70 }
71 }
72 }
73
74
// AcceptStream returns the next stream from the connection's queue of
// streams created by the peer.
//
// Streams are added to that queue by streamForFrame when an incoming frame
// creates a new peer-initiated stream. Waiting is delegated to the queue's
// get method together with ctx; presumably it blocks until a stream is
// available or ctx is done (get is defined outside this section; confirm).
// Once streamsCleanup has run, the queue is closed with errConnClosed.
func (c *Conn) AcceptStream(ctx context.Context) (*Stream, error) {
	return c.streams.queue.get(ctx, c.testHooks)
}
78
79
80
81
82
// NewStream creates a new bidirectional stream initiated by this endpoint.
//
// The stream number is obtained from the local bidirectional stream limit,
// which is passed ctx; presumably that step can block until the limit allows
// another stream or ctx is done (confirm against localStreamLimits.open).
// Any error from opening or registering the stream is returned with a nil
// stream.
func (c *Conn) NewStream(ctx context.Context) (*Stream, error) {
	return c.newLocalStream(ctx, bidiStream)
}
86
87
88
89
90
// NewSendOnlyStream creates a new unidirectional stream initiated by this
// endpoint.
//
// newLocalStream gives such a stream a write buffer and send window but no
// read buffer or receive window, so it can only be used for sending.
// The stream number is obtained from the local unidirectional stream limit,
// which is passed ctx; presumably that step can block until the limit allows
// another stream or ctx is done (confirm against localStreamLimits.open).
func (c *Conn) NewSendOnlyStream(ctx context.Context) (*Stream, error) {
	return c.newLocalStream(ctx, uniStream)
}
94
95 func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) {
96 num, err := c.streams.localLimit[styp].open(ctx, c)
97 if err != nil {
98 return nil, err
99 }
100
101 s := newStream(c, newStreamID(c.side, styp, num))
102 s.outmaxbuf = c.config.maxStreamWriteBufferSize()
103 s.outwin = c.streams.peerInitialMaxStreamDataRemote[styp]
104 if styp == bidiStream {
105 s.inmaxbuf = c.config.maxStreamReadBufferSize()
106 s.inwin = c.config.maxStreamReadBufferSize()
107 }
108 s.inUnlock()
109 s.outUnlock()
110
111
112 if err := c.runOnLoop(ctx, func(now time.Time, c *Conn) {
113 c.streams.streams[s.id] = maybeStream{s}
114 }); err != nil {
115 return nil, err
116 }
117 return s, nil
118 }
119
120
121
122
123
124
// streamFrameType says which direction of a stream, from this endpoint's
// point of view, a received frame concerns.
//
// streamForFrame uses it to validate frames for unidirectional streams:
// a sendStream frame is accepted only for a stream this endpoint initiated,
// and a recvStream frame only for a stream the peer initiated.
type streamFrameType uint8

const (
	sendStream streamFrameType = iota // frame concerns the direction we send on
	recvStream                        // frame concerns the direction we receive on
)
131
132
133
134 func (c *Conn) streamForID(id streamID) *Stream {
135 return c.streams.streams[id].s
136 }
137
138
139
140
141
142
143
144
145
146
147 func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) *Stream {
148 if id.streamType() == uniStream {
149 if (id.initiator() == c.side) != (ftype == sendStream) {
150
151
152 c.abort(now, localTransportError{
153 code: errStreamState,
154 reason: "invalid frame for unidirectional stream",
155 })
156 return nil
157 }
158 }
159
160 ms, isOpen := c.streams.streams[id]
161 if ms.s != nil {
162 return ms.s
163 }
164
165 num := id.num()
166 styp := id.streamType()
167 if id.initiator() == c.side {
168 if num < c.streams.localLimit[styp].opened {
169
170 return nil
171 }
172
173
174 c.abort(now, localTransportError{
175 code: errStreamState,
176 reason: "received frame for unknown stream",
177 })
178 return nil
179 } else {
180
181
182
183 if !isOpen && num < c.streams.remoteLimit[styp].opened {
184
185 return nil
186 }
187 }
188
189 prevOpened := c.streams.remoteLimit[styp].opened
190 if err := c.streams.remoteLimit[styp].open(id); err != nil {
191 c.abort(now, err)
192 return nil
193 }
194
195
196
197
198 for n := newStreamID(id.initiator(), id.streamType(), prevOpened); n < id; n += 4 {
199 c.streams.streams[n] = maybeStream{}
200 }
201
202 s := newStream(c, id)
203 s.inmaxbuf = c.config.maxStreamReadBufferSize()
204 s.inwin = c.config.maxStreamReadBufferSize()
205 if id.streamType() == bidiStream {
206 s.outmaxbuf = c.config.maxStreamWriteBufferSize()
207 s.outwin = c.streams.peerInitialMaxStreamDataBidiLocal
208 }
209 s.inUnlock()
210 s.outUnlock()
211
212 c.streams.streams[id] = maybeStream{s}
213 c.streams.queue.put(s)
214 return s
215 }
216
217
218 func (c *Conn) maybeQueueStreamForSend(s *Stream, state streamState) {
219 if state.wantQueue() == state.inQueue() {
220 return
221 }
222 c.streams.sendMu.Lock()
223 defer c.streams.sendMu.Unlock()
224 state = s.state.load()
225 c.queueStreamForSendLocked(s, state)
226
227 c.streams.needSend.Store(true)
228 c.wake()
229 }
230
231
232
233
234
235 func (c *Conn) queueStreamForSendLocked(s *Stream, state streamState) {
236 for {
237 wantQueue := state.wantQueue()
238 inQueue := state.inQueue()
239 if inQueue == wantQueue {
240 return
241 }
242
243 switch inQueue {
244 case metaQueue:
245 c.streams.queueMeta.remove(s)
246 case dataQueue:
247 c.streams.queueData.remove(s)
248 }
249
250 switch wantQueue {
251 case metaQueue:
252 c.streams.queueMeta.append(s)
253 state = s.state.set(streamQueueMeta, streamQueueMeta|streamQueueData)
254 case dataQueue:
255 c.streams.queueData.append(s)
256 state = s.state.set(streamQueueData, streamQueueMeta|streamQueueData)
257 case noQueue:
258 state = s.state.set(0, streamQueueMeta|streamQueueData)
259 }
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275 }
276 }
277
278
279
280
281
282 func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
283
284 if !c.appendMaxDataFrame(w, pnum, pto) {
285 return false
286 }
287
288
289 if !c.streams.remoteLimit[uniStream].appendFrame(w, uniStream, pnum, pto) {
290 return false
291 }
292 if !c.streams.remoteLimit[bidiStream].appendFrame(w, bidiStream, pnum, pto) {
293 return false
294 }
295
296 if pto {
297 return c.appendStreamFramesPTO(w, pnum)
298 }
299 if !c.streams.needSend.Load() {
300 return true
301 }
302 c.streams.sendMu.Lock()
303 defer c.streams.sendMu.Unlock()
304
305 for c.streams.queueMeta.head != nil {
306 s := c.streams.queueMeta.head
307 state := s.state.load()
308 if state&(streamQueueMeta|streamConnRemoved) != streamQueueMeta {
309 panic("BUG: queueMeta stream is not streamQueueMeta")
310 }
311 if state&streamInSendMeta != 0 {
312 s.ingate.lock()
313 ok := s.appendInFramesLocked(w, pnum, pto)
314 state = s.inUnlockNoQueue()
315 if !ok {
316 return false
317 }
318 if state&streamInSendMeta != 0 {
319 panic("BUG: streamInSendMeta set after successfully appending frames")
320 }
321 }
322 if state&streamOutSendMeta != 0 {
323 s.outgate.lock()
324
325
326 ok := s.appendOutFramesLocked(w, pnum, pto)
327 state = s.outUnlockNoQueue()
328
329
330
331 if !ok && state&streamOutSendMeta != 0 {
332 return false
333 }
334 if state&streamOutSendMeta != 0 {
335 panic("BUG: streamOutSendMeta set after successfully appending frames")
336 }
337 }
338
339 c.streams.queueMeta.remove(s)
340 if state&(streamInDone|streamOutDone) == streamInDone|streamOutDone {
341
342 state = s.state.set(streamConnRemoved, streamQueueMeta|streamConnRemoved)
343 delete(c.streams.streams, s.id)
344
345
346
347 if s.id.initiator() != c.side {
348 c.streams.remoteLimit[s.id.streamType()].close()
349 }
350 } else {
351 state = s.state.set(0, streamQueueMeta|streamConnRemoved)
352 }
353
354
355
356
357 c.queueStreamForSendLocked(s, state)
358 }
359
360 for c.streams.queueData.head != nil {
361 avail := c.streams.outflow.avail()
362 if avail == 0 {
363 break
364 }
365 s := c.streams.queueData.head
366 s.outgate.lock()
367 ok := s.appendOutFramesLocked(w, pnum, pto)
368 state := s.outUnlockNoQueue()
369 if !ok {
370
371
372
373
374
375
376
377 if avail > 512 {
378 c.streams.queueData.head = s.next
379 }
380 return false
381 }
382 if state&streamQueueData == 0 {
383 panic("BUG: queueData stream is not streamQueueData")
384 }
385 if state&streamOutSendData != 0 {
386
387
388
389
390
391 if c.streams.outflow.avail() != 0 {
392 panic("BUG: streamOutSendData set and flow control available after send")
393 }
394 c.streams.queueData.head = s.next
395 return true
396 }
397 c.streams.queueData.remove(s)
398 state = s.state.set(0, streamQueueData)
399 c.queueStreamForSendLocked(s, state)
400 }
401 if c.streams.queueMeta.head == nil && c.streams.queueData.head == nil {
402 c.streams.needSend.Store(false)
403 }
404 return true
405 }
406
407
408
409
410
411
412 func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool {
413 c.streams.sendMu.Lock()
414 defer c.streams.sendMu.Unlock()
415 const pto = true
416 for _, ms := range c.streams.streams {
417 s := ms.s
418 if s == nil {
419 continue
420 }
421 const pto = true
422 s.ingate.lock()
423 inOK := s.appendInFramesLocked(w, pnum, pto)
424 s.inUnlockNoQueue()
425 if !inOK {
426 return false
427 }
428
429 s.outgate.lock()
430 outOK := s.appendOutFramesLocked(w, pnum, pto)
431 s.outUnlockNoQueue()
432 if !outOK {
433 return false
434 }
435 }
436 return true
437 }
438
439
440 type streamRing struct {
441 head *Stream
442 }
443
444
445
446 func (r *streamRing) remove(s *Stream) {
447 if s.next == s {
448 r.head = nil
449 } else {
450 s.prev.next = s.next
451 s.next.prev = s.prev
452 if r.head == s {
453 r.head = s.next
454 }
455 }
456 }
457
458
459
460 func (r *streamRing) append(s *Stream) {
461 if r.head == nil {
462 r.head = s
463 s.next = s
464 s.prev = s
465 } else {
466 s.prev = r.head.prev
467 s.next = r.head
468 s.prev.next = s
469 s.next.prev = s
470 }
471 }
472
View as plain text