1
2
3
4
5
6
7 package http2
8
9 import (
10 "bufio"
11 "bytes"
12 "compress/flate"
13 "compress/gzip"
14 "context"
15 "crypto/rand"
16 "crypto/tls"
17 "errors"
18 "fmt"
19 "io"
20 "io/fs"
21 "log"
22 "math"
23 "math/bits"
24 mathrand "math/rand"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal"
28 "net/http/internal/httpcommon"
29 "net/textproto"
30 "slices"
31 "strconv"
32 "strings"
33 "sync"
34 "sync/atomic"
35 "time"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http2/hpack"
39 "golang.org/x/net/idna"
40 )
41
const (
	// transportDefaultConnFlow is a flow-control window size of 1 GiB
	// (1 << 30 bytes).
	// NOTE(review): its consumer is not in this part of the file; presumably
	// the default connection-level receive window — confirm.
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is a flow-control window size of 4 MiB
	// (4 << 20 bytes).
	// NOTE(review): its consumer is not in this part of the file; presumably
	// the default per-stream receive window — confirm.
	transportDefaultStreamFlow = 4 << 20

	// defaultUserAgent is handed to the header encoder as the default
	// User-Agent (see encodeRequestHeaders).
	defaultUserAgent = "Go-http-client/2.0"

	// initialMaxConcurrentStreams is the concurrent-stream limit that a new
	// ClientConn starts with (see newClientConn).
	initialMaxConcurrentStreams = 100

	// defaultMaxConcurrentStreams is a concurrent-stream limit of 1000.
	// NOTE(review): not referenced in this part of the file; presumably the
	// limit applied when the peer's SETTINGS do not specify one — confirm.
	defaultMaxConcurrentStreams = 1000
)
63
64
65
66
67
// Transport is an HTTP/2 client transport. RoundTrip and RoundTripOpt obtain
// a ClientConn for the request's authority from the connection pool and run
// the request on it.
type Transport struct {
	// DialTLSContext, if non-nil, is used by dialTLS to create the
	// connection. It takes precedence over DialTLS. It receives the TLS
	// configuration produced by newTLSConfig.
	DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)

	// DialTLS, if non-nil, is used by dialTLS when DialTLSContext is nil.
	// When both are nil, dialTLS dials with dialTLSWithContext and verifies
	// the negotiated ALPN protocol itself.
	DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)

	// TLSClientConfig, if non-nil, is cloned by newTLSConfig as the basis of
	// the TLS configuration for dialed connections.
	TLSClientConfig *tls.Config

	// ConnPool, if non-nil, is the connection pool returned by connPool.
	// When nil, a default clientConnPool is created on first use.
	ConnPool ClientConnPool

	// DisableCompression is consulted by disableCompression; the result is
	// passed to httpcommon.IsRequestGzip when a request is started.
	DisableCompression bool

	// AllowHTTP permits requests with the "http" URL scheme. Without it
	// (and without the internal allowHTTP option) RoundTripOpt rejects them.
	AllowHTTP bool

	// MaxHeaderListSize is a header list size limit.
	// NOTE(review): not read in this part of the file (maxHeaderListSize
	// here derives its value from t1); confirm where it is applied.
	MaxHeaderListSize uint32

	// MaxReadFrameSize is a limit on the size of frames read from the peer.
	// NOTE(review): consumed outside this part of the file, presumably via
	// configFromTransport — confirm.
	MaxReadFrameSize uint32

	// MaxDecoderHeaderTableSize is an HPACK decoder table size limit.
	// NOTE(review): consumed outside this part of the file, presumably via
	// configFromTransport — confirm.
	MaxDecoderHeaderTableSize uint32

	// MaxEncoderHeaderTableSize is an HPACK encoder table size limit.
	// NOTE(review): consumed outside this part of the file, presumably via
	// configFromTransport — confirm.
	MaxEncoderHeaderTableSize uint32

	// StrictMaxConcurrentStreams controls how the peer's concurrent stream
	// limit is treated.
	// NOTE(review): consumed outside this part of the file; see
	// ClientConn.strictMaxConcurrentStreams for its effect on a connection.
	StrictMaxConcurrentStreams bool

	// IdleConnTimeout is an idle-connection timeout.
	// NOTE(review): newClientConn obtains its value through
	// t.idleConnTimeout(), which is not in this part of the file — confirm
	// that this field feeds it.
	IdleConnTimeout time.Duration

	// ReadIdleTimeout is a read-idle timeout.
	// NOTE(review): consumed outside this part of the file — confirm.
	ReadIdleTimeout time.Duration

	// PingTimeout is a ping timeout.
	// NOTE(review): consumed outside this part of the file — confirm.
	PingTimeout time.Duration

	// WriteByteTimeout is a write timeout.
	// NOTE(review): consumed outside this part of the file — confirm.
	WriteByteTimeout time.Duration

	// CountError, if non-nil, is installed as the framer's error counter by
	// newClientConn and is called with "conn_close_lost_ping" when a
	// connection is closed after a failed health check.
	CountError func(errType string)

	// t1 is the configuration supplied to NewTransport. It may be nil for
	// a Transport that was not created by NewTransport.
	t1 TransportConfig

	// connPoolOnce guards the one-time initialization of connPoolOrDef.
	connPoolOnce sync.Once
	// connPoolOrDef is ConnPool if set, otherwise the default pool.
	connPoolOrDef ClientConnPool

	// transportTestHooks, when non-nil, makes dialClientConn skip dialing
	// and lets a hook observe each new ClientConn.
	*transportTestHooks
}
188
189
190
191
192
// transportTestHooks holds hooks used by tests. When a Transport has hooks
// installed, dialClientConn creates connections without dialing.
type transportTestHooks struct {
	// newclientconn is called by newClientConn with the freshly allocated
	// ClientConn; newClientConn then uses whatever cc.tconn the hook left.
	newclientconn func(*ClientConn)
}
196
197 func (t *Transport) maxHeaderListSize() uint32 {
198 n := t.t1.MaxResponseHeaderBytes()
199 if n > 0 {
200 n = adjustHTTP1MaxHeaderSize(n)
201 }
202 if n <= 0 {
203 return 10 << 20
204 }
205 if n >= 0xffffffff {
206 return 0
207 }
208 return uint32(n)
209 }
210
211 func (t *Transport) disableCompression() bool {
212 return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression())
213 }
214
215 func NewTransport(t1 TransportConfig) *Transport {
216 connPool := new(clientConnPool)
217 t2 := &Transport{
218 ConnPool: noDialClientConnPool{connPool},
219 t1: t1,
220 }
221 connPool.t = t2
222 return t2
223 }
224
225 func (t *Transport) AddConn(scheme, authority string, c net.Conn) error {
226 connPool, ok := t.ConnPool.(noDialClientConnPool)
227 if !ok {
228 go c.Close()
229 return nil
230 }
231 addr := authorityAddr(scheme, authority)
232 used, err := connPool.addConnIfNeeded(addr, t, c)
233 if !used {
234 go c.Close()
235 }
236 return err
237 }
238
239
240
// unencryptedTransport is a Transport whose RoundTrip permits requests with
// the "http" scheme regardless of Transport.AllowHTTP.
type unencryptedTransport Transport

// RoundTrip runs req through the underlying Transport with the internal
// allowHTTP option set.
func (t *unencryptedTransport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
}
246
// connPool returns the connection pool in use, initializing it on first call
// (see initConnPool).
func (t *Transport) connPool() ClientConnPool {
	t.connPoolOnce.Do(t.initConnPool)
	return t.connPoolOrDef
}
251
252 func (t *Transport) initConnPool() {
253 if t.ConnPool != nil {
254 t.connPoolOrDef = t.ConnPool
255 } else {
256 t.connPoolOrDef = &clientConnPool{t: t}
257 }
258 }
259
260
261
// ClientConn is the state of a single client connection to an HTTP/2 server.
type ClientConn struct {
	t *Transport // owning transport
	// tconn is the underlying connection; closeConn closes it.
	tconn net.Conn
	// tlsState is set by newClientConn when tconn implements
	// connectionStater, and is copied into each response's TLS field.
	tlsState *tls.ConnectionState
	// atomicReused is flipped from 0 to 1 by the first RoundTripOpt that
	// uses the conn; accessed atomically.
	atomicReused uint32
	// singleUse marks a conn that must not take another request once it has
	// started one (see idleStateLocked).
	singleUse bool
	// NOTE(review): getConnCalled is not used in this part of the file;
	// presumably bookkeeping for the connection pool — confirm.
	getConnCalled bool

	// NOTE(review): readerDone and readerErr are not used in this part of
	// the file beyond allocation; presumably owned by the read loop.
	readerDone chan struct{}
	readerErr  error

	// idleTimeout is the idle timeout, or 0 for none; idleTimer fires
	// onIdleTimeout after it elapses.
	idleTimeout time.Duration
	idleTimer   *time.Timer

	// mu is held by the *Locked methods below while they read or write
	// connection state; cond is built on mu.
	mu   sync.Mutex
	cond *sync.Cond // broadcast when streams are aborted or the conn closes
	// flow is credited with initialWindowSize at creation; inflow is
	// initialized from the configured per-connection receive buffer.
	flow   outflow
	inflow inflow
	// doNotReuse marks the conn as unavailable for future requests.
	doNotReuse bool
	// closing is set once sendGoAway has been called.
	closing bool
	closed  bool
	// closedOnIdle is set when closeIfIdle closed the conn.
	closedOnIdle bool
	// seenSettings gates State's MaxConcurrentStreams; writeRequest waits on
	// seenSettingsChan before an extended CONNECT.
	seenSettings     bool
	seenSettingsChan chan struct{}
	wantSettingsAck  bool
	// goAway is the most recent GOAWAY frame received, if any (setGoAway).
	goAway *GoAwayFrame
	// goAwayDebug is the debug data of the first GOAWAY that carried any.
	goAwayDebug string
	streams     map[uint32]*clientStream // active streams by stream ID
	// streamsReserved is incremented by ReserveNewRequest and decremented by
	// decrStreamReservationsLocked.
	streamsReserved int
	nextStreamID    uint32 // starts at 1
	// pendingRequests is reported by State as StreamsPending.
	pendingRequests int
	pings           map[[8]byte]chan struct{}
	br              *bufio.Reader // buffered reader over tconn
	lastActive      time.Time
	// lastIdle is compared against idleTimeout by tooIdleLocked.
	lastIdle time.Time

	// Connection parameters. newClientConn seeds them with initial values
	// (maxFrameSize 16 KiB, initialWindowSize 65535, maxConcurrentStreams
	// initialMaxConcurrentStreams, peerMaxHeaderListSize 2^64-1).
	maxFrameSize                uint32
	maxConcurrentStreams        uint32
	peerMaxHeaderListSize       uint64
	peerMaxHeaderTableSize      uint32
	initialWindowSize           uint32
	initialStreamRecvWindowSize int32
	readIdleTimeout             time.Duration
	pingTimeout                 time.Duration // deadline used by healthCheck
	// extendedConnectAllowed is checked by writeRequest after the first
	// SETTINGS frame for CONNECT requests carrying a :protocol header.
	extendedConnectAllowed bool
	// strictMaxConcurrentStreams makes idleStateLocked ignore the stream
	// count when deciding whether the conn can take a new request.
	strictMaxConcurrentStreams bool

	// NOTE(review): rstStreamPingsBlocked is not used in this part of the
	// file — confirm its meaning against the read/write loops.
	rstStreamPingsBlocked bool

	// pendingResets is counted as active streams by State and by
	// currentRequestCountLocked.
	pendingResets int

	// readBeforeStreamID is compared with a stream's ID in
	// cleanupWriteRequest.
	// NOTE(review): its maintenance is outside this part of the file.
	readBeforeStreamID uint32

	// reqHeaderMu is a one-element channel used as a lock around starting a
	// request: writeRequest sends to acquire it and receives to release it.
	reqHeaderMu chan struct{}

	// internalStateHook is the callback supplied to NewClientConn.
	// NOTE(review): presumably invoked by maybeCallStateHook — confirm.
	internalStateHook func()

	// wmu is held while writing frames (see sendGoAway and
	// encodeAndWriteHeaders).
	wmu  sync.Mutex
	bw   *bufio.Writer // buffered writer over tconn via stickyErrWriter
	fr   *Framer
	werr error        // first write error, recorded by stickyErrWriter
	hbuf bytes.Buffer // HPACK encoding buffer
	henc *hpack.Encoder
}
356
357
358
// clientStream is the state of one request/response exchange on a
// ClientConn.
type clientStream struct {
	cc *ClientConn

	// ctx and reqCancel are copied from the request by roundTrip.
	ctx       context.Context
	reqCancel <-chan struct{}

	trace *httptrace.ClientTrace // from httptrace.ContextClientTrace(ctx)
	// ID is the stream ID; cleanupWriteRequest treats 0 as "never assigned".
	ID            uint32
	bufPipe       pipe
	requestedGzip bool // result of httpcommon.IsRequestGzip for the request
	isHead        bool // request method is HEAD

	// abort is closed, once, by abortStreamLocked after abortErr is set.
	abortOnce sync.Once
	abort     chan struct{}
	abortErr  error

	// peerClosed ends writeRequest's final wait; donec is what roundTrip's
	// waitDone waits on; on100 (capacity 1) ends the 100-continue wait.
	// NOTE(review): all three are signaled outside this part of the file.
	peerClosed chan struct{}
	donec      chan struct{}
	on100      chan struct{}

	// res is read by roundTrip after respHeaderRecv is signaled.
	respHeaderRecv chan struct{}
	res            *ClientResponse

	flow   outflow
	inflow inflow
	// NOTE(review): bytesRemain and readErr are not used in this part of
	// the file; presumably owned by the response body reader.
	bytesRemain int64
	readErr     error

	reqBody io.ReadCloser
	// reqBodyContentLength comes from actualContentLength: 0 for no body,
	// -1 for unknown.
	reqBodyContentLength int64
	// reqBodyClosed is non-nil once closing of reqBody has been started and
	// is closed when reqBody.Close returns; accessed with cc.mu held.
	reqBodyClosed chan struct{}

	// Set by the request-writing goroutine (writeRequest and
	// encodeAndWriteHeaders).
	sentEndStream bool
	sentHeaders   bool

	// NOTE(review): the fields below are not used in this part of the file;
	// presumably maintained by the connection's read loop.
	firstByte       bool
	pastHeaders     bool
	pastTrailers    bool
	readClosed      bool
	readAborted     bool
	totalHeaderSize int64

	trailer    Header
	resTrailer *Header // copied from the request's ResTrailer by roundTrip

	// NOTE(review): staticResp is not used in this part of the file.
	staticResp ClientResponse
}
409
// got1xxFuncForTests, when non-nil, overrides the function returned by
// clientStream.get1xxTraceFunc.
var got1xxFuncForTests func(int, textproto.MIMEHeader) error
411
412
413
414 func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
415 if fn := got1xxFuncForTests; fn != nil {
416 return fn
417 }
418 return traceGot1xxResponseFunc(cs.trace)
419 }
420
421 func (cs *clientStream) abortStream(err error) {
422 cs.cc.mu.Lock()
423 defer cs.cc.mu.Unlock()
424 cs.abortStreamLocked(err)
425 }
426
427 func (cs *clientStream) abortStreamLocked(err error) {
428 cs.abortOnce.Do(func() {
429 cs.abortErr = err
430 close(cs.abort)
431 })
432 if cs.reqBody != nil {
433 cs.closeReqBodyLocked()
434 }
435
436 if cs.cc.cond != nil {
437
438 cs.cc.cond.Broadcast()
439 }
440 }
441
442 func (cs *clientStream) abortRequestBodyWrite() {
443 cc := cs.cc
444 cc.mu.Lock()
445 defer cc.mu.Unlock()
446 if cs.reqBody != nil && cs.reqBodyClosed == nil {
447 cs.closeReqBodyLocked()
448 cc.cond.Broadcast()
449 }
450 }
451
452 func (cs *clientStream) closeReqBodyLocked() {
453 if cs.reqBodyClosed != nil {
454 return
455 }
456 cs.reqBodyClosed = make(chan struct{})
457 reqBodyClosed := cs.reqBodyClosed
458 go func() {
459 cs.reqBody.Close()
460 close(reqBodyClosed)
461 }()
462 }
463
// stickyErrWriter writes to conn and remembers the result of the last write
// in *err. Once *err is non-nil, further writes fail with it without touching
// conn.
type stickyErrWriter struct {
	conn    net.Conn
	timeout time.Duration // passed to writeWithByteTimeout
	err     *error        // shared error slot; newClientConn points it at cc.werr
}
469
470 func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
471 if *sew.err != nil {
472 return 0, *sew.err
473 }
474 n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
475 *sew.err = err
476 return n, err
477 }
478
479
480
481
482
483
484
// noCachedConnError is the concrete type of ErrNoCachedConn. It is recognized
// by its marker method rather than by identity (see isNoCachedConnError).
type noCachedConnError struct{}

