Source file src/net/http/internal/http2/transport.go

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Transport code.
     6  
     7  package http2
     8  
     9  import (
    10  	"bufio"
    11  	"bytes"
    12  	"compress/flate"
    13  	"compress/gzip"
    14  	"context"
    15  	"crypto/rand"
    16  	"crypto/tls"
    17  	"errors"
    18  	"fmt"
    19  	"io"
    20  	"io/fs"
    21  	"log"
    22  	"math"
    23  	"math/bits"
    24  	mathrand "math/rand"
    25  	"net"
    26  	"net/http/httptrace"
    27  	"net/http/internal"
    28  	"net/http/internal/httpcommon"
    29  	"net/textproto"
    30  	"slices"
    31  	"strconv"
    32  	"strings"
    33  	"sync"
    34  	"sync/atomic"
    35  	"time"
    36  
    37  	"golang.org/x/net/http/httpguts"
    38  	"golang.org/x/net/http2/hpack"
    39  	"golang.org/x/net/idna"
    40  )
    41  
const (
	// transportDefaultConnFlow is how many connection-level flow control
	// tokens we give the server at start-up, past the default 64k.
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is how many stream-level flow
	// control tokens we announce to the peer, and how many bytes
	// we buffer per stream.
	transportDefaultStreamFlow = 4 << 20

	// defaultUserAgent is the User-Agent used when the request carries none.
	defaultUserAgent = "Go-http-client/2.0"

	// initialMaxConcurrentStreams is a connection's maxConcurrentStreams until
	// it has received the server's initial SETTINGS frame, which corresponds
	// with the spec's minimum recommended value.
	initialMaxConcurrentStreams = 100

	// defaultMaxConcurrentStreams is a connection's default maxConcurrentStreams
	// if the server doesn't include one in its initial SETTINGS frame.
	defaultMaxConcurrentStreams = 1000
)
    63  
// Transport is an HTTP/2 Transport.
//
// A Transport internally caches connections to servers. It is safe
// for concurrent use by multiple goroutines.
type Transport struct {
	// DialTLSContext specifies an optional dial function with context for
	// creating TLS connections for requests.
	//
	// If DialTLSContext and DialTLS are nil, tls.Dial is used.
	//
	// If the returned net.Conn has a ConnectionState method like tls.Conn,
	// it will be used to set http.Response.TLS.
	DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)

	// DialTLS specifies an optional dial function for creating
	// TLS connections for requests.
	//
	// If DialTLSContext and DialTLS are nil, tls.Dial is used.
	//
	// Deprecated: Use DialTLSContext instead, which allows the transport
	// to cancel dials as soon as they are no longer needed.
	// If both are set, DialTLSContext takes priority.
	DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client. If nil, the default configuration is used.
	TLSClientConfig *tls.Config

	// ConnPool optionally specifies an alternate connection pool to use.
	// If nil, the default is used.
	ConnPool ClientConnPool

	// DisableCompression, if true, prevents the Transport from
	// requesting compression with an "Accept-Encoding: gzip"
	// request header when the Request contains no existing
	// Accept-Encoding value. If the Transport requests gzip on
	// its own and gets a gzipped response, it's transparently
	// decoded in the Response.Body. However, if the user
	// explicitly requested gzip it is not automatically
	// uncompressed.
	DisableCompression bool

	// AllowHTTP, if true, permits HTTP/2 requests using the insecure,
	// plain-text "http" scheme. Note that this does not enable h2c support.
	AllowHTTP bool

	// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
	// send in the initial settings frame. It is how many bytes
	// of response headers are allowed. Unlike the http2 spec, zero here
	// means to use a default limit (currently 10MB). If you actually
	// want to advertise an unlimited value to the peer, Transport
	// interprets the highest possible value here (0xffffffff or 1<<32-1)
	// to mean no limit.
	MaxHeaderListSize uint32

	// MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
	// initial settings frame. It is the size in bytes of the largest frame
	// payload that the sender is willing to receive. If 0, no setting is
	// sent, and the value is provided by the peer, which should be 16384
	// according to the spec:
	// https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
	// Values are bounded in the range 16k to 16M.
	MaxReadFrameSize uint32

	// MaxDecoderHeaderTableSize optionally specifies the http2
	// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
	// informs the remote endpoint of the maximum size of the header compression
	// table used to decode header blocks, in octets. If zero, the default value
	// of 4096 is used.
	MaxDecoderHeaderTableSize uint32

	// MaxEncoderHeaderTableSize optionally specifies an upper limit for the
	// header compression table used for encoding request headers. Received
	// SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
	// the default value of 4096 is used.
	MaxEncoderHeaderTableSize uint32

	// StrictMaxConcurrentStreams controls whether the server's
	// SETTINGS_MAX_CONCURRENT_STREAMS should be respected
	// globally. If false, new TCP connections are created to the
	// server as needed to keep each under the per-connection
	// SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
	// server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
	// a global limit and callers of RoundTrip block when needed,
	// waiting for their turn.
	StrictMaxConcurrentStreams bool

	// IdleConnTimeout is the maximum amount of time an idle
	// (keep-alive) connection will remain idle before closing
	// itself.
	// Zero means no limit.
	IdleConnTimeout time.Duration

	// ReadIdleTimeout is the timeout after which a health check using ping
	// frame will be carried out if no frame is received on the connection.
	// Note that a ping response is considered a received frame, so if
	// there is no other traffic on the connection, the health check will
	// be performed every ReadIdleTimeout interval.
	// If zero, no health check is performed.
	ReadIdleTimeout time.Duration

	// PingTimeout is the timeout after which the connection will be closed
	// if a response to Ping is not received.
	// Defaults to 15s.
	PingTimeout time.Duration

	// WriteByteTimeout is the timeout after which the connection will be
	// closed if no data can be written to it. The timeout begins when data is
	// available to write, and is extended whenever any bytes are written.
	WriteByteTimeout time.Duration

	// CountError, if non-nil, is called on HTTP/2 transport errors.
	// It's intended to increment a metric for monitoring, such
	// as an expvar or Prometheus metric.
	// The errType consists of only ASCII word characters.
	CountError func(errType string)

	// t1, if non-nil, supplies configuration inherited from an enclosing
	// HTTP/1 transport. It may be nil for a directly constructed Transport
	// (see the nil checks in disableCompression and disableKeepAlives).
	t1 TransportConfig

	connPoolOnce  sync.Once      // guards lazy initialization of connPoolOrDef
	connPoolOrDef ClientConnPool // non-nil version of ConnPool

	// transportTestHooks is non-nil only in tests; see newClientConn.
	*transportTestHooks
}
   188  
// Hook points used for testing.
// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
// Inside tests, see the testSyncHooks function docs.

// transportTestHooks lets tests intercept transport internals.
type transportTestHooks struct {
	// newclientconn is called with each ClientConn as newClientConn
	// constructs it, before the connection is used.
	newclientconn func(*ClientConn)
}
   196  
   197  func (t *Transport) maxHeaderListSize() uint32 {
   198  	n := t.t1.MaxResponseHeaderBytes()
   199  	if n > 0 {
   200  		n = adjustHTTP1MaxHeaderSize(n)
   201  	}
   202  	if n <= 0 {
   203  		return 10 << 20
   204  	}
   205  	if n >= 0xffffffff {
   206  		return 0
   207  	}
   208  	return uint32(n)
   209  }
   210  
   211  func (t *Transport) disableCompression() bool {
   212  	return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression())
   213  }
   214  
   215  func NewTransport(t1 TransportConfig) *Transport {
   216  	connPool := new(clientConnPool)
   217  	t2 := &Transport{
   218  		ConnPool: noDialClientConnPool{connPool},
   219  		t1:       t1,
   220  	}
   221  	connPool.t = t2
   222  	return t2
   223  }
   224  
   225  func (t *Transport) AddConn(scheme, authority string, c net.Conn) error {
   226  	connPool, ok := t.ConnPool.(noDialClientConnPool)
   227  	if !ok {
   228  		go c.Close()
   229  		return nil
   230  	}
   231  	addr := authorityAddr(scheme, authority)
   232  	used, err := connPool.addConnIfNeeded(addr, t, c)
   233  	if !used {
   234  		go c.Close()
   235  	}
   236  	return err
   237  }
   238  
// unencryptedTransport is a Transport with a RoundTrip method that
// always permits http:// URLs. It is used to serve requests arriving on
// an already-established unencrypted connection.
type unencryptedTransport Transport
   242  
   243  func (t *unencryptedTransport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
   244  	return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
   245  }
   246  
   247  func (t *Transport) connPool() ClientConnPool {
   248  	t.connPoolOnce.Do(t.initConnPool)
   249  	return t.connPoolOrDef
   250  }
   251  
   252  func (t *Transport) initConnPool() {
   253  	if t.ConnPool != nil {
   254  		t.connPoolOrDef = t.ConnPool
   255  	} else {
   256  		t.connPoolOrDef = &clientConnPool{t: t}
   257  	}
   258  }
   259  
// ClientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
type ClientConn struct {
	t             *Transport
	tconn         net.Conn             // usually *tls.Conn, except specialized impls
	tlsState      *tls.ConnectionState // nil only for specialized impls
	atomicReused  uint32               // whether conn is being reused; atomic
	singleUse     bool                 // whether being used for a single http.Request
	getConnCalled bool                 // used by clientConnPool

	// readLoop goroutine fields:
	readerDone chan struct{} // closed on error
	readerErr  error         // set before readerDone is closed

	idleTimeout time.Duration // or 0 for never
	idleTimer   *time.Timer

	mu               sync.Mutex // guards following
	cond             *sync.Cond // hold mu; broadcast on flow/closed changes
	flow             outflow    // our conn-level flow control quota (cs.outflow is per stream)
	inflow           inflow     // peer's conn-level flow control
	doNotReuse       bool       // whether conn is marked to not be reused for any future requests
	closing          bool
	closed           bool
	closedOnIdle     bool                     // true if conn was closed for idleness
	seenSettings     bool                     // true if we've seen a settings frame, false otherwise
	seenSettingsChan chan struct{}            // closed when seenSettings is true or frame reading fails
	wantSettingsAck  bool                     // we sent a SETTINGS frame and haven't heard back
	goAway           *GoAwayFrame             // if non-nil, the GoAwayFrame we received
	goAwayDebug      string                   // goAway frame's debug data, retained as a string
	streams          map[uint32]*clientStream // client-initiated
	streamsReserved  int                      // incr by ReserveNewRequest; decr on RoundTrip
	nextStreamID     uint32
	pendingRequests  int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
	pings            map[[8]byte]chan struct{} // in flight ping data to notification channel
	br               *bufio.Reader
	lastActive       time.Time
	lastIdle         time.Time // time last idle
	// Settings from peer: (also guarded by wmu)
	maxFrameSize                uint32
	maxConcurrentStreams        uint32
	peerMaxHeaderListSize       uint64
	peerMaxHeaderTableSize      uint32
	initialWindowSize           uint32
	initialStreamRecvWindowSize int32
	readIdleTimeout             time.Duration
	pingTimeout                 time.Duration
	extendedConnectAllowed      bool
	strictMaxConcurrentStreams  bool

	// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
	// gRPC strictly limits the number of PING frames that it will receive.
	// The default is two pings per two hours, but the limit resets every time
	// the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
	//
	// rstStreamPingsBlocked is set after receiving a response to a PING frame
	// bundled with an RST_STREAM (see pendingResets below), and cleared after
	// receiving a HEADERS or DATA frame.
	rstStreamPingsBlocked bool

	// pendingResets is the number of RST_STREAM frames we have sent to the peer,
	// without confirming that the peer has received them. When we send a RST_STREAM,
	// we bundle it with a PING frame, unless a PING is already in flight. We count
	// the reset stream against the connection's concurrency limit until we get
	// a PING response. This limits the number of requests we'll try to send to a
	// completely unresponsive connection.
	pendingResets int

	// readBeforeStreamID is the smallest stream ID that has not been followed by
	// a frame read from the peer. We use this to determine when a request may
	// have been sent to a completely unresponsive connection:
	// If the request ID is less than readBeforeStreamID, then we have had some
	// indication of life on the connection since sending the request.
	readBeforeStreamID uint32

	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
	// Write to reqHeaderMu to lock it, read from it to unlock.
	// Lock reqHeaderMu BEFORE mu or wmu.
	reqHeaderMu chan struct{}

	// internalStateHook reports state changes back to the net/http.ClientConn.
	// Note that this is different from the user state hook registered by
	// net/http.ClientConn.SetStateHook: The internal hook calls ClientConn,
	// which calls the user hook.
	internalStateHook func()

	// wmu is held while writing.
	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
	// Only acquire both at the same time when changing peer settings.
	wmu  sync.Mutex
	bw   *bufio.Writer
	fr   *Framer
	werr error        // first write error that has occurred
	hbuf bytes.Buffer // HPACK encoder writes into this
	henc *hpack.Encoder
}
   356  
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
	cc *ClientConn

	// Fields of Request that we may access even after the response body is closed.
	ctx       context.Context
	reqCancel <-chan struct{}

	trace         *httptrace.ClientTrace // or nil
	ID            uint32
	bufPipe       pipe // buffered pipe with the flow-controlled response payload
	requestedGzip bool
	isHead        bool

	abortOnce sync.Once
	abort     chan struct{} // closed to signal stream should end immediately
	abortErr  error         // set if abort is closed

	peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
	donec      chan struct{} // closed after the stream is in the closed state
	on100      chan struct{} // buffered; written to if a 100 is received

	respHeaderRecv chan struct{}   // closed when headers are received
	res            *ClientResponse // set if respHeaderRecv is closed

	flow        outflow // guarded by cc.mu
	inflow      inflow  // guarded by cc.mu
	bytesRemain int64   // -1 means unknown; owned by transportResponseBody.Read
	readErr     error   // sticky read error; owned by transportResponseBody.Read

	reqBody              io.ReadCloser
	reqBodyContentLength int64         // -1 means unknown
	reqBodyClosed        chan struct{} // guarded by cc.mu; non-nil on Close, closed when done

	// owned by writeRequest:
	sentEndStream bool // sent an END_STREAM flag to the peer
	sentHeaders   bool

	// owned by clientConnReadLoop:
	firstByte       bool  // got the first response byte
	pastHeaders     bool  // got first MetaHeadersFrame (actual headers)
	pastTrailers    bool  // got optional second MetaHeadersFrame (trailers)
	readClosed      bool  // peer sent an END_STREAM flag
	readAborted     bool  // read loop reset the stream
	totalHeaderSize int64 // total size of 1xx headers seen

	trailer    Header  // accumulated trailers
	resTrailer *Header // client's Response.Trailer

	// staticResp: NOTE(review): presumably preallocated storage backing res
	// to avoid a per-request allocation — confirm against the code that
	// populates res.
	staticResp ClientResponse
}
   409  
