...
1
2
3
4
5
6 package par
7
8 import (
9 "errors"
10 "math/rand"
11 "sync"
12 "sync/atomic"
13 )
14
15
16
// Work manages a set of items of type T to be processed in parallel,
// with each distinct item handled at most once.
// The zero value is an empty work set that is ready for Add.
// A Work must not be copied after first use because it contains a
// sync.Mutex and a sync.Cond.
type Work[T comparable] struct {
	// Set once by Do before any runner starts.
	f       func(T) // callback invoked once for each item
	running int     // number of runners started by Do; nonzero once Do has been called

	// mu guards every field below it.
	mu      sync.Mutex
	added   map[T]bool // every item ever passed to Add (lazily allocated by init)
	todo    []T        // items added but not yet handed to a runner
	wait    sync.Cond  // runners block here while todo is empty; its L is set to &mu by Do
	waiting int        // number of runners currently idle (or finished) because todo was empty
}
27
28 func (w *Work[T]) init() {
29 if w.added == nil {
30 w.added = make(map[T]bool)
31 }
32 }
33
34
35 func (w *Work[T]) Add(item T) {
36 w.mu.Lock()
37 w.init()
38 if !w.added[item] {
39 w.added[item] = true
40 w.todo = append(w.todo, item)
41 if w.waiting > 0 {
42 w.wait.Signal()
43 }
44 }
45 w.mu.Unlock()
46 }
47
48
49
50
51
52
53
54
55 func (w *Work[T]) Do(n int, f func(item T)) {
56 if n < 1 {
57 panic("par.Work.Do: n < 1")
58 }
59 if w.running >= 1 {
60 panic("par.Work.Do: already called Do")
61 }
62
63 w.running = n
64 w.f = f
65 w.wait.L = &w.mu
66
67 for i := 0; i < n-1; i++ {
68 go w.runner()
69 }
70 w.runner()
71 }
72
73
74
75
76 func (w *Work[T]) runner() {
77 for {
78
79 w.mu.Lock()
80 for len(w.todo) == 0 {
81 w.waiting++
82 if w.waiting == w.running {
83
84 w.wait.Broadcast()
85 w.mu.Unlock()
86 return
87 }
88 w.wait.Wait()
89 w.waiting--
90 }
91
92
93
94
95
96 i := rand.Intn(len(w.todo))
97 item := w.todo[i]
98 w.todo[i] = w.todo[len(w.todo)-1]
99 w.todo = w.todo[:len(w.todo)-1]
100 w.mu.Unlock()
101
102 w.f(item)
103 }
104 }
105
106
107
108 type ErrCache[K comparable, V any] struct {
109 Cache[K, errValue[V]]
110 }
111
// errValue pairs a computed value with the error returned alongside it.
// It is the value type stored by ErrCache.
// The field order (v, then err) must not change: positional composite
// literals of this type rely on it.
type errValue[V any] struct {
	v   V     // value returned by the computation
	err error // error returned by the computation, nil on success
}
116
117 func (c *ErrCache[K, V]) Do(key K, f func() (V, error)) (V, error) {
118 v := c.Cache.Do(key, func() errValue[V] {
119 v, err := f()
120 return errValue[V]{v, err}
121 })
122 return v.v, v.err
123 }
124
// ErrCacheEntryNotFound is the error returned by ErrCache.Get when no
// completed result is stored for the requested key.
var ErrCacheEntryNotFound = errors.New("cache entry not found")
126
127
128
129 func (c *ErrCache[K, V]) Get(key K) (V, error) {
130 v, ok := c.Cache.Get(key)
131 if !ok {
132 v.err = ErrCacheEntryNotFound
133 }
134 return v.v, v.err
135 }
136
137
// Cache runs a computation at most once per key and remembers the result.
// The zero value is an empty cache ready for use.
type Cache[K comparable, V any] struct {
	m sync.Map // maps K to *cacheEntry[V]
}

// cacheEntry holds the (possibly not yet computed) result for one key.
type cacheEntry[V any] struct {
	done   atomic.Bool // set to true once result has been stored
	mu     sync.Mutex  // serializes the computation of result
	result V           // valid only after done is true
}

// Do returns the result cached for key, calling f to compute it if no
// result has been stored yet. Concurrent calls for the same key block
// until the first one finishes, and f is run to completion at most once
// per entry.
//
// If f panics, the panic propagates to the caller, no result is recorded,
// and the entry's lock is released, so a later call to Do for the same
// key runs its own f instead of blocking forever.
func (c *Cache[K, V]) Do(key K, f func() V) V {
	entryIface, ok := c.m.Load(key)
	if !ok {
		// Load first so the common hit path does not allocate an entry.
		entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry[V]))
	}
	e := entryIface.(*cacheEntry[V])

	// Fast path: the result is already published.
	if e.done.Load() {
		return e.result
	}

	e.mu.Lock()
	// Deferred so the entry is not left permanently locked if f panics.
	defer e.mu.Unlock()
	// Re-check under the lock: another caller may have finished meanwhile.
	if !e.done.Load() {
		e.result = f()
		e.done.Store(true)
	}
	return e.result
}
167
168
169
170
171
172 func (c *Cache[K, V]) Get(key K) (V, bool) {
173 entryIface, ok := c.m.Load(key)
174 if !ok {
175 return *new(V), false
176 }
177 e := entryIface.(*cacheEntry[V])
178 if !e.done.Load() {
179 return *new(V), false
180 }
181 return e.result, true
182 }
183
184
185
186
187
188
189
190
191 func (c *Cache[K, V]) Clear() {
192 c.m.Range(func(key, value any) bool {
193 c.m.Delete(key)
194 return true
195 })
196 }
197
198
199
200
201
202
203
204
205 func (c *Cache[K, V]) Delete(key K) {
206 c.m.Delete(key)
207 }
208
209
210
211
212
213
214
215
216 func (c *Cache[K, V]) DeleteIf(pred func(key K) bool) {
217 c.m.Range(func(key, _ any) bool {
218 if key := key.(K); pred(key) {
219 c.Delete(key)
220 }
221 return true
222 })
223 }
224
View as plain text