// IsHTTP2NoCachedConnError is a marker method with no behavior.
func (noCachedConnError) IsHTTP2NoCachedConnError() {}

// Error implements the error interface.
func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
489
490
491
492
// isNoCachedConnError reports whether err's dynamic type has an
// IsHTTP2NoCachedConnError method. It returns false for a nil error.
func isNoCachedConnError(err error) bool {
	switch err.(type) {
	case interface{ IsHTTP2NoCachedConnError() }:
		return true
	default:
		return false
	}
}
497
// ErrNoCachedConn is the error value of type noCachedConnError, whose message
// is "http2: no cached connection was available".
var ErrNoCachedConn error = noCachedConnError{}
499
500
// RoundTripOpt are options for Transport.RoundTripOpt.
type RoundTripOpt struct {
	// OnlyCachedConn is an option concerning cached connections.
	// NOTE(review): this field is not read anywhere in this part of the
	// file; presumably it asks RoundTripOpt to fail with ErrNoCachedConn
	// instead of creating a connection — confirm.
	OnlyCachedConn bool

	// allowHTTP permits "http" scheme requests even when
	// Transport.AllowHTTP is false. It is set by unencryptedTransport.
	allowHTTP bool
}
510
511 func (t *Transport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
512 return t.RoundTripOpt(req, RoundTripOpt{})
513 }
514
515
516
517 func authorityAddr(scheme string, authority string) (addr string) {
518 host, port, err := net.SplitHostPort(authority)
519 if err != nil {
520 host = authority
521 port = ""
522 }
523 if port == "" {
524 port = "443"
525 if scheme == "http" {
526 port = "80"
527 }
528 }
529 if a, err := idna.ToASCII(host); err == nil {
530 host = a
531 }
532
533 if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
534 return host + ":" + port
535 }
536 return net.JoinHostPort(host, port)
537 }
538
539
540 func (t *Transport) RoundTripOpt(req *ClientRequest, opt RoundTripOpt) (*ClientResponse, error) {
541 switch req.URL.Scheme {
542 case "https":
543
544 case "http":
545 if !t.AllowHTTP && !opt.allowHTTP {
546 return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
547 }
548 default:
549 return nil, errors.New("http2: unsupported scheme")
550 }
551
552 addr := authorityAddr(req.URL.Scheme, req.URL.Host)
553 for retry := 0; ; retry++ {
554 cc, err := t.connPool().GetClientConn(req, addr)
555 if err != nil {
556 t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
557 return nil, err
558 }
559 reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
560 traceGotConn(req, cc, reused)
561 res, err := cc.RoundTrip(req)
562 if err != nil && retry <= 6 {
563 roundTripErr := err
564 if req, err = shouldRetryRequest(req, err); err == nil {
565
566 if retry == 0 {
567 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
568 continue
569 }
570 backoff := float64(uint(1) << (uint(retry) - 1))
571 backoff += backoff * (0.1 * mathrand.Float64())
572 d := time.Second * time.Duration(backoff)
573 tm := time.NewTimer(d)
574 select {
575 case <-tm.C:
576 t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
577 continue
578 case <-req.Context.Done():
579 tm.Stop()
580 err = req.Context.Err()
581 }
582 }
583 }
584 if err == errClientConnNotEstablished {
585
586
587
588
589
590
591
592
593
594
595 if cc.idleTimer != nil {
596 cc.idleTimer.Stop()
597 }
598 t.connPool().MarkDead(cc)
599 }
600 if err != nil {
601 t.vlogf("RoundTrip failure: %v", err)
602 return nil, err
603 }
604 return res, nil
605 }
606 }
607
608 func (t *Transport) IdleConnStrsForTesting() []string {
609 pool, ok := t.connPool().(noDialClientConnPool)
610 if !ok {
611 return nil
612 }
613
614 var ret []string
615 pool.mu.Lock()
616 defer pool.mu.Unlock()
617 for k, ccs := range pool.conns {
618 for _, cc := range ccs {
619 if cc.idleState().canTakeNewRequest {
620 ret = append(ret, k)
621 }
622 }
623 }
624 slices.Sort(ret)
625 return ret
626 }
627
628
629
630
631 func (t *Transport) CloseIdleConnections() {
632 if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
633 cp.closeIdleConnections()
634 }
635 }
636
// Sentinel errors describing the state of a client connection.
var (
	errClientConnClosed = errors.New("http2: client conn is closed")
	// errClientConnUnusable is retryable (see canRetryError).
	errClientConnUnusable = errors.New("http2: client conn not usable")
	// errClientConnNotEstablished makes RoundTripOpt mark the conn dead.
	errClientConnNotEstablished = errors.New("http2: client conn could not be established")
	// errClientConnGotGoAway is retryable; setGoAway aborts streams with it.
	errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
	// errClientConnForceClosed is the error used by ClientConn.Close.
	errClientConnForceClosed = errors.New("http2: client connection force closed via ClientConn.Close")
)
644
645
646
647
648
649
650 func shouldRetryRequest(req *ClientRequest, err error) (*ClientRequest, error) {
651 if !canRetryError(err) {
652 return nil, err
653 }
654
655 if req.Body == nil || req.Body == NoBody {
656 return req.Clone(), nil
657 }
658
659
660
661 if req.GetBody != nil {
662 body, err := req.GetBody()
663 if err != nil {
664 return nil, err
665 }
666 newReq := req.Clone()
667 newReq.Body = body
668 return newReq, nil
669 }
670
671
672
673 if err == errClientConnUnusable {
674 return req.Clone(), nil
675 }
676
677 return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
678 }
679
680 func canRetryError(err error) bool {
681 if err == errClientConnUnusable || err == errClientConnGotGoAway {
682 return true
683 }
684 if se, ok := err.(StreamError); ok {
685 return se.Code == ErrCodeRefusedStream
686 }
687 return false
688 }
689
690 func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
691 if t.transportTestHooks != nil {
692 return t.newClientConn(nil, singleUse, nil)
693 }
694 host, _, err := net.SplitHostPort(addr)
695 if err != nil {
696 return nil, err
697 }
698 tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
699 if err != nil {
700 return nil, err
701 }
702 return t.newClientConn(tconn, singleUse, nil)
703 }
704
705 func (t *Transport) newTLSConfig(host string) *tls.Config {
706 cfg := new(tls.Config)
707 if t.TLSClientConfig != nil {
708 *cfg = *t.TLSClientConfig.Clone()
709 }
710 if !slices.Contains(cfg.NextProtos, NextProtoTLS) {
711 cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
712 }
713 if cfg.ServerName == "" {
714 cfg.ServerName = host
715 }
716 return cfg
717 }
718
719 func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
720 if t.DialTLSContext != nil {
721 return t.DialTLSContext(ctx, network, addr, tlsCfg)
722 } else if t.DialTLS != nil {
723 return t.DialTLS(network, addr, tlsCfg)
724 }
725
726 tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
727 if err != nil {
728 return nil, err
729 }
730 state := tlsCn.ConnectionState()
731 if p := state.NegotiatedProtocol; p != NextProtoTLS {
732 return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
733 }
734 if !state.NegotiatedProtocolIsMutual {
735 return nil, errors.New("http2: could not negotiate protocol mutually")
736 }
737 return tlsCn, nil
738 }
739
740
741
742 func (t *Transport) disableKeepAlives() bool {
743 return t.t1 != nil && t.t1.DisableKeepAlives()
744 }
745
746 func (t *Transport) expectContinueTimeout() time.Duration {
747 if t.t1 == nil {
748 return 0
749 }
750 return t.t1.ExpectContinueTimeout()
751 }
752
753 func (t *Transport) NewClientConn(c net.Conn, internalStateHook func()) (NetHTTPClientConn, error) {
754 cc, err := t.newClientConn(c, t.disableKeepAlives(), internalStateHook)
755 if err != nil {
756 return NetHTTPClientConn{}, err
757 }
758
759
760
761 cc.strictMaxConcurrentStreams = true
762
763 return NetHTTPClientConn{cc}, nil
764 }
765
// newClientConn builds a ClientConn over c, sends the client preface with the
// initial SETTINGS and connection WINDOW_UPDATE, and starts the read loop.
//
// singleUse and internalStateHook are stored on the connection. If writing
// the preface fails, the connection is closed and the write error returned.
func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
	conf := configFromTransport(t)
	cc := &ClientConn{
		t:                           t,
		tconn:                       c,
		readerDone:                  make(chan struct{}),
		nextStreamID:                1,
		maxFrameSize:                16 << 10,
		initialWindowSize:           65535,
		initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
		maxConcurrentStreams:        initialMaxConcurrentStreams,
		strictMaxConcurrentStreams:  conf.StrictMaxConcurrentRequests,
		peerMaxHeaderListSize:       0xffffffffffffffff,
		streams:                     make(map[uint32]*clientStream),
		singleUse:                   singleUse,
		seenSettingsChan:            make(chan struct{}),
		wantSettingsAck:             true,
		readIdleTimeout:             conf.SendPingTimeout,
		pingTimeout:                 conf.PingTimeout,
		pings:                       make(map[[8]byte]chan struct{}),
		reqHeaderMu:                 make(chan struct{}, 1),
		lastActive:                  time.Now(),
		internalStateHook:           internalStateHook,
	}
	// Test hooks may replace the underlying connection on cc.
	if t.transportTestHooks != nil {
		t.transportTestHooks.newclientconn(cc)
		c = cc.tconn
	}
	if VerboseLogs {
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
	}

	cc.cond = sync.NewCond(&cc.mu)
	cc.flow.add(int32(initialWindowSize))

	// All writes go through stickyErrWriter, so the first write error is
	// recorded in cc.werr and later writes fail fast.
	cc.bw = bufio.NewWriter(stickyErrWriter{
		conn:    c,
		timeout: conf.WriteByteTimeout,
		err:     &cc.werr,
	})
	cc.br = bufio.NewReader(c)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
	if t.CountError != nil {
		cc.fr.countError = t.CountError
	}
	maxHeaderTableSize := uint32(conf.MaxDecoderHeaderTableSize)
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

	cc.henc = hpack.NewEncoder(&cc.hbuf)
	cc.henc.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
	cc.peerMaxHeaderTableSize = initialHeaderTableSize

	// Record the TLS state if the connection can report one.
	if cs, ok := c.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}

	// Build the initial SETTINGS. The header list size is sent only when a
	// limit is configured, and the header table size only when it differs
	// from initialHeaderTableSize.
	initialSettings := []Setting{
		{ID: SettingEnablePush, Val: 0},
		{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
	}
	initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: uint32(conf.MaxReadFrameSize)})
	if max := t.maxHeaderListSize(); max != 0 {
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
	}
	if maxHeaderTableSize != initialHeaderTableSize {
		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
	}

	// Individual write errors are not checked here; they accumulate in
	// cc.werr, which is inspected after the flush.
	cc.bw.Write(clientPreface)
	cc.fr.WriteSettings(initialSettings...)
	cc.fr.WriteWindowUpdate(0, uint32(conf.MaxReceiveBufferPerConnection))
	cc.inflow.init(int32(conf.MaxReceiveBufferPerConnection) + initialWindowSize)
	cc.bw.Flush()
	if cc.werr != nil {
		cc.Close()
		return nil, cc.werr
	}

	// Start the idle timer only once the connection is fully set up.
	if d := t.idleConnTimeout(); d != 0 {
		cc.idleTimeout = d
		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
	}

	go cc.readLoop()
	return cc, nil
}
858
859 func (cc *ClientConn) healthCheck() {
860 pingTimeout := cc.pingTimeout
861
862
863 ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
864 defer cancel()
865 cc.vlogf("http2: Transport sending health check")
866 err := cc.Ping(ctx)
867 if err != nil {
868 cc.vlogf("http2: Transport health check failure: %v", err)
869 cc.closeForLostPing()
870 } else {
871 cc.vlogf("http2: Transport health check success")
872 }
873 }
874
875
876 func (cc *ClientConn) SetDoNotReuse() {
877 cc.mu.Lock()
878 defer cc.mu.Unlock()
879 cc.doNotReuse = true
880 }
881
882 func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
883 cc.mu.Lock()
884 defer cc.mu.Unlock()
885
886 old := cc.goAway
887 cc.goAway = f
888
889
890 if cc.goAwayDebug == "" {
891 cc.goAwayDebug = string(f.DebugData())
892 }
893 if old != nil && old.ErrCode != ErrCodeNo {
894 cc.goAway.ErrCode = old.ErrCode
895 }
896 last := f.LastStreamID
897 for streamID, cs := range cc.streams {
898 if streamID <= last {
899
900
901
902 continue
903 }
904 if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
905
906
907
908 cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
909 } else {
910
911
912 cs.abortStreamLocked(errClientConnGotGoAway)
913 }
914 }
915 }
916
917
918
919
920
921
922 func (cc *ClientConn) CanTakeNewRequest() bool {
923 cc.mu.Lock()
924 defer cc.mu.Unlock()
925 return cc.canTakeNewRequestLocked()
926 }
927
928
929
930
931 func (cc *ClientConn) ReserveNewRequest() bool {
932 cc.mu.Lock()
933 defer cc.mu.Unlock()
934 if st := cc.idleStateLocked(); !st.canTakeNewRequest {
935 return false
936 }
937 cc.streamsReserved++
938 return true
939 }
940
941
// ClientConnState is a snapshot of a ClientConn's state, as returned by
// ClientConn.State.
type ClientConnState struct {
	// Closed is whether the connection is closed.
	Closed bool

	// Closing is whether the connection will take no further requests: it is
	// shutting down, is single-use, has been marked do-not-reuse, or has
	// received a GOAWAY.
	Closing bool

	// StreamsActive is the number of open streams plus pending resets.
	StreamsActive int

	// StreamsReserved is the number of outstanding reservations made with
	// ClientConn.ReserveNewRequest.
	StreamsReserved int

	// StreamsPending is the connection's count of pending requests.
	StreamsPending int

	// MaxConcurrentStreams is the connection's concurrent stream limit, or 0
	// if no SETTINGS frame has been seen yet.
	MaxConcurrentStreams uint32

	// LastIdle is the connection's recorded last-idle time.
	LastIdle time.Time
}
973
974
975 func (cc *ClientConn) State() ClientConnState {
976 cc.wmu.Lock()
977 maxConcurrent := cc.maxConcurrentStreams
978 if !cc.seenSettings {
979 maxConcurrent = 0
980 }
981 cc.wmu.Unlock()
982
983 cc.mu.Lock()
984 defer cc.mu.Unlock()
985 return ClientConnState{
986 Closed: cc.closed,
987 Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
988 StreamsActive: len(cc.streams) + cc.pendingResets,
989 StreamsReserved: cc.streamsReserved,
990 StreamsPending: cc.pendingRequests,
991 LastIdle: cc.lastIdle,
992 MaxConcurrentStreams: maxConcurrent,
993 }
994 }
995
996
997
// clientConnIdleState describes whether a ClientConn may be handed a new
// request. It is computed by idleStateLocked.
type clientConnIdleState struct {
	canTakeNewRequest bool
}
1001
// idleState returns the result of idleStateLocked, acquiring cc.mu around
// the call.
func (cc *ClientConn) idleState() clientConnIdleState {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.idleStateLocked()
}
1007
1008 func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
1009 if cc.singleUse && cc.nextStreamID > 1 {
1010 return
1011 }
1012 var maxConcurrentOkay bool
1013 if cc.strictMaxConcurrentStreams {
1014
1015
1016
1017
1018 maxConcurrentOkay = true
1019 } else {
1020
1021
1022
1023
1024
1025
1026 maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
1027 }
1028
1029 st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()
1030
1031
1032
1033
1034
1035
1036
1037
1038 if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
1039 st.canTakeNewRequest = true
1040 }
1041
1042 return
1043 }
1044
1045 func (cc *ClientConn) isUsableLocked() bool {
1046 return cc.goAway == nil &&
1047 !cc.closed &&
1048 !cc.closing &&
1049 !cc.doNotReuse &&
1050 int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
1051 !cc.tooIdleLocked()
1052 }
1053
1054
1055
1056
1057
1058
1059
1060 func (cc *ClientConn) canReserveLocked() bool {
1061 if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
1062 return false
1063 }
1064 if !cc.isUsableLocked() {
1065 return false
1066 }
1067 return true
1068 }
1069
1070
1071
1072 func (cc *ClientConn) currentRequestCountLocked() int {
1073 return len(cc.streams) + cc.streamsReserved + cc.pendingResets
1074 }
1075
1076 func (cc *ClientConn) canTakeNewRequestLocked() bool {
1077 st := cc.idleStateLocked()
1078 return st.canTakeNewRequest
1079 }
1080
1081
1082 func (cc *ClientConn) availableLocked() int {
1083 if !cc.canTakeNewRequestLocked() {
1084 return 0
1085 }
1086 return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
1087 }
1088
1089
1090
1091 func (cc *ClientConn) tooIdleLocked() bool {
1092
1093
1094
1095
1096 return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
1097 }
1098
1099
1100
1101
1102
1103
1104
// onIdleTimeout is the idle timer's callback (installed by newClientConn).
// It closes the connection if it has no active or reserved streams.
func (cc *ClientConn) onIdleTimeout() {
	cc.closeIfIdle()
}
1108
// closeConn closes the underlying connection and then calls
// maybeCallStateHook.
//
// A timer is armed to call forceCloseConn after 250ms; the deferred Stop
// cancels it on return if it has not fired by then.
func (cc *ClientConn) closeConn() {
	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
	defer t.Stop()
	cc.tconn.Close()
	cc.maybeCallStateHook()
}
1115
1116
1117
1118 func (cc *ClientConn) forceCloseConn() {
1119 tc, ok := cc.tconn.(*tls.Conn)
1120 if !ok {
1121 return
1122 }
1123 if nc := tc.NetConn(); nc != nil {
1124 nc.Close()
1125 }
1126 }
1127
1128 func (cc *ClientConn) closeIfIdle() {
1129 cc.mu.Lock()
1130 if len(cc.streams) > 0 || cc.streamsReserved > 0 {
1131 cc.mu.Unlock()
1132 return
1133 }
1134 cc.closed = true
1135 cc.closedOnIdle = true
1136 nextID := cc.nextStreamID
1137
1138 cc.mu.Unlock()
1139
1140 if VerboseLogs {
1141 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
1142 }
1143 cc.closeConn()
1144 }
1145
1146 func (cc *ClientConn) isDoNotReuseAndIdle() bool {
1147 cc.mu.Lock()
1148 defer cc.mu.Unlock()
1149 return cc.doNotReuse && len(cc.streams) == 0
1150 }
1151
// shutdownEnterWaitStateHook is called by ClientConn.Shutdown after it has
// started its waiting goroutine and before it blocks. It is a no-op by
// default.
var shutdownEnterWaitStateHook = func() {}
1153
1154
// Shutdown gracefully closes the connection. It sends a GOAWAY (see
// sendGoAway), waits until the connection has no streams left or has been
// closed, and then closes the underlying connection.
//
// If ctx is done first, Shutdown returns ctx.Err() without closing the
// connection. An error from sending the GOAWAY is returned immediately.
func (cc *ClientConn) Shutdown(ctx context.Context) error {
	if err := cc.sendGoAway(); err != nil {
		return err
	}

	// The goroutine below waits on cc.cond for the streams to drain.
	done := make(chan struct{})
	cancelled := false // accessed only with cc.mu held
	go func() {
		cc.mu.Lock()
		defer cc.mu.Unlock()
		for {
			if len(cc.streams) == 0 || cc.closed {
				cc.closed = true
				close(done)
				break
			}
			if cancelled {
				break
			}
			cc.cond.Wait()
		}
	}()
	shutdownEnterWaitStateHook()
	select {
	case <-done:
		cc.closeConn()
		return nil
	case <-ctx.Done():
		cc.mu.Lock()
		// Release the waiting goroutine.
		cancelled = true
		cc.cond.Broadcast()
		cc.mu.Unlock()
		return ctx.Err()
	}
}
1191
1192 func (cc *ClientConn) sendGoAway() error {
1193 cc.mu.Lock()
1194 closing := cc.closing
1195 cc.closing = true
1196 maxStreamID := cc.nextStreamID
1197 cc.mu.Unlock()
1198 if closing {
1199
1200 return nil
1201 }
1202
1203 cc.wmu.Lock()
1204 defer cc.wmu.Unlock()
1205
1206 if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
1207 return err
1208 }
1209 if err := cc.bw.Flush(); err != nil {
1210 return err
1211 }
1212
1213 return nil
1214 }
1215
1216
1217
1218 func (cc *ClientConn) closeForError(err error) {
1219 cc.mu.Lock()
1220 cc.closed = true
1221 for _, cs := range cc.streams {
1222 cs.abortStreamLocked(err)
1223 }
1224 cc.cond.Broadcast()
1225 cc.mu.Unlock()
1226 cc.closeConn()
1227 }
1228
1229
1230
1231
// Close closes the connection immediately, aborting all active streams with
// errClientConnForceClosed. It always returns nil.
func (cc *ClientConn) Close() error {
	cc.closeForError(errClientConnForceClosed)
	return nil
}
1236
1237
1238 func (cc *ClientConn) closeForLostPing() {
1239 err := errors.New("http2: client connection lost")
1240 if f := cc.t.CountError; f != nil {
1241 f("conn_close_lost_ping")
1242 }
1243 cc.closeForError(err)
1244 }
1245
1246
1247
// errRequestCanceled is the error returned when a request's Cancel channel
// fires. It aliases internal.ErrRequestCanceled.
var errRequestCanceled = internal.ErrRequestCanceled
1249
1250 func (cc *ClientConn) responseHeaderTimeout() time.Duration {
1251 if cc.t.t1 != nil {
1252 return cc.t.t1.ResponseHeaderTimeout()
1253 }
1254
1255
1256
1257
1258 return 0
1259 }
1260
1261
1262
1263
1264 func actualContentLength(req *ClientRequest) int64 {
1265 if req.Body == nil || req.Body == NoBody {
1266 return 0
1267 }
1268 if req.ContentLength != 0 {
1269 return req.ContentLength
1270 }
1271 return -1
1272 }
1273
// decrStreamReservations releases one stream reservation, if any are held,
// acquiring cc.mu around the update.
func (cc *ClientConn) decrStreamReservations() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	cc.decrStreamReservationsLocked()
}
1279
1280 func (cc *ClientConn) decrStreamReservationsLocked() {
1281 if cc.streamsReserved > 0 {
1282 cc.streamsReserved--
1283 }
1284 }
1285
// RoundTrip sends req on this connection and returns its response. It calls
// roundTrip with no stream callback.
func (cc *ClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.roundTrip(req, nil)
}
1289
// roundTrip sends req on this connection and waits for the response headers,
// an abort of the stream, or cancellation of the request.
//
// The stream state lives in req.stream. The request is written by a separate
// goroutine running doRequest. streamf, if non-nil, is passed through to that
// goroutine, which calls it with the stream once it has been added to the
// connection.
func (cc *ClientConn) roundTrip(req *ClientRequest, streamf func(*clientStream)) (*ClientResponse, error) {
	ctx := req.Context
	req.stream = clientStream{
		cc:                   cc,
		ctx:                  ctx,
		reqCancel:            req.Cancel,
		isHead:               req.Method == "HEAD",
		reqBody:              req.Body,
		reqBodyContentLength: actualContentLength(req),
		trace:                httptrace.ContextClientTrace(ctx),
		peerClosed:           make(chan struct{}),
		abort:                make(chan struct{}),
		respHeaderRecv:       make(chan struct{}),
		donec:                make(chan struct{}),
		resTrailer:           req.ResTrailer,
	}
	cs := &req.stream

	cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())

	go cs.doRequest(req, streamf)

	// waitDone blocks until the stream is done or the request is canceled.
	waitDone := func() error {
		select {
		case <-cs.donec:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}

	// handleResponseHeaders finishes the round trip once cs.res is set.
	handleResponseHeaders := func() (*ClientResponse, error) {
		res := cs.res
		if res.StatusCode > 299 {
			// For any status above 2xx, stop sending the request body.
			cs.abortRequestBodyWrite()
		}
		res.TLS = cc.tlsState
		if res.Body == NoBody && actualContentLength(req) == 0 {
			// Neither side has a body, so wait for the stream to finish
			// before returning.
			if err := waitDone(); err != nil {
				return nil, err
			}
		}
		return res, nil
	}

	// cancelRequest returns err after waiting for any request body close
	// that is already in progress to complete.
	cancelRequest := func(cs *clientStream, err error) error {
		cs.cc.mu.Lock()
		bodyClosed := cs.reqBodyClosed
		cs.cc.mu.Unlock()
		if bodyClosed != nil {
			<-bodyClosed
		}
		return err
	}

	for {
		select {
		case <-cs.respHeaderRecv:
			return handleResponseHeaders()
		case <-cs.abort:
			select {
			case <-cs.respHeaderRecv:
				// Both channels are ready: prefer the response headers
				// over the abort.
				return handleResponseHeaders()
			default:
				waitDone()
				return nil, cs.abortErr
			}
		case <-ctx.Done():
			err := ctx.Err()
			cs.abortStream(err)
			return nil, cancelRequest(cs, err)
		case <-cs.reqCancel:
			cs.abortStream(errRequestCanceled)
			return nil, cancelRequest(cs, errRequestCanceled)
		}
	}
}
1398
1399
1400
1401
// doRequest runs the request-writing side of the stream: it writes the
// request and then passes the outcome to cleanupWriteRequest. roundTrip runs
// it in its own goroutine.
func (cs *clientStream) doRequest(req *ClientRequest, streamf func(*clientStream)) {
	err := cs.writeRequest(req, streamf)
	cs.cleanupWriteRequest(err)
}
1406
// errExtendedConnectNotSupported is returned by writeRequest for a CONNECT
// request with a :protocol header when the connection does not allow extended
// CONNECT.
var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
1408
1409
1410
1411
1412
1413
1414
1415
// writeRequest sends the request headers and body, then waits for the stream
// to finish.
//
// It returns nil once the peer has closed its side of the stream. It returns
// an error if the stream is aborted, the request is canceled, writing fails,
// or the response headers do not arrive within the response header timeout.
// streamf, if non-nil, is called with cs after the stream has been added to
// the connection.
func (cs *clientStream) writeRequest(req *ClientRequest, streamf func(*clientStream)) (err error) {
	cc := cs.cc
	ctx := cs.ctx

	// A CONNECT request with a :protocol header is an extended CONNECT.
	var isExtendedConnect bool
	if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
		isExtendedConnect = true
	}

	if cc.reqHeaderMu == nil {
		panic("RoundTrip on uninitialized ClientConn")
	}
	// An extended CONNECT must wait for seenSettingsChan before the
	// connection's extendedConnectAllowed flag is consulted.
	if isExtendedConnect {
		select {
		case <-cs.reqCancel:
			return errRequestCanceled
		case <-ctx.Done():
			return ctx.Err()
		case <-cc.seenSettingsChan:
			if !cc.extendedConnectAllowed {
				return errExtendedConnectNotSupported
			}
		}
	}
	// Acquire the new-request lock by sending on reqHeaderMu. It is released
	// by receiving from it.
	select {
	case cc.reqHeaderMu <- struct{}{}:
	case <-cs.reqCancel:
		return errRequestCanceled
	case <-ctx.Done():
		return ctx.Err()
	}

	cc.mu.Lock()
	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}
	cc.decrStreamReservationsLocked()
	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
		cc.mu.Unlock()
		<-cc.reqHeaderMu
		return err
	}
	cc.addStreamLocked(cs)
	if isConnectionCloseRequest(req) {
		cc.doNotReuse = true
	}
	cc.mu.Unlock()

	if streamf != nil {
		streamf(cs)
	}

	// Wait for a 100-continue only if a timeout is configured and the
	// request asks for one.
	continueTimeout := cc.t.expectContinueTimeout()
	if continueTimeout != 0 {
		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
			continueTimeout = 0
		} else {
			cs.on100 = make(chan struct{}, 1)
		}
	}

	// Send the headers, then release the new-request lock whatever the
	// outcome.
	err = cs.encodeAndWriteHeaders(req)
	<-cc.reqHeaderMu
	if err != nil {
		return err
	}

	hasBody := cs.reqBodyContentLength != 0
	if !hasBody {
		cs.sentEndStream = true
	} else {
		if continueTimeout != 0 {
			traceWait100Continue(cs.trace)
			timer := time.NewTimer(continueTimeout)
			select {
			case <-timer.C:
				err = nil
			case <-cs.on100:
				err = nil
			case <-cs.abort:
				err = cs.abortErr
			case <-ctx.Done():
				err = ctx.Err()
			case <-cs.reqCancel:
				err = errRequestCanceled
			}
			timer.Stop()
			if err != nil {
				traceWroteRequest(cs.trace, err)
				return err
			}
		}

		// errStopReqBodyWrite is not treated as a failure, but
		// sentEndStream is left unset in that case.
		if err = cs.writeRequestBody(req); err != nil {
			if err != errStopReqBodyWrite {
				traceWroteRequest(cs.trace, err)
				return err
			}
		} else {
			cs.sentEndStream = true
		}
	}

	traceWroteRequest(cs.trace, err)

	// With no response header timeout both channels stay nil, so their
	// select cases below never fire.
	var respHeaderTimer <-chan time.Time
	var respHeaderRecv chan struct{}
	if d := cc.responseHeaderTimeout(); d != 0 {
		timer := time.NewTimer(d)
		defer timer.Stop()
		respHeaderTimer = timer.C
		respHeaderRecv = cs.respHeaderRecv
	}

	// Wait for the peer to close the stream, or for the request to be
	// aborted, canceled or timed out.
	for {
		select {
		case <-cs.peerClosed:
			return nil
		case <-respHeaderTimer:
			return errTimeout
		case <-respHeaderRecv:
			// Headers arrived: disarm the timeout and keep waiting.
			respHeaderRecv = nil
			respHeaderTimer = nil
		case <-cs.abort:
			return cs.abortErr
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}
}
1559
// encodeAndWriteHeaders encodes the request headers and writes them to the
// connection, holding cc.wmu throughout.
//
// Nothing is written if the stream has already been aborted or the request
// canceled. An encoding error is returned wrapped with an "http2: " prefix.
func (cs *clientStream) encodeAndWriteHeaders(req *ClientRequest) error {
	cc := cs.cc
	ctx := cs.ctx

	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	// Non-blocking check: bail out if the request was canceled while
	// waiting for the write lock.
	select {
	case <-cs.abort:
		return cs.abortErr
	case <-ctx.Done():
		return ctx.Err()
	case <-cs.reqCancel:
		return errRequestCanceled
	default:
	}

	// Encode the headers into the connection's shared buffer.
	cc.hbuf.Reset()
	res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
		cc.writeHeader(name, value)
	})
	if err != nil {
		return fmt.Errorf("http2: %w", err)
	}
	hdrs := cc.hbuf.Bytes()

	// The stream ends with the headers only when there is neither a body nor
	// trailers to follow.
	endStream := !res.HasBody && !res.HasTrailers
	cs.sentHeaders = true
	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
	traceWroteHeaders(cs.trace)
	return err
}
1599
1600 func encodeRequestHeaders(req *ClientRequest, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
1601 return httpcommon.EncodeHeaders(req.Context, httpcommon.EncodeHeadersParam{
1602 Request: httpcommon.Request{
1603 Header: req.Header,
1604 Trailer: req.Trailer,
1605 URL: req.URL,
1606 Host: req.Host,
1607 Method: req.Method,
1608 ActualContentLength: actualContentLength(req),
1609 },
1610 AddGzipHeader: addGzipHeader,
1611 PeerMaxHeaderListSize: peerMaxHeaderListSize,
1612 DefaultUserAgent: defaultUserAgent,
1613 }, headerf)
1614 }
1615
1616
1617
1618
1619
// cleanupWriteRequest finishes the request-writing side of a stream.
//
// It returns the stream reservation when no stream ID was assigned, makes
// sure the request body is closed, resets the stream if the request did
// not complete cleanly, removes the stream from the connection, closes
// the connection if a write error has been recorded, and finally closes
// cs.donec.
//
// err is the outcome of writing the request; nil means no error occurred.
func (cs *clientStream) cleanupWriteRequest(err error) {
	cc := cs.cc

	if cs.ID == 0 {
		// No stream ID was assigned (addStreamLocked never hands out 0),
		// so only the reservation has to be given back.
		cc.decrStreamReservations()
	}

	// Close the request body exactly once. Whoever finds reqBodyClosed
	// unset creates the channel and performs the Close outside the lock;
	// if the channel already exists, wait for that close to finish.
	cc.mu.Lock()
	mustCloseBody := false
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		mustCloseBody = true
		cs.reqBodyClosed = make(chan struct{})
	}
	bodyClosed := cs.reqBodyClosed
	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	// Whether any frame lookup has happened since this stream was created:
	// streamByID stores cc.nextStreamID in readBeforeStreamID on every call.
	readSinceStream := cc.readBeforeStreamID > cs.ID
	cc.mu.Unlock()
	if mustCloseBody {
		cs.reqBody.Close()
		close(bodyClosed)
	}
	if bodyClosed != nil {
		<-bodyClosed
	}

	if err != nil && cs.sentEndStream {
		// The whole request was sent. If the peer has closed its side as
		// well, the exchange completed and the error is dropped.
		select {
		case <-cs.peerClosed:
			err = nil
		default:
		}
	}
	if err != nil {
		cs.abortStream(err)
		if cs.sentHeaders {
			if se, ok := err.(StreamError); ok {
				// Do not answer a reset that came from the peer
				// (processResetStream sets Cause to errFromPeer).
				if se.Cause != errFromPeer {
					cc.writeStreamReset(cs.ID, se.Code, false, err)
				}
			} else {
				// Cancel the stream, optionally following the RST_STREAM
				// with a PING. The PING is requested only when the
				// connection is not going to be closed on idle, nothing
				// has been read since this stream was created, pings are
				// not currently blocked, and no earlier reset is still
				// pending. pendingResets is cleared when a PING ack
				// arrives (see processPing).
				ping := false
				if !closeOnIdle && !readSinceStream {
					cc.mu.Lock()
					if !cc.rstStreamPingsBlocked {
						if cc.pendingResets == 0 {
							ping = true
						}
						cc.pendingResets++
					}
					cc.mu.Unlock()
				}
				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
			}
		}
		cs.bufPipe.CloseWithError(err)
	} else {
		// No error: if the request headers went out but END_STREAM did
		// not, reset the stream with ErrCodeNo.
		if cs.sentHeaders && !cs.sentEndStream {
			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
		}
		cs.bufPipe.CloseWithError(errRequestCanceled)
	}
	if cs.ID != 0 {
		cc.forgetStreamID(cs.ID)
	}

	// A recorded write error means the connection is closed.
	cc.wmu.Lock()
	werr := cc.werr
	cc.wmu.Unlock()
	if werr != nil {
		cc.Close()
	}

	close(cs.donec)
	cc.maybeCallStateHook()
}
1723
1724
1725
// awaitOpenSlotForStreamLocked waits on cc.cond until the connection has
// room for another concurrent stream and then returns nil.
//
// It returns errClientConnNotEstablished if the connection is closed
// before any stream was created or reserved, errClientConnUnusable if the
// connection is closed or cannot take new requests, and cs.abortErr if cs
// is aborted while waiting.
//
// NOTE(review): cc.cond.Wait is used, so the caller presumably holds the
// lock associated with cc.cond (cc.mu) — confirm against the caller.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
	for {
		if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
			// Closed while nextStreamID is still 1 and nothing is
			// reserved: no stream was ever created on this connection.
			return errClientConnNotEstablished
		}
		cc.lastActive = time.Now()
		if cc.closed || !cc.canTakeNewRequestLocked() {
			return errClientConnUnusable
		}
		cc.lastIdle = time.Time{}
		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
			return nil
		}
		// At the concurrency limit: wait for a broadcast on cc.cond,
		// counting ourselves as pending for the duration of the wait.
		cc.pendingRequests++
		cc.cond.Wait()
		cc.pendingRequests--
		select {
		case <-cs.abort:
			return cs.abortErr
		default:
		}
	}
}
1751
1752
1753 func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1754 first := true
1755 for len(hdrs) > 0 && cc.werr == nil {
1756 chunk := hdrs
1757 if len(chunk) > maxFrameSize {
1758 chunk = chunk[:maxFrameSize]
1759 }
1760 hdrs = hdrs[len(chunk):]
1761 endHeaders := len(hdrs) == 0
1762 if first {
1763 cc.fr.WriteHeaders(HeadersFrameParam{
1764 StreamID: streamID,
1765 BlockFragment: chunk,
1766 EndStream: endStream,
1767 EndHeaders: endHeaders,
1768 })
1769 first = false
1770 } else {
1771 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1772 }
1773 }
1774 cc.bw.Flush()
1775 return cc.werr
1776 }
1777
1778
// Internal errors produced while writing a request body.
var (
	// errStopReqBodyWrite is returned by writeRequestBody and
	// awaitFlowControl once the request body has been closed
	// (cs.reqBodyClosed is set).
	errStopReqBodyWrite = errors.New("http2: aborting request body write")

	// errStopReqBodyWriteAndCancel is not referenced in this part of the
	// file. NOTE(review): presumably a variant of errStopReqBodyWrite that
	// also cancels the request — confirm at its use sites.
	errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")

	// errReqBodyTooLong is returned by writeRequestBody when the body
	// yields more bytes than the declared content length.
	errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
1788
1789
1790
1791
1792
1793
1794 func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
1795 const max = 512 << 10
1796 n := min(int64(maxFrameSize), max)
1797 if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
1798
1799
1800
1801
1802 n = cl + 1
1803 }
1804 if n < 1 {
1805 return 1
1806 }
1807 return int(n)
1808 }
1809
1810
1811
1812
1813
1814
1815
1816
1817
// bufPools holds scratch buffers for writeRequestBody, bucketed by size
// class. writeRequestBody stores and retrieves *[]byte values.
var bufPools [7]sync.Pool

