...
1
2
3
4
5
6
7 package quic
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "path/filepath"
14 "runtime"
15 "sync"
16 )
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 type asyncTestState struct {
33 mu sync.Mutex
34 notify chan struct{}
35 blocked map[*blockedAsync]struct{}
36 }
37
38
39 type asyncOp[T any] struct {
40 v T
41 err error
42
43 caller string
44 tc *testConn
45 donec chan struct{}
46 cancelFunc context.CancelFunc
47 }
48
49
50
51 func (a *asyncOp[T]) cancel() {
52 select {
53 case <-a.donec:
54 return
55 default:
56 }
57 a.cancelFunc()
58 <-a.tc.asyncTestState.notify
59 select {
60 case <-a.donec:
61 default:
62 panic(fmt.Errorf("%v: async op failed to finish after being canceled", a.caller))
63 }
64 }
65
// errNotDone is the sentinel error reported by asyncOp.result while the
// operation has not completed yet.
var errNotDone = errors.New("async op is not done")
67
68
69
70
71
72
73
74
75 func (a *asyncOp[T]) result() (v T, err error) {
76 a.tc.wait()
77 select {
78 case <-a.donec:
79 return a.v, a.err
80 default:
81 return v, errNotDone
82 }
83 }
84
85
// blockedAsync describes one operation that is parked waiting for a
// condition to become true.
type blockedAsync struct {
	// until reports whether the operation may proceed.
	until func() bool

	// donec is closed to release the parked operation.
	donec chan struct{}
}
90
// asyncContextKey is the context key that marks a context as created by
// runAsync. waitUntil refuses to block on a live context that lacks it.
type asyncContextKey struct{}
92
93
94
95
96
97
98 func runAsync[T any](tc *testConn, f func(context.Context) (T, error)) *asyncOp[T] {
99 as := &tc.asyncTestState
100 if as.notify == nil {
101 as.notify = make(chan struct{})
102 as.mu.Lock()
103 as.blocked = make(map[*blockedAsync]struct{})
104 as.mu.Unlock()
105 }
106 _, file, line, _ := runtime.Caller(1)
107 ctx := context.WithValue(context.Background(), asyncContextKey{}, true)
108 ctx, cancel := context.WithCancel(ctx)
109 a := &asyncOp[T]{
110 tc: tc,
111 caller: fmt.Sprintf("%v:%v", filepath.Base(file), line),
112 donec: make(chan struct{}),
113 cancelFunc: cancel,
114 }
115 go func() {
116 a.v, a.err = f(ctx)
117 close(a.donec)
118 as.notify <- struct{}{}
119 }()
120 tc.t.Cleanup(func() {
121 if _, err := a.result(); err == errNotDone {
122 tc.t.Errorf("%v: async operation is still executing at end of test", a.caller)
123 a.cancel()
124 }
125 })
126
127 <-as.notify
128 tc.wait()
129 return a
130 }
131
132
133
134 func (as *asyncTestState) waitUntil(ctx context.Context, until func() bool) error {
135 if until() {
136 return nil
137 }
138 if err := ctx.Err(); err != nil {
139
140 return err
141 }
142 if ctx.Value(asyncContextKey{}) == nil {
143
144
145
146
147 panic("blocking async point with unexpected Context")
148 }
149 b := &blockedAsync{
150 until: until,
151 donec: make(chan struct{}),
152 }
153
154 as.mu.Lock()
155 as.blocked[b] = struct{}{}
156 as.mu.Unlock()
157
158
159 as.notify <- struct{}{}
160 select {
161 case <-b.donec:
162 case <-ctx.Done():
163 return ctx.Err()
164 }
165 return nil
166 }
167
168
169
170 func (as *asyncTestState) wakeAsync() bool {
171 as.mu.Lock()
172 var woken *blockedAsync
173 for w := range as.blocked {
174 if w.until() {
175 woken = w
176 delete(as.blocked, w)
177 break
178 }
179 }
180 as.mu.Unlock()
181 if woken == nil {
182 return false
183 }
184 close(woken.donec)
185 <-as.notify
186 return true
187 }
188
View as plain text