// got1xxFuncForTests, if non-nil, overrides the request trace's
// Got1xxResponse callback; set only by tests.
var got1xxFuncForTests func(int, textproto.MIMEHeader) error
   411  
   412  // get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
   413  // if any. It returns nil if not set or if the Go version is too old.
   414  func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
   415  	if fn := got1xxFuncForTests; fn != nil {
   416  		return fn
   417  	}
   418  	return traceGot1xxResponseFunc(cs.trace)
   419  }
   420  
   421  func (cs *clientStream) abortStream(err error) {
   422  	cs.cc.mu.Lock()
   423  	defer cs.cc.mu.Unlock()
   424  	cs.abortStreamLocked(err)
   425  }
   426  
// abortStreamLocked aborts the stream with err: it records the error and
// closes cs.abort (at most once), starts closing the request body if one
// exists, and wakes any body writer blocked on flow control.
//
// cc.mu must be held.
func (cs *clientStream) abortStreamLocked(err error) {
	cs.abortOnce.Do(func() {
		// Set abortErr before closing abort so that anyone unblocked by
		// the close observes the error.
		cs.abortErr = err
		close(cs.abort)
	})
	if cs.reqBody != nil {
		cs.closeReqBodyLocked()
	}
	// TODO(dneil): Clean up tests where cs.cc.cond is nil.
	if cs.cc.cond != nil {
		// Wake up writeRequestBody if it is waiting on flow control.
		cs.cc.cond.Broadcast()
	}
}
   441  
   442  func (cs *clientStream) abortRequestBodyWrite() {
   443  	cc := cs.cc
   444  	cc.mu.Lock()
   445  	defer cc.mu.Unlock()
   446  	if cs.reqBody != nil && cs.reqBodyClosed == nil {
   447  		cs.closeReqBodyLocked()
   448  		cc.cond.Broadcast()
   449  	}
   450  }
   451  
// closeReqBodyLocked closes the request body on a new goroutine (Close may
// block), creating cs.reqBodyClosed as the done signal. It is a no-op if a
// close is already in progress.
//
// cc.mu must be held.
func (cs *clientStream) closeReqBodyLocked() {
	if cs.reqBodyClosed != nil {
		return
	}
	cs.reqBodyClosed = make(chan struct{})
	// Capture the channel locally: the goroutine runs without cc.mu and
	// must close exactly the channel created above.
	reqBodyClosed := cs.reqBodyClosed
	go func() {
		cs.reqBody.Close()
		close(reqBodyClosed)
	}()
}
   463  
// stickyErrWriter wraps a net.Conn so that once a write fails, every
// subsequent write fails immediately with that same first error.
type stickyErrWriter struct {
	conn    net.Conn
	timeout time.Duration // per-write byte timeout passed to writeWithByteTimeout
	err     *error        // shared slot holding the first write error
}
   469  
   470  func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
   471  	if *sew.err != nil {
   472  		return 0, *sew.err
   473  	}
   474  	n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
   475  	*sew.err = err
   476  	return n, err
   477  }
   478  
// noCachedConnError is the concrete type of ErrNoCachedConn, which
// needs to be detected by net/http regardless of whether it's its
// bundled version (in h2_bundle.go with a rewritten type name) or
// from a user's x/net/http2. As such, it has a unique method name
// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
// isNoCachedConnError.
type noCachedConnError struct{}

func (noCachedConnError) IsHTTP2NoCachedConnError() {}
func (noCachedConnError) Error() string             { return "http2: no cached connection was available" }
   489  
   490  // isNoCachedConnError reports whether err is of type noCachedConnError
   491  // or its equivalent renamed type in net/http2's h2_bundle.go. Both types
   492  // may coexist in the same running program.
   493  func isNoCachedConnError(err error) bool {
   494  	_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
   495  	return ok
   496  }
   497  
// ErrNoCachedConn is returned by RoundTripOpt when OnlyCachedConn is set
// and no cached connection is available.
var ErrNoCachedConn error = noCachedConnError{}
   499  
// RoundTripOpt are options for the Transport.RoundTripOpt method.
type RoundTripOpt struct {
	// OnlyCachedConn controls whether RoundTripOpt may
	// create a new TCP connection. If set true and
	// no cached connection is available, RoundTripOpt
	// will return ErrNoCachedConn.
	OnlyCachedConn bool

	// allowHTTP permits plain-text http:// URLs; set internally by
	// unencryptedTransport.RoundTrip.
	allowHTTP bool // allow http:// URLs
}
   510  
   511  func (t *Transport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
   512  	return t.RoundTripOpt(req, RoundTripOpt{})
   513  }
   514  
   515  // authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
   516  // and returns a host:port. The port 443 is added if needed.
   517  func authorityAddr(scheme string, authority string) (addr string) {
   518  	host, port, err := net.SplitHostPort(authority)
   519  	if err != nil { // authority didn't have a port
   520  		host = authority
   521  		port = ""
   522  	}
   523  	if port == "" { // authority's port was empty
   524  		port = "443"
   525  		if scheme == "http" {
   526  			port = "80"
   527  		}
   528  	}
   529  	if a, err := idna.ToASCII(host); err == nil {
   530  		host = a
   531  	}
   532  	// IPv6 address literal, without a port:
   533  	if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
   534  		return host + ":" + port
   535  	}
   536  	return net.JoinHostPort(host, port)
   537  }
   538  
// RoundTripOpt is like RoundTrip, but takes options.
//
// It validates the URL scheme, then loops: get a connection from the pool,
// attempt the request, and — when shouldRetryRequest permits — retry up to
// 6 times with exponential backoff and 10% jitter after the first retry.
func (t *Transport) RoundTripOpt(req *ClientRequest, opt RoundTripOpt) (*ClientResponse, error) {
	switch req.URL.Scheme {
	case "https":
		// Always okay.
	case "http":
		if !t.AllowHTTP && !opt.allowHTTP {
			return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
		}
	default:
		return nil, errors.New("http2: unsupported scheme")
	}

	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
	for retry := 0; ; retry++ {
		cc, err := t.connPool().GetClientConn(req, addr)
		if err != nil {
			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
			return nil, err
		}
		// reused is false only for the first request on this conn
		// (the CAS flips atomicReused from 0 to 1 exactly once).
		reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
		traceGotConn(req, cc, reused)
		res, err := cc.RoundTrip(req)
		if err != nil && retry <= 6 {
			roundTripErr := err
			if req, err = shouldRetryRequest(req, err); err == nil {
				// After the first retry, do exponential backoff with 10% jitter.
				if retry == 0 {
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				}
				backoff := float64(uint(1) << (uint(retry) - 1))
				backoff += backoff * (0.1 * mathrand.Float64())
				d := time.Second * time.Duration(backoff)
				tm := time.NewTimer(d)
				select {
				case <-tm.C:
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				case <-req.Context.Done():
					tm.Stop()
					err = req.Context.Err()
				}
			}
		}
		if err == errClientConnNotEstablished {
			// This ClientConn was created recently,
			// this is the first request to use it,
			// and the connection is closed and not usable.
			//
			// In this state, cc.idleTimer will remove the conn from the pool
			// when it fires. Stop the timer and remove it here so future requests
			// won't try to use this connection.
			//
			// If the timer has already fired and we're racing it, the redundant
			// call to MarkDead is harmless.
			if cc.idleTimer != nil {
				cc.idleTimer.Stop()
			}
			t.connPool().MarkDead(cc)
		}
		if err != nil {
			t.vlogf("RoundTrip failure: %v", err)
			return nil, err
		}
		return res, nil
	}
}
   607  
   608  func (t *Transport) IdleConnStrsForTesting() []string {
   609  	pool, ok := t.connPool().(noDialClientConnPool)
   610  	if !ok {
   611  		return nil
   612  	}
   613  
   614  	var ret []string
   615  	pool.mu.Lock()
   616  	defer pool.mu.Unlock()
   617  	for k, ccs := range pool.conns {
   618  		for _, cc := range ccs {
   619  			if cc.idleState().canTakeNewRequest {
   620  				ret = append(ret, k)
   621  			}
   622  		}
   623  	}
   624  	slices.Sort(ret)
   625  	return ret
   626  }
   627  
   628  // CloseIdleConnections closes any connections which were previously
   629  // connected from previous requests but are now sitting idle.
   630  // It does not interrupt any connections currently in use.
   631  func (t *Transport) CloseIdleConnections() {
   632  	if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
   633  		cp.closeIdleConnections()
   634  	}
   635  }
   636  
// Sentinel errors used by the transport's connection management; several
// of them (see canRetryError) mark a failure as transparently retryable.
var (
	errClientConnClosed         = errors.New("http2: client conn is closed")
	errClientConnUnusable       = errors.New("http2: client conn not usable")
	errClientConnNotEstablished = errors.New("http2: client conn could not be established")
	errClientConnGotGoAway      = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
	errClientConnForceClosed    = errors.New("http2: client connection force closed via ClientConn.Close")
)
   644  
   645  // shouldRetryRequest is called by RoundTrip when a request fails to get
   646  // response headers. It is always called with a non-nil error.
   647  // It returns either a request to retry or an error if the request can't be replayed.
   648  // If the request is retried, it always clones the request (since requests
   649  // contain an unreusable clientStream).
   650  func shouldRetryRequest(req *ClientRequest, err error) (*ClientRequest, error) {
   651  	if !canRetryError(err) {
   652  		return nil, err
   653  	}
   654  	// If the Body is nil (or http.NoBody), it's safe to reuse this request's Body.
   655  	if req.Body == nil || req.Body == NoBody {
   656  		return req.Clone(), nil
   657  	}
   658  
   659  	// If the request body can be reset back to its original
   660  	// state via the optional req.GetBody, do that.
   661  	if req.GetBody != nil {
   662  		body, err := req.GetBody()
   663  		if err != nil {
   664  			return nil, err
   665  		}
   666  		newReq := req.Clone()
   667  		newReq.Body = body
   668  		return newReq, nil
   669  	}
   670  
   671  	// The Request.Body can't reset back to the beginning, but we
   672  	// don't seem to have started to read from it yet, so reuse the body.
   673  	if err == errClientConnUnusable {
   674  		return req.Clone(), nil
   675  	}
   676  
   677  	return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
   678  }
   679  
   680  func canRetryError(err error) bool {
   681  	if err == errClientConnUnusable || err == errClientConnGotGoAway {
   682  		return true
   683  	}
   684  	if se, ok := err.(StreamError); ok {
   685  		return se.Code == ErrCodeRefusedStream
   686  	}
   687  	return false
   688  }
   689  
   690  func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
   691  	if t.transportTestHooks != nil {
   692  		return t.newClientConn(nil, singleUse, nil)
   693  	}
   694  	host, _, err := net.SplitHostPort(addr)
   695  	if err != nil {
   696  		return nil, err
   697  	}
   698  	tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
   699  	if err != nil {
   700  		return nil, err
   701  	}
   702  	return t.newClientConn(tconn, singleUse, nil)
   703  }
   704  
   705  func (t *Transport) newTLSConfig(host string) *tls.Config {
   706  	cfg := new(tls.Config)
   707  	if t.TLSClientConfig != nil {
   708  		*cfg = *t.TLSClientConfig.Clone()
   709  	}
   710  	if !slices.Contains(cfg.NextProtos, NextProtoTLS) {
   711  		cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
   712  	}
   713  	if cfg.ServerName == "" {
   714  		cfg.ServerName = host
   715  	}
   716  	return cfg
   717  }
   718  
   719  func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
   720  	if t.DialTLSContext != nil {
   721  		return t.DialTLSContext(ctx, network, addr, tlsCfg)
   722  	} else if t.DialTLS != nil {
   723  		return t.DialTLS(network, addr, tlsCfg)
   724  	}
   725  
   726  	tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
   727  	if err != nil {
   728  		return nil, err
   729  	}
   730  	state := tlsCn.ConnectionState()
   731  	if p := state.NegotiatedProtocol; p != NextProtoTLS {
   732  		return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
   733  	}
   734  	if !state.NegotiatedProtocolIsMutual {
   735  		return nil, errors.New("http2: could not negotiate protocol mutually")
   736  	}
   737  	return tlsCn, nil
   738  }
   739  
   740  // disableKeepAlives reports whether connections should be closed as
   741  // soon as possible after handling the first request.
   742  func (t *Transport) disableKeepAlives() bool {
   743  	return t.t1 != nil && t.t1.DisableKeepAlives()
   744  }
   745  
   746  func (t *Transport) expectContinueTimeout() time.Duration {
   747  	if t.t1 == nil {
   748  		return 0
   749  	}
   750  	return t.t1.ExpectContinueTimeout()
   751  }
   752  
   753  func (t *Transport) NewClientConn(c net.Conn, internalStateHook func()) (NetHTTPClientConn, error) {
   754  	cc, err := t.newClientConn(c, t.disableKeepAlives(), internalStateHook)
   755  	if err != nil {
   756  		return NetHTTPClientConn{}, err
   757  	}
   758  
   759  	// RoundTrip should block when the conn is at its concurrency limit,
   760  	// not return an error. Setting strictMaxConcurrentStreams enables this.
   761  	cc.strictMaxConcurrentStreams = true
   762  
   763  	return NetHTTPClientConn{cc}, nil
   764  }
   765  
