1 // Copyright 2023 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 //go:build go1.21 6 7 package quic 8 9 import ( 10 "sync/atomic" 11 "time" 12 ) 13 14 // connInflow tracks connection-level flow control for data sent by the peer to us. 15 // 16 // There are four byte offsets of significance in the stream of data received from the peer, 17 // each >= to the previous: 18 // 19 // - bytes read by the user 20 // - bytes received from the peer 21 // - limit sent to the peer in a MAX_DATA frame 22 // - potential new limit to sent to the peer 23 // 24 // We maintain a flow control window, so as bytes are read by the user 25 // the potential limit is extended correspondingly. 26 // 27 // We keep an atomic counter of bytes read by the user and not yet applied to the 28 // potential limit (credit). When this count grows large enough, we update the 29 // new limit to send and mark that we need to send a new MAX_DATA frame. 30 type connInflow struct { 31 sent sentVal // set when we need to send a MAX_DATA update to the peer 32 usedLimit int64 // total bytes sent by the peer, must be less than sentLimit 33 sentLimit int64 // last MAX_DATA sent to the peer 34 newLimit int64 // new MAX_DATA to send 35 36 credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window 37 } 38 39 func (c *Conn) inflowInit() { 40 // The initial MAX_DATA limit is sent as a transport parameter. 41 c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize() 42 c.streams.inflow.newLimit = c.streams.inflow.sentLimit 43 } 44 45 // handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream. 46 // We may extend the peer's flow control window. 47 // 48 // This is called indirectly by the user, via Read or CloseRead. 49 func (c *Conn) handleStreamBytesReadOffLoop(n int64) { 50 if n == 0 { 51 return 52 } 53 if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) { 54 // We should send a MAX_DATA update to the peer. 55 // Record this on the Conn's main loop. 56 c.sendMsg(func(now time.Time, c *Conn) { 57 // A MAX_DATA update may have already happened, so check again. 58 if c.shouldUpdateFlowControl(c.streams.inflow.credit.Load()) { 59 c.sendMaxDataUpdate() 60 } 61 }) 62 } 63 } 64 65 // handleStreamBytesReadOnLoop extends the peer's flow control window after 66 // data has been discarded due to a RESET_STREAM frame. 67 // 68 // This is called on the conn's loop. 69 func (c *Conn) handleStreamBytesReadOnLoop(n int64) { 70 if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) { 71 c.sendMaxDataUpdate() 72 } 73 } 74 75 func (c *Conn) sendMaxDataUpdate() { 76 c.streams.inflow.sent.setUnsent() 77 // Apply current credit to the limit. 78 // We don't strictly need to do this here 79 // since appendMaxDataFrame will do so as well, 80 // but this avoids redundant trips down this path 81 // if the MAX_DATA frame doesn't go out right away. 82 c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0) 83 } 84 85 func (c *Conn) shouldUpdateFlowControl(credit int64) bool { 86 return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit) 87 } 88 89 // handleStreamBytesReceived records that the peer has sent us stream data. 90 func (c *Conn) handleStreamBytesReceived(n int64) error { 91 c.streams.inflow.usedLimit += n 92 if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit { 93 return localTransportError{ 94 code: errFlowControl, 95 reason: "stream exceeded flow control limit", 96 } 97 } 98 return nil 99 } 100 101 // appendMaxDataFrame appends a MAX_DATA frame to the current packet. 102 // 103 // It returns true if no more frames need appending, 104 // false if it could not fit a frame in the current packet. 105 func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool { 106 if c.streams.inflow.sent.shouldSendPTO(pto) { 107 // Add any unapplied credit to the new limit now. 108 c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0) 109 if !w.appendMaxDataFrame(c.streams.inflow.newLimit) { 110 return false 111 } 112 c.streams.inflow.sentLimit += c.streams.inflow.newLimit 113 c.streams.inflow.sent.setSent(pnum) 114 } 115 return true 116 } 117 118 // ackOrLossMaxData records the fate of a MAX_DATA frame. 119 func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) { 120 c.streams.inflow.sent.ackLatestOrLoss(pnum, fate) 121 } 122 123 // connOutflow tracks connection-level flow control for data sent by us to the peer. 124 type connOutflow struct { 125 max int64 // largest MAX_DATA received from peer 126 used int64 // total bytes of STREAM data sent to peer 127 } 128 129 // setMaxData updates the connection-level flow control limit 130 // with the initial limit conveyed in transport parameters 131 // or an update from a MAX_DATA frame. 132 func (f *connOutflow) setMaxData(maxData int64) { 133 f.max = max(f.max, maxData) 134 } 135 136 // avail returns the number of connection-level flow control bytes available. 137 func (f *connOutflow) avail() int64 { 138 return f.max - f.used 139 } 140 141 // consume records consumption of n bytes of flow. 142 func (f *connOutflow) consume(n int64) { 143 f.used += n 144 } 145