1
2
3
4
5
6
7 package http2
8
9 import (
10 "context"
11 "crypto/tls"
12 "errors"
13 "net/http"
14 "sync"
15 )
16
17
18 type ClientConnPool interface {
19
20
21
22
23
24
25 GetClientConn(req *http.Request, addr string) (*ClientConn, error)
26 MarkDead(*ClientConn)
27 }
28
29
30
31 type clientConnPoolIdleCloser interface {
32 ClientConnPool
33 closeIdleConnections()
34 }
35
36 var (
37 _ clientConnPoolIdleCloser = (*clientConnPool)(nil)
38 _ clientConnPoolIdleCloser = noDialClientConnPool{}
39 )
40
41
42 type clientConnPool struct {
43 t *Transport
44
45 mu sync.Mutex
46
47
48 conns map[string][]*ClientConn
49 dialing map[string]*dialCall
50 keys map[*ClientConn][]string
51 addConnCalls map[string]*addConnCall
52 }
53
54 func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
55 return p.getClientConn(req, addr, dialOnMiss)
56 }
57
58 const (
59 dialOnMiss = true
60 noDialOnMiss = false
61 )
62
63 func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
64
65 if isConnectionCloseRequest(req) && dialOnMiss {
66
67 traceGetConn(req, addr)
68 const singleUse = true
69 cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)
70 if err != nil {
71 return nil, err
72 }
73 return cc, nil
74 }
75 for {
76 p.mu.Lock()
77 for _, cc := range p.conns[addr] {
78 if cc.ReserveNewRequest() {
79
80
81
82 if !cc.getConnCalled {
83 traceGetConn(req, addr)
84 }
85 cc.getConnCalled = false
86 p.mu.Unlock()
87 return cc, nil
88 }
89 }
90 if !dialOnMiss {
91 p.mu.Unlock()
92 return nil, ErrNoCachedConn
93 }
94 traceGetConn(req, addr)
95 call := p.getStartDialLocked(req.Context(), addr)
96 p.mu.Unlock()
97 <-call.done
98 if shouldRetryDial(call, req) {
99 continue
100 }
101 cc, err := call.res, call.err
102 if err != nil {
103 return nil, err
104 }
105 if cc.ReserveNewRequest() {
106 return cc, nil
107 }
108 }
109 }
110
111
112 type dialCall struct {
113 _ incomparable
114 p *clientConnPool
115
116
117 ctx context.Context
118 done chan struct{}
119 res *ClientConn
120 err error
121 }
122
123
124 func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
125 if call, ok := p.dialing[addr]; ok {
126
127 return call
128 }
129 call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
130 if p.dialing == nil {
131 p.dialing = make(map[string]*dialCall)
132 }
133 p.dialing[addr] = call
134 go call.dial(call.ctx, addr)
135 return call
136 }
137
138
139 func (c *dialCall) dial(ctx context.Context, addr string) {
140 const singleUse = false
141 c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
142
143 c.p.mu.Lock()
144 delete(c.p.dialing, addr)
145 if c.err == nil {
146 c.p.addConnLocked(addr, c.res)
147 }
148 c.p.mu.Unlock()
149
150 close(c.done)
151 }
152
153
154
155
156
157
158
159
160
161 func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
162 p.mu.Lock()
163 for _, cc := range p.conns[key] {
164 if cc.CanTakeNewRequest() {
165 p.mu.Unlock()
166 return false, nil
167 }
168 }
169 call, dup := p.addConnCalls[key]
170 if !dup {
171 if p.addConnCalls == nil {
172 p.addConnCalls = make(map[string]*addConnCall)
173 }
174 call = &addConnCall{
175 p: p,
176 done: make(chan struct{}),
177 }
178 p.addConnCalls[key] = call
179 go call.run(t, key, c)
180 }
181 p.mu.Unlock()
182
183 <-call.done
184 if call.err != nil {
185 return false, call.err
186 }
187 return !dup, nil
188 }
189
190 type addConnCall struct {
191 _ incomparable
192 p *clientConnPool
193 done chan struct{}
194 err error
195 }
196
197 func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
198 cc, err := t.NewClientConn(tc)
199
200 p := c.p
201 p.mu.Lock()
202 if err != nil {
203 c.err = err
204 } else {
205 cc.getConnCalled = true
206 p.addConnLocked(key, cc)
207 }
208 delete(p.addConnCalls, key)
209 p.mu.Unlock()
210 close(c.done)
211 }
212
213
214 func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
215 for _, v := range p.conns[key] {
216 if v == cc {
217 return
218 }
219 }
220 if p.conns == nil {
221 p.conns = make(map[string][]*ClientConn)
222 }
223 if p.keys == nil {
224 p.keys = make(map[*ClientConn][]string)
225 }
226 p.conns[key] = append(p.conns[key], cc)
227 p.keys[cc] = append(p.keys[cc], key)
228 }
229
230 func (p *clientConnPool) MarkDead(cc *ClientConn) {
231 p.mu.Lock()
232 defer p.mu.Unlock()
233 for _, key := range p.keys[cc] {
234 vv, ok := p.conns[key]
235 if !ok {
236 continue
237 }
238 newList := filterOutClientConn(vv, cc)
239 if len(newList) > 0 {
240 p.conns[key] = newList
241 } else {
242 delete(p.conns, key)
243 }
244 }
245 delete(p.keys, cc)
246 }
247
248 func (p *clientConnPool) closeIdleConnections() {
249 p.mu.Lock()
250 defer p.mu.Unlock()
251
252
253
254
255
256
257 for _, vv := range p.conns {
258 for _, cc := range vv {
259 cc.closeIfIdle()
260 }
261 }
262 }
263
264 func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
265 out := in[:0]
266 for _, v := range in {
267 if v != exclude {
268 out = append(out, v)
269 }
270 }
271
272
273 if len(in) != len(out) {
274 in[len(in)-1] = nil
275 }
276 return out
277 }
278
279
280
281
282 type noDialClientConnPool struct{ *clientConnPool }
283
284 func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
285 return p.getClientConn(req, addr, noDialOnMiss)
286 }
287
288
289
290
291
292 func shouldRetryDial(call *dialCall, req *http.Request) bool {
293 if call.err == nil {
294
295 return false
296 }
297 if call.ctx == req.Context() {
298
299
300
301 return false
302 }
303 if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
304
305
306 return false
307 }
308
309
310 return call.ctx.Err() != nil
311 }
312
View as plain text