// newClientConn sets up a ClientConn on top of c, writes the client
// preface and initial SETTINGS, and starts the connection's read loop.
// singleUse marks the connection as usable for only a single request.
// internalStateHook, if non-nil, is stored on the conn and invoked when
// its state changes (see maybeCallStateHook).
func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
	conf := configFromTransport(t)
	cc := &ClientConn{
		t:                           t,
		tconn:                       c,
		readerDone:                  make(chan struct{}),
		nextStreamID:                1,
		maxFrameSize:                16 << 10, // spec default
		initialWindowSize:           65535,    // spec default
		initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
		maxConcurrentStreams:        initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
		strictMaxConcurrentStreams:  conf.StrictMaxConcurrentRequests,
		peerMaxHeaderListSize:       0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
		streams:                     make(map[uint32]*clientStream),
		singleUse:                   singleUse,
		seenSettingsChan:            make(chan struct{}),
		wantSettingsAck:             true,
		readIdleTimeout:             conf.SendPingTimeout,
		pingTimeout:                 conf.PingTimeout,
		pings:                       make(map[[8]byte]chan struct{}),
		reqHeaderMu:                 make(chan struct{}, 1),
		lastActive:                  time.Now(),
		internalStateHook:           internalStateHook,
	}
	if t.transportTestHooks != nil {
		// Tests may replace the conn; re-read it after the hook runs.
		t.transportTestHooks.newclientconn(cc)
		c = cc.tconn
	}
	if VerboseLogs {
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
	}

	cc.cond = sync.NewCond(&cc.mu)
	// Start our connection-level send flow window at the spec's
	// initial value; the peer may enlarge it with WINDOW_UPDATEs.
	cc.flow.add(int32(initialWindowSize))

	// TODO: adjust this writer size to account for frame size +
	// MTU + crypto/tls record padding.
	cc.bw = bufio.NewWriter(stickyErrWriter{
		conn:    c,
		timeout: conf.WriteByteTimeout,
		err:     &cc.werr,
	})
	cc.br = bufio.NewReader(c)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
	if t.CountError != nil {
		cc.fr.countError = t.CountError
	}
	maxHeaderTableSize := uint32(conf.MaxDecoderHeaderTableSize)
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

	cc.henc = hpack.NewEncoder(&cc.hbuf)
	cc.henc.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
	cc.peerMaxHeaderTableSize = initialHeaderTableSize

	// Capture TLS state (if c is a TLS-like conn) so it can be
	// attached to responses later.
	if cs, ok := c.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}

	// Our initial SETTINGS: disable server push and advertise our
	// per-stream receive window plus optional frame/header limits.
	initialSettings := []Setting{
		{ID: SettingEnablePush, Val: 0},
		{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
	}
	initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: uint32(conf.MaxReadFrameSize)})
	if max := t.maxHeaderListSize(); max != 0 {
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
	}
	if maxHeaderTableSize != initialHeaderTableSize {
		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
	}

	// Send the client preface, SETTINGS, and a connection-level
	// WINDOW_UPDATE in a single flush; any write error lands in
	// cc.werr via stickyErrWriter.
	cc.bw.Write(clientPreface)
	cc.fr.WriteSettings(initialSettings...)
	cc.fr.WriteWindowUpdate(0, uint32(conf.MaxReceiveBufferPerConnection))
	cc.inflow.init(int32(conf.MaxReceiveBufferPerConnection) + initialWindowSize)
	cc.bw.Flush()
	if cc.werr != nil {
		cc.Close()
		return nil, cc.werr
	}

	// Start the idle timer after the connection is fully initialized.
	if d := t.idleConnTimeout(); d != 0 {
		cc.idleTimeout = d
		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
	}

	go cc.readLoop()
	return cc, nil
}
   858  
   859  func (cc *ClientConn) healthCheck() {
   860  	pingTimeout := cc.pingTimeout
   861  	// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
   862  	// trigger the healthCheck again if there is no frame received.
   863  	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
   864  	defer cancel()
   865  	cc.vlogf("http2: Transport sending health check")
   866  	err := cc.Ping(ctx)
   867  	if err != nil {
   868  		cc.vlogf("http2: Transport health check failure: %v", err)
   869  		cc.closeForLostPing()
   870  	} else {
   871  		cc.vlogf("http2: Transport health check success")
   872  	}
   873  }
   874  
   875  // SetDoNotReuse marks cc as not reusable for future HTTP requests.
   876  func (cc *ClientConn) SetDoNotReuse() {
   877  	cc.mu.Lock()
   878  	defer cc.mu.Unlock()
   879  	cc.doNotReuse = true
   880  }
   881  
// setGoAway records a GOAWAY frame received from the server and aborts
// every local stream with an ID above the frame's LastStreamID, since
// the server will never process those.
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	old := cc.goAway
	cc.goAway = f

	// Merge the previous and current GoAway error frames.
	if cc.goAwayDebug == "" {
		cc.goAwayDebug = string(f.DebugData())
	}
	if old != nil && old.ErrCode != ErrCodeNo {
		cc.goAway.ErrCode = old.ErrCode
	}
	last := f.LastStreamID
	for streamID, cs := range cc.streams {
		if streamID <= last {
			// The server's GOAWAY indicates that it received this stream.
			// It will either finish processing it, or close the connection
			// without doing so. Either way, leave the stream alone for now.
			continue
		}
		if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
			// Don't retry the first stream on a connection if we get a non-NO error.
			// If the server is sending an error on a new connection,
			// retrying the request on a new one probably isn't going to work.
			cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
		} else {
			// Aborting the stream with errClientConnGotGoAway indicates that
			// the request should be retried on a new connection.
			cs.abortStreamLocked(errClientConnGotGoAway)
		}
	}
}
   916  
   917  // CanTakeNewRequest reports whether the connection can take a new request,
   918  // meaning it has not been closed or received or sent a GOAWAY.
   919  //
   920  // If the caller is going to immediately make a new request on this
   921  // connection, use ReserveNewRequest instead.
   922  func (cc *ClientConn) CanTakeNewRequest() bool {
   923  	cc.mu.Lock()
   924  	defer cc.mu.Unlock()
   925  	return cc.canTakeNewRequestLocked()
   926  }
   927  
   928  // ReserveNewRequest is like CanTakeNewRequest but also reserves a
   929  // concurrent stream in cc. The reservation is decremented on the
   930  // next call to RoundTrip.
   931  func (cc *ClientConn) ReserveNewRequest() bool {
   932  	cc.mu.Lock()
   933  	defer cc.mu.Unlock()
   934  	if st := cc.idleStateLocked(); !st.canTakeNewRequest {
   935  		return false
   936  	}
   937  	cc.streamsReserved++
   938  	return true
   939  }
   940  
// ClientConnState describes the state of a ClientConn,
// as returned by ClientConn.State.
type ClientConnState struct {
	// Closed is whether the connection is closed.
	Closed bool

	// Closing is whether the connection is in the process of
	// closing. It may be closing due to shutdown, being a
	// single-use connection, being marked as DoNotReuse, or
	// having received a GOAWAY frame.
	Closing bool

	// StreamsActive is how many streams are active.
	StreamsActive int

	// StreamsReserved is how many streams have been reserved via
	// ClientConn.ReserveNewRequest.
	StreamsReserved int

	// StreamsPending is how many requests have been sent in excess
	// of the peer's advertised MaxConcurrentStreams setting and
	// are waiting for other streams to complete.
	StreamsPending int

	// MaxConcurrentStreams is how many concurrent streams the
	// peer advertised as acceptable. Zero means no SETTINGS
	// frame has been received yet.
	MaxConcurrentStreams uint32

	// LastIdle, if non-zero, is when the connection last
	// transitioned to idle state.
	LastIdle time.Time
}
   973  
// State returns a snapshot of cc's state.
//
// The snapshot is taken under two locks in sequence: maxConcurrentStreams
// and seenSettings are read under wmu, the remaining fields under mu.
func (cc *ClientConn) State() ClientConnState {
	cc.wmu.Lock()
	maxConcurrent := cc.maxConcurrentStreams
	if !cc.seenSettings {
		// No SETTINGS frame received yet: report zero, per the
		// MaxConcurrentStreams field's contract.
		maxConcurrent = 0
	}
	cc.wmu.Unlock()

	cc.mu.Lock()
	defer cc.mu.Unlock()
	return ClientConnState{
		Closed:               cc.closed,
		Closing:              cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
		StreamsActive:        len(cc.streams) + cc.pendingResets,
		StreamsReserved:      cc.streamsReserved,
		StreamsPending:       cc.pendingRequests,
		LastIdle:             cc.lastIdle,
		MaxConcurrentStreams: maxConcurrent,
	}
}
   995  
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type clientConnIdleState struct {
	// canTakeNewRequest is whether a new request may be started on
	// this connection; computed by idleStateLocked.
	canTakeNewRequest bool
}
  1001  
  1002  func (cc *ClientConn) idleState() clientConnIdleState {
  1003  	cc.mu.Lock()
  1004  	defer cc.mu.Unlock()
  1005  	return cc.idleStateLocked()
  1006  }
  1007  
// idleStateLocked computes whether cc is suitable for starting a new
// request. cc.mu must be held.
func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
	// A single-use conn that has already issued a stream can never
	// take another request.
	if cc.singleUse && cc.nextStreamID > 1 {
		return
	}
	var maxConcurrentOkay bool
	if cc.strictMaxConcurrentStreams {
		// We'll tell the caller we can take a new request to
		// prevent the caller from dialing a new TCP
		// connection, but then we'll block later before
		// writing it.
		maxConcurrentOkay = true
	} else {
		// We can take a new request if the total of
		//   - active streams;
		//   - reservation slots for new streams; and
		//   - streams for which we have sent a RST_STREAM and a PING,
		//     but received no subsequent frame
		// is less than the concurrency limit.
		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
	}

	st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()

	// If this connection has never been used for a request and is closed,
	// then let it take a request (which will fail).
	// If the conn was closed for idleness, we're racing the idle timer;
	// don't try to use the conn. (Issue #70515.)
	//
	// This avoids a situation where an error early in a connection's lifetime
	// goes unreported.
	if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
		st.canTakeNewRequest = true
	}

	return
}
  1044  
  1045  func (cc *ClientConn) isUsableLocked() bool {
  1046  	return cc.goAway == nil &&
  1047  		!cc.closed &&
  1048  		!cc.closing &&
  1049  		!cc.doNotReuse &&
  1050  		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
  1051  		!cc.tooIdleLocked()
  1052  }
  1053  
  1054  // canReserveLocked reports whether a net/http.ClientConn can reserve a slot on this conn.
  1055  //
  1056  // This follows slightly different rules than clientConnIdleState.canTakeNewRequest.
  1057  // We only permit reservations up to the conn's concurrency limit.
  1058  // This differs from ClientConn.ReserveNewRequest, which permits reservations
  1059  // past the limit when StrictMaxConcurrentStreams is set.
  1060  func (cc *ClientConn) canReserveLocked() bool {
  1061  	if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
  1062  		return false
  1063  	}
  1064  	if !cc.isUsableLocked() {
  1065  		return false
  1066  	}
  1067  	return true
  1068  }
  1069  
  1070  // currentRequestCountLocked reports the number of concurrency slots currently in use,
  1071  // including active streams, reserved slots, and reset streams waiting for acknowledgement.
  1072  func (cc *ClientConn) currentRequestCountLocked() int {
  1073  	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
  1074  }
  1075  
  1076  func (cc *ClientConn) canTakeNewRequestLocked() bool {
  1077  	st := cc.idleStateLocked()
  1078  	return st.canTakeNewRequest
  1079  }
  1080  
  1081  // availableLocked reports the number of concurrency slots available.
  1082  func (cc *ClientConn) availableLocked() int {
  1083  	if !cc.canTakeNewRequestLocked() {
  1084  		return 0
  1085  	}
  1086  	return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
  1087  }
  1088  
// tooIdleLocked reports whether this connection has been sitting idle
// for too much wall time.
func (cc *ClientConn) tooIdleLocked() bool {
	// The Round(0) strips the monotonic clock reading so the
	// times are compared based on their wall time. We don't want
	// to reuse a connection that's been sitting idle during
	// VM/laptop suspend if monotonic time was also frozen.
	return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
}
  1098  
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time,
// so this simply calls the synchronized closeIfIdle to shut down this
// connection. The timer could call closeIfIdle directly, but routing
// through this named method keeps the intent clear.
func (cc *ClientConn) onIdleTimeout() {
	cc.closeIfIdle()
}
  1108  
  1109  func (cc *ClientConn) closeConn() {
  1110  	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
  1111  	defer t.Stop()
  1112  	cc.tconn.Close()
  1113  	cc.maybeCallStateHook()
  1114  }
  1115  
  1116  // A tls.Conn.Close can hang for a long time if the peer is unresponsive.
  1117  // Try to shut it down more aggressively.
  1118  func (cc *ClientConn) forceCloseConn() {
  1119  	tc, ok := cc.tconn.(*tls.Conn)
  1120  	if !ok {
  1121  		return
  1122  	}
  1123  	if nc := tc.NetConn(); nc != nil {
  1124  		nc.Close()
  1125  	}
  1126  }
  1127  
// closeIfIdle closes the connection if there are no active or reserved
// streams, marking it as closed-on-idle so racing callers don't reuse it
// (see idleStateLocked and Issue #70515).
func (cc *ClientConn) closeIfIdle() {
	cc.mu.Lock()
	if len(cc.streams) > 0 || cc.streamsReserved > 0 {
		cc.mu.Unlock()
		return
	}
	cc.closed = true
	cc.closedOnIdle = true
	// Capture nextStreamID while holding mu; it is only used for the
	// log message below.
	nextID := cc.nextStreamID
	// TODO: do clients send GOAWAY too? maybe? Just Close:
	cc.mu.Unlock()

	if VerboseLogs {
		// cc.singleUse is set at construction and not modified after,
		// so reading it without the lock here looks safe — confirm.
		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
	}
	cc.closeConn()
}
  1145  
  1146  func (cc *ClientConn) isDoNotReuseAndIdle() bool {
  1147  	cc.mu.Lock()
  1148  	defer cc.mu.Unlock()
  1149  	return cc.doNotReuse && len(cc.streams) == 0
  1150  }
  1151  
  1152  var shutdownEnterWaitStateHook = func() {}
  1153  
// Shutdown gracefully closes the client connection, waiting for running streams to complete.
// It first sends a GOAWAY, then waits until all streams finish or ctx is done.
func (cc *ClientConn) Shutdown(ctx context.Context) error {
	if err := cc.sendGoAway(); err != nil {
		return err
	}
	// Wait for all in-flight streams to complete or connection to close
	done := make(chan struct{})
	cancelled := false // guarded by cc.mu
	go func() {
		cc.mu.Lock()
		defer cc.mu.Unlock()
		for {
			if len(cc.streams) == 0 || cc.closed {
				cc.closed = true
				close(done)
				break
			}
			if cancelled {
				break
			}
			// Woken by cc.cond.Broadcast, either as streams complete
			// or from the cancellation path below.
			cc.cond.Wait()
		}
	}()
	shutdownEnterWaitStateHook()
	select {
	case <-done:
		cc.closeConn()
		return nil
	case <-ctx.Done():
		cc.mu.Lock()
		// Free the goroutine above
		cancelled = true
		cc.cond.Broadcast()
		cc.mu.Unlock()
		return ctx.Err()
	}
}
  1191  
