...

Source file src/golang.org/x/net/internal/quic/stream.go

Documentation: golang.org/x/net/internal/quic

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  //go:build go1.21
     6  
     7  package quic
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"io"
    14  	"math"
    15  )
    16  
// A Stream is a single QUIC stream belonging to a Conn.
//
// A stream may be read-only or write-only (see IsReadOnly and IsWriteOnly).
// Receive-related fields are guarded by ingate's lock and send-related
// fields by outgate's lock, so each direction has its own lock.
type Stream struct {
	id   streamID
	conn *Conn

	// ingate's lock guards all receive-related state.
	//
	// The gate condition is set if a read from the stream will not block,
	// either because the stream has available data or because the read will fail.
	ingate      gate
	in          pipe            // received data
	inwin       int64           // last MAX_STREAM_DATA sent to the peer
	insendmax   sentVal         // set when we should send MAX_STREAM_DATA to the peer
	inmaxbuf    int64           // maximum amount of data we will buffer
	insize      int64           // stream final size; -1 before this is known
	inset       rangeset[int64] // received ranges
	inclosed    sentVal         // set by CloseRead
	inresetcode int64           // RESET_STREAM code received from the peer; -1 if not reset

	// outgate's lock guards all send-related state.
	//
	// The gate condition is set if a write to the stream will not block,
	// either because the stream has available flow control or because
	// the write will fail.
	outgate      gate
	out          pipe            // buffered data to send
	outflushed   int64           // offset of last flush call
	outwin       int64           // maximum MAX_STREAM_DATA received from the peer
	outmaxsent   int64           // maximum data offset we've sent to the peer
	outmaxbuf    int64           // maximum amount of data we will buffer
	outunsent    rangeset[int64] // ranges buffered but not yet sent (only flushed data)
	outacked     rangeset[int64] // ranges sent and acknowledged
	outopened    sentVal         // set if we should open the stream
	outclosed    sentVal         // set by CloseWrite
	outblocked   sentVal         // set when a write to the stream is blocked by flow control
	outreset     sentVal         // set by Reset
	outresetcode uint64          // reset code to send in RESET_STREAM
	outdone      chan struct{}   // closed when all data sent

	// Atomic stream state bits.
	//
	// These bits provide a fast way to coordinate between the
	// send and receive sides of the stream, and the conn's loop.
	//
	// streamIn* bits must be set with ingate held.
	// streamOut* bits must be set with outgate held.
	// streamConn* bits are set by the conn's loop.
	// streamQueue* bits must be set with streamsState.sendMu held.
	state atomicBits[streamState]

	prev, next *Stream // guarded by streamsState.sendMu
}
    68  
// streamState is a set of state bits describing a stream.
// It is the element type of the atomic Stream.state field.
type streamState uint32

const (
	// streamInSendMeta is set when there are frames to send for the
	// inbound side of the stream. For example, MAX_STREAM_DATA.
	// Inbound frames are never flow-controlled.
	streamInSendMeta = streamState(1 << iota)

	// streamOutSendMeta is set when there are non-flow-controlled frames
	// to send for the outbound side of the stream. For example, STREAM_DATA_BLOCKED.
	// streamOutSendData is set when there are no non-flow-controlled outbound frames
	// and the stream has data to send.
	//
	// At most one of streamOutSendMeta and streamOutSendData is set at any time.
	streamOutSendMeta
	streamOutSendData

	// streamInDone and streamOutDone are set when the inbound or outbound
	// sides of the stream are finished. When both are set, the stream
	// can be removed from the Conn and forgotten.
	streamInDone
	streamOutDone

	// streamConnRemoved is set when the stream has been removed from the conn.
	streamConnRemoved

	// streamQueueMeta and streamQueueData indicate which of the streamsState
	// send queues the stream is currently on.
	streamQueueMeta
	streamQueueData
)
   100  
// streamQueue identifies one of the streamsState send queues,
// or the absence of a queue.
type streamQueue int

const (
	noQueue   = streamQueue(iota) // not on any send queue
	metaQueue // streamsState.queueMeta
	dataQueue // streamsState.queueData
)
   108  
