Source file src/sync/pool_test.go

// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Pool is a no-op under the race detector, so none of these tests work.
//
//go:build !race

package sync_test

import (
	"runtime"
	"runtime/debug"
	"slices"
	. "sync"
	"sync/atomic"
	"testing"
	"time"
)

func TestPool(t *testing.T) {
	// disable GC so we can control when it happens.
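	// SetGCPercent(-1) disables the collector and returns the previous
	// setting, which the deferred call restores when the test returns.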
	defer debug.SetGCPercent(debug.SetGCPercent(-1))
	var p Pool
	if p.Get() != nil {
		t.Fatal("expected empty")
	}

	// Make sure that the goroutine doesn't migrate to another P
	// between Put and Get calls.
	Runtime_procPin()
	p.Put("a")
	p.Put("b")
	if g := p.Get(); g != "a" {
		t.Fatalf("got %#v; want a", g)
	}
	if g := p.Get(); g != "b" {
		t.Fatalf("got %#v; want b", g)
	}
	if g := p.Get(); g != nil {
		t.Fatalf("got %#v; want nil", g)
	}
	Runtime_procUnpin()

	// Put in a large number of objects so they spill into
	// stealable space.
	for i := 0; i < 100; i++ {
		p.Put("c")
	}
	// After one GC, the victim cache should keep them alive.
	runtime.GC()
	if g := p.Get(); g != "c" {
		t.Fatalf("got %#v; want c after GC", g)
	}
	// A second GC should drop the victim cache.
	runtime.GC()
	if g := p.Get(); g != nil {
		t.Fatalf("got %#v; want nil after second GC", g)
	}
}

func TestPoolNew(t *testing.T) {
	// disable GC so we can control when it happens.
	defer debug.SetGCPercent(debug.SetGCPercent(-1))

	i := 0
	p := Pool{
		New: func() any {
			i++
			return i
		},
	}
	if v := p.Get(); v != 1 {
		t.Fatalf("got %v; want 1", v)
	}
	if v := p.Get(); v != 2 {
		t.Fatalf("got %v; want 2", v)
	}

	// Make sure that the goroutine doesn't migrate to another P
	// between Put and Get calls.
	Runtime_procPin()
	p.Put(42)
	if v := p.Get(); v != 42 {
		t.Fatalf("got %v; want 42", v)
	}
	Runtime_procUnpin()

	if v := p.Get(); v != 3 {
		t.Fatalf("got %v; want 3", v)
	}
}
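
// poolUsageSketch is a minimal illustration of the Get/Put pattern the tests
// above exercise piece by piece: Get takes a value out of the pool (falling
// back to New when the pool is empty) and Put returns it for reuse. The
// *[]byte element type and the 1 KiB capacity are arbitrary choices for this
// sketch; real code would keep the pool at package scope so the cached
// buffers are shared across calls.
func poolUsageSketch(data []byte) int {
	bufPool := Pool{
		New: func() any {
			b := make([]byte, 0, 1024)
			return &b // store a pointer so Put does not allocate
		},
	}
	buf := bufPool.Get().(*[]byte)
	*buf = append((*buf)[:0], data...) // reset, then reuse the pooled buffer
	n := len(*buf)
	bufPool.Put(buf)
	return n
}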

// Test that Pool does not hold pointers to previously cached resources.
func TestPoolGC(t *testing.T) {
	testPool(t, true)
}

// Test that Pool releases resources on GC.
func TestPoolRelease(t *testing.T) {
	testPool(t, false)
}

func testPool(t *testing.T, drain bool) {
	if drain {
		// Run with GOMAXPROCS=1 if drain is set. The code below implicitly
		// assumes that Get can remove every cleanup-tracked value cached in
		// the pool, but this isn't necessarily true if a value gets stuck
		// in the private slot for some P. This is especially likely when
		// running with mayMoreStackPreempt. We can make this exact, however,
		// by setting GOMAXPROCS to 1, so there's only 1 P. This is fine for
		// this test, since we're not trying to check any concurrent properties
		// of Pool anyway.
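		// GOMAXPROCS returns the previous setting, so the deferred call
		// puts it back when the test returns.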
		defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
	}

	var p Pool
	const N = 100
	for try := 0; try < 3; try++ {
		if try == 1 && testing.Short() {
			break
		}
		var cln, cln1 uint32
		for i := 0; i < N; i++ {
			v := new(string)
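			// AddCleanup schedules the function to run with &cln sometime
			// after v becomes unreachable and is collected.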
			runtime.AddCleanup(v, func(f *uint32) { atomic.AddUint32(f, 1) }, &cln)
			p.Put(v)
		}
		if drain {
			for i := 0; i < N; i++ {
				p.Get()
			}
		} else {
			// Run an extra GC cycle to drop items from the pool.
			runtime.GC()
		}

		// Run a GC and wait for all the cleanups to run.
		runtime.GC()
		runtime_blockUntilEmptyCleanupQueue(int64(5 * time.Second))

		// One pointer can remain on the stack or elsewhere.
		if cln1 = atomic.LoadUint32(&cln); cln1 < N-1 {
			t.Fatalf("only %v out of %v resources are cleaned up on try %v", cln1, N, try)
		}
	}
}

func TestPoolStress(t *testing.T) {
	const P = 10
	N := int(1e6)
	if testing.Short() {
		N /= 100
	}
	var p Pool
	done := make(chan bool)
	for i := 0; i < P; i++ {
		go func() {
			var v any = 0
			for j := 0; j < N; j++ {
				if v == nil {
					v = 0
				}
				p.Put(v)
				v = p.Get()
				if v != nil && v.(int) != 0 {
					t.Errorf("expect 0, got %v", v)
					break
				}
			}
			done <- true
		}()
	}
	for i := 0; i < P; i++ {
		<-done
	}
}

func TestPoolDequeue(t *testing.T) {
	testPoolDequeue(t, NewPoolDequeue(16))
}

func TestPoolChain(t *testing.T) {
	testPoolDequeue(t, NewPoolChain())
}

func testPoolDequeue(t *testing.T, d PoolDequeue) {
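	// The dequeue is single-producer, multi-consumer: only the one producer
	// goroutine below calls PushHead and PopHead, while the consumers pop
	// from the tail.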
	const P = 10
	var N int = 2e6
	if testing.Short() {
		N = 1e3
	}
	have := make([]int32, N)
	var stop int32
	var wg WaitGroup
	record := func(val int) {
		atomic.AddInt32(&have[val], 1)
		if val == N-1 {
			atomic.StoreInt32(&stop, 1)
		}
	}

	// Start P-1 consumers.
	for i := 1; i < P; i++ {
		wg.Add(1)
		go func() {
			fail := 0
			for atomic.LoadInt32(&stop) == 0 {
				val, ok := d.PopTail()
				if ok {
					fail = 0
					record(val.(int))
				} else {
					// Speed up the test by
					// allowing the pusher to run.
					if fail++; fail%100 == 0 {
						runtime.Gosched()
					}
				}
			}
			wg.Done()
		}()
	}

	// Start 1 producer.
	nPopHead := 0
	wg.Add(1)
	go func() {
		for j := 0; j < N; j++ {
			for !d.PushHead(j) {
				// Allow a popper to run.
				runtime.Gosched()
			}
			if j%10 == 0 {
				val, ok := d.PopHead()
				if ok {
					nPopHead++
					record(val.(int))
				}
			}
		}
		wg.Done()
	}()
	wg.Wait()

	// Check results.
	for i, count := range have {
		if count != 1 {
			t.Errorf("expected have[%d] = 1, got %d", i, count)
		}
	}
	// Check that at least some PopHeads succeeded. We skip this
	// check in short mode because it's common enough that the
	// queue will stay nearly empty all the time and a PopTail
	// will happen during the window between every PushHead and
	// PopHead.
	if !testing.Short() && nPopHead == 0 {
		t.Errorf("popHead never succeeded")
	}
}

func TestNilPool(t *testing.T) {
	catch := func() {
		if recover() == nil {
			t.Error("expected panic")
		}
	}

	var p *Pool
	t.Run("Get", func(t *testing.T) {
		defer catch()
		if p.Get() != nil {
			t.Error("expected empty")
		}
		t.Error("should have panicked already")
	})
	t.Run("Put", func(t *testing.T) {
		defer catch()
		p.Put("a")
		t.Error("should have panicked already")
	})
}

func BenchmarkPool(b *testing.B) {
	var p Pool
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			p.Put(1)
			p.Get()
		}
	})
}

func BenchmarkPoolOverflow(b *testing.B) {
	var p Pool
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			for b := 0; b < 100; b++ {
				p.Put(1)
			}
			for b := 0; b < 100; b++ {
				p.Get()
			}
		}
	})
}

// Simulate object starvation in order to force Ps to steal objects
// from other Ps.
func BenchmarkPoolStarvation(b *testing.B) {
	var p Pool
	count := 100
	// Put back 33% fewer objects than we Get. The resulting starvation
	// forces P-local storage to steal objects from other Ps.
	countStarved := count - int(float32(count)*0.33)
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			for b := 0; b < countStarved; b++ {
				p.Put(1)
			}
			for b := 0; b < count; b++ {
				p.Get()
			}
		}
	})
}

var globalSink any

func BenchmarkPoolSTW(b *testing.B) {
	// Take control of GC.
	defer debug.SetGCPercent(debug.SetGCPercent(-1))

	var mstats runtime.MemStats
	var pauses []uint64

	var p Pool
	for i := 0; i < b.N; i++ {
		// Put a large number of items into a pool.
		const N = 100000
		var item any = 42
		for i := 0; i < N; i++ {
			p.Put(item)
		}
		// Do a GC.
		runtime.GC()
		// Record pause time.
		runtime.ReadMemStats(&mstats)
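		// PauseNs is a circular buffer of recent GC stop-the-world pause
		// times; the most recent pause is at PauseNs[(NumGC+255)%256].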
		pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256])
	}

	// Get pause time stats.
	slices.Sort(pauses)
	var total uint64
	for _, ns := range pauses {
		total += ns
	}
	// ns/op for this benchmark is average STW time.
	b.ReportMetric(float64(total)/float64(b.N), "ns/op")
	b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW")
	b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW")
}

func BenchmarkPoolExpensiveNew(b *testing.B) {
	// Populate a pool with items that are expensive to construct
	// to stress pool cleanup and subsequent reconstruction.

	// Create a ballast so the GC has a non-zero heap size and
	// runs at reasonable times.
	globalSink = make([]byte, 8<<20)
	defer func() { globalSink = nil }()

	// Create a pool that's "expensive" to fill.
	var p Pool
	var nNew uint64
	p.New = func() any {
		atomic.AddUint64(&nNew, 1)
		time.Sleep(time.Millisecond)
		return 42
	}
	var mstats1, mstats2 runtime.MemStats
	runtime.ReadMemStats(&mstats1)
	b.RunParallel(func(pb *testing.PB) {
		// Simulate 100X the number of goroutines having items
		// checked out from the Pool simultaneously.
		items := make([]any, 100)
		var sink []byte
		for pb.Next() {
			// Stress the pool.
			for i := range items {
				items[i] = p.Get()
				// Simulate doing some work with this
				// item checked out.
				sink = make([]byte, 32<<10)
			}
			for i, v := range items {
				p.Put(v)
				items[i] = nil
			}
		}
		_ = sink
	})
	runtime.ReadMemStats(&mstats2)

	b.ReportMetric(float64(mstats2.NumGC-mstats1.NumGC)/float64(b.N), "GCs/op")
	b.ReportMetric(float64(nNew)/float64(b.N), "New/op")
}
