// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build go1.21 package quic import ( "context" "testing" ) func TestConnInflowReturnOnRead(t *testing.T) { ctx := canceledContext() tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) { c.MaxConnReadBufferSize = 64 }) tc.writeFrames(packetType1RTT, debugFrameStream{ id: s.id, data: make([]byte, 64), }) const readSize = 8 if n, err := s.ReadContext(ctx, make([]byte, readSize)); n != readSize || err != nil { t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, readSize) } tc.wantFrame("available window increases, send a MAX_DATA", packetType1RTT, debugFrameMaxData{ max: 64 + readSize, }) if n, err := s.ReadContext(ctx, make([]byte, 64)); n != 64-readSize || err != nil { t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, 64-readSize) } tc.wantFrame("available window increases, send a MAX_DATA", packetType1RTT, debugFrameMaxData{ max: 128, }) // Peer can write up to the new limit. tc.writeFrames(packetType1RTT, debugFrameStream{ id: s.id, off: 64, data: make([]byte, 64), }) tc.wantIdle("connection is idle") if n, err := s.ReadContext(ctx, make([]byte, 64)); n != 64 || err != nil { t.Fatalf("offset 64: s.Read() = %v, %v; want %v, nil", n, err, 64) } } func TestConnInflowReturnOnRacingReads(t *testing.T) { // Perform two reads at the same time, // one for half of MaxConnReadBufferSize // and one for one byte. // // We should observe a single MAX_DATA update. // Depending on the ordering of events, // this may include the credit from just the larger read // or the credit from both. ctx := canceledContext() tc := newTestConn(t, serverSide, func(c *Config) { c.MaxConnReadBufferSize = 64 }) tc.handshake() tc.ignoreFrame(frameTypeAck) tc.writeFrames(packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, uniStream, 0), data: make([]byte, 32), }) tc.writeFrames(packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, uniStream, 1), data: make([]byte, 32), }) s1, err := tc.conn.AcceptStream(ctx) if err != nil { t.Fatalf("conn.AcceptStream() = %v", err) } s2, err := tc.conn.AcceptStream(ctx) if err != nil { t.Fatalf("conn.AcceptStream() = %v", err) } read1 := runAsync(tc, func(ctx context.Context) (int, error) { return s1.ReadContext(ctx, make([]byte, 16)) }) read2 := runAsync(tc, func(ctx context.Context) (int, error) { return s2.ReadContext(ctx, make([]byte, 1)) }) // This MAX_DATA might extend the window by 16 or 17, depending on // whether the second write occurs before the update happens. tc.wantFrameType("MAX_DATA update is sent", packetType1RTT, debugFrameMaxData{}) tc.wantIdle("redundant MAX_DATA is not sent") if _, err := read1.result(); err != nil { t.Errorf("ReadContext #1 = %v", err) } if _, err := read2.result(); err != nil { t.Errorf("ReadContext #2 = %v", err) } } func TestConnInflowReturnOnClose(t *testing.T) { tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) { c.MaxConnReadBufferSize = 64 }) tc.ignoreFrame(frameTypeStopSending) tc.writeFrames(packetType1RTT, debugFrameStream{ id: s.id, data: make([]byte, 64), }) s.CloseRead() tc.wantFrame("closing stream updates connection-level flow control", packetType1RTT, debugFrameMaxData{ max: 128, }) } func TestConnInflowReturnOnReset(t *testing.T) { tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) { c.MaxConnReadBufferSize = 64 }) tc.ignoreFrame(frameTypeStopSending) tc.writeFrames(packetType1RTT, debugFrameStream{ id: s.id, data: make([]byte, 32), }) tc.writeFrames(packetType1RTT, debugFrameResetStream{ id: s.id, finalSize: 64, }) s.CloseRead() tc.wantFrame("receiving stream reseet updates connection-level flow control", packetType1RTT, debugFrameMaxData{ max: 128, }) } func TestConnInflowStreamViolation(t *testing.T) { tc := newTestConn(t, serverSide, func(c *Config) { c.MaxConnReadBufferSize = 100 }) tc.handshake() tc.ignoreFrame(frameTypeAck) // Total MAX_DATA consumed: 50 tc.writeFrames(packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, bidiStream, 0), data: make([]byte, 50), }) // Total MAX_DATA consumed: 80 tc.writeFrames(packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, uniStream, 0), off: 20, data: make([]byte, 10), }) // Total MAX_DATA consumed: 100 tc.writeFrames(packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, bidiStream, 0), off: 70, fin: true, }) // This stream has already consumed quota for these bytes. // Total MAX_DATA consumed: 100 tc.writeFrames(packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, uniStream, 0), data: make([]byte, 20), }) tc.wantIdle("peer has consumed all MAX_DATA quota") // Total MAX_DATA consumed: 101 tc.writeFrames(packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, bidiStream, 2), data: make([]byte, 1), }) tc.wantFrame("peer violates MAX_DATA limit", packetType1RTT, debugFrameConnectionCloseTransport{ code: errFlowControl, }) } func TestConnInflowResetViolation(t *testing.T) { tc := newTestConn(t, serverSide, func(c *Config) { c.MaxConnReadBufferSize = 100 }) tc.handshake() tc.ignoreFrame(frameTypeAck) tc.writeFrames(packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, bidiStream, 0), data: make([]byte, 100), }) tc.wantIdle("peer has consumed all MAX_DATA quota") tc.writeFrames(packetType1RTT, debugFrameResetStream{ id: newStreamID(clientSide, uniStream, 0), finalSize: 0, }) tc.wantIdle("stream reset does not consume MAX_DATA quota, no error") tc.writeFrames(packetType1RTT, debugFrameResetStream{ id: newStreamID(clientSide, uniStream, 1), finalSize: 1, }) tc.wantFrame("RESET_STREAM final size violates MAX_DATA limit", packetType1RTT, debugFrameConnectionCloseTransport{ code: errFlowControl, }) } func TestConnInflowMultipleStreams(t *testing.T) { ctx := canceledContext() tc := newTestConn(t, serverSide, func(c *Config) { c.MaxConnReadBufferSize = 128 }) tc.handshake() tc.ignoreFrame(frameTypeAck) var streams []*Stream for _, id := range []streamID{ newStreamID(clientSide, uniStream, 0), newStreamID(clientSide, uniStream, 1), newStreamID(clientSide, bidiStream, 0), newStreamID(clientSide, bidiStream, 1), } { tc.writeFrames(packetType1RTT, debugFrameStream{ id: id, data: make([]byte, 32), }) s, err := tc.conn.AcceptStream(ctx) if err != nil { t.Fatalf("AcceptStream() = %v", err) } streams = append(streams, s) if n, err := s.ReadContext(ctx, make([]byte, 1)); err != nil || n != 1 { t.Fatalf("s.Read() = %v, %v; want 1, nil", n, err) } } tc.wantIdle("streams have read data, but not enough to update MAX_DATA") if n, err := streams[0].ReadContext(ctx, make([]byte, 32)); err != nil || n != 31 { t.Fatalf("s.Read() = %v, %v; want 31, nil", n, err) } tc.wantFrame("read enough data to trigger a MAX_DATA update", packetType1RTT, debugFrameMaxData{ max: 128 + 32 + 1 + 1 + 1, }) tc.ignoreFrame(frameTypeStopSending) streams[2].CloseRead() tc.wantFrame("closed stream triggers another MAX_DATA update", packetType1RTT, debugFrameMaxData{ max: 128 + 32 + 1 + 32 + 1, }) } func TestConnOutflowBlocked(t *testing.T) { tc, s := newTestConnAndLocalStream(t, clientSide, uniStream, permissiveTransportParameters, func(p *transportParameters) { p.initialMaxData = 10 }) tc.ignoreFrame(frameTypeAck) data := makeTestData(32) n, err := s.Write(data) if n != len(data) || err != nil { t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data)) } s.Flush() tc.wantFrame("stream writes data up to MAX_DATA limit", packetType1RTT, debugFrameStream{ id: s.id, data: data[:10], }) tc.wantIdle("stream is blocked by MAX_DATA limit") tc.writeFrames(packetType1RTT, debugFrameMaxData{ max: 20, }) tc.wantFrame("stream writes data up to new MAX_DATA limit", packetType1RTT, debugFrameStream{ id: s.id, off: 10, data: data[10:20], }) tc.wantIdle("stream is blocked by new MAX_DATA limit") tc.writeFrames(packetType1RTT, debugFrameMaxData{ max: 100, }) tc.wantFrame("stream writes remaining data", packetType1RTT, debugFrameStream{ id: s.id, off: 20, data: data[20:], }) } func TestConnOutflowMaxDataDecreases(t *testing.T) { tc, s := newTestConnAndLocalStream(t, clientSide, uniStream, permissiveTransportParameters, func(p *transportParameters) { p.initialMaxData = 10 }) tc.ignoreFrame(frameTypeAck) // Decrease in MAX_DATA is ignored. tc.writeFrames(packetType1RTT, debugFrameMaxData{ max: 5, }) data := makeTestData(32) n, err := s.Write(data) if n != len(data) || err != nil { t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data)) } s.Flush() tc.wantFrame("stream writes data up to MAX_DATA limit", packetType1RTT, debugFrameStream{ id: s.id, data: data[:10], }) } func TestConnOutflowMaxDataRoundRobin(t *testing.T) { ctx := canceledContext() tc := newTestConn(t, clientSide, permissiveTransportParameters, func(p *transportParameters) { p.initialMaxData = 0 }) tc.handshake() tc.ignoreFrame(frameTypeAck) s1, err := tc.conn.newLocalStream(ctx, uniStream) if err != nil { t.Fatalf("conn.newLocalStream(%v) = %v", uniStream, err) } s2, err := tc.conn.newLocalStream(ctx, uniStream) if err != nil { t.Fatalf("conn.newLocalStream(%v) = %v", uniStream, err) } s1.Write(make([]byte, 10)) s1.Flush() s2.Write(make([]byte, 10)) s2.Flush() tc.writeFrames(packetType1RTT, debugFrameMaxData{ max: 1, }) tc.wantFrame("stream 1 writes data up to MAX_DATA limit", packetType1RTT, debugFrameStream{ id: s1.id, data: []byte{0}, }) tc.writeFrames(packetType1RTT, debugFrameMaxData{ max: 2, }) tc.wantFrame("stream 2 writes data up to MAX_DATA limit", packetType1RTT, debugFrameStream{ id: s2.id, data: []byte{0}, }) tc.writeFrames(packetType1RTT, debugFrameMaxData{ max: 3, }) tc.wantFrame("stream 1 writes data up to MAX_DATA limit", packetType1RTT, debugFrameStream{ id: s1.id, off: 1, data: []byte{0}, }) } func TestConnOutflowMetaAndData(t *testing.T) { tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream, permissiveTransportParameters, func(p *transportParameters) { p.initialMaxData = 0 }) tc.ignoreFrame(frameTypeAck) data := makeTestData(32) s.Write(data) s.Flush() s.CloseRead() tc.wantFrame("CloseRead sends a STOP_SENDING, not flow controlled", packetType1RTT, debugFrameStopSending{ id: s.id, }) tc.writeFrames(packetType1RTT, debugFrameMaxData{ max: 100, }) tc.wantFrame("unblocked MAX_DATA", packetType1RTT, debugFrameStream{ id: s.id, data: data, }) } func TestConnOutflowResentData(t *testing.T) { tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream, permissiveTransportParameters, func(p *transportParameters) { p.initialMaxData = 10 }) tc.ignoreFrame(frameTypeAck) data := makeTestData(15) s.Write(data[:8]) s.Flush() tc.wantFrame("data is under MAX_DATA limit, all sent", packetType1RTT, debugFrameStream{ id: s.id, data: data[:8], }) // Lose the last STREAM packet. const pto = false tc.triggerLossOrPTO(packetType1RTT, false) tc.wantFrame("lost STREAM data is retransmitted", packetType1RTT, debugFrameStream{ id: s.id, data: data[:8], }) s.Write(data[8:]) s.Flush() tc.wantFrame("new data is sent up to the MAX_DATA limit", packetType1RTT, debugFrameStream{ id: s.id, off: 8, data: data[8:10], }) }