...

Source file src/golang.org/x/net/internal/quic/stream_test.go

Documentation: golang.org/x/net/internal/quic

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  //go:build go1.21
     6  
     7  package quic
     8  
     9  import (
    10  	"bytes"
    11  	"context"
    12  	"crypto/rand"
    13  	"errors"
    14  	"fmt"
    15  	"io"
    16  	"strings"
    17  	"testing"
    18  )
    19  
    20  func TestStreamWriteBlockedByOutputBuffer(t *testing.T) {
    21  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
    22  		ctx := canceledContext()
    23  		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    24  		const writeBufferSize = 4
    25  		tc := newTestConn(t, clientSide, permissiveTransportParameters, func(c *Config) {
    26  			c.MaxStreamWriteBufferSize = writeBufferSize
    27  		})
    28  		tc.handshake()
    29  		tc.ignoreFrame(frameTypeAck)
    30  
    31  		s, err := tc.conn.newLocalStream(ctx, styp)
    32  		if err != nil {
    33  			t.Fatal(err)
    34  		}
    35  
    36  		// Non-blocking write.
    37  		n, err := s.WriteContext(ctx, want)
    38  		if n != writeBufferSize || err != context.Canceled {
    39  			t.Fatalf("s.WriteContext() = %v, %v; want %v, context.Canceled", n, err, writeBufferSize)
    40  		}
    41  		s.Flush()
    42  		tc.wantFrame("first write buffer of data sent",
    43  			packetType1RTT, debugFrameStream{
    44  				id:   s.id,
    45  				data: want[:writeBufferSize],
    46  			})
    47  		off := int64(writeBufferSize)
    48  
    49  		// Blocking write, which must wait for buffer space.
    50  		w := runAsync(tc, func(ctx context.Context) (int, error) {
    51  			n, err := s.WriteContext(ctx, want[writeBufferSize:])
    52  			s.Flush()
    53  			return n, err
    54  		})
    55  		tc.wantIdle("write buffer is full, no more data can be sent")
    56  
    57  		// The peer's ack of the STREAM frame allows progress.
    58  		tc.writeAckForAll()
    59  		tc.wantFrame("second write buffer of data sent",
    60  			packetType1RTT, debugFrameStream{
    61  				id:   s.id,
    62  				off:  off,
    63  				data: want[off:][:writeBufferSize],
    64  			})
    65  		off += writeBufferSize
    66  		tc.wantIdle("write buffer is full, no more data can be sent")
    67  
    68  		// The peer's ack of the second STREAM frame allows sending the remaining data.
    69  		tc.writeAckForAll()
    70  		tc.wantFrame("remaining data sent",
    71  			packetType1RTT, debugFrameStream{
    72  				id:   s.id,
    73  				off:  off,
    74  				data: want[off:],
    75  			})
    76  
    77  		if n, err := w.result(); n != len(want)-writeBufferSize || err != nil {
    78  			t.Fatalf("s.WriteContext() = %v, %v; want %v, nil",
    79  				len(want)-writeBufferSize, err, writeBufferSize)
    80  		}
    81  	})
    82  }
    83  
    84  func TestStreamWriteBlockedByStreamFlowControl(t *testing.T) {
    85  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
    86  		ctx := canceledContext()
    87  		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    88  		tc := newTestConn(t, clientSide, func(p *transportParameters) {
    89  			p.initialMaxStreamsBidi = 100
    90  			p.initialMaxStreamsUni = 100
    91  			p.initialMaxData = 1 << 20
    92  		})
    93  		tc.handshake()
    94  		tc.ignoreFrame(frameTypeAck)
    95  
    96  		s, err := tc.conn.newLocalStream(ctx, styp)
    97  		if err != nil {
    98  			t.Fatal(err)
    99  		}
   100  
   101  		// Data is written to the stream output buffer, but we have no flow control.
   102  		_, err = s.WriteContext(ctx, want[:1])
   103  		if err != nil {
   104  			t.Fatalf("write with available output buffer: unexpected error: %v", err)
   105  		}
   106  		tc.wantFrame("write blocked by flow control triggers a STREAM_DATA_BLOCKED frame",
   107  			packetType1RTT, debugFrameStreamDataBlocked{
   108  				id:  s.id,
   109  				max: 0,
   110  			})
   111  
   112  		// Write more data.
   113  		_, err = s.WriteContext(ctx, want[1:])
   114  		if err != nil {
   115  			t.Fatalf("write with available output buffer: unexpected error: %v", err)
   116  		}
   117  		tc.wantIdle("adding more blocked data does not trigger another STREAM_DATA_BLOCKED")
   118  
   119  		// Provide some flow control window.
   120  		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
   121  			id:  s.id,
   122  			max: 4,
   123  		})
   124  		tc.wantFrame("stream window extended, but still more data to write",
   125  			packetType1RTT, debugFrameStreamDataBlocked{
   126  				id:  s.id,
   127  				max: 4,
   128  			})
   129  		tc.wantFrame("stream window extended to 4, expect blocked write to progress",
   130  			packetType1RTT, debugFrameStream{
   131  				id:   s.id,
   132  				data: want[:4],
   133  			})
   134  
   135  		// Provide more flow control window.
   136  		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
   137  			id:  s.id,
   138  			max: int64(len(want)),
   139  		})
   140  		tc.wantFrame("stream window extended further, expect blocked write to finish",
   141  			packetType1RTT, debugFrameStream{
   142  				id:   s.id,
   143  				off:  4,
   144  				data: want[4:],
   145  			})
   146  	})
   147  }
   148  
   149  func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
   150  	// "A sender MUST ignore any MAX_STREAM_DATA [...] frames that
   151  	// do not increase flow control limits."
   152  	// https://www.rfc-editor.org/rfc/rfc9000#section-4.1-9
   153  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
   154  		ctx := canceledContext()
   155  		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
   156  		tc := newTestConn(t, clientSide, func(p *transportParameters) {
   157  			if styp == uniStream {
   158  				p.initialMaxStreamsUni = 1
   159  				p.initialMaxStreamDataUni = 4
   160  			} else {
   161  				p.initialMaxStreamsBidi = 1
   162  				p.initialMaxStreamDataBidiRemote = 4
   163  			}
   164  			p.initialMaxData = 1 << 20
   165  		})
   166  		tc.handshake()
   167  		tc.ignoreFrame(frameTypeAck)
   168  		tc.ignoreFrame(frameTypeStreamDataBlocked)
   169  
   170  		// Write [0,1).
   171  		s, err := tc.conn.newLocalStream(ctx, styp)
   172  		if err != nil {
   173  			t.Fatal(err)
   174  		}
   175  		s.WriteContext(ctx, want[:1])
   176  		s.Flush()
   177  		tc.wantFrame("sent data (1 byte) fits within flow control limit",
   178  			packetType1RTT, debugFrameStream{
   179  				id:   s.id,
   180  				off:  0,
   181  				data: want[:1],
   182  			})
   183  
   184  		// MAX_STREAM_DATA tries to decrease limit, and is ignored.
   185  		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
   186  			id:  s.id,
   187  			max: 2,
   188  		})
   189  
   190  		// Write [1,4).
   191  		s.WriteContext(ctx, want[1:])
   192  		tc.wantFrame("stream limit is 4 bytes, ignoring decrease in MAX_STREAM_DATA",
   193  			packetType1RTT, debugFrameStream{
   194  				id:   s.id,
   195  				off:  1,
   196  				data: want[1:4],
   197  			})
   198  
   199  		// MAX_STREAM_DATA increases limit.
   200  		// Second MAX_STREAM_DATA decreases it, and is ignored.
   201  		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
   202  			id:  s.id,
   203  			max: 8,
   204  		})
   205  		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
   206  			id:  s.id,
   207  			max: 6,
   208  		})
   209  
   210  		// Write [1,4).
   211  		s.WriteContext(ctx, want[4:])
   212  		tc.wantFrame("stream limit is 8 bytes, ignoring decrease in MAX_STREAM_DATA",
   213  			packetType1RTT, debugFrameStream{
   214  				id:   s.id,
   215  				off:  4,
   216  				data: want[4:8],
   217  			})
   218  	})
   219  }
   220  
   221  func TestStreamWriteBlockedByWriteBufferLimit(t *testing.T) {
   222  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
   223  		ctx := canceledContext()
   224  		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
   225  		const maxWriteBuffer = 4
   226  		tc := newTestConn(t, clientSide, func(p *transportParameters) {
   227  			p.initialMaxStreamsBidi = 100
   228  			p.initialMaxStreamsUni = 100
   229  			p.initialMaxData = 1 << 20
   230  			p.initialMaxStreamDataBidiRemote = 1 << 20
   231  			p.initialMaxStreamDataUni = 1 << 20
   232  		}, func(c *Config) {
   233  			c.MaxStreamWriteBufferSize = maxWriteBuffer
   234  		})
   235  		tc.handshake()
   236  		tc.ignoreFrame(frameTypeAck)
   237  
   238  		// Write more data than StreamWriteBufferSize.
   239  		// The peer has given us plenty of flow control,
   240  		// so we're just blocked by our local limit.
   241  		s, err := tc.conn.newLocalStream(ctx, styp)
   242  		if err != nil {
   243  			t.Fatal(err)
   244  		}
   245  		w := runAsync(tc, func(ctx context.Context) (int, error) {
   246  			return s.WriteContext(ctx, want)
   247  		})
   248  		tc.wantFrame("stream write should send as much data as write buffer allows",
   249  			packetType1RTT, debugFrameStream{
   250  				id:   s.id,
   251  				off:  0,
   252  				data: want[:maxWriteBuffer],
   253  			})
   254  		tc.wantIdle("no STREAM_DATA_BLOCKED, we're blocked locally not by flow control")
   255  
   256  		// ACK for previously-sent data allows making more progress.
   257  		tc.writeAckForAll()
   258  		tc.wantFrame("ACK for previous data allows making progress",
   259  			packetType1RTT, debugFrameStream{
   260  				id:   s.id,
   261  				off:  maxWriteBuffer,
   262  				data: want[maxWriteBuffer:][:maxWriteBuffer],
   263  			})
   264  
   265  		// Cancel the write with data left to send.
   266  		w.cancel()
   267  		n, err := w.result()
   268  		if n != 2*maxWriteBuffer || err == nil {
   269  			t.Fatalf("WriteContext() = %v, %v; want %v bytes, error", n, err, 2*maxWriteBuffer)
   270  		}
   271  	})
   272  }
   273  
