...

Source file src/golang.org/x/net/internal/quic/conn.go

Documentation: golang.org/x/net/internal/quic

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  //go:build go1.21
     6  
     7  package quic
     8  
     9  import (
    10  	"context"
    11  	"crypto/tls"
    12  	"errors"
    13  	"fmt"
    14  	"log/slog"
    15  	"net/netip"
    16  	"time"
    17  )
    18  
// A Conn is a QUIC connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
// Except where otherwise noted, connection state is owned by the goroutine
// running loop; other goroutines interact with it by sending messages on msgc.
type Conn struct {
	// side is compared against clientSide and serverSide to select
	// role-specific behavior. testHooks is nil-checked before every use.
	side      connSide
	endpoint  *Endpoint
	config    *Config
	testHooks connTestHooks
	peerAddr  netip.AddrPort

	// msgc carries events to the connection loop. It is created with a
	// one-element buffer so that wake can send without blocking.
	msgc  chan any
	donec chan struct{} // closed when conn loop exits, or when newConn fails

	w           packetWriter
	acks        [numberSpaceCount]ackState // indexed by number space
	lifetime    lifetimeState
	idle        idleState
	connIDState connIDState
	loss        lossState
	streams     streamsState

	// Packet protection keys, CRYPTO streams, and TLS state.
	keysInitial   fixedKeyPair
	keysHandshake fixedKeyPair
	keysAppData   updatingKeyPair
	crypto        [numberSpaceCount]cryptoStream
	tls           *tls.QUICConn

	// retryToken is the token provided by the peer in a Retry packet.
	// A non-nil value is treated as "a Retry was received" when validating
	// the peer's transport parameters.
	retryToken []byte

	// handshakeConfirmed is set when the handshake is confirmed.
	// For server connections, it tracks sending HANDSHAKE_DONE.
	handshakeConfirmed sentVal

	// peerAckDelayExponent starts at -1 and is replaced by the value from
	// the peer's transport parameters once they are received.
	peerAckDelayExponent int8 // -1 when unknown

	// Tests only: Send a PING in a specific number space.
	testSendPingSpace numberSpace
	testSendPing      sentVal

	log *slog.Logger
}
    62  
// connTestHooks override conn behavior in tests.
//
// A Conn whose testHooks field is nil uses the production behavior
// (a real timer and the wall clock) instead of calling these methods.
type connTestHooks interface {
	// init is called after a conn is created.
	init()

	// nextMessage is called to request the next event from msgc.
	// Used to give tests control of the connection event loop.
	// The loop adopts the returned time as its current time and
	// dispatches the returned message.
	nextMessage(msgc chan any, nextTimeout time.Time) (now time.Time, message any)

	// handleTLSEvent is called with each TLS event.
	handleTLSEvent(tls.QUICEvent)

	// newConnID is called to generate a new connection ID.
	// Permits tests to generate consistent connection IDs rather than random ones.
	newConnID(seq int64) ([]byte, error)

	// waitUntil blocks until the until func returns true or the context is done.
	// Used to synchronize asynchronous blocking operations in tests.
	waitUntil(ctx context.Context, until func() bool) error

	// timeNow returns the current time.
	timeNow() time.Time
}
    86  
