...

Source file src/sync/pool_test.go

Documentation: sync

     1  // Copyright 2013 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Pool is a no-op under the race detector, so none of these tests work there.
     6  //
     7  //go:build !race
     8  
     9  package sync_test
    10  
    11  import (
    12  	"runtime"
    13  	"runtime/debug"
    14  	"sort"
    15  	. "sync"
    16  	"sync/atomic"
    17  	"testing"
    18  	"time"
    19  )
    20  
    21  func TestPool(t *testing.T) {
    22  	// disable GC so we can control when it happens.
    23  	defer debug.SetGCPercent(debug.SetGCPercent(-1))
    24  	var p Pool
    25  	if p.Get() != nil {
    26  		t.Fatal("expected empty")
    27  	}
    28  
    29  	// Make sure that the goroutine doesn't migrate to another P
    30  	// between Put and Get calls.
    31  	Runtime_procPin()
    32  	p.Put("a")
    33  	p.Put("b")
    34  	if g := p.Get(); g != "a" {
    35  		t.Fatalf("got %#v; want a", g)
    36  	}
    37  	if g := p.Get(); g != "b" {
    38  		t.Fatalf("got %#v; want b", g)
    39  	}
    40  	if g := p.Get(); g != nil {
    41  		t.Fatalf("got %#v; want nil", g)
    42  	}
    43  	Runtime_procUnpin()
    44  
    45  	// Put in a large number of objects so they spill into
    46  	// stealable space.
    47  	for i := 0; i < 100; i++ {
    48  		p.Put("c")
    49  	}
    50  	// After one GC, the victim cache should keep them alive.
    51  	runtime.GC()
    52  	if g := p.Get(); g != "c" {
    53  		t.Fatalf("got %#v; want c after GC", g)
    54  	}
    55  	// A second GC should drop the victim cache.
    56  	runtime.GC()
    57  	if g := p.Get(); g != nil {
    58  		t.Fatalf("got %#v; want nil after second GC", g)
    59  	}
    60  }
    61  
    62  func TestPoolNew(t *testing.T) {
    63  	// disable GC so we can control when it happens.
    64  	defer debug.SetGCPercent(debug.SetGCPercent(-1))
    65  
    66  	i := 0
    67  	p := Pool{
    68  		New: func() any {
    69  			i++
    70  			return i
    71  		},
    72  	}
    73  	if v := p.Get(); v != 1 {
    74  		t.Fatalf("got %v; want 1", v)
    75  	}
    76  	if v := p.Get(); v != 2 {
    77  		t.Fatalf("got %v; want 2", v)
    78  	}
    79  
    80  	// Make sure that the goroutine doesn't migrate to another P
    81  	// between Put and Get calls.
    82  	Runtime_procPin()
    83  	p.Put(42)
    84  	if v := p.Get(); v != 42 {
    85  		t.Fatalf("got %v; want 42", v)
    86  	}
    87  	Runtime_procUnpin()
    88  
    89  	if v := p.Get(); v != 3 {
    90  		t.Fatalf("got %v; want 3", v)
    91  	}
    92  }
    93  
    94  // Test that Pool does not hold pointers to previously cached resources.
    95  func TestPoolGC(t *testing.T) {
    96  	testPool(t, true)
    97  }
    98  
    99  // Test that Pool releases resources on GC.
   100  func TestPoolRelease(t *testing.T) {
   101  	testPool(t, false)
   102  }
   103  
   104  func testPool(t *testing.T, drain bool) {
   105  	var p Pool
   106  	const N = 100
   107  loop:
   108  	for try := 0; try < 3; try++ {
   109  		if try == 1 && testing.Short() {
   110  			break
   111  		}
   112  		var fin, fin1 uint32
   113  		for i := 0; i < N; i++ {
   114  			v := new(string)
   115  			runtime.SetFinalizer(v, func(vv *string) {
   116  				atomic.AddUint32(&fin, 1)
   117  			})
   118  			p.Put(v)
   119  		}
   120  		if drain {
   121  			for i := 0; i < N; i++ {
   122  				p.Get()
   123  			}
   124  		}
   125  		for i := 0; i < 5; i++ {
   126  			runtime.GC()
   127  			time.Sleep(time.Duration(i*100+10) * time.Millisecond)
   128  			// 1 pointer can remain on stack or elsewhere
   129  			if fin1 = atomic.LoadUint32(&fin); fin1 >= N-1 {
   130  				continue loop
   131  			}
   132  		}
   133  		t.Fatalf("only %v out of %v resources are finalized on try %v", fin1, N, try)
   134  	}
   135  }
   136  
   137  func TestPoolStress(t *testing.T) {
   138  	const P = 10
   139  	N := int(1e6)
   140  	if testing.Short() {
   141  		N /= 100
   142  	}
   143  	var p Pool
   144  	done := make(chan bool)
   145  	for i := 0; i < P; i++ {
   146  		go func() {
   147  			var v any = 0
   148  			for j := 0; j < N; j++ {
   149  				if v == nil {
   150  					v = 0
   151  				}
   152  				p.Put(v)
   153  				v = p.Get()
   154  				if v != nil && v.(int) != 0 {
   155  					t.Errorf("expect 0, got %v", v)
   156  					break
   157  				}
   158  			}
   159  			done <- true
   160  		}()
   161  	}
   162  	for i := 0; i < P; i++ {
   163  		<-done
   164  	}
   165  }
   166  
   167  func TestPoolDequeue(t *testing.T) {
   168  	testPoolDequeue(t, NewPoolDequeue(16))
   169  }
   170  
   171  func TestPoolChain(t *testing.T) {
   172  	testPoolDequeue(t, NewPoolChain())
   173  }
   174  
   175  func testPoolDequeue(t *testing.T, d PoolDequeue) {
   176  	const P = 10
   177  	var N int = 2e6
   178  	if testing.Short() {
   179  		N = 1e3
   180  	}
   181  	have := make([]int32, N)
   182  	var stop int32
   183  	var wg WaitGroup
   184  	record := func(val int) {
   185  		atomic.AddInt32(&have[val], 1)
   186  		if val == N-1 {
   187  			atomic.StoreInt32(&stop, 1)
   188  		}
   189  	}
   190  
   191  	// Start P-1 consumers.
   192  	for i := 1; i < P; i++ {
   193  		wg.Add(1)
   194  		go func() {
   195  			fail := 0
   196  			for atomic.LoadInt32(&stop) == 0 {
   197  				val, ok := d.PopTail()
   198  				if ok {
   199  					fail = 0
   200  					record(val.(int))
   201  				} else {
   202  					// Speed up the test by
   203  					// allowing the pusher to run.
   204  					if fail++; fail%100 == 0 {
   205  						runtime.Gosched()
   206  					}
   207  				}
   208  			}
   209  			wg.Done()
   210  		}()
   211  	}
   212  
   213  	// Start 1 producer.
   214  	nPopHead := 0
   215  	wg.Add(1)
   216  	go func() {
   217  		for j := 0; j < N; j++ {
   218  			for !d.PushHead(j) {
   219  				// Allow a popper to run.
   220  				runtime.Gosched()
   221  			}
   222  			if j%10 == 0 {
   223  				val, ok := d.PopHead()
   224  				if ok {
   225  					nPopHead++
   226  					record(val.(int))
   227  				}
   228  			}
   229  		}
   230  		wg.Done()
   231  	}()
   232  	wg.Wait()
   233  
   234  	// Check results.
   235  	for i, count := range have {
   236  		if count != 1 {
   237  			t.Errorf("expected have[%d] = 1, got %d", i, count)
   238  		}
   239  	}
   240  	// Check that at least some PopHeads succeeded. We skip this
   241  	// check in short mode because it's common enough that the
   242  	// queue will stay nearly empty all the time and a PopTail
   243  	// will happen during the window between every PushHead and
   244  	// PopHead.
   245  	if !testing.Short() && nPopHead == 0 {
   246  		t.Errorf("popHead never succeeded")
   247  	}
   248  }
   249  
   250  func TestNilPool(t *testing.T) {
   251  	catch := func() {
   252  		if recover() == nil {
   253  			t.Error("expected panic")
   254  		}
   255  	}
   256  
   257  	var p *Pool
   258  	t.Run("Get", func(t *testing.T) {
   259  		defer catch()
   260  		if p.Get() != nil {
   261  			t.Error("expected empty")
   262  		}
   263  		t.Error("should have panicked already")
   264  	})
   265  	t.Run("Put", func(t *testing.T) {
   266  		defer catch()
   267  		p.Put("a")
   268  		t.Error("should have panicked already")
   269  	})
   270  }
   271  
   272  func BenchmarkPool(b *testing.B) {
   273  	var p Pool
   274  	b.RunParallel(func(pb *testing.PB) {
   275  		for pb.Next() {
   276  			p.Put(1)
   277  			p.Get()
   278  		}
   279  	})
   280  }
   281  
   282  func BenchmarkPoolOverflow(b *testing.B) {
   283  	var p Pool
   284  	b.RunParallel(func(pb *testing.PB) {
   285  		for pb.Next() {
   286  			for b := 0; b < 100; b++ {
   287  				p.Put(1)
   288  			}
   289  			for b := 0; b < 100; b++ {
   290  				p.Get()
   291  			}
   292  		}
   293  	})
   294  }
   295  
   296  // Simulate object starvation in order to force Ps to steal objects
   297  // from other Ps.
   298  func BenchmarkPoolStarvation(b *testing.B) {
   299  	var p Pool
   300  	count := 100
   301  	// Reduce number of putted objects by 33 %. It creates objects starvation
   302  	// that force P-local storage to steal objects from other Ps.
   303  	countStarved := count - int(float32(count)*0.33)
   304  	b.RunParallel(func(pb *testing.PB) {
   305  		for pb.Next() {
   306  			for b := 0; b < countStarved; b++ {
   307  				p.Put(1)
   308  			}
   309  			for b := 0; b < count; b++ {
   310  				p.Get()
   311  			}
   312  		}
   313  	})
   314  }
   315  
   316  var globalSink any
   317  
   318  func BenchmarkPoolSTW(b *testing.B) {
   319  	// Take control of GC.
   320  	defer debug.SetGCPercent(debug.SetGCPercent(-1))
   321  
   322  	var mstats runtime.MemStats
   323  	var pauses []uint64
   324  
   325  	var p Pool
   326  	for i := 0; i < b.N; i++ {
   327  		// Put a large number of items into a pool.
   328  		const N = 100000
   329  		var item any = 42
   330  		for i := 0; i < N; i++ {
   331  			p.Put(item)
   332  		}
   333  		// Do a GC.
   334  		runtime.GC()
   335  		// Record pause time.
   336  		runtime.ReadMemStats(&mstats)
   337  		pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256])
   338  	}
   339  
   340  	// Get pause time stats.
   341  	sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] })
   342  	var total uint64
   343  	for _, ns := range pauses {
   344  		total += ns
   345  	}
   346  	// ns/op for this benchmark is average STW time.
   347  	b.ReportMetric(float64(total)/float64(b.N), "ns/op")
   348  	b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW")
   349  	b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW")
   350  }
   351  
   352  func BenchmarkPoolExpensiveNew(b *testing.B) {
   353  	// Populate a pool with items that are expensive to construct
   354  	// to stress pool cleanup and subsequent reconstruction.
   355  
   356  	// Create a ballast so the GC has a non-zero heap size and
   357  	// runs at reasonable times.
   358  	globalSink = make([]byte, 8<<20)
   359  	defer func() { globalSink = nil }()
   360  
   361  	// Create a pool that's "expensive" to fill.
   362  	var p Pool
   363  	var nNew uint64
   364  	p.New = func() any {
   365  		atomic.AddUint64(&nNew, 1)
   366  		time.Sleep(time.Millisecond)
   367  		return 42
   368  	}
   369  	var mstats1, mstats2 runtime.MemStats
   370  	runtime.ReadMemStats(&mstats1)
   371  	b.RunParallel(func(pb *testing.PB) {
   372  		// Simulate 100X the number of goroutines having items
   373  		// checked out from the Pool simultaneously.
   374  		items := make([]any, 100)
   375  		var sink []byte
   376  		for pb.Next() {
   377  			// Stress the pool.
   378  			for i := range items {
   379  				items[i] = p.Get()
   380  				// Simulate doing some work with this
   381  				// item checked out.
   382  				sink = make([]byte, 32<<10)
   383  			}
   384  			for i, v := range items {
   385  				p.Put(v)
   386  				items[i] = nil
   387  			}
   388  		}
   389  		_ = sink
   390  	})
   391  	runtime.ReadMemStats(&mstats2)
   392  
   393  	b.ReportMetric(float64(mstats2.NumGC-mstats1.NumGC)/float64(b.N), "GCs/op")
   394  	b.ReportMetric(float64(nNew)/float64(b.N), "New/op")
   395  }
   396  

View as plain text