1
2
3
4
5
6
7 package quic
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "io"
14 "math"
15 )
16
17 type Stream struct {
18 id streamID
19 conn *Conn
20
21
22
23
24
25 ingate gate
26 in pipe
27 inwin int64
28 insendmax sentVal
29 inmaxbuf int64
30 insize int64
31 inset rangeset[int64]
32 inclosed sentVal
33 inresetcode int64
34
35
36
37
38
39
40 outgate gate
41 out pipe
42 outflushed int64
43 outwin int64
44 outmaxsent int64
45 outmaxbuf int64
46 outunsent rangeset[int64]
47 outacked rangeset[int64]
48 outopened sentVal
49 outclosed sentVal
50 outblocked sentVal
51 outreset sentVal
52 outresetcode uint64
53 outdone chan struct{}
54
55
56
57
58
59
60
61
62
63
64 state atomicBits[streamState]
65
66 prev, next *Stream
67 }
68
// streamState is a bit set summarizing what a stream currently needs from
// the connection's send path.
type streamState uint32

const (
	// streamInSendMeta is set by the receive side when it has a control
	// frame (MAX_STREAM_DATA or STOP_SENDING) to send.
	streamInSendMeta streamState = 1 << iota

	// streamOutSendMeta is set by the send side when it has control frames,
	// retransmitted data, or an opening/closing frame to send.
	streamOutSendMeta
	// streamOutSendData is set by the send side when it only has data
	// beyond the highest offset already sent.
	streamOutSendData

	// streamInDone and streamOutDone mark the receive and send halves of
	// the stream as finished.
	streamInDone
	streamOutDone

	// streamConnRemoved suppresses queueing of a stream whose halves are
	// both done.
	// NOTE(review): presumably set once the conn has dropped the stream;
	// the code that sets it is not in this file.
	streamConnRemoved

	// streamQueueMeta and streamQueueData record which send queue, if any,
	// the stream is on; see inQueue.
	streamQueueMeta
	streamQueueData
)
100
// streamQueue names the send queue a stream is on or wants to be on.
type streamQueue int