// streamResetByConnClose is assigned to Stream.inresetcode to indicate that a stream
// was implicitly reset when the connection closed. It's out of the range of
// possible reset codes the peer can send.
//
// Because it is not -1, code that tests inresetcode != -1 (such as ReadContext)
// treats a stream carrying this value as reset.
const streamResetByConnClose = math.MaxInt64
   113  
   114  // wantQueue returns the send queue the stream should be on.
   115  func (s streamState) wantQueue() streamQueue {
   116  	switch {
   117  	case s&(streamInSendMeta|streamOutSendMeta) != 0:
   118  		return metaQueue
   119  	case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone:
   120  		return metaQueue
   121  	case s&streamOutSendData != 0:
   122  		// The stream has no non-flow-controlled frames to send,
   123  		// but does have data. Put it on the data queue, which is only
   124  		// processed when flow control is available.
   125  		return dataQueue
   126  	}
   127  	return noQueue
   128  }
   129  
   130  // inQueue returns the send queue the stream is currently on.
   131  func (s streamState) inQueue() streamQueue {
   132  	switch {
   133  	case s&streamQueueMeta != 0:
   134  		return metaQueue
   135  	case s&streamQueueData != 0:
   136  		return dataQueue
   137  	}
   138  	return noQueue
   139  }
   140  
   141  // newStream returns a new stream.
   142  //
   143  // The stream's ingate and outgate are locked.
   144  // (We create the stream with locked gates so after the caller
   145  // initializes the flow control window,
   146  // unlocking outgate will set the stream writability state.)
   147  func newStream(c *Conn, id streamID) *Stream {
   148  	s := &Stream{
   149  		conn:        c,
   150  		id:          id,
   151  		insize:      -1, // -1 indicates the stream size is unknown
   152  		inresetcode: -1, // -1 indicates no RESET_STREAM received
   153  		ingate:      newLockedGate(),
   154  		outgate:     newLockedGate(),
   155  	}
   156  	if !s.IsReadOnly() {
   157  		s.outdone = make(chan struct{})
   158  	}
   159  	return s
   160  }
   161  
   162  // IsReadOnly reports whether the stream is read-only
   163  // (a unidirectional stream created by the peer).
   164  func (s *Stream) IsReadOnly() bool {
   165  	return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side
   166  }
   167  
   168  // IsWriteOnly reports whether the stream is write-only
   169  // (a unidirectional stream created locally).
   170  func (s *Stream) IsWriteOnly() bool {
   171  	return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side
   172  }
   173  
   174  // Read reads data from the stream.
   175  // See ReadContext for more details.
   176  func (s *Stream) Read(b []byte) (n int, err error) {
   177  	return s.ReadContext(context.Background(), b)
   178  }
   179  
   180  // ReadContext reads data from the stream.
   181  //
   182  // ReadContext returns as soon as at least one byte of data is available.
   183  //
   184  // If the peer closes the stream cleanly, ReadContext returns io.EOF after
   185  // returning all data sent by the peer.
   186  // If the peer aborts reads on the stream, ReadContext returns
   187  // an error wrapping StreamResetCode.
   188  func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) {
   189  	if s.IsWriteOnly() {
   190  		return 0, errors.New("read from write-only stream")
   191  	}
   192  	if err := s.ingate.waitAndLock(ctx, s.conn.testHooks); err != nil {
   193  		return 0, err
   194  	}
   195  	defer func() {
   196  		s.inUnlock()
   197  		s.conn.handleStreamBytesReadOffLoop(int64(n)) // must be done with ingate unlocked
   198  	}()
   199  	if s.inresetcode != -1 {
   200  		return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
   201  	}
   202  	if s.inclosed.isSet() {
   203  		return 0, errors.New("read from closed stream")
   204  	}
   205  	if s.insize == s.in.start {
   206  		return 0, io.EOF
   207  	}
   208  	// Getting here indicates the stream contains data to be read.
   209  	if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start {
   210  		panic("BUG: inconsistent input stream state")
   211  	}
   212  	if size := int(s.inset[0].end - s.in.start); size < len(b) {
   213  		b = b[:size]
   214  	}
   215  	start := s.in.start
   216  	end := start + int64(len(b))
   217  	s.in.copy(start, b)
   218  	s.in.discardBefore(end)
   219  	if s.insize == -1 || s.insize > s.inwin {
   220  		if shouldUpdateFlowControl(s.inmaxbuf, s.in.start+s.inmaxbuf-s.inwin) {
   221  			// Update stream flow control with a STREAM_MAX_DATA frame.
   222  			s.insendmax.setUnsent()
   223  		}
   224  	}
   225  	if end == s.insize {
   226  		return len(b), io.EOF
   227  	}
   228  	return len(b), nil
   229  }
   230  
   231  // shouldUpdateFlowControl determines whether to send a flow control window update.
   232  //
   233  // We want to balance keeping the peer well-supplied with flow control with not sending
   234  // many small updates.
   235  func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
   236  	return addedWindow >= maxWindow/8
   237  }
   238  
   239  // Write writes data to the stream.
   240  // See WriteContext for more details.
   241  func (s *Stream) Write(b []byte) (n int, err error) {
   242  	return s.WriteContext(context.Background(), b)
   243  }
   244  