// sendGoAway marks the connection as closing (preventing new requests)
// and, unless one has already been sent, writes a graceful GOAWAY frame
// to the server.
func (cc *ClientConn) sendGoAway() error {
	cc.mu.Lock()
	closing := cc.closing
	cc.closing = true
	maxStreamID := cc.nextStreamID
	cc.mu.Unlock()
	if closing {
		// GOAWAY sent already
		return nil
	}

	// Frame writes require the write lock, taken after releasing mu above.
	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	// Send a graceful shutdown frame to server
	if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
		return err
	}
	if err := cc.bw.Flush(); err != nil {
		return err
	}
	// Prevent new requests
	return nil
}
  1215  
// closeForError closes the client connection immediately. In-flight
// requests are interrupted, each aborted with err, and any goroutines
// waiting on cc.cond are woken.
func (cc *ClientConn) closeForError(err error) {
	cc.mu.Lock()
	cc.closed = true
	for _, cs := range cc.streams {
		cs.abortStreamLocked(err)
	}
	cc.cond.Broadcast()
	cc.mu.Unlock()
	cc.closeConn()
}
  1228  
// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
// Close always returns nil.
func (cc *ClientConn) Close() error {
	cc.closeForError(errClientConnForceClosed)
	return nil
}
  1236  
  1237  // closes the client connection immediately. In-flight requests are interrupted.
  1238  func (cc *ClientConn) closeForLostPing() {
  1239  	err := errors.New("http2: client connection lost")
  1240  	if f := cc.t.CountError; f != nil {
  1241  		f("conn_close_lost_ping")
  1242  	}
  1243  	cc.closeForError(err)
  1244  }
  1245  
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's
// not exported. At least they'll be DeepEqual for h1-vs-h2 comparison tests.
var errRequestCanceled = internal.ErrRequestCanceled
  1249  
  1250  func (cc *ClientConn) responseHeaderTimeout() time.Duration {
  1251  	if cc.t.t1 != nil {
  1252  		return cc.t.t1.ResponseHeaderTimeout()
  1253  	}
  1254  	// No way to do this (yet?) with just an http2.Transport. Probably
  1255  	// no need. Request.Cancel this is the new way. We only need to support
  1256  	// this for compatibility with the old http.Transport fields when
  1257  	// we're doing transparent http2.
  1258  	return 0
  1259  }
  1260  
  1261  // actualContentLength returns a sanitized version of
  1262  // req.ContentLength, where 0 actually means zero (not unknown) and -1
  1263  // means unknown.
  1264  func actualContentLength(req *ClientRequest) int64 {
  1265  	if req.Body == nil || req.Body == NoBody {
  1266  		return 0
  1267  	}
  1268  	if req.ContentLength != 0 {
  1269  		return req.ContentLength
  1270  	}
  1271  	return -1
  1272  }
  1273  
  1274  func (cc *ClientConn) decrStreamReservations() {
  1275  	cc.mu.Lock()
  1276  	defer cc.mu.Unlock()
  1277  	cc.decrStreamReservationsLocked()
  1278  }
  1279  
  1280  func (cc *ClientConn) decrStreamReservationsLocked() {
  1281  	if cc.streamsReserved > 0 {
  1282  		cc.streamsReserved--
  1283  	}
  1284  }
  1285  
// RoundTrip sends req on the connection and returns its response,
// delegating to roundTrip with no stream hook.
func (cc *ClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.roundTrip(req, nil)
}
  1289  
// roundTrip sends req on cc and blocks until response headers are
// received, or the request is aborted or canceled. streamf, if non-nil,
// is passed to writeRequest and invoked once the stream is set up.
func (cc *ClientConn) roundTrip(req *ClientRequest, streamf func(*clientStream)) (*ClientResponse, error) {
	ctx := req.Context
	req.stream = clientStream{
		cc:                   cc,
		ctx:                  ctx,
		reqCancel:            req.Cancel,
		isHead:               req.Method == "HEAD",
		reqBody:              req.Body,
		reqBodyContentLength: actualContentLength(req),
		trace:                httptrace.ContextClientTrace(ctx),
		peerClosed:           make(chan struct{}),
		abort:                make(chan struct{}),
		respHeaderRecv:       make(chan struct{}),
		donec:                make(chan struct{}),
		resTrailer:           req.ResTrailer,
	}
	cs := &req.stream

	cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())

	// The request (headers and body) is written on its own goroutine
	// for the lifetime of the stream.
	go cs.doRequest(req, streamf)

	// waitDone blocks until the stream is fully done, or the request
	// is canceled via its context or Cancel channel.
	waitDone := func() error {
		select {
		case <-cs.donec:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}

	handleResponseHeaders := func() (*ClientResponse, error) {
		res := cs.res
		if res.StatusCode > 299 {
			// On error or status code 3xx, 4xx, 5xx, etc abort any
			// ongoing write, assuming that the server doesn't care
			// about our request body. If the server replied with 1xx or
			// 2xx, however, then assume the server DOES potentially
			// want our body (e.g. full-duplex streaming:
			// golang.org/issue/13444). If it turns out the server
			// doesn't, they'll RST_STREAM us soon enough. This is a
			// heuristic to avoid adding knobs to Transport. Hopefully
			// we can keep it.
			cs.abortRequestBodyWrite()
		}
		res.TLS = cc.tlsState
		if res.Body == NoBody && actualContentLength(req) == 0 {
			// If there isn't a request or response body still being
			// written, then wait for the stream to be closed before
			// RoundTrip returns.
			if err := waitDone(); err != nil {
				return nil, err
			}
		}
		return res, nil
	}

	cancelRequest := func(cs *clientStream, err error) error {
		cs.cc.mu.Lock()
		bodyClosed := cs.reqBodyClosed
		cs.cc.mu.Unlock()
		// Wait for the request body to be closed.
		//
		// If nothing closed the body before now, abortStreamLocked
		// will have started a goroutine to close it.
		//
		// Closing the body before returning avoids a race condition
		// with net/http checking its readTrackingBody to see if the
		// body was read from or closed. See golang/go#60041.
		//
		// The body is closed in a separate goroutine without the
		// connection mutex held, but dropping the mutex before waiting
		// will keep us from holding it indefinitely if the body
		// close is slow for some reason.
		if bodyClosed != nil {
			<-bodyClosed
		}
		return err
	}

	for {
		select {
		case <-cs.respHeaderRecv:
			return handleResponseHeaders()
		case <-cs.abort:
			select {
			case <-cs.respHeaderRecv:
				// If both cs.respHeaderRecv and cs.abort are signaling,
				// pick respHeaderRecv. The server probably wrote the
				// response and immediately reset the stream.
				// golang.org/issue/49645
				return handleResponseHeaders()
			default:
				// waitDone's result is deliberately discarded:
				// cs.abortErr is the error to report here.
				waitDone()
				return nil, cs.abortErr
			}
		case <-ctx.Done():
			err := ctx.Err()
			cs.abortStream(err)
			return nil, cancelRequest(cs, err)
		case <-cs.reqCancel:
			cs.abortStream(errRequestCanceled)
			return nil, cancelRequest(cs, errRequestCanceled)
		}
	}
}
  1398  
// doRequest runs for the duration of the request lifetime, on its own
// goroutine (started by roundTrip).
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
func (cs *clientStream) doRequest(req *ClientRequest, streamf func(*clientStream)) {
	err := cs.writeRequest(req, streamf)
	cs.cleanupWriteRequest(err)
}
  1406  
  1407  var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
  1408  
// writeRequest sends a request.
//
// It returns nil after the request is written, the response read,
// and the request stream is half-closed by the peer.
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
func (cs *clientStream) writeRequest(req *ClientRequest, streamf func(*clientStream)) (err error) {
	cc := cs.cc
	ctx := cs.ctx

	// Extended CONNECT (a CONNECT request carrying a ":protocol"
	// pseudo-header) needs server opt-in, signaled in its first SETTINGS
	// frame; note the case here so we can wait for settings below.
	var isExtendedConnect bool
	if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
		isExtendedConnect = true
	}

	// Acquire the new-request lock by writing to reqHeaderMu.
	// This lock guards the critical section covering allocating a new stream ID
	// (requires mu) and creating the stream (requires wmu).
	if cc.reqHeaderMu == nil {
		panic("RoundTrip on uninitialized ClientConn") // for tests
	}
	if isExtendedConnect {
		// Block until the server's initial SETTINGS arrive, which tell
		// us whether extended CONNECT is allowed at all.
		select {
		case <-cs.reqCancel:
			return errRequestCanceled
		case <-ctx.Done():
			return ctx.Err()
		case <-cc.seenSettingsChan:
			if !cc.extendedConnectAllowed {
				return errExtendedConnectNotSupported
			}
		}
	}
	select {
	case cc.reqHeaderMu <- struct{}{}:
	case <-cs.reqCancel:
		return errRequestCanceled
	case <-ctx.Done():
		return ctx.Err()
	}

	cc.mu.Lock()
	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}
	cc.decrStreamReservationsLocked()
	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
		// Release the new-request lock before reporting the failure.
		cc.mu.Unlock()
		<-cc.reqHeaderMu
		return err
	}
	cc.addStreamLocked(cs) // assigns stream ID
	if isConnectionCloseRequest(req) {
		cc.doNotReuse = true
	}
	cc.mu.Unlock()

	if streamf != nil {
		streamf(cs)
	}

	continueTimeout := cc.t.expectContinueTimeout()
	if continueTimeout != 0 {
		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
			continueTimeout = 0
		} else {
			cs.on100 = make(chan struct{}, 1)
		}
	}

	// Past this point (where we send request headers), it is possible for
	// RoundTrip to return successfully. Since the RoundTrip contract permits
	// the caller to "mutate or reuse" the Request after closing the Response's Body,
	// we must take care when referencing the Request from here on.
	err = cs.encodeAndWriteHeaders(req)
	<-cc.reqHeaderMu
	if err != nil {
		return err
	}

	hasBody := cs.reqBodyContentLength != 0
	if !hasBody {
		cs.sentEndStream = true
	} else {
		if continueTimeout != 0 {
			// Wait for the server's 100 Continue (cs.on100), the
			// timeout, or cancellation before sending the body.
			traceWait100Continue(cs.trace)
			timer := time.NewTimer(continueTimeout)
			select {
			case <-timer.C:
				err = nil
			case <-cs.on100:
				err = nil
			case <-cs.abort:
				err = cs.abortErr
			case <-ctx.Done():
				err = ctx.Err()
			case <-cs.reqCancel:
				err = errRequestCanceled
			}
			timer.Stop()
			if err != nil {
				traceWroteRequest(cs.trace, err)
				return err
			}
		}

		if err = cs.writeRequestBody(req); err != nil {
			if err != errStopReqBodyWrite {
				traceWroteRequest(cs.trace, err)
				return err
			}
		} else {
			cs.sentEndStream = true
		}
	}

	traceWroteRequest(cs.trace, err)

	var respHeaderTimer <-chan time.Time
	var respHeaderRecv chan struct{}
	if d := cc.responseHeaderTimeout(); d != 0 {
		timer := time.NewTimer(d)
		defer timer.Stop()
		respHeaderTimer = timer.C
		respHeaderRecv = cs.respHeaderRecv
	}
	// Wait until the peer half-closes its end of the stream,
	// or until the request is aborted (via context, error, or otherwise),
	// whichever comes first.
	for {
		select {
		case <-cs.peerClosed:
			return nil
		case <-respHeaderTimer:
			return errTimeout
		case <-respHeaderRecv:
			// Headers arrived in time: disable the header timeout and
			// keep waiting for the peer to half-close the stream.
			respHeaderRecv = nil
			respHeaderTimer = nil // keep waiting for END_STREAM
		case <-cs.abort:
			return cs.abortErr
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}
}
  1559  
// encodeAndWriteHeaders HPACK-encodes the request headers into cc.hbuf
// and writes the HEADERS frame(s) for cs while holding the connection
// write lock. END_STREAM is set when the request has neither a body nor
// trailers.
func (cs *clientStream) encodeAndWriteHeaders(req *ClientRequest) error {
	cc := cs.cc
	ctx := cs.ctx

	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	// If the request was canceled while waiting for cc.mu, just quit.
	select {
	case <-cs.abort:
		return cs.abortErr
	case <-ctx.Done():
		return ctx.Err()
	case <-cs.reqCancel:
		return errRequestCanceled
	default:
	}

	// Encode headers.
	//
	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
	// sent by writeRequestBody below, along with any Trailers,
	// again in form HEADERS{1}, CONTINUATION{0,})
	cc.hbuf.Reset()
	res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
		cc.writeHeader(name, value)
	})
	if err != nil {
		return fmt.Errorf("http2: %w", err)
	}
	hdrs := cc.hbuf.Bytes()

	// Write the request.
	endStream := !res.HasBody && !res.HasTrailers
	cs.sentHeaders = true
	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
	traceWroteHeaders(cs.trace)
	return err
}
  1599  
// encodeRequestHeaders converts req into the shared httpcommon request
// shape and encodes its headers, invoking headerf for each name/value
// pair produced. addGzipHeader requests an Accept-Encoding: gzip header;
// peerMaxHeaderListSize bounds the encoded header list size.
func encodeRequestHeaders(req *ClientRequest, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
	return httpcommon.EncodeHeaders(req.Context, httpcommon.EncodeHeadersParam{
		Request: httpcommon.Request{
			Header:              req.Header,
			Trailer:             req.Trailer,
			URL:                 req.URL,
			Host:                req.Host,
			Method:              req.Method,
			ActualContentLength: actualContentLength(req),
		},
		AddGzipHeader:         addGzipHeader,
		PeerMaxHeaderListSize: peerMaxHeaderListSize,
		DefaultUserAgent:      defaultUserAgent,
	}, headerf)
}
  1615  