// TestStreamReceive delivers STREAM frames covering various byte ranges
// (in order, out of order, resent, overlapping, and with an early or empty
// FIN) and checks, after each frame, how many bytes can be read from the
// stream and whether the read ends with io.EOF.
func TestStreamReceive(t *testing.T) {
	// "Endpoints MUST be able to deliver stream data to an application as
	// an ordered byte stream."
	// https://www.rfc-editor.org/rfc/rfc9000#section-2.2-2
	want := make([]byte, 5000)
	for i := range want {
		want[i] = byte(i)
	}
	// frame describes one STREAM frame to deliver, carrying want[start:end],
	// and what reading from the stream should yield afterward.
	type frame struct {
		start   int64 // offset of the first byte carried by the frame
		end     int64 // offset just past the last byte carried by the frame
		fin     bool  // value of the frame's fin field
		want    int   // cumulative number of bytes read once this frame has been processed
		wantEOF bool  // whether reading after this frame must end with io.EOF
	}
	for _, test := range []struct {
		name   string
		frames []frame
	}{{
		name: "linear",
		frames: []frame{{
			start: 0,
			end:   1000,
			want:  1000,
		}, {
			start: 1000,
			end:   2000,
			want:  2000,
		}, {
			start:   2000,
			end:     3000,
			want:    3000,
			fin:     true,
			wantEOF: true,
		}},
	}, {
		name: "out of order",
		frames: []frame{{
			start: 1000,
			end:   2000,
		}, {
			start: 2000,
			end:   3000,
		}, {
			start: 0,
			end:   1000,
			want:  3000,
		}},
	}, {
		name: "resent",
		frames: []frame{{
			start: 0,
			end:   1000,
			want:  1000,
		}, {
			start: 0,
			end:   1000,
			want:  1000,
		}, {
			start: 1000,
			end:   2000,
			want:  2000,
		}, {
			start: 0,
			end:   1000,
			want:  2000,
		}, {
			start: 1000,
			end:   2000,
			want:  2000,
		}},
	}, {
		name: "overlapping",
		frames: []frame{{
			start: 0,
			end:   1000,
			want:  1000,
		}, {
			start: 3000,
			end:   4000,
			want:  1000,
		}, {
			start: 2000,
			end:   3000,
			want:  1000,
		}, {
			start: 1000,
			end:   3000,
			want:  4000,
		}},
	}, {
		name: "early eof",
		frames: []frame{{
			start: 3000,
			end:   3000,
			fin:   true,
			want:  0,
		}, {
			start: 1000,
			end:   2000,
			want:  0,
		}, {
			start: 0,
			end:   1000,
			want:  2000,
		}, {
			start:   2000,
			end:     3000,
			want:    3000,
			wantEOF: true,
		}},
	}, {
		name: "empty eof",
		frames: []frame{{
			start: 0,
			end:   1000,
			want:  1000,
		}, {
			start:   1000,
			end:     1000,
			fin:     true,
			want:    1000,
			wantEOF: true,
		}},
	}} {
		testStreamTypes(t, test.name, func(t *testing.T, styp streamType) {
			ctx := canceledContext()
			tc := newTestConn(t, serverSide)
			tc.handshake()
			sid := newStreamID(clientSide, styp, 0)
			var s *Stream
			got := make([]byte, len(want))
			// total counts bytes read so far across all frames of this test case.
			var total int
			for _, f := range test.frames {
				t.Logf("receive [%v,%v)", f.start, f.end)
				tc.writeFrames(packetType1RTT, debugFrameStream{
					id:   sid,
					off:  f.start,
					data: want[f.start:f.end],
					fin:  f.fin,
				})
				// The stream is accepted once, after the first frame has been delivered.
				if s == nil {
					var err error
					s, err = tc.conn.AcceptStream(ctx)
					if err != nil {
						tc.t.Fatalf("conn.AcceptStream() = %v", err)
					}
				}
				// Read until ReadContext reports an error.
				// NOTE(review): presumably the canceled ctx makes ReadContext
				// return an error instead of blocking once no data is
				// available — confirm against Stream.ReadContext.
				for {
					n, err := s.ReadContext(ctx, got[total:])
					t.Logf("s.ReadContext() = %v, %v", n, err)
					total += n
					if f.wantEOF && err != io.EOF {
						t.Fatalf("ReadContext() error = %v; want io.EOF", err)
					}
					if !f.wantEOF && err == io.EOF {
						t.Fatalf("ReadContext() error = io.EOF, want something else")
					}
					if err != nil {
						break
					}
				}
				if total != f.want {
					t.Fatalf("total bytes read = %v, want %v", total, f.want)
				}
				// Everything read so far must match the prefix of the source data.
				for i := 0; i < total; i++ {
					if got[i] != want[i] {
						t.Fatalf("byte %v differs: got %v, want %v", i, got[i], want[i])
					}
				}
			}
		})
	}

}
   449  
   450  func TestStreamReceiveExtendsStreamWindow(t *testing.T) {
   451  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
   452  		const maxWindowSize = 20
   453  		ctx := canceledContext()
   454  		tc := newTestConn(t, serverSide, func(c *Config) {
   455  			c.MaxStreamReadBufferSize = maxWindowSize
   456  		})
   457  		tc.handshake()
   458  		tc.ignoreFrame(frameTypeAck)
   459  		sid := newStreamID(clientSide, styp, 0)
   460  		tc.writeFrames(packetType1RTT, debugFrameStream{
   461  			id:   sid,
   462  			off:  0,
   463  			data: make([]byte, maxWindowSize),
   464  		})
   465  		s, err := tc.conn.AcceptStream(ctx)
   466  		if err != nil {
   467  			t.Fatalf("AcceptStream: %v", err)
   468  		}
   469  		tc.wantIdle("stream window is not extended before data is read")
   470  		buf := make([]byte, maxWindowSize+1)
   471  		if n, err := s.ReadContext(ctx, buf); n != maxWindowSize || err != nil {
   472  			t.Fatalf("s.ReadContext() = %v, %v; want %v, nil", n, err, maxWindowSize)
   473  		}
   474  		tc.wantFrame("stream window is extended after reading data",
   475  			packetType1RTT, debugFrameMaxStreamData{
   476  				id:  sid,
   477  				max: maxWindowSize * 2,
   478  			})
   479  		tc.writeFrames(packetType1RTT, debugFrameStream{
   480  			id:   sid,
   481  			off:  maxWindowSize,
   482  			data: make([]byte, maxWindowSize),
   483  			fin:  true,
   484  		})
   485  		if n, err := s.ReadContext(ctx, buf); n != maxWindowSize || err != io.EOF {
   486  			t.Fatalf("s.ReadContext() = %v, %v; want %v, io.EOF", n, err, maxWindowSize)
   487  		}
   488  		tc.wantIdle("stream window is not extended after FIN")
   489  	})
   490  }
   491  
   492  func TestStreamReceiveViolatesStreamDataLimit(t *testing.T) {
   493  	// "A receiver MUST close the connection with an error of type FLOW_CONTROL_ERROR if
   494  	// the sender violates the advertised [...] stream data limits [...]"
   495  	// https://www.rfc-editor.org/rfc/rfc9000#section-4.1-8
   496  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
   497  		const maxStreamData = 10
   498  		for _, test := range []struct {
   499  			off  int64
   500  			size int64
   501  		}{{
   502  			off:  maxStreamData,
   503  			size: 1,
   504  		}, {
   505  			off:  0,
   506  			size: maxStreamData + 1,
   507  		}, {
   508  			off:  maxStreamData - 1,
   509  			size: 2,
   510  		}} {
   511  			tc := newTestConn(t, serverSide, func(c *Config) {
   512  				c.MaxStreamReadBufferSize = maxStreamData
   513  			})
   514  			tc.handshake()
   515  			tc.ignoreFrame(frameTypeAck)
   516  			tc.writeFrames(packetType1RTT, debugFrameStream{
   517  				id:   newStreamID(clientSide, styp, 0),
   518  				off:  test.off,
   519  				data: make([]byte, test.size),
   520  			})
   521  			tc.wantFrame(
   522  				fmt.Sprintf("data [%v,%v) violates stream data limit and closes connection",
   523  					test.off, test.off+test.size),
   524  				packetType1RTT, debugFrameConnectionCloseTransport{
   525  					code: errFlowControl,
   526  				},
   527  			)
   528  		}
   529  	})
   530  }
   531  
   532  func TestStreamReceiveDuplicateDataDoesNotViolateLimits(t *testing.T) {
   533  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
   534  		const maxData = 10
   535  		tc := newTestConn(t, serverSide, func(c *Config) {
   536  			// TODO: Add connection-level maximum data here as well.
   537  			c.MaxStreamReadBufferSize = maxData
   538  		})
   539  		tc.handshake()
   540  		tc.ignoreFrame(frameTypeAck)
   541  		for i := 0; i < 3; i++ {
   542  			tc.writeFrames(packetType1RTT, debugFrameStream{
   543  				id:   newStreamID(clientSide, styp, 0),
   544  				off:  0,
   545  				data: make([]byte, maxData),
   546  			})
   547  			tc.wantIdle(fmt.Sprintf("conn sends no frames after receiving data frame %v", i))
   548  		}
   549  	})
   550  }
   551  
   552  func finalSizeTest(t *testing.T, wantErr transportError, f func(tc *testConn, sid streamID) (finalSize int64), opts ...any) {
   553  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
   554  		for _, test := range []struct {
   555  			name       string
   556  			finalFrame func(tc *testConn, sid streamID, finalSize int64)
   557  		}{{
   558  			name: "FIN",
   559  			finalFrame: func(tc *testConn, sid streamID, finalSize int64) {
   560  				tc.writeFrames(packetType1RTT, debugFrameStream{
   561  					id:  sid,
   562  					off: finalSize,
   563  					fin: true,
   564  				})
   565  			},
   566  		}, {
   567  			name: "RESET_STREAM",
   568  			finalFrame: func(tc *testConn, sid streamID, finalSize int64) {
   569  				tc.writeFrames(packetType1RTT, debugFrameResetStream{
   570  					id:        sid,
   571  					finalSize: finalSize,
   572  				})
   573  			},
   574  		}} {
   575  			t.Run(test.name, func(t *testing.T) {
   576  				tc := newTestConn(t, serverSide, opts...)
   577  				tc.handshake()
   578  				sid := newStreamID(clientSide, styp, 0)
   579  				finalSize := f(tc, sid)
   580  				test.finalFrame(tc, sid, finalSize)
   581  				tc.wantFrame("change in final size of stream is an error",
   582  					packetType1RTT, debugFrameConnectionCloseTransport{
   583  						code: wantErr,
   584  					},
   585  				)
   586  			})
   587  		}
   588  	})
   589  }
   590  
   591  func TestStreamFinalSizeChangedAfterFin(t *testing.T) {
   592  	// "If a RESET_STREAM or STREAM frame is received indicating a change
   593  	// in the final size for the stream, an endpoint SHOULD respond with
   594  	// an error of type FINAL_SIZE_ERROR [...]"
   595  	// https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
   596  	finalSizeTest(t, errFinalSize, func(tc *testConn, sid streamID) (finalSize int64) {
   597  		tc.writeFrames(packetType1RTT, debugFrameStream{
   598  			id:  sid,
   599  			off: 10,
   600  			fin: true,
   601  		})
   602  		return 9
   603  	})
   604  }
   605  
   606  func TestStreamFinalSizeBeforePreviousData(t *testing.T) {
   607  	finalSizeTest(t, errFinalSize, func(tc *testConn, sid streamID) (finalSize int64) {
   608  		tc.writeFrames(packetType1RTT, debugFrameStream{
   609  			id:   sid,
   610  			off:  10,
   611  			data: []byte{0},
   612  		})
   613  		return 9
   614  	})
   615  }
   616  
   617  func TestStreamFinalSizePastMaxStreamData(t *testing.T) {
   618  	finalSizeTest(t, errFlowControl, func(tc *testConn, sid streamID) (finalSize int64) {
   619  		return 11
   620  	}, func(c *Config) {
   621  		c.MaxStreamReadBufferSize = 10
   622  	})
   623  }
   624  
   625  func TestStreamDataBeyondFinalSize(t *testing.T) {
   626  	// "A receiver SHOULD treat receipt of data at or beyond
   627  	// the final size as an error of type FINAL_SIZE_ERROR [...]"
   628  	// https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
   629  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
   630  		tc := newTestConn(t, serverSide)
   631  		tc.handshake()
   632  		sid := newStreamID(clientSide, styp, 0)
   633  
   634  		const write1size = 4
   635  		tc.writeFrames(packetType1RTT, debugFrameStream{
   636  			id:   sid,
   637  			off:  0,
   638  			data: make([]byte, 16),
   639  			fin:  true,
   640  		})
   641  		tc.writeFrames(packetType1RTT, debugFrameStream{
   642  			id:   sid,
   643  			off:  16,
   644  			data: []byte{0},
   645  		})
   646  		tc.wantFrame("received data past final size of stream",
   647  			packetType1RTT, debugFrameConnectionCloseTransport{
   648  				code: errFinalSize,
   649  			},
   650  		)
   651  	})
   652  }
   653  
   654  func TestStreamReceiveUnblocksReader(t *testing.T) {
   655  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
   656  		tc := newTestConn(t, serverSide)
   657  		tc.handshake()
   658  		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
   659  		sid := newStreamID(clientSide, styp, 0)
   660  
   661  		// AcceptStream blocks until a STREAM frame is received.
   662  		accept := runAsync(tc, func(ctx context.Context) (*Stream, error) {
   663  			return tc.conn.AcceptStream(ctx)
   664  		})
   665  		const write1size = 4
   666  		tc.writeFrames(packetType1RTT, debugFrameStream{
   667  			id:   sid,
   668  			off:  0,
   669  			data: want[:write1size],
   670  		})
   671  		s, err := accept.result()
   672  		if err != nil {
   673  			t.Fatalf("AcceptStream() = %v", err)
   674  		}
   675  
   676  		// ReadContext succeeds immediately, since we already have data.
   677  		got := make([]byte, len(want))
   678  		read := runAsync(tc, func(ctx context.Context) (int, error) {
   679  			return s.ReadContext(ctx, got)
   680  		})
   681  		if n, err := read.result(); n != write1size || err != nil {
   682  			t.Fatalf("ReadContext = %v, %v; want %v, nil", n, err, write1size)
   683  		}
   684  
   685  		// ReadContext blocks waiting for more data.
   686  		read = runAsync(tc, func(ctx context.Context) (int, error) {
   687  			return s.ReadContext(ctx, got[write1size:])
   688  		})
   689  		tc.writeFrames(packetType1RTT, debugFrameStream{
   690  			id:   sid,
   691  			off:  write1size,
   692  			data: want[write1size:],
   693  			fin:  true,
   694  		})
   695  		if n, err := read.result(); n != len(want)-write1size || err != io.EOF {
   696  			t.Fatalf("ReadContext = %v, %v; want %v, io.EOF", n, err, len(want)-write1size)
   697  		}
   698  		if !bytes.Equal(got, want) {
   699  			t.Fatalf("read bytes %x, want %x", got, want)
   700  		}
   701  	})
   702  }
   703  
   704  // testStreamSendFrameInvalidState calls the test func with a stream ID for:
   705  //
   706  //   - a remote bidirectional stream that the peer has not created
   707  //   - a remote unidirectional stream
   708  //
   709  // It then sends the returned frame (STREAM, STREAM_DATA_BLOCKED, etc.)
   710  // to the conn and expects a STREAM_STATE_ERROR.
   711  func testStreamSendFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
   712  	testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
   713  		tc := newTestConn(t, side, permissiveTransportParameters)
   714  		tc.handshake()
   715  		tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
   716  		tc.wantFrame("frame for local stream which has not been created",
   717  			packetType1RTT, debugFrameConnectionCloseTransport{
   718  				code: errStreamState,
   719  			})
   720  	})
   721  	testSides(t, "uni_stream", func(t *testing.T, side connSide) {
   722  		ctx := canceledContext()
   723  		tc := newTestConn(t, side, permissiveTransportParameters)
   724  		tc.handshake()
   725  		sid := newStreamID(side, uniStream, 0)
   726  		s, err := tc.conn.NewSendOnlyStream(ctx)
   727  		if err != nil {
   728  			t.Fatal(err)
   729  		}
   730  		s.Flush() // open the stream
   731  		tc.wantFrame("new stream is opened",
   732  			packetType1RTT, debugFrameStream{
   733  				id:   sid,
   734  				data: []byte{},
   735  			})
   736  		tc.writeFrames(packetType1RTT, f(sid))
   737  		tc.wantFrame("send-oriented frame for send-only stream",
   738  			packetType1RTT, debugFrameConnectionCloseTransport{
   739  				code: errStreamState,
   740  			})
   741  	})
   742  }
   743  
   744  func TestStreamResetStreamInvalidState(t *testing.T) {
   745  	// "An endpoint that receives a RESET_STREAM frame for a send-only
   746  	// stream MUST terminate the connection with error STREAM_STATE_ERROR."
   747  	// https://www.rfc-editor.org/rfc/rfc9000#section-19.4-3
   748  	testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
   749  		return debugFrameResetStream{
   750  			id:        sid,
   751  			code:      0,
   752  			finalSize: 0,
   753  		}
   754  	})
   755  }
   756  
   757  func TestStreamStreamFrameInvalidState(t *testing.T) {
   758  	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
   759  	// if it receives a STREAM frame for a locally initiated stream
   760  	// that has not yet been created, or for a send-only stream."
   761  	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
   762  	testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
   763  		return debugFrameStream{
   764  			id: sid,
   765  		}
   766  	})
   767  }
   768  
