1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package http2
27
28 import (
29 "bufio"
30 "bytes"
31 "context"
32 "crypto/tls"
33 "errors"
34 "fmt"
35 "io"
36 "log"
37 "math"
38 "net"
39 "net/http"
40 "net/textproto"
41 "net/url"
42 "os"
43 "reflect"
44 "runtime"
45 "strconv"
46 "strings"
47 "sync"
48 "time"
49
50 "golang.org/x/net/http/httpguts"
51 "golang.org/x/net/http2/hpack"
52 )
53
54 const (
55 prefaceTimeout = 10 * time.Second
56 firstSettingsTimeout = 2 * time.Second
57 handlerChunkWriteSize = 4 << 10
58 defaultMaxStreams = 250
59 maxQueuedControlFrames = 10000
60 )
61
62 var (
63 errClientDisconnected = errors.New("client disconnected")
64 errClosedBody = errors.New("body closed by handler")
65 errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
66 errStreamClosed = errors.New("http2: stream closed")
67 )
68
69 var responseWriterStatePool = sync.Pool{
70 New: func() interface{} {
71 rws := &responseWriterState{}
72 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
73 return rws
74 },
75 }
76
77
78 var (
79 testHookOnConn func()
80 testHookGetServerConn func(*serverConn)
81 testHookOnPanicMu *sync.Mutex
82 testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool)
83 )
84
85
86 type Server struct {
87
88
89
90
91 MaxHandlers int
92
93
94
95
96
97
98
99 MaxConcurrentStreams uint32
100
101
102
103
104
105
106 MaxDecoderHeaderTableSize uint32
107
108
109
110
111
112 MaxEncoderHeaderTableSize uint32
113
114
115
116
117
118 MaxReadFrameSize uint32
119
120
121
122 PermitProhibitedCipherSuites bool
123
124
125
126
127 IdleTimeout time.Duration
128
129
130
131
132
133
134 MaxUploadBufferPerConnection int32
135
136
137
138
139
140 MaxUploadBufferPerStream int32
141
142
143
144 NewWriteScheduler func() WriteScheduler
145
146
147
148
149
150 CountError func(errType string)
151
152
153
154
155 state *serverInternalState
156 }
157
158 func (s *Server) initialConnRecvWindowSize() int32 {
159 if s.MaxUploadBufferPerConnection >= initialWindowSize {
160 return s.MaxUploadBufferPerConnection
161 }
162 return 1 << 20
163 }
164
165 func (s *Server) initialStreamRecvWindowSize() int32 {
166 if s.MaxUploadBufferPerStream > 0 {
167 return s.MaxUploadBufferPerStream
168 }
169 return 1 << 20
170 }
171
172 func (s *Server) maxReadFrameSize() uint32 {
173 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
174 return v
175 }
176 return defaultMaxReadFrameSize
177 }
178
179 func (s *Server) maxConcurrentStreams() uint32 {
180 if v := s.MaxConcurrentStreams; v > 0 {
181 return v
182 }
183 return defaultMaxStreams
184 }
185
186 func (s *Server) maxDecoderHeaderTableSize() uint32 {
187 if v := s.MaxDecoderHeaderTableSize; v > 0 {
188 return v
189 }
190 return initialHeaderTableSize
191 }
192
193 func (s *Server) maxEncoderHeaderTableSize() uint32 {
194 if v := s.MaxEncoderHeaderTableSize; v > 0 {
195 return v
196 }
197 return initialHeaderTableSize
198 }
199
200
201
202
203 func (s *Server) maxQueuedControlFrames() int {
204
205
206 return maxQueuedControlFrames
207 }
208
209 type serverInternalState struct {
210 mu sync.Mutex
211 activeConns map[*serverConn]struct{}
212 }
213
214 func (s *serverInternalState) registerConn(sc *serverConn) {
215 if s == nil {
216 return
217 }
218 s.mu.Lock()
219 s.activeConns[sc] = struct{}{}
220 s.mu.Unlock()
221 }
222
223 func (s *serverInternalState) unregisterConn(sc *serverConn) {
224 if s == nil {
225 return
226 }
227 s.mu.Lock()
228 delete(s.activeConns, sc)
229 s.mu.Unlock()
230 }
231
232 func (s *serverInternalState) startGracefulShutdown() {
233 if s == nil {
234 return
235 }
236 s.mu.Lock()
237 for sc := range s.activeConns {
238 sc.startGracefulShutdown()
239 }
240 s.mu.Unlock()
241 }
242
243
244
245
246
247
248 func ConfigureServer(s *http.Server, conf *Server) error {
249 if s == nil {
250 panic("nil *http.Server")
251 }
252 if conf == nil {
253 conf = new(Server)
254 }
255 conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
256 if h1, h2 := s, conf; h2.IdleTimeout == 0 {
257 if h1.IdleTimeout != 0 {
258 h2.IdleTimeout = h1.IdleTimeout
259 } else {
260 h2.IdleTimeout = h1.ReadTimeout
261 }
262 }
263 s.RegisterOnShutdown(conf.state.startGracefulShutdown)
264
265 if s.TLSConfig == nil {
266 s.TLSConfig = new(tls.Config)
267 } else if s.TLSConfig.CipherSuites != nil && s.TLSConfig.MinVersion < tls.VersionTLS13 {
268
269
270
271 haveRequired := false
272 for _, cs := range s.TLSConfig.CipherSuites {
273 switch cs {
274 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
275
276
277 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
278 haveRequired = true
279 }
280 }
281 if !haveRequired {
282 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256)")
283 }
284 }
285
286
287
288
289
290
291
292
293 s.TLSConfig.PreferServerCipherSuites = true
294
295 if !strSliceContains(s.TLSConfig.NextProtos, NextProtoTLS) {
296 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
297 }
298 if !strSliceContains(s.TLSConfig.NextProtos, "http/1.1") {
299 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, "http/1.1")
300 }
301
302 if s.TLSNextProto == nil {
303 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
304 }
305 protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) {
306 if testHookOnConn != nil {
307 testHookOnConn()
308 }
309
310
311
312
313
314 var ctx context.Context
315 type baseContexter interface {
316 BaseContext() context.Context
317 }
318 if bc, ok := h.(baseContexter); ok {
319 ctx = bc.BaseContext()
320 }
321 conf.ServeConn(c, &ServeConnOpts{
322 Context: ctx,
323 Handler: h,
324 BaseConfig: hs,
325 })
326 }
327 s.TLSNextProto[NextProtoTLS] = protoHandler
328 return nil
329 }
330
331
332 type ServeConnOpts struct {
333
334
335 Context context.Context
336
337
338
339 BaseConfig *http.Server
340
341
342
343
344 Handler http.Handler
345
346
347
348
349
350 UpgradeRequest *http.Request
351
352
353
354 Settings []byte
355
356
357
358 SawClientPreface bool
359 }
360
361 func (o *ServeConnOpts) context() context.Context {
362 if o != nil && o.Context != nil {
363 return o.Context
364 }
365 return context.Background()
366 }
367
368 func (o *ServeConnOpts) baseConfig() *http.Server {
369 if o != nil && o.BaseConfig != nil {
370 return o.BaseConfig
371 }
372 return new(http.Server)
373 }
374
375 func (o *ServeConnOpts) handler() http.Handler {
376 if o != nil {
377 if o.Handler != nil {
378 return o.Handler
379 }
380 if o.BaseConfig != nil && o.BaseConfig.Handler != nil {
381 return o.BaseConfig.Handler
382 }
383 }
384 return http.DefaultServeMux
385 }
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401 func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
402 baseCtx, cancel := serverConnBaseContext(c, opts)
403 defer cancel()
404
405 sc := &serverConn{
406 srv: s,
407 hs: opts.baseConfig(),
408 conn: c,
409 baseCtx: baseCtx,
410 remoteAddrStr: c.RemoteAddr().String(),
411 bw: newBufferedWriter(c),
412 handler: opts.handler(),
413 streams: make(map[uint32]*stream),
414 readFrameCh: make(chan readFrameResult),
415 wantWriteFrameCh: make(chan FrameWriteRequest, 8),
416 serveMsgCh: make(chan interface{}, 8),
417 wroteFrameCh: make(chan frameWriteResult, 1),
418 bodyReadCh: make(chan bodyReadMsg),
419 doneServing: make(chan struct{}),
420 clientMaxStreams: math.MaxUint32,
421 advMaxStreams: s.maxConcurrentStreams(),
422 initialStreamSendWindowSize: initialWindowSize,
423 maxFrameSize: initialMaxFrameSize,
424 serveG: newGoroutineLock(),
425 pushEnabled: true,
426 sawClientPreface: opts.SawClientPreface,
427 }
428
429 s.state.registerConn(sc)
430 defer s.state.unregisterConn(sc)
431
432
433
434
435
436
437 if sc.hs.WriteTimeout != 0 {
438 sc.conn.SetWriteDeadline(time.Time{})
439 }
440
441 if s.NewWriteScheduler != nil {
442 sc.writeSched = s.NewWriteScheduler()
443 } else {
444 sc.writeSched = newRoundRobinWriteScheduler()
445 }
446
447
448
449
450 sc.flow.add(initialWindowSize)
451 sc.inflow.init(initialWindowSize)
452 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
453 sc.hpackEncoder.SetMaxDynamicTableSizeLimit(s.maxEncoderHeaderTableSize())
454
455 fr := NewFramer(sc.bw, c)
456 if s.CountError != nil {
457 fr.countError = s.CountError
458 }
459 fr.ReadMetaHeaders = hpack.NewDecoder(s.maxDecoderHeaderTableSize(), nil)
460 fr.MaxHeaderListSize = sc.maxHeaderListSize()
461 fr.SetMaxReadFrameSize(s.maxReadFrameSize())
462 sc.framer = fr
463
464 if tc, ok := c.(connectionStater); ok {
465 sc.tlsState = new(tls.ConnectionState)
466 *sc.tlsState = tc.ConnectionState()
467
468
469
470
471
472
473
474
475
476
477 if sc.tlsState.Version < tls.VersionTLS12 {
478 sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
479 return
480 }
481
482 if sc.tlsState.ServerName == "" {
483
484
485
486
487
488
489
490
491
492 }
493
494 if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
495
496
497
498
499
500
501
502
503
504
505 sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
506 return
507 }
508 }
509
510 if opts.Settings != nil {
511 fr := &SettingsFrame{
512 FrameHeader: FrameHeader{valid: true},
513 p: opts.Settings,
514 }
515 if err := fr.ForeachSetting(sc.processSetting); err != nil {
516 sc.rejectConn(ErrCodeProtocol, "invalid settings")
517 return
518 }
519 opts.Settings = nil
520 }
521
522 if hook := testHookGetServerConn; hook != nil {
523 hook(sc)
524 }
525
526 if opts.UpgradeRequest != nil {
527 sc.upgradeRequest(opts.UpgradeRequest)
528 opts.UpgradeRequest = nil
529 }
530
531 sc.serve()
532 }
533
534 func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
535 ctx, cancel = context.WithCancel(opts.context())
536 ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr())
537 if hs := opts.baseConfig(); hs != nil {
538 ctx = context.WithValue(ctx, http.ServerContextKey, hs)
539 }
540 return
541 }
542
543 func (sc *serverConn) rejectConn(err ErrCode, debug string) {
544 sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
545
546 sc.framer.WriteGoAway(0, err, []byte(debug))
547 sc.bw.Flush()
548 sc.conn.Close()
549 }
550
551 type serverConn struct {
552
553 srv *Server
554 hs *http.Server
555 conn net.Conn
556 bw *bufferedWriter
557 handler http.Handler
558 baseCtx context.Context
559 framer *Framer
560 doneServing chan struct{}
561 readFrameCh chan readFrameResult
562 wantWriteFrameCh chan FrameWriteRequest
563 wroteFrameCh chan frameWriteResult
564 bodyReadCh chan bodyReadMsg
565 serveMsgCh chan interface{}
566 flow outflow
567 inflow inflow
568 tlsState *tls.ConnectionState
569 remoteAddrStr string
570 writeSched WriteScheduler
571
572
573 serveG goroutineLock
574 pushEnabled bool
575 sawClientPreface bool
576 sawFirstSettings bool
577 needToSendSettingsAck bool
578 unackedSettings int
579 queuedControlFrames int
580 clientMaxStreams uint32
581 advMaxStreams uint32
582 curClientStreams uint32
583 curPushedStreams uint32
584 curHandlers uint32
585 maxClientStreamID uint32
586 maxPushPromiseID uint32
587 streams map[uint32]*stream
588 unstartedHandlers []unstartedHandler
589 initialStreamSendWindowSize int32
590 maxFrameSize int32
591 peerMaxHeaderListSize uint32
592 canonHeader map[string]string
593 canonHeaderKeysSize int
594 writingFrame bool
595 writingFrameAsync bool
596 needsFrameFlush bool
597 inGoAway bool
598 inFrameScheduleLoop bool
599 needToSendGoAway bool
600 goAwayCode ErrCode
601 shutdownTimer *time.Timer
602 idleTimer *time.Timer
603
604
605 headerWriteBuf bytes.Buffer
606 hpackEncoder *hpack.Encoder
607
608
609 shutdownOnce sync.Once
610 }
611
612 func (sc *serverConn) maxHeaderListSize() uint32 {
613 n := sc.hs.MaxHeaderBytes
614 if n <= 0 {
615 n = http.DefaultMaxHeaderBytes
616 }
617
618
619 const perFieldOverhead = 32
620 const typicalHeaders = 10
621 return uint32(n + typicalHeaders*perFieldOverhead)
622 }
623
624 func (sc *serverConn) curOpenStreams() uint32 {
625 sc.serveG.check()
626 return sc.curClientStreams + sc.curPushedStreams
627 }
628
629
630
631
632
633
634
635
636 type stream struct {
637
638 sc *serverConn
639 id uint32
640 body *pipe
641 cw closeWaiter
642 ctx context.Context
643 cancelCtx func()
644
645
646 bodyBytes int64
647 declBodyBytes int64
648 flow outflow
649 inflow inflow
650 state streamState
651 resetQueued bool
652 gotTrailerHeader bool
653 wroteHeaders bool
654 readDeadline *time.Timer
655 writeDeadline *time.Timer
656 closeErr error
657
658 trailer http.Header
659 reqTrailer http.Header
660 }
661
662 func (sc *serverConn) Framer() *Framer { return sc.framer }
663 func (sc *serverConn) CloseConn() error { return sc.conn.Close() }
664 func (sc *serverConn) Flush() error { return sc.bw.Flush() }
665 func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
666 return sc.hpackEncoder, &sc.headerWriteBuf
667 }
668
669 func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
670 sc.serveG.check()
671
672 if st, ok := sc.streams[streamID]; ok {
673 return st.state, st
674 }
675
676
677
678
679
680
681 if streamID%2 == 1 {
682 if streamID <= sc.maxClientStreamID {
683 return stateClosed, nil
684 }
685 } else {
686 if streamID <= sc.maxPushPromiseID {
687 return stateClosed, nil
688 }
689 }
690 return stateIdle, nil
691 }
692
693
694
695
696 func (sc *serverConn) setConnState(state http.ConnState) {
697 if sc.hs.ConnState != nil {
698 sc.hs.ConnState(sc.conn, state)
699 }
700 }
701
702 func (sc *serverConn) vlogf(format string, args ...interface{}) {
703 if VerboseLogs {
704 sc.logf(format, args...)
705 }
706 }
707
708 func (sc *serverConn) logf(format string, args ...interface{}) {
709 if lg := sc.hs.ErrorLog; lg != nil {
710 lg.Printf(format, args...)
711 } else {
712 log.Printf(format, args...)
713 }
714 }
715
716
717
718
719
720 func errno(v error) uintptr {
721 if rv := reflect.ValueOf(v); rv.Kind() == reflect.Uintptr {
722 return uintptr(rv.Uint())
723 }
724 return 0
725 }
726
727
728
729 func isClosedConnError(err error) bool {
730 if err == nil {
731 return false
732 }
733
734
735
736
737 str := err.Error()
738 if strings.Contains(str, "use of closed network connection") {
739 return true
740 }
741
742
743
744
745
746 if runtime.GOOS == "windows" {
747 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
748 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
749 const WSAECONNABORTED = 10053
750 const WSAECONNRESET = 10054
751 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED {
752 return true
753 }
754 }
755 }
756 }
757 return false
758 }
759
760 func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
761 if err == nil {
762 return
763 }
764 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
765
766 sc.vlogf(format, args...)
767 } else {
768 sc.logf(format, args...)
769 }
770 }
771
772
773
774
775
776
777 const maxCachedCanonicalHeadersKeysSize = 2048
778
779 func (sc *serverConn) canonicalHeader(v string) string {
780 sc.serveG.check()
781 buildCommonHeaderMapsOnce()
782 cv, ok := commonCanonHeader[v]
783 if ok {
784 return cv
785 }
786 cv, ok = sc.canonHeader[v]
787 if ok {
788 return cv
789 }
790 if sc.canonHeader == nil {
791 sc.canonHeader = make(map[string]string)
792 }
793 cv = http.CanonicalHeaderKey(v)
794 size := 100 + len(v)*2
795 if sc.canonHeaderKeysSize+size <= maxCachedCanonicalHeadersKeysSize {
796 sc.canonHeader[v] = cv
797 sc.canonHeaderKeysSize += size
798 }
799 return cv
800 }
801
802 type readFrameResult struct {
803 f Frame
804 err error
805
806
807
808
809 readMore func()
810 }
811
812
813
814
815
816 func (sc *serverConn) readFrames() {
817 gate := make(gate)
818 gateDone := gate.Done
819 for {
820 f, err := sc.framer.ReadFrame()
821 select {
822 case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
823 case <-sc.doneServing:
824 return
825 }
826 select {
827 case <-gate:
828 case <-sc.doneServing:
829 return
830 }
831 if terminalReadFrameError(err) {
832 return
833 }
834 }
835 }
836
837
838 type frameWriteResult struct {
839 _ incomparable
840 wr FrameWriteRequest
841 err error
842 }
843
844
845
846
847
848 func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
849 var err error
850 if wd == nil {
851 err = wr.write.writeFrame(sc)
852 } else {
853 err = sc.framer.endWrite()
854 }
855 sc.wroteFrameCh <- frameWriteResult{wr: wr, err: err}
856 }
857
858 func (sc *serverConn) closeAllStreamsOnConnClose() {
859 sc.serveG.check()
860 for _, st := range sc.streams {
861 sc.closeStream(st, errClientDisconnected)
862 }
863 }
864
865 func (sc *serverConn) stopShutdownTimer() {
866 sc.serveG.check()
867 if t := sc.shutdownTimer; t != nil {
868 t.Stop()
869 }
870 }
871
872 func (sc *serverConn) notePanic() {
873
874 if testHookOnPanicMu != nil {
875 testHookOnPanicMu.Lock()
876 defer testHookOnPanicMu.Unlock()
877 }
878 if testHookOnPanic != nil {
879 if e := recover(); e != nil {
880 if testHookOnPanic(sc, e) {
881 panic(e)
882 }
883 }
884 }
885 }
886
887 func (sc *serverConn) serve() {
888 sc.serveG.check()
889 defer sc.notePanic()
890 defer sc.conn.Close()
891 defer sc.closeAllStreamsOnConnClose()
892 defer sc.stopShutdownTimer()
893 defer close(sc.doneServing)
894
895 if VerboseLogs {
896 sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
897 }
898
899 sc.writeFrame(FrameWriteRequest{
900 write: writeSettings{
901 {SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
902 {SettingMaxConcurrentStreams, sc.advMaxStreams},
903 {SettingMaxHeaderListSize, sc.maxHeaderListSize()},
904 {SettingHeaderTableSize, sc.srv.maxDecoderHeaderTableSize()},
905 {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
906 },
907 })
908 sc.unackedSettings++
909
910
911
912 if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
913 sc.sendWindowUpdate(nil, int(diff))
914 }
915
916 if err := sc.readPreface(); err != nil {
917 sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
918 return
919 }
920
921
922
923
924 sc.setConnState(http.StateActive)
925 sc.setConnState(http.StateIdle)
926
927 if sc.srv.IdleTimeout != 0 {
928 sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
929 defer sc.idleTimer.Stop()
930 }
931
932 go sc.readFrames()
933
934 settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
935 defer settingsTimer.Stop()
936
937 loopNum := 0
938 for {
939 loopNum++
940 select {
941 case wr := <-sc.wantWriteFrameCh:
942 if se, ok := wr.write.(StreamError); ok {
943 sc.resetStream(se)
944 break
945 }
946 sc.writeFrame(wr)
947 case res := <-sc.wroteFrameCh:
948 sc.wroteFrame(res)
949 case res := <-sc.readFrameCh:
950
951
952 if sc.writingFrameAsync {
953 select {
954 case wroteRes := <-sc.wroteFrameCh:
955 sc.wroteFrame(wroteRes)
956 default:
957 }
958 }
959 if !sc.processFrameFromReader(res) {
960 return
961 }
962 res.readMore()
963 if settingsTimer != nil {
964 settingsTimer.Stop()
965 settingsTimer = nil
966 }
967 case m := <-sc.bodyReadCh:
968 sc.noteBodyRead(m.st, m.n)
969 case msg := <-sc.serveMsgCh:
970 switch v := msg.(type) {
971 case func(int):
972 v(loopNum)
973 case *serverMessage:
974 switch v {
975 case settingsTimerMsg:
976 sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
977 return
978 case idleTimerMsg:
979 sc.vlogf("connection is idle")
980 sc.goAway(ErrCodeNo)
981 case shutdownTimerMsg:
982 sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
983 return
984 case gracefulShutdownMsg:
985 sc.startGracefulShutdownInternal()
986 case handlerDoneMsg:
987 sc.handlerDone()
988 default:
989 panic("unknown timer")
990 }
991 case *startPushRequest:
992 sc.startPush(v)
993 case func(*serverConn):
994 v(sc)
995 default:
996 panic(fmt.Sprintf("unexpected type %T", v))
997 }
998 }
999
1000
1001
1002
1003 if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
1004 sc.vlogf("http2: too many control frames in send queue, closing connection")
1005 return
1006 }
1007
1008
1009
1010
1011 sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
1012 gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
1013 if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
1014 sc.shutDownIn(goAwayTimeout)
1015 }
1016 }
1017 }
1018
1019 type serverMessage int
1020
1021
1022 var (
1023 settingsTimerMsg = new(serverMessage)
1024 idleTimerMsg = new(serverMessage)
1025 shutdownTimerMsg = new(serverMessage)
1026 gracefulShutdownMsg = new(serverMessage)
1027 handlerDoneMsg = new(serverMessage)
1028 )
1029
1030 func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
1031 func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
1032 func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
1033
1034 func (sc *serverConn) sendServeMsg(msg interface{}) {
1035 sc.serveG.checkNotOn()
1036 select {
1037 case sc.serveMsgCh <- msg:
1038 case <-sc.doneServing:
1039 }
1040 }
1041
1042 var errPrefaceTimeout = errors.New("timeout waiting for client preface")
1043
1044
1045
1046
1047 func (sc *serverConn) readPreface() error {
1048 if sc.sawClientPreface {
1049 return nil
1050 }
1051 errc := make(chan error, 1)
1052 go func() {
1053
1054 buf := make([]byte, len(ClientPreface))
1055 if _, err := io.ReadFull(sc.conn, buf); err != nil {
1056 errc <- err
1057 } else if !bytes.Equal(buf, clientPreface) {
1058 errc <- fmt.Errorf("bogus greeting %q", buf)
1059 } else {
1060 errc <- nil
1061 }
1062 }()
1063 timer := time.NewTimer(prefaceTimeout)
1064 defer timer.Stop()
1065 select {
1066 case <-timer.C:
1067 return errPrefaceTimeout
1068 case err := <-errc:
1069 if err == nil {
1070 if VerboseLogs {
1071 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr())
1072 }
1073 }
1074 return err
1075 }
1076 }
1077
1078 var errChanPool = sync.Pool{
1079 New: func() interface{} { return make(chan error, 1) },
1080 }
1081
1082 var writeDataPool = sync.Pool{
1083 New: func() interface{} { return new(writeData) },
1084 }
1085
1086
1087
1088 func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
1089 ch := errChanPool.Get().(chan error)
1090 writeArg := writeDataPool.Get().(*writeData)
1091 *writeArg = writeData{stream.id, data, endStream}
1092 err := sc.writeFrameFromHandler(FrameWriteRequest{
1093 write: writeArg,
1094 stream: stream,
1095 done: ch,
1096 })
1097 if err != nil {
1098 return err
1099 }
1100 var frameWriteDone bool
1101 select {
1102 case err = <-ch:
1103 frameWriteDone = true
1104 case <-sc.doneServing:
1105 return errClientDisconnected
1106 case <-stream.cw:
1107
1108
1109
1110
1111
1112
1113
1114 select {
1115 case err = <-ch:
1116 frameWriteDone = true
1117 default:
1118 return errStreamClosed
1119 }
1120 }
1121 errChanPool.Put(ch)
1122 if frameWriteDone {
1123 writeDataPool.Put(writeArg)
1124 }
1125 return err
1126 }
1127
1128
1129
1130
1131
1132
1133
1134
1135 func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
1136 sc.serveG.checkNotOn()
1137 select {
1138 case sc.wantWriteFrameCh <- wr:
1139 return nil
1140 case <-sc.doneServing:
1141
1142
1143 return errClientDisconnected
1144 }
1145 }
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155 func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
1156 sc.serveG.check()
1157
1158
1159 var ignoreWrite bool
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179 if wr.StreamID() != 0 {
1180 _, isReset := wr.write.(StreamError)
1181 if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
1182 ignoreWrite = true
1183 }
1184 }
1185
1186
1187
1188 switch wr.write.(type) {
1189 case *writeResHeaders:
1190 wr.stream.wroteHeaders = true
1191 case write100ContinueHeadersFrame:
1192 if wr.stream.wroteHeaders {
1193
1194
1195 if wr.done != nil {
1196 panic("wr.done != nil for write100ContinueHeadersFrame")
1197 }
1198 ignoreWrite = true
1199 }
1200 }
1201
1202 if !ignoreWrite {
1203 if wr.isControl() {
1204 sc.queuedControlFrames++
1205
1206
1207 if sc.queuedControlFrames < 0 {
1208 sc.conn.Close()
1209 }
1210 }
1211 sc.writeSched.Push(wr)
1212 }
1213 sc.scheduleFrameWrite()
1214 }
1215
1216
1217
1218
1219 func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
1220 sc.serveG.check()
1221 if sc.writingFrame {
1222 panic("internal error: can only be writing one frame at a time")
1223 }
1224
1225 st := wr.stream
1226 if st != nil {
1227 switch st.state {
1228 case stateHalfClosedLocal:
1229 switch wr.write.(type) {
1230 case StreamError, handlerPanicRST, writeWindowUpdate:
1231
1232
1233 default:
1234 panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
1235 }
1236 case stateClosed:
1237 panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
1238 }
1239 }
1240 if wpp, ok := wr.write.(*writePushPromise); ok {
1241 var err error
1242 wpp.promisedID, err = wpp.allocatePromisedID()
1243 if err != nil {
1244 sc.writingFrameAsync = false
1245 wr.replyToWriter(err)
1246 return
1247 }
1248 }
1249
1250 sc.writingFrame = true
1251 sc.needsFrameFlush = true
1252 if wr.write.staysWithinBuffer(sc.bw.Available()) {
1253 sc.writingFrameAsync = false
1254 err := wr.write.writeFrame(sc)
1255 sc.wroteFrame(frameWriteResult{wr: wr, err: err})
1256 } else if wd, ok := wr.write.(*writeData); ok {
1257
1258
1259
1260 sc.framer.startWriteDataPadded(wd.streamID, wd.endStream, wd.p, nil)
1261 sc.writingFrameAsync = true
1262 go sc.writeFrameAsync(wr, wd)
1263 } else {
1264 sc.writingFrameAsync = true
1265 go sc.writeFrameAsync(wr, nil)
1266 }
1267 }
1268
1269
1270
1271
1272 var errHandlerPanicked = errors.New("http2: handler panicked")
1273
1274
1275
1276 func (sc *serverConn) wroteFrame(res frameWriteResult) {
1277 sc.serveG.check()
1278 if !sc.writingFrame {
1279 panic("internal error: expected to be already writing a frame")
1280 }
1281 sc.writingFrame = false
1282 sc.writingFrameAsync = false
1283
1284 wr := res.wr
1285
1286 if writeEndsStream(wr.write) {
1287 st := wr.stream
1288 if st == nil {
1289 panic("internal error: expecting non-nil stream")
1290 }
1291 switch st.state {
1292 case stateOpen:
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303 st.state = stateHalfClosedLocal
1304
1305
1306
1307
1308 sc.resetStream(streamError(st.id, ErrCodeNo))
1309 case stateHalfClosedRemote:
1310 sc.closeStream(st, errHandlerComplete)
1311 }
1312 } else {
1313 switch v := wr.write.(type) {
1314 case StreamError:
1315
1316 if st, ok := sc.streams[v.StreamID]; ok {
1317 sc.closeStream(st, v)
1318 }
1319 case handlerPanicRST:
1320 sc.closeStream(wr.stream, errHandlerPanicked)
1321 }
1322 }
1323
1324
1325 wr.replyToWriter(res.err)
1326
1327 sc.scheduleFrameWrite()
1328 }
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340 func (sc *serverConn) scheduleFrameWrite() {
1341 sc.serveG.check()
1342 if sc.writingFrame || sc.inFrameScheduleLoop {
1343 return
1344 }
1345 sc.inFrameScheduleLoop = true
1346 for !sc.writingFrameAsync {
1347 if sc.needToSendGoAway {
1348 sc.needToSendGoAway = false
1349 sc.startFrameWrite(FrameWriteRequest{
1350 write: &writeGoAway{
1351 maxStreamID: sc.maxClientStreamID,
1352 code: sc.goAwayCode,
1353 },
1354 })
1355 continue
1356 }
1357 if sc.needToSendSettingsAck {
1358 sc.needToSendSettingsAck = false
1359 sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
1360 continue
1361 }
1362 if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
1363 if wr, ok := sc.writeSched.Pop(); ok {
1364 if wr.isControl() {
1365 sc.queuedControlFrames--
1366 }
1367 sc.startFrameWrite(wr)
1368 continue
1369 }
1370 }
1371 if sc.needsFrameFlush {
1372 sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
1373 sc.needsFrameFlush = false
1374 continue
1375 }
1376 break
1377 }
1378 sc.inFrameScheduleLoop = false
1379 }
1380
1381
1382
1383
1384
1385
1386
1387
1388 func (sc *serverConn) startGracefulShutdown() {
1389 sc.serveG.checkNotOn()
1390 sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
1391 }
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409 var goAwayTimeout = 1 * time.Second
1410
1411 func (sc *serverConn) startGracefulShutdownInternal() {
1412 sc.goAway(ErrCodeNo)
1413 }
1414
1415 func (sc *serverConn) goAway(code ErrCode) {
1416 sc.serveG.check()
1417 if sc.inGoAway {
1418 if sc.goAwayCode == ErrCodeNo {
1419 sc.goAwayCode = code
1420 }
1421 return
1422 }
1423 sc.inGoAway = true
1424 sc.needToSendGoAway = true
1425 sc.goAwayCode = code
1426 sc.scheduleFrameWrite()
1427 }
1428
1429 func (sc *serverConn) shutDownIn(d time.Duration) {
1430 sc.serveG.check()
1431 sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
1432 }
1433
1434 func (sc *serverConn) resetStream(se StreamError) {
1435 sc.serveG.check()
1436 sc.writeFrame(FrameWriteRequest{write: se})
1437 if st, ok := sc.streams[se.StreamID]; ok {
1438 st.resetQueued = true
1439 }
1440 }
1441
1442
1443
1444
1445 func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
1446 sc.serveG.check()
1447 err := res.err
1448 if err != nil {
1449 if err == ErrFrameTooLarge {
1450 sc.goAway(ErrCodeFrameSize)
1451 return true
1452 }
1453 clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err)
1454 if clientGone {
1455
1456
1457
1458
1459
1460
1461
1462
1463 return false
1464 }
1465 } else {
1466 f := res.f
1467 if VerboseLogs {
1468 sc.vlogf("http2: server read frame %v", summarizeFrame(f))
1469 }
1470 err = sc.processFrame(f)
1471 if err == nil {
1472 return true
1473 }
1474 }
1475
1476 switch ev := err.(type) {
1477 case StreamError:
1478 sc.resetStream(ev)
1479 return true
1480 case goAwayFlowError:
1481 sc.goAway(ErrCodeFlowControl)
1482 return true
1483 case ConnectionError:
1484 sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
1485 sc.goAway(ErrCode(ev))
1486 return true
1487 default:
1488 if res.err != nil {
1489 sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
1490 } else {
1491 sc.logf("http2: server closing client connection: %v", err)
1492 }
1493 return false
1494 }
1495 }
1496
1497 func (sc *serverConn) processFrame(f Frame) error {
1498 sc.serveG.check()
1499
1500
1501 if !sc.sawFirstSettings {
1502 if _, ok := f.(*SettingsFrame); !ok {
1503 return sc.countError("first_settings", ConnectionError(ErrCodeProtocol))
1504 }
1505 sc.sawFirstSettings = true
1506 }
1507
1508
1509
1510
1511
1512 if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || f.Header().StreamID > sc.maxClientStreamID) {
1513
1514 if f, ok := f.(*DataFrame); ok {
1515 if !sc.inflow.take(f.Length) {
1516 return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
1517 }
1518 sc.sendWindowUpdate(nil, int(f.Length))
1519 }
1520 return nil
1521 }
1522
1523 switch f := f.(type) {
1524 case *SettingsFrame:
1525 return sc.processSettings(f)
1526 case *MetaHeadersFrame:
1527 return sc.processHeaders(f)
1528 case *WindowUpdateFrame:
1529 return sc.processWindowUpdate(f)
1530 case *PingFrame:
1531 return sc.processPing(f)
1532 case *DataFrame:
1533 return sc.processData(f)
1534 case *RSTStreamFrame:
1535 return sc.processResetStream(f)
1536 case *PriorityFrame:
1537 return sc.processPriority(f)
1538 case *GoAwayFrame:
1539 return sc.processGoAway(f)
1540 case *PushPromiseFrame:
1541
1542
1543 return sc.countError("push_promise", ConnectionError(ErrCodeProtocol))
1544 default:
1545 sc.vlogf("http2: server ignoring frame: %v", f.Header())
1546 return nil
1547 }
1548 }
1549
1550 func (sc *serverConn) processPing(f *PingFrame) error {
1551 sc.serveG.check()
1552 if f.IsAck() {
1553
1554
1555 return nil
1556 }
1557 if f.StreamID != 0 {
1558
1559
1560
1561
1562
1563 return sc.countError("ping_on_stream", ConnectionError(ErrCodeProtocol))
1564 }
1565 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
1566 return nil
1567 }
1568
1569 func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
1570 sc.serveG.check()
1571 switch {
1572 case f.StreamID != 0:
1573 state, st := sc.state(f.StreamID)
1574 if state == stateIdle {
1575
1576
1577
1578
1579 return sc.countError("stream_idle", ConnectionError(ErrCodeProtocol))
1580 }
1581 if st == nil {
1582
1583
1584
1585
1586
1587 return nil
1588 }
1589 if !st.flow.add(int32(f.Increment)) {
1590 return sc.countError("bad_flow", streamError(f.StreamID, ErrCodeFlowControl))
1591 }
1592 default:
1593 if !sc.flow.add(int32(f.Increment)) {
1594 return goAwayFlowError{}
1595 }
1596 }
1597 sc.scheduleFrameWrite()
1598 return nil
1599 }
1600
1601 func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
1602 sc.serveG.check()
1603
1604 state, st := sc.state(f.StreamID)
1605 if state == stateIdle {
1606
1607
1608
1609
1610
1611 return sc.countError("reset_idle_stream", ConnectionError(ErrCodeProtocol))
1612 }
1613 if st != nil {
1614 st.cancelCtx()
1615 sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
1616 }
1617 return nil
1618 }
1619
1620 func (sc *serverConn) closeStream(st *stream, err error) {
1621 sc.serveG.check()
1622 if st.state == stateIdle || st.state == stateClosed {
1623 panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
1624 }
1625 st.state = stateClosed
1626 if st.readDeadline != nil {
1627 st.readDeadline.Stop()
1628 }
1629 if st.writeDeadline != nil {
1630 st.writeDeadline.Stop()
1631 }
1632 if st.isPushed() {
1633 sc.curPushedStreams--
1634 } else {
1635 sc.curClientStreams--
1636 }
1637 delete(sc.streams, st.id)
1638 if len(sc.streams) == 0 {
1639 sc.setConnState(http.StateIdle)
1640 if sc.srv.IdleTimeout != 0 {
1641 sc.idleTimer.Reset(sc.srv.IdleTimeout)
1642 }
1643 if h1ServerKeepAlivesDisabled(sc.hs) {
1644 sc.startGracefulShutdownInternal()
1645 }
1646 }
1647 if p := st.body; p != nil {
1648
1649
1650 sc.sendWindowUpdate(nil, p.Len())
1651
1652 p.CloseWithError(err)
1653 }
1654 if e, ok := err.(StreamError); ok {
1655 if e.Cause != nil {
1656 err = e.Cause
1657 } else {
1658 err = errStreamClosed
1659 }
1660 }
1661 st.closeErr = err
1662 st.cw.Close()
1663 sc.writeSched.CloseStream(st.id)
1664 }
1665
1666 func (sc *serverConn) processSettings(f *SettingsFrame) error {
1667 sc.serveG.check()
1668 if f.IsAck() {
1669 sc.unackedSettings--
1670 if sc.unackedSettings < 0 {
1671
1672
1673
1674 return sc.countError("ack_mystery", ConnectionError(ErrCodeProtocol))
1675 }
1676 return nil
1677 }
1678 if f.NumSettings() > 100 || f.HasDuplicates() {
1679
1680
1681
1682 return sc.countError("settings_big_or_dups", ConnectionError(ErrCodeProtocol))
1683 }
1684 if err := f.ForeachSetting(sc.processSetting); err != nil {
1685 return err
1686 }
1687
1688
1689 sc.needToSendSettingsAck = true
1690 sc.scheduleFrameWrite()
1691 return nil
1692 }
1693
1694 func (sc *serverConn) processSetting(s Setting) error {
1695 sc.serveG.check()
1696 if err := s.Valid(); err != nil {
1697 return err
1698 }
1699 if VerboseLogs {
1700 sc.vlogf("http2: server processing setting %v", s)
1701 }
1702 switch s.ID {
1703 case SettingHeaderTableSize:
1704 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
1705 case SettingEnablePush:
1706 sc.pushEnabled = s.Val != 0
1707 case SettingMaxConcurrentStreams:
1708 sc.clientMaxStreams = s.Val
1709 case SettingInitialWindowSize:
1710 return sc.processSettingInitialWindowSize(s.Val)
1711 case SettingMaxFrameSize:
1712 sc.maxFrameSize = int32(s.Val)
1713 case SettingMaxHeaderListSize:
1714 sc.peerMaxHeaderListSize = s.Val
1715 default:
1716
1717
1718
1719 if VerboseLogs {
1720 sc.vlogf("http2: server ignoring unknown setting %v", s)
1721 }
1722 }
1723 return nil
1724 }
1725
1726 func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
1727 sc.serveG.check()
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737 old := sc.initialStreamSendWindowSize
1738 sc.initialStreamSendWindowSize = int32(val)
1739 growth := int32(val) - old
1740 for _, st := range sc.streams {
1741 if !st.flow.add(growth) {
1742
1743
1744
1745
1746
1747
1748 return sc.countError("setting_win_size", ConnectionError(ErrCodeFlowControl))
1749 }
1750 }
1751 return nil
1752 }
1753
1754 func (sc *serverConn) processData(f *DataFrame) error {
1755 sc.serveG.check()
1756 id := f.Header().StreamID
1757
1758 data := f.Data()
1759 state, st := sc.state(id)
1760 if id == 0 || state == stateIdle {
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771 return sc.countError("data_on_idle", ConnectionError(ErrCodeProtocol))
1772 }
1773
1774
1775
1776
1777 if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787 if !sc.inflow.take(f.Length) {
1788 return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
1789 }
1790 sc.sendWindowUpdate(nil, int(f.Length))
1791
1792 if st != nil && st.resetQueued {
1793
1794 return nil
1795 }
1796 return sc.countError("closed", streamError(id, ErrCodeStreamClosed))
1797 }
1798 if st.body == nil {
1799 panic("internal error: should have a body in this state")
1800 }
1801
1802
1803 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
1804 if !sc.inflow.take(f.Length) {
1805 return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
1806 }
1807 sc.sendWindowUpdate(nil, int(f.Length))
1808
1809 st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
1810
1811
1812
1813 return sc.countError("send_too_much", streamError(id, ErrCodeProtocol))
1814 }
1815 if f.Length > 0 {
1816
1817 if !takeInflows(&sc.inflow, &st.inflow, f.Length) {
1818 return sc.countError("flow_on_data_length", streamError(id, ErrCodeFlowControl))
1819 }
1820
1821 if len(data) > 0 {
1822 st.bodyBytes += int64(len(data))
1823 wrote, err := st.body.Write(data)
1824 if err != nil {
1825
1826
1827
1828 sc.sendWindowUpdate(nil, int(f.Length)-wrote)
1829 return nil
1830 }
1831 if wrote != len(data) {
1832 panic("internal error: bad Writer")
1833 }
1834 }
1835
1836
1837
1838
1839
1840
1841 pad := int32(f.Length) - int32(len(data))
1842 sc.sendWindowUpdate32(nil, pad)
1843 sc.sendWindowUpdate32(st, pad)
1844 }
1845 if f.StreamEnded() {
1846 st.endStream()
1847 }
1848 return nil
1849 }
1850
1851 func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
1852 sc.serveG.check()
1853 if f.ErrCode != ErrCodeNo {
1854 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1855 } else {
1856 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1857 }
1858 sc.startGracefulShutdownInternal()
1859
1860
1861 sc.pushEnabled = false
1862 return nil
1863 }
1864
1865
1866 func (st *stream) isPushed() bool {
1867 return st.id%2 == 0
1868 }
1869
1870
1871
1872 func (st *stream) endStream() {
1873 sc := st.sc
1874 sc.serveG.check()
1875
1876 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
1877 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
1878 st.declBodyBytes, st.bodyBytes))
1879 } else {
1880 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest)
1881 st.body.CloseWithError(io.EOF)
1882 }
1883 st.state = stateHalfClosedRemote
1884 }
1885
1886
1887
1888 func (st *stream) copyTrailersToHandlerRequest() {
1889 for k, vv := range st.trailer {
1890 if _, ok := st.reqTrailer[k]; ok {
1891
1892 st.reqTrailer[k] = vv
1893 }
1894 }
1895 }
1896
1897
1898
1899 func (st *stream) onReadTimeout() {
1900 if st.body != nil {
1901
1902
1903 st.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
1904 }
1905 }
1906
1907
1908
1909 func (st *stream) onWriteTimeout() {
1910 st.sc.writeFrameFromHandler(FrameWriteRequest{write: StreamError{
1911 StreamID: st.id,
1912 Code: ErrCodeInternal,
1913 Cause: os.ErrDeadlineExceeded,
1914 }})
1915 }
1916
1917 func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
1918 sc.serveG.check()
1919 id := f.StreamID
1920
1921
1922
1923
1924
1925 if id%2 != 1 {
1926 return sc.countError("headers_even", ConnectionError(ErrCodeProtocol))
1927 }
1928
1929
1930
1931
1932 if st := sc.streams[f.StreamID]; st != nil {
1933 if st.resetQueued {
1934
1935
1936 return nil
1937 }
1938
1939
1940
1941
1942 if st.state == stateHalfClosedRemote {
1943 return sc.countError("headers_half_closed", streamError(id, ErrCodeStreamClosed))
1944 }
1945 return st.processTrailerHeaders(f)
1946 }
1947
1948
1949
1950
1951
1952
1953 if id <= sc.maxClientStreamID {
1954 return sc.countError("stream_went_down", ConnectionError(ErrCodeProtocol))
1955 }
1956 sc.maxClientStreamID = id
1957
1958 if sc.idleTimer != nil {
1959 sc.idleTimer.Stop()
1960 }
1961
1962
1963
1964
1965
1966
1967
1968 if sc.curClientStreams+1 > sc.advMaxStreams {
1969 if sc.unackedSettings == 0 {
1970
1971 return sc.countError("over_max_streams", streamError(id, ErrCodeProtocol))
1972 }
1973
1974
1975
1976
1977
1978 return sc.countError("over_max_streams_race", streamError(id, ErrCodeRefusedStream))
1979 }
1980
1981 initialState := stateOpen
1982 if f.StreamEnded() {
1983 initialState = stateHalfClosedRemote
1984 }
1985 st := sc.newStream(id, 0, initialState)
1986
1987 if f.HasPriority() {
1988 if err := sc.checkPriority(f.StreamID, f.Priority); err != nil {
1989 return err
1990 }
1991 sc.writeSched.AdjustStream(st.id, f.Priority)
1992 }
1993
1994 rw, req, err := sc.newWriterAndRequest(st, f)
1995 if err != nil {
1996 return err
1997 }
1998 st.reqTrailer = req.Trailer
1999 if st.reqTrailer != nil {
2000 st.trailer = make(http.Header)
2001 }
2002 st.body = req.Body.(*requestBody).pipe
2003 st.declBodyBytes = req.ContentLength
2004
2005 handler := sc.handler.ServeHTTP
2006 if f.Truncated {
2007
2008 handler = handleHeaderListTooLong
2009 } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
2010 handler = new400Handler(err)
2011 }
2012
2013
2014
2015
2016
2017
2018
2019
2020 if sc.hs.ReadTimeout != 0 {
2021 sc.conn.SetReadDeadline(time.Time{})
2022 st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
2023 }
2024
2025 return sc.scheduleHandler(id, rw, req, handler)
2026 }
2027
2028 func (sc *serverConn) upgradeRequest(req *http.Request) {
2029 sc.serveG.check()
2030 id := uint32(1)
2031 sc.maxClientStreamID = id
2032 st := sc.newStream(id, 0, stateHalfClosedRemote)
2033 st.reqTrailer = req.Trailer
2034 if st.reqTrailer != nil {
2035 st.trailer = make(http.Header)
2036 }
2037 rw := sc.newResponseWriter(st, req)
2038
2039
2040
2041 if sc.hs.ReadTimeout != 0 {
2042 sc.conn.SetReadDeadline(time.Time{})
2043 }
2044
2045
2046
2047
2048 sc.curHandlers++
2049 go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2050 }
2051
2052 func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
2053 sc := st.sc
2054 sc.serveG.check()
2055 if st.gotTrailerHeader {
2056 return sc.countError("dup_trailers", ConnectionError(ErrCodeProtocol))
2057 }
2058 st.gotTrailerHeader = true
2059 if !f.StreamEnded() {
2060 return sc.countError("trailers_not_ended", streamError(st.id, ErrCodeProtocol))
2061 }
2062
2063 if len(f.PseudoFields()) > 0 {
2064 return sc.countError("trailers_pseudo", streamError(st.id, ErrCodeProtocol))
2065 }
2066 if st.trailer != nil {
2067 for _, hf := range f.RegularFields() {
2068 key := sc.canonicalHeader(hf.Name)
2069 if !httpguts.ValidTrailerHeader(key) {
2070
2071
2072
2073 return sc.countError("trailers_bogus", streamError(st.id, ErrCodeProtocol))
2074 }
2075 st.trailer[key] = append(st.trailer[key], hf.Value)
2076 }
2077 }
2078 st.endStream()
2079 return nil
2080 }
2081
2082 func (sc *serverConn) checkPriority(streamID uint32, p PriorityParam) error {
2083 if streamID == p.StreamDep {
2084
2085
2086
2087
2088 return sc.countError("priority", streamError(streamID, ErrCodeProtocol))
2089 }
2090 return nil
2091 }
2092
2093 func (sc *serverConn) processPriority(f *PriorityFrame) error {
2094 if err := sc.checkPriority(f.StreamID, f.PriorityParam); err != nil {
2095 return err
2096 }
2097 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
2098 return nil
2099 }
2100
2101 func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
2102 sc.serveG.check()
2103 if id == 0 {
2104 panic("internal error: cannot create stream with id 0")
2105 }
2106
2107 ctx, cancelCtx := context.WithCancel(sc.baseCtx)
2108 st := &stream{
2109 sc: sc,
2110 id: id,
2111 state: state,
2112 ctx: ctx,
2113 cancelCtx: cancelCtx,
2114 }
2115 st.cw.Init()
2116 st.flow.conn = &sc.flow
2117 st.flow.add(sc.initialStreamSendWindowSize)
2118 st.inflow.init(sc.srv.initialStreamRecvWindowSize())
2119 if sc.hs.WriteTimeout != 0 {
2120 st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
2121 }
2122
2123 sc.streams[id] = st
2124 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
2125 if st.isPushed() {
2126 sc.curPushedStreams++
2127 } else {
2128 sc.curClientStreams++
2129 }
2130 if sc.curOpenStreams() == 1 {
2131 sc.setConnState(http.StateActive)
2132 }
2133
2134 return st
2135 }
2136
2137 func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
2138 sc.serveG.check()
2139
2140 rp := requestParam{
2141 method: f.PseudoValue("method"),
2142 scheme: f.PseudoValue("scheme"),
2143 authority: f.PseudoValue("authority"),
2144 path: f.PseudoValue("path"),
2145 }
2146
2147 isConnect := rp.method == "CONNECT"
2148 if isConnect {
2149 if rp.path != "" || rp.scheme != "" || rp.authority == "" {
2150 return nil, nil, sc.countError("bad_connect", streamError(f.StreamID, ErrCodeProtocol))
2151 }
2152 } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163 return nil, nil, sc.countError("bad_path_method", streamError(f.StreamID, ErrCodeProtocol))
2164 }
2165
2166 rp.header = make(http.Header)
2167 for _, hf := range f.RegularFields() {
2168 rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
2169 }
2170 if rp.authority == "" {
2171 rp.authority = rp.header.Get("Host")
2172 }
2173
2174 rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
2175 if err != nil {
2176 return nil, nil, err
2177 }
2178 bodyOpen := !f.StreamEnded()
2179 if bodyOpen {
2180 if vv, ok := rp.header["Content-Length"]; ok {
2181 if cl, err := strconv.ParseUint(vv[0], 10, 63); err == nil {
2182 req.ContentLength = int64(cl)
2183 } else {
2184 req.ContentLength = 0
2185 }
2186 } else {
2187 req.ContentLength = -1
2188 }
2189 req.Body.(*requestBody).pipe = &pipe{
2190 b: &dataBuffer{expected: req.ContentLength},
2191 }
2192 }
2193 return rw, req, nil
2194 }
2195
2196 type requestParam struct {
2197 method string
2198 scheme, authority, path string
2199 header http.Header
2200 }
2201
2202 func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
2203 sc.serveG.check()
2204
2205 var tlsState *tls.ConnectionState
2206 if rp.scheme == "https" {
2207 tlsState = sc.tlsState
2208 }
2209
2210 needsContinue := httpguts.HeaderValuesContainsToken(rp.header["Expect"], "100-continue")
2211 if needsContinue {
2212 rp.header.Del("Expect")
2213 }
2214
2215 if cookies := rp.header["Cookie"]; len(cookies) > 1 {
2216 rp.header.Set("Cookie", strings.Join(cookies, "; "))
2217 }
2218
2219
2220 var trailer http.Header
2221 for _, v := range rp.header["Trailer"] {
2222 for _, key := range strings.Split(v, ",") {
2223 key = http.CanonicalHeaderKey(textproto.TrimString(key))
2224 switch key {
2225 case "Transfer-Encoding", "Trailer", "Content-Length":
2226
2227
2228 default:
2229 if trailer == nil {
2230 trailer = make(http.Header)
2231 }
2232 trailer[key] = nil
2233 }
2234 }
2235 }
2236 delete(rp.header, "Trailer")
2237
2238 var url_ *url.URL
2239 var requestURI string
2240 if rp.method == "CONNECT" {
2241 url_ = &url.URL{Host: rp.authority}
2242 requestURI = rp.authority
2243 } else {
2244 var err error
2245 url_, err = url.ParseRequestURI(rp.path)
2246 if err != nil {
2247 return nil, nil, sc.countError("bad_path", streamError(st.id, ErrCodeProtocol))
2248 }
2249 requestURI = rp.path
2250 }
2251
2252 body := &requestBody{
2253 conn: sc,
2254 stream: st,
2255 needsContinue: needsContinue,
2256 }
2257 req := &http.Request{
2258 Method: rp.method,
2259 URL: url_,
2260 RemoteAddr: sc.remoteAddrStr,
2261 Header: rp.header,
2262 RequestURI: requestURI,
2263 Proto: "HTTP/2.0",
2264 ProtoMajor: 2,
2265 ProtoMinor: 0,
2266 TLS: tlsState,
2267 Host: rp.authority,
2268 Body: body,
2269 Trailer: trailer,
2270 }
2271 req = req.WithContext(st.ctx)
2272
2273 rw := sc.newResponseWriter(st, req)
2274 return rw, req, nil
2275 }
2276
2277 func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *responseWriter {
2278 rws := responseWriterStatePool.Get().(*responseWriterState)
2279 bwSave := rws.bw
2280 *rws = responseWriterState{}
2281 rws.conn = sc
2282 rws.bw = bwSave
2283 rws.bw.Reset(chunkWriter{rws})
2284 rws.stream = st
2285 rws.req = req
2286 return &responseWriter{rws: rws}
2287 }
2288
2289 type unstartedHandler struct {
2290 streamID uint32
2291 rw *responseWriter
2292 req *http.Request
2293 handler func(http.ResponseWriter, *http.Request)
2294 }
2295
2296
2297
2298 func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error {
2299 sc.serveG.check()
2300 maxHandlers := sc.advMaxStreams
2301 if sc.curHandlers < maxHandlers {
2302 sc.curHandlers++
2303 go sc.runHandler(rw, req, handler)
2304 return nil
2305 }
2306 if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) {
2307 return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
2308 }
2309 sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
2310 streamID: streamID,
2311 rw: rw,
2312 req: req,
2313 handler: handler,
2314 })
2315 return nil
2316 }
2317
2318 func (sc *serverConn) handlerDone() {
2319 sc.serveG.check()
2320 sc.curHandlers--
2321 i := 0
2322 maxHandlers := sc.advMaxStreams
2323 for ; i < len(sc.unstartedHandlers); i++ {
2324 u := sc.unstartedHandlers[i]
2325 if sc.streams[u.streamID] == nil {
2326
2327 continue
2328 }
2329 if sc.curHandlers >= maxHandlers {
2330 break
2331 }
2332 sc.curHandlers++
2333 go sc.runHandler(u.rw, u.req, u.handler)
2334 sc.unstartedHandlers[i] = unstartedHandler{}
2335 }
2336 sc.unstartedHandlers = sc.unstartedHandlers[i:]
2337 if len(sc.unstartedHandlers) == 0 {
2338 sc.unstartedHandlers = nil
2339 }
2340 }
2341
2342
2343 func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
2344 defer sc.sendServeMsg(handlerDoneMsg)
2345 didPanic := true
2346 defer func() {
2347 rw.rws.stream.cancelCtx()
2348 if req.MultipartForm != nil {
2349 req.MultipartForm.RemoveAll()
2350 }
2351 if didPanic {
2352 e := recover()
2353 sc.writeFrameFromHandler(FrameWriteRequest{
2354 write: handlerPanicRST{rw.rws.stream.id},
2355 stream: rw.rws.stream,
2356 })
2357
2358 if e != nil && e != http.ErrAbortHandler {
2359 const size = 64 << 10
2360 buf := make([]byte, size)
2361 buf = buf[:runtime.Stack(buf, false)]
2362 sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
2363 }
2364 return
2365 }
2366 rw.handlerDone()
2367 }()
2368 handler(rw, req)
2369 didPanic = false
2370 }
2371
2372 func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) {
2373
2374
2375
2376
2377 const statusRequestHeaderFieldsTooLarge = 431
2378 w.WriteHeader(statusRequestHeaderFieldsTooLarge)
2379 io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>")
2380 }
2381
2382
2383
2384 func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error {
2385 sc.serveG.checkNotOn()
2386 var errc chan error
2387 if headerData.h != nil {
2388
2389
2390
2391
2392 errc = errChanPool.Get().(chan error)
2393 }
2394 if err := sc.writeFrameFromHandler(FrameWriteRequest{
2395 write: headerData,
2396 stream: st,
2397 done: errc,
2398 }); err != nil {
2399 return err
2400 }
2401 if errc != nil {
2402 select {
2403 case err := <-errc:
2404 errChanPool.Put(errc)
2405 return err
2406 case <-sc.doneServing:
2407 return errClientDisconnected
2408 case <-st.cw:
2409 return errStreamClosed
2410 }
2411 }
2412 return nil
2413 }
2414
2415
2416 func (sc *serverConn) write100ContinueHeaders(st *stream) {
2417 sc.writeFrameFromHandler(FrameWriteRequest{
2418 write: write100ContinueHeadersFrame{st.id},
2419 stream: st,
2420 })
2421 }
2422
2423
2424
2425 type bodyReadMsg struct {
2426 st *stream
2427 n int
2428 }
2429
2430
2431
2432
2433 func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
2434 sc.serveG.checkNotOn()
2435 if n > 0 {
2436 select {
2437 case sc.bodyReadCh <- bodyReadMsg{st, n}:
2438 case <-sc.doneServing:
2439 }
2440 }
2441 }
2442
2443 func (sc *serverConn) noteBodyRead(st *stream, n int) {
2444 sc.serveG.check()
2445 sc.sendWindowUpdate(nil, n)
2446 if st.state != stateHalfClosedRemote && st.state != stateClosed {
2447
2448
2449 sc.sendWindowUpdate(st, n)
2450 }
2451 }
2452
2453
2454 func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2455 sc.sendWindowUpdate(st, int(n))
2456 }
2457
2458
2459 func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2460 sc.serveG.check()
2461 var streamID uint32
2462 var send int32
2463 if st == nil {
2464 send = sc.inflow.add(n)
2465 } else {
2466 streamID = st.id
2467 send = st.inflow.add(n)
2468 }
2469 if send == 0 {
2470 return
2471 }
2472 sc.writeFrame(FrameWriteRequest{
2473 write: writeWindowUpdate{streamID: streamID, n: uint32(send)},
2474 stream: st,
2475 })
2476 }
2477
2478
2479
2480 type requestBody struct {
2481 _ incomparable
2482 stream *stream
2483 conn *serverConn
2484 closeOnce sync.Once
2485 sawEOF bool
2486 pipe *pipe
2487 needsContinue bool
2488 }
2489
2490 func (b *requestBody) Close() error {
2491 b.closeOnce.Do(func() {
2492 if b.pipe != nil {
2493 b.pipe.BreakWithError(errClosedBody)
2494 }
2495 })
2496 return nil
2497 }
2498
2499 func (b *requestBody) Read(p []byte) (n int, err error) {
2500 if b.needsContinue {
2501 b.needsContinue = false
2502 b.conn.write100ContinueHeaders(b.stream)
2503 }
2504 if b.pipe == nil || b.sawEOF {
2505 return 0, io.EOF
2506 }
2507 n, err = b.pipe.Read(p)
2508 if err == io.EOF {
2509 b.sawEOF = true
2510 }
2511 if b.conn == nil && inTests {
2512 return
2513 }
2514 b.conn.noteBodyReadFromHandler(b.stream, n, err)
2515 return
2516 }
2517
2518
2519
2520
2521
2522
2523
2524 type responseWriter struct {
2525 rws *responseWriterState
2526 }
2527
2528
2529 var (
2530 _ http.CloseNotifier = (*responseWriter)(nil)
2531 _ http.Flusher = (*responseWriter)(nil)
2532 _ stringWriter = (*responseWriter)(nil)
2533 )
2534
2535 type responseWriterState struct {
2536
2537 stream *stream
2538 req *http.Request
2539 conn *serverConn
2540
2541
2542 bw *bufio.Writer
2543
2544
2545 handlerHeader http.Header
2546 snapHeader http.Header
2547 trailers []string
2548 status int
2549 wroteHeader bool
2550 sentHeader bool
2551 handlerDone bool
2552
2553 sentContentLen int64
2554 wroteBytes int64
2555
2556 closeNotifierMu sync.Mutex
2557 closeNotifierCh chan bool
2558 }
2559
2560 type chunkWriter struct{ rws *responseWriterState }
2561
2562 func (cw chunkWriter) Write(p []byte) (n int, err error) {
2563 n, err = cw.rws.writeChunk(p)
2564 if err == errStreamClosed {
2565
2566
2567 err = cw.rws.stream.closeErr
2568 }
2569 return n, err
2570 }
2571
2572 func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
2573
2574 func (rws *responseWriterState) hasNonemptyTrailers() bool {
2575 for _, trailer := range rws.trailers {
2576 if _, ok := rws.handlerHeader[trailer]; ok {
2577 return true
2578 }
2579 }
2580 return false
2581 }
2582
2583
2584
2585
2586 func (rws *responseWriterState) declareTrailer(k string) {
2587 k = http.CanonicalHeaderKey(k)
2588 if !httpguts.ValidTrailerHeader(k) {
2589
2590 rws.conn.logf("ignoring invalid trailer %q", k)
2591 return
2592 }
2593 if !strSliceContains(rws.trailers, k) {
2594 rws.trailers = append(rws.trailers, k)
2595 }
2596 }
2597
2598
2599
2600
2601
2602
2603
2604 func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
2605 if !rws.wroteHeader {
2606 rws.writeHeader(200)
2607 }
2608
2609 if rws.handlerDone {
2610 rws.promoteUndeclaredTrailers()
2611 }
2612
2613 isHeadResp := rws.req.Method == "HEAD"
2614 if !rws.sentHeader {
2615 rws.sentHeader = true
2616 var ctype, clen string
2617 if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
2618 rws.snapHeader.Del("Content-Length")
2619 if cl, err := strconv.ParseUint(clen, 10, 63); err == nil {
2620 rws.sentContentLen = int64(cl)
2621 } else {
2622 clen = ""
2623 }
2624 }
2625 _, hasContentLength := rws.snapHeader["Content-Length"]
2626 if !hasContentLength && clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
2627 clen = strconv.Itoa(len(p))
2628 }
2629 _, hasContentType := rws.snapHeader["Content-Type"]
2630
2631
2632 ce := rws.snapHeader.Get("Content-Encoding")
2633 hasCE := len(ce) > 0
2634 if !hasCE && !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
2635 ctype = http.DetectContentType(p)
2636 }
2637 var date string
2638 if _, ok := rws.snapHeader["Date"]; !ok {
2639
2640 date = time.Now().UTC().Format(http.TimeFormat)
2641 }
2642
2643 for _, v := range rws.snapHeader["Trailer"] {
2644 foreachHeaderElement(v, rws.declareTrailer)
2645 }
2646
2647
2648
2649
2650
2651
2652 if _, ok := rws.snapHeader["Connection"]; ok {
2653 v := rws.snapHeader.Get("Connection")
2654 delete(rws.snapHeader, "Connection")
2655 if v == "close" {
2656 rws.conn.startGracefulShutdown()
2657 }
2658 }
2659
2660 endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp
2661 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2662 streamID: rws.stream.id,
2663 httpResCode: rws.status,
2664 h: rws.snapHeader,
2665 endStream: endStream,
2666 contentType: ctype,
2667 contentLength: clen,
2668 date: date,
2669 })
2670 if err != nil {
2671 return 0, err
2672 }
2673 if endStream {
2674 return 0, nil
2675 }
2676 }
2677 if isHeadResp {
2678 return len(p), nil
2679 }
2680 if len(p) == 0 && !rws.handlerDone {
2681 return 0, nil
2682 }
2683
2684
2685
2686 hasNonemptyTrailers := rws.hasNonemptyTrailers()
2687 endStream := rws.handlerDone && !hasNonemptyTrailers
2688 if len(p) > 0 || endStream {
2689
2690 if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
2691 return 0, err
2692 }
2693 }
2694
2695 if rws.handlerDone && hasNonemptyTrailers {
2696 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2697 streamID: rws.stream.id,
2698 h: rws.handlerHeader,
2699 trailers: rws.trailers,
2700 endStream: true,
2701 })
2702 return len(p), err
2703 }
2704 return len(p), nil
2705 }
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720 const TrailerPrefix = "Trailer:"
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743 func (rws *responseWriterState) promoteUndeclaredTrailers() {
2744 for k, vv := range rws.handlerHeader {
2745 if !strings.HasPrefix(k, TrailerPrefix) {
2746 continue
2747 }
2748 trailerKey := strings.TrimPrefix(k, TrailerPrefix)
2749 rws.declareTrailer(trailerKey)
2750 rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv
2751 }
2752
2753 if len(rws.trailers) > 1 {
2754 sorter := sorterPool.Get().(*sorter)
2755 sorter.SortStrings(rws.trailers)
2756 sorterPool.Put(sorter)
2757 }
2758 }
2759
2760 func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
2761 st := w.rws.stream
2762 if !deadline.IsZero() && deadline.Before(time.Now()) {
2763
2764
2765 st.onReadTimeout()
2766 return nil
2767 }
2768 w.rws.conn.sendServeMsg(func(sc *serverConn) {
2769 if st.readDeadline != nil {
2770 if !st.readDeadline.Stop() {
2771
2772 return
2773 }
2774 }
2775 if deadline.IsZero() {
2776 st.readDeadline = nil
2777 } else if st.readDeadline == nil {
2778 st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
2779 } else {
2780 st.readDeadline.Reset(deadline.Sub(time.Now()))
2781 }
2782 })
2783 return nil
2784 }
2785
2786 func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
2787 st := w.rws.stream
2788 if !deadline.IsZero() && deadline.Before(time.Now()) {
2789
2790
2791 st.onWriteTimeout()
2792 return nil
2793 }
2794 w.rws.conn.sendServeMsg(func(sc *serverConn) {
2795 if st.writeDeadline != nil {
2796 if !st.writeDeadline.Stop() {
2797
2798 return
2799 }
2800 }
2801 if deadline.IsZero() {
2802 st.writeDeadline = nil
2803 } else if st.writeDeadline == nil {
2804 st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
2805 } else {
2806 st.writeDeadline.Reset(deadline.Sub(time.Now()))
2807 }
2808 })
2809 return nil
2810 }
2811
2812 func (w *responseWriter) Flush() {
2813 w.FlushError()
2814 }
2815
2816 func (w *responseWriter) FlushError() error {
2817 rws := w.rws
2818 if rws == nil {
2819 panic("Header called after Handler finished")
2820 }
2821 var err error
2822 if rws.bw.Buffered() > 0 {
2823 err = rws.bw.Flush()
2824 } else {
2825
2826
2827
2828
2829 _, err = chunkWriter{rws}.Write(nil)
2830 if err == nil {
2831 select {
2832 case <-rws.stream.cw:
2833 err = rws.stream.closeErr
2834 default:
2835 }
2836 }
2837 }
2838 return err
2839 }
2840
2841 func (w *responseWriter) CloseNotify() <-chan bool {
2842 rws := w.rws
2843 if rws == nil {
2844 panic("CloseNotify called after Handler finished")
2845 }
2846 rws.closeNotifierMu.Lock()
2847 ch := rws.closeNotifierCh
2848 if ch == nil {
2849 ch = make(chan bool, 1)
2850 rws.closeNotifierCh = ch
2851 cw := rws.stream.cw
2852 go func() {
2853 cw.Wait()
2854 ch <- true
2855 }()
2856 }
2857 rws.closeNotifierMu.Unlock()
2858 return ch
2859 }
2860
2861 func (w *responseWriter) Header() http.Header {
2862 rws := w.rws
2863 if rws == nil {
2864 panic("Header called after Handler finished")
2865 }
2866 if rws.handlerHeader == nil {
2867 rws.handlerHeader = make(http.Header)
2868 }
2869 return rws.handlerHeader
2870 }
2871
2872
2873 func checkWriteHeaderCode(code int) {
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884 if code < 100 || code > 999 {
2885 panic(fmt.Sprintf("invalid WriteHeader code %v", code))
2886 }
2887 }
2888
2889 func (w *responseWriter) WriteHeader(code int) {
2890 rws := w.rws
2891 if rws == nil {
2892 panic("WriteHeader called after Handler finished")
2893 }
2894 rws.writeHeader(code)
2895 }
2896
2897 func (rws *responseWriterState) writeHeader(code int) {
2898 if rws.wroteHeader {
2899 return
2900 }
2901
2902 checkWriteHeaderCode(code)
2903
2904
2905 if code >= 100 && code <= 199 {
2906
2907 h := rws.handlerHeader
2908
2909 _, cl := h["Content-Length"]
2910 _, te := h["Transfer-Encoding"]
2911 if cl || te {
2912 h = h.Clone()
2913 h.Del("Content-Length")
2914 h.Del("Transfer-Encoding")
2915 }
2916
2917 rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2918 streamID: rws.stream.id,
2919 httpResCode: code,
2920 h: h,
2921 endStream: rws.handlerDone && !rws.hasTrailers(),
2922 })
2923
2924 return
2925 }
2926
2927 rws.wroteHeader = true
2928 rws.status = code
2929 if len(rws.handlerHeader) > 0 {
2930 rws.snapHeader = cloneHeader(rws.handlerHeader)
2931 }
2932 }
2933
2934 func cloneHeader(h http.Header) http.Header {
2935 h2 := make(http.Header, len(h))
2936 for k, vv := range h {
2937 vv2 := make([]string, len(vv))
2938 copy(vv2, vv)
2939 h2[k] = vv2
2940 }
2941 return h2
2942 }
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952 func (w *responseWriter) Write(p []byte) (n int, err error) {
2953 return w.write(len(p), p, "")
2954 }
2955
2956 func (w *responseWriter) WriteString(s string) (n int, err error) {
2957 return w.write(len(s), nil, s)
2958 }
2959
2960
2961 func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
2962 rws := w.rws
2963 if rws == nil {
2964 panic("Write called after Handler finished")
2965 }
2966 if !rws.wroteHeader {
2967 w.WriteHeader(200)
2968 }
2969 if !bodyAllowedForStatus(rws.status) {
2970 return 0, http.ErrBodyNotAllowed
2971 }
2972 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS))
2973 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
2974
2975 return 0, errors.New("http2: handler wrote more than declared Content-Length")
2976 }
2977
2978 if dataB != nil {
2979 return rws.bw.Write(dataB)
2980 } else {
2981 return rws.bw.WriteString(dataS)
2982 }
2983 }
2984
2985 func (w *responseWriter) handlerDone() {
2986 rws := w.rws
2987 rws.handlerDone = true
2988 w.Flush()
2989 w.rws = nil
2990 responseWriterStatePool.Put(rws)
2991 }
2992
2993
2994 var (
2995 ErrRecursivePush = errors.New("http2: recursive push not allowed")
2996 ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
2997 )
2998
2999 var _ http.Pusher = (*responseWriter)(nil)
3000
3001 func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
3002 st := w.rws.stream
3003 sc := st.sc
3004 sc.serveG.checkNotOn()
3005
3006
3007
3008 if st.isPushed() {
3009 return ErrRecursivePush
3010 }
3011
3012 if opts == nil {
3013 opts = new(http.PushOptions)
3014 }
3015
3016
3017 if opts.Method == "" {
3018 opts.Method = "GET"
3019 }
3020 if opts.Header == nil {
3021 opts.Header = http.Header{}
3022 }
3023 wantScheme := "http"
3024 if w.rws.req.TLS != nil {
3025 wantScheme = "https"
3026 }
3027
3028
3029 u, err := url.Parse(target)
3030 if err != nil {
3031 return err
3032 }
3033 if u.Scheme == "" {
3034 if !strings.HasPrefix(target, "/") {
3035 return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
3036 }
3037 u.Scheme = wantScheme
3038 u.Host = w.rws.req.Host
3039 } else {
3040 if u.Scheme != wantScheme {
3041 return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
3042 }
3043 if u.Host == "" {
3044 return errors.New("URL must have a host")
3045 }
3046 }
3047 for k := range opts.Header {
3048 if strings.HasPrefix(k, ":") {
3049 return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
3050 }
3051
3052
3053
3054
3055 if asciiEqualFold(k, "content-length") ||
3056 asciiEqualFold(k, "content-encoding") ||
3057 asciiEqualFold(k, "trailer") ||
3058 asciiEqualFold(k, "te") ||
3059 asciiEqualFold(k, "expect") ||
3060 asciiEqualFold(k, "host") {
3061 return fmt.Errorf("promised request headers cannot include %q", k)
3062 }
3063 }
3064 if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
3065 return err
3066 }
3067
3068
3069
3070
3071 if opts.Method != "GET" && opts.Method != "HEAD" {
3072 return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
3073 }
3074
3075 msg := &startPushRequest{
3076 parent: st,
3077 method: opts.Method,
3078 url: u,
3079 header: cloneHeader(opts.Header),
3080 done: errChanPool.Get().(chan error),
3081 }
3082
3083 select {
3084 case <-sc.doneServing:
3085 return errClientDisconnected
3086 case <-st.cw:
3087 return errStreamClosed
3088 case sc.serveMsgCh <- msg:
3089 }
3090
3091 select {
3092 case <-sc.doneServing:
3093 return errClientDisconnected
3094 case <-st.cw:
3095 return errStreamClosed
3096 case err := <-msg.done:
3097 errChanPool.Put(msg.done)
3098 return err
3099 }
3100 }
3101
3102 type startPushRequest struct {
3103 parent *stream
3104 method string
3105 url *url.URL
3106 header http.Header
3107 done chan error
3108 }
3109
3110 func (sc *serverConn) startPush(msg *startPushRequest) {
3111 sc.serveG.check()
3112
3113
3114
3115
3116 if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
3117
3118 msg.done <- errStreamClosed
3119 return
3120 }
3121
3122
3123 if !sc.pushEnabled {
3124 msg.done <- http.ErrNotSupported
3125 return
3126 }
3127
3128
3129
3130
3131 allocatePromisedID := func() (uint32, error) {
3132 sc.serveG.check()
3133
3134
3135
3136 if !sc.pushEnabled {
3137 return 0, http.ErrNotSupported
3138 }
3139
3140 if sc.curPushedStreams+1 > sc.clientMaxStreams {
3141 return 0, ErrPushLimitReached
3142 }
3143
3144
3145
3146
3147
3148 if sc.maxPushPromiseID+2 >= 1<<31 {
3149 sc.startGracefulShutdownInternal()
3150 return 0, ErrPushLimitReached
3151 }
3152 sc.maxPushPromiseID += 2
3153 promisedID := sc.maxPushPromiseID
3154
3155
3156
3157
3158
3159
3160 promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
3161 rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
3162 method: msg.method,
3163 scheme: msg.url.Scheme,
3164 authority: msg.url.Host,
3165 path: msg.url.RequestURI(),
3166 header: cloneHeader(msg.header),
3167 })
3168 if err != nil {
3169
3170 panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
3171 }
3172
3173 sc.curHandlers++
3174 go sc.runHandler(rw, req, sc.handler.ServeHTTP)
3175 return promisedID, nil
3176 }
3177
3178 sc.writeFrame(FrameWriteRequest{
3179 write: &writePushPromise{
3180 streamID: msg.parent.id,
3181 method: msg.method,
3182 url: msg.url,
3183 h: msg.header,
3184 allocatePromisedID: allocatePromisedID,
3185 },
3186 stream: msg.parent,
3187 done: msg.done,
3188 })
3189 }
3190
3191
3192
3193 func foreachHeaderElement(v string, fn func(string)) {
3194 v = textproto.TrimString(v)
3195 if v == "" {
3196 return
3197 }
3198 if !strings.Contains(v, ",") {
3199 fn(v)
3200 return
3201 }
3202 for _, f := range strings.Split(v, ",") {
3203 if f = textproto.TrimString(f); f != "" {
3204 fn(f)
3205 }
3206 }
3207 }
3208
3209
3210 var connHeaders = []string{
3211 "Connection",
3212 "Keep-Alive",
3213 "Proxy-Connection",
3214 "Transfer-Encoding",
3215 "Upgrade",
3216 }
3217
3218
3219
3220
3221 func checkValidHTTP2RequestHeaders(h http.Header) error {
3222 for _, k := range connHeaders {
3223 if _, ok := h[k]; ok {
3224 return fmt.Errorf("request header %q is not valid in HTTP/2", k)
3225 }
3226 }
3227 te := h["Te"]
3228 if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
3229 return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
3230 }
3231 return nil
3232 }
3233
3234 func new400Handler(err error) http.HandlerFunc {
3235 return func(w http.ResponseWriter, r *http.Request) {
3236 http.Error(w, err.Error(), http.StatusBadRequest)
3237 }
3238 }
3239
3240
3241
3242
3243 func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
3244 var x interface{} = hs
3245 type I interface {
3246 doKeepAlives() bool
3247 }
3248 if hs, ok := x.(I); ok {
3249 return !hs.doKeepAlives()
3250 }
3251 return false
3252 }
3253
3254 func (sc *serverConn) countError(name string, err error) error {
3255 if sc == nil || sc.srv == nil {
3256 return err
3257 }
3258 f := sc.srv.CountError
3259 if f == nil {
3260 return err
3261 }
3262 var typ string
3263 var code ErrCode
3264 switch e := err.(type) {
3265 case ConnectionError:
3266 typ = "conn"
3267 code = ErrCode(e)
3268 case StreamError:
3269 typ = "stream"
3270 code = ErrCode(e.Code)
3271 default:
3272 return err
3273 }
3274 codeStr := errCodeName[code]
3275 if codeStr == "" {
3276 codeStr = strconv.Itoa(int(code))
3277 }
3278 f(fmt.Sprintf("%s_%s_%s", typ, codeStr, name))
3279 return err
3280 }
3281
View as plain text