// cleanupWriteRequest performs post-request tasks.
//
// If err (the result of writeRequest) is non-nil and the stream is not closed,
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
	cc := cs.cc

	if cs.ID == 0 {
		// We were canceled before creating the stream, so return our reservation.
		cc.decrStreamReservations()
	}

	// TODO: write h12Compare test showing whether
	// Request.Body is closed by the Transport,
	// and in multiple cases: server replies <=299 and >299
	// while still writing request body
	cc.mu.Lock()
	mustCloseBody := false
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		// We're the first to close the body; claim it by creating the channel.
		mustCloseBody = true
		cs.reqBodyClosed = make(chan struct{})
	}
	bodyClosed := cs.reqBodyClosed
	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	// Have we read any frames from the connection since sending this request?
	readSinceStream := cc.readBeforeStreamID > cs.ID
	cc.mu.Unlock()
	if mustCloseBody {
		cs.reqBody.Close()
		close(bodyClosed)
	}
	if bodyClosed != nil {
		// Wait for whichever goroutine owns the close to finish it.
		<-bodyClosed
	}

	if err != nil && cs.sentEndStream {
		// If the connection is closed immediately after the response is read,
		// we may be aborted before finishing up here. If the stream was closed
		// cleanly on both sides, there is no error.
		select {
		case <-cs.peerClosed:
			err = nil
		default:
		}
	}
	if err != nil {
		cs.abortStream(err) // possibly redundant, but harmless
		if cs.sentHeaders {
			if se, ok := err.(StreamError); ok {
				if se.Cause != errFromPeer {
					cc.writeStreamReset(cs.ID, se.Code, false, err)
				}
			} else {
				// We're cancelling an in-flight request.
				//
				// This could be due to the server becoming unresponsive.
				// To avoid sending too many requests on a dead connection,
				// if we haven't read any frames from the connection since
				// sending this request, we let it continue to consume
				// a concurrency slot until we can confirm the server is
				// still responding.
				// We do this by sending a PING frame along with the RST_STREAM
				// (unless a ping is already in flight).
				//
				// For simplicity, we don't bother tracking the PING payload:
				// We reset cc.pendingResets any time we receive a PING ACK.
				//
				// We skip this if the conn is going to be closed on idle,
				// because it's short lived and will probably be closed before
				// we get the ping response.
				ping := false
				if !closeOnIdle && !readSinceStream {
					cc.mu.Lock()
					// rstStreamPingsBlocked works around a gRPC behavior:
					// see comment on the field for details.
					if !cc.rstStreamPingsBlocked {
						if cc.pendingResets == 0 {
							ping = true
						}
						cc.pendingResets++
					}
					cc.mu.Unlock()
				}
				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
			}
		}
		cs.bufPipe.CloseWithError(err) // no-op if already closed
	} else {
		// Clean shutdown: if we sent headers but never finished the body,
		// tell the peer we're done with the stream (no error).
		if cs.sentHeaders && !cs.sentEndStream {
			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
		}
		cs.bufPipe.CloseWithError(errRequestCanceled)
	}
	if cs.ID != 0 {
		cc.forgetStreamID(cs.ID)
	}

	// If a previous write to the connection failed, the conn is broken;
	// close it so no new requests are sent on it.
	cc.wmu.Lock()
	werr := cc.werr
	cc.wmu.Unlock()
	if werr != nil {
		cc.Close()
	}

	close(cs.donec)
	cc.maybeCallStateHook()
}
  1723  
// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
// Must hold cc.mu. cc.mu is released while blocked in cc.cond.Wait and
// reacquired before returning.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
	for {
		if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
			// This is the very first request sent to this connection.
			// Return a fatal error which aborts the retry loop.
			return errClientConnNotEstablished
		}
		cc.lastActive = time.Now()
		if cc.closed || !cc.canTakeNewRequestLocked() {
			return errClientConnUnusable
		}
		cc.lastIdle = time.Time{}
		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
			// A concurrency slot is available; the caller may open the stream.
			return nil
		}
		cc.pendingRequests++
		// Block until another stream completes; cc.cond is broadcast when
		// streams are forgotten or connection state changes.
		cc.cond.Wait()
		cc.pendingRequests--
		select {
		case <-cs.abort:
			return cs.abortErr
		default:
		}
	}
}
  1751  
  1752  // requires cc.wmu be held
  1753  func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
  1754  	first := true // first frame written (HEADERS is first, then CONTINUATION)
  1755  	for len(hdrs) > 0 && cc.werr == nil {
  1756  		chunk := hdrs
  1757  		if len(chunk) > maxFrameSize {
  1758  			chunk = chunk[:maxFrameSize]
  1759  		}
  1760  		hdrs = hdrs[len(chunk):]
  1761  		endHeaders := len(hdrs) == 0
  1762  		if first {
  1763  			cc.fr.WriteHeaders(HeadersFrameParam{
  1764  				StreamID:      streamID,
  1765  				BlockFragment: chunk,
  1766  				EndStream:     endStream,
  1767  				EndHeaders:    endHeaders,
  1768  			})
  1769  			first = false
  1770  		} else {
  1771  			cc.fr.WriteContinuation(streamID, endHeaders, chunk)
  1772  		}
  1773  	}
  1774  	cc.bw.Flush()
  1775  	return cc.werr
  1776  }
  1777  
// internal error values; they don't escape to callers
var (
	// errStopReqBodyWrite aborts the request body write; no RST_STREAM
	// is sent as a result of it.
	errStopReqBodyWrite = errors.New("http2: aborting request body write")

	// errStopReqBodyWriteAndCancel aborts the request body write and
	// additionally causes a stream reset (CANCEL) to be sent.
	errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")

	// errReqBodyTooLong reports that the request body yielded more bytes
	// than its declared Content-Length.
	errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
  1788  
  1789  // frameScratchBufferLen returns the length of a buffer to use for
  1790  // outgoing request bodies to read/write to/from.
  1791  //
  1792  // It returns max(1, min(peer's advertised max frame size,
  1793  // Request.ContentLength+1, 512KB)).
  1794  func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
  1795  	const max = 512 << 10
  1796  	n := min(int64(maxFrameSize), max)
  1797  	if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
  1798  		// Add an extra byte past the declared content-length to
  1799  		// give the caller's Request.Body io.Reader a chance to
  1800  		// give us more bytes than they declared, so we can catch it
  1801  		// early.
  1802  		n = cl + 1
  1803  	}
  1804  	if n < 1 {
  1805  		return 1
  1806  	}
  1807  	return int(n) // doesn't truncate; max is 512K
  1808  }
  1809  
  1810  // Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
  1811  // streaming requests using small frame sizes occupy large buffers initially allocated for prior
  1812  // requests needing big buffers. The size ranges are as follows:
  1813  // {0 KB, 16 KB], {16 KB, 32 KB], {32 KB, 64 KB], {64 KB, 128 KB], {128 KB, 256 KB],
  1814  // {256 KB, 512 KB], {512 KB, infinity}
  1815  // In practice, the maximum scratch buffer size should not exceed 512 KB due to
  1816  // frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
  1817  // It exists mainly as a safety measure, for potential future increases in max buffer size.
  1818  var bufPools [7]sync.Pool // of *[]byte
  1819  func bufPoolIndex(size int) int {
  1820  	if size <= 16384 {
  1821  		return 0
  1822  	}
  1823  	size -= 1
  1824  	bits := bits.Len(uint(size))
  1825  	index := bits - 14
  1826  	if index >= len(bufPools) {
  1827  		return len(bufPools) - 1
  1828  	}
  1829  	return index
  1830  }
  1831  
// writeRequestBody streams req's body to the peer as DATA frames, honoring
// stream- and connection-level flow control, and then ends the stream with
// either trailers or an empty DATA frame carrying END_STREAM.
//
// If the body yields more bytes than a declared Content-Length, it returns
// errReqBodyTooLong. A return of errStopReqBodyWrite means the body was
// closed elsewhere (e.g. the request was canceled).
func (cs *clientStream) writeRequestBody(req *ClientRequest) (err error) {
	cc := cs.cc
	body := cs.reqBody
	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

	hasTrailers := req.Trailer != nil
	remainLen := cs.reqBodyContentLength
	hasContentLen := remainLen != -1

	cc.mu.Lock()
	maxFrameSize := int(cc.maxFrameSize)
	cc.mu.Unlock()

	// Scratch buffer for reading into & writing from.
	// Borrowed from (and returned to) the size-bucketed bufPools.
	scratchLen := cs.frameScratchBufferLen(maxFrameSize)
	var buf []byte
	index := bufPoolIndex(scratchLen)
	if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
		defer bufPools[index].Put(bp)
		buf = *bp
	} else {
		buf = make([]byte, scratchLen)
		defer bufPools[index].Put(&buf)
	}

	var sawEOF bool
	for !sawEOF {
		n, err := body.Read(buf)
		if hasContentLen {
			remainLen -= int64(n)
			if remainLen == 0 && err == nil {
				// The request body's Content-Length was predeclared and
				// we just finished reading it all, but the underlying io.Reader
				// returned the final chunk with a nil error (which is one of
				// the two valid things a Reader can do at EOF). Because we'd prefer
				// to send the END_STREAM bit early, double-check that we're actually
				// at EOF. Subsequent reads should return (0, EOF) at this point.
				// If either value is different, we return an error in one of two ways below.
				var scratch [1]byte
				var n1 int
				n1, err = body.Read(scratch[:])
				remainLen -= int64(n1)
			}
			if remainLen < 0 {
				// Body produced more bytes than declared.
				err = errReqBodyTooLong
				return err
			}
		}
		if err != nil {
			cc.mu.Lock()
			bodyClosed := cs.reqBodyClosed != nil
			cc.mu.Unlock()
			switch {
			case bodyClosed:
				return errStopReqBodyWrite
			case err == io.EOF:
				sawEOF = true
				err = nil
			default:
				return err
			}
		}

		// Send the chunk we just read, splitting it per flow-control grants.
		remain := buf[:n]
		for len(remain) > 0 && err == nil {
			var allowed int32
			allowed, err = cs.awaitFlowControl(len(remain))
			if err != nil {
				return err
			}
			cc.wmu.Lock()
			data := remain[:allowed]
			remain = remain[allowed:]
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
			if err == nil {
				// TODO(bradfitz): this flush is for latency, not bandwidth.
				// Most requests won't need this. Make this opt-in or
				// opt-out?  Use some heuristic on the body type? Nagel-like
				// timers?  Based on 'n'? Only last chunk of this for loop,
				// unless flow control tokens are low? For now, always.
				// If we change this, see comment below.
				err = cc.bw.Flush()
			}
			cc.wmu.Unlock()
		}
		if err != nil {
			return err
		}
	}

	if sentEnd {
		// Already sent END_STREAM (which implies we have no
		// trailers) and flushed, because currently all
		// WriteData frames above get a flush. So we're done.
		return nil
	}

	// Since the RoundTrip contract permits the caller to "mutate or reuse"
	// a request after the Response's Body is closed, verify that this hasn't
	// happened before accessing the trailers.
	cc.mu.Lock()
	trailer := req.Trailer
	err = cs.abortErr
	cc.mu.Unlock()
	if err != nil {
		return err
	}

	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	var trls []byte
	if len(trailer) > 0 {
		trls, err = cc.encodeTrailers(trailer)
		if err != nil {
			return err
		}
	}

	// Two ways to send END_STREAM: either with trailers, or
	// with an empty DATA frame.
	if len(trls) > 0 {
		err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
	} else {
		err = cc.fr.WriteData(cs.ID, true, nil)
	}
	if ferr := cc.bw.Flush(); ferr != nil && err == nil {
		err = ferr
	}
	return err
}
  1963  
  1964  // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
  1965  // control tokens from the server.
  1966  // It returns either the non-zero number of tokens taken or an error
  1967  // if the stream is dead.
  1968  func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
  1969  	cc := cs.cc
  1970  	ctx := cs.ctx
  1971  	cc.mu.Lock()
  1972  	defer cc.mu.Unlock()
  1973  	for {
  1974  		if cc.closed {
  1975  			return 0, errClientConnClosed
  1976  		}
  1977  		if cs.reqBodyClosed != nil {
  1978  			return 0, errStopReqBodyWrite
  1979  		}
  1980  		select {
  1981  		case <-cs.abort:
  1982  			return 0, cs.abortErr
  1983  		case <-ctx.Done():
  1984  			return 0, ctx.Err()
  1985  		case <-cs.reqCancel:
  1986  			return 0, errRequestCanceled
  1987  		default:
  1988  		}
  1989  		if a := cs.flow.available(); a > 0 {
  1990  			take := a
  1991  			if int(take) > maxBytes {
  1992  
  1993  				take = int32(maxBytes) // can't truncate int; take is int32
  1994  			}
  1995  			if take > int32(cc.maxFrameSize) {
  1996  				take = int32(cc.maxFrameSize)
  1997  			}
  1998  			cs.flow.take(take)
  1999  			return take, nil
  2000  		}
  2001  		cc.cond.Wait()
  2002  	}
  2003  }
  2004  
  2005  // requires cc.wmu be held.
  2006  func (cc *ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
  2007  	cc.hbuf.Reset()
  2008  
  2009  	hlSize := uint64(0)
  2010  	for k, vv := range trailer {
  2011  		for _, v := range vv {
  2012  			hf := hpack.HeaderField{Name: k, Value: v}
  2013  			hlSize += uint64(hf.Size())
  2014  		}
  2015  	}
  2016  	if hlSize > cc.peerMaxHeaderListSize {
  2017  		return nil, errRequestHeaderListSize
  2018  	}
  2019  
  2020  	for k, vv := range trailer {
  2021  		lowKey, ascii := httpcommon.LowerHeader(k)
  2022  		if !ascii {
  2023  			// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
  2024  			// field names have to be ASCII characters (just as in HTTP/1.x).
  2025  			continue
  2026  		}
  2027  		// Transfer-Encoding, etc.. have already been filtered at the
  2028  		// start of RoundTrip
  2029  		for _, v := range vv {
  2030  			cc.writeHeader(lowKey, v)
  2031  		}
  2032  	}
  2033  	return cc.hbuf.Bytes(), nil
  2034  }
  2035  
  2036  func (cc *ClientConn) writeHeader(name, value string) {
  2037  	if VerboseLogs {
  2038  		log.Printf("http2: Transport encoding header %q = %q", name, value)
  2039  	}
  2040  	cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  2041  }
  2042  
// resAndError pairs a response with the error (if any) that produced it,
// for delivery from the read loop to a waiting RoundTrip.
type resAndError struct {
	_   incomparable // prevents comparison of values of this type
	res *ClientResponse
	err error
}
  2048  
  2049  // requires cc.mu be held.
  2050  func (cc *ClientConn) addStreamLocked(cs *clientStream) {
  2051  	cs.flow.add(int32(cc.initialWindowSize))
  2052  	cs.flow.setConnFlow(&cc.flow)
  2053  	cs.inflow.init(cc.initialStreamRecvWindowSize)
  2054  	cs.ID = cc.nextStreamID
  2055  	cc.nextStreamID += 2
  2056  	cc.streams[cs.ID] = cs
  2057  	if cs.ID == 0 {
  2058  		panic("assigned stream ID 0")
  2059  	}
  2060  }
  2061  