// newServerConnIDs holds the connection IDs associated with a new server connection.
//
// When creating a server conn, newConn derives the Initial packet protection
// keys from retrySrcConnID if it is non-nil, and from originalDstConnID otherwise.
type newServerConnIDs struct {
	srcConnID         []byte // source from client's current Initial
	dstConnID         []byte // destination from client's current Initial
	originalDstConnID []byte // destination from client's first Initial
	retrySrcConnID    []byte // source from server's Retry
}
    94  
    95  func newConn(now time.Time, side connSide, cids newServerConnIDs, peerAddr netip.AddrPort, config *Config, e *Endpoint) (conn *Conn, _ error) {
    96  	c := &Conn{
    97  		side:                 side,
    98  		endpoint:             e,
    99  		config:               config,
   100  		peerAddr:             peerAddr,
   101  		msgc:                 make(chan any, 1),
   102  		donec:                make(chan struct{}),
   103  		peerAckDelayExponent: -1,
   104  	}
   105  	defer func() {
   106  		// If we hit an error in newConn, close donec so tests don't get stuck waiting for it.
   107  		// This is only relevant if we've got a bug, but it makes tracking that bug down
   108  		// much easier.
   109  		if conn == nil {
   110  			close(c.donec)
   111  		}
   112  	}()
   113  
   114  	// A one-element buffer allows us to wake a Conn's event loop as a
   115  	// non-blocking operation.
   116  	c.msgc = make(chan any, 1)
   117  
   118  	if e.testHooks != nil {
   119  		e.testHooks.newConn(c)
   120  	}
   121  
   122  	// initialConnID is the connection ID used to generate Initial packet protection keys.
   123  	var initialConnID []byte
   124  	if c.side == clientSide {
   125  		if err := c.connIDState.initClient(c); err != nil {
   126  			return nil, err
   127  		}
   128  		initialConnID, _ = c.connIDState.dstConnID()
   129  	} else {
   130  		initialConnID = cids.originalDstConnID
   131  		if cids.retrySrcConnID != nil {
   132  			initialConnID = cids.retrySrcConnID
   133  		}
   134  		if err := c.connIDState.initServer(c, cids); err != nil {
   135  			return nil, err
   136  		}
   137  	}
   138  
   139  	// TODO: PMTU discovery.
   140  	c.logConnectionStarted(cids.originalDstConnID, peerAddr)
   141  	c.keysAppData.init()
   142  	c.loss.init(c.side, smallestMaxDatagramSize, now)
   143  	c.streamsInit()
   144  	c.lifetimeInit()
   145  	c.restartIdleTimer(now)
   146  
   147  	if err := c.startTLS(now, initialConnID, transportParameters{
   148  		initialSrcConnID:               c.connIDState.srcConnID(),
   149  		originalDstConnID:              cids.originalDstConnID,
   150  		retrySrcConnID:                 cids.retrySrcConnID,
   151  		ackDelayExponent:               ackDelayExponent,
   152  		maxUDPPayloadSize:              maxUDPPayloadSize,
   153  		maxAckDelay:                    maxAckDelay,
   154  		disableActiveMigration:         true,
   155  		initialMaxData:                 config.maxConnReadBufferSize(),
   156  		initialMaxStreamDataBidiLocal:  config.maxStreamReadBufferSize(),
   157  		initialMaxStreamDataBidiRemote: config.maxStreamReadBufferSize(),
   158  		initialMaxStreamDataUni:        config.maxStreamReadBufferSize(),
   159  		initialMaxStreamsBidi:          c.streams.remoteLimit[bidiStream].max,
   160  		initialMaxStreamsUni:           c.streams.remoteLimit[uniStream].max,
   161  		activeConnIDLimit:              activeConnIDLimit,
   162  	}); err != nil {
   163  		return nil, err
   164  	}
   165  
   166  	if c.testHooks != nil {
   167  		c.testHooks.init()
   168  	}
   169  	go c.loop(now)
   170  	return c, nil
   171  }
   172  
   173  func (c *Conn) String() string {
   174  	return fmt.Sprintf("quic.Conn(%v,->%v)", c.side, c.peerAddr)
   175  }
   176  
   177  // confirmHandshake is called when the handshake is confirmed.
   178  // https://www.rfc-editor.org/rfc/rfc9001#section-4.1.2
   179  func (c *Conn) confirmHandshake(now time.Time) {
   180  	// If handshakeConfirmed is unset, the handshake is not confirmed.
   181  	// If it is unsent, the handshake is confirmed and we need to send a HANDSHAKE_DONE.
   182  	// If it is sent, we have sent a HANDSHAKE_DONE.
   183  	// If it is received, the handshake is confirmed and we do not need to send anything.
   184  	if c.handshakeConfirmed.isSet() {
   185  		return // already confirmed
   186  	}
   187  	if c.side == serverSide {
   188  		// When the server confirms the handshake, it sends a HANDSHAKE_DONE.
   189  		c.handshakeConfirmed.setUnsent()
   190  		c.endpoint.serverConnEstablished(c)
   191  	} else {
   192  		// The client never sends a HANDSHAKE_DONE, so we set handshakeConfirmed
   193  		// to the received state, indicating that the handshake is confirmed and we
   194  		// don't need to send anything.
   195  		c.handshakeConfirmed.setReceived()
   196  	}
   197  	c.restartIdleTimer(now)
   198  	c.loss.confirmHandshake()
   199  	// "An endpoint MUST discard its Handshake keys when the TLS handshake is confirmed"
   200  	// https://www.rfc-editor.org/rfc/rfc9001#section-4.9.2-1
   201  	c.discardKeys(now, handshakeSpace)
   202  }
   203  
   204  // discardKeys discards unused packet protection keys.
   205  // https://www.rfc-editor.org/rfc/rfc9001#section-4.9
   206  func (c *Conn) discardKeys(now time.Time, space numberSpace) {
   207  	switch space {
   208  	case initialSpace:
   209  		c.keysInitial.discard()
   210  	case handshakeSpace:
   211  		c.keysHandshake.discard()
   212  	}
   213  	c.loss.discardKeys(now, space)
   214  }
   215  
   216  // receiveTransportParameters applies transport parameters sent by the peer.
   217  func (c *Conn) receiveTransportParameters(p transportParameters) error {
   218  	isRetry := c.retryToken != nil
   219  	if err := c.connIDState.validateTransportParameters(c, isRetry, p); err != nil {
   220  		return err
   221  	}
   222  	c.streams.outflow.setMaxData(p.initialMaxData)
   223  	c.streams.localLimit[bidiStream].setMax(p.initialMaxStreamsBidi)
   224  	c.streams.localLimit[uniStream].setMax(p.initialMaxStreamsUni)
   225  	c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal
   226  	c.streams.peerInitialMaxStreamDataRemote[bidiStream] = p.initialMaxStreamDataBidiRemote
   227  	c.streams.peerInitialMaxStreamDataRemote[uniStream] = p.initialMaxStreamDataUni
   228  	c.receivePeerMaxIdleTimeout(p.maxIdleTimeout)
   229  	c.peerAckDelayExponent = p.ackDelayExponent
   230  	c.loss.setMaxAckDelay(p.maxAckDelay)
   231  	if err := c.connIDState.setPeerActiveConnIDLimit(c, p.activeConnIDLimit); err != nil {
   232  		return err
   233  	}
   234  	if p.preferredAddrConnID != nil {
   235  		var (
   236  			seq           int64 = 1 // sequence number of this conn id is 1
   237  			retirePriorTo int64 = 0 // retire nothing
   238  			resetToken    [16]byte
   239  		)
   240  		copy(resetToken[:], p.preferredAddrResetToken)
   241  		if err := c.connIDState.handleNewConnID(c, seq, retirePriorTo, p.preferredAddrConnID, resetToken); err != nil {
   242  			return err
   243  		}
   244  	}
   245  	// TODO: stateless_reset_token
   246  	// TODO: max_udp_payload_size
   247  	// TODO: disable_active_migration
   248  	// TODO: preferred_address
   249  	return nil
   250  }
   251  
