1
2
3
4
5
6 package timeseries
7
8 import (
9 "fmt"
10 "log"
11 "time"
12 )
13
14 const (
15 timeSeriesNumBuckets = 64
16 minuteHourSeriesNumBuckets = 60
17 )
18
19 var timeSeriesResolutions = []time.Duration{
20 1 * time.Second,
21 10 * time.Second,
22 1 * time.Minute,
23 10 * time.Minute,
24 1 * time.Hour,
25 6 * time.Hour,
26 24 * time.Hour,
27 7 * 24 * time.Hour,
28 4 * 7 * 24 * time.Hour,
29 16 * 7 * 24 * time.Hour,
30 }
31
32 var minuteHourSeriesResolutions = []time.Duration{
33 1 * time.Second,
34 1 * time.Minute,
35 }
36
37
38 type Observable interface {
39 Multiply(ratio float64)
40 Add(other Observable)
41 Clear()
42 CopyFrom(other Observable)
43 }
44
45
46 type Float float64
47
48
49 func NewFloat() Observable {
50 f := Float(0)
51 return &f
52 }
53
54
55 func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
56
57
58 func (f *Float) Value() float64 { return float64(*f) }
59
60 func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
61
62 func (f *Float) Add(other Observable) {
63 o := other.(*Float)
64 *f += *o
65 }
66
67 func (f *Float) Clear() { *f = 0 }
68
69 func (f *Float) CopyFrom(other Observable) {
70 o := other.(*Float)
71 *f = *o
72 }
73
74
75 type Clock interface {
76 Time() time.Time
77 }
78
79 type defaultClock int
80
81 var defaultClockInstance defaultClock
82
83 func (defaultClock) Time() time.Time { return time.Now() }
84
85
86
87
88 type tsLevel struct {
89 oldest int
90 newest int
91 end time.Time
92 size time.Duration
93 buckets []Observable
94 provider func() Observable
95 }
96
97 func (l *tsLevel) Clear() {
98 l.oldest = 0
99 l.newest = len(l.buckets) - 1
100 l.end = time.Time{}
101 for i := range l.buckets {
102 if l.buckets[i] != nil {
103 l.buckets[i].Clear()
104 l.buckets[i] = nil
105 }
106 }
107 }
108
109 func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
110 l.size = size
111 l.provider = f
112 l.buckets = make([]Observable, numBuckets)
113 }
114
115
116
117
118
119
120
121
122
123 type timeSeries struct {
124 provider func() Observable
125 numBuckets int
126 levels []*tsLevel
127 lastAdd time.Time
128 total Observable
129 clock Clock
130 pending Observable
131 pendingTime time.Time
132 dirty bool
133 }
134
135
136 func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
137 ts.provider = f
138 ts.numBuckets = numBuckets
139 ts.clock = clock
140 ts.levels = make([]*tsLevel, len(resolutions))
141
142 for i := range resolutions {
143 if i > 0 && resolutions[i-1] >= resolutions[i] {
144 log.Print("timeseries: resolutions must be monotonically increasing")
145 break
146 }
147 newLevel := new(tsLevel)
148 newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
149 ts.levels[i] = newLevel
150 }
151
152 ts.Clear()
153 }
154
155
156 func (ts *timeSeries) Clear() {
157 ts.lastAdd = time.Time{}
158 ts.total = ts.resetObservation(ts.total)
159 ts.pending = ts.resetObservation(ts.pending)
160 ts.pendingTime = time.Time{}
161 ts.dirty = false
162
163 for i := range ts.levels {
164 ts.levels[i].Clear()
165 }
166 }
167
168
169 func (ts *timeSeries) Add(observation Observable) {
170 ts.AddWithTime(observation, ts.clock.Time())
171 }
172
173
174 func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
175
176 smallBucketDuration := ts.levels[0].size
177
178 if t.After(ts.lastAdd) {
179 ts.lastAdd = t
180 }
181
182 if t.After(ts.pendingTime) {
183 ts.advance(t)
184 ts.mergePendingUpdates()
185 ts.pendingTime = ts.levels[0].end
186 ts.pending.CopyFrom(observation)
187 ts.dirty = true
188 } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
189
190
191
192 ts.pending.Add(observation)
193 ts.dirty = true
194 } else {
195 ts.mergeValue(observation, t)
196 }
197 }
198
199
200 func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
201 for _, level := range ts.levels {
202 index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
203 if 0 <= index && index < ts.numBuckets {
204 bucketNumber := (level.oldest + index) % ts.numBuckets
205 if level.buckets[bucketNumber] == nil {
206 level.buckets[bucketNumber] = level.provider()
207 }
208 level.buckets[bucketNumber].Add(observation)
209 }
210 }
211 ts.total.Add(observation)
212 }
213
214
215 func (ts *timeSeries) mergePendingUpdates() {
216 if ts.dirty {
217 ts.mergeValue(ts.pending, ts.pendingTime)
218 ts.pending = ts.resetObservation(ts.pending)
219 ts.dirty = false
220 }
221 }
222
223
224
225 func (ts *timeSeries) advance(t time.Time) {
226 if !t.After(ts.levels[0].end) {
227 return
228 }
229 for i := 0; i < len(ts.levels); i++ {
230 level := ts.levels[i]
231 if !level.end.Before(t) {
232 break
233 }
234
235
236
237 if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
238 for _, b := range level.buckets {
239 ts.resetObservation(b)
240 }
241 level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
242 }
243
244 for t.After(level.end) {
245 level.end = level.end.Add(level.size)
246 level.newest = level.oldest
247 level.oldest = (level.oldest + 1) % ts.numBuckets
248 ts.resetObservation(level.buckets[level.newest])
249 }
250
251 t = level.end
252 }
253 }
254
255
256 func (ts *timeSeries) Latest(level, num int) Observable {
257 now := ts.clock.Time()
258 if ts.levels[0].end.Before(now) {
259 ts.advance(now)
260 }
261
262 ts.mergePendingUpdates()
263
264 result := ts.provider()
265 l := ts.levels[level]
266 index := l.newest
267
268 for i := 0; i < num; i++ {
269 if l.buckets[index] != nil {
270 result.Add(l.buckets[index])
271 }
272 if index == 0 {
273 index = ts.numBuckets
274 }
275 index--
276 }
277
278 return result
279 }
280
281
282 func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
283 if level < 0 || level > len(ts.levels) {
284 log.Print("timeseries: bad level argument: ", level)
285 return nil
286 }
287 if num < 0 || num >= ts.numBuckets {
288 log.Print("timeseries: bad num argument: ", num)
289 return nil
290 }
291
292 results := make([]Observable, num)
293 now := ts.clock.Time()
294 if ts.levels[0].end.Before(now) {
295 ts.advance(now)
296 }
297
298 ts.mergePendingUpdates()
299
300 l := ts.levels[level]
301 index := l.newest
302
303 for i := 0; i < num; i++ {
304 result := ts.provider()
305 results[i] = result
306 if l.buckets[index] != nil {
307 result.CopyFrom(l.buckets[index])
308 }
309
310 if index == 0 {
311 index = ts.numBuckets
312 }
313 index -= 1
314 }
315 return results
316 }
317
318
319 func (ts *timeSeries) ScaleBy(factor float64) {
320 for _, l := range ts.levels {
321 for i := 0; i < ts.numBuckets; i++ {
322 l.buckets[i].Multiply(factor)
323 }
324 }
325
326 ts.total.Multiply(factor)
327 ts.pending.Multiply(factor)
328 }
329
330
331
332
333 func (ts *timeSeries) Range(start, finish time.Time) Observable {
334 return ts.ComputeRange(start, finish, 1)[0]
335 }
336
337
338 func (ts *timeSeries) Recent(delta time.Duration) Observable {
339 now := ts.clock.Time()
340 return ts.Range(now.Add(-delta), now)
341 }
342
343
344 func (ts *timeSeries) Total() Observable {
345 ts.mergePendingUpdates()
346 return ts.total
347 }
348
349
350
351
352
353
354 func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
355 if start.After(finish) {
356 log.Printf("timeseries: start > finish, %v>%v", start, finish)
357 return nil
358 }
359
360 if num < 0 {
361 log.Printf("timeseries: num < 0, %v", num)
362 return nil
363 }
364
365 results := make([]Observable, num)
366
367 for _, l := range ts.levels {
368 if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
369 ts.extract(l, start, finish, num, results)
370 return results
371 }
372 }
373
374
375
376
377 ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
378
379 return results
380 }
381
382
383
384 func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
385 if delta < 0 {
386 return nil
387 }
388 now := ts.clock.Time()
389 return ts.ComputeRange(now.Add(-delta), now, num)
390 }
391
392
393
394 func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
395 ts.mergePendingUpdates()
396
397 srcInterval := l.size
398 dstInterval := finish.Sub(start) / time.Duration(num)
399 dstStart := start
400 srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
401
402 srcIndex := 0
403
404
405 if dstStart.After(srcStart) {
406 advance := int(dstStart.Sub(srcStart) / srcInterval)
407 srcIndex += advance
408 srcStart = srcStart.Add(time.Duration(advance) * srcInterval)
409 }
410
411
412
413
414
415
416 for i := 0; i < num; i++ {
417 results[i] = ts.resetObservation(results[i])
418 dstEnd := dstStart.Add(dstInterval)
419 for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
420 srcEnd := srcStart.Add(srcInterval)
421 if srcEnd.After(ts.lastAdd) {
422 srcEnd = ts.lastAdd
423 }
424
425 if !srcEnd.Before(dstStart) {
426 srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
427 if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
428
429 if srcValue != nil {
430 results[i].Add(srcValue)
431 }
432 } else {
433
434 overlapStart := maxTime(srcStart, dstStart)
435 overlapEnd := minTime(srcEnd, dstEnd)
436 base := srcEnd.Sub(srcStart)
437 fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
438
439 used := ts.provider()
440 if srcValue != nil {
441 used.CopyFrom(srcValue)
442 }
443 used.Multiply(fraction)
444 results[i].Add(used)
445 }
446
447 if srcEnd.After(dstEnd) {
448 break
449 }
450 }
451 srcIndex++
452 srcStart = srcStart.Add(srcInterval)
453 }
454 dstStart = dstStart.Add(dstInterval)
455 }
456 }
457
458
459 func (ts *timeSeries) resetObservation(observation Observable) Observable {
460 if observation == nil {
461 observation = ts.provider()
462 } else {
463 observation.Clear()
464 }
465 return observation
466 }
467
468
469 type TimeSeries struct {
470 timeSeries
471 }
472
473
474 func NewTimeSeries(f func() Observable) *TimeSeries {
475 return NewTimeSeriesWithClock(f, defaultClockInstance)
476 }
477
478
479
480 func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
481 ts := new(TimeSeries)
482 ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
483 return ts
484 }
485
486
487 type MinuteHourSeries struct {
488 timeSeries
489 }
490
491
492 func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
493 return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
494 }
495
496
497
498 func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
499 ts := new(MinuteHourSeries)
500 ts.timeSeries.init(minuteHourSeriesResolutions, f,
501 minuteHourSeriesNumBuckets, clock)
502 return ts
503 }
504
505 func (ts *MinuteHourSeries) Minute() Observable {
506 return ts.timeSeries.Latest(0, 60)
507 }
508
509 func (ts *MinuteHourSeries) Hour() Observable {
510 return ts.timeSeries.Latest(1, 60)
511 }
512
513 func minTime(a, b time.Time) time.Time {
514 if a.Before(b) {
515 return a
516 }
517 return b
518 }
519
520 func maxTime(a, b time.Time) time.Time {
521 if a.After(b) {
522 return a
523 }
524 return b
525 }
526
View as plain text