1
2
3
4
5
6
7 package quic
8
9 import (
10 "context"
11 "fmt"
12 "io"
13 "math"
14 "sync"
15 "testing"
16 )
17
18 func TestStreamsCreate(t *testing.T) {
19 ctx := canceledContext()
20 tc := newTestConn(t, clientSide, permissiveTransportParameters)
21 tc.handshake()
22
23 s, err := tc.conn.NewStream(ctx)
24 if err != nil {
25 t.Fatalf("NewStream: %v", err)
26 }
27 s.Flush()
28 tc.wantFrame("created bidirectional stream 0",
29 packetType1RTT, debugFrameStream{
30 id: 0,
31 data: []byte{},
32 })
33
34 s, err = tc.conn.NewSendOnlyStream(ctx)
35 if err != nil {
36 t.Fatalf("NewStream: %v", err)
37 }
38 s.Flush()
39 tc.wantFrame("created unidirectional stream 0",
40 packetType1RTT, debugFrameStream{
41 id: 2,
42 data: []byte{},
43 })
44
45 s, err = tc.conn.NewStream(ctx)
46 if err != nil {
47 t.Fatalf("NewStream: %v", err)
48 }
49 s.Flush()
50 tc.wantFrame("created bidirectional stream 1",
51 packetType1RTT, debugFrameStream{
52 id: 4,
53 data: []byte{},
54 })
55 }
56
57 func TestStreamsAccept(t *testing.T) {
58 ctx := canceledContext()
59 tc := newTestConn(t, serverSide)
60 tc.handshake()
61
62 tc.writeFrames(packetType1RTT,
63 debugFrameStream{
64 id: 0,
65 },
66 debugFrameStream{
67 id: 2,
68 },
69 debugFrameStream{
70 id: 4,
71 })
72
73 for _, accept := range []struct {
74 id streamID
75 readOnly bool
76 }{
77 {0, false},
78 {2, true},
79 {4, false},
80 } {
81 s, err := tc.conn.AcceptStream(ctx)
82 if err != nil {
83 t.Fatalf("conn.AcceptStream() = %v, want stream %v", err, accept.id)
84 }
85 if got, want := s.id, accept.id; got != want {
86 t.Fatalf("conn.AcceptStream() = stream %v, want %v", got, want)
87 }
88 if got, want := s.IsReadOnly(), accept.readOnly; got != want {
89 t.Fatalf("stream %v: s.IsReadOnly() = %v, want %v", accept.id, got, want)
90 }
91 }
92
93 _, err := tc.conn.AcceptStream(ctx)
94 if err != context.Canceled {
95 t.Fatalf("conn.AcceptStream() = %v, want context.Canceled", err)
96 }
97 }
98
99 func TestStreamsBlockingAccept(t *testing.T) {
100 tc := newTestConn(t, serverSide)
101 tc.handshake()
102
103 a := runAsync(tc, func(ctx context.Context) (*Stream, error) {
104 return tc.conn.AcceptStream(ctx)
105 })
106 if _, err := a.result(); err != errNotDone {
107 tc.t.Fatalf("AcceptStream() = _, %v; want errNotDone", err)
108 }
109
110 sid := newStreamID(clientSide, bidiStream, 0)
111 tc.writeFrames(packetType1RTT,
112 debugFrameStream{
113 id: sid,
114 })
115
116 s, err := a.result()
117 if err != nil {
118 t.Fatalf("conn.AcceptStream() = _, %v, want stream", err)
119 }
120 if got, want := s.id, sid; got != want {
121 t.Fatalf("conn.AcceptStream() = stream %v, want %v", got, want)
122 }
123 if got, want := s.IsReadOnly(), false; got != want {
124 t.Fatalf("s.IsReadOnly() = %v, want %v", got, want)
125 }
126 }
127
128 func TestStreamsLocalStreamNotCreated(t *testing.T) {
129
130
131
132
133 tc := newTestConn(t, serverSide)
134 tc.handshake()
135
136 tc.writeFrames(packetType1RTT,
137 debugFrameStream{
138 id: 1,
139 })
140 tc.wantFrame("peer sent STREAM frame for an uncreated local stream",
141 packetType1RTT, debugFrameConnectionCloseTransport{
142 code: errStreamState,
143 })
144 }
145
146 func TestStreamsLocalStreamClosed(t *testing.T) {
147 tc, s := newTestConnAndLocalStream(t, clientSide, uniStream, permissiveTransportParameters)
148 s.CloseWrite()
149 tc.wantFrame("FIN for closed stream",
150 packetType1RTT, debugFrameStream{
151 id: newStreamID(clientSide, uniStream, 0),
152 fin: true,
153 data: []byte{},
154 })
155 tc.writeAckForAll()
156
157 tc.writeFrames(packetType1RTT, debugFrameStopSending{
158 id: newStreamID(clientSide, uniStream, 0),
159 })
160 tc.wantIdle("frame for finalized stream is ignored")
161
162
163
164 if got := len(tc.conn.streams.streams); got != 0 {
165 t.Fatalf("after close, len(tc.conn.streams.streams) = %v, want 0", got)
166 }
167 if tc.conn.streams.queueMeta.head != nil {
168 t.Fatalf("after close, stream send queue is not empty; should be")
169 }
170 }
171
172 func TestStreamsStreamSendOnly(t *testing.T) {
173
174
175
176
177 ctx := canceledContext()
178 tc := newTestConn(t, serverSide, permissiveTransportParameters)
179 tc.handshake()
180
181 s, err := tc.conn.NewSendOnlyStream(ctx)
182 if err != nil {
183 t.Fatalf("NewStream: %v", err)
184 }
185 s.Flush()
186 tc.wantFrame("created unidirectional stream 0",
187 packetType1RTT, debugFrameStream{
188 id: 3,
189 data: []byte{},
190 })
191
192 tc.writeFrames(packetType1RTT,
193 debugFrameStream{
194 id: 3,
195 })
196 tc.wantFrame("peer sent STREAM frame for a send-only stream",
197 packetType1RTT, debugFrameConnectionCloseTransport{
198 code: errStreamState,
199 })
200 }
201
202 func TestStreamsWriteQueueFairness(t *testing.T) {
203 ctx := canceledContext()
204 const dataLen = 1 << 20
205 const numStreams = 3
206 tc := newTestConn(t, clientSide, func(p *transportParameters) {
207 p.initialMaxStreamsBidi = numStreams
208 p.initialMaxData = 1<<62 - 1
209 p.initialMaxStreamDataBidiRemote = dataLen
210 }, func(c *Config) {
211 c.MaxStreamWriteBufferSize = dataLen
212 })
213 tc.handshake()
214 tc.ignoreFrame(frameTypeAck)
215
216
217
218
219
220
221
222
223
224
225 data := make([]byte, dataLen)
226 var streams []*Stream
227 for i := 0; i < numStreams; i++ {
228 s, err := tc.conn.NewStream(ctx)
229 if err != nil {
230 t.Fatal(err)
231 }
232 streams = append(streams, s)
233 if n, err := s.WriteContext(ctx, data); n != len(data) || err != nil {
234 t.Fatalf("s.WriteContext() = %v, %v; want %v, nil", n, err, len(data))
235 }
236
237
238 tc.wait()
239 }
240
241 sent := make([]int64, len(streams))
242 for {
243 p := tc.readPacket()
244 if p == nil {
245 break
246 }
247 tc.writeFrames(packetType1RTT, debugFrameAck{
248 ranges: []i64range[packetNumber]{{0, p.num}},
249 })
250 for _, f := range p.frames {
251 sf, ok := f.(debugFrameStream)
252 if !ok {
253 t.Fatalf("got unexpected frame (want STREAM): %v", sf)
254 }
255 if got, want := sf.off, sent[sf.id.num()]; got != want {
256 t.Fatalf("got frame: %v\nwant offset: %v", sf, want)
257 }
258 sent[sf.id.num()] = sf.off + int64(len(sf.data))
259
260
261
262
263
264 minSent := sent[1]
265 maxSent := sent[1]
266 for _, s := range sent[2:] {
267 minSent = min(minSent, s)
268 maxSent = max(maxSent, s)
269 }
270 const maxDelta = maxUDPPayloadSize
271 if d := maxSent - minSent; d > maxDelta {
272 t.Fatalf("stream data sent: %v; delta=%v, want delta <= %v", sent, d, maxDelta)
273 }
274 }
275 }
276
277 for num, s := range sent {
278 if s != dataLen {
279 t.Errorf("stream %v sent %v bytes, want %v", num, s, dataLen)
280 }
281 }
282 }
283
284 func TestStreamsShutdown(t *testing.T) {
285
286
287
288
289
290 for _, test := range []struct {
291 name string
292 side streamSide
293 styp streamType
294 setup func(*testing.T, *testConn, *Stream)
295 shutdown func(*testing.T, *testConn, *Stream)
296 }{{
297 name: "closed",
298 side: localStream,
299 styp: uniStream,
300 setup: func(t *testing.T, tc *testConn, s *Stream) {
301 s.CloseContext(canceledContext())
302 },
303 shutdown: func(t *testing.T, tc *testConn, s *Stream) {
304 tc.writeAckForAll()
305 },
306 }, {
307 name: "local close",
308 side: localStream,
309 styp: bidiStream,
310 setup: func(t *testing.T, tc *testConn, s *Stream) {
311 tc.writeFrames(packetType1RTT, debugFrameResetStream{
312 id: s.id,
313 })
314 s.CloseContext(canceledContext())
315 },
316 shutdown: func(t *testing.T, tc *testConn, s *Stream) {
317 tc.writeAckForAll()
318 },
319 }, {
320 name: "remote reset",
321 side: localStream,
322 styp: bidiStream,
323 setup: func(t *testing.T, tc *testConn, s *Stream) {
324 s.CloseContext(canceledContext())
325 tc.wantIdle("all frames after CloseContext are ignored")
326 tc.writeAckForAll()
327 },
328 shutdown: func(t *testing.T, tc *testConn, s *Stream) {
329 tc.writeFrames(packetType1RTT, debugFrameResetStream{
330 id: s.id,
331 })
332 },
333 }, {
334 name: "local close",
335 side: remoteStream,
336 styp: uniStream,
337 setup: func(t *testing.T, tc *testConn, s *Stream) {
338 ctx := canceledContext()
339 tc.writeFrames(packetType1RTT, debugFrameStream{
340 id: s.id,
341 fin: true,
342 })
343 if n, err := s.ReadContext(ctx, make([]byte, 16)); n != 0 || err != io.EOF {
344 t.Errorf("ReadContext() = %v, %v; want 0, io.EOF", n, err)
345 }
346 },
347 shutdown: func(t *testing.T, tc *testConn, s *Stream) {
348 s.CloseRead()
349 },
350 }} {
351 name := fmt.Sprintf("%v/%v/%v", test.side, test.styp, test.name)
352 t.Run(name, func(t *testing.T) {
353 tc, s := newTestConnAndStream(t, serverSide, test.side, test.styp,
354 permissiveTransportParameters)
355 tc.ignoreFrame(frameTypeStreamBase)
356 tc.ignoreFrame(frameTypeStopSending)
357 test.setup(t, tc, s)
358 tc.wantIdle("conn should be idle after setup")
359 if got, want := len(tc.conn.streams.streams), 1; got != want {
360 t.Fatalf("after setup: %v streams in Conn's map; want %v", got, want)
361 }
362 test.shutdown(t, tc, s)
363 tc.wantIdle("conn should be idle after shutdown")
364 if got, want := len(tc.conn.streams.streams), 0; got != want {
365 t.Fatalf("after shutdown: %v streams in Conn's map; want %v", got, want)
366 }
367 })
368 }
369 }
370
371 func TestStreamsCreateAndCloseRemote(t *testing.T) {
372
373
374
375
376
377
378
379
380 defer func(vv bool) {
381 *testVV = vv
382 }(*testVV)
383 *testVV = false
384 ctx := canceledContext()
385 tc := newTestConn(t, serverSide, permissiveTransportParameters)
386 tc.handshake()
387 tc.ignoreFrame(frameTypeAck)
388 type op struct {
389 id streamID
390 }
391 type streamOp op
392 type resetOp op
393 type acceptOp op
394 const noStream = math.MaxInt64
395 stringID := func(id streamID) string {
396 return fmt.Sprintf("%v/%v", id.streamType(), id.num())
397 }
398 for _, op := range []any{
399 "opening bidi/5 implicitly opens bidi/0-4",
400 streamOp{newStreamID(clientSide, bidiStream, 5)},
401 acceptOp{newStreamID(clientSide, bidiStream, 5)},
402 "bidi/3 was implicitly opened",
403 streamOp{newStreamID(clientSide, bidiStream, 3)},
404 acceptOp{newStreamID(clientSide, bidiStream, 3)},
405 resetOp{newStreamID(clientSide, bidiStream, 3)},
406 "bidi/3 is done, frames for it are discarded",
407 streamOp{newStreamID(clientSide, bidiStream, 3)},
408 "open and close some uni streams as well",
409 streamOp{newStreamID(clientSide, uniStream, 0)},
410 acceptOp{newStreamID(clientSide, uniStream, 0)},
411 streamOp{newStreamID(clientSide, uniStream, 1)},
412 acceptOp{newStreamID(clientSide, uniStream, 1)},
413 streamOp{newStreamID(clientSide, uniStream, 2)},
414 acceptOp{newStreamID(clientSide, uniStream, 2)},
415 resetOp{newStreamID(clientSide, uniStream, 1)},
416 resetOp{newStreamID(clientSide, uniStream, 0)},
417 resetOp{newStreamID(clientSide, uniStream, 2)},
418 "closing an implicitly opened stream causes us to accept it",
419 resetOp{newStreamID(clientSide, bidiStream, 0)},
420 acceptOp{newStreamID(clientSide, bidiStream, 0)},
421 resetOp{newStreamID(clientSide, bidiStream, 1)},
422 acceptOp{newStreamID(clientSide, bidiStream, 1)},
423 resetOp{newStreamID(clientSide, bidiStream, 2)},
424 acceptOp{newStreamID(clientSide, bidiStream, 2)},
425 "stream bidi/3 was reset previously",
426 resetOp{newStreamID(clientSide, bidiStream, 3)},
427 resetOp{newStreamID(clientSide, bidiStream, 4)},
428 acceptOp{newStreamID(clientSide, bidiStream, 4)},
429 "stream bidi/5 was reset previously",
430 resetOp{newStreamID(clientSide, bidiStream, 5)},
431 "stream bidi/6 was not implicitly opened",
432 resetOp{newStreamID(clientSide, bidiStream, 6)},
433 acceptOp{newStreamID(clientSide, bidiStream, 6)},
434 } {
435 if _, ok := op.(acceptOp); !ok {
436 if s, err := tc.conn.AcceptStream(ctx); err == nil {
437 t.Fatalf("accepted stream %v, want none", stringID(s.id))
438 }
439 }
440 switch op := op.(type) {
441 case string:
442 t.Log("# " + op)
443 case streamOp:
444 t.Logf("open stream %v", stringID(op.id))
445 tc.writeFrames(packetType1RTT, debugFrameStream{
446 id: streamID(op.id),
447 })
448 case resetOp:
449 t.Logf("reset stream %v", stringID(op.id))
450 tc.writeFrames(packetType1RTT, debugFrameResetStream{
451 id: op.id,
452 })
453 case acceptOp:
454 s, err := tc.conn.AcceptStream(ctx)
455 if err != nil {
456 t.Fatalf("AcceptStream() = %q; want stream %v", err, stringID(op.id))
457 }
458 if s.id != op.id {
459 t.Fatalf("accepted stram %v; want stream %v", err, stringID(op.id))
460 }
461 t.Logf("accepted stream %v", stringID(op.id))
462
463
464 s.CloseContext(ctx)
465 }
466 p := tc.readPacket()
467 if p != nil {
468 tc.writeFrames(p.ptype, debugFrameAck{
469 ranges: []i64range[packetNumber]{{0, p.num + 1}},
470 })
471 }
472 }
473
474
475 if got := len(tc.conn.streams.streams); got != 0 {
476 t.Fatalf("after test, len(tc.conn.streams.streams) = %v, want 0", got)
477 }
478 if tc.conn.streams.queueMeta.head != nil {
479 t.Fatalf("after test, stream send queue is not empty; should be")
480 }
481 }
482
483 func TestStreamsCreateConcurrency(t *testing.T) {
484 cli, srv := newLocalConnPair(t, &Config{}, &Config{})
485
486 srvdone := make(chan int)
487 go func() {
488 defer close(srvdone)
489 for streams := 0; ; streams++ {
490 s, err := srv.AcceptStream(context.Background())
491 if err != nil {
492 srvdone <- streams
493 return
494 }
495 s.Close()
496 }
497 }()
498
499 var wg sync.WaitGroup
500 const concurrency = 10
501 const streams = 10
502 for i := 0; i < concurrency; i++ {
503 wg.Add(1)
504 go func() {
505 defer wg.Done()
506 for j := 0; j < streams; j++ {
507 s, err := cli.NewStream(context.Background())
508 if err != nil {
509 t.Errorf("NewStream: %v", err)
510 return
511 }
512 s.Flush()
513 _, err = io.ReadAll(s)
514 if err != nil {
515 t.Errorf("ReadFull: %v", err)
516 }
517 s.Close()
518 }
519 }()
520 }
521 wg.Wait()
522
523 cli.Abort(nil)
524 srv.Abort(nil)
525 if got, want := <-srvdone, concurrency*streams; got != want {
526 t.Errorf("accepted %v streams, want %v", got, want)
527 }
528 }
529
530 func TestStreamsPTOWithImplicitStream(t *testing.T) {
531 ctx := canceledContext()
532 tc := newTestConn(t, serverSide, permissiveTransportParameters)
533 tc.handshake()
534 tc.ignoreFrame(frameTypeAck)
535
536
537 tc.writeFrames(packetType1RTT, debugFrameStream{
538 id: newStreamID(clientSide, bidiStream, 1),
539 })
540
541
542 data := []byte("data")
543 s, err := tc.conn.AcceptStream(ctx)
544 if err != nil {
545 t.Fatalf("conn.AcceptStream() = %v, want stream", err)
546 }
547 s.Write(data)
548 s.Flush()
549 tc.wantFrame("data written to stream",
550 packetType1RTT, debugFrameStream{
551 id: newStreamID(clientSide, bidiStream, 1),
552 data: data,
553 })
554
555
556 const pto = true
557 tc.triggerLossOrPTO(packetType1RTT, true)
558 tc.wantFrame("data resent after PTO expires",
559 packetType1RTT, debugFrameStream{
560 id: newStreamID(clientSide, bidiStream, 1),
561 data: data,
562 })
563 }
564
View as plain text