// forgetStreamID removes id from cc's stream map, refreshes idle-tracking
// state, wakes goroutines blocked on cc.cond, and — if the connection has
// become idle and should not be reused — marks it closed and closes it.
func (cc *ClientConn) forgetStreamID(id uint32) {
	cc.mu.Lock()
	slen := len(cc.streams)
	delete(cc.streams, id)
	if len(cc.streams) != slen-1 {
		panic("forgetting unknown stream id")
	}
	cc.lastActive = time.Now()
	if len(cc.streams) == 0 && cc.idleTimer != nil {
		// Last stream gone: restart the idle countdown.
		cc.idleTimer.Reset(cc.idleTimeout)
		cc.lastIdle = time.Now()
	}
	// Wake up writeRequestBody via clientStream.awaitFlowControl and
	// wake up RoundTrip if there is a pending request.
	cc.cond.Broadcast()

	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
		if VerboseLogs {
			cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
		}
		cc.closed = true
		// The deferred close runs after cc.mu is released below.
		defer cc.closeConn()
	}

	cc.mu.Unlock()
}
  2089  
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type clientConnReadLoop struct {
	_  incomparable // prevents comparison of values of this type
	cc *ClientConn  // the connection whose frames this loop reads
}
  2095  
  2096  // readLoop runs in its own goroutine and reads and dispatches frames.
  2097  func (cc *ClientConn) readLoop() {
  2098  	rl := &clientConnReadLoop{cc: cc}
  2099  	defer rl.cleanup()
  2100  	cc.readerErr = rl.run()
  2101  	if ce, ok := cc.readerErr.(ConnectionError); ok {
  2102  		cc.wmu.Lock()
  2103  		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  2104  		cc.wmu.Unlock()
  2105  	}
  2106  }
  2107  
  2108  // GoAwayError is returned by the Transport when the server closes the
  2109  // TCP connection after sending a GOAWAY frame.
  2110  type GoAwayError struct {
  2111  	LastStreamID uint32
  2112  	ErrCode      ErrCode
  2113  	DebugData    string
  2114  }
  2115  
  2116  func (e GoAwayError) Error() string {
  2117  	return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
  2118  		e.LastStreamID, e.ErrCode, e.DebugData)
  2119  }
  2120  
  2121  func isEOFOrNetReadError(err error) bool {
  2122  	if err == io.EOF {
  2123  		return true
  2124  	}
  2125  	ne, ok := err.(*net.OpError)
  2126  	return ok && ne.Op == "read"
  2127  }
  2128  
  2129  func (rl *clientConnReadLoop) cleanup() {
  2130  	cc := rl.cc
  2131  	defer cc.closeConn()
  2132  	defer close(cc.readerDone)
  2133  
  2134  	if cc.idleTimer != nil {
  2135  		cc.idleTimer.Stop()
  2136  	}
  2137  
  2138  	// Close any response bodies if the server closes prematurely.
  2139  	// TODO: also do this if we've written the headers but not
  2140  	// gotten a response yet.
  2141  	err := cc.readerErr
  2142  	cc.mu.Lock()
  2143  	if cc.goAway != nil && isEOFOrNetReadError(err) {
  2144  		err = GoAwayError{
  2145  			LastStreamID: cc.goAway.LastStreamID,
  2146  			ErrCode:      cc.goAway.ErrCode,
  2147  			DebugData:    cc.goAwayDebug,
  2148  		}
  2149  	} else if err == io.EOF {
  2150  		err = io.ErrUnexpectedEOF
  2151  	}
  2152  	cc.closed = true
  2153  
  2154  	// If the connection has never been used, and has been open for only a short time,
  2155  	// leave it in the connection pool for a little while.
  2156  	//
  2157  	// This avoids a situation where new connections are constantly created,
  2158  	// added to the pool, fail, and are removed from the pool, without any error
  2159  	// being surfaced to the user.
  2160  	unusedWaitTime := 5 * time.Second
  2161  	if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
  2162  		unusedWaitTime = cc.idleTimeout
  2163  	}
  2164  	idleTime := time.Now().Sub(cc.lastActive)
  2165  	if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
  2166  		cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
  2167  			cc.t.connPool().MarkDead(cc)
  2168  		})
  2169  	} else {
  2170  		cc.mu.Unlock() // avoid any deadlocks in MarkDead
  2171  		cc.t.connPool().MarkDead(cc)
  2172  		cc.mu.Lock()
  2173  	}
  2174  
  2175  	for _, cs := range cc.streams {
  2176  		select {
  2177  		case <-cs.peerClosed:
  2178  			// The server closed the stream before closing the conn,
  2179  			// so no need to interrupt it.
  2180  		default:
  2181  			cs.abortStreamLocked(err)
  2182  		}
  2183  	}
  2184  	cc.cond.Broadcast()
  2185  	cc.mu.Unlock()
  2186  
  2187  	if !cc.seenSettings {
  2188  		// If we have a pending request that wants extended CONNECT,
  2189  		// let it continue and fail with the connection error.
  2190  		cc.extendedConnectAllowed = true
  2191  		close(cc.seenSettingsChan)
  2192  	}
  2193  }
  2194  
  2195  // countReadFrameError calls Transport.CountError with a string
  2196  // representing err.
  2197  func (cc *ClientConn) countReadFrameError(err error) {
  2198  	f := cc.t.CountError
  2199  	if f == nil || err == nil {
  2200  		return
  2201  	}
  2202  	if ce, ok := err.(ConnectionError); ok {
  2203  		errCode := ErrCode(ce)
  2204  		f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
  2205  		return
  2206  	}
  2207  	if errors.Is(err, io.EOF) {
  2208  		f("read_frame_eof")
  2209  		return
  2210  	}
  2211  	if errors.Is(err, io.ErrUnexpectedEOF) {
  2212  		f("read_frame_unexpected_eof")
  2213  		return
  2214  	}
  2215  	if errors.Is(err, ErrFrameTooLarge) {
  2216  		f("read_frame_too_large")
  2217  		return
  2218  	}
  2219  	f("read_frame_other")
  2220  }
  2221  
// run reads frames from the connection in a loop and dispatches each one
// to the appropriate process* handler, returning the error (connection
// error or fatal read error) that terminates the loop. It also arms the
// optional read-idle health-check timer, resetting it on every frame.
func (rl *clientConnReadLoop) run() error {
	cc := rl.cc
	gotSettings := false
	readIdleTimeout := cc.readIdleTimeout
	var t *time.Timer
	if readIdleTimeout != 0 {
		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
	}
	for {
		f, err := cc.fr.ReadFrame()
		if t != nil {
			// Any frame counts as read activity; push back the health check.
			t.Reset(readIdleTimeout)
		}
		if err != nil {
			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
		}
		if se, ok := err.(StreamError); ok {
			// Stream-level errors abort one stream but keep the conn alive.
			if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
				if se.Cause == nil {
					se.Cause = cc.fr.errDetail
				}
				rl.endStreamError(cs, se)
			}
			continue
		} else if err != nil {
			cc.countReadFrameError(err)
			return err
		}
		if VerboseLogs {
			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
		}
		if !gotSettings {
			// The server preface must begin with a SETTINGS frame.
			if _, ok := f.(*SettingsFrame); !ok {
				cc.logf("protocol error: received %T before a SETTINGS frame", f)
				return ConnectionError(ErrCodeProtocol)
			}
			gotSettings = true
		}

		switch f := f.(type) {
		case *MetaHeadersFrame:
			err = rl.processHeaders(f)
		case *DataFrame:
			err = rl.processData(f)
		case *GoAwayFrame:
			err = rl.processGoAway(f)
		case *RSTStreamFrame:
			err = rl.processResetStream(f)
		case *SettingsFrame:
			err = rl.processSettings(f)
		case *PushPromiseFrame:
			err = rl.processPushPromise(f)
		case *WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
		case *PingFrame:
			err = rl.processPing(f)
		default:
			cc.logf("Transport: unhandled response frame type %T", f)
		}
		if err != nil {
			if VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
			}
			return err
		}
	}
}
  2289  
// processHeaders handles a complete HEADERS+CONTINUATION sequence
// (delivered as a single MetaHeadersFrame). Depending on stream state
// this is the response headers, a subsequent trailers block, or a
// protocol violation. It returns nil to keep the connection alive on
// stream-level problems, and a ConnectionError only for fatal ones.
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	if cs == nil {
		// We'd get here if we canceled a request while the
		// server had its response still in flight. So if this
		// was just something we canceled, ignore it.
		return nil
	}
	if cs.readClosed {
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    errors.New("protocol error: headers after END_STREAM"),
		})
		return nil
	}
	if !cs.firstByte {
		if cs.trace != nil {
			// TODO(bradfitz): move first response byte earlier,
			// when we first read the 9 byte header, not waiting
			// until all the HEADERS+CONTINUATION frames have been
			// merged. This works for now.
			traceFirstResponseByte(cs.trace)
		}
		cs.firstByte = true
	}
	if !cs.pastHeaders {
		cs.pastHeaders = true
	} else {
		// A second HEADERS block on the stream carries trailers.
		return rl.processTrailers(cs, f)
	}

	res, err := rl.handleResponse(cs, f)
	if err != nil {
		if _, ok := err.(ConnectionError); ok {
			return err
		}
		// Any other error type is a stream error.
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    err,
		})
		return nil // return nil from process* funcs to keep conn alive
	}
	if res == nil {
		// (nil, nil) special case. See handleResponse docs.
		return nil
	}
	cs.res = res
	close(cs.respHeaderRecv)
	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
  2346  
// handleResponse builds a ClientResponse from a decoded HEADERS block.
//
// may return error types nil, or ConnectionError. Any other error value
// is a StreamError of type ErrCodeProtocol. The returned error in that case
// is the detail.
//
// As a special case, handleResponse may return (nil, nil) to skip the
// frame (currently only used for 1xx responses).
func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*ClientResponse, error) {
	if f.Truncated {
		return nil, errResponseHeaderListSize
	}

	status := f.PseudoValue("status")
	if status == "" {
		return nil, errors.New("malformed response from server: missing status pseudo header")
	}
	statusCode, err := strconv.Atoi(status)
	if err != nil {
		return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
	}

	regularFields := f.RegularFields()
	// strs is a single backing array doled out one string per header value,
	// so the common single-value case needs no per-header allocation.
	strs := make([]string, len(regularFields))
	header := make(Header, len(regularFields))
	// Reuse the stream's response allocation across 1xx responses.
	res := &cs.staticResp
	cs.staticResp = ClientResponse{
		Header:     header,
		StatusCode: statusCode,
		Status:     status,
	}
	for _, hf := range regularFields {
		key := httpcommon.CanonicalHeader(hf.Name)
		if key == "Trailer" {
			t := res.Trailer
			if t == nil {
				t = make(Header)
				res.Trailer = t
			}
			foreachHeaderElement(hf.Value, func(v string) {
				t[httpcommon.CanonicalHeader(v)] = nil
			})
		} else {
			vv := header[key]
			if vv == nil && len(strs) > 0 {
				// More than likely this will be a single-element key.
				// Most headers aren't multi-valued.
				// Set the capacity on strs[0] to 1, so any future append
				// won't extend the slice into the other strings.
				vv, strs = strs[:1:1], strs[1:]
				vv[0] = hf.Value
				header[key] = vv
			} else {
				header[key] = append(vv, hf.Value)
			}
		}
	}

	if statusCode >= 100 && statusCode <= 199 {
		if f.StreamEnded() {
			return nil, errors.New("1xx informational response with END_STREAM flag")
		}
		if fn := cs.get1xxTraceFunc(); fn != nil {
			// If the 1xx response is being delivered to the user,
			// then they're responsible for limiting the number
			// of responses.
			if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
				return nil, err
			}
		} else {
			// If the user didn't examine the 1xx response, then we
			// limit the size of all 1xx headers.
			//
			// This differs a bit from the HTTP/1 implementation, which
			// limits the size of all 1xx headers plus the final response.
			// Use the larger limit of MaxHeaderListSize and
			// net/http.Transport.MaxResponseHeaderBytes.
			limit := int64(cs.cc.t.maxHeaderListSize())
			if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes() > limit {
				limit = t1.MaxResponseHeaderBytes()
			}
			for _, h := range f.Fields {
				cs.totalHeaderSize += int64(h.Size())
			}
			if cs.totalHeaderSize > limit {
				if VerboseLogs {
					log.Printf("http2: 1xx informational responses too large")
				}
				return nil, errors.New("header list too large")
			}
		}
		if statusCode == 100 {
			traceGot100Continue(cs.trace)
			select {
			case cs.on100 <- struct{}{}:
			default:
			}
		}
		cs.pastHeaders = false // do it all again
		return nil, nil
	}

	res.ContentLength = -1
	if clens := res.Header["Content-Length"]; len(clens) == 1 {
		if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
			res.ContentLength = int64(cl)
		} else {
			// TODO: care? unlike http/1, it won't mess up our framing, so it's
			// more safe smuggling-wise to ignore.
		}
	} else if len(clens) > 1 {
		// TODO: care? unlike http/1, it won't mess up our framing, so it's
		// more safe smuggling-wise to ignore.
	} else if f.StreamEnded() && !cs.isHead {
		res.ContentLength = 0
	}

	if cs.isHead {
		res.Body = NoBody
		return res, nil
	}

	if f.StreamEnded() {
		if res.ContentLength > 0 {
			res.Body = missingBody{}
		} else {
			res.Body = NoBody
		}
		return res, nil
	}

	cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
	cs.bytesRemain = res.ContentLength
	res.Body = transportResponseBody{cs}

	// Transparently decompress gzip responses we requested ourselves.
	if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
		res.Header.Del("Content-Encoding")
		res.Header.Del("Content-Length")
		res.ContentLength = -1
		res.Body = &gzipReader{body: res.Body}
		res.Uncompressed = true
	}
	return res, nil
}
  2489  
  2490  func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
  2491  	if cs.pastTrailers {
  2492  		// Too many HEADERS frames for this stream.
  2493  		return ConnectionError(ErrCodeProtocol)
  2494  	}
  2495  	cs.pastTrailers = true
  2496  	if !f.StreamEnded() {
  2497  		// We expect that any headers for trailers also
  2498  		// has END_STREAM.
  2499  		return ConnectionError(ErrCodeProtocol)
  2500  	}
  2501  	if len(f.PseudoFields()) > 0 {
  2502  		// No pseudo header fields are defined for trailers.
  2503  		// TODO: ConnectionError might be overly harsh? Check.
  2504  		return ConnectionError(ErrCodeProtocol)
  2505  	}
  2506  
  2507  	trailer := make(Header)
  2508  	for _, hf := range f.RegularFields() {
  2509  		key := httpcommon.CanonicalHeader(hf.Name)
  2510  		trailer[key] = append(trailer[key], hf.Value)
  2511  	}
  2512  	cs.trailer = trailer
  2513  
  2514  	rl.endStream(cs)
  2515  	return nil
  2516  }
  2517  
