...

Source file src/golang.org/x/net/internal/quic/conn_streams_test.go

Documentation: golang.org/x/net/internal/quic

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  //go:build go1.21
     6  
     7  package quic
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"io"
    13  	"math"
    14  	"sync"
    15  	"testing"
    16  )
    17  
    18  func TestStreamsCreate(t *testing.T) {
    19  	ctx := canceledContext()
    20  	tc := newTestConn(t, clientSide, permissiveTransportParameters)
    21  	tc.handshake()
    22  
    23  	s, err := tc.conn.NewStream(ctx)
    24  	if err != nil {
    25  		t.Fatalf("NewStream: %v", err)
    26  	}
    27  	s.Flush() // open the stream
    28  	tc.wantFrame("created bidirectional stream 0",
    29  		packetType1RTT, debugFrameStream{
    30  			id:   0, // client-initiated, bidi, number 0
    31  			data: []byte{},
    32  		})
    33  
    34  	s, err = tc.conn.NewSendOnlyStream(ctx)
    35  	if err != nil {
    36  		t.Fatalf("NewStream: %v", err)
    37  	}
    38  	s.Flush() // open the stream
    39  	tc.wantFrame("created unidirectional stream 0",
    40  		packetType1RTT, debugFrameStream{
    41  			id:   2, // client-initiated, uni, number 0
    42  			data: []byte{},
    43  		})
    44  
    45  	s, err = tc.conn.NewStream(ctx)
    46  	if err != nil {
    47  		t.Fatalf("NewStream: %v", err)
    48  	}
    49  	s.Flush() // open the stream
    50  	tc.wantFrame("created bidirectional stream 1",
    51  		packetType1RTT, debugFrameStream{
    52  			id:   4, // client-initiated, uni, number 4
    53  			data: []byte{},
    54  		})
    55  }
    56  
    57  func TestStreamsAccept(t *testing.T) {
    58  	ctx := canceledContext()
    59  	tc := newTestConn(t, serverSide)
    60  	tc.handshake()
    61  
    62  	tc.writeFrames(packetType1RTT,
    63  		debugFrameStream{
    64  			id: 0, // client-initiated, bidi, number 0
    65  		},
    66  		debugFrameStream{
    67  			id: 2, // client-initiated, uni, number 0
    68  		},
    69  		debugFrameStream{
    70  			id: 4, // client-initiated, bidi, number 1
    71  		})
    72  
    73  	for _, accept := range []struct {
    74  		id       streamID
    75  		readOnly bool
    76  	}{
    77  		{0, false},
    78  		{2, true},
    79  		{4, false},
    80  	} {
    81  		s, err := tc.conn.AcceptStream(ctx)
    82  		if err != nil {
    83  			t.Fatalf("conn.AcceptStream() = %v, want stream %v", err, accept.id)
    84  		}
    85  		if got, want := s.id, accept.id; got != want {
    86  			t.Fatalf("conn.AcceptStream() = stream %v, want %v", got, want)
    87  		}
    88  		if got, want := s.IsReadOnly(), accept.readOnly; got != want {
    89  			t.Fatalf("stream %v: s.IsReadOnly() = %v, want %v", accept.id, got, want)
    90  		}
    91  	}
    92  
    93  	_, err := tc.conn.AcceptStream(ctx)
    94  	if err != context.Canceled {
    95  		t.Fatalf("conn.AcceptStream() = %v, want context.Canceled", err)
    96  	}
    97  }
    98  
    99  func TestStreamsBlockingAccept(t *testing.T) {
   100  	tc := newTestConn(t, serverSide)
   101  	tc.handshake()
   102  
   103  	a := runAsync(tc, func(ctx context.Context) (*Stream, error) {
   104  		return tc.conn.AcceptStream(ctx)
   105  	})
   106  	if _, err := a.result(); err != errNotDone {
   107  		tc.t.Fatalf("AcceptStream() = _, %v; want errNotDone", err)
   108  	}
   109  
   110  	sid := newStreamID(clientSide, bidiStream, 0)
   111  	tc.writeFrames(packetType1RTT,
   112  		debugFrameStream{
   113  			id: sid,
   114  		})
   115  
   116  	s, err := a.result()
   117  	if err != nil {
   118  		t.Fatalf("conn.AcceptStream() = _, %v, want stream", err)
   119  	}
   120  	if got, want := s.id, sid; got != want {
   121  		t.Fatalf("conn.AcceptStream() = stream %v, want %v", got, want)
   122  	}
   123  	if got, want := s.IsReadOnly(), false; got != want {
   124  		t.Fatalf("s.IsReadOnly() = %v, want %v", got, want)
   125  	}
   126  }
   127  
   128  func TestStreamsLocalStreamNotCreated(t *testing.T) {
   129  	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
   130  	// if it receives a STREAM frame for a locally initiated stream that has
   131  	// not yet been created [...]"
   132  	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
   133  	tc := newTestConn(t, serverSide)
   134  	tc.handshake()
   135  
   136  	tc.writeFrames(packetType1RTT,
   137  		debugFrameStream{
   138  			id: 1, // server-initiated, bidi, number 0
   139  		})
   140  	tc.wantFrame("peer sent STREAM frame for an uncreated local stream",
   141  		packetType1RTT, debugFrameConnectionCloseTransport{
   142  			code: errStreamState,
   143  		})
   144  }
   145  
   146  func TestStreamsLocalStreamClosed(t *testing.T) {
   147  	tc, s := newTestConnAndLocalStream(t, clientSide, uniStream, permissiveTransportParameters)
   148  	s.CloseWrite()
   149  	tc.wantFrame("FIN for closed stream",
   150  		packetType1RTT, debugFrameStream{
   151  			id:   newStreamID(clientSide, uniStream, 0),
   152  			fin:  true,
   153  			data: []byte{},
   154  		})
   155  	tc.writeAckForAll()
   156  
   157  	tc.writeFrames(packetType1RTT, debugFrameStopSending{
   158  		id: newStreamID(clientSide, uniStream, 0),
   159  	})
   160  	tc.wantIdle("frame for finalized stream is ignored")
   161  
   162  	// ACKing the last stream packet should have cleaned up the stream.
   163  	// Check that we don't have any state left.
   164  	if got := len(tc.conn.streams.streams); got != 0 {
   165  		t.Fatalf("after close, len(tc.conn.streams.streams) = %v, want 0", got)
   166  	}
   167  	if tc.conn.streams.queueMeta.head != nil {
   168  		t.Fatalf("after close, stream send queue is not empty; should be")
   169  	}
   170  }
   171  
   172  func TestStreamsStreamSendOnly(t *testing.T) {
   173  	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
   174  	// if it receives a STREAM frame for a locally initiated stream that has
   175  	// not yet been created [...]"
   176  	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
   177  	ctx := canceledContext()
   178  	tc := newTestConn(t, serverSide, permissiveTransportParameters)
   179  	tc.handshake()
   180  
   181  	s, err := tc.conn.NewSendOnlyStream(ctx)
   182  	if err != nil {
   183  		t.Fatalf("NewStream: %v", err)
   184  	}
   185  	s.Flush() // open the stream
   186  	tc.wantFrame("created unidirectional stream 0",
   187  		packetType1RTT, debugFrameStream{
   188  			id:   3, // server-initiated, uni, number 0
   189  			data: []byte{},
   190  		})
   191  
   192  	tc.writeFrames(packetType1RTT,
   193  		debugFrameStream{
   194  			id: 3, // server-initiated, bidi, number 0
   195  		})
   196  	tc.wantFrame("peer sent STREAM frame for a send-only stream",
   197  		packetType1RTT, debugFrameConnectionCloseTransport{
   198  			code: errStreamState,
   199  		})
   200  }
   201  
   202  func TestStreamsWriteQueueFairness(t *testing.T) {
   203  	ctx := canceledContext()
   204  	const dataLen = 1 << 20
   205  	const numStreams = 3
   206  	tc := newTestConn(t, clientSide, func(p *transportParameters) {
   207  		p.initialMaxStreamsBidi = numStreams
   208  		p.initialMaxData = 1<<62 - 1
   209  		p.initialMaxStreamDataBidiRemote = dataLen
   210  	}, func(c *Config) {
   211  		c.MaxStreamWriteBufferSize = dataLen
   212  	})
   213  	tc.handshake()
   214  	tc.ignoreFrame(frameTypeAck)
   215  
   216  	// Create a number of streams, and write a bunch of data to them.
   217  	// The streams are not limited by flow control.
   218  	//
   219  	// The first stream we create is going to immediately consume all
   220  	// available congestion window.
   221  	//
   222  	// Once we've created all the remaining streams,
   223  	// we start sending acks back to open up the congestion window.
   224  	// We verify that all streams can make progress.
   225  	data := make([]byte, dataLen)
   226  	var streams []*Stream
   227  	for i := 0; i < numStreams; i++ {
   228  		s, err := tc.conn.NewStream(ctx)
   229  		if err != nil {
   230  			t.Fatal(err)
   231  		}
   232  		streams = append(streams, s)
   233  		if n, err := s.WriteContext(ctx, data); n != len(data) || err != nil {
   234  			t.Fatalf("s.WriteContext() = %v, %v; want %v, nil", n, err, len(data))
   235  		}
   236  		// Wait for the stream to finish writing whatever frames it can before
   237  		// congestion control blocks it.
   238  		tc.wait()
   239  	}
   240  
   241  	sent := make([]int64, len(streams))
   242  	for {
   243  		p := tc.readPacket()
   244  		if p == nil {
   245  			break
   246  		}
   247  		tc.writeFrames(packetType1RTT, debugFrameAck{
   248  			ranges: []i64range[packetNumber]{{0, p.num}},
   249  		})
   250  		for _, f := range p.frames {
   251  			sf, ok := f.(debugFrameStream)
   252  			if !ok {
   253  				t.Fatalf("got unexpected frame (want STREAM): %v", sf)
   254  			}
   255  			if got, want := sf.off, sent[sf.id.num()]; got != want {
   256  				t.Fatalf("got frame: %v\nwant offset: %v", sf, want)
   257  			}
   258  			sent[sf.id.num()] = sf.off + int64(len(sf.data))
   259  			// Look at the amount of data sent by all streams, excluding the first one.
   260  			// (The first stream got a head start when it consumed the initial window.)
   261  			//
   262  			// We expect that difference between the streams making the most and least progress
   263  			// so far will be less than the maximum datagram size.
   264  			minSent := sent[1]
   265  			maxSent := sent[1]
   266  			for _, s := range sent[2:] {
   267  				minSent = min(minSent, s)
   268  				maxSent = max(maxSent, s)
   269  			}
   270  			const maxDelta = maxUDPPayloadSize
   271  			if d := maxSent - minSent; d > maxDelta {
   272  				t.Fatalf("stream data sent: %v; delta=%v, want delta <= %v", sent, d, maxDelta)
   273  			}
   274  		}
   275  	}
   276  	// Final check that every stream sent the full amount of data expected.
   277  	for num, s := range sent {
   278  		if s != dataLen {
   279  			t.Errorf("stream %v sent %v bytes, want %v", num, s, dataLen)
   280  		}
   281  	}
   282  }
   283  
