...

Source file src/golang.org/x/net/internal/timeseries/timeseries.go

Documentation: golang.org/x/net/internal/timeseries

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Package timeseries implements a time series structure for stats collection.
     6  package timeseries // import "golang.org/x/net/internal/timeseries"
     7  
     8  import (
     9  	"fmt"
    10  	"log"
    11  	"time"
    12  )
    13  
    14  const (
    15  	timeSeriesNumBuckets       = 64
    16  	minuteHourSeriesNumBuckets = 60
    17  )
    18  
    19  var timeSeriesResolutions = []time.Duration{
    20  	1 * time.Second,
    21  	10 * time.Second,
    22  	1 * time.Minute,
    23  	10 * time.Minute,
    24  	1 * time.Hour,
    25  	6 * time.Hour,
    26  	24 * time.Hour,          // 1 day
    27  	7 * 24 * time.Hour,      // 1 week
    28  	4 * 7 * 24 * time.Hour,  // 4 weeks
    29  	16 * 7 * 24 * time.Hour, // 16 weeks
    30  }
    31  
    32  var minuteHourSeriesResolutions = []time.Duration{
    33  	1 * time.Second,
    34  	1 * time.Minute,
    35  }
    36  
// An Observable is a kind of data that can be aggregated in a time series.
// Float in this package is one implementation.
type Observable interface {
	Multiply(ratio float64)    // Multiplies the data in self by a given ratio.
	Add(other Observable)      // Adds the data from a different observation to self.
	Clear()                    // Clears the observation so it can be reused.
	CopyFrom(other Observable) // Copies the contents of a given observation to self.
}
    44  
    45  // Float attaches the methods of Observable to a float64.
    46  type Float float64
    47  
    48  // NewFloat returns a Float.
    49  func NewFloat() Observable {
    50  	f := Float(0)
    51  	return &f
    52  }
    53  
    54  // String returns the float as a string.
    55  func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
    56  
    57  // Value returns the float's value.
    58  func (f *Float) Value() float64 { return float64(*f) }
    59  
    60  func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
    61  
    62  func (f *Float) Add(other Observable) {
    63  	o := other.(*Float)
    64  	*f += *o
    65  }
    66  
    67  func (f *Float) Clear() { *f = 0 }
    68  
    69  func (f *Float) CopyFrom(other Observable) {
    70  	o := other.(*Float)
    71  	*f = *o
    72  }
    73  
// A Clock tells the current time. A time series reads its clock to timestamp
// observations recorded through Add and to bring its buckets up to date when
// it is queried.
type Clock interface {
	Time() time.Time
}
    78  
    79  type defaultClock int
    80  
    81  var defaultClockInstance defaultClock
    82  
    83  func (defaultClock) Time() time.Time { return time.Now() }
    84  
// tsLevel holds the information kept per level. Each level consists of a
// circular list of observations. The start of the level may be derived from
// end and len(buckets) * size.
type tsLevel struct {
	oldest   int               // index to oldest bucketed Observable
	newest   int               // index to newest bucketed Observable
	end      time.Time         // end timestamp for this level
	size     time.Duration     // duration of the bucketed Observable
	buckets  []Observable      // collections of observations
	provider func() Observable // used for creating new Observable
}
    96  
    97  func (l *tsLevel) Clear() {
    98  	l.oldest = 0
    99  	l.newest = len(l.buckets) - 1
   100  	l.end = time.Time{}
   101  	for i := range l.buckets {
   102  		if l.buckets[i] != nil {
   103  			l.buckets[i].Clear()
   104  			l.buckets[i] = nil
   105  		}
   106  	}
   107  }
   108  
   109  func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
   110  	l.size = size
   111  	l.provider = f
   112  	l.buckets = make([]Observable, numBuckets)
   113  }
   114  
