Source file src/net/http/netconn_test.go

     1  // Copyright 2024 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http_test
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"internal/synctest"
    11  	"io"
    12  	"math"
    13  	"net"
    14  	"net/netip"
    15  	"os"
    16  	"sync"
    17  	"time"
    18  )
    19  
    20  func fakeNetListen() *fakeNetListener {
    21  	li := &fakeNetListener{
    22  		setc:    make(chan struct{}, 1),
    23  		unsetc:  make(chan struct{}, 1),
    24  		addr:    netip.MustParseAddrPort("127.0.0.1:8000"),
    25  		locPort: 10000,
    26  	}
    27  	li.unsetc <- struct{}{}
    28  	return li
    29  }
    30  
    31  type fakeNetListener struct {
    32  	setc, unsetc chan struct{}
    33  	queue        []net.Conn
    34  	closed       bool
    35  	addr         netip.AddrPort
    36  	locPort      uint16
    37  
    38  	onDial  func()             // called when making a new connection
    39  	onClose func(*fakeNetConn) // called when closing a connection
    40  
    41  	trackConns bool // set this to record all created conns
    42  	conns      []*fakeNetConn
    43  }
    44  
// lock locks li by taking the lock token out of whichever of
// setc or unsetc currently holds it. It blocks while li is locked.
func (li *fakeNetListener) lock() {
	select {
	case <-li.setc:
	case <-li.unsetc:
	}
}
    51  
    52  func (li *fakeNetListener) unlock() {
    53  	if li.closed || len(li.queue) > 0 {
    54  		li.setc <- struct{}{}
    55  	} else {
    56  		li.unsetc <- struct{}{}
    57  	}
    58  }
    59  
    60  func (li *fakeNetListener) connect() *fakeNetConn {
    61  	if li.onDial != nil {
    62  		li.onDial()
    63  	}
    64  	li.lock()
    65  	defer li.unlock()
    66  	locAddr := netip.AddrPortFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), li.locPort)
    67  	li.locPort++
    68  	c0, c1 := fakeNetPipe(li.addr, locAddr)
    69  	c0.onClose = li.onClose
    70  	c1.onClose = li.onClose
    71  	li.queue = append(li.queue, c0)
    72  	if li.trackConns {
    73  		li.conns = append(li.conns, c0)
    74  	}
    75  	return c1
    76  }
    77  
    78  func (li *fakeNetListener) Accept() (net.Conn, error) {
    79  	<-li.setc
    80  	defer li.unlock()
    81  	if li.closed {
    82  		return nil, net.ErrClosed
    83  	}
    84  	c := li.queue[0]
    85  	li.queue = li.queue[1:]
    86  	return c, nil
    87  }
    88  
    89  func (li *fakeNetListener) Close() error {
    90  	li.lock()
    91  	defer li.unlock()
    92  	li.closed = true
    93  	return nil
    94  }
    95  
    96  func (li *fakeNetListener) Addr() net.Addr {
    97  	return net.TCPAddrFromAddrPort(li.addr)
    98  }
    99  
   100  // fakeNetPipe creates an in-memory, full duplex network connection.
   101  //
   102  // Unlike net.Pipe, the connection is not synchronous.
   103  // Writes are made to a buffer, and return immediately.
   104  // By default, the buffer size is unlimited.
   105  func fakeNetPipe(s1ap, s2ap netip.AddrPort) (r, w *fakeNetConn) {
   106  	s1addr := net.TCPAddrFromAddrPort(s1ap)
   107  	s2addr := net.TCPAddrFromAddrPort(s2ap)
   108  	s1 := newSynctestNetConnHalf(s1addr)
   109  	s2 := newSynctestNetConnHalf(s2addr)
   110  	c1 := &fakeNetConn{loc: s1, rem: s2}
   111  	c2 := &fakeNetConn{loc: s2, rem: s1}
   112  	c1.peer = c2
   113  	c2.peer = c1
   114  	return c1, c2
   115  }
   116  
   117  // A fakeNetConn is one endpoint of the connection created by fakeNetPipe.
   118  type fakeNetConn struct {
   119  	// local and remote connection halves.
   120  	// Each half contains a buffer.
   121  	// Reads pull from the local buffer, and writes push to the remote buffer.
   122  	loc, rem *fakeNetConnHalf
   123  
   124  	// When set, synctest.Wait is automatically called before reads and after writes.
   125  	autoWait bool
   126  
   127  	// peer is the other endpoint.
   128  	peer *fakeNetConn
   129  
   130  	onClose func(*fakeNetConn) // called when closing
   131  }
   132  
   133  // Read reads data from the connection.
   134  func (c *fakeNetConn) Read(b []byte) (n int, err error) {
   135  	if c.autoWait {
   136  		synctest.Wait()
   137  	}
   138  	return c.loc.read(b)
   139  }
   140  
   141  // Peek returns the available unread read buffer,
   142  // without consuming its contents.
   143  func (c *fakeNetConn) Peek() []byte {
   144  	if c.autoWait {
   145  		synctest.Wait()
   146  	}
   147  	return c.loc.peek()
   148  }
   149  
   150  // Write writes data to the connection.
   151  func (c *fakeNetConn) Write(b []byte) (n int, err error) {
   152  	if c.autoWait {
   153  		defer synctest.Wait()
   154  	}
   155  	return c.rem.write(b)
   156  }
   157  
   158  // IsClosed reports whether the peer has closed its end of the connection.
   159  func (c *fakeNetConn) IsClosedByPeer() bool {
   160  	if c.autoWait {
   161  		synctest.Wait()
   162  	}
   163  	c.rem.lock()
   164  	defer c.rem.unlock()
   165  	// If the remote half of the conn is returning ErrClosed,
   166  	// the peer has closed the connection.
   167  	return c.rem.readErr == net.ErrClosed
   168  }
   169  
   170  // Close closes the connection.
   171  func (c *fakeNetConn) Close() error {
   172  	if c.onClose != nil {
   173  		c.onClose(c)
   174  	}
   175  	// Local half of the conn is now closed.
   176  	c.loc.lock()
   177  	c.loc.writeErr = net.ErrClosed
   178  	c.loc.readErr = net.ErrClosed
   179  	c.loc.buf.Reset()
   180  	c.loc.unlock()
   181  	// Remote half of the connection reads EOF after reading any remaining data.
   182  	c.rem.lock()
   183  	if c.rem.readErr != nil {
   184  		c.rem.readErr = io.EOF
   185  	}
   186  	c.rem.unlock()
   187  	if c.autoWait {
   188  		synctest.Wait()
   189  	}
   190  	return nil
   191  }
   192  
   193  // LocalAddr returns the (fake) local network address.
   194  func (c *fakeNetConn) LocalAddr() net.Addr {
   195  	return c.loc.addr
   196  }
   197  
   198  // LocalAddr returns the (fake) remote network address.
   199  func (c *fakeNetConn) RemoteAddr() net.Addr {
   200  	return c.rem.addr
   201  }
   202  
   203  // SetDeadline sets the read and write deadlines for the connection.
   204  func (c *fakeNetConn) SetDeadline(t time.Time) error {
   205  	c.SetReadDeadline(t)
   206  	c.SetWriteDeadline(t)
   207  	return nil
   208  }
   209  
   210  // SetReadDeadline sets the read deadline for the connection.
   211  func (c *fakeNetConn) SetReadDeadline(t time.Time) error {
   212  	c.loc.rctx.setDeadline(t)
   213  	return nil
   214  }
   215  
   216  // SetWriteDeadline sets the write deadline for the connection.
   217  func (c *fakeNetConn) SetWriteDeadline(t time.Time) error {
   218  	c.rem.wctx.setDeadline(t)
   219  	return nil
   220  }
   221  
   222  // SetReadBufferSize sets the read buffer limit for the connection.
   223  // Writes by the peer will block so long as the buffer is full.
   224  func (c *fakeNetConn) SetReadBufferSize(size int) {
   225  	c.loc.setReadBufferSize(size)
   226  }
   227  
   228  // fakeNetConnHalf is one data flow in the connection created by fakeNetPipe.
   229  // Each half contains a buffer. Writes to the half push to the buffer, and reads pull from it.
   230  type fakeNetConnHalf struct {
   231  	addr net.Addr
   232  
   233  	// Read and write timeouts.
   234  	rctx, wctx deadlineContext
   235  
   236  	// A half can be readable and/or writable.
   237  	//
   238  	// These four channels act as a lock,
   239  	// and allow waiting for readability/writability.
   240  	// When the half is unlocked, exactly one channel contains a value.
   241  	// When the half is locked, all channels are empty.
   242  	lockr  chan struct{} // readable
   243  	lockw  chan struct{} // writable
   244  	lockrw chan struct{} // readable and writable
   245  	lockc  chan struct{} // neither readable nor writable
   246  
   247  	bufMax   int // maximum buffer size
   248  	buf      bytes.Buffer
   249  	readErr  error // error returned by reads
   250  	writeErr error // error returned by writes
   251  }
   252  
   253  func newSynctestNetConnHalf(addr net.Addr) *fakeNetConnHalf {
   254  	h := &fakeNetConnHalf{
   255  		addr:   addr,
   256  		lockw:  make(chan struct{}, 1),
   257  		lockr:  make(chan struct{}, 1),
   258  		lockrw: make(chan struct{}, 1),
   259  		lockc:  make(chan struct{}, 1),
   260  		bufMax: math.MaxInt, // unlimited
   261  	}
   262  	h.unlock()
   263  	return h
   264  }
   265  