// WriteContext writes data to the stream.
//
// WriteContext writes data to the stream write buffer.
// Buffered data is only sent when the buffer is sufficiently full.
// Call the Flush method to ensure buffered data is sent.
//
// When the write buffer is full, WriteContext waits for space to become
// available or for ctx to be done. It returns the number of bytes buffered
// so far along with any error. Writing to a read-only, reset, or closed
// stream returns an error.
func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) {
	if s.IsReadOnly() {
		return 0, errors.New("write to read-only stream")
	}
	canWrite := s.outgate.lock()
	for {
		// The first time through this loop, we may or may not be write blocked.
		// We exit the loop after writing all data, so on subsequent passes through
		// the loop we are always write blocked.
		if len(b) > 0 && !canWrite {
			// Our send buffer is full. Wait for the peer to ack some data.
			s.outUnlock()
			if err := s.outgate.waitAndLock(ctx, s.conn.testHooks); err != nil {
				// The gate is not held here, so there is nothing to unlock.
				return n, err
			}
			// Successfully returning from waitAndLock means we are no longer
			// write blocked. (Unlike traditional condition variables, gates do not
			// have spurious wakeups.)
		}
		if s.outreset.isSet() {
			s.outUnlock()
			return n, errors.New("write to reset stream")
		}
		if s.outclosed.isSet() {
			s.outUnlock()
			return n, errors.New("write to closed stream")
		}
		if len(b) == 0 {
			break
		}
		// Write limit is our send buffer limit.
		// This is a stream offset.
		lim := s.out.start + s.outmaxbuf
		// Amount to write is min(the full buffer, data up to the write limit).
		// This is a number of bytes.
		nn := min(int64(len(b)), lim-s.out.end)
		// Copy the data into the output buffer.
		s.out.writeAt(b[:nn], s.out.end)
		b = b[nn:]
		n += int(nn)
		// Possibly flush the output buffer.
		// We automatically flush if:
		//   - We have enough data to consume the send window.
		//     Sending this data may cause the peer to extend the window.
		//   - We have buffered as much data as we're willing to.
		//     We need to send data to clear out buffer space.
		//   - We have enough data to fill a 1-RTT packet using the smallest
		//     possible maximum datagram size (1200 bytes, less header byte,
		//     connection ID, packet number, and AEAD overhead).
		const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead
		shouldFlush := s.out.end >= s.outwin || // peer send window is full
			s.out.end >= lim || // local send buffer is full
			(s.out.end-s.outflushed) >= autoFlushSize // enough data buffered
		if shouldFlush {
			s.flushLocked()
		}
		if s.out.end > s.outwin {
			// We're blocked by flow control.
			// Send a STREAM_DATA_BLOCKED frame to let the peer know.
			s.outblocked.set()
		}
		// If we have bytes left to send, we're blocked.
		canWrite = false
	}
	s.outUnlock()
	return n, nil
}
   317  
   318  // Flush flushes data written to the stream.
   319  // It does not wait for the peer to acknowledge receipt of the data.
   320  // Use CloseContext to wait for the peer's acknowledgement.
   321  func (s *Stream) Flush() {
   322  	s.outgate.lock()
   323  	defer s.outUnlock()
   324  	s.flushLocked()
   325  }
   326  
   327  func (s *Stream) flushLocked() {
   328  	s.outopened.set()
   329  	if s.outflushed < s.outwin {
   330  		s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
   331  	}
   332  	s.outflushed = s.out.end
   333  }
   334  
   335  // Close closes the stream.
   336  // See CloseContext for more details.
   337  func (s *Stream) Close() error {
   338  	return s.CloseContext(context.Background())
   339  }
   340  
   341  // CloseContext closes the stream.
   342  // Any blocked stream operations will be unblocked and return errors.
   343  //
   344  // CloseContext flushes any data in the stream write buffer and waits for the peer to
   345  // acknowledge receipt of the data.
   346  // If the stream has been reset, it waits for the peer to acknowledge the reset.
   347  // If the context expires before the peer receives the stream's data,
   348  // CloseContext discards the buffer and returns the context error.
   349  func (s *Stream) CloseContext(ctx context.Context) error {
   350  	s.CloseRead()
   351  	if s.IsReadOnly() {
   352  		return nil
   353  	}
   354  	s.CloseWrite()
   355  	// TODO: Return code from peer's RESET_STREAM frame?
   356  	if err := s.conn.waitOnDone(ctx, s.outdone); err != nil {
   357  		return err
   358  	}
   359  	s.outgate.lock()
   360  	defer s.outUnlock()
   361  	if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) {
   362  		return nil
   363  	}
   364  	return errors.New("stream reset")
   365  }
   366  
   367  // CloseRead aborts reads on the stream.
   368  // Any blocked reads will be unblocked and return errors.
   369  //
   370  // CloseRead notifies the peer that the stream has been closed for reading.
   371  // It does not wait for the peer to acknowledge the closure.
   372  // Use CloseContext to wait for the peer's acknowledgement.
   373  func (s *Stream) CloseRead() {
   374  	if s.IsWriteOnly() {
   375  		return
   376  	}
   377  	s.ingate.lock()
   378  	if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
   379  		// We've already received all data from the peer,
   380  		// so there's no need to send STOP_SENDING.
   381  		// This is the same as saying we sent one and they got it.
   382  		s.inclosed.setReceived()
   383  	} else {
   384  		s.inclosed.set()
   385  	}
   386  	discarded := s.in.end - s.in.start
   387  	s.in.discardBefore(s.in.end)
   388  	s.inUnlock()
   389  	s.conn.handleStreamBytesReadOffLoop(discarded) // must be done with ingate unlocked
   390  }
   391  
   392  // CloseWrite aborts writes on the stream.
   393  // Any blocked writes will be unblocked and return errors.
   394  //
   395  // CloseWrite sends any data in the stream write buffer to the peer.
   396  // It does not wait for the peer to acknowledge receipt of the data.
   397  // Use CloseContext to wait for the peer's acknowledgement.
   398  func (s *Stream) CloseWrite() {
   399  	if s.IsReadOnly() {
   400  		return
   401  	}
   402  	s.outgate.lock()
   403  	defer s.outUnlock()
   404  	s.outclosed.set()
   405  	s.flushLocked()
   406  }
   407  
   408  // Reset aborts writes on the stream and notifies the peer
   409  // that the stream was terminated abruptly.
   410  // Any blocked writes will be unblocked and return errors.
   411  //
   412  // Reset sends the application protocol error code, which must be
   413  // less than 2^62, to the peer.
   414  // It does not wait for the peer to acknowledge receipt of the error.
   415  // Use CloseContext to wait for the peer's acknowledgement.
   416  //
   417  // Reset does not affect reads.
   418  // Use CloseRead to abort reads on the stream.
   419  func (s *Stream) Reset(code uint64) {
   420  	const userClosed = true
   421  	s.resetInternal(code, userClosed)
   422  }
   423  
   424  // resetInternal resets the send side of the stream.
   425  //
   426  // If userClosed is true, this is s.Reset.
   427  // If userClosed is false, this is a reaction to a STOP_SENDING frame.
   428  func (s *Stream) resetInternal(code uint64, userClosed bool) {
   429  	s.outgate.lock()
   430  	defer s.outUnlock()
   431  	if s.IsReadOnly() {
   432  		return
   433  	}
   434  	if userClosed {
   435  		// Mark that the user closed the stream.
   436  		s.outclosed.set()
   437  	}
   438  	if s.outreset.isSet() {
   439  		return
   440  	}
   441  	if code > maxVarint {
   442  		code = maxVarint
   443  	}
   444  	// We could check here to see if the stream is closed and the
   445  	// peer has acked all the data and the FIN, but sending an
   446  	// extra RESET_STREAM in this case is harmless.
   447  	s.outreset.set()
   448  	s.outresetcode = code
   449  	s.out.discardBefore(s.out.end)
   450  	s.outunsent = rangeset[int64]{}
   451  	s.outblocked.clear()
   452  }
   453  
   454  // connHasClosed indicates the stream's conn has closed.
   455  func (s *Stream) connHasClosed() {
   456  	// If we're in the closing state, the user closed the conn.
   457  	// Otherwise, we the peer initiated the close.
   458  	// This only matters for the error we're going to return from stream operations.
   459  	localClose := s.conn.lifetime.state == connStateClosing
   460  
   461  	s.ingate.lock()
   462  	if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 {
   463  		if localClose {
   464  			s.inclosed.set()
   465  		} else {
   466  			s.inresetcode = streamResetByConnClose
   467  		}
   468  	}
   469  	s.inUnlock()
   470  
   471  	s.outgate.lock()
   472  	if localClose {
   473  		s.outclosed.set()
   474  	}
   475  	s.outreset.set()
   476  	s.outUnlock()
   477  }
   478  
   479  // inUnlock unlocks s.ingate.
   480  // It sets the gate condition if reads from s will not block.
   481  // If s has receive-related frames to write or if both directions
   482  // are done and the stream should be removed, it notifies the Conn.
   483  func (s *Stream) inUnlock() {
   484  	state := s.inUnlockNoQueue()
   485  	s.conn.maybeQueueStreamForSend(s, state)
   486  }
   487  
   488  // inUnlockNoQueue is inUnlock,
   489  // but reports whether s has frames to write rather than notifying the Conn.
   490  func (s *Stream) inUnlockNoQueue() streamState {
   491  	canRead := s.inset.contains(s.in.start) || // data available to read
   492  		s.insize == s.in.start || // at EOF
   493  		s.inresetcode != -1 || // reset by peer
   494  		s.inclosed.isSet() // closed locally
   495  	defer s.ingate.unlock(canRead)
   496  	var state streamState
   497  	switch {
   498  	case s.IsWriteOnly():
   499  		state = streamInDone
   500  	case s.inresetcode != -1: // reset by peer
   501  		fallthrough
   502  	case s.in.start == s.insize: // all data received and read
   503  		// We don't increase MAX_STREAMS until the user calls ReadClose or Close,
   504  		// so the receive side is not finished until inclosed is set.
   505  		if s.inclosed.isSet() {
   506  			state = streamInDone
   507  		}
   508  	case s.insendmax.shouldSend(): // STREAM_MAX_DATA
   509  		state = streamInSendMeta
   510  	case s.inclosed.shouldSend(): // STOP_SENDING
   511  		state = streamInSendMeta
   512  	}
   513  	const mask = streamInDone | streamInSendMeta
   514  	return s.state.set(state, mask)
   515  }
   516  
   517  // outUnlock unlocks s.outgate.
   518  // It sets the gate condition if writes to s will not block.
   519  // If s has send-related frames to write or if both directions
   520  // are done and the stream should be removed, it notifies the Conn.
   521  func (s *Stream) outUnlock() {
   522  	state := s.outUnlockNoQueue()
   523  	s.conn.maybeQueueStreamForSend(s, state)
   524  }
   525  
   526  // outUnlockNoQueue is outUnlock,
   527  // but reports whether s has frames to write rather than notifying the Conn.
   528  func (s *Stream) outUnlockNoQueue() streamState {
   529  	isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) || // all data acked
   530  		s.outreset.isSet() // reset locally
   531  	if isDone {
   532  		select {
   533  		case <-s.outdone:
   534  		default:
   535  			if !s.IsReadOnly() {
   536  				close(s.outdone)
   537  			}
   538  		}
   539  	}
   540  	lim := s.out.start + s.outmaxbuf
   541  	canWrite := lim > s.out.end || // available send buffer
   542  		s.outclosed.isSet() || // closed locally
   543  		s.outreset.isSet() // reset locally
   544  	defer s.outgate.unlock(canWrite)
   545  	var state streamState
   546  	switch {
   547  	case s.IsReadOnly():
   548  		state = streamOutDone
   549  	case s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end): // all data sent and acked
   550  		fallthrough
   551  	case s.outreset.isReceived(): // RESET_STREAM sent and acked
   552  		// We don't increase MAX_STREAMS until the user calls WriteClose or Close,
   553  		// so the send side is not finished until outclosed is set.
   554  		if s.outclosed.isSet() {
   555  			state = streamOutDone
   556  		}
   557  	case s.outreset.shouldSend(): // RESET_STREAM
   558  		state = streamOutSendMeta
   559  	case s.outreset.isSet(): // RESET_STREAM sent but not acknowledged
   560  	case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
   561  		state = streamOutSendMeta
   562  	case len(s.outunsent) > 0: // STREAM frame with data
   563  		if s.outunsent.min() < s.outmaxsent {
   564  			state = streamOutSendMeta // resent data, will not consume flow control
   565  		} else {
   566  			state = streamOutSendData // new data, requires flow control
   567  		}
   568  	case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit
   569  		state = streamOutSendMeta
   570  	case s.outopened.shouldSend(): // STREAM frame with no data
   571  		state = streamOutSendMeta
   572  	}
   573  	const mask = streamOutDone | streamOutSendMeta | streamOutSendData
   574  	return s.state.set(state, mask)
   575  }
   576  
   577  // handleData handles data received in a STREAM frame.
   578  func (s *Stream) handleData(off int64, b []byte, fin bool) error {
   579  	s.ingate.lock()
   580  	defer s.inUnlock()
   581  	end := off + int64(len(b))
   582  	if err := s.checkStreamBounds(end, fin); err != nil {
   583  		return err
   584  	}
   585  	if s.inclosed.isSet() || s.inresetcode != -1 {
   586  		// The user read-closed the stream, or the peer reset it.
   587  		// Either way, we can discard this frame.
   588  		return nil
   589  	}
   590  	if s.insize == -1 && end > s.in.end {
   591  		added := end - s.in.end
   592  		if err := s.conn.handleStreamBytesReceived(added); err != nil {
   593  			return err
   594  		}
   595  	}
   596  	s.in.writeAt(b, off)
   597  	s.inset.add(off, end)
   598  	if fin {
   599  		s.insize = end
   600  		// The peer has enough flow control window to send the entire stream.
   601  		s.insendmax.clear()
   602  	}
   603  	return nil
   604  }
   605  
   606  // handleReset handles a RESET_STREAM frame.
   607  func (s *Stream) handleReset(code uint64, finalSize int64) error {
   608  	s.ingate.lock()
   609  	defer s.inUnlock()
   610  	const fin = true
   611  	if err := s.checkStreamBounds(finalSize, fin); err != nil {
   612  		return err
   613  	}
   614  	if s.inresetcode != -1 {
   615  		// The stream was already reset.
   616  		return nil
   617  	}
   618  	if s.insize == -1 {
   619  		added := finalSize - s.in.end
   620  		if err := s.conn.handleStreamBytesReceived(added); err != nil {
   621  			return err
   622  		}
   623  	}
   624  	s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start)
   625  	s.in.discardBefore(s.in.end)
   626  	s.inresetcode = int64(code)
   627  	s.insize = finalSize
   628  	return nil
   629  }
   630  
   631  // checkStreamBounds validates the stream offset in a STREAM or RESET_STREAM frame.
   632  func (s *Stream) checkStreamBounds(end int64, fin bool) error {
   633  	if end > s.inwin {
   634  		// The peer sent us data past the maximum flow control window we gave them.
   635  		return localTransportError{
   636  			code:   errFlowControl,
   637  			reason: "stream flow control window exceeded",
   638  		}
   639  	}
   640  	if s.insize != -1 && end > s.insize {
   641  		// The peer sent us data past the final size of the stream they previously gave us.
   642  		return localTransportError{
   643  			code:   errFinalSize,
   644  			reason: "data received past end of stream",
   645  		}
   646  	}
   647  	if fin && s.insize != -1 && end != s.insize {
   648  		// The peer changed the final size of the stream.
   649  		return localTransportError{
   650  			code:   errFinalSize,
   651  			reason: "final size of stream changed",
   652  		}
   653  	}
   654  	if fin && end < s.in.end {
   655  		// The peer has previously sent us data past the final size.
   656  		return localTransportError{
   657  			code:   errFinalSize,
   658  			reason: "end of stream occurs before prior data",
   659  		}
   660  	}
   661  	return nil
   662  }
   663  
   664  // handleStopSending handles a STOP_SENDING frame.
   665  func (s *Stream) handleStopSending(code uint64) error {
   666  	// Peer requests that we reset this stream.
   667  	// https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4
   668  	const userReset = false
   669  	s.resetInternal(code, userReset)
   670  	return nil
   671  }
   672  
   673  // handleMaxStreamData handles an update received in a MAX_STREAM_DATA frame.
   674  func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
   675  	s.outgate.lock()
   676  	defer s.outUnlock()
   677  	if maxStreamData <= s.outwin {
   678  		return nil
   679  	}
   680  	if s.outflushed > s.outwin {
   681  		s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed))
   682  	}
   683  	s.outwin = maxStreamData
   684  	if s.out.end > s.outwin {
   685  		// We've still got more data than flow control window.
   686  		s.outblocked.setUnsent()
   687  	} else {
   688  		s.outblocked.clear()
   689  	}
   690  	return nil
   691  }
   692  
   693  // ackOrLoss handles the fate of stream frames other than STREAM.
   694  func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) {
   695  	// Frames which carry new information each time they are sent
   696  	// (MAX_STREAM_DATA, STREAM_DATA_BLOCKED) must only be marked
   697  	// as received if the most recent packet carrying this frame is acked.
   698  	//
   699  	// Frames which are always the same (STOP_SENDING, RESET_STREAM)
   700  	// can be marked as received if any packet carrying this frame is acked.
   701  	switch ftype {
   702  	case frameTypeResetStream:
   703  		s.outgate.lock()
   704  		s.outreset.ackOrLoss(pnum, fate)
   705  		s.outUnlock()
   706  	case frameTypeStopSending:
   707  		s.ingate.lock()
   708  		s.inclosed.ackOrLoss(pnum, fate)
   709  		s.inUnlock()
   710  	case frameTypeMaxStreamData:
   711  		s.ingate.lock()
   712  		s.insendmax.ackLatestOrLoss(pnum, fate)
   713  		s.inUnlock()
   714  	case frameTypeStreamDataBlocked:
   715  		s.outgate.lock()
   716  		s.outblocked.ackLatestOrLoss(pnum, fate)
   717  		s.outUnlock()
   718  	default:
   719  		panic("unhandled frame type")
   720  	}
   721  }
   722  
   723  // ackOrLossData handles the fate of a STREAM frame.
   724  func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) {
   725  	s.outgate.lock()
   726  	defer s.outUnlock()
   727  	s.outopened.ackOrLoss(pnum, fate)
   728  	if fin {
   729  		s.outclosed.ackOrLoss(pnum, fate)
   730  	}
   731  	if s.outreset.isSet() {
   732  		// If the stream has been reset, we don't care any more.
   733  		return
   734  	}
   735  	switch fate {
   736  	case packetAcked:
   737  		s.outacked.add(start, end)
   738  		s.outunsent.sub(start, end)
   739  		// If this ack is for data at the start of the send buffer, we can now discard it.
   740  		if s.outacked.contains(s.out.start) {
   741  			s.out.discardBefore(s.outacked[0].end)
   742  		}
   743  	case packetLost:
   744  		// Mark everything lost, but not previously acked, as needing retransmission.
   745  		// We do this by adding all the lost bytes to outunsent, and then
   746  		// removing everything already acked.
   747  		s.outunsent.add(start, end)
   748  		for _, a := range s.outacked {
   749  			s.outunsent.sub(a.start, a.end)
   750  		}
   751  	}
   752  }
   753  
   754  // appendInFramesLocked appends STOP_SENDING and MAX_STREAM_DATA frames
   755  // to the current packet.
   756  //
   757  // It returns true if no more frames need appending,
   758  // false if not everything fit in the current packet.
   759  func (s *Stream) appendInFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
   760  	if s.inclosed.shouldSendPTO(pto) {
   761  		// We don't currently have an API for setting the error code.
   762  		// Just send zero.
   763  		code := uint64(0)
   764  		if !w.appendStopSendingFrame(s.id, code) {
   765  			return false
   766  		}
   767  		s.inclosed.setSent(pnum)
   768  	}
   769  	// TODO: STOP_SENDING
   770  	if s.insendmax.shouldSendPTO(pto) {
   771  		// MAX_STREAM_DATA
   772  		maxStreamData := s.in.start + s.inmaxbuf
   773  		if !w.appendMaxStreamDataFrame(s.id, maxStreamData) {
   774  			return false
   775  		}
   776  		s.inwin = maxStreamData
   777  		s.insendmax.setSent(pnum)
   778  	}
   779  	return true
   780  }
   781  