// timeSeries keeps a sequence of levels. Each level is responsible for
// storing data at a given resolution. For example, the first level stores
// data at a one minute resolution while the second level stores data at a
// one hour resolution.
//
// Each level is represented by a sequence of buckets. Each bucket spans an
// interval equal to the resolution of the level. New observations are added
// to the last bucket.
type timeSeries struct {
	provider    func() Observable // make more Observable
	numBuckets  int               // number of buckets in each level
	levels      []*tsLevel        // levels of bucketed Observable
	lastAdd     time.Time         // time of last Observable tracked
	total       Observable        // convenient aggregation of all Observable
	clock       Clock             // Clock for getting current time
	pending     Observable        // observations not yet bucketed
	pendingTime time.Time         // what time are we keeping in pending
	dirty       bool              // if there are pending observations
}
   134  
   135  // init initializes a level according to the supplied criteria.
   136  func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
   137  	ts.provider = f
   138  	ts.numBuckets = numBuckets
   139  	ts.clock = clock
   140  	ts.levels = make([]*tsLevel, len(resolutions))
   141  
   142  	for i := range resolutions {
   143  		if i > 0 && resolutions[i-1] >= resolutions[i] {
   144  			log.Print("timeseries: resolutions must be monotonically increasing")
   145  			break
   146  		}
   147  		newLevel := new(tsLevel)
   148  		newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
   149  		ts.levels[i] = newLevel
   150  	}
   151  
   152  	ts.Clear()
   153  }
   154  
   155  // Clear removes all observations from the time series.
   156  func (ts *timeSeries) Clear() {
   157  	ts.lastAdd = time.Time{}
   158  	ts.total = ts.resetObservation(ts.total)
   159  	ts.pending = ts.resetObservation(ts.pending)
   160  	ts.pendingTime = time.Time{}
   161  	ts.dirty = false
   162  
   163  	for i := range ts.levels {
   164  		ts.levels[i].Clear()
   165  	}
   166  }
   167  
   168  // Add records an observation at the current time.
   169  func (ts *timeSeries) Add(observation Observable) {
   170  	ts.AddWithTime(observation, ts.clock.Time())
   171  }
   172  
   173  // AddWithTime records an observation at the specified time.
   174  func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
   175  
   176  	smallBucketDuration := ts.levels[0].size
   177  
   178  	if t.After(ts.lastAdd) {
   179  		ts.lastAdd = t
   180  	}
   181  
   182  	if t.After(ts.pendingTime) {
   183  		ts.advance(t)
   184  		ts.mergePendingUpdates()
   185  		ts.pendingTime = ts.levels[0].end
   186  		ts.pending.CopyFrom(observation)
   187  		ts.dirty = true
   188  	} else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
   189  		// The observation is close enough to go into the pending bucket.
   190  		// This compensates for clock skewing and small scheduling delays
   191  		// by letting the update stay in the fast path.
   192  		ts.pending.Add(observation)
   193  		ts.dirty = true
   194  	} else {
   195  		ts.mergeValue(observation, t)
   196  	}
   197  }
   198  
   199  // mergeValue inserts the observation at the specified time in the past into all levels.
   200  func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
   201  	for _, level := range ts.levels {
   202  		index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
   203  		if 0 <= index && index < ts.numBuckets {
   204  			bucketNumber := (level.oldest + index) % ts.numBuckets
   205  			if level.buckets[bucketNumber] == nil {
   206  				level.buckets[bucketNumber] = level.provider()
   207  			}
   208  			level.buckets[bucketNumber].Add(observation)
   209  		}
   210  	}
   211  	ts.total.Add(observation)
   212  }
   213  
   214  // mergePendingUpdates applies the pending updates into all levels.
   215  func (ts *timeSeries) mergePendingUpdates() {
   216  	if ts.dirty {
   217  		ts.mergeValue(ts.pending, ts.pendingTime)
   218  		ts.pending = ts.resetObservation(ts.pending)
   219  		ts.dirty = false
   220  	}
   221  }
   222  
   223  // advance cycles the buckets at each level until the latest bucket in
   224  // each level can hold the time specified.
   225  func (ts *timeSeries) advance(t time.Time) {
   226  	if !t.After(ts.levels[0].end) {
   227  		return
   228  	}
   229  	for i := 0; i < len(ts.levels); i++ {
   230  		level := ts.levels[i]
   231  		if !level.end.Before(t) {
   232  			break
   233  		}
   234  
   235  		// If the time is sufficiently far, just clear the level and advance
   236  		// directly.
   237  		if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
   238  			for _, b := range level.buckets {
   239  				ts.resetObservation(b)
   240  			}
   241  			level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
   242  		}
   243  
   244  		for t.After(level.end) {
   245  			level.end = level.end.Add(level.size)
   246  			level.newest = level.oldest
   247  			level.oldest = (level.oldest + 1) % ts.numBuckets
   248  			ts.resetObservation(level.buckets[level.newest])
   249  		}
   250  
   251  		t = level.end
   252  	}
   253  }
   254  
   255  // Latest returns the sum of the num latest buckets from the level.
   256  func (ts *timeSeries) Latest(level, num int) Observable {
   257  	now := ts.clock.Time()
   258  	if ts.levels[0].end.Before(now) {
   259  		ts.advance(now)
   260  	}
   261  
   262  	ts.mergePendingUpdates()
   263  
   264  	result := ts.provider()
   265  	l := ts.levels[level]
   266  	index := l.newest
   267  
   268  	for i := 0; i < num; i++ {
   269  		if l.buckets[index] != nil {
   270  			result.Add(l.buckets[index])
   271  		}
   272  		if index == 0 {
   273  			index = ts.numBuckets
   274  		}
   275  		index--
   276  	}
   277  
   278  	return result
   279  }
   280  
   281  // LatestBuckets returns a copy of the num latest buckets from level.
   282  func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
   283  	if level < 0 || level > len(ts.levels) {
   284  		log.Print("timeseries: bad level argument: ", level)
   285  		return nil
   286  	}
   287  	if num < 0 || num >= ts.numBuckets {
   288  		log.Print("timeseries: bad num argument: ", num)
   289  		return nil
   290  	}
   291  
   292  	results := make([]Observable, num)
   293  	now := ts.clock.Time()
   294  	if ts.levels[0].end.Before(now) {
   295  		ts.advance(now)
   296  	}
   297  
   298  	ts.mergePendingUpdates()
   299  
   300  	l := ts.levels[level]
   301  	index := l.newest
   302  
   303  	for i := 0; i < num; i++ {
   304  		result := ts.provider()
   305  		results[i] = result
   306  		if l.buckets[index] != nil {
   307  			result.CopyFrom(l.buckets[index])
   308  		}
   309  
   310  		if index == 0 {
   311  			index = ts.numBuckets
   312  		}
   313  		index -= 1
   314  	}
   315  	return results
   316  }
   317  
   318  // ScaleBy updates observations by scaling by factor.
   319  func (ts *timeSeries) ScaleBy(factor float64) {
   320  	for _, l := range ts.levels {
   321  		for i := 0; i < ts.numBuckets; i++ {
   322  			l.buckets[i].Multiply(factor)
   323  		}
   324  	}
   325  
   326  	ts.total.Multiply(factor)
   327  	ts.pending.Multiply(factor)
   328  }
   329  
   330  // Range returns the sum of observations added over the specified time range.
   331  // If start or finish times don't fall on bucket boundaries of the same
   332  // level, then return values are approximate answers.
   333  func (ts *timeSeries) Range(start, finish time.Time) Observable {
   334  	return ts.ComputeRange(start, finish, 1)[0]
   335  }
   336  
   337  // Recent returns the sum of observations from the last delta.
   338  func (ts *timeSeries) Recent(delta time.Duration) Observable {
   339  	now := ts.clock.Time()
   340  	return ts.Range(now.Add(-delta), now)
   341  }
   342  
   343  // Total returns the total of all observations.
   344  func (ts *timeSeries) Total() Observable {
   345  	ts.mergePendingUpdates()
   346  	return ts.total
   347  }
   348  
   349  // ComputeRange computes a specified number of values into a slice using
   350  // the observations recorded over the specified time period. The return
   351  // values are approximate if the start or finish times don't fall on the
   352  // bucket boundaries at the same level or if the number of buckets spanning
   353  // the range is not an integral multiple of num.
   354  func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
   355  	if start.After(finish) {
   356  		log.Printf("timeseries: start > finish, %v>%v", start, finish)
   357  		return nil
   358  	}
   359  
   360  	if num < 0 {
   361  		log.Printf("timeseries: num < 0, %v", num)
   362  		return nil
   363  	}
   364  
   365  	results := make([]Observable, num)
   366  
   367  	for _, l := range ts.levels {
   368  		if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
   369  			ts.extract(l, start, finish, num, results)
   370  			return results
   371  		}
   372  	}
   373  
   374  	// Failed to find a level that covers the desired range. So just
   375  	// extract from the last level, even if it doesn't cover the entire
   376  	// desired range.
   377  	ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
   378  
   379  	return results
   380  }
   381  
   382  // RecentList returns the specified number of values in slice over the most
   383  // recent time period of the specified range.
   384  func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
   385  	if delta < 0 {
   386  		return nil
   387  	}
   388  	now := ts.clock.Time()
   389  	return ts.ComputeRange(now.Add(-delta), now, num)
   390  }
   391  
   392  // extract returns a slice of specified number of observations from a given
   393  // level over a given range.
   394  func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
   395  	ts.mergePendingUpdates()
   396  
   397  	srcInterval := l.size
   398  	dstInterval := finish.Sub(start) / time.Duration(num)
   399  	dstStart := start
   400  	srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
   401  
   402  	srcIndex := 0
   403  
   404  	// Where should scanning start?
   405  	if dstStart.After(srcStart) {
   406  		advance := int(dstStart.Sub(srcStart) / srcInterval)
   407  		srcIndex += advance
   408  		srcStart = srcStart.Add(time.Duration(advance) * srcInterval)
   409  	}
   410  
   411  	// The i'th value is computed as show below.
   412  	// interval = (finish/start)/num
   413  	// i'th value = sum of observation in range
   414  	//   [ start + i       * interval,
   415  	//     start + (i + 1) * interval )
   416  	for i := 0; i < num; i++ {
   417  		results[i] = ts.resetObservation(results[i])
   418  		dstEnd := dstStart.Add(dstInterval)
   419  		for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
   420  			srcEnd := srcStart.Add(srcInterval)
   421  			if srcEnd.After(ts.lastAdd) {
   422  				srcEnd = ts.lastAdd
   423  			}
   424  
   425  			if !srcEnd.Before(dstStart) {
   426  				srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
   427  				if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
   428  					// dst completely contains src.
   429  					if srcValue != nil {
   430  						results[i].Add(srcValue)
   431  					}
   432  				} else {
   433  					// dst partially overlaps src.
   434  					overlapStart := maxTime(srcStart, dstStart)
   435  					overlapEnd := minTime(srcEnd, dstEnd)
   436  					base := srcEnd.Sub(srcStart)
   437  					fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
   438  
   439  					used := ts.provider()
   440  					if srcValue != nil {
   441  						used.CopyFrom(srcValue)
   442  					}
   443  					used.Multiply(fraction)
   444  					results[i].Add(used)
   445  				}
   446  
   447  				if srcEnd.After(dstEnd) {
   448  					break
   449  				}
   450  			}
   451  			srcIndex++
   452  			srcStart = srcStart.Add(srcInterval)
   453  		}
   454  		dstStart = dstStart.Add(dstInterval)
   455  	}
   456  }
   457  
   458  // resetObservation clears the content so the struct may be reused.
   459  func (ts *timeSeries) resetObservation(observation Observable) Observable {
   460  	if observation == nil {
   461  		observation = ts.provider()
   462  	} else {
   463  		observation.Clear()
   464  	}
   465  	return observation
   466  }
   467  
