1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "fmt"
10 "io"
11 "slices"
12 "strings"
13
14 "internal/trace/v2/event/go122"
15 "internal/trace/v2/version"
16 )
17
18
19 type Reader struct {
20 r *bufio.Reader
21 lastTs Time
22 gen *generation
23 spill *spilledBatch
24 frontier []*batchCursor
25 cpuSamples []cpuSample
26 order ordering
27 emittedSync bool
28 }
29
30
31 func NewReader(r io.Reader) (*Reader, error) {
32 br := bufio.NewReader(r)
33 v, err := version.ReadHeader(br)
34 if err != nil {
35 return nil, err
36 }
37 if v != version.Go122 {
38 return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
39 }
40 return &Reader{
41 r: br,
42 order: ordering{
43 mStates: make(map[ThreadID]*mState),
44 pStates: make(map[ProcID]*pState),
45 gStates: make(map[GoID]*gState),
46 activeTasks: make(map[TaskID]taskState),
47 },
48
49 emittedSync: true,
50 }, nil
51 }
52
53
54
55
56
57 func (r *Reader) ReadEvent() (e Event, err error) {
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 defer func() {
76 if err != nil {
77 return
78 }
79 if err = e.validateTableIDs(); err != nil {
80 return
81 }
82 if e.base.time <= r.lastTs {
83 e.base.time = r.lastTs + 1
84 }
85 r.lastTs = e.base.time
86 }()
87
88
89 if ev := r.order.consumeExtraEvent(); ev.Kind() != EventBad {
90 return ev, nil
91 }
92
93
94 if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
95 if !r.emittedSync {
96 r.emittedSync = true
97 return syncEvent(r.gen.evTable, r.lastTs), nil
98 }
99 if r.gen != nil && r.spill == nil {
100
101
102
103
104
105 return Event{}, io.EOF
106 }
107
108 r.gen, r.spill, err = readGeneration(r.r, r.spill)
109 if err != nil {
110 return Event{}, err
111 }
112
113
114 r.cpuSamples = r.gen.cpuSamples
115
116
117 for m, batches := range r.gen.batches {
118 bc := &batchCursor{m: m}
119 ok, err := bc.nextEvent(batches, r.gen.freq)
120 if err != nil {
121 return Event{}, err
122 }
123 if !ok {
124
125 continue
126 }
127 r.frontier = heapInsert(r.frontier, bc)
128 }
129
130
131 r.emittedSync = false
132 }
133 refresh := func(i int) error {
134 bc := r.frontier[i]
135
136
137 ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
138 if err != nil {
139 return err
140 }
141 if ok {
142
143 heapUpdate(r.frontier, i)
144 } else {
145
146 r.frontier = heapRemove(r.frontier, i)
147 }
148 return nil
149 }
150
151 if len(r.cpuSamples) != 0 {
152 if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
153 e := r.cpuSamples[0].asEvent(r.gen.evTable)
154 r.cpuSamples = r.cpuSamples[1:]
155 return e, nil
156 }
157 }
158
159
160 if len(r.frontier) == 0 {
161 return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
162 }
163 bc := r.frontier[0]
164 if ctx, ok, err := r.order.advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); err != nil {
165 return Event{}, err
166 } else if ok {
167 e := Event{table: r.gen.evTable, ctx: ctx, base: bc.ev}
168 return e, refresh(0)
169 }
170
171
172
173 slices.SortFunc(r.frontier, (*batchCursor).compare)
174
175 for i := 1; i < len(r.frontier); i++ {
176 bc := r.frontier[i]
177 if ctx, ok, err := r.order.advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); err != nil {
178 return Event{}, err
179 } else if ok {
180 e := Event{table: r.gen.evTable, ctx: ctx, base: bc.ev}
181 return e, refresh(i)
182 }
183 }
184 return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
185 }
186
187 func dumpFrontier(frontier []*batchCursor) string {
188 var sb strings.Builder
189 for _, bc := range frontier {
190 spec := go122.Specs()[bc.ev.typ]
191 fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
192 for i, arg := range spec.Args[1:] {
193 fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
194 }
195 fmt.Fprintf(&sb, "]\n")
196 }
197 return sb.String()
198 }
199
View as plain text