// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build go1.21 package quic import ( "sync/atomic" "time" ) // connInflow tracks connection-level flow control for data sent by the peer to us. // // There are four byte offsets of significance in the stream of data received from the peer, // each >= to the previous: // // - bytes read by the user // - bytes received from the peer // - limit sent to the peer in a MAX_DATA frame // - potential new limit to sent to the peer // // We maintain a flow control window, so as bytes are read by the user // the potential limit is extended correspondingly. // // We keep an atomic counter of bytes read by the user and not yet applied to the // potential limit (credit). When this count grows large enough, we update the // new limit to send and mark that we need to send a new MAX_DATA frame. type connInflow struct { sent sentVal // set when we need to send a MAX_DATA update to the peer usedLimit int64 // total bytes sent by the peer, must be less than sentLimit sentLimit int64 // last MAX_DATA sent to the peer newLimit int64 // new MAX_DATA to send credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window } func (c *Conn) inflowInit() { // The initial MAX_DATA limit is sent as a transport parameter. c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize() c.streams.inflow.newLimit = c.streams.inflow.sentLimit } // handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream. // We may extend the peer's flow control window. // // This is called indirectly by the user, via Read or CloseRead. func (c *Conn) handleStreamBytesReadOffLoop(n int64) { if n == 0 { return } if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) { // We should send a MAX_DATA update to the peer. // Record this on the Conn's main loop. c.sendMsg(func(now time.Time, c *Conn) { // A MAX_DATA update may have already happened, so check again. if c.shouldUpdateFlowControl(c.streams.inflow.credit.Load()) { c.sendMaxDataUpdate() } }) } } // handleStreamBytesReadOnLoop extends the peer's flow control window after // data has been discarded due to a RESET_STREAM frame. // // This is called on the conn's loop. func (c *Conn) handleStreamBytesReadOnLoop(n int64) { if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) { c.sendMaxDataUpdate() } } func (c *Conn) sendMaxDataUpdate() { c.streams.inflow.sent.setUnsent() // Apply current credit to the limit. // We don't strictly need to do this here // since appendMaxDataFrame will do so as well, // but this avoids redundant trips down this path // if the MAX_DATA frame doesn't go out right away. c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0) } func (c *Conn) shouldUpdateFlowControl(credit int64) bool { return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit) } // handleStreamBytesReceived records that the peer has sent us stream data. func (c *Conn) handleStreamBytesReceived(n int64) error { c.streams.inflow.usedLimit += n if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit { return localTransportError{ code: errFlowControl, reason: "stream exceeded flow control limit", } } return nil } // appendMaxDataFrame appends a MAX_DATA frame to the current packet. // // It returns true if no more frames need appending, // false if it could not fit a frame in the current packet. func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool { if c.streams.inflow.sent.shouldSendPTO(pto) { // Add any unapplied credit to the new limit now. c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0) if !w.appendMaxDataFrame(c.streams.inflow.newLimit) { return false } c.streams.inflow.sentLimit += c.streams.inflow.newLimit c.streams.inflow.sent.setSent(pnum) } return true } // ackOrLossMaxData records the fate of a MAX_DATA frame. func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) { c.streams.inflow.sent.ackLatestOrLoss(pnum, fate) } // connOutflow tracks connection-level flow control for data sent by us to the peer. type connOutflow struct { max int64 // largest MAX_DATA received from peer used int64 // total bytes of STREAM data sent to peer } // setMaxData updates the connection-level flow control limit // with the initial limit conveyed in transport parameters // or an update from a MAX_DATA frame. func (f *connOutflow) setMaxData(maxData int64) { f.max = max(f.max, maxData) } // avail returns the number of connection-level flow control bytes available. func (f *connOutflow) avail() int64 { return f.max - f.used } // consume records consumption of n bytes of flow. func (f *connOutflow) consume(n int64) { f.used += n }