...

Source file src/github.com/modern-go/concurrent/unbounded_executor.go

Documentation: github.com/modern-go/concurrent

     1  package concurrent
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"runtime"
     7  	"runtime/debug"
     8  	"sync"
     9  	"time"
    10  	"reflect"
    11  )
    12  
    13  // HandlePanic logs goroutine panic by default
    14  var HandlePanic = func(recovered interface{}, funcName string) {
    15  	ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
    16  	ErrorLogger.Println(string(debug.Stack()))
    17  }
    18  
    19  // UnboundedExecutor is a executor without limits on counts of alive goroutines
    20  // it tracks the goroutine started by it, and can cancel them when shutdown
    21  type UnboundedExecutor struct {
    22  	ctx                   context.Context
    23  	cancel                context.CancelFunc
    24  	activeGoroutinesMutex *sync.Mutex
    25  	activeGoroutines      map[string]int
    26  	HandlePanic           func(recovered interface{}, funcName string)
    27  }
    28  
    29  // GlobalUnboundedExecutor has the life cycle of the program itself
    30  // any goroutine want to be shutdown before main exit can be started from this executor
    31  // GlobalUnboundedExecutor expects the main function to call stop
    32  // it does not magically knows the main function exits
    33  var GlobalUnboundedExecutor = NewUnboundedExecutor()
    34  
    35  // NewUnboundedExecutor creates a new UnboundedExecutor,
    36  // UnboundedExecutor can not be created by &UnboundedExecutor{}
    37  // HandlePanic can be set with a callback to override global HandlePanic
    38  func NewUnboundedExecutor() *UnboundedExecutor {
    39  	ctx, cancel := context.WithCancel(context.TODO())
    40  	return &UnboundedExecutor{
    41  		ctx:                   ctx,
    42  		cancel:                cancel,
    43  		activeGoroutinesMutex: &sync.Mutex{},
    44  		activeGoroutines:      map[string]int{},
    45  	}
    46  }
    47  
    48  // Go starts a new goroutine and tracks its lifecycle.
    49  // Panic will be recovered and logged automatically, except for StopSignal
    50  func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
    51  	pc := reflect.ValueOf(handler).Pointer()
    52  	f := runtime.FuncForPC(pc)
    53  	funcName := f.Name()
    54  	file, line := f.FileLine(pc)
    55  	executor.activeGoroutinesMutex.Lock()
    56  	defer executor.activeGoroutinesMutex.Unlock()
    57  	startFrom := fmt.Sprintf("%s:%d", file, line)
    58  	executor.activeGoroutines[startFrom] += 1
    59  	go func() {
    60  		defer func() {
    61  			recovered := recover()
    62  			// if you want to quit a goroutine without trigger HandlePanic
    63  			// use runtime.Goexit() to quit
    64  			if recovered != nil {
    65  				if executor.HandlePanic == nil {
    66  					HandlePanic(recovered, funcName)
    67  				} else {
    68  					executor.HandlePanic(recovered, funcName)
    69  				}
    70  			}
    71  			executor.activeGoroutinesMutex.Lock()
    72  			executor.activeGoroutines[startFrom] -= 1
    73  			executor.activeGoroutinesMutex.Unlock()
    74  		}()
    75  		handler(executor.ctx)
    76  	}()
    77  }
    78  
    79  // Stop cancel all goroutines started by this executor without wait
    80  func (executor *UnboundedExecutor) Stop() {
    81  	executor.cancel()
    82  }
    83  
    84  // StopAndWaitForever cancel all goroutines started by this executor and
    85  // wait until all goroutines exited
    86  func (executor *UnboundedExecutor) StopAndWaitForever() {
    87  	executor.StopAndWait(context.Background())
    88  }
    89  
    90  // StopAndWait cancel all goroutines started by this executor and wait.
    91  // Wait can be cancelled by the context passed in.
    92  func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
    93  	executor.cancel()
    94  	for {
    95  		oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
    96  		select {
    97  		case <-oneHundredMilliseconds.C:
    98  			if executor.checkNoActiveGoroutines() {
    99  				return
   100  			}
   101  		case <-ctx.Done():
   102  			return
   103  		}
   104  	}
   105  }
   106  
   107  func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
   108  	executor.activeGoroutinesMutex.Lock()
   109  	defer executor.activeGoroutinesMutex.Unlock()
   110  	for startFrom, count := range executor.activeGoroutines {
   111  		if count > 0 {
   112  			InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
   113  				"startFrom", startFrom,
   114  				"count", count)
   115  			return false
   116  		}
   117  	}
   118  	return true
   119  }
   120  

View as plain text