// bufPoolIndex returns the index into bufPools for a buffer of the given
// size. Sizes up to 16 KiB use bucket 0; after that each bucket covers the
// next power-of-two range ((16K, 32K] is 1, (32K, 64K] is 2, ...), and
// anything beyond the last range shares the final bucket.
func bufPoolIndex(size int) int {
	const smallest = 16384 // 1 << 14
	if size <= smallest {
		return 0
	}
	// For size in (2^k, 2^(k+1)], bits.Len(size-1) is k+1.
	class := bits.Len(uint(size-1)) - 14
	if last := len(bufPools) - 1; class > last {
		return last
	}
	return class
}
1831
// writeRequestBody streams cs.reqBody to the peer as DATA frames and then
// ends the stream: on the final DATA frame when there are no trailers,
// otherwise with a HEADERS frame carrying the trailers or an empty DATA
// frame.
//
// It returns errReqBodyTooLong if the body yields more bytes than
// cs.reqBodyContentLength, errStopReqBodyWrite if a read fails after the
// body has been closed, and otherwise any read, flow-control or write
// error it encounters.
func (cs *clientStream) writeRequestBody(req *ClientRequest) (err error) {
	cc := cs.cc
	body := cs.reqBody
	sentEnd := false // whether the last DATA frame written carried END_STREAM

	hasTrailers := req.Trailer != nil
	remainLen := cs.reqBodyContentLength
	hasContentLen := remainLen != -1

	cc.mu.Lock()
	maxFrameSize := int(cc.maxFrameSize)
	cc.mu.Unlock()

	// Scratch buffer for reading from the body and writing frames. A pooled
	// buffer is reused when it is long enough; otherwise a fresh one is
	// allocated. Whichever buffer is used goes back to the pool on return.
	scratchLen := cs.frameScratchBufferLen(maxFrameSize)
	var buf []byte
	index := bufPoolIndex(scratchLen)
	if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
		defer bufPools[index].Put(bp)
		buf = *bp
	} else {
		buf = make([]byte, scratchLen)
		defer bufPools[index].Put(&buf)
	}

	var sawEOF bool
	for !sawEOF {
		// Note: this err shadows the named result inside the loop.
		n, err := body.Read(buf)
		if hasContentLen {
			remainLen -= int64(n)
			if remainLen == 0 && err == nil {
				// The declared length has been reached without an error.
				// Read one more byte so that EOF (or an over-long body)
				// is detected now rather than on the next iteration.
				var scratch [1]byte
				var n1 int
				n1, err = body.Read(scratch[:])
				remainLen -= int64(n1)
			}
			if remainLen < 0 {
				err = errReqBodyTooLong
				return err
			}
		}
		if err != nil {
			cc.mu.Lock()
			bodyClosed := cs.reqBodyClosed != nil
			cc.mu.Unlock()
			switch {
			case bodyClosed:
				// The body was closed; the read error is not reported.
				return errStopReqBodyWrite
			case err == io.EOF:
				sawEOF = true
				err = nil
			default:
				return err
			}
		}

		// Write what was read, in pieces no larger than flow control allows.
		remain := buf[:n]
		for len(remain) > 0 && err == nil {
			var allowed int32
			allowed, err = cs.awaitFlowControl(len(remain))
			if err != nil {
				return err
			}
			cc.wmu.Lock()
			data := remain[:allowed]
			remain = remain[allowed:]
			// END_STREAM rides on this frame only if it is the last data
			// and no trailers follow.
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
			if err == nil {
				// Flush after every DATA frame rather than batching.
				err = cc.bw.Flush()
			}
			cc.wmu.Unlock()
		}
		if err != nil {
			return err
		}
	}

	if sentEnd {
		// END_STREAM already went out with the final DATA frame.
		return nil
	}

	// Read req.Trailer and the abort state together under cc.mu; if the
	// stream has been aborted, return that error instead of sending
	// anything further.
	cc.mu.Lock()
	trailer := req.Trailer
	err = cs.abortErr
	cc.mu.Unlock()
	if err != nil {
		return err
	}

	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	var trls []byte
	if len(trailer) > 0 {
		trls, err = cc.encodeTrailers(trailer)
		if err != nil {
			return err
		}
	}

	// Two ways to send END_STREAM: a HEADERS frame carrying the trailers,
	// or an empty DATA frame.
	if len(trls) > 0 {
		err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
	} else {
		err = cc.fr.WriteData(cs.ID, true, nil)
	}
	// Flush regardless; a flush error is reported only if nothing failed
	// earlier.
	if ferr := cc.bw.Flush(); ferr != nil && err == nil {
		err = ferr
	}
	return err
}
1963
1964
1965
1966
1967
1968 func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1969 cc := cs.cc
1970 ctx := cs.ctx
1971 cc.mu.Lock()
1972 defer cc.mu.Unlock()
1973 for {
1974 if cc.closed {
1975 return 0, errClientConnClosed
1976 }
1977 if cs.reqBodyClosed != nil {
1978 return 0, errStopReqBodyWrite
1979 }
1980 select {
1981 case <-cs.abort:
1982 return 0, cs.abortErr
1983 case <-ctx.Done():
1984 return 0, ctx.Err()
1985 case <-cs.reqCancel:
1986 return 0, errRequestCanceled
1987 default:
1988 }
1989 if a := cs.flow.available(); a > 0 {
1990 take := a
1991 if int(take) > maxBytes {
1992
1993 take = int32(maxBytes)
1994 }
1995 if take > int32(cc.maxFrameSize) {
1996 take = int32(cc.maxFrameSize)
1997 }
1998 cs.flow.take(take)
1999 return take, nil
2000 }
2001 cc.cond.Wait()
2002 }
2003 }
2004
2005
2006 func (cc *ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
2007 cc.hbuf.Reset()
2008
2009 hlSize := uint64(0)
2010 for k, vv := range trailer {
2011 for _, v := range vv {
2012 hf := hpack.HeaderField{Name: k, Value: v}
2013 hlSize += uint64(hf.Size())
2014 }
2015 }
2016 if hlSize > cc.peerMaxHeaderListSize {
2017 return nil, errRequestHeaderListSize
2018 }
2019
2020 for k, vv := range trailer {
2021 lowKey, ascii := httpcommon.LowerHeader(k)
2022 if !ascii {
2023
2024
2025 continue
2026 }
2027
2028
2029 for _, v := range vv {
2030 cc.writeHeader(lowKey, v)
2031 }
2032 }
2033 return cc.hbuf.Bytes(), nil
2034 }
2035
2036 func (cc *ClientConn) writeHeader(name, value string) {
2037 if VerboseLogs {
2038 log.Printf("http2: Transport encoding header %q = %q", name, value)
2039 }
2040 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
2041 }
2042
// resAndError pairs a response with an error so both can be carried as a
// single value. It is not referenced in this part of the file.
type resAndError struct {
	_   incomparable
	res *ClientResponse
	err error
}
2048
2049
// addStreamLocked initializes cs's flow-control state, assigns it the
// next stream ID (IDs advance by 2) and registers it in cc.streams.
//
// It panics if the assigned ID is 0.
//
// NOTE(review): the name suggests the caller holds cc.mu — confirm.
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
	// The send window starts at cc.initialWindowSize and is linked to the
	// connection's send window.
	cs.flow.add(int32(cc.initialWindowSize))
	cs.flow.setConnFlow(&cc.flow)
	cs.inflow.init(cc.initialStreamRecvWindowSize)
	cs.ID = cc.nextStreamID
	cc.nextStreamID += 2
	cc.streams[cs.ID] = cs
	if cs.ID == 0 {
		panic("assigned stream ID 0")
	}
}
2061
// forgetStreamID removes stream id from cc.streams, restarts the idle
// timer when no streams remain, wakes waiters on cc.cond, and closes the
// connection if it is marked to close on idle and has no streams or
// reservations left.
//
// It panics if id is not present in cc.streams.
func (cc *ClientConn) forgetStreamID(id uint32) {
	cc.mu.Lock()
	slen := len(cc.streams)
	delete(cc.streams, id)
	if len(cc.streams) != slen-1 {
		panic("forgetting unknown stream id")
	}
	cc.lastActive = time.Now()
	if len(cc.streams) == 0 && cc.idleTimer != nil {
		cc.idleTimer.Reset(cc.idleTimeout)
		cc.lastIdle = time.Now()
	}
	// Wake goroutines blocked on cc.cond, such as awaitFlowControl and
	// awaitOpenSlotForStreamLocked.
	cc.cond.Broadcast()

	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
		if VerboseLogs {
			cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
		}
		cc.closed = true
		// Deferred so that closeConn runs after cc.mu is released below.
		defer cc.closeConn()
	}

	cc.mu.Unlock()
}
2089
2090
// clientConnReadLoop carries the methods that read and process frames
// for one ClientConn (see run and cleanup).
type clientConnReadLoop struct {
	_  incomparable
	cc *ClientConn // connection whose frames are being read
}
2095
2096
2097 func (cc *ClientConn) readLoop() {
2098 rl := &clientConnReadLoop{cc: cc}
2099 defer rl.cleanup()
2100 cc.readerErr = rl.run()
2101 if ce, ok := cc.readerErr.(ConnectionError); ok {
2102 cc.wmu.Lock()
2103 cc.fr.WriteGoAway(0, ErrCode(ce), nil)
2104 cc.wmu.Unlock()
2105 }
2106 }
2107
2108
2109
2110 type GoAwayError struct {
2111 LastStreamID uint32
2112 ErrCode ErrCode
2113 DebugData string
2114 }
2115
2116 func (e GoAwayError) Error() string {
2117 return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
2118 e.LastStreamID, e.ErrCode, e.DebugData)
2119 }
2120
// isEOFOrNetReadError reports whether err is exactly io.EOF or a
// *net.OpError whose Op is "read". Wrapped errors are not unwrapped.
func isEOFOrNetReadError(err error) bool {
	switch e := err.(type) {
	case *net.OpError:
		return e.Op == "read"
	default:
		return err == io.EOF
	}
}
2128
// cleanup runs when the read loop exits. It stops the idle timer, marks
// the connection closed, tells the connection pool the connection is dead
// (immediately or after a short delay), aborts every stream the peer has
// not already closed, and wakes all waiters on cc.cond. On return the
// deferred calls close cc.readerDone and then the underlying connection.
func (rl *clientConnReadLoop) cleanup() {
	cc := rl.cc
	defer cc.closeConn()
	defer close(cc.readerDone)

	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}

	// Pick the error reported to outstanding streams: a GoAwayError when a
	// GOAWAY was received and the read failed with EOF or a network read
	// error; io.ErrUnexpectedEOF in place of a bare io.EOF; otherwise the
	// reader error unchanged.
	err := cc.readerErr
	cc.mu.Lock()
	if cc.goAway != nil && isEOFOrNetReadError(err) {
		err = GoAwayError{
			LastStreamID: cc.goAway.LastStreamID,
			ErrCode:      cc.goAway.ErrCode,
			DebugData:    cc.goAwayDebug,
		}
	} else if err == io.EOF {
		err = io.ErrUnexpectedEOF
	}
	cc.closed = true

	// If cc.atomicReused is still zero, the connection was active less than
	// unusedWaitTime ago, and it was not closed on idle, delay MarkDead
	// until unusedWaitTime (5s, or the idle timeout if shorter) has passed
	// since the last activity. Otherwise mark it dead right away.
	unusedWaitTime := 5 * time.Second
	if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
		unusedWaitTime = cc.idleTimeout
	}
	idleTime := time.Now().Sub(cc.lastActive)
	if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
		cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
			cc.t.connPool().MarkDead(cc)
		})
	} else {
		// MarkDead is called with cc.mu released.
		cc.mu.Unlock()
		cc.t.connPool().MarkDead(cc)
		cc.mu.Lock()
	}

	for _, cs := range cc.streams {
		select {
		case <-cs.peerClosed:
			// The peer already closed this stream; there is nothing to
			// interrupt.
		default:
			cs.abortStreamLocked(err)
		}
	}
	cc.cond.Broadcast()
	cc.mu.Unlock()

	if !cc.seenSettings {
		// No SETTINGS frame was processed. Close seenSettingsChan anyway
		// so that anything waiting on it is released, with extended
		// CONNECT reported as allowed.
		cc.extendedConnectAllowed = true
		close(cc.seenSettingsChan)
	}
}
2194
2195
2196
2197 func (cc *ClientConn) countReadFrameError(err error) {
2198 f := cc.t.CountError
2199 if f == nil || err == nil {
2200 return
2201 }
2202 if ce, ok := err.(ConnectionError); ok {
2203 errCode := ErrCode(ce)
2204 f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
2205 return
2206 }
2207 if errors.Is(err, io.EOF) {
2208 f("read_frame_eof")
2209 return
2210 }
2211 if errors.Is(err, io.ErrUnexpectedEOF) {
2212 f("read_frame_unexpected_eof")
2213 return
2214 }
2215 if errors.Is(err, ErrFrameTooLarge) {
2216 f("read_frame_too_large")
2217 return
2218 }
2219 f("read_frame_other")
2220 }
2221
// run reads frames from the connection and dispatches each one to its
// process* handler until a read or a handler returns an error, which
// becomes the result.
//
// StreamErrors returned by ReadFrame end only the affected stream; any
// other read error ends the loop. The first frame received must be a
// SETTINGS frame, otherwise a protocol ConnectionError is returned.
func (rl *clientConnReadLoop) run() error {
	cc := rl.cc
	gotSettings := false
	readIdleTimeout := cc.readIdleTimeout
	var t *time.Timer
	if readIdleTimeout != 0 {
		// Run cc.healthCheck if no ReadFrame call returns within
		// readIdleTimeout; the timer is reset after every read below.
		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
	}
	for {
		f, err := cc.fr.ReadFrame()
		if t != nil {
			t.Reset(readIdleTimeout)
		}
		if err != nil {
			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
		}
		if se, ok := err.(StreamError); ok {
			// A stream-level error: fail that stream (if it is still
			// known) and keep reading.
			if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
				if se.Cause == nil {
					se.Cause = cc.fr.errDetail
				}
				rl.endStreamError(cs, se)
			}
			continue
		} else if err != nil {
			cc.countReadFrameError(err)
			return err
		}
		if VerboseLogs {
			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
		}
		if !gotSettings {
			if _, ok := f.(*SettingsFrame); !ok {
				cc.logf("protocol error: received %T before a SETTINGS frame", f)
				return ConnectionError(ErrCodeProtocol)
			}
			gotSettings = true
		}

		switch f := f.(type) {
		case *MetaHeadersFrame:
			err = rl.processHeaders(f)
		case *DataFrame:
			err = rl.processData(f)
		case *GoAwayFrame:
			err = rl.processGoAway(f)
		case *RSTStreamFrame:
			err = rl.processResetStream(f)
		case *SettingsFrame:
			err = rl.processSettings(f)
		case *PushPromiseFrame:
			err = rl.processPushPromise(f)
		case *WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
		case *PingFrame:
			err = rl.processPing(f)
		default:
			// Other frame types are logged and otherwise ignored.
			cc.logf("Transport: unhandled response frame type %T", f)
		}
		if err != nil {
			if VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
			}
			return err
		}
	}
}
2289
// processHeaders handles a HEADERS frame (with any CONTINUATION frames
// already merged into f). The first header block on a stream is treated
// as the response headers; a later one is passed to processTrailers.
//
// Errors specific to the stream end that stream and nil is returned, so
// the connection stays up. Only a ConnectionError from handleResponse or
// processTrailers is returned to the caller.
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	if cs == nil {
		// The stream is unknown or was already aborted on the read side
		// (streamByID returns nil for both); drop the frame.
		return nil
	}
	if cs.readClosed {
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    errors.New("protocol error: headers after END_STREAM"),
		})
		return nil
	}
	if !cs.firstByte {
		if cs.trace != nil {
			// Report the first response byte once per stream, on its
			// first HEADERS frame.
			traceFirstResponseByte(cs.trace)
		}
		cs.firstByte = true
	}
	if !cs.pastHeaders {
		cs.pastHeaders = true
	} else {
		return rl.processTrailers(cs, f)
	}

	res, err := rl.handleResponse(cs, f)
	if err != nil {
		if _, ok := err.(ConnectionError); ok {
			return err
		}
		// Any other error is reported as a stream-level protocol error.
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    err,
		})
		return nil
	}
	if res == nil {
		// handleResponse returns (nil, nil) for 1xx informational
		// responses; the final response headers are still to come.
		return nil
	}
	cs.res = res
	close(cs.respHeaderRecv)
	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
