...

Source file src/golang.org/x/net/internal/quic/conn_async_test.go

Documentation: golang.org/x/net/internal/quic

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  //go:build go1.21
     6  
     7  package quic
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"path/filepath"
    14  	"runtime"
    15  	"sync"
    16  )
    17  
    18  // asyncTestState permits handling asynchronous operations in a synchronous test.
    19  //
    20  // For example, a test may want to write to a stream and observe that
    21  // STREAM frames are sent with the contents of the write in response
    22  // to MAX_STREAM_DATA frames received from the peer.
    23  // The Stream.Write is an asynchronous operation, but the test is simpler
    24  // if we can start the write, observe the first STREAM frame sent,
    25  // send a MAX_STREAM_DATA frame, observe the next STREAM frame sent, etc.
    26  //
    27  // We do this by instrumenting points where operations can block.
    28  // We start async operations like Write in a goroutine,
    29  // and wait for the operation to either finish or hit a blocking point.
    30  // When the connection event loop is idle, we check a list of
    31  // blocked operations to see if any can be woken.
    32  type asyncTestState struct {
    33  	mu      sync.Mutex
    34  	notify  chan struct{}
    35  	blocked map[*blockedAsync]struct{}
    36  }
    37  
    38  // An asyncOp is an asynchronous operation that results in (T, error).
    39  type asyncOp[T any] struct {
    40  	v   T
    41  	err error
    42  
    43  	caller     string
    44  	tc         *testConn
    45  	donec      chan struct{}
    46  	cancelFunc context.CancelFunc
    47  }
    48  
    49  // cancel cancels the async operation's context, and waits for
    50  // the operation to complete.
    51  func (a *asyncOp[T]) cancel() {
    52  	select {
    53  	case <-a.donec:
    54  		return // already done
    55  	default:
    56  	}
    57  	a.cancelFunc()
    58  	<-a.tc.asyncTestState.notify
    59  	select {
    60  	case <-a.donec:
    61  	default:
    62  		panic(fmt.Errorf("%v: async op failed to finish after being canceled", a.caller))
    63  	}
    64  }
    65  
    66  var errNotDone = errors.New("async op is not done")
    67  
    68  // result returns the result of the async operation.
    69  // It returns errNotDone if the operation is still in progress.
    70  //
    71  // Note that unlike a traditional async/await, this doesn't block
    72  // waiting for the operation to complete. Since tests have full
    73  // control over the progress of operations, an asyncOp can only
    74  // become done in reaction to the test taking some action.
    75  func (a *asyncOp[T]) result() (v T, err error) {
    76  	a.tc.wait()
    77  	select {
    78  	case <-a.donec:
    79  		return a.v, a.err
    80  	default:
    81  		return v, errNotDone
    82  	}
    83  }
    84  
    85  // A blockedAsync is a blocked async operation.
    86  type blockedAsync struct {
    87  	until func() bool   // when this returns true, the operation is unblocked
    88  	donec chan struct{} // closed when the operation is unblocked
    89  }
    90  
    91  type asyncContextKey struct{}
    92  
    93  // runAsync starts an asynchronous operation.
    94  //
    95  // The function f should call a blocking function such as
    96  // Stream.Write or Conn.AcceptStream and return its result.
    97  // It must use the provided context.
    98  func runAsync[T any](tc *testConn, f func(context.Context) (T, error)) *asyncOp[T] {
    99  	as := &tc.asyncTestState
   100  	if as.notify == nil {
   101  		as.notify = make(chan struct{})
   102  		as.mu.Lock()
   103  		as.blocked = make(map[*blockedAsync]struct{})
   104  		as.mu.Unlock()
   105  	}
   106  	_, file, line, _ := runtime.Caller(1)
   107  	ctx := context.WithValue(context.Background(), asyncContextKey{}, true)
   108  	ctx, cancel := context.WithCancel(ctx)
   109  	a := &asyncOp[T]{
   110  		tc:         tc,
   111  		caller:     fmt.Sprintf("%v:%v", filepath.Base(file), line),
   112  		donec:      make(chan struct{}),
   113  		cancelFunc: cancel,
   114  	}
   115  	go func() {
   116  		a.v, a.err = f(ctx)
   117  		close(a.donec)
   118  		as.notify <- struct{}{}
   119  	}()
   120  	tc.t.Cleanup(func() {
   121  		if _, err := a.result(); err == errNotDone {
   122  			tc.t.Errorf("%v: async operation is still executing at end of test", a.caller)
   123  			a.cancel()
   124  		}
   125  	})
   126  	// Wait for the operation to either finish or block.
   127  	<-as.notify
   128  	tc.wait()
   129  	return a
   130  }
   131  
   132  // waitUntil waits for a blocked async operation to complete.
   133  // The operation is complete when the until func returns true.
   134  func (as *asyncTestState) waitUntil(ctx context.Context, until func() bool) error {
   135  	if until() {
   136  		return nil
   137  	}
   138  	if err := ctx.Err(); err != nil {
   139  		// Context has already expired.
   140  		return err
   141  	}
   142  	if ctx.Value(asyncContextKey{}) == nil {
   143  		// Context is not one that we've created, and hasn't expired.
   144  		// This probably indicates that we've tried to perform a
   145  		// blocking operation without using the async test harness here,
   146  		// which may have unpredictable results.
   147  		panic("blocking async point with unexpected Context")
   148  	}
   149  	b := &blockedAsync{
   150  		until: until,
   151  		donec: make(chan struct{}),
   152  	}
   153  	// Record this as a pending blocking operation.
   154  	as.mu.Lock()
   155  	as.blocked[b] = struct{}{}
   156  	as.mu.Unlock()
   157  	// Notify the creator of the operation that we're blocked,
   158  	// and wait to be woken up.
   159  	as.notify <- struct{}{}
   160  	select {
   161  	case <-b.donec:
   162  	case <-ctx.Done():
   163  		return ctx.Err()
   164  	}
   165  	return nil
   166  }
   167  
   168  // wakeAsync tries to wake up a blocked async operation.
   169  // It returns true if one was woken, false otherwise.
   170  func (as *asyncTestState) wakeAsync() bool {
   171  	as.mu.Lock()
   172  	var woken *blockedAsync
   173  	for w := range as.blocked {
   174  		if w.until() {
   175  			woken = w
   176  			delete(as.blocked, w)
   177  			break
   178  		}
   179  	}
   180  	as.mu.Unlock()
   181  	if woken == nil {
   182  		return false
   183  	}
   184  	close(woken.donec)
   185  	<-as.notify // must not hold as.mu while blocked here
   186  	return true
   187  }
   188  

View as plain text