// Message types sent on Conn.msgc in addition to *datagram and
// func(time.Time, *Conn) values.
type (
	// timerEvent tells the loop that a connection timer has expired.
	timerEvent struct{}
	// wakeEvent tells the loop to run another iteration and try sending.
	wakeEvent struct{}
)
   256  
// errIdleTimeout is the error the loop passes to abortImmediately
// when the connection idle timer expires.
var errIdleTimeout = errors.New("idle timeout")
   258  
// loop is the connection main loop.
//
// Except where otherwise noted, all connection state is owned by the loop goroutine.
//
// The loop processes messages from c.msgc and timer events.
// Other goroutines may examine or modify conn state by sending the loop funcs to execute.
//
// Each iteration first tries to send, then computes the earliest pending
// deadline, waits for a message or for that deadline, and dispatches the result.
// The loop ends when the lifetime state reaches connStateDone, when the idle
// timer expires, or when the draining period completes; cleanup runs on exit.
func (c *Conn) loop(now time.Time) {
	defer c.cleanup()

	// The connection timer sends a message to the connection loop on expiry.
	// We need to give it an expiry when creating it, so set the initial timeout to
	// an arbitrary large value. The timer will be reset before this expires (and it
	// isn't a problem if it does anyway). Skip creating the timer in tests which
	// take control of the connection message loop.
	var timer *time.Timer
	// lastTimeout is the deadline the timer was most recently armed for.
	// It lets the loop skip resetting the timer when the deadline is unchanged.
	var lastTimeout time.Time
	hooks := c.testHooks
	if hooks == nil {
		timer = time.AfterFunc(1*time.Hour, func() {
			c.sendMsg(timerEvent{})
		})
		defer timer.Stop()
	}

	for c.lifetime.state != connStateDone {
		sendTimeout := c.maybeSend(now) // try sending

		// Note that we only need to consider the ack timer for the App Data space,
		// since the Initial and Handshake spaces always ack immediately.
		nextTimeout := sendTimeout
		nextTimeout = firstTime(nextTimeout, c.idle.nextTimeout)
		if c.isAlive() {
			nextTimeout = firstTime(nextTimeout, c.loss.timer)
			nextTimeout = firstTime(nextTimeout, c.acks[appDataSpace].nextAck)
		} else {
			// A conn that is no longer alive only waits for the end of draining.
			nextTimeout = firstTime(nextTimeout, c.lifetime.drainEndTime)
		}

		var m any
		if hooks != nil {
			// Tests only: Wait for the test to tell us to continue.
			now, m = hooks.nextMessage(c.msgc, nextTimeout)
		} else if !nextTimeout.IsZero() && nextTimeout.Before(now) {
			// A connection timer has expired.
			// The deadline is already in the past, so handle it without waiting.
			now = time.Now()
			m = timerEvent{}
		} else {
			// Reschedule the connection timer if necessary
			// and wait for the next event.
			if !nextTimeout.Equal(lastTimeout) && !nextTimeout.IsZero() {
				// Resetting a timer created with time.AfterFunc guarantees
				// that the timer will run again. We might generate a spurious
				// timer event under some circumstances, but that's okay.
				timer.Reset(nextTimeout.Sub(now))
				lastTimeout = nextTimeout
			}
			m = <-c.msgc
			now = time.Now()
		}
		switch m := m.(type) {
		case *datagram:
			c.handleDatagram(now, m)
			m.recycle()
		case timerEvent:
			// A connection timer has expired.
			if c.idleAdvance(now) {
				// The connection idle timer has expired.
				c.abortImmediately(now, errIdleTimeout)
				return
			}
			c.loss.advance(now, c.handleAckOrLoss)
			if c.lifetimeAdvance(now) {
				// The connection has completed the draining period,
				// and may be shut down.
				return
			}
		case wakeEvent:
			// We're being woken up to try sending some frames.
			// The send itself happens at the top of the next iteration.
		case func(time.Time, *Conn):
			// Send a func to msgc to run it on the main Conn goroutine
			m(now, c)
		default:
			panic(fmt.Sprintf("quic: unrecognized conn message %T", m))
		}
	}
}
   345  