// TimeSeries tracks data at granularities from 1 second to 16 weeks.
// It embeds timeSeries, whose methods provide recording and querying.
type TimeSeries struct {
	timeSeries
}
   472  
   473  // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
   474  func NewTimeSeries(f func() Observable) *TimeSeries {
   475  	return NewTimeSeriesWithClock(f, defaultClockInstance)
   476  }
   477  
   478  // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
   479  // assigning timestamps.
   480  func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
   481  	ts := new(TimeSeries)
   482  	ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
   483  	return ts
   484  }
   485  
// MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
// It keeps one level of one-second buckets and one level of one-minute
// buckets; see Minute and Hour.
type MinuteHourSeries struct {
	timeSeries
}
   490  
   491  // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
   492  func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
   493  	return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
   494  }
   495  
   496  // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
   497  // assigning timestamps.
   498  func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
   499  	ts := new(MinuteHourSeries)
   500  	ts.timeSeries.init(minuteHourSeriesResolutions, f,
   501  		minuteHourSeriesNumBuckets, clock)
   502  	return ts
   503  }
   504  
   505  func (ts *MinuteHourSeries) Minute() Observable {
   506  	return ts.timeSeries.Latest(0, 60)
   507  }
   508  
   509  func (ts *MinuteHourSeries) Hour() Observable {
   510  	return ts.timeSeries.Latest(1, 60)
   511  }
   512  
   513  func minTime(a, b time.Time) time.Time {
   514  	if a.Before(b) {
   515  		return a
   516  	}
   517  	return b
   518  }
   519  
   520  func maxTime(a, b time.Time) time.Time {
   521  	if a.After(b) {
   522  		return a
   523  	}
   524  	return b
   525  }
   526  

View as plain text