// appendOutFramesLocked appends RESET_STREAM, STREAM_DATA_BLOCKED, and STREAM frames
// to the current packet.
//
// pnum is the number of the packet being built and pto reports whether
// it is a probe packet.
//
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
	if s.outreset.isSet() {
		// RESET_STREAM
		// Once the stream is reset, this is the only frame sent for it.
		if s.outreset.shouldSendPTO(pto) {
			if !w.appendResetStreamFrame(s.id, s.outresetcode, min(s.outwin, s.out.end)) {
				return false
			}
			s.outreset.setSent(pnum)
			s.frameOpensStream(pnum)
		}
		return true
	}
	if s.outblocked.shouldSendPTO(pto) {
		// STREAM_DATA_BLOCKED
		if !w.appendStreamDataBlockedFrame(s.id, s.outwin) {
			return false
		}
		s.outblocked.setSent(pnum)
		s.frameOpensStream(pnum)
	}
	for {
		// STREAM
		// Candidate offsets are capped at the peer's stream window.
		off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto)
		if end := off + size; end > s.outmaxsent {
			// This will require connection-level flow control to send.
			end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
			// Never let the clamped end fall before off, so size stays non-negative.
			end = max(end, off)
			size = end - off
		}
		fin := s.outclosed.isSet() && off+size == s.out.end
		shouldSend := size > 0 || // have data to send
			s.outopened.shouldSendPTO(pto) || // should open the stream
			(fin && s.outclosed.shouldSendPTO(pto)) // should close the stream
		if !shouldSend {
			return true
		}
		b, added := w.appendStreamFrame(s.id, off, int(size), fin)
		if !added {
			return false
		}
		// Fill the frame's payload from the send buffer.
		// b may be shorter than size; see the check at the end of the loop.
		s.out.copy(off, b)
		end := off + int64(len(b))
		if end > s.outmaxsent {
			// Only bytes past the previous high-water mark consume
			// connection-level flow control.
			s.conn.streams.outflow.consume(end - s.outmaxsent)
			s.outmaxsent = end
		}
		s.outunsent.sub(off, end)
		s.frameOpensStream(pnum)
		if fin {
			s.outclosed.setSent(pnum)
		}
		if pto {
			// A probe packet carries at most one STREAM frame for this stream.
			return true
		}
		if int64(len(b)) < size {
			// The frame held less than we wanted to send, so data remains.
			return false
		}
	}
}
   846  
   847  // frameOpensStream records that we're sending a frame that will open the stream.
   848  //
   849  // If we don't have an acknowledgement from the peer for a previous frame opening the stream,
   850  // record this packet as being the latest one to open it.
   851  func (s *Stream) frameOpensStream(pnum packetNumber) {
   852  	if !s.outopened.isReceived() {
   853  		s.outopened.setSent(pnum)
   854  	}
   855  }
   856  
   857  // dataToSend returns the next range of data to send in a STREAM or CRYPTO_STREAM.
   858  func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) {
   859  	switch {
   860  	case pto:
   861  		// On PTO, resend unacked data that fits in the probe packet.
   862  		// For simplicity, we send the range starting at s.out.start
   863  		// (which is definitely unacked, or else we would have discarded it)
   864  		// up to the next acked byte (if any).
   865  		//
   866  		// This may miss unacked data starting after that acked byte,
   867  		// but avoids resending data the peer has acked.
   868  		for _, r := range outacked {
   869  			if r.start > start {
   870  				return start, r.start - start
   871  			}
   872  		}
   873  		return start, end - start
   874  	case outunsent.numRanges() > 0:
   875  		return outunsent.min(), outunsent[0].size()
   876  	default:
   877  		return end, 0
   878  	}
   879  }
   880  

View as plain text