// lock locks h regardless of whether it is readable or writable,
// by taking the lock value out of whichever of the four lock channels
// currently holds it. It blocks while h is locked.
func (h *fakeNetConnHalf) lock() {
	select {
	case <-h.lockw: // writable
	case <-h.lockr: // readable
	case <-h.lockrw: // readable and writable
	case <-h.lockc: // neither readable nor writable
	}
}
   275  
   276  // h unlocks h.
   277  func (h *fakeNetConnHalf) unlock() {
   278  	canRead := h.readErr != nil || h.buf.Len() > 0
   279  	canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
   280  	switch {
   281  	case canRead && canWrite:
   282  		h.lockrw <- struct{}{} // readable and writable
   283  	case canRead:
   284  		h.lockr <- struct{}{} // readable
   285  	case canWrite:
   286  		h.lockw <- struct{}{} // writable
   287  	default:
   288  		h.lockc <- struct{}{} // neither readable nor writable
   289  	}
   290  }
   291  
// waitAndLockForRead waits until h is readable and locks it.
//
// It returns nil with h locked, in which case the caller must call unlock.
// If the read deadline context is done before h can be locked for reading,
// it returns the context's cause and h is left unlocked.
func (h *fakeNetConnHalf) waitAndLockForRead() error {
	// First a non-blocking select to see if we can make immediate progress.
	// This permits using a canceled context for a non-blocking operation.
	select {
	case <-h.lockr:
		return nil // readable
	case <-h.lockrw:
		return nil // readable and writable
	default:
	}
	// Not immediately readable: block until h becomes readable
	// or the read deadline expires.
	ctx := h.rctx.context()
	select {
	case <-h.lockr:
		return nil // readable
	case <-h.lockrw:
		return nil // readable and writable
	case <-ctx.Done():
		return context.Cause(ctx)
	}
}
   313  