2346
2347
2348
2349
2350
2351
2352
// handleResponse builds the response for cs from the header block in f.
//
// It returns (nil, nil) for a 1xx informational response, after clearing
// cs.pastHeaders so that the next header block is handled as response
// headers again. Any error returned is for the caller to turn into a
// stream or connection error.
//
// The returned *ClientResponse points at cs.staticResp.
func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*ClientResponse, error) {
	if f.Truncated {
		return nil, errResponseHeaderListSize
	}

	status := f.PseudoValue("status")
	if status == "" {
		return nil, errors.New("malformed response from server: missing status pseudo header")
	}
	statusCode, err := strconv.Atoi(status)
	if err != nil {
		return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
	}

	regularFields := f.RegularFields()
	strs := make([]string, len(regularFields))
	header := make(Header, len(regularFields))
	res := &cs.staticResp
	cs.staticResp = ClientResponse{
		Header:     header,
		StatusCode: statusCode,
		Status:     status,
	}
	for _, hf := range regularFields {
		key := httpcommon.CanonicalHeader(hf.Name)
		if key == "Trailer" {
			// Record each announced trailer name as a key with a nil
			// value in res.Trailer.
			t := res.Trailer
			if t == nil {
				t = make(Header)
				res.Trailer = t
			}
			foreachHeaderElement(hf.Value, func(v string) {
				t[httpcommon.CanonicalHeader(v)] = nil
			})
		} else {
			vv := header[key]
			if vv == nil && len(strs) > 0 {
				// First value for this key: carve a one-element slice
				// (capacity 1) out of strs instead of allocating a slice
				// per header. Later values for the same key fall through
				// to append, which allocates.
				vv, strs = strs[:1:1], strs[1:]
				vv[0] = hf.Value
				header[key] = vv
			} else {
				header[key] = append(vv, hf.Value)
			}
		}
	}

	if statusCode >= 100 && statusCode <= 199 {
		if f.StreamEnded() {
			return nil, errors.New("1xx informational response with END_STREAM flag")
		}
		if fn := cs.get1xxTraceFunc(); fn != nil {
			// A 1xx hook is installed: hand it the response and fail the
			// stream if it returns an error.
			if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
				return nil, err
			}
		} else {
			// No hook: bound the cumulative size of 1xx header fields on
			// this stream. The limit is the larger of the transport's
			// max header list size and t1.MaxResponseHeaderBytes().
			limit := int64(cs.cc.t.maxHeaderListSize())
			if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes() > limit {
				limit = t1.MaxResponseHeaderBytes()
			}
			for _, h := range f.Fields {
				cs.totalHeaderSize += int64(h.Size())
			}
			if cs.totalHeaderSize > limit {
				if VerboseLogs {
					log.Printf("http2: 1xx informational responses too large")
				}
				return nil, errors.New("header list too large")
			}
		}
		if statusCode == 100 {
			traceGot100Continue(cs.trace)
			// Non-blocking send: the signal is dropped if it cannot be
			// delivered immediately.
			select {
			case cs.on100 <- struct{}{}:
			default:
			}
		}
		cs.pastHeaders = false // expect another header block
		return nil, nil
	}

	// ContentLength is -1 unless exactly one Content-Length value is
	// present and parses, or the stream ended with the headers on a
	// non-HEAD request (then 0).
	res.ContentLength = -1
	if clens := res.Header["Content-Length"]; len(clens) == 1 {
		if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
			res.ContentLength = int64(cl)
		} else {
			// An unparsable Content-Length is ignored.
		}
	} else if len(clens) > 1 {
		// Multiple Content-Length values are ignored.
	} else if f.StreamEnded() && !cs.isHead {
		res.ContentLength = 0
	}

	if cs.isHead {
		res.Body = NoBody
		return res, nil
	}

	if f.StreamEnded() {
		// No DATA frames will follow. If a positive length was declared,
		// use a body whose Read reports io.ErrUnexpectedEOF.
		if res.ContentLength > 0 {
			res.Body = missingBody{}
		} else {
			res.Body = NoBody
		}
		return res, nil
	}

	cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
	cs.bytesRemain = res.ContentLength
	res.Body = transportResponseBody{cs}

	if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
		// cs.requestedGzip is set and the response is gzip-encoded:
		// decompress on the fly and drop the headers that describe the
		// compressed form.
		res.Header.Del("Content-Encoding")
		res.Header.Del("Content-Length")
		res.ContentLength = -1
		res.Body = &gzipReader{body: res.Body}
		res.Uncompressed = true
	}
	return res, nil
}
2489
2490 func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2491 if cs.pastTrailers {
2492
2493 return ConnectionError(ErrCodeProtocol)
2494 }
2495 cs.pastTrailers = true
2496 if !f.StreamEnded() {
2497
2498
2499 return ConnectionError(ErrCodeProtocol)
2500 }
2501 if len(f.PseudoFields()) > 0 {
2502
2503
2504 return ConnectionError(ErrCodeProtocol)
2505 }
2506
2507 trailer := make(Header)
2508 for _, hf := range f.RegularFields() {
2509 key := httpcommon.CanonicalHeader(hf.Name)
2510 trailer[key] = append(trailer[key], hf.Value)
2511 }
2512 cs.trailer = trailer
2513
2514 rl.endStream(cs)
2515 return nil
2516 }
2517
2518
2519
// transportResponseBody is the response body type that handleResponse
// installs for responses whose stream did not end with the headers. Its
// Read method reads from cs.bufPipe.
type transportResponseBody struct {
	cs *clientStream
}
2523
// Read reads response body data from the stream's buffer pipe, enforces
// the declared content length, and returns flow-control window to the
// peer for the bytes consumed.
//
// If more data arrives than cs.bytesRemain allows, the read is truncated
// to the remaining length and an error is returned (and remembered in
// cs.readErr). If the pipe reports io.EOF while bytes are still expected,
// io.ErrUnexpectedEOF is returned instead.
func (b transportResponseBody) Read(p []byte) (n int, err error) {
	cs := b.cs
	cc := cs.cc

	if cs.readErr != nil {
		// A previous Read failed in a way that is sticky.
		return 0, cs.readErr
	}
	n, err = b.cs.bufPipe.Read(p)
	if cs.bytesRemain != -1 {
		// A content length is known (-1 means unknown).
		if int64(n) > cs.bytesRemain {
			n = int(cs.bytesRemain)
			if err == nil {
				err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
				cs.abortStream(err)
			}
			cs.readErr = err
			return int(cs.bytesRemain), err
		}
		cs.bytesRemain -= int64(n)
		if err == io.EOF && cs.bytesRemain > 0 {
			err = io.ErrUnexpectedEOF
			cs.readErr = err
			return n, err
		}
	}
	if n == 0 {
		// Nothing was consumed, so there is no window to give back.
		return
	}

	cc.mu.Lock()
	connAdd := cc.inflow.add(n)
	var streamAdd int32
	if err == nil {
		// The stream window is only replenished while the read succeeded.
		streamAdd = cs.inflow.add(n)
	}
	cc.mu.Unlock()

	if connAdd != 0 || streamAdd != 0 {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if connAdd != 0 {
			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
		}
		if streamAdd != 0 {
			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
		}
		cc.bw.Flush()
	}
	return
}
2575
// errClosedResponseBody is the error transportResponseBody.Close uses to
// break the stream's buffer pipe and abort the stream.
var errClosedResponseBody = errors.New("http2: response body closed")
2577
// Close aborts the stream with errClosedResponseBody, returns
// connection-level flow control for any data still buffered and unread,
// and then waits for the stream's cleanup to finish (cs.donec), the
// stream's context to be done, or the request to be canceled.
//
// It returns errRequestCanceled if the request was canceled while
// waiting, and nil otherwise.
func (b transportResponseBody) Close() error {
	cs := b.cs
	cc := cs.cc

	cs.bufPipe.BreakWithError(errClosedResponseBody)
	cs.abortStream(errClosedResponseBody)

	unread := cs.bufPipe.Len()
	if unread > 0 {
		cc.mu.Lock()
		// Credit the unread bytes back to the connection-level window.
		connAdd := cc.inflow.add(unread)
		cc.mu.Unlock()

		// The write lock is taken even when connAdd is not positive; in
		// that case only the flush happens.
		cc.wmu.Lock()
		if connAdd > 0 {
			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
		}
		cc.bw.Flush()
		cc.wmu.Unlock()
	}

	select {
	case <-cs.donec:
	case <-cs.ctx.Done():
		// The context ending is not reported as a Close error.
		return nil
	case <-cs.reqCancel:
		return errRequestCanceled
	}
	return nil
}
2615
// processData handles a DATA frame: it accounts for flow control,
// appends the payload to the stream's buffer pipe, and ends the stream
// when the frame carries END_STREAM.
//
// Stream-level protocol violations end the stream and return nil. A
// ConnectionError is returned for DATA on a stream ID that was never
// opened and for flow-control violations.
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	data := f.Data()
	if cs == nil {
		cc.mu.Lock()
		neverSent := cc.nextStreamID
		cc.mu.Unlock()
		if f.StreamID >= neverSent {
			// The ID is at or beyond the next one we would assign, so
			// this stream was never opened by us.
			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
			return ConnectionError(ErrCodeProtocol)
		}

		// The stream is no longer tracked (or was aborted on the read
		// side). Nobody will read this data, so charge the frame to the
		// connection window and credit it straight back.
		if f.Length > 0 {
			cc.mu.Lock()
			ok := cc.inflow.take(f.Length)
			connAdd := cc.inflow.add(int(f.Length))
			cc.mu.Unlock()
			if !ok {
				return ConnectionError(ErrCodeFlowControl)
			}
			if connAdd > 0 {
				cc.wmu.Lock()
				cc.fr.WriteWindowUpdate(0, uint32(connAdd))
				cc.bw.Flush()
				cc.wmu.Unlock()
			}
		}
		return nil
	}
	if cs.readClosed {
		cc.logf("protocol error: received DATA after END_STREAM")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if !cs.pastHeaders {
		cc.logf("protocol error: received DATA before a HEADERS frame")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if f.Length > 0 {
		if cs.isHead && len(data) > 0 {
			cc.logf("protocol error: received DATA on a HEAD request")
			rl.endStreamError(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeProtocol,
			})
			return nil
		}

		// Charge the whole frame (f.Length) to both the connection and
		// stream receive windows.
		cc.mu.Lock()
		if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
			cc.mu.Unlock()
			return ConnectionError(ErrCodeFlowControl)
		}

		// Bytes in the frame beyond the data itself (f.Length minus
		// len(data)) are refunded now; they never reach a body Read.
		var refund int
		if pad := int(f.Length) - len(data); pad > 0 {
			refund += pad
		}

		didReset := false
		var err error
		if len(data) > 0 {
			if _, err = cs.bufPipe.Write(data); err != nil {
				// The pipe refused the data, so it will never be read;
				// refund it as well.
				didReset = true
				refund += len(data)
			}
		}

		sendConn := cc.inflow.add(refund)
		var sendStream int32
		if !didReset {
			// The stream window is only replenished when the write to
			// the pipe succeeded.
			sendStream = cs.inflow.add(refund)
		}
		cc.mu.Unlock()

		if sendConn > 0 || sendStream > 0 {
			cc.wmu.Lock()
			if sendConn > 0 {
				cc.fr.WriteWindowUpdate(0, uint32(sendConn))
			}
			if sendStream > 0 {
				cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
			}
			cc.bw.Flush()
			cc.wmu.Unlock()
		}

		if err != nil {
			rl.endStreamError(cs, err)
			return nil
		}
	}

	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
