// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build go1.21 package quic import ( "context" "errors" "fmt" "io" "math" ) type Stream struct { id streamID conn *Conn // ingate's lock guards all receive-related state. // // The gate condition is set if a read from the stream will not block, // either because the stream has available data or because the read will fail. ingate gate in pipe // received data inwin int64 // last MAX_STREAM_DATA sent to the peer insendmax sentVal // set when we should send MAX_STREAM_DATA to the peer inmaxbuf int64 // maximum amount of data we will buffer insize int64 // stream final size; -1 before this is known inset rangeset[int64] // received ranges inclosed sentVal // set by CloseRead inresetcode int64 // RESET_STREAM code received from the peer; -1 if not reset // outgate's lock guards all send-related state. // // The gate condition is set if a write to the stream will not block, // either because the stream has available flow control or because // the write will fail. outgate gate out pipe // buffered data to send outflushed int64 // offset of last flush call outwin int64 // maximum MAX_STREAM_DATA received from the peer outmaxsent int64 // maximum data offset we've sent to the peer outmaxbuf int64 // maximum amount of data we will buffer outunsent rangeset[int64] // ranges buffered but not yet sent (only flushed data) outacked rangeset[int64] // ranges sent and acknowledged outopened sentVal // set if we should open the stream outclosed sentVal // set by CloseWrite outblocked sentVal // set when a write to the stream is blocked by flow control outreset sentVal // set by Reset outresetcode uint64 // reset code to send in RESET_STREAM outdone chan struct{} // closed when all data sent // Atomic stream state bits. // // These bits provide a fast way to coordinate between the // send and receive sides of the stream, and the conn's loop. // // streamIn* bits must be set with ingate held. // streamOut* bits must be set with outgate held. // streamConn* bits are set by the conn's loop. // streamQueue* bits must be set with streamsState.sendMu held. state atomicBits[streamState] prev, next *Stream // guarded by streamsState.sendMu } type streamState uint32 const ( // streamInSendMeta is set when there are frames to send for the // inbound side of the stream. For example, MAX_STREAM_DATA. // Inbound frames are never flow-controlled. streamInSendMeta = streamState(1 << iota) // streamOutSendMeta is set when there are non-flow-controlled frames // to send for the outbound side of the stream. For example, STREAM_DATA_BLOCKED. // streamOutSendData is set when there are no non-flow-controlled outbound frames // and the stream has data to send. // // At most one of streamOutSendMeta and streamOutSendData is set at any time. streamOutSendMeta streamOutSendData // streamInDone and streamOutDone are set when the inbound or outbound // sides of the stream are finished. When both are set, the stream // can be removed from the Conn and forgotten. streamInDone streamOutDone // streamConnRemoved is set when the stream has been removed from the conn. streamConnRemoved // streamQueueMeta and streamQueueData indicate which of the streamsState // send queues the conn is currently on. streamQueueMeta streamQueueData ) type streamQueue int const ( noQueue = streamQueue(iota) metaQueue // streamsState.queueMeta dataQueue // streamsState.queueData ) // streamResetByConnClose is assigned to Stream.inresetcode to indicate that a stream // was implicitly reset when the connection closed. It's out of the range of // possible reset codes the peer can send. const streamResetByConnClose = math.MaxInt64 // wantQueue returns the send queue the stream should be on. func (s streamState) wantQueue() streamQueue { switch { case s&(streamInSendMeta|streamOutSendMeta) != 0: return metaQueue case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone: return metaQueue case s&streamOutSendData != 0: // The stream has no non-flow-controlled frames to send, // but does have data. Put it on the data queue, which is only // processed when flow control is available. return dataQueue } return noQueue } // inQueue returns the send queue the stream is currently on. func (s streamState) inQueue() streamQueue { switch { case s&streamQueueMeta != 0: return metaQueue case s&streamQueueData != 0: return dataQueue } return noQueue } // newStream returns a new stream. // // The stream's ingate and outgate are locked. // (We create the stream with locked gates so after the caller // initializes the flow control window, // unlocking outgate will set the stream writability state.) func newStream(c *Conn, id streamID) *Stream { s := &Stream{ conn: c, id: id, insize: -1, // -1 indicates the stream size is unknown inresetcode: -1, // -1 indicates no RESET_STREAM received ingate: newLockedGate(), outgate: newLockedGate(), } if !s.IsReadOnly() { s.outdone = make(chan struct{}) } return s } // IsReadOnly reports whether the stream is read-only // (a unidirectional stream created by the peer). func (s *Stream) IsReadOnly() bool { return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side } // IsWriteOnly reports whether the stream is write-only // (a unidirectional stream created locally). func (s *Stream) IsWriteOnly() bool { return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side } // Read reads data from the stream. // See ReadContext for more details. func (s *Stream) Read(b []byte) (n int, err error) { return s.ReadContext(context.Background(), b) } // ReadContext reads data from the stream. // // ReadContext returns as soon as at least one byte of data is available. // // If the peer closes the stream cleanly, ReadContext returns io.EOF after // returning all data sent by the peer. // If the peer aborts reads on the stream, ReadContext returns // an error wrapping StreamResetCode. func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) { if s.IsWriteOnly() { return 0, errors.New("read from write-only stream") } if err := s.ingate.waitAndLock(ctx, s.conn.testHooks); err != nil { return 0, err } defer func() { s.inUnlock() s.conn.handleStreamBytesReadOffLoop(int64(n)) // must be done with ingate unlocked }() if s.inresetcode != -1 { return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode)) } if s.inclosed.isSet() { return 0, errors.New("read from closed stream") } if s.insize == s.in.start { return 0, io.EOF } // Getting here indicates the stream contains data to be read. if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start { panic("BUG: inconsistent input stream state") } if size := int(s.inset[0].end - s.in.start); size < len(b) { b = b[:size] } start := s.in.start end := start + int64(len(b)) s.in.copy(start, b) s.in.discardBefore(end) if s.insize == -1 || s.insize > s.inwin { if shouldUpdateFlowControl(s.inmaxbuf, s.in.start+s.inmaxbuf-s.inwin) { // Update stream flow control with a STREAM_MAX_DATA frame. s.insendmax.setUnsent() } } if end == s.insize { return len(b), io.EOF } return len(b), nil } // shouldUpdateFlowControl determines whether to send a flow control window update. // // We want to balance keeping the peer well-supplied with flow control with not sending // many small updates. func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool { return addedWindow >= maxWindow/8 } // Write writes data to the stream. // See WriteContext for more details. func (s *Stream) Write(b []byte) (n int, err error) { return s.WriteContext(context.Background(), b) } // WriteContext writes data to the stream. // // WriteContext writes data to the stream write buffer. // Buffered data is only sent when the buffer is sufficiently full. // Call the Flush method to ensure buffered data is sent. func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) { if s.IsReadOnly() { return 0, errors.New("write to read-only stream") } canWrite := s.outgate.lock() for { // The first time through this loop, we may or may not be write blocked. // We exit the loop after writing all data, so on subsequent passes through // the loop we are always write blocked. if len(b) > 0 && !canWrite { // Our send buffer is full. Wait for the peer to ack some data. s.outUnlock() if err := s.outgate.waitAndLock(ctx, s.conn.testHooks); err != nil { return n, err } // Successfully returning from waitAndLockGate means we are no longer // write blocked. (Unlike traditional condition variables, gates do not // have spurious wakeups.) } if s.outreset.isSet() { s.outUnlock() return n, errors.New("write to reset stream") } if s.outclosed.isSet() { s.outUnlock() return n, errors.New("write to closed stream") } if len(b) == 0 { break } // Write limit is our send buffer limit. // This is a stream offset. lim := s.out.start + s.outmaxbuf // Amount to write is min(the full buffer, data up to the write limit). // This is a number of bytes. nn := min(int64(len(b)), lim-s.out.end) // Copy the data into the output buffer. s.out.writeAt(b[:nn], s.out.end) b = b[nn:] n += int(nn) // Possibly flush the output buffer. // We automatically flush if: // - We have enough data to consume the send window. // Sending this data may cause the peer to extend the window. // - We have buffered as much data as we're willing do. // We need to send data to clear out buffer space. // - We have enough data to fill a 1-RTT packet using the smallest // possible maximum datagram size (1200 bytes, less header byte, // connection ID, packet number, and AEAD overhead). const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead shouldFlush := s.out.end >= s.outwin || // peer send window is full s.out.end >= lim || // local send buffer is full (s.out.end-s.outflushed) >= autoFlushSize // enough data buffered if shouldFlush { s.flushLocked() } if s.out.end > s.outwin { // We're blocked by flow control. // Send a STREAM_DATA_BLOCKED frame to let the peer know. s.outblocked.set() } // If we have bytes left to send, we're blocked. canWrite = false } s.outUnlock() return n, nil } // Flush flushes data written to the stream. // It does not wait for the peer to acknowledge receipt of the data. // Use CloseContext to wait for the peer's acknowledgement. func (s *Stream) Flush() { s.outgate.lock() defer s.outUnlock() s.flushLocked() } func (s *Stream) flushLocked() { s.outopened.set() if s.outflushed < s.outwin { s.outunsent.add(s.outflushed, min(s.outwin, s.out.end)) } s.outflushed = s.out.end } // Close closes the stream. // See CloseContext for more details. func (s *Stream) Close() error { return s.CloseContext(context.Background()) } // CloseContext closes the stream. // Any blocked stream operations will be unblocked and return errors. // // CloseContext flushes any data in the stream write buffer and waits for the peer to // acknowledge receipt of the data. // If the stream has been reset, it waits for the peer to acknowledge the reset. // If the context expires before the peer receives the stream's data, // CloseContext discards the buffer and returns the context error. func (s *Stream) CloseContext(ctx context.Context) error { s.CloseRead() if s.IsReadOnly() { return nil } s.CloseWrite() // TODO: Return code from peer's RESET_STREAM frame? if err := s.conn.waitOnDone(ctx, s.outdone); err != nil { return err } s.outgate.lock() defer s.outUnlock() if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) { return nil } return errors.New("stream reset") } // CloseRead aborts reads on the stream. // Any blocked reads will be unblocked and return errors. // // CloseRead notifies the peer that the stream has been closed for reading. // It does not wait for the peer to acknowledge the closure. // Use CloseContext to wait for the peer's acknowledgement. func (s *Stream) CloseRead() { if s.IsWriteOnly() { return } s.ingate.lock() if s.inset.isrange(0, s.insize) || s.inresetcode != -1 { // We've already received all data from the peer, // so there's no need to send STOP_SENDING. // This is the same as saying we sent one and they got it. s.inclosed.setReceived() } else { s.inclosed.set() } discarded := s.in.end - s.in.start s.in.discardBefore(s.in.end) s.inUnlock() s.conn.handleStreamBytesReadOffLoop(discarded) // must be done with ingate unlocked } // CloseWrite aborts writes on the stream. // Any blocked writes will be unblocked and return errors. // // CloseWrite sends any data in the stream write buffer to the peer. // It does not wait for the peer to acknowledge receipt of the data. // Use CloseContext to wait for the peer's acknowledgement. func (s *Stream) CloseWrite() { if s.IsReadOnly() { return } s.outgate.lock() defer s.outUnlock() s.outclosed.set() s.flushLocked() } // Reset aborts writes on the stream and notifies the peer // that the stream was terminated abruptly. // Any blocked writes will be unblocked and return errors. // // Reset sends the application protocol error code, which must be // less than 2^62, to the peer. // It does not wait for the peer to acknowledge receipt of the error. // Use CloseContext to wait for the peer's acknowledgement. // // Reset does not affect reads. // Use CloseRead to abort reads on the stream. func (s *Stream) Reset(code uint64) { const userClosed = true s.resetInternal(code, userClosed) } // resetInternal resets the send side of the stream. // // If userClosed is true, this is s.Reset. // If userClosed is false, this is a reaction to a STOP_SENDING frame. func (s *Stream) resetInternal(code uint64, userClosed bool) { s.outgate.lock() defer s.outUnlock() if s.IsReadOnly() { return } if userClosed { // Mark that the user closed the stream. s.outclosed.set() } if s.outreset.isSet() { return } if code > maxVarint { code = maxVarint } // We could check here to see if the stream is closed and the // peer has acked all the data and the FIN, but sending an // extra RESET_STREAM in this case is harmless. s.outreset.set() s.outresetcode = code s.out.discardBefore(s.out.end) s.outunsent = rangeset[int64]{} s.outblocked.clear() } // connHasClosed indicates the stream's conn has closed. func (s *Stream) connHasClosed() { // If we're in the closing state, the user closed the conn. // Otherwise, we the peer initiated the close. // This only matters for the error we're going to return from stream operations. localClose := s.conn.lifetime.state == connStateClosing s.ingate.lock() if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 { if localClose { s.inclosed.set() } else { s.inresetcode = streamResetByConnClose } } s.inUnlock() s.outgate.lock() if localClose { s.outclosed.set() } s.outreset.set() s.outUnlock() } // inUnlock unlocks s.ingate. // It sets the gate condition if reads from s will not block. // If s has receive-related frames to write or if both directions // are done and the stream should be removed, it notifies the Conn. func (s *Stream) inUnlock() { state := s.inUnlockNoQueue() s.conn.maybeQueueStreamForSend(s, state) } // inUnlockNoQueue is inUnlock, // but reports whether s has frames to write rather than notifying the Conn. func (s *Stream) inUnlockNoQueue() streamState { canRead := s.inset.contains(s.in.start) || // data available to read s.insize == s.in.start || // at EOF s.inresetcode != -1 || // reset by peer s.inclosed.isSet() // closed locally defer s.ingate.unlock(canRead) var state streamState switch { case s.IsWriteOnly(): state = streamInDone case s.inresetcode != -1: // reset by peer fallthrough case s.in.start == s.insize: // all data received and read // We don't increase MAX_STREAMS until the user calls ReadClose or Close, // so the receive side is not finished until inclosed is set. if s.inclosed.isSet() { state = streamInDone } case s.insendmax.shouldSend(): // STREAM_MAX_DATA state = streamInSendMeta case s.inclosed.shouldSend(): // STOP_SENDING state = streamInSendMeta } const mask = streamInDone | streamInSendMeta return s.state.set(state, mask) } // outUnlock unlocks s.outgate. // It sets the gate condition if writes to s will not block. // If s has send-related frames to write or if both directions // are done and the stream should be removed, it notifies the Conn. func (s *Stream) outUnlock() { state := s.outUnlockNoQueue() s.conn.maybeQueueStreamForSend(s, state) } // outUnlockNoQueue is outUnlock, // but reports whether s has frames to write rather than notifying the Conn. func (s *Stream) outUnlockNoQueue() streamState { isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) || // all data acked s.outreset.isSet() // reset locally if isDone { select { case <-s.outdone: default: if !s.IsReadOnly() { close(s.outdone) } } } lim := s.out.start + s.outmaxbuf canWrite := lim > s.out.end || // available send buffer s.outclosed.isSet() || // closed locally s.outreset.isSet() // reset locally defer s.outgate.unlock(canWrite) var state streamState switch { case s.IsReadOnly(): state = streamOutDone case s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end): // all data sent and acked fallthrough case s.outreset.isReceived(): // RESET_STREAM sent and acked // We don't increase MAX_STREAMS until the user calls WriteClose or Close, // so the send side is not finished until outclosed is set. if s.outclosed.isSet() { state = streamOutDone } case s.outreset.shouldSend(): // RESET_STREAM state = streamOutSendMeta case s.outreset.isSet(): // RESET_STREAM sent but not acknowledged case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED state = streamOutSendMeta case len(s.outunsent) > 0: // STREAM frame with data if s.outunsent.min() < s.outmaxsent { state = streamOutSendMeta // resent data, will not consume flow control } else { state = streamOutSendData // new data, requires flow control } case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit state = streamOutSendMeta case s.outopened.shouldSend(): // STREAM frame with no data state = streamOutSendMeta } const mask = streamOutDone | streamOutSendMeta | streamOutSendData return s.state.set(state, mask) } // handleData handles data received in a STREAM frame. func (s *Stream) handleData(off int64, b []byte, fin bool) error { s.ingate.lock() defer s.inUnlock() end := off + int64(len(b)) if err := s.checkStreamBounds(end, fin); err != nil { return err } if s.inclosed.isSet() || s.inresetcode != -1 { // The user read-closed the stream, or the peer reset it. // Either way, we can discard this frame. return nil } if s.insize == -1 && end > s.in.end { added := end - s.in.end if err := s.conn.handleStreamBytesReceived(added); err != nil { return err } } s.in.writeAt(b, off) s.inset.add(off, end) if fin { s.insize = end // The peer has enough flow control window to send the entire stream. s.insendmax.clear() } return nil } // handleReset handles a RESET_STREAM frame. func (s *Stream) handleReset(code uint64, finalSize int64) error { s.ingate.lock() defer s.inUnlock() const fin = true if err := s.checkStreamBounds(finalSize, fin); err != nil { return err } if s.inresetcode != -1 { // The stream was already reset. return nil } if s.insize == -1 { added := finalSize - s.in.end if err := s.conn.handleStreamBytesReceived(added); err != nil { return err } } s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start) s.in.discardBefore(s.in.end) s.inresetcode = int64(code) s.insize = finalSize return nil } // checkStreamBounds validates the stream offset in a STREAM or RESET_STREAM frame. func (s *Stream) checkStreamBounds(end int64, fin bool) error { if end > s.inwin { // The peer sent us data past the maximum flow control window we gave them. return localTransportError{ code: errFlowControl, reason: "stream flow control window exceeded", } } if s.insize != -1 && end > s.insize { // The peer sent us data past the final size of the stream they previously gave us. return localTransportError{ code: errFinalSize, reason: "data received past end of stream", } } if fin && s.insize != -1 && end != s.insize { // The peer changed the final size of the stream. return localTransportError{ code: errFinalSize, reason: "final size of stream changed", } } if fin && end < s.in.end { // The peer has previously sent us data past the final size. return localTransportError{ code: errFinalSize, reason: "end of stream occurs before prior data", } } return nil } // handleStopSending handles a STOP_SENDING frame. func (s *Stream) handleStopSending(code uint64) error { // Peer requests that we reset this stream. // https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4 const userReset = false s.resetInternal(code, userReset) return nil } // handleMaxStreamData handles an update received in a MAX_STREAM_DATA frame. func (s *Stream) handleMaxStreamData(maxStreamData int64) error { s.outgate.lock() defer s.outUnlock() if maxStreamData <= s.outwin { return nil } if s.outflushed > s.outwin { s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed)) } s.outwin = maxStreamData if s.out.end > s.outwin { // We've still got more data than flow control window. s.outblocked.setUnsent() } else { s.outblocked.clear() } return nil } // ackOrLoss handles the fate of stream frames other than STREAM. func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) { // Frames which carry new information each time they are sent // (MAX_STREAM_DATA, STREAM_DATA_BLOCKED) must only be marked // as received if the most recent packet carrying this frame is acked. // // Frames which are always the same (STOP_SENDING, RESET_STREAM) // can be marked as received if any packet carrying this frame is acked. switch ftype { case frameTypeResetStream: s.outgate.lock() s.outreset.ackOrLoss(pnum, fate) s.outUnlock() case frameTypeStopSending: s.ingate.lock() s.inclosed.ackOrLoss(pnum, fate) s.inUnlock() case frameTypeMaxStreamData: s.ingate.lock() s.insendmax.ackLatestOrLoss(pnum, fate) s.inUnlock() case frameTypeStreamDataBlocked: s.outgate.lock() s.outblocked.ackLatestOrLoss(pnum, fate) s.outUnlock() default: panic("unhandled frame type") } } // ackOrLossData handles the fate of a STREAM frame. func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) { s.outgate.lock() defer s.outUnlock() s.outopened.ackOrLoss(pnum, fate) if fin { s.outclosed.ackOrLoss(pnum, fate) } if s.outreset.isSet() { // If the stream has been reset, we don't care any more. return } switch fate { case packetAcked: s.outacked.add(start, end) s.outunsent.sub(start, end) // If this ack is for data at the start of the send buffer, we can now discard it. if s.outacked.contains(s.out.start) { s.out.discardBefore(s.outacked[0].end) } case packetLost: // Mark everything lost, but not previously acked, as needing retransmission. // We do this by adding all the lost bytes to outunsent, and then // removing everything already acked. s.outunsent.add(start, end) for _, a := range s.outacked { s.outunsent.sub(a.start, a.end) } } } // appendInFramesLocked appends STOP_SENDING and MAX_STREAM_DATA frames // to the current packet. // // It returns true if no more frames need appending, // false if not everything fit in the current packet. func (s *Stream) appendInFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool { if s.inclosed.shouldSendPTO(pto) { // We don't currently have an API for setting the error code. // Just send zero. code := uint64(0) if !w.appendStopSendingFrame(s.id, code) { return false } s.inclosed.setSent(pnum) } // TODO: STOP_SENDING if s.insendmax.shouldSendPTO(pto) { // MAX_STREAM_DATA maxStreamData := s.in.start + s.inmaxbuf if !w.appendMaxStreamDataFrame(s.id, maxStreamData) { return false } s.inwin = maxStreamData s.insendmax.setSent(pnum) } return true } // appendOutFramesLocked appends RESET_STREAM, STREAM_DATA_BLOCKED, and STREAM frames // to the current packet. // // It returns true if no more frames need appending, // false if not everything fit in the current packet. func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool { if s.outreset.isSet() { // RESET_STREAM if s.outreset.shouldSendPTO(pto) { if !w.appendResetStreamFrame(s.id, s.outresetcode, min(s.outwin, s.out.end)) { return false } s.outreset.setSent(pnum) s.frameOpensStream(pnum) } return true } if s.outblocked.shouldSendPTO(pto) { // STREAM_DATA_BLOCKED if !w.appendStreamDataBlockedFrame(s.id, s.outwin) { return false } s.outblocked.setSent(pnum) s.frameOpensStream(pnum) } for { // STREAM off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto) if end := off + size; end > s.outmaxsent { // This will require connection-level flow control to send. end = min(end, s.outmaxsent+s.conn.streams.outflow.avail()) end = max(end, off) size = end - off } fin := s.outclosed.isSet() && off+size == s.out.end shouldSend := size > 0 || // have data to send s.outopened.shouldSendPTO(pto) || // should open the stream (fin && s.outclosed.shouldSendPTO(pto)) // should close the stream if !shouldSend { return true } b, added := w.appendStreamFrame(s.id, off, int(size), fin) if !added { return false } s.out.copy(off, b) end := off + int64(len(b)) if end > s.outmaxsent { s.conn.streams.outflow.consume(end - s.outmaxsent) s.outmaxsent = end } s.outunsent.sub(off, end) s.frameOpensStream(pnum) if fin { s.outclosed.setSent(pnum) } if pto { return true } if int64(len(b)) < size { return false } } } // frameOpensStream records that we're sending a frame that will open the stream. // // If we don't have an acknowledgement from the peer for a previous frame opening the stream, // record this packet as being the latest one to open it. func (s *Stream) frameOpensStream(pnum packetNumber) { if !s.outopened.isReceived() { s.outopened.setSent(pnum) } } // dataToSend returns the next range of data to send in a STREAM or CRYPTO_STREAM. func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) { switch { case pto: // On PTO, resend unacked data that fits in the probe packet. // For simplicity, we send the range starting at s.out.start // (which is definitely unacked, or else we would have discarded it) // up to the next acked byte (if any). // // This may miss unacked data starting after that acked byte, // but avoids resending data the peer has acked. for _, r := range outacked { if r.start > start { return start, r.start - start } } return start, end - start case outunsent.numRanges() > 0: return outunsent.min(), outunsent[0].size() default: return end, 0 } }