...
1 package concurrent
2
3 import (
4 "context"
5 "fmt"
6 "runtime"
7 "runtime/debug"
8 "sync"
9 "time"
10 "reflect"
11 )
12
13
14 var HandlePanic = func(recovered interface{}, funcName string) {
15 ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
16 ErrorLogger.Println(string(debug.Stack()))
17 }
18
19
20
21 type UnboundedExecutor struct {
22 ctx context.Context
23 cancel context.CancelFunc
24 activeGoroutinesMutex *sync.Mutex
25 activeGoroutines map[string]int
26 HandlePanic func(recovered interface{}, funcName string)
27 }
28
29
30
31
32
33 var GlobalUnboundedExecutor = NewUnboundedExecutor()
34
35
36
37
38 func NewUnboundedExecutor() *UnboundedExecutor {
39 ctx, cancel := context.WithCancel(context.TODO())
40 return &UnboundedExecutor{
41 ctx: ctx,
42 cancel: cancel,
43 activeGoroutinesMutex: &sync.Mutex{},
44 activeGoroutines: map[string]int{},
45 }
46 }
47
48
49
50 func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
51 pc := reflect.ValueOf(handler).Pointer()
52 f := runtime.FuncForPC(pc)
53 funcName := f.Name()
54 file, line := f.FileLine(pc)
55 executor.activeGoroutinesMutex.Lock()
56 defer executor.activeGoroutinesMutex.Unlock()
57 startFrom := fmt.Sprintf("%s:%d", file, line)
58 executor.activeGoroutines[startFrom] += 1
59 go func() {
60 defer func() {
61 recovered := recover()
62
63
64 if recovered != nil {
65 if executor.HandlePanic == nil {
66 HandlePanic(recovered, funcName)
67 } else {
68 executor.HandlePanic(recovered, funcName)
69 }
70 }
71 executor.activeGoroutinesMutex.Lock()
72 executor.activeGoroutines[startFrom] -= 1
73 executor.activeGoroutinesMutex.Unlock()
74 }()
75 handler(executor.ctx)
76 }()
77 }
78
79
80 func (executor *UnboundedExecutor) Stop() {
81 executor.cancel()
82 }
83
84
85
86 func (executor *UnboundedExecutor) StopAndWaitForever() {
87 executor.StopAndWait(context.Background())
88 }
89
90
91
92 func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
93 executor.cancel()
94 for {
95 oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
96 select {
97 case <-oneHundredMilliseconds.C:
98 if executor.checkNoActiveGoroutines() {
99 return
100 }
101 case <-ctx.Done():
102 return
103 }
104 }
105 }
106
107 func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
108 executor.activeGoroutinesMutex.Lock()
109 defer executor.activeGoroutinesMutex.Unlock()
110 for startFrom, count := range executor.activeGoroutines {
111 if count > 0 {
112 InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
113 "startFrom", startFrom,
114 "count", count)
115 return false
116 }
117 }
118 return true
119 }
120
View as plain text