1
2
3
4
5
6
7 package http2
8
9 import (
10 "bufio"
11 "bytes"
12 "compress/gzip"
13 "context"
14 "crypto/rand"
15 "crypto/tls"
16 "errors"
17 "fmt"
18 "io"
19 "io/fs"
20 "log"
21 "math"
22 "math/bits"
23 mathrand "math/rand"
24 "net"
25 "net/http"
26 "net/http/httptrace"
27 "net/textproto"
28 "os"
29 "sort"
30 "strconv"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/http/httpguts"
37 "golang.org/x/net/http2/hpack"
38 "golang.org/x/net/idna"
39 )
40
41 const (
42
43
44 transportDefaultConnFlow = 1 << 30
45
46
47
48
49 transportDefaultStreamFlow = 4 << 20
50
51 defaultUserAgent = "Go-http-client/2.0"
52
53
54
55
56 initialMaxConcurrentStreams = 100
57
58
59
60 defaultMaxConcurrentStreams = 1000
61 )
62
63
64
65
66
67 type Transport struct {
68
69
70
71
72
73
74
75 DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
76
77
78
79
80
81
82
83
84
85 DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
86
87
88
89 TLSClientConfig *tls.Config
90
91
92
93 ConnPool ClientConnPool
94
95
96
97
98
99
100
101
102
103 DisableCompression bool
104
105
106
107 AllowHTTP bool
108
109
110
111
112
113
114
115
116 MaxHeaderListSize uint32
117
118
119
120
121
122
123
124
125 MaxReadFrameSize uint32
126
127
128
129
130
131
132 MaxDecoderHeaderTableSize uint32
133
134
135
136
137
138 MaxEncoderHeaderTableSize uint32
139
140
141
142
143
144
145
146
147
148 StrictMaxConcurrentStreams bool
149
150
151
152
153
154
155
156 ReadIdleTimeout time.Duration
157
158
159
160
161 PingTimeout time.Duration
162
163
164
165
166 WriteByteTimeout time.Duration
167
168
169
170
171
172 CountError func(errType string)
173
174
175
176
177 t1 *http.Transport
178
179 connPoolOnce sync.Once
180 connPoolOrDef ClientConnPool
181 }
182
183 func (t *Transport) maxHeaderListSize() uint32 {
184 if t.MaxHeaderListSize == 0 {
185 return 10 << 20
186 }
187 if t.MaxHeaderListSize == 0xffffffff {
188 return 0
189 }
190 return t.MaxHeaderListSize
191 }
192
193 func (t *Transport) maxFrameReadSize() uint32 {
194 if t.MaxReadFrameSize == 0 {
195 return 0
196 }
197 if t.MaxReadFrameSize < minMaxFrameSize {
198 return minMaxFrameSize
199 }
200 if t.MaxReadFrameSize > maxFrameSize {
201 return maxFrameSize
202 }
203 return t.MaxReadFrameSize
204 }
205
206 func (t *Transport) disableCompression() bool {
207 return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
208 }
209
210 func (t *Transport) pingTimeout() time.Duration {
211 if t.PingTimeout == 0 {
212 return 15 * time.Second
213 }
214 return t.PingTimeout
215
216 }
217
218
219
220
221
222 func ConfigureTransport(t1 *http.Transport) error {
223 _, err := ConfigureTransports(t1)
224 return err
225 }
226
227
228
229
230 func ConfigureTransports(t1 *http.Transport) (*Transport, error) {
231 return configureTransports(t1)
232 }
233
234 func configureTransports(t1 *http.Transport) (*Transport, error) {
235 connPool := new(clientConnPool)
236 t2 := &Transport{
237 ConnPool: noDialClientConnPool{connPool},
238 t1: t1,
239 }
240 connPool.t = t2
241 if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
242 return nil, err
243 }
244 if t1.TLSClientConfig == nil {
245 t1.TLSClientConfig = new(tls.Config)
246 }
247 if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
248 t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
249 }
250 if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
251 t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
252 }
253 upgradeFn := func(authority string, c *tls.Conn) http.RoundTripper {
254 addr := authorityAddr("https", authority)
255 if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
256 go c.Close()
257 return erringRoundTripper{err}
258 } else if !used {
259
260
261
262
263 go c.Close()
264 }
265 return t2
266 }
267 if m := t1.TLSNextProto; len(m) == 0 {
268 t1.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{
269 "h2": upgradeFn,
270 }
271 } else {
272 m["h2"] = upgradeFn
273 }
274 return t2, nil
275 }
276
277 func (t *Transport) connPool() ClientConnPool {
278 t.connPoolOnce.Do(t.initConnPool)
279 return t.connPoolOrDef
280 }
281
282 func (t *Transport) initConnPool() {
283 if t.ConnPool != nil {
284 t.connPoolOrDef = t.ConnPool
285 } else {
286 t.connPoolOrDef = &clientConnPool{t: t}
287 }
288 }
289
290
291
292 type ClientConn struct {
293 t *Transport
294 tconn net.Conn
295 tlsState *tls.ConnectionState
296 reused uint32
297 singleUse bool
298 getConnCalled bool
299
300
301 readerDone chan struct{}
302 readerErr error
303
304 idleTimeout time.Duration
305 idleTimer *time.Timer
306
307 mu sync.Mutex
308 cond *sync.Cond
309 flow outflow
310 inflow inflow
311 doNotReuse bool
312 closing bool
313 closed bool
314 seenSettings bool
315 wantSettingsAck bool
316 goAway *GoAwayFrame
317 goAwayDebug string
318 streams map[uint32]*clientStream
319 streamsReserved int
320 nextStreamID uint32
321 pendingRequests int
322 pings map[[8]byte]chan struct{}
323 br *bufio.Reader
324 lastActive time.Time
325 lastIdle time.Time
326
327 maxFrameSize uint32
328 maxConcurrentStreams uint32
329 peerMaxHeaderListSize uint64
330 peerMaxHeaderTableSize uint32
331 initialWindowSize uint32
332
333
334
335
336 reqHeaderMu chan struct{}
337
338
339
340
341 wmu sync.Mutex
342 bw *bufio.Writer
343 fr *Framer
344 werr error
345 hbuf bytes.Buffer
346 henc *hpack.Encoder
347 }
348
349
350
351 type clientStream struct {
352 cc *ClientConn
353
354
355 ctx context.Context
356 reqCancel <-chan struct{}
357
358 trace *httptrace.ClientTrace
359 ID uint32
360 bufPipe pipe
361 requestedGzip bool
362 isHead bool
363
364 abortOnce sync.Once
365 abort chan struct{}
366 abortErr error
367
368 peerClosed chan struct{}
369 donec chan struct{}
370 on100 chan struct{}
371
372 respHeaderRecv chan struct{}
373 res *http.Response
374
375 flow outflow
376 inflow inflow
377 bytesRemain int64
378 readErr error
379
380 reqBody io.ReadCloser
381 reqBodyContentLength int64
382 reqBodyClosed chan struct{}
383
384
385 sentEndStream bool
386 sentHeaders bool
387
388
389 firstByte bool
390 pastHeaders bool
391 pastTrailers bool
392 num1xx uint8
393 readClosed bool
394 readAborted bool
395
396 trailer http.Header
397 resTrailer *http.Header
398 }
399
400 var got1xxFuncForTests func(int, textproto.MIMEHeader) error
401
402
403
404 func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
405 if fn := got1xxFuncForTests; fn != nil {
406 return fn
407 }
408 return traceGot1xxResponseFunc(cs.trace)
409 }
410
411 func (cs *clientStream) abortStream(err error) {
412 cs.cc.mu.Lock()
413 defer cs.cc.mu.Unlock()
414 cs.abortStreamLocked(err)
415 }
416
417 func (cs *clientStream) abortStreamLocked(err error) {
418 cs.abortOnce.Do(func() {
419 cs.abortErr = err
420 close(cs.abort)
421 })
422 if cs.reqBody != nil {
423 cs.closeReqBodyLocked()
424 }
425
426 if cs.cc.cond != nil {
427
428 cs.cc.cond.Broadcast()
429 }
430 }
431
432 func (cs *clientStream) abortRequestBodyWrite() {
433 cc := cs.cc
434 cc.mu.Lock()
435 defer cc.mu.Unlock()
436 if cs.reqBody != nil && cs.reqBodyClosed == nil {
437 cs.closeReqBodyLocked()
438 cc.cond.Broadcast()
439 }
440 }
441
442 func (cs *clientStream) closeReqBodyLocked() {
443 if cs.reqBodyClosed != nil {
444 return
445 }
446 cs.reqBodyClosed = make(chan struct{})
447 reqBodyClosed := cs.reqBodyClosed
448 go func() {
449 cs.reqBody.Close()
450 close(reqBodyClosed)
451 }()
452 }
453
454 type stickyErrWriter struct {
455 conn net.Conn
456 timeout time.Duration
457 err *error
458 }
459
460 func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
461 if *sew.err != nil {
462 return 0, *sew.err
463 }
464 for {
465 if sew.timeout != 0 {
466 sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout))
467 }
468 nn, err := sew.conn.Write(p[n:])
469 n += nn
470 if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) {
471
472 continue
473 }
474 if sew.timeout != 0 {
475 sew.conn.SetWriteDeadline(time.Time{})
476 }
477 *sew.err = err
478 return n, err
479 }
480 }
481
482
483
484
485
486
487
488 type noCachedConnError struct{}
489
490 func (noCachedConnError) IsHTTP2NoCachedConnError() {}
491 func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
492
493
494
495
496 func isNoCachedConnError(err error) bool {
497 _, ok := err.(interface{ IsHTTP2NoCachedConnError() })
498 return ok
499 }
500
501 var ErrNoCachedConn error = noCachedConnError{}
502
503
504 type RoundTripOpt struct {
505
506
507
508
509 OnlyCachedConn bool
510 }
511
512 func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
513 return t.RoundTripOpt(req, RoundTripOpt{})
514 }
515
516
517
518 func authorityAddr(scheme string, authority string) (addr string) {
519 host, port, err := net.SplitHostPort(authority)
520 if err != nil {
521 host = authority
522 port = ""
523 }
524 if port == "" {
525 port = "443"
526 if scheme == "http" {
527 port = "80"
528 }
529 }
530 if a, err := idna.ToASCII(host); err == nil {
531 host = a
532 }
533
534 if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
535 return host + ":" + port
536 }
537 return net.JoinHostPort(host, port)
538 }
539
540 var retryBackoffHook func(time.Duration) *time.Timer
541
542 func backoffNewTimer(d time.Duration) *time.Timer {
543 if retryBackoffHook != nil {
544 return retryBackoffHook(d)
545 }
546 return time.NewTimer(d)
547 }
548
549
550 func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
551 if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
552 return nil, errors.New("http2: unsupported scheme")
553 }
554
555 addr := authorityAddr(req.URL.Scheme, req.URL.Host)
556 for retry := 0; ; retry++ {
557 cc, err := t.connPool().GetClientConn(req, addr)
558 if err != nil {
559 t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
560 return nil, err
561 }
562 reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
563 traceGotConn(req, cc, reused)
564 res, err := cc.RoundTrip(req)
565 if err != nil && retry <= 6 {
566 roundTripErr := err
567 if req, err = shouldRetryRequest(req, err); err == nil {
568
569 if retry == 0 {
570 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
571 continue
572 }
573 backoff := float64(uint(1) << (uint(retry) - 1))
574 backoff += backoff * (0.1 * mathrand.Float64())
575 d := time.Second * time.Duration(backoff)
576 timer := backoffNewTimer(d)
577 select {
578 case <-timer.C:
579 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
580 continue
581 case <-req.Context().Done():
582 timer.Stop()
583 err = req.Context().Err()
584 }
585 }
586 }
587 if err != nil {
588 t.vlogf("RoundTrip failure: %v", err)
589 return nil, err
590 }
591 return res, nil
592 }
593 }
594
595
596
597
598 func (t *Transport) CloseIdleConnections() {
599 if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
600 cp.closeIdleConnections()
601 }
602 }
603
604 var (
605 errClientConnClosed = errors.New("http2: client conn is closed")
606 errClientConnUnusable = errors.New("http2: client conn not usable")
607 errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
608 )
609
610
611
612
613
614 func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
615 if !canRetryError(err) {
616 return nil, err
617 }
618
619
620 if req.Body == nil || req.Body == http.NoBody {
621 return req, nil
622 }
623
624
625
626 if req.GetBody != nil {
627 body, err := req.GetBody()
628 if err != nil {
629 return nil, err
630 }
631 newReq := *req
632 newReq.Body = body
633 return &newReq, nil
634 }
635
636
637
638
639 if err == errClientConnUnusable {
640 return req, nil
641 }
642
643 return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
644 }
645
646 func canRetryError(err error) bool {
647 if err == errClientConnUnusable || err == errClientConnGotGoAway {
648 return true
649 }
650 if se, ok := err.(StreamError); ok {
651 if se.Code == ErrCodeProtocol && se.Cause == errFromPeer {
652
653 return true
654 }
655 return se.Code == ErrCodeRefusedStream
656 }
657 return false
658 }
659
660 func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
661 host, _, err := net.SplitHostPort(addr)
662 if err != nil {
663 return nil, err
664 }
665 tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
666 if err != nil {
667 return nil, err
668 }
669 return t.newClientConn(tconn, singleUse)
670 }
671
672 func (t *Transport) newTLSConfig(host string) *tls.Config {
673 cfg := new(tls.Config)
674 if t.TLSClientConfig != nil {
675 *cfg = *t.TLSClientConfig.Clone()
676 }
677 if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
678 cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
679 }
680 if cfg.ServerName == "" {
681 cfg.ServerName = host
682 }
683 return cfg
684 }
685
686 func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
687 if t.DialTLSContext != nil {
688 return t.DialTLSContext(ctx, network, addr, tlsCfg)
689 } else if t.DialTLS != nil {
690 return t.DialTLS(network, addr, tlsCfg)
691 }
692
693 tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
694 if err != nil {
695 return nil, err
696 }
697 state := tlsCn.ConnectionState()
698 if p := state.NegotiatedProtocol; p != NextProtoTLS {
699 return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
700 }
701 if !state.NegotiatedProtocolIsMutual {
702 return nil, errors.New("http2: could not negotiate protocol mutually")
703 }
704 return tlsCn, nil
705 }
706
707
708
709 func (t *Transport) disableKeepAlives() bool {
710 return t.t1 != nil && t.t1.DisableKeepAlives
711 }
712
713 func (t *Transport) expectContinueTimeout() time.Duration {
714 if t.t1 == nil {
715 return 0
716 }
717 return t.t1.ExpectContinueTimeout
718 }
719
720 func (t *Transport) maxDecoderHeaderTableSize() uint32 {
721 if v := t.MaxDecoderHeaderTableSize; v > 0 {
722 return v
723 }
724 return initialHeaderTableSize
725 }
726
727 func (t *Transport) maxEncoderHeaderTableSize() uint32 {
728 if v := t.MaxEncoderHeaderTableSize; v > 0 {
729 return v
730 }
731 return initialHeaderTableSize
732 }
733
734 func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
735 return t.newClientConn(c, t.disableKeepAlives())
736 }
737
738 func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
739 cc := &ClientConn{
740 t: t,
741 tconn: c,
742 readerDone: make(chan struct{}),
743 nextStreamID: 1,
744 maxFrameSize: 16 << 10,
745 initialWindowSize: 65535,
746 maxConcurrentStreams: initialMaxConcurrentStreams,
747 peerMaxHeaderListSize: 0xffffffffffffffff,
748 streams: make(map[uint32]*clientStream),
749 singleUse: singleUse,
750 wantSettingsAck: true,
751 pings: make(map[[8]byte]chan struct{}),
752 reqHeaderMu: make(chan struct{}, 1),
753 }
754 if d := t.idleConnTimeout(); d != 0 {
755 cc.idleTimeout = d
756 cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
757 }
758 if VerboseLogs {
759 t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
760 }
761
762 cc.cond = sync.NewCond(&cc.mu)
763 cc.flow.add(int32(initialWindowSize))
764
765
766
767 cc.bw = bufio.NewWriter(stickyErrWriter{
768 conn: c,
769 timeout: t.WriteByteTimeout,
770 err: &cc.werr,
771 })
772 cc.br = bufio.NewReader(c)
773 cc.fr = NewFramer(cc.bw, cc.br)
774 if t.maxFrameReadSize() != 0 {
775 cc.fr.SetMaxReadFrameSize(t.maxFrameReadSize())
776 }
777 if t.CountError != nil {
778 cc.fr.countError = t.CountError
779 }
780 maxHeaderTableSize := t.maxDecoderHeaderTableSize()
781 cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
782 cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
783
784 cc.henc = hpack.NewEncoder(&cc.hbuf)
785 cc.henc.SetMaxDynamicTableSizeLimit(t.maxEncoderHeaderTableSize())
786 cc.peerMaxHeaderTableSize = initialHeaderTableSize
787
788 if t.AllowHTTP {
789 cc.nextStreamID = 3
790 }
791
792 if cs, ok := c.(connectionStater); ok {
793 state := cs.ConnectionState()
794 cc.tlsState = &state
795 }
796
797 initialSettings := []Setting{
798 {ID: SettingEnablePush, Val: 0},
799 {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
800 }
801 if max := t.maxFrameReadSize(); max != 0 {
802 initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: max})
803 }
804 if max := t.maxHeaderListSize(); max != 0 {
805 initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
806 }
807 if maxHeaderTableSize != initialHeaderTableSize {
808 initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
809 }
810
811 cc.bw.Write(clientPreface)
812 cc.fr.WriteSettings(initialSettings...)
813 cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
814 cc.inflow.init(transportDefaultConnFlow + initialWindowSize)
815 cc.bw.Flush()
816 if cc.werr != nil {
817 cc.Close()
818 return nil, cc.werr
819 }
820
821 go cc.readLoop()
822 return cc, nil
823 }
824
825 func (cc *ClientConn) healthCheck() {
826 pingTimeout := cc.t.pingTimeout()
827
828
829 ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
830 defer cancel()
831 cc.vlogf("http2: Transport sending health check")
832 err := cc.Ping(ctx)
833 if err != nil {
834 cc.vlogf("http2: Transport health check failure: %v", err)
835 cc.closeForLostPing()
836 } else {
837 cc.vlogf("http2: Transport health check success")
838 }
839 }
840
841
842 func (cc *ClientConn) SetDoNotReuse() {
843 cc.mu.Lock()
844 defer cc.mu.Unlock()
845 cc.doNotReuse = true
846 }
847
848 func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
849 cc.mu.Lock()
850 defer cc.mu.Unlock()
851
852 old := cc.goAway
853 cc.goAway = f
854
855
856 if cc.goAwayDebug == "" {
857 cc.goAwayDebug = string(f.DebugData())
858 }
859 if old != nil && old.ErrCode != ErrCodeNo {
860 cc.goAway.ErrCode = old.ErrCode
861 }
862 last := f.LastStreamID
863 for streamID, cs := range cc.streams {
864 if streamID > last {
865 cs.abortStreamLocked(errClientConnGotGoAway)
866 }
867 }
868 }
869
870
871
872
873
874
875 func (cc *ClientConn) CanTakeNewRequest() bool {
876 cc.mu.Lock()
877 defer cc.mu.Unlock()
878 return cc.canTakeNewRequestLocked()
879 }
880
881
882
883
884 func (cc *ClientConn) ReserveNewRequest() bool {
885 cc.mu.Lock()
886 defer cc.mu.Unlock()
887 if st := cc.idleStateLocked(); !st.canTakeNewRequest {
888 return false
889 }
890 cc.streamsReserved++
891 return true
892 }
893
894
895 type ClientConnState struct {
896
897 Closed bool
898
899
900
901
902
903 Closing bool
904
905
906 StreamsActive int
907
908
909
910 StreamsReserved int
911
912
913
914
915 StreamsPending int
916
917
918
919
920 MaxConcurrentStreams uint32
921
922
923
924 LastIdle time.Time
925 }
926
927
928 func (cc *ClientConn) State() ClientConnState {
929 cc.wmu.Lock()
930 maxConcurrent := cc.maxConcurrentStreams
931 if !cc.seenSettings {
932 maxConcurrent = 0
933 }
934 cc.wmu.Unlock()
935
936 cc.mu.Lock()
937 defer cc.mu.Unlock()
938 return ClientConnState{
939 Closed: cc.closed,
940 Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
941 StreamsActive: len(cc.streams),
942 StreamsReserved: cc.streamsReserved,
943 StreamsPending: cc.pendingRequests,
944 LastIdle: cc.lastIdle,
945 MaxConcurrentStreams: maxConcurrent,
946 }
947 }
948
949
950
951 type clientConnIdleState struct {
952 canTakeNewRequest bool
953 }
954
955 func (cc *ClientConn) idleState() clientConnIdleState {
956 cc.mu.Lock()
957 defer cc.mu.Unlock()
958 return cc.idleStateLocked()
959 }
960
961 func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
962 if cc.singleUse && cc.nextStreamID > 1 {
963 return
964 }
965 var maxConcurrentOkay bool
966 if cc.t.StrictMaxConcurrentStreams {
967
968
969
970
971 maxConcurrentOkay = true
972 } else {
973 maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams)
974 }
975
976 st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
977 !cc.doNotReuse &&
978 int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
979 !cc.tooIdleLocked()
980 return
981 }
982
983 func (cc *ClientConn) canTakeNewRequestLocked() bool {
984 st := cc.idleStateLocked()
985 return st.canTakeNewRequest
986 }
987
988
989
990 func (cc *ClientConn) tooIdleLocked() bool {
991
992
993
994
995 return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
996 }
997
998
999
1000
1001
1002
1003
1004 func (cc *ClientConn) onIdleTimeout() {
1005 cc.closeIfIdle()
1006 }
1007
1008 func (cc *ClientConn) closeConn() {
1009 t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
1010 defer t.Stop()
1011 cc.tconn.Close()
1012 }
1013
1014
1015
1016 func (cc *ClientConn) forceCloseConn() {
1017 tc, ok := cc.tconn.(*tls.Conn)
1018 if !ok {
1019 return
1020 }
1021 if nc := tc.NetConn(); nc != nil {
1022 nc.Close()
1023 }
1024 }
1025
1026 func (cc *ClientConn) closeIfIdle() {
1027 cc.mu.Lock()
1028 if len(cc.streams) > 0 || cc.streamsReserved > 0 {
1029 cc.mu.Unlock()
1030 return
1031 }
1032 cc.closed = true
1033 nextID := cc.nextStreamID
1034
1035 cc.mu.Unlock()
1036
1037 if VerboseLogs {
1038 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
1039 }
1040 cc.closeConn()
1041 }
1042
1043 func (cc *ClientConn) isDoNotReuseAndIdle() bool {
1044 cc.mu.Lock()
1045 defer cc.mu.Unlock()
1046 return cc.doNotReuse && len(cc.streams) == 0
1047 }
1048
1049 var shutdownEnterWaitStateHook = func() {}
1050
1051
1052 func (cc *ClientConn) Shutdown(ctx context.Context) error {
1053 if err := cc.sendGoAway(); err != nil {
1054 return err
1055 }
1056
1057 done := make(chan struct{})
1058 cancelled := false
1059 go func() {
1060 cc.mu.Lock()
1061 defer cc.mu.Unlock()
1062 for {
1063 if len(cc.streams) == 0 || cc.closed {
1064 cc.closed = true
1065 close(done)
1066 break
1067 }
1068 if cancelled {
1069 break
1070 }
1071 cc.cond.Wait()
1072 }
1073 }()
1074 shutdownEnterWaitStateHook()
1075 select {
1076 case <-done:
1077 cc.closeConn()
1078 return nil
1079 case <-ctx.Done():
1080 cc.mu.Lock()
1081
1082 cancelled = true
1083 cc.cond.Broadcast()
1084 cc.mu.Unlock()
1085 return ctx.Err()
1086 }
1087 }
1088
1089 func (cc *ClientConn) sendGoAway() error {
1090 cc.mu.Lock()
1091 closing := cc.closing
1092 cc.closing = true
1093 maxStreamID := cc.nextStreamID
1094 cc.mu.Unlock()
1095 if closing {
1096
1097 return nil
1098 }
1099
1100 cc.wmu.Lock()
1101 defer cc.wmu.Unlock()
1102
1103 if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
1104 return err
1105 }
1106 if err := cc.bw.Flush(); err != nil {
1107 return err
1108 }
1109
1110 return nil
1111 }
1112
1113
1114
1115 func (cc *ClientConn) closeForError(err error) {
1116 cc.mu.Lock()
1117 cc.closed = true
1118 for _, cs := range cc.streams {
1119 cs.abortStreamLocked(err)
1120 }
1121 cc.cond.Broadcast()
1122 cc.mu.Unlock()
1123 cc.closeConn()
1124 }
1125
1126
1127
1128
1129 func (cc *ClientConn) Close() error {
1130 err := errors.New("http2: client connection force closed via ClientConn.Close")
1131 cc.closeForError(err)
1132 return nil
1133 }
1134
1135
1136 func (cc *ClientConn) closeForLostPing() {
1137 err := errors.New("http2: client connection lost")
1138 if f := cc.t.CountError; f != nil {
1139 f("conn_close_lost_ping")
1140 }
1141 cc.closeForError(err)
1142 }
1143
1144
1145
1146 var errRequestCanceled = errors.New("net/http: request canceled")
1147
1148 func commaSeparatedTrailers(req *http.Request) (string, error) {
1149 keys := make([]string, 0, len(req.Trailer))
1150 for k := range req.Trailer {
1151 k = canonicalHeader(k)
1152 switch k {
1153 case "Transfer-Encoding", "Trailer", "Content-Length":
1154 return "", fmt.Errorf("invalid Trailer key %q", k)
1155 }
1156 keys = append(keys, k)
1157 }
1158 if len(keys) > 0 {
1159 sort.Strings(keys)
1160 return strings.Join(keys, ","), nil
1161 }
1162 return "", nil
1163 }
1164
1165 func (cc *ClientConn) responseHeaderTimeout() time.Duration {
1166 if cc.t.t1 != nil {
1167 return cc.t.t1.ResponseHeaderTimeout
1168 }
1169
1170
1171
1172
1173 return 0
1174 }
1175
1176
1177
1178
1179 func checkConnHeaders(req *http.Request) error {
1180 if v := req.Header.Get("Upgrade"); v != "" {
1181 return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
1182 }
1183 if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
1184 return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
1185 }
1186 if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !asciiEqualFold(vv[0], "close") && !asciiEqualFold(vv[0], "keep-alive")) {
1187 return fmt.Errorf("http2: invalid Connection request header: %q", vv)
1188 }
1189 return nil
1190 }
1191
1192
1193
1194
1195 func actualContentLength(req *http.Request) int64 {
1196 if req.Body == nil || req.Body == http.NoBody {
1197 return 0
1198 }
1199 if req.ContentLength != 0 {
1200 return req.ContentLength
1201 }
1202 return -1
1203 }
1204
1205 func (cc *ClientConn) decrStreamReservations() {
1206 cc.mu.Lock()
1207 defer cc.mu.Unlock()
1208 cc.decrStreamReservationsLocked()
1209 }
1210
1211 func (cc *ClientConn) decrStreamReservationsLocked() {
1212 if cc.streamsReserved > 0 {
1213 cc.streamsReserved--
1214 }
1215 }
1216
1217 func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
1218 ctx := req.Context()
1219 cs := &clientStream{
1220 cc: cc,
1221 ctx: ctx,
1222 reqCancel: req.Cancel,
1223 isHead: req.Method == "HEAD",
1224 reqBody: req.Body,
1225 reqBodyContentLength: actualContentLength(req),
1226 trace: httptrace.ContextClientTrace(ctx),
1227 peerClosed: make(chan struct{}),
1228 abort: make(chan struct{}),
1229 respHeaderRecv: make(chan struct{}),
1230 donec: make(chan struct{}),
1231 }
1232 go cs.doRequest(req)
1233
1234 waitDone := func() error {
1235 select {
1236 case <-cs.donec:
1237 return nil
1238 case <-ctx.Done():
1239 return ctx.Err()
1240 case <-cs.reqCancel:
1241 return errRequestCanceled
1242 }
1243 }
1244
1245 handleResponseHeaders := func() (*http.Response, error) {
1246 res := cs.res
1247 if res.StatusCode > 299 {
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257 cs.abortRequestBodyWrite()
1258 }
1259 res.Request = req
1260 res.TLS = cc.tlsState
1261 if res.Body == noBody && actualContentLength(req) == 0 {
1262
1263
1264
1265 if err := waitDone(); err != nil {
1266 return nil, err
1267 }
1268 }
1269 return res, nil
1270 }
1271
1272 cancelRequest := func(cs *clientStream, err error) error {
1273 cs.cc.mu.Lock()
1274 bodyClosed := cs.reqBodyClosed
1275 cs.cc.mu.Unlock()
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289 if bodyClosed != nil {
1290 <-bodyClosed
1291 }
1292 return err
1293 }
1294
1295 for {
1296 select {
1297 case <-cs.respHeaderRecv:
1298 return handleResponseHeaders()
1299 case <-cs.abort:
1300 select {
1301 case <-cs.respHeaderRecv:
1302
1303
1304
1305
1306 return handleResponseHeaders()
1307 default:
1308 waitDone()
1309 return nil, cs.abortErr
1310 }
1311 case <-ctx.Done():
1312 err := ctx.Err()
1313 cs.abortStream(err)
1314 return nil, cancelRequest(cs, err)
1315 case <-cs.reqCancel:
1316 cs.abortStream(errRequestCanceled)
1317 return nil, cancelRequest(cs, errRequestCanceled)
1318 }
1319 }
1320 }
1321
1322
1323
1324
1325 func (cs *clientStream) doRequest(req *http.Request) {
1326 err := cs.writeRequest(req)
1327 cs.cleanupWriteRequest(err)
1328 }
1329
1330
1331
1332
1333
1334
1335
1336
1337 func (cs *clientStream) writeRequest(req *http.Request) (err error) {
1338 cc := cs.cc
1339 ctx := cs.ctx
1340
1341 if err := checkConnHeaders(req); err != nil {
1342 return err
1343 }
1344
1345
1346
1347
1348 if cc.reqHeaderMu == nil {
1349 panic("RoundTrip on uninitialized ClientConn")
1350 }
1351 select {
1352 case cc.reqHeaderMu <- struct{}{}:
1353 case <-cs.reqCancel:
1354 return errRequestCanceled
1355 case <-ctx.Done():
1356 return ctx.Err()
1357 }
1358
1359 cc.mu.Lock()
1360 if cc.idleTimer != nil {
1361 cc.idleTimer.Stop()
1362 }
1363 cc.decrStreamReservationsLocked()
1364 if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
1365 cc.mu.Unlock()
1366 <-cc.reqHeaderMu
1367 return err
1368 }
1369 cc.addStreamLocked(cs)
1370 if isConnectionCloseRequest(req) {
1371 cc.doNotReuse = true
1372 }
1373 cc.mu.Unlock()
1374
1375
1376 if !cc.t.disableCompression() &&
1377 req.Header.Get("Accept-Encoding") == "" &&
1378 req.Header.Get("Range") == "" &&
1379 !cs.isHead {
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392 cs.requestedGzip = true
1393 }
1394
1395 continueTimeout := cc.t.expectContinueTimeout()
1396 if continueTimeout != 0 {
1397 if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
1398 continueTimeout = 0
1399 } else {
1400 cs.on100 = make(chan struct{}, 1)
1401 }
1402 }
1403
1404
1405
1406
1407
1408 err = cs.encodeAndWriteHeaders(req)
1409 <-cc.reqHeaderMu
1410 if err != nil {
1411 return err
1412 }
1413
1414 hasBody := cs.reqBodyContentLength != 0
1415 if !hasBody {
1416 cs.sentEndStream = true
1417 } else {
1418 if continueTimeout != 0 {
1419 traceWait100Continue(cs.trace)
1420 timer := time.NewTimer(continueTimeout)
1421 select {
1422 case <-timer.C:
1423 err = nil
1424 case <-cs.on100:
1425 err = nil
1426 case <-cs.abort:
1427 err = cs.abortErr
1428 case <-ctx.Done():
1429 err = ctx.Err()
1430 case <-cs.reqCancel:
1431 err = errRequestCanceled
1432 }
1433 timer.Stop()
1434 if err != nil {
1435 traceWroteRequest(cs.trace, err)
1436 return err
1437 }
1438 }
1439
1440 if err = cs.writeRequestBody(req); err != nil {
1441 if err != errStopReqBodyWrite {
1442 traceWroteRequest(cs.trace, err)
1443 return err
1444 }
1445 } else {
1446 cs.sentEndStream = true
1447 }
1448 }
1449
1450 traceWroteRequest(cs.trace, err)
1451
1452 var respHeaderTimer <-chan time.Time
1453 var respHeaderRecv chan struct{}
1454 if d := cc.responseHeaderTimeout(); d != 0 {
1455 timer := time.NewTimer(d)
1456 defer timer.Stop()
1457 respHeaderTimer = timer.C
1458 respHeaderRecv = cs.respHeaderRecv
1459 }
1460
1461
1462
1463 for {
1464 select {
1465 case <-cs.peerClosed:
1466 return nil
1467 case <-respHeaderTimer:
1468 return errTimeout
1469 case <-respHeaderRecv:
1470 respHeaderRecv = nil
1471 respHeaderTimer = nil
1472 case <-cs.abort:
1473 return cs.abortErr
1474 case <-ctx.Done():
1475 return ctx.Err()
1476 case <-cs.reqCancel:
1477 return errRequestCanceled
1478 }
1479 }
1480 }
1481
1482 func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
1483 cc := cs.cc
1484 ctx := cs.ctx
1485
1486 cc.wmu.Lock()
1487 defer cc.wmu.Unlock()
1488
1489
1490 select {
1491 case <-cs.abort:
1492 return cs.abortErr
1493 case <-ctx.Done():
1494 return ctx.Err()
1495 case <-cs.reqCancel:
1496 return errRequestCanceled
1497 default:
1498 }
1499
1500
1501
1502
1503
1504
1505 trailers, err := commaSeparatedTrailers(req)
1506 if err != nil {
1507 return err
1508 }
1509 hasTrailers := trailers != ""
1510 contentLen := actualContentLength(req)
1511 hasBody := contentLen != 0
1512 hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
1513 if err != nil {
1514 return err
1515 }
1516
1517
1518 endStream := !hasBody && !hasTrailers
1519 cs.sentHeaders = true
1520 err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
1521 traceWroteHeaders(cs.trace)
1522 return err
1523 }
1524
1525
1526
1527
1528
1529 func (cs *clientStream) cleanupWriteRequest(err error) {
1530 cc := cs.cc
1531
1532 if cs.ID == 0 {
1533
1534 cc.decrStreamReservations()
1535 }
1536
1537
1538
1539
1540
1541 cc.mu.Lock()
1542 mustCloseBody := false
1543 if cs.reqBody != nil && cs.reqBodyClosed == nil {
1544 mustCloseBody = true
1545 cs.reqBodyClosed = make(chan struct{})
1546 }
1547 bodyClosed := cs.reqBodyClosed
1548 cc.mu.Unlock()
1549 if mustCloseBody {
1550 cs.reqBody.Close()
1551 close(bodyClosed)
1552 }
1553 if bodyClosed != nil {
1554 <-bodyClosed
1555 }
1556
1557 if err != nil && cs.sentEndStream {
1558
1559
1560
1561 select {
1562 case <-cs.peerClosed:
1563 err = nil
1564 default:
1565 }
1566 }
1567 if err != nil {
1568 cs.abortStream(err)
1569 if cs.sentHeaders {
1570 if se, ok := err.(StreamError); ok {
1571 if se.Cause != errFromPeer {
1572 cc.writeStreamReset(cs.ID, se.Code, err)
1573 }
1574 } else {
1575 cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
1576 }
1577 }
1578 cs.bufPipe.CloseWithError(err)
1579 } else {
1580 if cs.sentHeaders && !cs.sentEndStream {
1581 cc.writeStreamReset(cs.ID, ErrCodeNo, nil)
1582 }
1583 cs.bufPipe.CloseWithError(errRequestCanceled)
1584 }
1585 if cs.ID != 0 {
1586 cc.forgetStreamID(cs.ID)
1587 }
1588
1589 cc.wmu.Lock()
1590 werr := cc.werr
1591 cc.wmu.Unlock()
1592 if werr != nil {
1593 cc.Close()
1594 }
1595
1596 close(cs.donec)
1597 }
1598
1599
1600
1601 func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
1602 for {
1603 cc.lastActive = time.Now()
1604 if cc.closed || !cc.canTakeNewRequestLocked() {
1605 return errClientConnUnusable
1606 }
1607 cc.lastIdle = time.Time{}
1608 if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
1609 return nil
1610 }
1611 cc.pendingRequests++
1612 cc.cond.Wait()
1613 cc.pendingRequests--
1614 select {
1615 case <-cs.abort:
1616 return cs.abortErr
1617 default:
1618 }
1619 }
1620 }
1621
1622
1623 func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1624 first := true
1625 for len(hdrs) > 0 && cc.werr == nil {
1626 chunk := hdrs
1627 if len(chunk) > maxFrameSize {
1628 chunk = chunk[:maxFrameSize]
1629 }
1630 hdrs = hdrs[len(chunk):]
1631 endHeaders := len(hdrs) == 0
1632 if first {
1633 cc.fr.WriteHeaders(HeadersFrameParam{
1634 StreamID: streamID,
1635 BlockFragment: chunk,
1636 EndStream: endStream,
1637 EndHeaders: endHeaders,
1638 })
1639 first = false
1640 } else {
1641 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1642 }
1643 }
1644 cc.bw.Flush()
1645 return cc.werr
1646 }
1647
1648
1649 var (
1650
1651 errStopReqBodyWrite = errors.New("http2: aborting request body write")
1652
1653
1654 errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
1655
1656 errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
1657 )
1658
1659
1660
1661
1662
1663
1664 func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
1665 const max = 512 << 10
1666 n := int64(maxFrameSize)
1667 if n > max {
1668 n = max
1669 }
1670 if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
1671
1672
1673
1674
1675 n = cl + 1
1676 }
1677 if n < 1 {
1678 return 1
1679 }
1680 return int(n)
1681 }
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691 var bufPools [7]sync.Pool
1692 func bufPoolIndex(size int) int {
1693 if size <= 16384 {
1694 return 0
1695 }
1696 size -= 1
1697 bits := bits.Len(uint(size))
1698 index := bits - 14
1699 if index >= len(bufPools) {
1700 return len(bufPools) - 1
1701 }
1702 return index
1703 }
1704
1705 func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
1706 cc := cs.cc
1707 body := cs.reqBody
1708 sentEnd := false
1709
1710 hasTrailers := req.Trailer != nil
1711 remainLen := cs.reqBodyContentLength
1712 hasContentLen := remainLen != -1
1713
1714 cc.mu.Lock()
1715 maxFrameSize := int(cc.maxFrameSize)
1716 cc.mu.Unlock()
1717
1718
1719 scratchLen := cs.frameScratchBufferLen(maxFrameSize)
1720 var buf []byte
1721 index := bufPoolIndex(scratchLen)
1722 if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
1723 defer bufPools[index].Put(bp)
1724 buf = *bp
1725 } else {
1726 buf = make([]byte, scratchLen)
1727 defer bufPools[index].Put(&buf)
1728 }
1729
1730 var sawEOF bool
1731 for !sawEOF {
1732 n, err := body.Read(buf)
1733 if hasContentLen {
1734 remainLen -= int64(n)
1735 if remainLen == 0 && err == nil {
1736
1737
1738
1739
1740
1741
1742
1743 var scratch [1]byte
1744 var n1 int
1745 n1, err = body.Read(scratch[:])
1746 remainLen -= int64(n1)
1747 }
1748 if remainLen < 0 {
1749 err = errReqBodyTooLong
1750 return err
1751 }
1752 }
1753 if err != nil {
1754 cc.mu.Lock()
1755 bodyClosed := cs.reqBodyClosed != nil
1756 cc.mu.Unlock()
1757 switch {
1758 case bodyClosed:
1759 return errStopReqBodyWrite
1760 case err == io.EOF:
1761 sawEOF = true
1762 err = nil
1763 default:
1764 return err
1765 }
1766 }
1767
1768 remain := buf[:n]
1769 for len(remain) > 0 && err == nil {
1770 var allowed int32
1771 allowed, err = cs.awaitFlowControl(len(remain))
1772 if err != nil {
1773 return err
1774 }
1775 cc.wmu.Lock()
1776 data := remain[:allowed]
1777 remain = remain[allowed:]
1778 sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
1779 err = cc.fr.WriteData(cs.ID, sentEnd, data)
1780 if err == nil {
1781
1782
1783
1784
1785
1786
1787 err = cc.bw.Flush()
1788 }
1789 cc.wmu.Unlock()
1790 }
1791 if err != nil {
1792 return err
1793 }
1794 }
1795
1796 if sentEnd {
1797
1798
1799
1800 return nil
1801 }
1802
1803
1804
1805
1806 cc.mu.Lock()
1807 trailer := req.Trailer
1808 err = cs.abortErr
1809 cc.mu.Unlock()
1810 if err != nil {
1811 return err
1812 }
1813
1814 cc.wmu.Lock()
1815 defer cc.wmu.Unlock()
1816 var trls []byte
1817 if len(trailer) > 0 {
1818 trls, err = cc.encodeTrailers(trailer)
1819 if err != nil {
1820 return err
1821 }
1822 }
1823
1824
1825
1826 if len(trls) > 0 {
1827 err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
1828 } else {
1829 err = cc.fr.WriteData(cs.ID, true, nil)
1830 }
1831 if ferr := cc.bw.Flush(); ferr != nil && err == nil {
1832 err = ferr
1833 }
1834 return err
1835 }
1836
1837
1838
1839
1840
1841 func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1842 cc := cs.cc
1843 ctx := cs.ctx
1844 cc.mu.Lock()
1845 defer cc.mu.Unlock()
1846 for {
1847 if cc.closed {
1848 return 0, errClientConnClosed
1849 }
1850 if cs.reqBodyClosed != nil {
1851 return 0, errStopReqBodyWrite
1852 }
1853 select {
1854 case <-cs.abort:
1855 return 0, cs.abortErr
1856 case <-ctx.Done():
1857 return 0, ctx.Err()
1858 case <-cs.reqCancel:
1859 return 0, errRequestCanceled
1860 default:
1861 }
1862 if a := cs.flow.available(); a > 0 {
1863 take := a
1864 if int(take) > maxBytes {
1865
1866 take = int32(maxBytes)
1867 }
1868 if take > int32(cc.maxFrameSize) {
1869 take = int32(cc.maxFrameSize)
1870 }
1871 cs.flow.take(take)
1872 return take, nil
1873 }
1874 cc.cond.Wait()
1875 }
1876 }
1877
1878 var errNilRequestURL = errors.New("http2: Request.URI is nil")
1879
1880
1881 func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
1882 cc.hbuf.Reset()
1883 if req.URL == nil {
1884 return nil, errNilRequestURL
1885 }
1886
1887 host := req.Host
1888 if host == "" {
1889 host = req.URL.Host
1890 }
1891 host, err := httpguts.PunycodeHostPort(host)
1892 if err != nil {
1893 return nil, err
1894 }
1895 if !httpguts.ValidHostHeader(host) {
1896 return nil, errors.New("http2: invalid Host header")
1897 }
1898
1899 var path string
1900 if req.Method != "CONNECT" {
1901 path = req.URL.RequestURI()
1902 if !validPseudoPath(path) {
1903 orig := path
1904 path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host)
1905 if !validPseudoPath(path) {
1906 if req.URL.Opaque != "" {
1907 return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque)
1908 } else {
1909 return nil, fmt.Errorf("invalid request :path %q", orig)
1910 }
1911 }
1912 }
1913 }
1914
1915
1916
1917
1918 for k, vv := range req.Header {
1919 if !httpguts.ValidHeaderFieldName(k) {
1920 return nil, fmt.Errorf("invalid HTTP header name %q", k)
1921 }
1922 for _, v := range vv {
1923 if !httpguts.ValidHeaderFieldValue(v) {
1924
1925 return nil, fmt.Errorf("invalid HTTP header value for header %q", k)
1926 }
1927 }
1928 }
1929
1930 enumerateHeaders := func(f func(name, value string)) {
1931
1932
1933
1934
1935
1936 f(":authority", host)
1937 m := req.Method
1938 if m == "" {
1939 m = http.MethodGet
1940 }
1941 f(":method", m)
1942 if req.Method != "CONNECT" {
1943 f(":path", path)
1944 f(":scheme", req.URL.Scheme)
1945 }
1946 if trailers != "" {
1947 f("trailer", trailers)
1948 }
1949
1950 var didUA bool
1951 for k, vv := range req.Header {
1952 if asciiEqualFold(k, "host") || asciiEqualFold(k, "content-length") {
1953
1954
1955 continue
1956 } else if asciiEqualFold(k, "connection") ||
1957 asciiEqualFold(k, "proxy-connection") ||
1958 asciiEqualFold(k, "transfer-encoding") ||
1959 asciiEqualFold(k, "upgrade") ||
1960 asciiEqualFold(k, "keep-alive") {
1961
1962
1963
1964
1965 continue
1966 } else if asciiEqualFold(k, "user-agent") {
1967
1968
1969
1970
1971 didUA = true
1972 if len(vv) < 1 {
1973 continue
1974 }
1975 vv = vv[:1]
1976 if vv[0] == "" {
1977 continue
1978 }
1979 } else if asciiEqualFold(k, "cookie") {
1980
1981
1982
1983 for _, v := range vv {
1984 for {
1985 p := strings.IndexByte(v, ';')
1986 if p < 0 {
1987 break
1988 }
1989 f("cookie", v[:p])
1990 p++
1991
1992 for p+1 <= len(v) && v[p] == ' ' {
1993 p++
1994 }
1995 v = v[p:]
1996 }
1997 if len(v) > 0 {
1998 f("cookie", v)
1999 }
2000 }
2001 continue
2002 }
2003
2004 for _, v := range vv {
2005 f(k, v)
2006 }
2007 }
2008 if shouldSendReqContentLength(req.Method, contentLength) {
2009 f("content-length", strconv.FormatInt(contentLength, 10))
2010 }
2011 if addGzipHeader {
2012 f("accept-encoding", "gzip")
2013 }
2014 if !didUA {
2015 f("user-agent", defaultUserAgent)
2016 }
2017 }
2018
2019
2020
2021
2022
2023 hlSize := uint64(0)
2024 enumerateHeaders(func(name, value string) {
2025 hf := hpack.HeaderField{Name: name, Value: value}
2026 hlSize += uint64(hf.Size())
2027 })
2028
2029 if hlSize > cc.peerMaxHeaderListSize {
2030 return nil, errRequestHeaderListSize
2031 }
2032
2033 trace := httptrace.ContextClientTrace(req.Context())
2034 traceHeaders := traceHasWroteHeaderField(trace)
2035
2036
2037 enumerateHeaders(func(name, value string) {
2038 name, ascii := lowerHeader(name)
2039 if !ascii {
2040
2041
2042 return
2043 }
2044 cc.writeHeader(name, value)
2045 if traceHeaders {
2046 traceWroteHeaderField(trace, name, value)
2047 }
2048 })
2049
2050 return cc.hbuf.Bytes(), nil
2051 }
2052
2053
2054
2055
2056
2057
2058 func shouldSendReqContentLength(method string, contentLength int64) bool {
2059 if contentLength > 0 {
2060 return true
2061 }
2062 if contentLength < 0 {
2063 return false
2064 }
2065
2066
2067 switch method {
2068 case "POST", "PUT", "PATCH":
2069 return true
2070 default:
2071 return false
2072 }
2073 }
2074
2075
2076 func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
2077 cc.hbuf.Reset()
2078
2079 hlSize := uint64(0)
2080 for k, vv := range trailer {
2081 for _, v := range vv {
2082 hf := hpack.HeaderField{Name: k, Value: v}
2083 hlSize += uint64(hf.Size())
2084 }
2085 }
2086 if hlSize > cc.peerMaxHeaderListSize {
2087 return nil, errRequestHeaderListSize
2088 }
2089
2090 for k, vv := range trailer {
2091 lowKey, ascii := lowerHeader(k)
2092 if !ascii {
2093
2094
2095 continue
2096 }
2097
2098
2099 for _, v := range vv {
2100 cc.writeHeader(lowKey, v)
2101 }
2102 }
2103 return cc.hbuf.Bytes(), nil
2104 }
2105
2106 func (cc *ClientConn) writeHeader(name, value string) {
2107 if VerboseLogs {
2108 log.Printf("http2: Transport encoding header %q = %q", name, value)
2109 }
2110 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
2111 }
2112
2113 type resAndError struct {
2114 _ incomparable
2115 res *http.Response
2116 err error
2117 }
2118
2119
2120 func (cc *ClientConn) addStreamLocked(cs *clientStream) {
2121 cs.flow.add(int32(cc.initialWindowSize))
2122 cs.flow.setConnFlow(&cc.flow)
2123 cs.inflow.init(transportDefaultStreamFlow)
2124 cs.ID = cc.nextStreamID
2125 cc.nextStreamID += 2
2126 cc.streams[cs.ID] = cs
2127 if cs.ID == 0 {
2128 panic("assigned stream ID 0")
2129 }
2130 }
2131
2132 func (cc *ClientConn) forgetStreamID(id uint32) {
2133 cc.mu.Lock()
2134 slen := len(cc.streams)
2135 delete(cc.streams, id)
2136 if len(cc.streams) != slen-1 {
2137 panic("forgetting unknown stream id")
2138 }
2139 cc.lastActive = time.Now()
2140 if len(cc.streams) == 0 && cc.idleTimer != nil {
2141 cc.idleTimer.Reset(cc.idleTimeout)
2142 cc.lastIdle = time.Now()
2143 }
2144
2145
2146 cc.cond.Broadcast()
2147
2148 closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
2149 if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
2150 if VerboseLogs {
2151 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
2152 }
2153 cc.closed = true
2154 defer cc.closeConn()
2155 }
2156
2157 cc.mu.Unlock()
2158 }
2159
2160
2161 type clientConnReadLoop struct {
2162 _ incomparable
2163 cc *ClientConn
2164 }
2165
2166
2167 func (cc *ClientConn) readLoop() {
2168 rl := &clientConnReadLoop{cc: cc}
2169 defer rl.cleanup()
2170 cc.readerErr = rl.run()
2171 if ce, ok := cc.readerErr.(ConnectionError); ok {
2172 cc.wmu.Lock()
2173 cc.fr.WriteGoAway(0, ErrCode(ce), nil)
2174 cc.wmu.Unlock()
2175 }
2176 }
2177
2178
2179
2180 type GoAwayError struct {
2181 LastStreamID uint32
2182 ErrCode ErrCode
2183 DebugData string
2184 }
2185
2186 func (e GoAwayError) Error() string {
2187 return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
2188 e.LastStreamID, e.ErrCode, e.DebugData)
2189 }
2190
2191 func isEOFOrNetReadError(err error) bool {
2192 if err == io.EOF {
2193 return true
2194 }
2195 ne, ok := err.(*net.OpError)
2196 return ok && ne.Op == "read"
2197 }
2198
2199 func (rl *clientConnReadLoop) cleanup() {
2200 cc := rl.cc
2201 cc.t.connPool().MarkDead(cc)
2202 defer cc.closeConn()
2203 defer close(cc.readerDone)
2204
2205 if cc.idleTimer != nil {
2206 cc.idleTimer.Stop()
2207 }
2208
2209
2210
2211
2212 err := cc.readerErr
2213 cc.mu.Lock()
2214 if cc.goAway != nil && isEOFOrNetReadError(err) {
2215 err = GoAwayError{
2216 LastStreamID: cc.goAway.LastStreamID,
2217 ErrCode: cc.goAway.ErrCode,
2218 DebugData: cc.goAwayDebug,
2219 }
2220 } else if err == io.EOF {
2221 err = io.ErrUnexpectedEOF
2222 }
2223 cc.closed = true
2224
2225 for _, cs := range cc.streams {
2226 select {
2227 case <-cs.peerClosed:
2228
2229
2230 default:
2231 cs.abortStreamLocked(err)
2232 }
2233 }
2234 cc.cond.Broadcast()
2235 cc.mu.Unlock()
2236 }
2237
2238
2239
2240 func (cc *ClientConn) countReadFrameError(err error) {
2241 f := cc.t.CountError
2242 if f == nil || err == nil {
2243 return
2244 }
2245 if ce, ok := err.(ConnectionError); ok {
2246 errCode := ErrCode(ce)
2247 f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
2248 return
2249 }
2250 if errors.Is(err, io.EOF) {
2251 f("read_frame_eof")
2252 return
2253 }
2254 if errors.Is(err, io.ErrUnexpectedEOF) {
2255 f("read_frame_unexpected_eof")
2256 return
2257 }
2258 if errors.Is(err, ErrFrameTooLarge) {
2259 f("read_frame_too_large")
2260 return
2261 }
2262 f("read_frame_other")
2263 }
2264
2265 func (rl *clientConnReadLoop) run() error {
2266 cc := rl.cc
2267 gotSettings := false
2268 readIdleTimeout := cc.t.ReadIdleTimeout
2269 var t *time.Timer
2270 if readIdleTimeout != 0 {
2271 t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
2272 defer t.Stop()
2273 }
2274 for {
2275 f, err := cc.fr.ReadFrame()
2276 if t != nil {
2277 t.Reset(readIdleTimeout)
2278 }
2279 if err != nil {
2280 cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
2281 }
2282 if se, ok := err.(StreamError); ok {
2283 if cs := rl.streamByID(se.StreamID); cs != nil {
2284 if se.Cause == nil {
2285 se.Cause = cc.fr.errDetail
2286 }
2287 rl.endStreamError(cs, se)
2288 }
2289 continue
2290 } else if err != nil {
2291 cc.countReadFrameError(err)
2292 return err
2293 }
2294 if VerboseLogs {
2295 cc.vlogf("http2: Transport received %s", summarizeFrame(f))
2296 }
2297 if !gotSettings {
2298 if _, ok := f.(*SettingsFrame); !ok {
2299 cc.logf("protocol error: received %T before a SETTINGS frame", f)
2300 return ConnectionError(ErrCodeProtocol)
2301 }
2302 gotSettings = true
2303 }
2304
2305 switch f := f.(type) {
2306 case *MetaHeadersFrame:
2307 err = rl.processHeaders(f)
2308 case *DataFrame:
2309 err = rl.processData(f)
2310 case *GoAwayFrame:
2311 err = rl.processGoAway(f)
2312 case *RSTStreamFrame:
2313 err = rl.processResetStream(f)
2314 case *SettingsFrame:
2315 err = rl.processSettings(f)
2316 case *PushPromiseFrame:
2317 err = rl.processPushPromise(f)
2318 case *WindowUpdateFrame:
2319 err = rl.processWindowUpdate(f)
2320 case *PingFrame:
2321 err = rl.processPing(f)
2322 default:
2323 cc.logf("Transport: unhandled response frame type %T", f)
2324 }
2325 if err != nil {
2326 if VerboseLogs {
2327 cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
2328 }
2329 return err
2330 }
2331 }
2332 }
2333
2334 func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
2335 cs := rl.streamByID(f.StreamID)
2336 if cs == nil {
2337
2338
2339
2340 return nil
2341 }
2342 if cs.readClosed {
2343 rl.endStreamError(cs, StreamError{
2344 StreamID: f.StreamID,
2345 Code: ErrCodeProtocol,
2346 Cause: errors.New("protocol error: headers after END_STREAM"),
2347 })
2348 return nil
2349 }
2350 if !cs.firstByte {
2351 if cs.trace != nil {
2352
2353
2354
2355
2356 traceFirstResponseByte(cs.trace)
2357 }
2358 cs.firstByte = true
2359 }
2360 if !cs.pastHeaders {
2361 cs.pastHeaders = true
2362 } else {
2363 return rl.processTrailers(cs, f)
2364 }
2365
2366 res, err := rl.handleResponse(cs, f)
2367 if err != nil {
2368 if _, ok := err.(ConnectionError); ok {
2369 return err
2370 }
2371
2372 rl.endStreamError(cs, StreamError{
2373 StreamID: f.StreamID,
2374 Code: ErrCodeProtocol,
2375 Cause: err,
2376 })
2377 return nil
2378 }
2379 if res == nil {
2380
2381 return nil
2382 }
2383 cs.resTrailer = &res.Trailer
2384 cs.res = res
2385 close(cs.respHeaderRecv)
2386 if f.StreamEnded() {
2387 rl.endStream(cs)
2388 }
2389 return nil
2390 }
2391
2392
2393
2394
2395
2396
2397
2398 func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
2399 if f.Truncated {
2400 return nil, errResponseHeaderListSize
2401 }
2402
2403 status := f.PseudoValue("status")
2404 if status == "" {
2405 return nil, errors.New("malformed response from server: missing status pseudo header")
2406 }
2407 statusCode, err := strconv.Atoi(status)
2408 if err != nil {
2409 return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
2410 }
2411
2412 regularFields := f.RegularFields()
2413 strs := make([]string, len(regularFields))
2414 header := make(http.Header, len(regularFields))
2415 res := &http.Response{
2416 Proto: "HTTP/2.0",
2417 ProtoMajor: 2,
2418 Header: header,
2419 StatusCode: statusCode,
2420 Status: status + " " + http.StatusText(statusCode),
2421 }
2422 for _, hf := range regularFields {
2423 key := canonicalHeader(hf.Name)
2424 if key == "Trailer" {
2425 t := res.Trailer
2426 if t == nil {
2427 t = make(http.Header)
2428 res.Trailer = t
2429 }
2430 foreachHeaderElement(hf.Value, func(v string) {
2431 t[canonicalHeader(v)] = nil
2432 })
2433 } else {
2434 vv := header[key]
2435 if vv == nil && len(strs) > 0 {
2436
2437
2438
2439
2440 vv, strs = strs[:1:1], strs[1:]
2441 vv[0] = hf.Value
2442 header[key] = vv
2443 } else {
2444 header[key] = append(vv, hf.Value)
2445 }
2446 }
2447 }
2448
2449 if statusCode >= 100 && statusCode <= 199 {
2450 if f.StreamEnded() {
2451 return nil, errors.New("1xx informational response with END_STREAM flag")
2452 }
2453 cs.num1xx++
2454 const max1xxResponses = 5
2455 if cs.num1xx > max1xxResponses {
2456 return nil, errors.New("http2: too many 1xx informational responses")
2457 }
2458 if fn := cs.get1xxTraceFunc(); fn != nil {
2459 if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
2460 return nil, err
2461 }
2462 }
2463 if statusCode == 100 {
2464 traceGot100Continue(cs.trace)
2465 select {
2466 case cs.on100 <- struct{}{}:
2467 default:
2468 }
2469 }
2470 cs.pastHeaders = false
2471 return nil, nil
2472 }
2473
2474 res.ContentLength = -1
2475 if clens := res.Header["Content-Length"]; len(clens) == 1 {
2476 if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
2477 res.ContentLength = int64(cl)
2478 } else {
2479
2480
2481 }
2482 } else if len(clens) > 1 {
2483
2484
2485 } else if f.StreamEnded() && !cs.isHead {
2486 res.ContentLength = 0
2487 }
2488
2489 if cs.isHead {
2490 res.Body = noBody
2491 return res, nil
2492 }
2493
2494 if f.StreamEnded() {
2495 if res.ContentLength > 0 {
2496 res.Body = missingBody{}
2497 } else {
2498 res.Body = noBody
2499 }
2500 return res, nil
2501 }
2502
2503 cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
2504 cs.bytesRemain = res.ContentLength
2505 res.Body = transportResponseBody{cs}
2506
2507 if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
2508 res.Header.Del("Content-Encoding")
2509 res.Header.Del("Content-Length")
2510 res.ContentLength = -1
2511 res.Body = &gzipReader{body: res.Body}
2512 res.Uncompressed = true
2513 }
2514 return res, nil
2515 }
2516
2517 func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2518 if cs.pastTrailers {
2519
2520 return ConnectionError(ErrCodeProtocol)
2521 }
2522 cs.pastTrailers = true
2523 if !f.StreamEnded() {
2524
2525
2526 return ConnectionError(ErrCodeProtocol)
2527 }
2528 if len(f.PseudoFields()) > 0 {
2529
2530
2531 return ConnectionError(ErrCodeProtocol)
2532 }
2533
2534 trailer := make(http.Header)
2535 for _, hf := range f.RegularFields() {
2536 key := canonicalHeader(hf.Name)
2537 trailer[key] = append(trailer[key], hf.Value)
2538 }
2539 cs.trailer = trailer
2540
2541 rl.endStream(cs)
2542 return nil
2543 }
2544
2545
2546
2547 type transportResponseBody struct {
2548 cs *clientStream
2549 }
2550
2551 func (b transportResponseBody) Read(p []byte) (n int, err error) {
2552 cs := b.cs
2553 cc := cs.cc
2554
2555 if cs.readErr != nil {
2556 return 0, cs.readErr
2557 }
2558 n, err = b.cs.bufPipe.Read(p)
2559 if cs.bytesRemain != -1 {
2560 if int64(n) > cs.bytesRemain {
2561 n = int(cs.bytesRemain)
2562 if err == nil {
2563 err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
2564 cs.abortStream(err)
2565 }
2566 cs.readErr = err
2567 return int(cs.bytesRemain), err
2568 }
2569 cs.bytesRemain -= int64(n)
2570 if err == io.EOF && cs.bytesRemain > 0 {
2571 err = io.ErrUnexpectedEOF
2572 cs.readErr = err
2573 return n, err
2574 }
2575 }
2576 if n == 0 {
2577
2578 return
2579 }
2580
2581 cc.mu.Lock()
2582 connAdd := cc.inflow.add(n)
2583 var streamAdd int32
2584 if err == nil {
2585 streamAdd = cs.inflow.add(n)
2586 }
2587 cc.mu.Unlock()
2588
2589 if connAdd != 0 || streamAdd != 0 {
2590 cc.wmu.Lock()
2591 defer cc.wmu.Unlock()
2592 if connAdd != 0 {
2593 cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
2594 }
2595 if streamAdd != 0 {
2596 cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
2597 }
2598 cc.bw.Flush()
2599 }
2600 return
2601 }
2602
2603 var errClosedResponseBody = errors.New("http2: response body closed")
2604
2605 func (b transportResponseBody) Close() error {
2606 cs := b.cs
2607 cc := cs.cc
2608
2609 cs.bufPipe.BreakWithError(errClosedResponseBody)
2610 cs.abortStream(errClosedResponseBody)
2611
2612 unread := cs.bufPipe.Len()
2613 if unread > 0 {
2614 cc.mu.Lock()
2615
2616 connAdd := cc.inflow.add(unread)
2617 cc.mu.Unlock()
2618
2619
2620
2621 cc.wmu.Lock()
2622
2623 if connAdd > 0 {
2624 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2625 }
2626 cc.bw.Flush()
2627 cc.wmu.Unlock()
2628 }
2629
2630 select {
2631 case <-cs.donec:
2632 case <-cs.ctx.Done():
2633
2634
2635
2636 return nil
2637 case <-cs.reqCancel:
2638 return errRequestCanceled
2639 }
2640 return nil
2641 }
2642
2643 func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2644 cc := rl.cc
2645 cs := rl.streamByID(f.StreamID)
2646 data := f.Data()
2647 if cs == nil {
2648 cc.mu.Lock()
2649 neverSent := cc.nextStreamID
2650 cc.mu.Unlock()
2651 if f.StreamID >= neverSent {
2652
2653 cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
2654 return ConnectionError(ErrCodeProtocol)
2655 }
2656
2657
2658
2659
2660
2661
2662 if f.Length > 0 {
2663 cc.mu.Lock()
2664 ok := cc.inflow.take(f.Length)
2665 connAdd := cc.inflow.add(int(f.Length))
2666 cc.mu.Unlock()
2667 if !ok {
2668 return ConnectionError(ErrCodeFlowControl)
2669 }
2670 if connAdd > 0 {
2671 cc.wmu.Lock()
2672 cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2673 cc.bw.Flush()
2674 cc.wmu.Unlock()
2675 }
2676 }
2677 return nil
2678 }
2679 if cs.readClosed {
2680 cc.logf("protocol error: received DATA after END_STREAM")
2681 rl.endStreamError(cs, StreamError{
2682 StreamID: f.StreamID,
2683 Code: ErrCodeProtocol,
2684 })
2685 return nil
2686 }
2687 if !cs.firstByte {
2688 cc.logf("protocol error: received DATA before a HEADERS frame")
2689 rl.endStreamError(cs, StreamError{
2690 StreamID: f.StreamID,
2691 Code: ErrCodeProtocol,
2692 })
2693 return nil
2694 }
2695 if f.Length > 0 {
2696 if cs.isHead && len(data) > 0 {
2697 cc.logf("protocol error: received DATA on a HEAD request")
2698 rl.endStreamError(cs, StreamError{
2699 StreamID: f.StreamID,
2700 Code: ErrCodeProtocol,
2701 })
2702 return nil
2703 }
2704
2705 cc.mu.Lock()
2706 if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
2707 cc.mu.Unlock()
2708 return ConnectionError(ErrCodeFlowControl)
2709 }
2710
2711
2712 var refund int
2713 if pad := int(f.Length) - len(data); pad > 0 {
2714 refund += pad
2715 }
2716
2717 didReset := false
2718 var err error
2719 if len(data) > 0 {
2720 if _, err = cs.bufPipe.Write(data); err != nil {
2721
2722
2723 didReset = true
2724 refund += len(data)
2725 }
2726 }
2727
2728 sendConn := cc.inflow.add(refund)
2729 var sendStream int32
2730 if !didReset {
2731 sendStream = cs.inflow.add(refund)
2732 }
2733 cc.mu.Unlock()
2734
2735 if sendConn > 0 || sendStream > 0 {
2736 cc.wmu.Lock()
2737 if sendConn > 0 {
2738 cc.fr.WriteWindowUpdate(0, uint32(sendConn))
2739 }
2740 if sendStream > 0 {
2741 cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
2742 }
2743 cc.bw.Flush()
2744 cc.wmu.Unlock()
2745 }
2746
2747 if err != nil {
2748 rl.endStreamError(cs, err)
2749 return nil
2750 }
2751 }
2752
2753 if f.StreamEnded() {
2754 rl.endStream(cs)
2755 }
2756 return nil
2757 }
2758
2759 func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2760
2761
2762 if !cs.readClosed {
2763 cs.readClosed = true
2764
2765
2766
2767
2768 rl.cc.mu.Lock()
2769 defer rl.cc.mu.Unlock()
2770 cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
2771 close(cs.peerClosed)
2772 }
2773 }
2774
2775 func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
2776 cs.readAborted = true
2777 cs.abortStream(err)
2778 }
2779
2780 func (rl *clientConnReadLoop) streamByID(id uint32) *clientStream {
2781 rl.cc.mu.Lock()
2782 defer rl.cc.mu.Unlock()
2783 cs := rl.cc.streams[id]
2784 if cs != nil && !cs.readAborted {
2785 return cs
2786 }
2787 return nil
2788 }
2789
2790 func (cs *clientStream) copyTrailers() {
2791 for k, vv := range cs.trailer {
2792 t := cs.resTrailer
2793 if *t == nil {
2794 *t = make(http.Header)
2795 }
2796 (*t)[k] = vv
2797 }
2798 }
2799
2800 func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2801 cc := rl.cc
2802 cc.t.connPool().MarkDead(cc)
2803 if f.ErrCode != 0 {
2804
2805 cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
2806 if fn := cc.t.CountError; fn != nil {
2807 fn("recv_goaway_" + f.ErrCode.stringToken())
2808 }
2809 }
2810 cc.setGoAway(f)
2811 return nil
2812 }
2813
2814 func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2815 cc := rl.cc
2816
2817
2818 cc.wmu.Lock()
2819 defer cc.wmu.Unlock()
2820
2821 if err := rl.processSettingsNoWrite(f); err != nil {
2822 return err
2823 }
2824 if !f.IsAck() {
2825 cc.fr.WriteSettingsAck()
2826 cc.bw.Flush()
2827 }
2828 return nil
2829 }
2830
2831 func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
2832 cc := rl.cc
2833 cc.mu.Lock()
2834 defer cc.mu.Unlock()
2835
2836 if f.IsAck() {
2837 if cc.wantSettingsAck {
2838 cc.wantSettingsAck = false
2839 return nil
2840 }
2841 return ConnectionError(ErrCodeProtocol)
2842 }
2843
2844 var seenMaxConcurrentStreams bool
2845 err := f.ForeachSetting(func(s Setting) error {
2846 switch s.ID {
2847 case SettingMaxFrameSize:
2848 cc.maxFrameSize = s.Val
2849 case SettingMaxConcurrentStreams:
2850 cc.maxConcurrentStreams = s.Val
2851 seenMaxConcurrentStreams = true
2852 case SettingMaxHeaderListSize:
2853 cc.peerMaxHeaderListSize = uint64(s.Val)
2854 case SettingInitialWindowSize:
2855
2856
2857
2858
2859 if s.Val > math.MaxInt32 {
2860 return ConnectionError(ErrCodeFlowControl)
2861 }
2862
2863
2864
2865
2866 delta := int32(s.Val) - int32(cc.initialWindowSize)
2867 for _, cs := range cc.streams {
2868 cs.flow.add(delta)
2869 }
2870 cc.cond.Broadcast()
2871
2872 cc.initialWindowSize = s.Val
2873 case SettingHeaderTableSize:
2874 cc.henc.SetMaxDynamicTableSize(s.Val)
2875 cc.peerMaxHeaderTableSize = s.Val
2876 default:
2877 cc.vlogf("Unhandled Setting: %v", s)
2878 }
2879 return nil
2880 })
2881 if err != nil {
2882 return err
2883 }
2884
2885 if !cc.seenSettings {
2886 if !seenMaxConcurrentStreams {
2887
2888
2889
2890
2891 cc.maxConcurrentStreams = defaultMaxConcurrentStreams
2892 }
2893 cc.seenSettings = true
2894 }
2895
2896 return nil
2897 }
2898
2899 func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2900 cc := rl.cc
2901 cs := rl.streamByID(f.StreamID)
2902 if f.StreamID != 0 && cs == nil {
2903 return nil
2904 }
2905
2906 cc.mu.Lock()
2907 defer cc.mu.Unlock()
2908
2909 fl := &cc.flow
2910 if cs != nil {
2911 fl = &cs.flow
2912 }
2913 if !fl.add(int32(f.Increment)) {
2914 return ConnectionError(ErrCodeFlowControl)
2915 }
2916 cc.cond.Broadcast()
2917 return nil
2918 }
2919
2920 func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
2921 cs := rl.streamByID(f.StreamID)
2922 if cs == nil {
2923
2924 return nil
2925 }
2926 serr := streamError(cs.ID, f.ErrCode)
2927 serr.Cause = errFromPeer
2928 if f.ErrCode == ErrCodeProtocol {
2929 rl.cc.SetDoNotReuse()
2930 }
2931 if fn := cs.cc.t.CountError; fn != nil {
2932 fn("recv_rststream_" + f.ErrCode.stringToken())
2933 }
2934 cs.abortStream(serr)
2935
2936 cs.bufPipe.CloseWithError(serr)
2937 return nil
2938 }
2939
2940
2941 func (cc *ClientConn) Ping(ctx context.Context) error {
2942 c := make(chan struct{})
2943
2944 var p [8]byte
2945 for {
2946 if _, err := rand.Read(p[:]); err != nil {
2947 return err
2948 }
2949 cc.mu.Lock()
2950
2951 if _, found := cc.pings[p]; !found {
2952 cc.pings[p] = c
2953 cc.mu.Unlock()
2954 break
2955 }
2956 cc.mu.Unlock()
2957 }
2958 errc := make(chan error, 1)
2959 go func() {
2960 cc.wmu.Lock()
2961 defer cc.wmu.Unlock()
2962 if err := cc.fr.WritePing(false, p); err != nil {
2963 errc <- err
2964 return
2965 }
2966 if err := cc.bw.Flush(); err != nil {
2967 errc <- err
2968 return
2969 }
2970 }()
2971 select {
2972 case <-c:
2973 return nil
2974 case err := <-errc:
2975 return err
2976 case <-ctx.Done():
2977 return ctx.Err()
2978 case <-cc.readerDone:
2979
2980 return cc.readerErr
2981 }
2982 }
2983
2984 func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
2985 if f.IsAck() {
2986 cc := rl.cc
2987 cc.mu.Lock()
2988 defer cc.mu.Unlock()
2989
2990 if c, ok := cc.pings[f.Data]; ok {
2991 close(c)
2992 delete(cc.pings, f.Data)
2993 }
2994 return nil
2995 }
2996 cc := rl.cc
2997 cc.wmu.Lock()
2998 defer cc.wmu.Unlock()
2999 if err := cc.fr.WritePing(true, f.Data); err != nil {
3000 return err
3001 }
3002 return cc.bw.Flush()
3003 }
3004
3005 func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
3006
3007
3008
3009
3010
3011
3012
3013 return ConnectionError(ErrCodeProtocol)
3014 }
3015
3016 func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
3017
3018
3019
3020
3021 cc.wmu.Lock()
3022 cc.fr.WriteRSTStream(streamID, code)
3023 cc.bw.Flush()
3024 cc.wmu.Unlock()
3025 }
3026
3027 var (
3028 errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
3029 errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
3030 )
3031
3032 func (cc *ClientConn) logf(format string, args ...interface{}) {
3033 cc.t.logf(format, args...)
3034 }
3035
3036 func (cc *ClientConn) vlogf(format string, args ...interface{}) {
3037 cc.t.vlogf(format, args...)
3038 }
3039
3040 func (t *Transport) vlogf(format string, args ...interface{}) {
3041 if VerboseLogs {
3042 t.logf(format, args...)
3043 }
3044 }
3045
3046 func (t *Transport) logf(format string, args ...interface{}) {
3047 log.Printf(format, args...)
3048 }
3049
3050 var noBody io.ReadCloser = noBodyReader{}
3051
3052 type noBodyReader struct{}
3053
3054 func (noBodyReader) Close() error { return nil }
3055 func (noBodyReader) Read([]byte) (int, error) { return 0, io.EOF }
3056
3057 type missingBody struct{}
3058
3059 func (missingBody) Close() error { return nil }
3060 func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
3061
3062 func strSliceContains(ss []string, s string) bool {
3063 for _, v := range ss {
3064 if v == s {
3065 return true
3066 }
3067 }
3068 return false
3069 }
3070
3071 type erringRoundTripper struct{ err error }
3072
3073 func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
3074 func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
3075
3076
3077
3078 type gzipReader struct {
3079 _ incomparable
3080 body io.ReadCloser
3081 zr *gzip.Reader
3082 zerr error
3083 }
3084
3085 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3086 if gz.zerr != nil {
3087 return 0, gz.zerr
3088 }
3089 if gz.zr == nil {
3090 gz.zr, err = gzip.NewReader(gz.body)
3091 if err != nil {
3092 gz.zerr = err
3093 return 0, err
3094 }
3095 }
3096 return gz.zr.Read(p)
3097 }
3098
3099 func (gz *gzipReader) Close() error {
3100 if err := gz.body.Close(); err != nil {
3101 return err
3102 }
3103 gz.zerr = fs.ErrClosed
3104 return nil
3105 }
3106
3107 type errorReader struct{ err error }
3108
3109 func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
3110
3111
3112
3113 func isConnectionCloseRequest(req *http.Request) bool {
3114 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
3115 }
3116
3117
3118
3119 func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
3120 defer func() {
3121 if e := recover(); e != nil {
3122 err = fmt.Errorf("%v", e)
3123 }
3124 }()
3125 t.RegisterProtocol("https", rt)
3126 return nil
3127 }
3128
3129
3130
3131
3132
3133 type noDialH2RoundTripper struct{ *Transport }
3134
3135 func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
3136 res, err := rt.Transport.RoundTrip(req)
3137 if isNoCachedConnError(err) {
3138 return nil, http.ErrSkipAltProtocol
3139 }
3140 return res, err
3141 }
3142
3143 func (t *Transport) idleConnTimeout() time.Duration {
3144 if t.t1 != nil {
3145 return t.t1.IdleConnTimeout
3146 }
3147 return 0
3148 }
3149
3150 func traceGetConn(req *http.Request, hostPort string) {
3151 trace := httptrace.ContextClientTrace(req.Context())
3152 if trace == nil || trace.GetConn == nil {
3153 return
3154 }
3155 trace.GetConn(hostPort)
3156 }
3157
3158 func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
3159 trace := httptrace.ContextClientTrace(req.Context())
3160 if trace == nil || trace.GotConn == nil {
3161 return
3162 }
3163 ci := httptrace.GotConnInfo{Conn: cc.tconn}
3164 ci.Reused = reused
3165 cc.mu.Lock()
3166 ci.WasIdle = len(cc.streams) == 0 && reused
3167 if ci.WasIdle && !cc.lastActive.IsZero() {
3168 ci.IdleTime = time.Since(cc.lastActive)
3169 }
3170 cc.mu.Unlock()
3171
3172 trace.GotConn(ci)
3173 }
3174
3175 func traceWroteHeaders(trace *httptrace.ClientTrace) {
3176 if trace != nil && trace.WroteHeaders != nil {
3177 trace.WroteHeaders()
3178 }
3179 }
3180
3181 func traceGot100Continue(trace *httptrace.ClientTrace) {
3182 if trace != nil && trace.Got100Continue != nil {
3183 trace.Got100Continue()
3184 }
3185 }
3186
3187 func traceWait100Continue(trace *httptrace.ClientTrace) {
3188 if trace != nil && trace.Wait100Continue != nil {
3189 trace.Wait100Continue()
3190 }
3191 }
3192
3193 func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
3194 if trace != nil && trace.WroteRequest != nil {
3195 trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
3196 }
3197 }
3198
3199 func traceFirstResponseByte(trace *httptrace.ClientTrace) {
3200 if trace != nil && trace.GotFirstResponseByte != nil {
3201 trace.GotFirstResponseByte()
3202 }
3203 }
3204
3205 func traceHasWroteHeaderField(trace *httptrace.ClientTrace) bool {
3206 return trace != nil && trace.WroteHeaderField != nil
3207 }
3208
3209 func traceWroteHeaderField(trace *httptrace.ClientTrace, k, v string) {
3210 if trace != nil && trace.WroteHeaderField != nil {
3211 trace.WroteHeaderField(k, []string{v})
3212 }
3213 }
3214
3215 func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
3216 if trace != nil {
3217 return trace.Got1xxResponse
3218 }
3219 return nil
3220 }
3221
3222
3223
3224 func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
3225 dialer := &tls.Dialer{
3226 Config: cfg,
3227 }
3228 cn, err := dialer.DialContext(ctx, network, addr)
3229 if err != nil {
3230 return nil, err
3231 }
3232 tlsCn := cn.(*tls.Conn)
3233 return tlsCn, nil
3234 }
3235
View as plain text