// cleanup runs when the connection loop exits (it is deferred by loop).
// It logs the closure, tells the endpoint the conn has drained, closes the
// TLS state, and finally closes donec, which releases goroutines blocked in
// sendMsg, runOnLoop, or any other select on c.donec.
func (c *Conn) cleanup() {
	c.logConnectionClosed()
	c.endpoint.connDrained(c)
	// NOTE(review): the error returned by Close is discarded here.
	c.tls.Close()
	close(c.donec)
}
   352  
   353  // sendMsg sends a message to the conn's loop.
   354  // It does not wait for the message to be processed.
   355  // The conn may close before processing the message, in which case it is lost.
   356  func (c *Conn) sendMsg(m any) {
   357  	select {
   358  	case c.msgc <- m:
   359  	case <-c.donec:
   360  	}
   361  }
   362  
   363  // wake wakes up the conn's loop.
   364  func (c *Conn) wake() {
   365  	select {
   366  	case c.msgc <- wakeEvent{}:
   367  	default:
   368  	}
   369  }
   370  
// runOnLoop executes a function within the conn's loop goroutine.
//
// It blocks until f has returned or the conn's loop has exited.
// It returns nil once f has run, and a "connection closed" error if the
// loop exited first.
//
// NOTE(review): ctx is only passed to the test hook's waitUntil. The
// production path does not observe ctx, so cancellation does not interrupt
// the wait — confirm this is intended.
func (c *Conn) runOnLoop(ctx context.Context, f func(now time.Time, c *Conn)) error {
	// donec is local to this call and signals that f has returned.
	// It is distinct from c.donec, which signals that the loop has exited.
	donec := make(chan struct{})
	msg := func(now time.Time, c *Conn) {
		defer close(donec)
		f(now, c)
	}
	if c.testHooks != nil {
		// In tests, we can't rely on being able to send a message immediately:
		// c.msgc might be full, and testConnHooks.nextMessage might be waiting
		// for us to block before it processes the next message.
		// To avoid a deadlock, we send the message in waitUntil.
		// If msgc is empty, the message is buffered.
		// If msgc is full, we block and let nextMessage process the queue.
		msgc := c.msgc
		// NOTE(review): the error returned by waitUntil is discarded; the
		// select below decides the result.
		c.testHooks.waitUntil(ctx, func() bool {
			for {
				select {
				case msgc <- msg:
					// A nil channel is never ready in a select, so this
					// case is disabled for the remaining iterations.
					msgc = nil // send msg only once
				case <-donec:
					return true
				case <-c.donec:
					return true
				default:
					return false
				}
			}
		})
	} else {
		c.sendMsg(msg)
	}
	select {
	case <-donec:
	case <-c.donec:
		return errors.New("quic: connection closed")
	}
	return nil
}
   410  
   411  func (c *Conn) waitOnDone(ctx context.Context, ch <-chan struct{}) error {
   412  	if c.testHooks != nil {
   413  		return c.testHooks.waitUntil(ctx, func() bool {
   414  			select {
   415  			case <-ch:
   416  				return true
   417  			default:
   418  			}
   419  			return false
   420  		})
   421  	}
   422  	// Check the channel before the context.
   423  	// We always prefer to return results when available,
   424  	// even when provided with an already-canceled context.
   425  	select {
   426  	case <-ch:
   427  		return nil
   428  	default:
   429  	}
   430  	select {
   431  	case <-ch:
   432  	case <-ctx.Done():
   433  		return ctx.Err()
   434  	}
   435  	return nil
   436  }
   437  
   438  // firstTime returns the earliest non-zero time, or zero if both times are zero.
   439  func firstTime(a, b time.Time) time.Time {
   440  	switch {
   441  	case a.IsZero():
   442  		return b
   443  	case b.IsZero():
   444  		return a
   445  	case a.Before(b):
   446  		return a
   447  	default:
   448  		return b
   449  	}
   450  }
   451  

View as plain text