func TestStreamsShutdown(t *testing.T) {
	// These tests verify that a stream is removed from the Conn's map of live streams
	// after it is fully shut down.
	//
	// Each case consists of a setup step, after which one stream should exist,
	// and a shutdown step, after which no streams should remain in the Conn.
	for _, test := range []struct {
		name     string
		side     streamSide // whether the stream under test is locally or remotely created
		styp     streamType
		setup    func(*testing.T, *testConn, *Stream)
		shutdown func(*testing.T, *testConn, *Stream)
	}{{
		// Local uni stream: close sends a FIN; the stream is retired once
		// the peer acks everything we sent.
		name: "closed",
		side: localStream,
		styp: uniStream,
		setup: func(t *testing.T, tc *testConn, s *Stream) {
			s.CloseContext(canceledContext())
		},
		shutdown: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeAckForAll()
		},
	}, {
		// Local bidi stream: the peer resets its half before we close ours;
		// the ack of our FIN completes the shutdown.
		name: "local close",
		side: localStream,
		styp: bidiStream,
		setup: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeFrames(packetType1RTT, debugFrameResetStream{
				id: s.id,
			})
			s.CloseContext(canceledContext())
		},
		shutdown: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeAckForAll()
		},
	}, {
		// Local bidi stream: our half is closed and acked during setup; the
		// stream is only fully done once the peer resets its half.
		name: "remote reset",
		side: localStream,
		styp: bidiStream,
		setup: func(t *testing.T, tc *testConn, s *Stream) {
			s.CloseContext(canceledContext())
			tc.wantIdle("all frames after CloseContext are ignored")
			tc.writeAckForAll()
		},
		shutdown: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeFrames(packetType1RTT, debugFrameResetStream{
				id: s.id,
			})
		},
	}, {
		// Remote uni stream: the peer sends FIN and we read to EOF during
		// setup; closing our read side releases the last reference.
		name: "local close",
		side: remoteStream,
		styp: uniStream,
		setup: func(t *testing.T, tc *testConn, s *Stream) {
			ctx := canceledContext()
			tc.writeFrames(packetType1RTT, debugFrameStream{
				id:  s.id,
				fin: true,
			})
			if n, err := s.ReadContext(ctx, make([]byte, 16)); n != 0 || err != io.EOF {
				t.Errorf("ReadContext() = %v, %v; want 0, io.EOF", n, err)
			}
		},
		shutdown: func(t *testing.T, tc *testConn, s *Stream) {
			s.CloseRead()
		},
	}} {
		name := fmt.Sprintf("%v/%v/%v", test.side, test.styp, test.name)
		t.Run(name, func(t *testing.T) {
			tc, s := newTestConnAndStream(t, serverSide, test.side, test.styp,
				permissiveTransportParameters)
			// STREAM and STOP_SENDING frames sent during setup/shutdown are
			// not what this test checks; ignore them.
			tc.ignoreFrame(frameTypeStreamBase)
			tc.ignoreFrame(frameTypeStopSending)
			test.setup(t, tc, s)
			tc.wantIdle("conn should be idle after setup")
			if got, want := len(tc.conn.streams.streams), 1; got != want {
				t.Fatalf("after setup: %v streams in Conn's map; want %v", got, want)
			}
			test.shutdown(t, tc, s)
			tc.wantIdle("conn should be idle after shutdown")
			if got, want := len(tc.conn.streams.streams), 0; got != want {
				t.Fatalf("after shutdown: %v streams in Conn's map; want %v", got, want)
			}
		})
	}
}
   370  
   371  func TestStreamsCreateAndCloseRemote(t *testing.T) {
   372  	// This test exercises creating new streams in response to frames
   373  	// from the peer, and cleaning up after streams are fully closed.
   374  	//
   375  	// It's overfitted to the current implementation, but works through
   376  	// a number of corner cases in that implementation.
   377  	//
   378  	// Disable verbose logging in this test: It sends a lot of packets,
   379  	// and they're not especially interesting on their own.
   380  	defer func(vv bool) {
   381  		*testVV = vv
   382  	}(*testVV)
   383  	*testVV = false
   384  	ctx := canceledContext()
   385  	tc := newTestConn(t, serverSide, permissiveTransportParameters)
   386  	tc.handshake()
   387  	tc.ignoreFrame(frameTypeAck)
   388  	type op struct {
   389  		id streamID
   390  	}
   391  	type streamOp op
   392  	type resetOp op
   393  	type acceptOp op
   394  	const noStream = math.MaxInt64
   395  	stringID := func(id streamID) string {
   396  		return fmt.Sprintf("%v/%v", id.streamType(), id.num())
   397  	}
   398  	for _, op := range []any{
   399  		"opening bidi/5 implicitly opens bidi/0-4",
   400  		streamOp{newStreamID(clientSide, bidiStream, 5)},
   401  		acceptOp{newStreamID(clientSide, bidiStream, 5)},
   402  		"bidi/3 was implicitly opened",
   403  		streamOp{newStreamID(clientSide, bidiStream, 3)},
   404  		acceptOp{newStreamID(clientSide, bidiStream, 3)},
   405  		resetOp{newStreamID(clientSide, bidiStream, 3)},
   406  		"bidi/3 is done, frames for it are discarded",
   407  		streamOp{newStreamID(clientSide, bidiStream, 3)},
   408  		"open and close some uni streams as well",
   409  		streamOp{newStreamID(clientSide, uniStream, 0)},
   410  		acceptOp{newStreamID(clientSide, uniStream, 0)},
   411  		streamOp{newStreamID(clientSide, uniStream, 1)},
   412  		acceptOp{newStreamID(clientSide, uniStream, 1)},
   413  		streamOp{newStreamID(clientSide, uniStream, 2)},
   414  		acceptOp{newStreamID(clientSide, uniStream, 2)},
   415  		resetOp{newStreamID(clientSide, uniStream, 1)},
   416  		resetOp{newStreamID(clientSide, uniStream, 0)},
   417  		resetOp{newStreamID(clientSide, uniStream, 2)},
   418  		"closing an implicitly opened stream causes us to accept it",
   419  		resetOp{newStreamID(clientSide, bidiStream, 0)},
   420  		acceptOp{newStreamID(clientSide, bidiStream, 0)},
   421  		resetOp{newStreamID(clientSide, bidiStream, 1)},
   422  		acceptOp{newStreamID(clientSide, bidiStream, 1)},
   423  		resetOp{newStreamID(clientSide, bidiStream, 2)},
   424  		acceptOp{newStreamID(clientSide, bidiStream, 2)},
   425  		"stream bidi/3 was reset previously",
   426  		resetOp{newStreamID(clientSide, bidiStream, 3)},
   427  		resetOp{newStreamID(clientSide, bidiStream, 4)},
   428  		acceptOp{newStreamID(clientSide, bidiStream, 4)},
   429  		"stream bidi/5 was reset previously",
   430  		resetOp{newStreamID(clientSide, bidiStream, 5)},
   431  		"stream bidi/6 was not implicitly opened",
   432  		resetOp{newStreamID(clientSide, bidiStream, 6)},
   433  		acceptOp{newStreamID(clientSide, bidiStream, 6)},
   434  	} {
   435  		if _, ok := op.(acceptOp); !ok {
   436  			if s, err := tc.conn.AcceptStream(ctx); err == nil {
   437  				t.Fatalf("accepted stream %v, want none", stringID(s.id))
   438  			}
   439  		}
   440  		switch op := op.(type) {
   441  		case string:
   442  			t.Log("# " + op)
   443  		case streamOp:
   444  			t.Logf("open stream %v", stringID(op.id))
   445  			tc.writeFrames(packetType1RTT, debugFrameStream{
   446  				id: streamID(op.id),
   447  			})
   448  		case resetOp:
   449  			t.Logf("reset stream %v", stringID(op.id))
   450  			tc.writeFrames(packetType1RTT, debugFrameResetStream{
   451  				id: op.id,
   452  			})
   453  		case acceptOp:
   454  			s, err := tc.conn.AcceptStream(ctx)
   455  			if err != nil {
   456  				t.Fatalf("AcceptStream() = %q; want stream %v", err, stringID(op.id))
   457  			}
   458  			if s.id != op.id {
   459  				t.Fatalf("accepted stram %v; want stream %v", err, stringID(op.id))
   460  			}
   461  			t.Logf("accepted stream %v", stringID(op.id))
   462  			// Immediately close the stream, so the stream becomes done when the
   463  			// peer closes its end.
   464  			s.CloseContext(ctx)
   465  		}
   466  		p := tc.readPacket()
   467  		if p != nil {
   468  			tc.writeFrames(p.ptype, debugFrameAck{
   469  				ranges: []i64range[packetNumber]{{0, p.num + 1}},
   470  			})
   471  		}
   472  	}
   473  	// Every stream should be fully closed now.
   474  	// Check that we don't have any state left.
   475  	if got := len(tc.conn.streams.streams); got != 0 {
   476  		t.Fatalf("after test, len(tc.conn.streams.streams) = %v, want 0", got)
   477  	}
   478  	if tc.conn.streams.queueMeta.head != nil {
   479  		t.Fatalf("after test, stream send queue is not empty; should be")
   480  	}
   481  }
   482  
   483  func TestStreamsCreateConcurrency(t *testing.T) {
   484  	cli, srv := newLocalConnPair(t, &Config{}, &Config{})
   485  
   486  	srvdone := make(chan int)
   487  	go func() {
   488  		defer close(srvdone)
   489  		for streams := 0; ; streams++ {
   490  			s, err := srv.AcceptStream(context.Background())
   491  			if err != nil {
   492  				srvdone <- streams
   493  				return
   494  			}
   495  			s.Close()
   496  		}
   497  	}()
   498  
   499  	var wg sync.WaitGroup
   500  	const concurrency = 10
   501  	const streams = 10
   502  	for i := 0; i < concurrency; i++ {
   503  		wg.Add(1)
   504  		go func() {
   505  			defer wg.Done()
   506  			for j := 0; j < streams; j++ {
   507  				s, err := cli.NewStream(context.Background())
   508  				if err != nil {
   509  					t.Errorf("NewStream: %v", err)
   510  					return
   511  				}
   512  				s.Flush()
   513  				_, err = io.ReadAll(s)
   514  				if err != nil {
   515  					t.Errorf("ReadFull: %v", err)
   516  				}
   517  				s.Close()
   518  			}
   519  		}()
   520  	}
   521  	wg.Wait()
   522  
   523  	cli.Abort(nil)
   524  	srv.Abort(nil)
   525  	if got, want := <-srvdone, concurrency*streams; got != want {
   526  		t.Errorf("accepted %v streams, want %v", got, want)
   527  	}
   528  }
   529  
   530  func TestStreamsPTOWithImplicitStream(t *testing.T) {
   531  	ctx := canceledContext()
   532  	tc := newTestConn(t, serverSide, permissiveTransportParameters)
   533  	tc.handshake()
   534  	tc.ignoreFrame(frameTypeAck)
   535  
   536  	// Peer creates stream 1, and implicitly creates stream 0.
   537  	tc.writeFrames(packetType1RTT, debugFrameStream{
   538  		id: newStreamID(clientSide, bidiStream, 1),
   539  	})
   540  
   541  	// We accept stream 1 and write data to it.
   542  	data := []byte("data")
   543  	s, err := tc.conn.AcceptStream(ctx)
   544  	if err != nil {
   545  		t.Fatalf("conn.AcceptStream() = %v, want stream", err)
   546  	}
   547  	s.Write(data)
   548  	s.Flush()
   549  	tc.wantFrame("data written to stream",
   550  		packetType1RTT, debugFrameStream{
   551  			id:   newStreamID(clientSide, bidiStream, 1),
   552  			data: data,
   553  		})
   554  
   555  	// PTO expires, and the data is resent.
   556  	const pto = true
   557  	tc.triggerLossOrPTO(packetType1RTT, true)
   558  	tc.wantFrame("data resent after PTO expires",
   559  		packetType1RTT, debugFrameStream{
   560  			id:   newStreamID(clientSide, bidiStream, 1),
   561  			data: data,
   562  		})
   563  }
   564  

View as plain text