2731
2732 func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2733
2734
2735 if !cs.readClosed {
2736 cs.readClosed = true
2737
2738
2739
2740
2741 rl.cc.mu.Lock()
2742 defer rl.cc.mu.Unlock()
2743 cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
2744 close(cs.peerClosed)
2745 }
2746 }
2747
// endStreamError marks cs as aborted on the read side, so that streamByID
// no longer returns it, and aborts the stream with err.
func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStream(err)
}
2752
// endStreamErrorLocked is like endStreamError but calls
// abortStreamLocked. processWindowUpdate calls it while holding cc.mu.
func (rl *clientConnReadLoop) endStreamErrorLocked(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStreamLocked(err)
}
2757
2758
// Named values for the headerOrData argument of streamByID.
const (
	headerOrDataFrame    = true  // the lookup is for a HEADERS or DATA frame
	notHeaderOrDataFrame = false // the lookup is for any other frame type
)
2763
2764
2765
2766 func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
2767 rl.cc.mu.Lock()
2768 defer rl.cc.mu.Unlock()
2769 if headerOrData {
2770
2771
2772 rl.cc.rstStreamPingsBlocked = false
2773 }
2774 rl.cc.readBeforeStreamID = rl.cc.nextStreamID
2775 cs := rl.cc.streams[id]
2776 if cs != nil && !cs.readAborted {
2777 return cs
2778 }
2779 return nil
2780 }
2781
2782 func (cs *clientStream) copyTrailers() {
2783 for k, vv := range cs.trailer {
2784 t := cs.resTrailer
2785 if *t == nil {
2786 *t = make(Header)
2787 }
2788 (*t)[k] = vv
2789 }
2790 }
2791
// processGoAway handles a GOAWAY frame: the connection is marked dead in
// the connection pool and the frame is recorded with cc.setGoAway. A
// non-zero error code is logged (verbose only) and reported to the
// CountError hook. It always returns nil.
func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
	cc := rl.cc
	cc.t.connPool().MarkDead(cc)
	if f.ErrCode != 0 {
		// Beyond logging and counting, the error code is not acted on here.
		cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
		if fn := cc.t.CountError; fn != nil {
			fn("recv_goaway_" + f.ErrCode.stringToken())
		}
	}
	cc.setGoAway(f)
	return nil
}
2805
2806 func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2807 cc := rl.cc
2808
2809
2810 cc.wmu.Lock()
2811 defer cc.wmu.Unlock()
2812
2813 if err := rl.processSettingsNoWrite(f); err != nil {
2814 return err
2815 }
2816 if !f.IsAck() {
2817 cc.fr.WriteSettingsAck()
2818 cc.bw.Flush()
2819 }
2820 return nil
2821 }
2822
// processSettingsNoWrite applies the settings in f to the connection
// without writing anything to the wire.
//
// A SETTINGS ack is accepted only if one is expected
// (cc.wantSettingsAck); otherwise a protocol ConnectionError is returned.
// For a non-ack frame each setting is validated and applied; the first
// validation or application error is returned.
func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
	cc := rl.cc
	// Deferred first, so it runs last: after cc.mu has been released.
	defer cc.maybeCallStateHook()
	cc.mu.Lock()
	defer cc.mu.Unlock()

	if f.IsAck() {
		if cc.wantSettingsAck {
			cc.wantSettingsAck = false
			return nil
		}
		return ConnectionError(ErrCodeProtocol)
	}

	var seenMaxConcurrentStreams bool
	err := f.ForeachSetting(func(s Setting) error {
		if err := s.Valid(); err != nil {
			return err
		}
		switch s.ID {
		case SettingMaxFrameSize:
			cc.maxFrameSize = s.Val
		case SettingMaxConcurrentStreams:
			cc.maxConcurrentStreams = s.Val
			seenMaxConcurrentStreams = true
		case SettingMaxHeaderListSize:
			cc.peerMaxHeaderListSize = uint64(s.Val)
		case SettingInitialWindowSize:
			// A value that does not fit in an int32 is a flow-control
			// error.
			if s.Val > math.MaxInt32 {
				return ConnectionError(ErrCodeFlowControl)
			}

			// Shift every open stream's send window by the change in the
			// initial window size, then wake writers waiting on cc.cond.
			delta := int32(s.Val) - int32(cc.initialWindowSize)
			for _, cs := range cc.streams {
				cs.flow.add(delta)
			}
			cc.cond.Broadcast()

			cc.initialWindowSize = s.Val
		case SettingHeaderTableSize:
			cc.henc.SetMaxDynamicTableSize(s.Val)
			cc.peerMaxHeaderTableSize = s.Val
		case SettingEnableConnectProtocol:
			// Only honored in the first SETTINGS frame; a value in a
			// later frame does not change extendedConnectAllowed.
			if !cc.seenSettings {
				cc.extendedConnectAllowed = s.Val == 1
			}
		default:
			cc.vlogf("Unhandled Setting: %v", s)
		}
		return nil
	})
	if err != nil {
		return err
	}

	if !cc.seenSettings {
		if !seenMaxConcurrentStreams {
			// The first SETTINGS frame did not set a concurrency limit,
			// so use defaultMaxConcurrentStreams.
			cc.maxConcurrentStreams = defaultMaxConcurrentStreams
		}
		close(cc.seenSettingsChan)
		cc.seenSettings = true
	}

	return nil
}
2907
2908 func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2909 cc := rl.cc
2910 cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
2911 if f.StreamID != 0 && cs == nil {
2912 return nil
2913 }
2914
2915 cc.mu.Lock()
2916 defer cc.mu.Unlock()
2917
2918 fl := &cc.flow
2919 if cs != nil {
2920 fl = &cs.flow
2921 }
2922 if !fl.add(int32(f.Increment)) {
2923
2924 if cs != nil {
2925 rl.endStreamErrorLocked(cs, StreamError{
2926 StreamID: f.StreamID,
2927 Code: ErrCodeFlowControl,
2928 })
2929 return nil
2930 }
2931
2932 return ConnectionError(ErrCodeFlowControl)
2933 }
2934 cc.cond.Broadcast()
2935 return nil
2936 }
2937
// processResetStream handles an RST_STREAM frame by aborting the stream
// with a StreamError whose Cause is errFromPeer. A reset with
// ErrCodeProtocol additionally marks the connection as not reusable.
// Resets for unknown or already-aborted streams are ignored. It always
// returns nil.
func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if cs == nil {
		// Nothing to reset.
		return nil
	}
	serr := streamError(cs.ID, f.ErrCode)
	// Mark the error as peer-originated; cleanupWriteRequest checks this
	// and does not send an RST_STREAM back.
	serr.Cause = errFromPeer
	if f.ErrCode == ErrCodeProtocol {
		rl.cc.SetDoNotReuse()
	}
	if fn := cs.cc.t.CountError; fn != nil {
		fn("recv_rststream_" + f.ErrCode.stringToken())
	}
	cs.abortStream(serr)

	cs.bufPipe.CloseWithError(serr)
	return nil
}
2957
2958
2959 func (cc *ClientConn) Ping(ctx context.Context) error {
2960 c := make(chan struct{})
2961
2962 var p [8]byte
2963 for {
2964 if _, err := rand.Read(p[:]); err != nil {
2965 return err
2966 }
2967 cc.mu.Lock()
2968
2969 if _, found := cc.pings[p]; !found {
2970 cc.pings[p] = c
2971 cc.mu.Unlock()
2972 break
2973 }
2974 cc.mu.Unlock()
2975 }
2976 var pingError error
2977 errc := make(chan struct{})
2978 go func() {
2979 cc.wmu.Lock()
2980 defer cc.wmu.Unlock()
2981 if pingError = cc.fr.WritePing(false, p); pingError != nil {
2982 close(errc)
2983 return
2984 }
2985 if pingError = cc.bw.Flush(); pingError != nil {
2986 close(errc)
2987 return
2988 }
2989 }()
2990 select {
2991 case <-c:
2992 return nil
2993 case <-errc:
2994 return pingError
2995 case <-ctx.Done():
2996 return ctx.Err()
2997 case <-cc.readerDone:
2998
2999 return cc.readerErr
3000 }
3001 }
3002
3003 func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
3004 if f.IsAck() {
3005 cc := rl.cc
3006 defer cc.maybeCallStateHook()
3007 cc.mu.Lock()
3008 defer cc.mu.Unlock()
3009
3010 if c, ok := cc.pings[f.Data]; ok {
3011 close(c)
3012 delete(cc.pings, f.Data)
3013 }
3014 if cc.pendingResets > 0 {
3015
3016 cc.pendingResets = 0
3017 cc.rstStreamPingsBlocked = true
3018 cc.cond.Broadcast()
3019 }
3020 return nil
3021 }
3022 cc := rl.cc
3023 cc.wmu.Lock()
3024 defer cc.wmu.Unlock()
3025 if err := cc.fr.WritePing(true, f.Data); err != nil {
3026 return err
3027 }
3028 return cc.bw.Flush()
3029 }
3030
// processPushPromise handles a PUSH_PROMISE frame received from the peer.
// The frame is never accepted: the function ignores its contents and
// unconditionally returns a connection error of type ErrCodeProtocol.
func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
	// The HTTP/2 specification requires an endpoint that has disabled
	// server push (SETTINGS_ENABLE_PUSH = 0) and had that setting
	// acknowledged to treat a received PUSH_PROMISE as a connection error
	// of type PROTOCOL_ERROR.
	//
	// NOTE(review): presumably this transport advertises
	// SETTINGS_ENABLE_PUSH = 0 when the connection is set up; confirm
	// where the initial SETTINGS frame is written.
	return ConnectionError(ErrCodeProtocol)
}
3041
3042
3043
3044 func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
3045
3046
3047
3048
3049 cc.wmu.Lock()
3050 cc.fr.WriteRSTStream(streamID, code)
3051 if ping {
3052 var payload [8]byte
3053 rand.Read(payload[:])
3054 cc.fr.WritePing(false, payload)
3055 }
3056 cc.bw.Flush()
3057 cc.wmu.Unlock()
3058 }
3059
// Sentinel errors for header lists that exceed a size limit.
var (
	// errResponseHeaderListSize reports a response header list larger
	// than the advertised limit.
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
	// errRequestHeaderListSize is the request-side counterpart; it is the
	// same error value as httpcommon.ErrRequestHeaderListSize.
	errRequestHeaderListSize = httpcommon.ErrRequestHeaderListSize
)
3064
3065 func (cc *ClientConn) logf(format string, args ...any) {
3066 cc.t.logf(format, args...)
3067 }
3068
3069 func (cc *ClientConn) vlogf(format string, args ...any) {
3070 cc.t.vlogf(format, args...)
3071 }
3072
3073 func (t *Transport) vlogf(format string, args ...any) {
3074 if VerboseLogs {
3075 t.logf(format, args...)
3076 }
3077 }
3078
3079 func (t *Transport) logf(format string, args ...any) {
3080 log.Printf(format, args...)
3081 }
3082
// missingBody is a body whose Read always fails with io.ErrUnexpectedEOF
// without producing any data, and whose Close always succeeds.
type missingBody struct{}