// waitAndLockForWrite waits until h is writable and locks it.
//
// It returns nil with h locked, in which case the caller must call unlock.
// If the write deadline context is done before h can be locked for writing,
// it returns the context's cause and h is left unlocked.
func (h *fakeNetConnHalf) waitAndLockForWrite() error {
	// First a non-blocking select to see if we can make immediate progress.
	// This permits using a canceled context for a non-blocking operation.
	select {
	case <-h.lockw:
		return nil // writable
	case <-h.lockrw:
		return nil // readable and writable
	default:
	}
	// Not immediately writable: block until h becomes writable
	// or the write deadline expires.
	ctx := h.wctx.context()
	select {
	case <-h.lockw:
		return nil // writable
	case <-h.lockrw:
		return nil // readable and writable
	case <-ctx.Done():
		return context.Cause(ctx)
	}
}
   335  
   336  func (h *fakeNetConnHalf) peek() []byte {
   337  	h.lock()
   338  	defer h.unlock()
   339  	return h.buf.Bytes()
   340  }
   341  
   342  func (h *fakeNetConnHalf) read(b []byte) (n int, err error) {
   343  	if err := h.waitAndLockForRead(); err != nil {
   344  		return 0, err
   345  	}
   346  	defer h.unlock()
   347  	if h.buf.Len() == 0 && h.readErr != nil {
   348  		return 0, h.readErr
   349  	}
   350  	return h.buf.Read(b)
   351  }
   352  
   353  func (h *fakeNetConnHalf) setReadBufferSize(size int) {
   354  	h.lock()
   355  	defer h.unlock()
   356  	h.bufMax = size
   357  }
   358  
   359  func (h *fakeNetConnHalf) write(b []byte) (n int, err error) {
   360  	for n < len(b) {
   361  		nn, err := h.writePartial(b[n:])
   362  		n += nn
   363  		if err != nil {
   364  			return n, err
   365  		}
   366  	}
   367  	return n, nil
   368  }
   369  
   370  func (h *fakeNetConnHalf) writePartial(b []byte) (n int, err error) {
   371  	if err := h.waitAndLockForWrite(); err != nil {
   372  		return 0, err
   373  	}
   374  	defer h.unlock()
   375  	if h.writeErr != nil {
   376  		return 0, h.writeErr
   377  	}
   378  	writeMax := h.bufMax - h.buf.Len()
   379  	if writeMax < len(b) {
   380  		b = b[:writeMax]
   381  	}
   382  	return h.buf.Write(b)
   383  }
   384  
   385  // deadlineContext converts a changable deadline (as in net.Conn.SetDeadline) into a Context.
   386  type deadlineContext struct {
   387  	mu     sync.Mutex
   388  	ctx    context.Context
   389  	cancel context.CancelCauseFunc
   390  	timer  *time.Timer
   391  }
   392  
   393  // context returns a Context which expires when the deadline does.
   394  func (t *deadlineContext) context() context.Context {
   395  	t.mu.Lock()
   396  	defer t.mu.Unlock()
   397  	if t.ctx == nil {
   398  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   399  	}
   400  	return t.ctx
   401  }
   402  
   403  // setDeadline sets the current deadline.
   404  func (t *deadlineContext) setDeadline(deadline time.Time) {
   405  	t.mu.Lock()
   406  	defer t.mu.Unlock()
   407  	// If t.ctx is non-nil and t.cancel is nil, then t.ctx was canceled
   408  	// and we should create a new one.
   409  	if t.ctx == nil || t.cancel == nil {
   410  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   411  	}
   412  	// Stop any existing deadline from expiring.
   413  	if t.timer != nil {
   414  		t.timer.Stop()
   415  	}
   416  	if deadline.IsZero() {
   417  		// No deadline.
   418  		return
   419  	}
   420  	now := time.Now()
   421  	if !deadline.After(now) {
   422  		// Deadline has already expired.
   423  		t.cancel(os.ErrDeadlineExceeded)
   424  		t.cancel = nil
   425  		return
   426  	}
   427  	if t.timer != nil {
   428  		// Reuse existing deadline timer.
   429  		t.timer.Reset(deadline.Sub(now))
   430  		return
   431  	}
   432  	// Create a new timer to cancel the context at the deadline.
   433  	t.timer = time.AfterFunc(deadline.Sub(now), func() {
   434  		t.mu.Lock()
   435  		defer t.mu.Unlock()
   436  		t.cancel(os.ErrDeadlineExceeded)
   437  		t.cancel = nil
   438  	})
   439  }
   440  

View as plain text