const (
	// noQueue means the stream has nothing to send.
	noQueue streamQueue = iota
	// metaQueue is the queue for streams with control or retransmit work.
	metaQueue
	// dataQueue is the queue for streams with only new data to send.
	dataQueue
)
108
109
110
111
const (
	// streamResetByConnClose is the value stored in Stream.inresetcode when
	// the connection closes before all of the stream's data has arrived.
	// It is the largest int64, well above any reset code that handleReset
	// stores.
	streamResetByConnClose = math.MaxInt64
)
113
114
115 func (s streamState) wantQueue() streamQueue {
116 switch {
117 case s&(streamInSendMeta|streamOutSendMeta) != 0:
118 return metaQueue
119 case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone:
120 return metaQueue
121 case s&streamOutSendData != 0:
122
123
124
125 return dataQueue
126 }
127 return noQueue
128 }
129
130
131 func (s streamState) inQueue() streamQueue {
132 switch {
133 case s&streamQueueMeta != 0:
134 return metaQueue
135 case s&streamQueueData != 0:
136 return dataQueue
137 }
138 return noQueue
139 }
140
141
142
143
144
145
146
147 func newStream(c *Conn, id streamID) *Stream {
148 s := &Stream{
149 conn: c,
150 id: id,
151 insize: -1,
152 inresetcode: -1,
153 ingate: newLockedGate(),
154 outgate: newLockedGate(),
155 }
156 if !s.IsReadOnly() {
157 s.outdone = make(chan struct{})
158 }
159 return s
160 }
161
162
163
164 func (s *Stream) IsReadOnly() bool {
165 return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side
166 }
167
168
169
170 func (s *Stream) IsWriteOnly() bool {
171 return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side
172 }
173
174
175
176 func (s *Stream) Read(b []byte) (n int, err error) {
177 return s.ReadContext(context.Background(), b)
178 }
179
180
181
182
183
184
185
186
187
188 func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) {
189 if s.IsWriteOnly() {
190 return 0, errors.New("read from write-only stream")
191 }
192 if err := s.ingate.waitAndLock(ctx, s.conn.testHooks); err != nil {
193 return 0, err
194 }
195 defer func() {
196 s.inUnlock()
197 s.conn.handleStreamBytesReadOffLoop(int64(n))
198 }()
199 if s.inresetcode != -1 {
200 return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
201 }
202 if s.inclosed.isSet() {
203 return 0, errors.New("read from closed stream")
204 }
205 if s.insize == s.in.start {
206 return 0, io.EOF
207 }
208
209 if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start {
210 panic("BUG: inconsistent input stream state")
211 }
212 if size := int(s.inset[0].end - s.in.start); size < len(b) {
213 b = b[:size]
214 }
215 start := s.in.start
216 end := start + int64(len(b))
217 s.in.copy(start, b)
218 s.in.discardBefore(end)
219 if s.insize == -1 || s.insize > s.inwin {
220 if shouldUpdateFlowControl(s.inmaxbuf, s.in.start+s.inmaxbuf-s.inwin) {
221
222 s.insendmax.setUnsent()
223 }
224 }
225 if end == s.insize {
226 return len(b), io.EOF
227 }
228 return len(b), nil
229 }
230
231
232
233
234
// shouldUpdateFlowControl reports whether a flow-control update is worth
// sending: the window would grow by at least one eighth of maxWindow
// (integer division, so any non-negative growth qualifies when maxWindow is
// below 8).
func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
	threshold := maxWindow / 8
	return addedWindow >= threshold
}
238
239
240
241 func (s *Stream) Write(b []byte) (n int, err error) {
242 return s.WriteContext(context.Background(), b)
243 }
244
245
246
247
248
249
250 func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) {
251 if s.IsReadOnly() {
252 return 0, errors.New("write to read-only stream")
253 }
254 canWrite := s.outgate.lock()
255 for {
256
257
258
259 if len(b) > 0 && !canWrite {
260
261 s.outUnlock()
262 if err := s.outgate.waitAndLock(ctx, s.conn.testHooks); err != nil {
263 return n, err
264 }
265
266
267
268 }
269 if s.outreset.isSet() {
270 s.outUnlock()
271 return n, errors.New("write to reset stream")
272 }
273 if s.outclosed.isSet() {
274 s.outUnlock()
275 return n, errors.New("write to closed stream")
276 }
277 if len(b) == 0 {
278 break
279 }
280
281
282 lim := s.out.start + s.outmaxbuf
283
284
285 nn := min(int64(len(b)), lim-s.out.end)
286
287 s.out.writeAt(b[:nn], s.out.end)
288 b = b[nn:]
289 n += int(nn)
290
291
292
293
294
295
296
297
298
299 const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead
300 shouldFlush := s.out.end >= s.outwin ||
301 s.out.end >= lim ||
302 (s.out.end-s.outflushed) >= autoFlushSize
303 if shouldFlush {
304 s.flushLocked()
305 }
306 if s.out.end > s.outwin {
307
308
309 s.outblocked.set()
310 }
311
312 canWrite = false
313 }
314 s.outUnlock()
315 return n, nil
316 }
317
318
319
320
321 func (s *Stream) Flush() {
322 s.outgate.lock()
323 defer s.outUnlock()
324 s.flushLocked()
325 }
326
327 func (s *Stream) flushLocked() {
328 s.outopened.set()
329 if s.outflushed < s.outwin {
330 s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
331 }
332 s.outflushed = s.out.end
333 }
334
335
336
337 func (s *Stream) Close() error {
338 return s.CloseContext(context.Background())
339 }
340
341
342
343
344
345
346
347
348
349 func (s *Stream) CloseContext(ctx context.Context) error {
350 s.CloseRead()
351 if s.IsReadOnly() {
352 return nil
353 }
354 s.CloseWrite()
355
356 if err := s.conn.waitOnDone(ctx, s.outdone); err != nil {
357 return err
358 }
359 s.outgate.lock()
360 defer s.outUnlock()
361 if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) {
362 return nil
363 }
364 return errors.New("stream reset")
365 }
366
367
368
369
370
371
372
373 func (s *Stream) CloseRead() {
374 if s.IsWriteOnly() {
375 return
376 }
377 s.ingate.lock()
378 if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
379
380
381
382 s.inclosed.setReceived()
383 } else {
384 s.inclosed.set()
385 }
386 discarded := s.in.end - s.in.start
387 s.in.discardBefore(s.in.end)
388 s.inUnlock()
389 s.conn.handleStreamBytesReadOffLoop(discarded)
390 }
391
392
393
394
395
396
397
398 func (s *Stream) CloseWrite() {
399 if s.IsReadOnly() {
400 return
401 }
402 s.outgate.lock()
403 defer s.outUnlock()
404 s.outclosed.set()
405 s.flushLocked()
406 }
407
408
409
410
411
412
413
414
415
416
417
418
419 func (s *Stream) Reset(code uint64) {
420 const userClosed = true
421 s.resetInternal(code, userClosed)
422 }
423
424
425
426
427
428 func (s *Stream) resetInternal(code uint64, userClosed bool) {
429 s.outgate.lock()
430 defer s.outUnlock()
431 if s.IsReadOnly() {
432 return
433 }
434 if userClosed {
435
436 s.outclosed.set()
437 }
438 if s.outreset.isSet() {
439 return
440 }
441 if code > maxVarint {
442 code = maxVarint
443 }
444
445
446
447 s.outreset.set()
448 s.outresetcode = code
449 s.out.discardBefore(s.out.end)
450 s.outunsent = rangeset[int64]{}
451 s.outblocked.clear()
452 }
453
454
455 func (s *Stream) connHasClosed() {
456
457
458
459 localClose := s.conn.lifetime.state == connStateClosing
460
461 s.ingate.lock()
462 if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 {
463 if localClose {
464 s.inclosed.set()
465 } else {
466 s.inresetcode = streamResetByConnClose
467 }
468 }
469 s.inUnlock()
470
471 s.outgate.lock()
472 if localClose {
473 s.outclosed.set()
474 }
475 s.outreset.set()
476 s.outUnlock()
477 }
478
479
480
481
482
483 func (s *Stream) inUnlock() {
484 state := s.inUnlockNoQueue()
485 s.conn.maybeQueueStreamForSend(s, state)
486 }
487
488
489
490 func (s *Stream) inUnlockNoQueue() streamState {
491 canRead := s.inset.contains(s.in.start) ||
492 s.insize == s.in.start ||
493 s.inresetcode != -1 ||
494 s.inclosed.isSet()
495 defer s.ingate.unlock(canRead)
496 var state streamState
497 switch {
498 case s.IsWriteOnly():
499 state = streamInDone
500 case s.inresetcode != -1:
501 fallthrough
502 case s.in.start == s.insize:
503
504
505 if s.inclosed.isSet() {
506 state = streamInDone
507 }
508 case s.insendmax.shouldSend():
509 state = streamInSendMeta
510 case s.inclosed.shouldSend():
511 state = streamInSendMeta
512 }
513 const mask = streamInDone | streamInSendMeta
514 return s.state.set(state, mask)
515 }
516
517
518
519
520
521 func (s *Stream) outUnlock() {
522 state := s.outUnlockNoQueue()
523 s.conn.maybeQueueStreamForSend(s, state)
524 }
525
526
527
528 func (s *Stream) outUnlockNoQueue() streamState {
529 isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) ||
530 s.outreset.isSet()
531 if isDone {
532 select {
533 case <-s.outdone:
534 default:
535 if !s.IsReadOnly() {
536 close(s.outdone)
537 }
538 }
539 }
540 lim := s.out.start + s.outmaxbuf
541 canWrite := lim > s.out.end ||
542 s.outclosed.isSet() ||
543 s.outreset.isSet()
544 defer s.outgate.unlock(canWrite)
545 var state streamState
546 switch {
547 case s.IsReadOnly():
548 state = streamOutDone
549 case s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end):
550 fallthrough
551 case s.outreset.isReceived():
552
553
554 if s.outclosed.isSet() {
555 state = streamOutDone
556 }
557 case s.outreset.shouldSend():
558 state = streamOutSendMeta
559 case s.outreset.isSet():
560 case s.outblocked.shouldSend():
561 state = streamOutSendMeta
562 case len(s.outunsent) > 0:
563 if s.outunsent.min() < s.outmaxsent {
564 state = streamOutSendMeta
565 } else {
566 state = streamOutSendData
567 }
568 case s.outclosed.shouldSend() && s.out.end == s.outmaxsent:
569 state = streamOutSendMeta
570 case s.outopened.shouldSend():
571 state = streamOutSendMeta
572 }
573 const mask = streamOutDone | streamOutSendMeta | streamOutSendData
574 return s.state.set(state, mask)
575 }
576
577
578 func (s *Stream) handleData(off int64, b []byte, fin bool) error {
579 s.ingate.lock()
580 defer s.inUnlock()
581 end := off + int64(len(b))
582 if err := s.checkStreamBounds(end, fin); err != nil {
583 return err
584 }
585 if s.inclosed.isSet() || s.inresetcode != -1 {
586
587
588 return nil
589 }
590 if s.insize == -1 && end > s.in.end {
591 added := end - s.in.end
592 if err := s.conn.handleStreamBytesReceived(added); err != nil {
593 return err
594 }
595 }
596 s.in.writeAt(b, off)
597 s.inset.add(off, end)
598 if fin {
599 s.insize = end
600
601 s.insendmax.clear()
602 }
603 return nil
604 }
605
606
607 func (s *Stream) handleReset(code uint64, finalSize int64) error {
608 s.ingate.lock()
609 defer s.inUnlock()
610 const fin = true
611 if err := s.checkStreamBounds(finalSize, fin); err != nil {
612 return err
613 }
614 if s.inresetcode != -1 {
615
616 return nil
617 }
618 if s.insize == -1 {
619 added := finalSize - s.in.end
620 if err := s.conn.handleStreamBytesReceived(added); err != nil {
621 return err
622 }
623 }
624 s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start)
625 s.in.discardBefore(s.in.end)
626 s.inresetcode = int64(code)
627 s.insize = finalSize
628 return nil
629 }
630
631
632 func (s *Stream) checkStreamBounds(end int64, fin bool) error {
633 if end > s.inwin {
634
635 return localTransportError{
636 code: errFlowControl,
637 reason: "stream flow control window exceeded",
638 }
639 }
640 if s.insize != -1 && end > s.insize {
641
642 return localTransportError{
643 code: errFinalSize,
644 reason: "data received past end of stream",
645 }
646 }
647 if fin && s.insize != -1 && end != s.insize {
648
649 return localTransportError{
650 code: errFinalSize,
651 reason: "final size of stream changed",
652 }
653 }
654 if fin && end < s.in.end {
655
656 return localTransportError{
657 code: errFinalSize,
658 reason: "end of stream occurs before prior data",
659 }
660 }
661 return nil
662 }
663
664
665 func (s *Stream) handleStopSending(code uint64) error {
666
667
668 const userReset = false
669 s.resetInternal(code, userReset)
670 return nil
671 }
672
673
674 func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
675 s.outgate.lock()
676 defer s.outUnlock()
677 if maxStreamData <= s.outwin {
678 return nil
679 }
680 if s.outflushed > s.outwin {
681 s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed))
682 }
683 s.outwin = maxStreamData
684 if s.out.end > s.outwin {
685
686 s.outblocked.setUnsent()
687 } else {
688 s.outblocked.clear()
689 }
690 return nil
691 }
692
693
694 func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) {
695
696
697
698
699
700
701 switch ftype {
702 case frameTypeResetStream:
703 s.outgate.lock()
704 s.outreset.ackOrLoss(pnum, fate)
705 s.outUnlock()
706 case frameTypeStopSending:
707 s.ingate.lock()
708 s.inclosed.ackOrLoss(pnum, fate)
709 s.inUnlock()
710 case frameTypeMaxStreamData:
711 s.ingate.lock()
712 s.insendmax.ackLatestOrLoss(pnum, fate)
713 s.inUnlock()
714 case frameTypeStreamDataBlocked:
715 s.outgate.lock()
716 s.outblocked.ackLatestOrLoss(pnum, fate)
717 s.outUnlock()
718 default:
719 panic("unhandled frame type")
720 }
721 }
722
723
724 func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) {
725 s.outgate.lock()
726 defer s.outUnlock()
727 s.outopened.ackOrLoss(pnum, fate)
728 if fin {
729 s.outclosed.ackOrLoss(pnum, fate)
730 }
731 if s.outreset.isSet() {
732
733 return
734 }
735 switch fate {
736 case packetAcked:
737 s.outacked.add(start, end)
738 s.outunsent.sub(start, end)
739
740 if s.outacked.contains(s.out.start) {
741 s.out.discardBefore(s.outacked[0].end)
742 }
743 case packetLost:
744
745
746
747 s.outunsent.add(start, end)
748 for _, a := range s.outacked {
749 s.outunsent.sub(a.start, a.end)
750 }
751 }
752 }
753
754
755
756
757
758
759 func (s *Stream) appendInFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
760 if s.inclosed.shouldSendPTO(pto) {
761
762
763 code := uint64(0)
764 if !w.appendStopSendingFrame(s.id, code) {
765 return false
766 }
767 s.inclosed.setSent(pnum)
768 }
769
770 if s.insendmax.shouldSendPTO(pto) {
771
772 maxStreamData := s.in.start + s.inmaxbuf
773 if !w.appendMaxStreamDataFrame(s.id, maxStreamData) {
774 return false
775 }
776 s.inwin = maxStreamData
777 s.insendmax.setSent(pnum)
778 }
779 return true
780 }
781
782
783
784
785
786
787 func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
788 if s.outreset.isSet() {
789
790 if s.outreset.shouldSendPTO(pto) {
791 if !w.appendResetStreamFrame(s.id, s.outresetcode, min(s.outwin, s.out.end)) {
792 return false
793 }
794 s.outreset.setSent(pnum)
795 s.frameOpensStream(pnum)
796 }
797 return true
798 }
799 if s.outblocked.shouldSendPTO(pto) {
800
801 if !w.appendStreamDataBlockedFrame(s.id, s.outwin) {
802 return false
803 }
804 s.outblocked.setSent(pnum)
805 s.frameOpensStream(pnum)
806 }
807 for {
808
809 off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto)
810 if end := off + size; end > s.outmaxsent {
811
812 end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
813 end = max(end, off)
814 size = end - off
815 }
816 fin := s.outclosed.isSet() && off+size == s.out.end
817 shouldSend := size > 0 ||
818 s.outopened.shouldSendPTO(pto) ||
819 (fin && s.outclosed.shouldSendPTO(pto))
820 if !shouldSend {
821 return true
822 }
823 b, added := w.appendStreamFrame(s.id, off, int(size), fin)
824 if !added {
825 return false
826 }
827 s.out.copy(off, b)
828 end := off + int64(len(b))
829 if end > s.outmaxsent {
830 s.conn.streams.outflow.consume(end - s.outmaxsent)
831 s.outmaxsent = end
832 }
833 s.outunsent.sub(off, end)
834 s.frameOpensStream(pnum)
835 if fin {
836 s.outclosed.setSent(pnum)
837 }
838 if pto {
839 return true
840 }
841 if int64(len(b)) < size {
842 return false
843 }
844 }
845 }
846
847
848
849
850
851 func (s *Stream) frameOpensStream(pnum packetNumber) {
852 if !s.outopened.isReceived() {
853 s.outopened.setSent(pnum)
854 }
855 }
856
857
858 func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) {
859 switch {
860 case pto:
861
862
863
864
865
866
867
868 for _, r := range outacked {
869 if r.start > start {
870 return start, r.start - start
871 }
872 }
873 return start, end - start
874 case outunsent.numRanges() > 0:
875 return outunsent.min(), outunsent[0].size()
876 default:
877 return end, 0
878 }
879 }
880
View as plain text