// Close is a no-op and always returns nil.
func (missingBody) Close() error {
	return nil
}

// Read never fills the buffer; it reports zero bytes and
// io.ErrUnexpectedEOF on every call.
func (missingBody) Read([]byte) (int, error) {
	return 0, io.ErrUnexpectedEOF
}
3087
3088 type erringRoundTripper struct{ err error }
3089
3090 func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
3091 func (rt erringRoundTripper) RoundTrip(*ClientRequest) (*ClientResponse, error) { return nil, rt.err }
3092
// errConcurrentReadOnResBody is returned by gzipReader when a Read is
// attempted while another Read holds the gzip.Reader. gzipReader also stores
// it in zerr as the marker for "a Read is in progress".
var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")
3094
3095
3096
3097
3098
// gzipReader wraps a gzip-compressed body and decompresses it on Read.
// The gzip.Reader is obtained from gzipPool lazily on the first Read and is
// returned to the pool by Close, either immediately or, if a Read is in
// progress, when that Read completes.
type gzipReader struct {
	_    incomparable  // NOTE(review): presumably prevents == comparison of gzipReader values; confirm at its declaration
	body io.ReadCloser // underlying compressed body; read by the gzip.Reader and closed by Close
	mu   sync.Mutex    // held by acquire, release, and close while they touch zr and zerr
	zr   *gzip.Reader  // pooled reader parked here between Reads; nil before first use and while a Read holds it
	zerr error         // sticky state: reader init error, errConcurrentReadOnResBody while a Read is in progress, or fs.ErrClosed after close
}
3106
// eofReader is an empty input source: every read reports io.EOF. Because it
// provides both Read and ReadByte it satisfies flate.Reader.
type eofReader struct{}