// transportResponseBody is the concrete type of Transport.RoundTrip's
// Response.Body. It is an io.ReadCloser.
//
// It holds only the stream pointer, so it is cheap to pass by value;
// all mutable state lives on the clientStream.
type transportResponseBody struct {
	cs *clientStream
}
  2523  
// Read implements io.Reader for the response body. It drains buffered
// DATA from the stream's pipe, enforces the declared Content-Length,
// and returns consumed flow-control tokens to the peer via
// WINDOW_UPDATE frames so it can keep sending.
func (b transportResponseBody) Read(p []byte) (n int, err error) {
	cs := b.cs
	cc := cs.cc

	if cs.readErr != nil {
		// A previous Read failed; the error is sticky.
		return 0, cs.readErr
	}
	n, err = b.cs.bufPipe.Read(p)
	if cs.bytesRemain != -1 {
		// bytesRemain == -1 means no Content-Length was declared.
		if int64(n) > cs.bytesRemain {
			// Peer sent more than it promised: truncate and abort.
			n = int(cs.bytesRemain)
			if err == nil {
				err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
				cs.abortStream(err)
			}
			cs.readErr = err
			return int(cs.bytesRemain), err
		}
		cs.bytesRemain -= int64(n)
		if err == io.EOF && cs.bytesRemain > 0 {
			// Body ended before reaching the declared Content-Length.
			err = io.ErrUnexpectedEOF
			cs.readErr = err
			return n, err
		}
	}
	if n == 0 {
		// No flow control tokens to send back.
		return
	}

	// Credit consumed bytes back to the receive windows under cc.mu,
	// then write any resulting WINDOW_UPDATE frames under cc.wmu.
	cc.mu.Lock()
	connAdd := cc.inflow.add(n)
	var streamAdd int32
	if err == nil { // No need to refresh if the stream is over or failed.
		streamAdd = cs.inflow.add(n)
	}
	cc.mu.Unlock()

	if connAdd != 0 || streamAdd != 0 {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if connAdd != 0 {
			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
		}
		if streamAdd != 0 {
			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
		}
		cc.bw.Flush()
	}
	return
}
  2575  
// errClosedResponseBody is reported by reads after the caller has
// closed the response body (see transportResponseBody.Close).
var errClosedResponseBody = errors.New("http2: response body closed")
  2577  
// Close implements io.Closer for the response body. It aborts the
// stream, refunds any unread buffered bytes to the connection-level
// flow-control window, and waits for stream teardown to complete.
func (b transportResponseBody) Close() error {
	cs := b.cs
	cc := cs.cc

	cs.bufPipe.BreakWithError(errClosedResponseBody)
	cs.abortStream(errClosedResponseBody)

	unread := cs.bufPipe.Len()
	if unread > 0 {
		cc.mu.Lock()
		// Return connection-level flow control.
		connAdd := cc.inflow.add(unread)
		cc.mu.Unlock()

		// TODO(dneil): Acquiring this mutex can block indefinitely.
		// Move flow control return to a goroutine?
		cc.wmu.Lock()
		// Return connection-level flow control.
		if connAdd > 0 {
			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
		}
		cc.bw.Flush()
		cc.wmu.Unlock()
	}

	// Block until the stream is fully cleaned up (or the request is
	// canceled), so callers observe a released stream after Close.
	select {
	case <-cs.donec:
	case <-cs.ctx.Done():
		// See golang/go#49366: The net/http package can cancel the
		// request context after the response body is fully read.
		// Don't treat this as an error.
		return nil
	case <-cs.reqCancel:
		return errRequestCanceled
	}
	return nil
}
  2615  
// processData handles an incoming DATA frame: it validates stream
// state, accounts for flow control (including refunds for padding and
// for data on dead streams), and delivers the payload into the
// stream's buffered pipe for Response.Body reads.
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	data := f.Data()
	if cs == nil {
		cc.mu.Lock()
		neverSent := cc.nextStreamID
		cc.mu.Unlock()
		if f.StreamID >= neverSent {
			// We never asked for this.
			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
			return ConnectionError(ErrCodeProtocol)
		}
		// We probably did ask for this, but canceled. Just ignore it.
		// TODO: be stricter here? only silently ignore things which
		// we canceled, but not things which were closed normally
		// by the peer? Tough without accumulating too much state.

		// But at least return their flow control:
		if f.Length > 0 {
			cc.mu.Lock()
			ok := cc.inflow.take(f.Length)
			connAdd := cc.inflow.add(int(f.Length))
			cc.mu.Unlock()
			if !ok {
				// Peer overran our connection-level receive window.
				return ConnectionError(ErrCodeFlowControl)
			}
			if connAdd > 0 {
				cc.wmu.Lock()
				cc.fr.WriteWindowUpdate(0, uint32(connAdd))
				cc.bw.Flush()
				cc.wmu.Unlock()
			}
		}
		return nil
	}
	if cs.readClosed {
		cc.logf("protocol error: received DATA after END_STREAM")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if !cs.pastHeaders {
		cc.logf("protocol error: received DATA before a HEADERS frame")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if f.Length > 0 {
		if cs.isHead && len(data) > 0 {
			cc.logf("protocol error: received DATA on a HEAD request")
			rl.endStreamError(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeProtocol,
			})
			return nil
		}
		// Check connection-level flow control.
		cc.mu.Lock()
		if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
			cc.mu.Unlock()
			return ConnectionError(ErrCodeFlowControl)
		}
		// Return any padded flow control now, since we won't
		// refund it later on body reads.
		var refund int
		if pad := int(f.Length) - len(data); pad > 0 {
			refund += pad
		}

		didReset := false
		var err error
		if len(data) > 0 {
			if _, err = cs.bufPipe.Write(data); err != nil {
				// Return len(data) now if the stream is already closed,
				// since data will never be read.
				didReset = true
				refund += len(data)
			}
		}

		// Credit refunds back to the receive windows under cc.mu,
		// then write WINDOW_UPDATEs under cc.wmu if anything accrued.
		sendConn := cc.inflow.add(refund)
		var sendStream int32
		if !didReset {
			sendStream = cs.inflow.add(refund)
		}
		cc.mu.Unlock()

		if sendConn > 0 || sendStream > 0 {
			cc.wmu.Lock()
			if sendConn > 0 {
				cc.fr.WriteWindowUpdate(0, uint32(sendConn))
			}
			if sendStream > 0 {
				cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
			}
			cc.bw.Flush()
			cc.wmu.Unlock()
		}

		if err != nil {
			rl.endStreamError(cs, err)
			return nil
		}
	}

	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
  2731  
// endStream records that the peer cleanly ended the read side of cs
// (END_STREAM observed), delivering io.EOF plus any trailers to the
// body reader. It is idempotent.
func (rl *clientConnReadLoop) endStream(cs *clientStream) {
	// TODO: check that any declared content-length matches, like
	// server.go's (*stream).endStream method.
	if !cs.readClosed {
		cs.readClosed = true
		// Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
		// race condition: The caller can read io.EOF from Response.Body
		// and close the body before we close cs.peerClosed, causing
		// cleanupWriteRequest to send a RST_STREAM.
		rl.cc.mu.Lock()
		defer rl.cc.mu.Unlock()
		cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
		close(cs.peerClosed)
	}
}
  2747  
// endStreamError terminates the read side of cs with err, marking the
// stream read-aborted so streamByID ignores later frames for it.
func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStream(err)
}
  2752  
// endStreamErrorLocked is endStreamError for callers that already hold
// cc.mu (see processWindowUpdate).
func (rl *clientConnReadLoop) endStreamErrorLocked(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStreamLocked(err)
}
  2757  
// Constants passed to streamByID for documentation purposes.
const (
	headerOrDataFrame    = true  // frame is HEADERS or DATA
	notHeaderOrDataFrame = false // any other frame type
)
  2763  
  2764  // streamByID returns the stream with the given id, or nil if no stream has that id.
  2765  // If headerOrData is true, it clears rst.StreamPingsBlocked.
  2766  func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
  2767  	rl.cc.mu.Lock()
  2768  	defer rl.cc.mu.Unlock()
  2769  	if headerOrData {
  2770  		// Work around an unfortunate gRPC behavior.
  2771  		// See comment on ClientConn.rstStreamPingsBlocked for details.
  2772  		rl.cc.rstStreamPingsBlocked = false
  2773  	}
  2774  	rl.cc.readBeforeStreamID = rl.cc.nextStreamID
  2775  	cs := rl.cc.streams[id]
  2776  	if cs != nil && !cs.readAborted {
  2777  		return cs
  2778  	}
  2779  	return nil
  2780  }
  2781  
  2782  func (cs *clientStream) copyTrailers() {
  2783  	for k, vv := range cs.trailer {
  2784  		t := cs.resTrailer
  2785  		if *t == nil {
  2786  			*t = make(Header)
  2787  		}
  2788  		(*t)[k] = vv
  2789  	}
  2790  }
  2791  
  2792  func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  2793  	cc := rl.cc
  2794  	cc.t.connPool().MarkDead(cc)
  2795  	if f.ErrCode != 0 {
  2796  		// TODO: deal with GOAWAY more. particularly the error code
  2797  		cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  2798  		if fn := cc.t.CountError; fn != nil {
  2799  			fn("recv_goaway_" + f.ErrCode.stringToken())
  2800  		}
  2801  	}
  2802  	cc.setGoAway(f)
  2803  	return nil
  2804  }
  2805  
  2806  func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
  2807  	cc := rl.cc
  2808  	// Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
  2809  	// Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
  2810  	cc.wmu.Lock()
  2811  	defer cc.wmu.Unlock()
  2812  
  2813  	if err := rl.processSettingsNoWrite(f); err != nil {
  2814  		return err
  2815  	}
  2816  	if !f.IsAck() {
  2817  		cc.fr.WriteSettingsAck()
  2818  		cc.bw.Flush()
  2819  	}
  2820  	return nil
  2821  }
  2822  
// processSettingsNoWrite applies a SETTINGS frame to the connection
// state under cc.mu. The caller (processSettings) holds cc.wmu and is
// responsible for writing the ack.
func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
	cc := rl.cc
	defer cc.maybeCallStateHook()
	cc.mu.Lock()
	defer cc.mu.Unlock()

	if f.IsAck() {
		// An ack is only valid if we have an unacked SETTINGS out.
		if cc.wantSettingsAck {
			cc.wantSettingsAck = false
			return nil
		}
		return ConnectionError(ErrCodeProtocol)
	}

	var seenMaxConcurrentStreams bool
	err := f.ForeachSetting(func(s Setting) error {
		if err := s.Valid(); err != nil {
			return err
		}
		switch s.ID {
		case SettingMaxFrameSize:
			cc.maxFrameSize = s.Val
		case SettingMaxConcurrentStreams:
			cc.maxConcurrentStreams = s.Val
			seenMaxConcurrentStreams = true
		case SettingMaxHeaderListSize:
			cc.peerMaxHeaderListSize = uint64(s.Val)
		case SettingInitialWindowSize:
			// Values above the maximum flow-control
			// window size of 2^31-1 MUST be treated as a
			// connection error (Section 5.4.1) of type
			// FLOW_CONTROL_ERROR.
			if s.Val > math.MaxInt32 {
				return ConnectionError(ErrCodeFlowControl)
			}

			// Adjust flow control of currently-open
			// frames by the difference of the old initial
			// window size and this one.
			delta := int32(s.Val) - int32(cc.initialWindowSize)
			for _, cs := range cc.streams {
				cs.flow.add(delta)
			}
			// Wake writers that may now have window to use.
			cc.cond.Broadcast()

			cc.initialWindowSize = s.Val
		case SettingHeaderTableSize:
			cc.henc.SetMaxDynamicTableSize(s.Val)
			cc.peerMaxHeaderTableSize = s.Val
		case SettingEnableConnectProtocol:
			// If the peer wants to send us SETTINGS_ENABLE_CONNECT_PROTOCOL,
			// we require that it do so in the first SETTINGS frame.
			//
			// When we attempt to use extended CONNECT, we wait for the first
			// SETTINGS frame to see if the server supports it. If we let the
			// server enable the feature with a later SETTINGS frame, then
			// users will see inconsistent results depending on whether we've
			// seen that frame or not.
			if !cc.seenSettings {
				cc.extendedConnectAllowed = s.Val == 1
			}
		default:
			cc.vlogf("Unhandled Setting: %v", s)
		}
		return nil
	})
	if err != nil {
		return err
	}

	if !cc.seenSettings {
		if !seenMaxConcurrentStreams {
			// This was the servers initial SETTINGS frame and it
			// didn't contain a MAX_CONCURRENT_STREAMS field so
			// increase the number of concurrent streams this
			// connection can establish to our default.
			cc.maxConcurrentStreams = defaultMaxConcurrentStreams
		}
		// Unblock anyone waiting on the first SETTINGS frame
		// (e.g. extended CONNECT users).
		close(cc.seenSettingsChan)
		cc.seenSettings = true
	}

	return nil
}
  2907  
// processWindowUpdate applies a WINDOW_UPDATE frame to either the
// connection-level (stream 0) or the stream-level send window.
func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if f.StreamID != 0 && cs == nil {
		// Unknown or already-aborted stream; ignore.
		return nil
	}

	cc.mu.Lock()
	defer cc.mu.Unlock()

	fl := &cc.flow
	if cs != nil {
		fl = &cs.flow
	}
	if !fl.add(int32(f.Increment)) {
		// The increment overflowed the window.
		// For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
		if cs != nil {
			rl.endStreamErrorLocked(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeFlowControl,
			})
			return nil
		}

		return ConnectionError(ErrCodeFlowControl)
	}
	// Wake any writers blocked on flow control.
	cc.cond.Broadcast()
	return nil
}
  2937  
