1
2
3
4
5
6
7 package quic
8
9 import (
10 "context"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "log/slog"
15 "net/netip"
16 "time"
17 )
18
19
20
21
// Conn holds the state of a single QUIC connection.
//
// A Conn is created by newConn, which also starts the loop method on its own
// goroutine. Other code reaches that goroutine by sending on msgc (see
// sendMsg, wake, and runOnLoop).
type Conn struct {
	side      connSide       // compared against clientSide and serverSide
	endpoint  *Endpoint      // the Endpoint passed to newConn
	config    *Config        // the Config passed to newConn
	testHooks connTestHooks  // optional; when nil, loop uses a real timer and time.Now
	peerAddr  netip.AddrPort // the peer address passed to newConn

	msgc  chan any      // messages for loop; created with a buffer of one
	donec chan struct{} // closed by cleanup when loop exits, or by newConn on failure

	w           packetWriter
	acks        [numberSpaceCount]ackState // indexed by number space (e.g. appDataSpace in loop)
	lifetime    lifetimeState
	idle        idleState
	connIDState connIDState
	loss        lossState
	streams     streamsState

	// Keys for each number space, per-space crypto streams, and the TLS
	// connection. discardKeys drops the Initial and Handshake keys.
	keysInitial   fixedKeyPair
	keysHandshake fixedKeyPair
	keysAppData   updatingKeyPair
	crypto        [numberSpaceCount]cryptoStream
	tls           *tls.QUICConn

	// retryToken, when non-nil, makes receiveTransportParameters validate the
	// peer's transport parameters with isRetry set.
	// NOTE(review): presumably the token from the peer's Retry packet; it is
	// not assigned in this part of the file.
	retryToken []byte

	// handshakeConfirmed records handshake confirmation; see confirmHandshake
	// for how each side sets it.
	handshakeConfirmed sentVal

	// peerAckDelayExponent is initialized to -1 by newConn and overwritten
	// with the peer's value in receiveTransportParameters.
	peerAckDelayExponent int8

	// NOTE(review): the "test" prefix suggests these exist so tests can force
	// a PING in a chosen number space — confirm against their users.
	testSendPingSpace numberSpace
	testSendPing      sentVal

	log *slog.Logger
}
62
63
// connTestHooks is the set of callbacks a Conn invokes when its testHooks
// field is non-nil.
type connTestHooks interface {
	// init is called by newConn after TLS has been started and just before
	// the loop goroutine is launched.
	init()

	// nextMessage is called by loop, in place of receiving from msgc and
	// reading the wall clock, to obtain the next message to process and the
	// time to treat as current. nextTimeout is the earliest pending deadline
	// computed by loop, or the zero Time if there is none.
	nextMessage(msgc chan any, nextTimeout time.Time) (now time.Time, message any)

	// handleTLSEvent is not called in this part of the file.
	// NOTE(review): presumably invoked for each event produced by the TLS
	// stack — confirm at the call site.
	handleTLSEvent(tls.QUICEvent)

	// newConnID is not called in this part of the file.
	// NOTE(review): presumably supplies the connection ID to use for
	// sequence number seq — confirm at the call site.
	newConnID(seq int64) ([]byte, error)

	// waitUntil is called by runOnLoop and waitOnDone in place of blocking
	// directly on a channel.
	// NOTE(review): expected to return once until reports true or ctx is
	// done — confirm against the implementation.
	waitUntil(ctx context.Context, until func() bool) error

	// timeNow is not called in this part of the file.
	// NOTE(review): presumably returns the test's notion of the current time.
	timeNow() time.Time
}
86
87
// newServerConnIDs carries the connection IDs handed to newConn.
//
// newConn reads originalDstConnID and retrySrcConnID directly and, for
// server-side connections, passes the whole struct to connIDState.initServer.
type newServerConnIDs struct {
	// NOTE(review): srcConnID and dstConnID are consumed outside this part of
	// the file; presumably the IDs from the client's first packet — confirm.
	srcConnID []byte
	dstConnID []byte

	// originalDstConnID is logged when the connection starts, sent in the
	// local transport parameters, and used as the server's initial
	// connection ID unless retrySrcConnID is set.
	originalDstConnID []byte

	// retrySrcConnID, when non-nil, replaces originalDstConnID as the
	// server's initial connection ID and is sent in the transport parameters.
	retrySrcConnID []byte
}
94
95 func newConn(now time.Time, side connSide, cids newServerConnIDs, peerAddr netip.AddrPort, config *Config, e *Endpoint) (conn *Conn, _ error) {
96 c := &Conn{
97 side: side,
98 endpoint: e,
99 config: config,
100 peerAddr: peerAddr,
101 msgc: make(chan any, 1),
102 donec: make(chan struct{}),
103 peerAckDelayExponent: -1,
104 }
105 defer func() {
106
107
108
109 if conn == nil {
110 close(c.donec)
111 }
112 }()
113
114
115
116 c.msgc = make(chan any, 1)
117
118 if e.testHooks != nil {
119 e.testHooks.newConn(c)
120 }
121
122
123 var initialConnID []byte
124 if c.side == clientSide {
125 if err := c.connIDState.initClient(c); err != nil {
126 return nil, err
127 }
128 initialConnID, _ = c.connIDState.dstConnID()
129 } else {
130 initialConnID = cids.originalDstConnID
131 if cids.retrySrcConnID != nil {
132 initialConnID = cids.retrySrcConnID
133 }
134 if err := c.connIDState.initServer(c, cids); err != nil {
135 return nil, err
136 }
137 }
138
139
140 c.logConnectionStarted(cids.originalDstConnID, peerAddr)
141 c.keysAppData.init()
142 c.loss.init(c.side, smallestMaxDatagramSize, now)
143 c.streamsInit()
144 c.lifetimeInit()
145 c.restartIdleTimer(now)
146
147 if err := c.startTLS(now, initialConnID, transportParameters{
148 initialSrcConnID: c.connIDState.srcConnID(),
149 originalDstConnID: cids.originalDstConnID,
150 retrySrcConnID: cids.retrySrcConnID,
151 ackDelayExponent: ackDelayExponent,
152 maxUDPPayloadSize: maxUDPPayloadSize,
153 maxAckDelay: maxAckDelay,
154 disableActiveMigration: true,
155 initialMaxData: config.maxConnReadBufferSize(),
156 initialMaxStreamDataBidiLocal: config.maxStreamReadBufferSize(),
157 initialMaxStreamDataBidiRemote: config.maxStreamReadBufferSize(),
158 initialMaxStreamDataUni: config.maxStreamReadBufferSize(),
159 initialMaxStreamsBidi: c.streams.remoteLimit[bidiStream].max,
160 initialMaxStreamsUni: c.streams.remoteLimit[uniStream].max,
161 activeConnIDLimit: activeConnIDLimit,
162 }); err != nil {
163 return nil, err
164 }
165
166 if c.testHooks != nil {
167 c.testHooks.init()
168 }
169 go c.loop(now)
170 return c, nil
171 }
172
173 func (c *Conn) String() string {
174 return fmt.Sprintf("quic.Conn(%v,->%v)", c.side, c.peerAddr)
175 }
176
177
178
179 func (c *Conn) confirmHandshake(now time.Time) {
180
181
182
183
184 if c.handshakeConfirmed.isSet() {
185 return
186 }
187 if c.side == serverSide {
188
189 c.handshakeConfirmed.setUnsent()
190 c.endpoint.serverConnEstablished(c)
191 } else {
192
193
194
195 c.handshakeConfirmed.setReceived()
196 }
197 c.restartIdleTimer(now)
198 c.loss.confirmHandshake()
199
200
201 c.discardKeys(now, handshakeSpace)
202 }
203
204
205
206 func (c *Conn) discardKeys(now time.Time, space numberSpace) {
207 switch space {
208 case initialSpace:
209 c.keysInitial.discard()
210 case handshakeSpace:
211 c.keysHandshake.discard()
212 }
213 c.loss.discardKeys(now, space)
214 }
215
216
217 func (c *Conn) receiveTransportParameters(p transportParameters) error {
218 isRetry := c.retryToken != nil
219 if err := c.connIDState.validateTransportParameters(c, isRetry, p); err != nil {
220 return err
221 }
222 c.streams.outflow.setMaxData(p.initialMaxData)
223 c.streams.localLimit[bidiStream].setMax(p.initialMaxStreamsBidi)
224 c.streams.localLimit[uniStream].setMax(p.initialMaxStreamsUni)
225 c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal
226 c.streams.peerInitialMaxStreamDataRemote[bidiStream] = p.initialMaxStreamDataBidiRemote
227 c.streams.peerInitialMaxStreamDataRemote[uniStream] = p.initialMaxStreamDataUni
228 c.receivePeerMaxIdleTimeout(p.maxIdleTimeout)
229 c.peerAckDelayExponent = p.ackDelayExponent
230 c.loss.setMaxAckDelay(p.maxAckDelay)
231 if err := c.connIDState.setPeerActiveConnIDLimit(c, p.activeConnIDLimit); err != nil {
232 return err
233 }
234 if p.preferredAddrConnID != nil {
235 var (
236 seq int64 = 1
237 retirePriorTo int64 = 0
238 resetToken [16]byte
239 )
240 copy(resetToken[:], p.preferredAddrResetToken)
241 if err := c.connIDState.handleNewConnID(c, seq, retirePriorTo, p.preferredAddrConnID, resetToken); err != nil {
242 return err
243 }
244 }
245
246
247
248
249 return nil
250 }
251
// Message types delivered to Conn.loop through msgc.
type (
	// timerEvent tells loop that a connection timer may have expired. It is
	// posted by loop's timer callback, or synthesized by loop itself when
	// the next deadline is already in the past.
	timerEvent struct{}

	// wakeEvent is posted by Conn.wake. loop takes no action on it beyond
	// running another iteration.
	wakeEvent struct{}
)
256
// errIdleTimeout is the error loop passes to abortImmediately when
// idleAdvance reports that the idle timer has expired.
var errIdleTimeout = errors.New("idle timeout")
258
259
260
261
262
263
264
// loop is the connection's event loop. newConn starts it on its own
// goroutine; it runs until the connection's lifetime state becomes
// connStateDone or one of the timer paths below returns. cleanup runs on
// exit in either case.
//
// Each iteration first calls maybeSend, then waits for the next message or
// timer expiry and dispatches on the message's type. now is the time at
// which the loop starts; it is refreshed after every wait.
func (c *Conn) loop(now time.Time) {
	defer c.cleanup()

	// When no test hooks are installed, a single timer is reused for every
	// deadline. Its callback posts a timerEvent to the loop, and it is Reset
	// below whenever the earliest deadline changes. With test hooks, timer
	// stays nil and hooks.nextMessage supplies both messages and time.
	var timer *time.Timer
	var lastTimeout time.Time
	hooks := c.testHooks
	if hooks == nil {
		timer = time.AfterFunc(1*time.Hour, func() {
			c.sendMsg(timerEvent{})
		})
		defer timer.Stop()
	}

	for c.lifetime.state != connStateDone {
		// maybeSend runs before the timers are read and returns a deadline
		// of its own, which is folded into nextTimeout.
		sendTimeout := c.maybeSend(now)

		// nextTimeout is the earliest non-zero deadline among the send
		// timeout, the idle timer, and, depending on whether the connection
		// is alive, either the loss and App Data ack timers or the end of
		// the drain period.
		nextTimeout := sendTimeout
		nextTimeout = firstTime(nextTimeout, c.idle.nextTimeout)
		if c.isAlive() {
			nextTimeout = firstTime(nextTimeout, c.loss.timer)
			nextTimeout = firstTime(nextTimeout, c.acks[appDataSpace].nextAck)
		} else {
			nextTimeout = firstTime(nextTimeout, c.lifetime.drainEndTime)
		}

		var m any
		if hooks != nil {
			// The hook decides what message comes next and what time it is.
			now, m = hooks.nextMessage(c.msgc, nextTimeout)
		} else if !nextTimeout.IsZero() && nextTimeout.Before(now) {
			// The deadline has already passed: handle it as a timer event
			// right away instead of waiting on msgc.
			now = time.Now()
			m = timerEvent{}
		} else {
			// Re-arm the timer only when the deadline has changed since the
			// last time it was armed, then block for the next message.
			if !nextTimeout.Equal(lastTimeout) && !nextTimeout.IsZero() {
				timer.Reset(nextTimeout.Sub(now))
				lastTimeout = nextTimeout
			}
			m = <-c.msgc
			now = time.Now()
		}
		switch m := m.(type) {
		case *datagram:
			// An incoming datagram: process it, then return it for reuse.
			c.handleDatagram(now, m)
			m.recycle()
		case timerEvent:
			// A timer may have expired. The idle timer is checked first.
			if c.idleAdvance(now) {
				// Idle timeout: abort and exit the loop.
				c.abortImmediately(now, errIdleTimeout)
				return
			}
			c.loss.advance(now, c.handleAckOrLoss)
			if c.lifetimeAdvance(now) {
				// lifetimeAdvance reports that the loop should exit.
				return
			}
		case wakeEvent:
			// Nothing to do; the next iteration's maybeSend does the work.
		case func(time.Time, *Conn):
			// A function to run on the loop goroutine (see runOnLoop).
			m(now, c)
		default:
			panic(fmt.Sprintf("quic: unrecognized conn message %T", m))
		}
	}
}
345
// cleanup runs when loop returns (it is deferred there). It logs that the
// connection closed, tells the endpoint the connection has drained, closes
// the TLS connection, and finally closes donec so that senders blocked in
// sendMsg and waiters in runOnLoop are released.
func (c *Conn) cleanup() {
	c.logConnectionClosed()
	c.endpoint.connDrained(c)
	// The error returned by Close is not checked.
	c.tls.Close()
	close(c.donec)
}
352
353
354
355
// sendMsg delivers m to the connection's event loop.
//
// It blocks until either msgc accepts the message or donec is closed. In
// the latter case the message is silently dropped.
func (c *Conn) sendMsg(m any) {
	select {
	case c.msgc <- m:
	case <-c.donec:
	}
}
362
363
// wake asks the event loop to run another iteration.
//
// The send is non-blocking: if msgc cannot accept the wakeEvent right away
// it is dropped. When that is because a message is already queued, nothing
// is lost, since handling any message leads to another loop iteration.
func (c *Conn) wake() {
	select {
	case c.msgc <- wakeEvent{}:
	default:
	}
}
370
371
// runOnLoop runs f on the connection's loop goroutine and waits for it to
// finish.
//
// It returns nil once f has returned, or a "connection closed" error if the
// connection's donec is closed first. Note that ctx is only handed to the
// test hook's waitUntil; without test hooks it is not consulted, and the
// error returned by waitUntil is discarded.
func (c *Conn) runOnLoop(ctx context.Context, f func(now time.Time, c *Conn)) error {
	// donec (local) is closed when f has finished running on the loop.
	donec := make(chan struct{})
	msg := func(now time.Time, c *Conn) {
		defer close(donec)
		f(now, c)
	}
	if c.testHooks != nil {
		// With test hooks the message is not sent with a blocking send.
		// Instead the until callback makes one non-blocking pass each time
		// it is called: it tries to enqueue msg and checks both done
		// channels, reporting false if nothing is ready yet.
		msgc := c.msgc
		c.testHooks.waitUntil(ctx, func() bool {
			for {
				select {
				case msgc <- msg:
					// Sent. A nil channel is never ready, so this case
					// cannot fire again and msg is sent at most once.
					msgc = nil
				case <-donec:
					return true
				case <-c.donec:
					return true
				default:
					return false
				}
			}
		})
	} else {
		c.sendMsg(msg)
	}
	// Wait for f to finish or for the connection to shut down.
	select {
	case <-donec:
	case <-c.donec:
		return errors.New("quic: connection closed")
	}
	return nil
}
410
411 func (c *Conn) waitOnDone(ctx context.Context, ch <-chan struct{}) error {
412 if c.testHooks != nil {
413 return c.testHooks.waitUntil(ctx, func() bool {
414 select {
415 case <-ch:
416 return true
417 default:
418 }
419 return false
420 })
421 }
422
423
424
425 select {
426 case <-ch:
427 return nil
428 default:
429 }
430 select {
431 case <-ch:
432 case <-ctx.Done():
433 return ctx.Err()
434 }
435 return nil
436 }
437
438
// firstTime returns the earlier of a and b, treating the zero Time as
// "no time": if one argument is zero the other is returned, and if both are
// zero the result is zero. When neither is before the other, b is returned.
func firstTime(a, b time.Time) time.Time {
	if a.IsZero() {
		return b
	}
	if b.IsZero() || a.Before(b) {
		return a
	}
	return b
}
451
View as plain text