// Read reports zero bytes and io.EOF.
func (eofReader) Read([]byte) (int, error) {
	return 0, io.EOF
}

// ReadByte reports a zero byte and io.EOF.
func (eofReader) ReadByte() (byte, error) {
	return 0, io.EOF
}

// gzipPool recycles gzip.Reader values across response bodies.
var gzipPool = sync.Pool{
	New: func() any {
		return new(gzip.Reader)
	},
}

// gzipPoolGet takes a gzip.Reader from the pool and resets it to read from r.
// If the reset fails the reader goes straight back to the pool and the error
// is returned with a nil reader.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
	pooled := gzipPool.Get().(*gzip.Reader)
	err := pooled.Reset(r)
	if err == nil {
		return pooled, nil
	}
	gzipPoolPut(pooled)
	return nil, err
}

// gzipPoolPut detaches zr from whatever it was reading and returns it to the
// pool.
func gzipPoolPut(zr *gzip.Reader) {
	// Reset onto an empty source so the pooled reader no longer references
	// the previous body. The source is handed over as a flate.Reader so
	// that Reset has no reason to wrap it in a buffered reader of its own.
	// The error Reset reports for the empty source is deliberately ignored.
	zr.Reset(flate.Reader(eofReader{}))
	gzipPool.Put(zr)
}
3132
3133
3134
3135 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
3136 gz.mu.Lock()
3137 defer gz.mu.Unlock()
3138 if gz.zerr != nil {
3139 return nil, gz.zerr
3140 }
3141 if gz.zr == nil {
3142 gz.zr, gz.zerr = gzipPoolGet(gz.body)
3143 if gz.zerr != nil {
3144 return nil, gz.zerr
3145 }
3146 }
3147 ret := gz.zr
3148 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
3149 return ret, nil
3150 }
3151
3152
3153 func (gz *gzipReader) release(zr *gzip.Reader) {
3154 gz.mu.Lock()
3155 defer gz.mu.Unlock()
3156 if gz.zerr == errConcurrentReadOnResBody {
3157 gz.zr, gz.zerr = zr, nil
3158 } else {
3159 gzipPoolPut(zr)
3160 }
3161 }
3162
3163
3164
3165 func (gz *gzipReader) close() {
3166 gz.mu.Lock()
3167 defer gz.mu.Unlock()
3168 if gz.zerr == nil && gz.zr != nil {
3169 gzipPoolPut(gz.zr)
3170 gz.zr = nil
3171 }
3172 gz.zerr = fs.ErrClosed
3173 }
3174
3175 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3176 zr, err := gz.acquire()
3177 if err != nil {
3178 return 0, err
3179 }
3180 defer gz.release(zr)
3181
3182 return zr.Read(p)
3183 }
3184
3185 func (gz *gzipReader) Close() error {
3186 gz.close()
3187
3188 return gz.body.Close()
3189 }
3190
3191
3192
3193 func isConnectionCloseRequest(req *ClientRequest) bool {
3194 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
3195 }
3196
3197
3198
3199 type NetHTTPClientConn struct {
3200 cc *ClientConn
3201 }
3202
3203 func (cc NetHTTPClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
3204 return cc.cc.RoundTrip(req)
3205 }
3206
3207 func (cc NetHTTPClientConn) Close() error {
3208 return cc.cc.Close()
3209 }
3210
3211 func (cc NetHTTPClientConn) Err() error {
3212 cc.cc.mu.Lock()
3213 defer cc.cc.mu.Unlock()
3214 if cc.cc.closed {
3215 return errors.New("connection closed")
3216 }
3217 return nil
3218 }
3219
3220 func (cc NetHTTPClientConn) Reserve() error {
3221 defer cc.cc.maybeCallStateHook()
3222 cc.cc.mu.Lock()
3223 defer cc.cc.mu.Unlock()
3224 if !cc.cc.canReserveLocked() {
3225 return errors.New("connection is unavailable")
3226 }
3227 cc.cc.streamsReserved++
3228 return nil
3229 }
3230
3231 func (cc NetHTTPClientConn) Release() {
3232 defer cc.cc.maybeCallStateHook()
3233 cc.cc.mu.Lock()
3234 defer cc.cc.mu.Unlock()
3235
3236
3237
3238
3239 if cc.cc.streamsReserved > 0 {
3240 cc.cc.streamsReserved--
3241 }
3242 }
3243
3244 func (cc NetHTTPClientConn) Available() int {
3245 cc.cc.mu.Lock()
3246 defer cc.cc.mu.Unlock()
3247 return cc.cc.availableLocked()
3248 }
3249
3250 func (cc NetHTTPClientConn) InFlight() int {
3251 cc.cc.mu.Lock()
3252 defer cc.cc.mu.Unlock()
3253 return cc.cc.currentRequestCountLocked()
3254 }
3255
3256 func (cc *ClientConn) maybeCallStateHook() {
3257 if cc.internalStateHook != nil {
3258 cc.internalStateHook()
3259 }
3260 }
3261
3262 func (t *Transport) idleConnTimeout() time.Duration {
3263
3264
3265
3266 if t.IdleConnTimeout != 0 {
3267 return t.IdleConnTimeout
3268 }
3269
3270 if t.t1 != nil {
3271 return t.t1.IdleConnTimeout()
3272 }
3273
3274 return 0
3275 }
3276
3277 func traceGetConn(req *ClientRequest, hostPort string) {
3278 trace := httptrace.ContextClientTrace(req.Context)
3279 if trace == nil || trace.GetConn == nil {
3280 return
3281 }
3282 trace.GetConn(hostPort)
3283 }
3284
3285 func traceGotConn(req *ClientRequest, cc *ClientConn, reused bool) {
3286 trace := httptrace.ContextClientTrace(req.Context)
3287 if trace == nil || trace.GotConn == nil {
3288 return
3289 }
3290 ci := httptrace.GotConnInfo{Conn: cc.tconn}
3291 ci.Reused = reused
3292 cc.mu.Lock()
3293 ci.WasIdle = len(cc.streams) == 0 && reused
3294 if ci.WasIdle && !cc.lastActive.IsZero() {
3295 ci.IdleTime = time.Since(cc.lastActive)
3296 }
3297 cc.mu.Unlock()
3298
3299 trace.GotConn(ci)
3300 }
3301
// traceWroteHeaders invokes trace.WroteHeaders when both the trace and the
// hook are non-nil.
func traceWroteHeaders(trace *httptrace.ClientTrace) {
	if trace == nil || trace.WroteHeaders == nil {
		return
	}
	trace.WroteHeaders()
}
3307
// traceGot100Continue invokes trace.Got100Continue when both the trace and
// the hook are non-nil.
func traceGot100Continue(trace *httptrace.ClientTrace) {
	if trace == nil || trace.Got100Continue == nil {
		return
	}
	trace.Got100Continue()
}
3313
// traceWait100Continue invokes trace.Wait100Continue when both the trace and
// the hook are non-nil.
func traceWait100Continue(trace *httptrace.ClientTrace) {
	if trace == nil || trace.Wait100Continue == nil {
		return
	}
	trace.Wait100Continue()
}
3319
// traceWroteRequest invokes trace.WroteRequest with err wrapped in a
// WroteRequestInfo when both the trace and the hook are non-nil.
func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
	if trace == nil || trace.WroteRequest == nil {
		return
	}
	info := httptrace.WroteRequestInfo{Err: err}
	trace.WroteRequest(info)
}
3325
// traceFirstResponseByte invokes trace.GotFirstResponseByte when both the
// trace and the hook are non-nil.
func traceFirstResponseByte(trace *httptrace.ClientTrace) {
	if trace == nil || trace.GotFirstResponseByte == nil {
		return
	}
	trace.GotFirstResponseByte()
}
3331
// traceGot1xxResponseFunc returns the trace's Got1xxResponse hook. The result
// is nil when trace is nil or when the hook is not set.
func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
	if trace == nil {
		return nil
	}
	return trace.Got1xxResponse
}
3338
3339
3340
3341 func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
3342 dialer := &tls.Dialer{
3343 Config: cfg,
3344 }
3345 cn, err := dialer.DialContext(ctx, network, addr)
3346 if err != nil {
3347 return nil, err
3348 }
3349 tlsCn := cn.(*tls.Conn)
3350 return tlsCn, nil
3351 }
3352
View as plain text