1
2
3
4
5 package trace
6
7 import (
8 "cmp"
9 "encoding/binary"
10 "fmt"
11
12 "internal/trace/v2/event"
13 "internal/trace/v2/event/go122"
14 )
15
16 type batchCursor struct {
17 m ThreadID
18 lastTs Time
19 idx int
20 dataOff int
21 ev baseEvent
22 }
23
24 func (b *batchCursor) nextEvent(batches []batch, freq frequency) (ok bool, err error) {
25
26
27 for b.idx < len(batches) && len(batches[b.idx].data) == b.dataOff {
28 b.idx++
29 b.dataOff = 0
30 b.lastTs = 0
31 }
32
33 if b.idx == len(batches) {
34 return false, nil
35 }
36
37 if b.lastTs == 0 {
38 b.lastTs = freq.mul(batches[b.idx].time)
39 }
40
41 n, tsdiff, err := readTimedBaseEvent(batches[b.idx].data[b.dataOff:], &b.ev)
42 if err != nil {
43 return false, err
44 }
45
46 b.ev.time = freq.mul(tsdiff) + b.lastTs
47
48
49 b.lastTs = b.ev.time
50
51
52 b.dataOff += n
53 return true, nil
54 }
55
56 func (b *batchCursor) compare(a *batchCursor) int {
57 return cmp.Compare(b.ev.time, a.ev.time)
58 }
59
60
61
62
63
64
65
66
67 func readTimedBaseEvent(b []byte, e *baseEvent) (int, timestamp, error) {
68
69 typ := event.Type(b[0])
70 specs := go122.Specs()
71 if int(typ) >= len(specs) {
72 return 0, 0, fmt.Errorf("found invalid event type: %v", typ)
73 }
74 e.typ = typ
75
76
77 spec := &specs[typ]
78 if len(spec.Args) == 0 || !spec.IsTimedEvent {
79 return 0, 0, fmt.Errorf("found event without a timestamp: type=%v", typ)
80 }
81 n := 1
82
83
84 ts, nb := binary.Uvarint(b[n:])
85 if nb <= 0 {
86 return 0, 0, fmt.Errorf("found invalid uvarint for timestamp")
87 }
88 n += nb
89
90
91 for i := 0; i < len(spec.Args)-1; i++ {
92 arg, nb := binary.Uvarint(b[n:])
93 if nb <= 0 {
94 return 0, 0, fmt.Errorf("found invalid uvarint")
95 }
96 e.args[i] = arg
97 n += nb
98 }
99 return n, timestamp(ts), nil
100 }
101
102 func heapInsert(heap []*batchCursor, bc *batchCursor) []*batchCursor {
103
104 heap = append(heap, bc)
105
106
107 heapSiftUp(heap, len(heap)-1)
108 return heap
109 }
110
111 func heapUpdate(heap []*batchCursor, i int) {
112
113 if heapSiftUp(heap, i) != i {
114 return
115 }
116
117 heapSiftDown(heap, i)
118 }
119
120 func heapRemove(heap []*batchCursor, i int) []*batchCursor {
121
122 for i > 0 {
123 heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
124 i = (i - 1) / 2
125 }
126
127 heap[0], heap[len(heap)-1] = heap[len(heap)-1], heap[0]
128 heap = heap[:len(heap)-1]
129
130 heapSiftDown(heap, 0)
131 return heap
132 }
133
134 func heapSiftUp(heap []*batchCursor, i int) int {
135 for i > 0 && heap[(i-1)/2].ev.time > heap[i].ev.time {
136 heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
137 i = (i - 1) / 2
138 }
139 return i
140 }
141
142 func heapSiftDown(heap []*batchCursor, i int) int {
143 for {
144 m := min3(heap, i, 2*i+1, 2*i+2)
145 if m == i {
146
147 break
148 }
149 heap[i], heap[m] = heap[m], heap[i]
150 i = m
151 }
152 return i
153 }
154
155 func min3(b []*batchCursor, i0, i1, i2 int) int {
156 minIdx := i0
157 minT := maxTime
158 if i0 < len(b) {
159 minT = b[i0].ev.time
160 }
161 if i1 < len(b) {
162 if t := b[i1].ev.time; t < minT {
163 minT = t
164 minIdx = i1
165 }
166 }
167 if i2 < len(b) {
168 if t := b[i2].ev.time; t < minT {
169 minT = t
170 minIdx = i2
171 }
172 }
173 return minIdx
174 }
175
View as plain text