func TestStreamDataBlockedInvalidState(t *testing.T) {
	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
	// if it receives a STREAM frame for a locally initiated stream
	// that has not yet been created, or for a send-only stream."
	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
	//
	// NOTE(review): despite the test name, the frame built below is a
	// debugFrameStream, not a debugFrameStreamDataBlocked, and the RFC text
	// quoted above is the rule for STREAM frames. As written, this test
	// repeats TestStreamStreamFrameInvalidState and never delivers a
	// STREAM_DATA_BLOCKED frame. Confirm how the conn is expected to treat
	// STREAM_DATA_BLOCKED in these states before changing the frame type.
	testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
		return debugFrameStream{
			id: sid,
		}
	})
}
   780  
   781  // testStreamReceiveFrameInvalidState calls the test func with a stream ID for:
   782  //
   783  //   - a remote bidirectional stream that the peer has not created
   784  //   - a local unidirectional stream
   785  //
   786  // It then sends the returned frame (MAX_STREAM_DATA, STOP_SENDING, etc.)
   787  // to the conn and expects a STREAM_STATE_ERROR.
   788  func testStreamReceiveFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
   789  	testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
   790  		tc := newTestConn(t, side)
   791  		tc.handshake()
   792  		tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
   793  		tc.wantFrame("frame for local stream which has not been created",
   794  			packetType1RTT, debugFrameConnectionCloseTransport{
   795  				code: errStreamState,
   796  			})
   797  	})
   798  	testSides(t, "uni_stream", func(t *testing.T, side connSide) {
   799  		tc := newTestConn(t, side)
   800  		tc.handshake()
   801  		tc.writeFrames(packetType1RTT, f(newStreamID(side.peer(), uniStream, 0)))
   802  		tc.wantFrame("receive-oriented frame for receive-only stream",
   803  			packetType1RTT, debugFrameConnectionCloseTransport{
   804  				code: errStreamState,
   805  			})
   806  	})
   807  }
   808  
   809  func TestStreamStopSendingInvalidState(t *testing.T) {
   810  	// "Receiving a STOP_SENDING frame for a locally initiated stream
   811  	// that has not yet been created MUST be treated as a connection error
   812  	// of type STREAM_STATE_ERROR. An endpoint that receives a STOP_SENDING
   813  	// frame for a receive-only stream MUST terminate the connection with
   814  	// error STREAM_STATE_ERROR."
   815  	// https://www.rfc-editor.org/rfc/rfc9000#section-19.5-2
   816  	testStreamReceiveFrameInvalidState(t, func(sid streamID) debugFrame {
   817  		return debugFrameStopSending{
   818  			id: sid,
   819  		}
   820  	})
   821  }
   822  
   823  func TestStreamMaxStreamDataInvalidState(t *testing.T) {
   824  	// "Receiving a MAX_STREAM_DATA frame for a locally initiated stream
   825  	// that has not yet been created MUST be treated as a connection error
   826  	// of type STREAM_STATE_ERROR. An endpoint that receives a MAX_STREAM_DATA
   827  	// frame for a receive-only stream MUST terminate the connection
   828  	// with error STREAM_STATE_ERROR."
   829  	// https://www.rfc-editor.org/rfc/rfc9000#section-19.10-2
   830  	testStreamReceiveFrameInvalidState(t, func(sid streamID) debugFrame {
   831  		return debugFrameMaxStreamData{
   832  			id:  sid,
   833  			max: 1000,
   834  		}
   835  	})
   836  }
   837  
   838  func TestStreamOffsetTooLarge(t *testing.T) {
   839  	// "Receipt of a frame that exceeds [2^62-1] MUST be treated as a
   840  	// connection error of type FRAME_ENCODING_ERROR or FLOW_CONTROL_ERROR."
   841  	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-9
   842  	tc := newTestConn(t, serverSide)
   843  	tc.handshake()
   844  
   845  	tc.writeFrames(packetType1RTT,
   846  		debugFrameStream{
   847  			id:   newStreamID(clientSide, bidiStream, 0),
   848  			off:  1<<62 - 1,
   849  			data: []byte{0},
   850  		})
   851  	got, _ := tc.readFrame()
   852  	want1 := debugFrameConnectionCloseTransport{code: errFrameEncoding}
   853  	want2 := debugFrameConnectionCloseTransport{code: errFlowControl}
   854  	if !frameEqual(got, want1) && !frameEqual(got, want2) {
   855  		t.Fatalf("STREAM offset exceeds 2^62-1\ngot:  %v\nwant: %v\n  or: %v", got, want1, want2)
   856  	}
   857  }
   858  
   859  func TestStreamReadFromWriteOnlyStream(t *testing.T) {
   860  	_, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
   861  	buf := make([]byte, 10)
   862  	wantErr := "read from write-only stream"
   863  	if n, err := s.Read(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
   864  		t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
   865  	}
   866  }
   867  
   868  func TestStreamWriteToReadOnlyStream(t *testing.T) {
   869  	_, s := newTestConnAndRemoteStream(t, serverSide, uniStream)
   870  	buf := make([]byte, 10)
   871  	wantErr := "write to read-only stream"
   872  	if n, err := s.Write(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
   873  		t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
   874  	}
   875  }
   876  
   877  func TestStreamReadFromClosedStream(t *testing.T) {
   878  	tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, permissiveTransportParameters)
   879  	s.CloseRead()
   880  	tc.wantFrame("CloseRead sends a STOP_SENDING frame",
   881  		packetType1RTT, debugFrameStopSending{
   882  			id: s.id,
   883  		})
   884  	wantErr := "read from closed stream"
   885  	if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
   886  		t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
   887  	}
   888  	// Data which shows up after STOP_SENDING is discarded.
   889  	tc.writeFrames(packetType1RTT, debugFrameStream{
   890  		id:   s.id,
   891  		data: []byte{1, 2, 3},
   892  		fin:  true,
   893  	})
   894  	if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
   895  		t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
   896  	}
   897  }
   898  
   899  func TestStreamCloseReadWithAllDataReceived(t *testing.T) {
   900  	tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, permissiveTransportParameters)
   901  	tc.writeFrames(packetType1RTT, debugFrameStream{
   902  		id:   s.id,
   903  		data: []byte{1, 2, 3},
   904  		fin:  true,
   905  	})
   906  	s.CloseRead()
   907  	tc.wantIdle("CloseRead in Data Recvd state doesn't need to send STOP_SENDING")
   908  	// We had all the data for the stream, but CloseRead discarded it.
   909  	wantErr := "read from closed stream"
   910  	if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
   911  		t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
   912  	}
   913  }
   914  
   915  func TestStreamWriteToClosedStream(t *testing.T) {
   916  	tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, permissiveTransportParameters)
   917  	s.CloseWrite()
   918  	tc.wantFrame("stream is opened after being closed",
   919  		packetType1RTT, debugFrameStream{
   920  			id:   s.id,
   921  			off:  0,
   922  			fin:  true,
   923  			data: []byte{},
   924  		})
   925  	wantErr := "write to closed stream"
   926  	if n, err := s.Write([]byte{}); err == nil || !strings.Contains(err.Error(), wantErr) {
   927  		t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
   928  	}
   929  }
   930  
   931  func TestStreamResetBlockedStream(t *testing.T) {
   932  	tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, permissiveTransportParameters,
   933  		func(c *Config) {
   934  			c.MaxStreamWriteBufferSize = 4
   935  		})
   936  	tc.ignoreFrame(frameTypeStreamDataBlocked)
   937  	writing := runAsync(tc, func(ctx context.Context) (int, error) {
   938  		return s.WriteContext(ctx, []byte{0, 1, 2, 3, 4, 5, 6, 7})
   939  	})
   940  	tc.wantFrame("stream writes data until write buffer fills",
   941  		packetType1RTT, debugFrameStream{
   942  			id:   s.id,
   943  			off:  0,
   944  			data: []byte{0, 1, 2, 3},
   945  		})
   946  	s.Reset(42)
   947  	tc.wantFrame("stream is reset",
   948  		packetType1RTT, debugFrameResetStream{
   949  			id:        s.id,
   950  			code:      42,
   951  			finalSize: 4,
   952  		})
   953  	wantErr := "write to reset stream"
   954  	if n, err := writing.result(); n != 4 || !strings.Contains(err.Error(), wantErr) {
   955  		t.Errorf("s.Write() interrupted by Reset: %v, %q; want 4, %q", n, err, wantErr)
   956  	}
   957  	tc.writeAckForAll()
   958  	tc.wantIdle("buffer space is available, but stream has been reset")
   959  	s.Reset(100)
   960  	tc.wantIdle("resetting stream a second time has no effect")
   961  	if n, err := s.Write([]byte{}); err == nil || !strings.Contains(err.Error(), wantErr) {
   962  		t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
   963  	}
   964  }
   965  
   966  func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
   967  	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
   968  		p.initialMaxStreamsUni = 1
   969  		p.initialMaxData = 1 << 20
   970  		p.initialMaxStreamDataUni = 1 << 20
   971  	})
   972  	want := make([]byte, 4096)
   973  	rand.Read(want) // doesn't need to be crypto/rand, but non-deprecated and harmless
   974  	w := runAsync(tc, func(ctx context.Context) (int, error) {
   975  		n, err := s.WriteContext(ctx, want)
   976  		s.Flush()
   977  		return n, err
   978  	})
   979  	got := make([]byte, 0, len(want))
   980  	for {
   981  		f, _ := tc.readFrame()
   982  		if f == nil {
   983  			break
   984  		}
   985  		sf, ok := f.(debugFrameStream)
   986  		if !ok {
   987  			t.Fatalf("unexpected frame: %v", sf)
   988  		}
   989  		if len(got) != int(sf.off) {
   990  			t.Fatalf("got frame: %v\nwant offset %v", sf, len(got))
   991  		}
   992  		got = append(got, sf.data...)
   993  	}
   994  	if n, err := w.result(); n != len(want) || err != nil {
   995  		t.Fatalf("s.WriteContext() = %v, %v; want %v, nil", n, err, len(want))
   996  	}
   997  	if !bytes.Equal(got, want) {
   998  		t.Fatalf("mismatch in received stream data")
   999  	}
  1000  }
  1001  
  1002  func TestStreamCloseWaitsForAcks(t *testing.T) {
  1003  	ctx := canceledContext()
  1004  	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
  1005  	data := make([]byte, 100)
  1006  	s.WriteContext(ctx, data)
  1007  	s.Flush()
  1008  	tc.wantFrame("conn sends data for the stream",
  1009  		packetType1RTT, debugFrameStream{
  1010  			id:   s.id,
  1011  			data: data,
  1012  		})
  1013  	if err := s.CloseContext(ctx); err != context.Canceled {
  1014  		t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
  1015  	}
  1016  	tc.wantFrame("conn sends FIN for closed stream",
  1017  		packetType1RTT, debugFrameStream{
  1018  			id:   s.id,
  1019  			off:  int64(len(data)),
  1020  			fin:  true,
  1021  			data: []byte{},
  1022  		})
  1023  	closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
  1024  		return struct{}{}, s.CloseContext(ctx)
  1025  	})
  1026  	if _, err := closing.result(); err != errNotDone {
  1027  		t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
  1028  	}
  1029  	tc.writeAckForAll()
  1030  	if _, err := closing.result(); err != nil {
  1031  		t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
  1032  	}
  1033  }
  1034  
  1035  func TestStreamCloseReadOnly(t *testing.T) {
  1036  	tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, permissiveTransportParameters)
  1037  	if err := s.CloseContext(canceledContext()); err != nil {
  1038  		t.Errorf("s.CloseContext() = %v, want nil", err)
  1039  	}
  1040  	tc.wantFrame("closed stream sends STOP_SENDING",
  1041  		packetType1RTT, debugFrameStopSending{
  1042  			id: s.id,
  1043  		})
  1044  }
  1045  
  1046  func TestStreamCloseUnblocked(t *testing.T) {
  1047  	for _, test := range []struct {
  1048  		name    string
  1049  		unblock func(tc *testConn, s *Stream)
  1050  		success bool
  1051  	}{{
  1052  		name: "data received",
  1053  		unblock: func(tc *testConn, s *Stream) {
  1054  			tc.writeAckForAll()
  1055  		},
  1056  		success: true,
  1057  	}, {
  1058  		name: "stop sending received",
  1059  		unblock: func(tc *testConn, s *Stream) {
  1060  			tc.writeFrames(packetType1RTT, debugFrameStopSending{
  1061  				id: s.id,
  1062  			})
  1063  		},
  1064  	}, {
  1065  		name: "stream reset",
  1066  		unblock: func(tc *testConn, s *Stream) {
  1067  			s.Reset(0)
  1068  			tc.wait() // wait for test conn to process the Reset
  1069  		},
  1070  	}} {
  1071  		t.Run(test.name, func(t *testing.T) {
  1072  			ctx := canceledContext()
  1073  			tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
  1074  			data := make([]byte, 100)
  1075  			s.WriteContext(ctx, data)
  1076  			s.Flush()
  1077  			tc.wantFrame("conn sends data for the stream",
  1078  				packetType1RTT, debugFrameStream{
  1079  					id:   s.id,
  1080  					data: data,
  1081  				})
  1082  			if err := s.CloseContext(ctx); err != context.Canceled {
  1083  				t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
  1084  			}
  1085  			tc.wantFrame("conn sends FIN for closed stream",
  1086  				packetType1RTT, debugFrameStream{
  1087  					id:   s.id,
  1088  					off:  int64(len(data)),
  1089  					fin:  true,
  1090  					data: []byte{},
  1091  				})
  1092  			closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
  1093  				return struct{}{}, s.CloseContext(ctx)
  1094  			})
  1095  			if _, err := closing.result(); err != errNotDone {
  1096  				t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
  1097  			}
  1098  			test.unblock(tc, s)
  1099  			_, err := closing.result()
  1100  			switch {
  1101  			case err == errNotDone:
  1102  				t.Fatalf("s.CloseContext() still blocking; want it to have returned")
  1103  			case err == nil && !test.success:
  1104  				t.Fatalf("s.CloseContext() = nil, want error")
  1105  			case err != nil && test.success:
  1106  				t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
  1107  			}
  1108  		})
  1109  	}
  1110  }
  1111  
  1112  func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) {
  1113  	ctx := canceledContext()
  1114  	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters,
  1115  		func(p *transportParameters) {
  1116  			//p.initialMaxData = 0
  1117  			p.initialMaxStreamDataUni = 0
  1118  		})
  1119  	tc.ignoreFrame(frameTypeStreamDataBlocked)
  1120  	if _, err := s.WriteContext(ctx, []byte{0, 1}); err != nil {
  1121  		t.Fatalf("s.Write = %v", err)
  1122  	}
  1123  	s.CloseWrite()
  1124  	tc.wantIdle("stream write is blocked by flow control")
  1125  
  1126  	tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
  1127  		id:  s.id,
  1128  		max: 1,
  1129  	})
  1130  	tc.wantFrame("send data up to flow control limit",
  1131  		packetType1RTT, debugFrameStream{
  1132  			id:   s.id,
  1133  			data: []byte{0},
  1134  		})
  1135  	tc.wantIdle("stream write is again blocked by flow control")
  1136  
  1137  	tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
  1138  		id:  s.id,
  1139  		max: 2,
  1140  	})
  1141  	tc.wantFrame("send remaining data and FIN",
  1142  		packetType1RTT, debugFrameStream{
  1143  			id:   s.id,
  1144  			off:  1,
  1145  			data: []byte{1},
  1146  			fin:  true,
  1147  		})
  1148  }
  1149  
  1150  func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
  1151  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
  1152  		ctx := canceledContext()
  1153  		tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
  1154  		data := []byte{0, 1, 2, 3, 4, 5, 6, 7}
  1155  		tc.writeFrames(packetType1RTT, debugFrameStream{
  1156  			id:   s.id,
  1157  			data: data,
  1158  		})
  1159  		got := make([]byte, 4)
  1160  		if n, err := s.ReadContext(ctx, got); n != len(got) || err != nil {
  1161  			t.Fatalf("Read start of stream: got %v, %v; want %v, nil", n, err, len(got))
  1162  		}
  1163  		const sentCode = 42
  1164  		tc.writeFrames(packetType1RTT, debugFrameResetStream{
  1165  			id:        s.id,
  1166  			finalSize: 20,
  1167  			code:      sentCode,
  1168  		})
  1169  		wantErr := StreamErrorCode(sentCode)
  1170  		if n, err := s.ReadContext(ctx, got); n != 0 || !errors.Is(err, wantErr) {
  1171  			t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
  1172  		}
  1173  	})
  1174  }
  1175  
  1176  func TestStreamPeerResetWakesBlockedRead(t *testing.T) {
  1177  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
  1178  		tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
  1179  		reader := runAsync(tc, func(ctx context.Context) (int, error) {
  1180  			got := make([]byte, 4)
  1181  			return s.ReadContext(ctx, got)
  1182  		})
  1183  		const sentCode = 42
  1184  		tc.writeFrames(packetType1RTT, debugFrameResetStream{
  1185  			id:        s.id,
  1186  			finalSize: 20,
  1187  			code:      sentCode,
  1188  		})
  1189  		wantErr := StreamErrorCode(sentCode)
  1190  		if n, err := reader.result(); n != 0 || !errors.Is(err, wantErr) {
  1191  			t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
  1192  		}
  1193  	})
  1194  }
  1195  
  1196  func TestStreamPeerResetFollowedByData(t *testing.T) {
  1197  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
  1198  		tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
  1199  		tc.writeFrames(packetType1RTT, debugFrameResetStream{
  1200  			id:        s.id,
  1201  			finalSize: 4,
  1202  			code:      1,
  1203  		})
  1204  		tc.writeFrames(packetType1RTT, debugFrameStream{
  1205  			id:   s.id,
  1206  			data: []byte{0, 1, 2, 3},
  1207  		})
  1208  		// Another reset with a different code, for good measure.
  1209  		tc.writeFrames(packetType1RTT, debugFrameResetStream{
  1210  			id:        s.id,
  1211  			finalSize: 4,
  1212  			code:      2,
  1213  		})
  1214  		wantErr := StreamErrorCode(1)
  1215  		if n, err := s.Read(make([]byte, 16)); n != 0 || !errors.Is(err, wantErr) {
  1216  			t.Fatalf("Read from reset stream: got %v, %v; want 0, %v", n, err, wantErr)
  1217  		}
  1218  	})
  1219  }
  1220  
  1221  func TestStreamResetInvalidCode(t *testing.T) {
  1222  	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
  1223  	s.Reset(1 << 62)
  1224  	tc.wantFrame("reset with invalid code sends a RESET_STREAM anyway",
  1225  		packetType1RTT, debugFrameResetStream{
  1226  			id: s.id,
  1227  			// The code we send here isn't specified,
  1228  			// so this could really be any value.
  1229  			code: (1 << 62) - 1,
  1230  		})
  1231  }
  1232  
  1233  func TestStreamResetReceiveOnly(t *testing.T) {
  1234  	tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream)
  1235  	s.Reset(0)
  1236  	tc.wantIdle("resetting a receive-only stream has no effect")
  1237  }
  1238  
  1239  func TestStreamPeerStopSendingForActiveStream(t *testing.T) {
  1240  	// "An endpoint that receives a STOP_SENDING frame MUST send a RESET_STREAM frame if
  1241  	// the stream is in the "Ready" or "Send" state."
  1242  	// https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4
  1243  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
  1244  		tc, s := newTestConnAndLocalStream(t, serverSide, styp, permissiveTransportParameters)
  1245  		for i := 0; i < 4; i++ {
  1246  			s.Write([]byte{byte(i)})
  1247  			s.Flush()
  1248  			tc.wantFrame("write sends a STREAM frame to peer",
  1249  				packetType1RTT, debugFrameStream{
  1250  					id:   s.id,
  1251  					off:  int64(i),
  1252  					data: []byte{byte(i)},
  1253  				})
  1254  		}
  1255  		tc.writeFrames(packetType1RTT, debugFrameStopSending{
  1256  			id:   s.id,
  1257  			code: 42,
  1258  		})
  1259  		tc.wantFrame("receiving STOP_SENDING causes stream reset",
  1260  			packetType1RTT, debugFrameResetStream{
  1261  				id:        s.id,
  1262  				code:      42,
  1263  				finalSize: 4,
  1264  			})
  1265  		if n, err := s.Write([]byte{0}); err == nil {
  1266  			t.Errorf("s.Write() after STOP_SENDING = %v, %v; want error", n, err)
  1267  		}
  1268  		// This ack will result in some of the previous frames being marked as lost.
  1269  		tc.writeAckForLatest()
  1270  		tc.wantIdle("lost STREAM frames for reset stream are not resent")
  1271  	})
  1272  }
  1273  
  1274  func TestStreamReceiveDataBlocked(t *testing.T) {
  1275  	tc := newTestConn(t, serverSide, permissiveTransportParameters)
  1276  	tc.handshake()
  1277  	tc.ignoreFrame(frameTypeAck)
  1278  
  1279  	// We don't do anything with these frames,
  1280  	// but should accept them if the peer sends one.
  1281  	tc.writeFrames(packetType1RTT, debugFrameStreamDataBlocked{
  1282  		id:  newStreamID(clientSide, bidiStream, 0),
  1283  		max: 100,
  1284  	})
  1285  	tc.writeFrames(packetType1RTT, debugFrameDataBlocked{
  1286  		max: 100,
  1287  	})
  1288  	tc.wantIdle("no response to STREAM_DATA_BLOCKED and DATA_BLOCKED")
  1289  }
  1290  
  1291  func TestStreamFlushExplicit(t *testing.T) {
  1292  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
  1293  		tc, s := newTestConnAndLocalStream(t, clientSide, styp, permissiveTransportParameters)
  1294  		want := []byte{0, 1, 2, 3}
  1295  		n, err := s.Write(want)
  1296  		if n != len(want) || err != nil {
  1297  			t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
  1298  		}
  1299  		tc.wantIdle("unflushed data is not sent")
  1300  		s.Flush()
  1301  		tc.wantFrame("data is sent after flush",
  1302  			packetType1RTT, debugFrameStream{
  1303  				id:   s.id,
  1304  				data: want,
  1305  			})
  1306  	})
  1307  }
  1308  
  1309  func TestStreamFlushImplicitExact(t *testing.T) {
  1310  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
  1311  		const writeBufferSize = 4
  1312  		tc, s := newTestConnAndLocalStream(t, clientSide, styp,
  1313  			permissiveTransportParameters,
  1314  			func(c *Config) {
  1315  				c.MaxStreamWriteBufferSize = writeBufferSize
  1316  			})
  1317  		want := []byte{0, 1, 2, 3, 4, 5, 6}
  1318  
  1319  		// This write doesn't quite fill the output buffer.
  1320  		n, err := s.Write(want[:3])
  1321  		if n != 3 || err != nil {
  1322  			t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
  1323  		}
  1324  		tc.wantIdle("unflushed data is not sent")
  1325  
  1326  		// This write fills the output buffer exactly.
  1327  		n, err = s.Write(want[3:4])
  1328  		if n != 1 || err != nil {
  1329  			t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
  1330  		}
  1331  		tc.wantFrame("data is sent after write buffer fills",
  1332  			packetType1RTT, debugFrameStream{
  1333  				id:   s.id,
  1334  				data: want[0:4],
  1335  			})
  1336  
  1337  	})
  1338  }
  1339  
  1340  func TestStreamFlushImplicitLargerThanBuffer(t *testing.T) {
  1341  	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
  1342  		const writeBufferSize = 4
  1343  		tc, s := newTestConnAndLocalStream(t, clientSide, styp,
  1344  			permissiveTransportParameters,
  1345  			func(c *Config) {
  1346  				c.MaxStreamWriteBufferSize = writeBufferSize
  1347  			})
  1348  		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
  1349  
  1350  		w := runAsync(tc, func(ctx context.Context) (int, error) {
  1351  			n, err := s.WriteContext(ctx, want)
  1352  			return n, err
  1353  		})
  1354  
  1355  		tc.wantFrame("data is sent after write buffer fills",
  1356  			packetType1RTT, debugFrameStream{
  1357  				id:   s.id,
  1358  				data: want[0:4],
  1359  			})
  1360  		tc.writeAckForAll()
  1361  		tc.wantFrame("ack permits sending more data",
  1362  			packetType1RTT, debugFrameStream{
  1363  				id:   s.id,
  1364  				off:  4,
  1365  				data: want[4:8],
  1366  			})
  1367  		tc.writeAckForAll()
  1368  
  1369  		tc.wantIdle("write buffer is not full")
  1370  		if n, err := w.result(); n != len(want) || err != nil {
  1371  			t.Fatalf("Write() = %v, %v; want %v, nil", n, err, len(want))
  1372  		}
  1373  
  1374  		s.Flush()
  1375  		tc.wantFrame("flush sends last buffer of data",
  1376  			packetType1RTT, debugFrameStream{
  1377  				id:   s.id,
  1378  				off:  8,
  1379  				data: want[8:],
  1380  			})
  1381  	})
  1382  }
  1383  
  1384  type streamSide string
  1385  
  1386  const (
  1387  	localStream  = streamSide("local")
  1388  	remoteStream = streamSide("remote")
  1389  )
  1390  
  1391  func newTestConnAndStream(t *testing.T, side connSide, sside streamSide, styp streamType, opts ...any) (*testConn, *Stream) {
  1392  	if sside == localStream {
  1393  		return newTestConnAndLocalStream(t, side, styp, opts...)
  1394  	} else {
  1395  		return newTestConnAndRemoteStream(t, side, styp, opts...)
  1396  	}
  1397  }
  1398  
  1399  func newTestConnAndLocalStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
  1400  	t.Helper()
  1401  	tc := newTestConn(t, side, opts...)
  1402  	tc.handshake()
  1403  	tc.ignoreFrame(frameTypeAck)
  1404  	return tc, newLocalStream(t, tc, styp)
  1405  }
  1406  
  1407  func newLocalStream(t *testing.T, tc *testConn, styp streamType) *Stream {
  1408  	t.Helper()
  1409  	ctx := canceledContext()
  1410  	s, err := tc.conn.newLocalStream(ctx, styp)
  1411  	if err != nil {
  1412  		t.Fatalf("conn.newLocalStream(%v) = %v", styp, err)
  1413  	}
  1414  	return s
  1415  }
  1416  
  1417  func newTestConnAndRemoteStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
  1418  	t.Helper()
  1419  	tc := newTestConn(t, side, opts...)
  1420  	tc.handshake()
  1421  	tc.ignoreFrame(frameTypeAck)
  1422  	return tc, newRemoteStream(t, tc, styp)
  1423  }
  1424  
  1425  func newRemoteStream(t *testing.T, tc *testConn, styp streamType) *Stream {
  1426  	t.Helper()
  1427  	ctx := canceledContext()
  1428  	tc.writeFrames(packetType1RTT, debugFrameStream{
  1429  		id: newStreamID(tc.conn.side.peer(), styp, 0),
  1430  	})
  1431  	s, err := tc.conn.AcceptStream(ctx)
  1432  	if err != nil {
  1433  		t.Fatalf("conn.AcceptStream() = %v", err)
  1434  	}
  1435  	return s
  1436  }
  1437  
  1438  // permissiveTransportParameters may be passed as an option to newTestConn.
  1439  func permissiveTransportParameters(p *transportParameters) {
  1440  	p.initialMaxStreamsBidi = maxStreamsLimit
  1441  	p.initialMaxStreamsUni = maxStreamsLimit
  1442  	p.initialMaxData = maxVarint
  1443  	p.initialMaxStreamDataBidiRemote = maxVarint
  1444  	p.initialMaxStreamDataBidiLocal = maxVarint
  1445  	p.initialMaxStreamDataUni = maxVarint
  1446  }
  1447  
  1448  func makeTestData(n int) []byte {
  1449  	b := make([]byte, n)
  1450  	for i := 0; i < n; i++ {
  1451  		b[i] = byte(i)
  1452  	}
  1453  	return b
  1454  }
  1455  

View as plain text