...

Source file src/golang.org/x/net/internal/quic/conn_flow.go

Documentation: golang.org/x/net/internal/quic

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  //go:build go1.21
     6  
     7  package quic
     8  
     9  import (
    10  	"sync/atomic"
    11  	"time"
    12  )
    13  
    14  // connInflow tracks connection-level flow control for data sent by the peer to us.
    15  //
    16  // There are four byte offsets of significance in the stream of data received from the peer,
    17  // each >= to the previous:
    18  //
    19  //   - bytes read by the user
    20  //   - bytes received from the peer
    21  //   - limit sent to the peer in a MAX_DATA frame
    22  //   - potential new limit to sent to the peer
    23  //
    24  // We maintain a flow control window, so as bytes are read by the user
    25  // the potential limit is extended correspondingly.
    26  //
    27  // We keep an atomic counter of bytes read by the user and not yet applied to the
    28  // potential limit (credit). When this count grows large enough, we update the
    29  // new limit to send and mark that we need to send a new MAX_DATA frame.
    30  type connInflow struct {
    31  	sent      sentVal // set when we need to send a MAX_DATA update to the peer
    32  	usedLimit int64   // total bytes sent by the peer, must be less than sentLimit
    33  	sentLimit int64   // last MAX_DATA sent to the peer
    34  	newLimit  int64   // new MAX_DATA to send
    35  
    36  	credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window
    37  }
    38  
    39  func (c *Conn) inflowInit() {
    40  	// The initial MAX_DATA limit is sent as a transport parameter.
    41  	c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize()
    42  	c.streams.inflow.newLimit = c.streams.inflow.sentLimit
    43  }
    44  
    45  // handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream.
    46  // We may extend the peer's flow control window.
    47  //
    48  // This is called indirectly by the user, via Read or CloseRead.
    49  func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
    50  	if n == 0 {
    51  		return
    52  	}
    53  	if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
    54  		// We should send a MAX_DATA update to the peer.
    55  		// Record this on the Conn's main loop.
    56  		c.sendMsg(func(now time.Time, c *Conn) {
    57  			// A MAX_DATA update may have already happened, so check again.
    58  			if c.shouldUpdateFlowControl(c.streams.inflow.credit.Load()) {
    59  				c.sendMaxDataUpdate()
    60  			}
    61  		})
    62  	}
    63  }
    64  
    65  // handleStreamBytesReadOnLoop extends the peer's flow control window after
    66  // data has been discarded due to a RESET_STREAM frame.
    67  //
    68  // This is called on the conn's loop.
    69  func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
    70  	if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
    71  		c.sendMaxDataUpdate()
    72  	}
    73  }
    74  
    75  func (c *Conn) sendMaxDataUpdate() {
    76  	c.streams.inflow.sent.setUnsent()
    77  	// Apply current credit to the limit.
    78  	// We don't strictly need to do this here
    79  	// since appendMaxDataFrame will do so as well,
    80  	// but this avoids redundant trips down this path
    81  	// if the MAX_DATA frame doesn't go out right away.
    82  	c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
    83  }
    84  
    85  func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
    86  	return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit)
    87  }
    88  
    89  // handleStreamBytesReceived records that the peer has sent us stream data.
    90  func (c *Conn) handleStreamBytesReceived(n int64) error {
    91  	c.streams.inflow.usedLimit += n
    92  	if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit {
    93  		return localTransportError{
    94  			code:   errFlowControl,
    95  			reason: "stream exceeded flow control limit",
    96  		}
    97  	}
    98  	return nil
    99  }
   100  
   101  // appendMaxDataFrame appends a MAX_DATA frame to the current packet.
   102  //
   103  // It returns true if no more frames need appending,
   104  // false if it could not fit a frame in the current packet.
   105  func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
   106  	if c.streams.inflow.sent.shouldSendPTO(pto) {
   107  		// Add any unapplied credit to the new limit now.
   108  		c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
   109  		if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
   110  			return false
   111  		}
   112  		c.streams.inflow.sentLimit += c.streams.inflow.newLimit
   113  		c.streams.inflow.sent.setSent(pnum)
   114  	}
   115  	return true
   116  }
   117  
   118  // ackOrLossMaxData records the fate of a MAX_DATA frame.
   119  func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
   120  	c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
   121  }
   122  
   123  // connOutflow tracks connection-level flow control for data sent by us to the peer.
   124  type connOutflow struct {
   125  	max  int64 // largest MAX_DATA received from peer
   126  	used int64 // total bytes of STREAM data sent to peer
   127  }
   128  
   129  // setMaxData updates the connection-level flow control limit
   130  // with the initial limit conveyed in transport parameters
   131  // or an update from a MAX_DATA frame.
   132  func (f *connOutflow) setMaxData(maxData int64) {
   133  	f.max = max(f.max, maxData)
   134  }
   135  
   136  // avail returns the number of connection-level flow control bytes available.
   137  func (f *connOutflow) avail() int64 {
   138  	return f.max - f.used
   139  }
   140  
   141  // consume records consumption of n bytes of flow.
   142  func (f *connOutflow) consume(n int64) {
   143  	f.used += n
   144  }
   145  

View as plain text