// processResetStream handles a peer-sent RST_STREAM frame by aborting
// the matching stream, if it is still live.
func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if cs == nil {
		// TODO: return error if server tries to RST_STREAM an idle stream
		return nil
	}
	serr := streamError(cs.ID, f.ErrCode)
	serr.Cause = errFromPeer
	if f.ErrCode == ErrCodeProtocol {
		// The peer thinks we misbehaved; don't reuse this connection.
		rl.cc.SetDoNotReuse()
	}
	if fn := cs.cc.t.CountError; fn != nil {
		fn("recv_rststream_" + f.ErrCode.stringToken())
	}
	cs.abortStream(serr)

	cs.bufPipe.CloseWithError(serr)
	return nil
}
  2957  
// Ping sends a PING frame to the server and waits for the ack.
// It returns early with an error if the write fails, ctx is done, or
// the connection's read loop exits.
func (cc *ClientConn) Ping(ctx context.Context) error {
	c := make(chan struct{})
	// Generate a random payload not already in flight so the ack can
	// be matched back to this call (see processPing).
	var p [8]byte
	for {
		if _, err := rand.Read(p[:]); err != nil {
			return err
		}
		cc.mu.Lock()
		// check for dup before insert
		if _, found := cc.pings[p]; !found {
			cc.pings[p] = c
			cc.mu.Unlock()
			break
		}
		cc.mu.Unlock()
	}
	var pingError error
	errc := make(chan struct{})
	// Write the PING off the calling goroutine so ctx cancellation is
	// still honored if acquiring wmu or writing blocks.
	go func() {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if pingError = cc.fr.WritePing(false, p); pingError != nil {
			close(errc)
			return
		}
		if pingError = cc.bw.Flush(); pingError != nil {
			close(errc)
			return
		}
	}()
	select {
	case <-c:
		// Ack received.
		return nil
	case <-errc:
		return pingError
	case <-ctx.Done():
		return ctx.Err()
	case <-cc.readerDone:
		// connection closed
		return cc.readerErr
	}
}
  3002  
// processPing handles a PING frame: acks are matched to pending Ping
// calls (and unblock RST_STREAM-related waiters); non-acks are echoed
// back to the peer with the ack flag set.
func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
	if f.IsAck() {
		cc := rl.cc
		defer cc.maybeCallStateHook()
		cc.mu.Lock()
		defer cc.mu.Unlock()
		// If ack, notify listener if any
		if c, ok := cc.pings[f.Data]; ok {
			close(c)
			delete(cc.pings, f.Data)
		}
		if cc.pendingResets > 0 {
			// See clientStream.cleanupWriteRequest.
			cc.pendingResets = 0
			cc.rstStreamPingsBlocked = true
			cc.cond.Broadcast()
		}
		return nil
	}
	cc := rl.cc
	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	if err := cc.fr.WritePing(true, f.Data); err != nil {
		return err
	}
	return cc.bw.Flush()
}
  3030  
// processPushPromise rejects any PUSH_PROMISE frame as a connection
// error, since this transport disables server push.
func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
	// We told the peer we don't want them.
	// Spec says:
	// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
	// setting of the peer endpoint is set to 0. An endpoint that
	// has set this setting and has received acknowledgement MUST
	// treat the receipt of a PUSH_PROMISE frame as a connection
	// error (Section 5.4.1) of type PROTOCOL_ERROR."
	return ConnectionError(ErrCodeProtocol)
}
  3041  
  3042  // writeStreamReset sends a RST_STREAM frame.
  3043  // When ping is true, it also sends a PING frame with a random payload.
  3044  func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
  3045  	// TODO: map err to more interesting error codes, once the
  3046  	// HTTP community comes up with some. But currently for
  3047  	// RST_STREAM there's no equivalent to GOAWAY frame's debug
  3048  	// data, and the error codes are all pretty vague ("cancel").
  3049  	cc.wmu.Lock()
  3050  	cc.fr.WriteRSTStream(streamID, code)
  3051  	if ping {
  3052  		var payload [8]byte
  3053  		rand.Read(payload[:])
  3054  		cc.fr.WritePing(false, payload)
  3055  	}
  3056  	cc.bw.Flush()
  3057  	cc.wmu.Unlock()
  3058  }
  3059  
// Errors for header lists exceeding the advertised size limits:
// ours for responses, and the peer's for requests.
var (
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
	errRequestHeaderListSize  = httpcommon.ErrRequestHeaderListSize
)
  3064  
// logf logs unconditionally, delegating to the Transport.
func (cc *ClientConn) logf(format string, args ...any) {
	cc.t.logf(format, args...)
}

// vlogf logs only when VerboseLogs is set, delegating to the Transport.
func (cc *ClientConn) vlogf(format string, args ...any) {
	cc.t.vlogf(format, args...)
}

// vlogf logs only when the package-level VerboseLogs flag is set.
func (t *Transport) vlogf(format string, args ...any) {
	if VerboseLogs {
		t.logf(format, args...)
	}
}

// logf writes to the standard library's default logger.
func (t *Transport) logf(format string, args ...any) {
	log.Printf(format, args...)
}
  3082  
  3083  type missingBody struct{}
  3084  
  3085  func (missingBody) Close() error             { return nil }
  3086  func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
  3087  
// erringRoundTripper is a RoundTripper whose RoundTrip always fails
// with a fixed error.
type erringRoundTripper struct{ err error }

// RoundTripErr exposes the stored error without needing a request.
func (rt erringRoundTripper) RoundTripErr() error                               { return rt.err }
func (rt erringRoundTripper) RoundTrip(*ClientRequest) (*ClientResponse, error) { return nil, rt.err }
  3092  
// errConcurrentReadOnResBody doubles as a sentinel stored in
// gzipReader.zerr while a Read is in flight (see acquire/release).
var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")

// gzipReader wraps a response body so it can lazily
// get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
type gzipReader struct {
	_    incomparable
	body io.ReadCloser // underlying Response.Body
	mu   sync.Mutex    // guards zr and zerr
	zr   *gzip.Reader  // stores gzip reader from the pool between reads
	zerr error         // sticky gzip reader init error or sentinel value to detect concurrent read and read after close
}
  3106  
  3107  type eofReader struct{}
  3108  
  3109  func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
  3110  func (eofReader) ReadByte() (byte, error)  { return 0, io.EOF }
  3111  
// gzipPool reuses gzip.Readers across response bodies so their
// internal buffers are not reallocated per response.
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}

// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
// If Reset fails, the reader is returned to the pool and the error reported.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
	zr := gzipPool.Get().(*gzip.Reader)
	if err := zr.Reset(r); err != nil {
		gzipPoolPut(zr)
		return nil, err
	}
	return zr, nil
}

// gzipPoolPut puts a gzip.Reader back into the pool.
func gzipPoolPut(zr *gzip.Reader) {
	// Reset will allocate bufio.Reader if we pass it anything
	// other than a flate.Reader, so ensure that it's getting one.
	var r flate.Reader = eofReader{}
	zr.Reset(r)
	gzipPool.Put(zr)
}
  3132  
// acquire returns a gzip.Reader for reading response body.
// The reader must be released after use.
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr != nil {
		// Sticky init error, fs.ErrClosed after close, or the
		// concurrent-read sentinel set below.
		return nil, gz.zerr
	}
	if gz.zr == nil {
		// First Read (or first after a release): pull a reader
		// from the pool, bound to the underlying body.
		gz.zr, gz.zerr = gzipPoolGet(gz.body)
		if gz.zerr != nil {
			return nil, gz.zerr
		}
	}
	ret := gz.zr
	// Park the sentinel while the read is in flight so a second
	// concurrent Read fails instead of sharing the reader.
	gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
	return ret, nil
}

// release returns the gzip.Reader to the pool if Close was called during Read.
func (gz *gzipReader) release(zr *gzip.Reader) {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr == errConcurrentReadOnResBody {
		// Normal path: no Close happened mid-read; keep the reader
		// for the next Read.
		gz.zr, gz.zerr = zr, nil
	} else { // fs.ErrClosed
		gzipPoolPut(zr)
	}
}

// close returns the gzip.Reader to the pool immediately or
// signals release to do so after Read completes.
func (gz *gzipReader) close() {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr == nil && gz.zr != nil {
		gzipPoolPut(gz.zr)
		gz.zr = nil
	}
	// Mark closed; an in-flight release will see this and pool
	// its reader, and future Reads will fail.
	gz.zerr = fs.ErrClosed
}

// Read decompresses from the underlying body, using acquire/release to
// detect concurrent reads and reads after Close.
func (gz *gzipReader) Read(p []byte) (n int, err error) {
	zr, err := gz.acquire()
	if err != nil {
		return 0, err
	}
	defer gz.release(zr)

	return zr.Read(p)
}

// Close releases the pooled gzip.Reader (now or when the in-flight
// Read finishes) and closes the underlying body.
func (gz *gzipReader) Close() error {
	gz.close()

	return gz.body.Close()
}
  3190  
  3191  // isConnectionCloseRequest reports whether req should use its own
  3192  // connection for a single request and then close the connection.
  3193  func isConnectionCloseRequest(req *ClientRequest) bool {
  3194  	return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
  3195  }
  3196  
// NetHTTPClientConn wraps ClientConn and implements the interface net/http expects from
// the RoundTripper returned by NewClientConn.
type NetHTTPClientConn struct {
	cc *ClientConn
}

// RoundTrip sends the request on the wrapped connection.
func (cc NetHTTPClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.cc.RoundTrip(req)
}

// Close closes the wrapped connection.
func (cc NetHTTPClientConn) Close() error {
	return cc.cc.Close()
}

// Err reports a non-nil error once the wrapped connection is closed.
func (cc NetHTTPClientConn) Err() error {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	if cc.cc.closed {
		return errors.New("connection closed")
	}
	return nil
}

// Reserve claims a stream slot for a future request, failing if the
// connection cannot accept one. The state hook runs after cc.mu is
// released (deferred before the lock).
func (cc NetHTTPClientConn) Reserve() error {
	defer cc.cc.maybeCallStateHook()
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	if !cc.cc.canReserveLocked() {
		return errors.New("connection is unavailable")
	}
	cc.cc.streamsReserved++
	return nil
}

// Release returns a stream slot previously claimed by Reserve.
func (cc NetHTTPClientConn) Release() {
	defer cc.cc.maybeCallStateHook()
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	// We don't complain if streamsReserved is 0.
	//
	// This is consistent with RoundTrip: both Release and RoundTrip will
	// consume a reservation iff one exists.
	if cc.cc.streamsReserved > 0 {
		cc.cc.streamsReserved--
	}
}

// Available reports the connection's remaining stream capacity
// (see availableLocked).
func (cc NetHTTPClientConn) Available() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.availableLocked()
}

// InFlight reports the current request count on the connection
// (see currentRequestCountLocked).
func (cc NetHTTPClientConn) InFlight() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.currentRequestCountLocked()
}
  3255  
// maybeCallStateHook invokes internalStateHook when one is registered.
// Callers in this file defer it before taking cc.mu, so the hook runs
// after the lock is released.
func (cc *ClientConn) maybeCallStateHook() {
	if cc.internalStateHook != nil {
		cc.internalStateHook()
	}
}
  3261  
  3262  func (t *Transport) idleConnTimeout() time.Duration {
  3263  	// to keep things backwards compatible, we use non-zero values of
  3264  	// IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
  3265  	// http1 transport, followed by 0
  3266  	if t.IdleConnTimeout != 0 {
  3267  		return t.IdleConnTimeout
  3268  	}
  3269  
  3270  	if t.t1 != nil {
  3271  		return t.t1.IdleConnTimeout()
  3272  	}
  3273  
  3274  	return 0
  3275  }
  3276  
  3277  func traceGetConn(req *ClientRequest, hostPort string) {
  3278  	trace := httptrace.ContextClientTrace(req.Context)
  3279  	if trace == nil || trace.GetConn == nil {
  3280  		return
  3281  	}
  3282  	trace.GetConn(hostPort)
  3283  }
  3284  
// traceGotConn reports a GotConn event to the request's httptrace
// hook, computing reuse/idle details under cc.mu.
func traceGotConn(req *ClientRequest, cc *ClientConn, reused bool) {
	trace := httptrace.ContextClientTrace(req.Context)
	if trace == nil || trace.GotConn == nil {
		return
	}
	ci := httptrace.GotConnInfo{Conn: cc.tconn}
	ci.Reused = reused
	cc.mu.Lock()
	// Idle means reused with no streams currently open.
	ci.WasIdle = len(cc.streams) == 0 && reused
	if ci.WasIdle && !cc.lastActive.IsZero() {
		ci.IdleTime = time.Since(cc.lastActive)
	}
	cc.mu.Unlock()

	trace.GotConn(ci)
}
  3301  
  3302  func traceWroteHeaders(trace *httptrace.ClientTrace) {
  3303  	if trace != nil && trace.WroteHeaders != nil {
  3304  		trace.WroteHeaders()
  3305  	}
  3306  }
  3307  
  3308  func traceGot100Continue(trace *httptrace.ClientTrace) {
  3309  	if trace != nil && trace.Got100Continue != nil {
  3310  		trace.Got100Continue()
  3311  	}
  3312  }
  3313  
  3314  func traceWait100Continue(trace *httptrace.ClientTrace) {
  3315  	if trace != nil && trace.Wait100Continue != nil {
  3316  		trace.Wait100Continue()
  3317  	}
  3318  }
  3319  
  3320  func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
  3321  	if trace != nil && trace.WroteRequest != nil {
  3322  		trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
  3323  	}
  3324  }
  3325  
  3326  func traceFirstResponseByte(trace *httptrace.ClientTrace) {
  3327  	if trace != nil && trace.GotFirstResponseByte != nil {
  3328  		trace.GotFirstResponseByte()
  3329  	}
  3330  }
  3331  
  3332  func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
  3333  	if trace != nil {
  3334  		return trace.Got1xxResponse
  3335  	}
  3336  	return nil
  3337  }
  3338  
  3339  // dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
  3340  // connection.
  3341  func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
  3342  	dialer := &tls.Dialer{
  3343  		Config: cfg,
  3344  	}
  3345  	cn, err := dialer.DialContext(ctx, network, addr)
  3346  	if err != nil {
  3347  		return nil, err
  3348  	}
  3349  	tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
  3350  	return tlsCn, nil
  3351  }
  3352  

View as plain text