1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "bytes"
10 "encoding/binary"
11 "fmt"
12 "io"
13
14 "internal/trace/v2/event"
15 "internal/trace/v2/event/go122"
16 )
17
18
19 type timestamp uint64
20
21
22
23 type batch struct {
24 m ThreadID
25 time timestamp
26 data []byte
27 }
28
29 func (b *batch) isStringsBatch() bool {
30 return len(b.data) > 0 && event.Type(b.data[0]) == go122.EvStrings
31 }
32
33 func (b *batch) isStacksBatch() bool {
34 return len(b.data) > 0 && event.Type(b.data[0]) == go122.EvStacks
35 }
36
37 func (b *batch) isCPUSamplesBatch() bool {
38 return len(b.data) > 0 && event.Type(b.data[0]) == go122.EvCPUSamples
39 }
40
41 func (b *batch) isFreqBatch() bool {
42 return len(b.data) > 0 && event.Type(b.data[0]) == go122.EvFrequency
43 }
44
45
46 func readBatch(r *bufio.Reader) (batch, uint64, error) {
47
48 b, err := r.ReadByte()
49 if err != nil {
50 return batch{}, 0, err
51 }
52 if typ := event.Type(b); typ != go122.EvEventBatch {
53 return batch{}, 0, fmt.Errorf("expected batch event (%s), got %s", go122.EventString(go122.EvEventBatch), go122.EventString(typ))
54 }
55
56
57
58 gen, err := binary.ReadUvarint(r)
59 if err != nil {
60 return batch{}, gen, fmt.Errorf("error reading batch gen: %w", err)
61 }
62 m, err := binary.ReadUvarint(r)
63 if err != nil {
64 return batch{}, gen, fmt.Errorf("error reading batch M ID: %w", err)
65 }
66 ts, err := binary.ReadUvarint(r)
67 if err != nil {
68 return batch{}, gen, fmt.Errorf("error reading batch timestamp: %w", err)
69 }
70
71
72 size, err := binary.ReadUvarint(r)
73 if err != nil {
74 return batch{}, gen, fmt.Errorf("error reading batch size: %w", err)
75 }
76 if size > go122.MaxBatchSize {
77 return batch{}, gen, fmt.Errorf("invalid batch size %d, maximum is %d", size, go122.MaxBatchSize)
78 }
79
80
81 var data bytes.Buffer
82 data.Grow(int(size))
83 n, err := io.CopyN(&data, r, int64(size))
84 if n != int64(size) {
85 return batch{}, gen, fmt.Errorf("failed to read full batch: read %d but wanted %d", n, size)
86 }
87 if err != nil {
88 return batch{}, gen, fmt.Errorf("copying batch data: %w", err)
89 }
90
91
92 return batch{
93 m: ThreadID(m),
94 time: timestamp(ts),
95 data: data.Bytes(),
96 }, gen, nil
97 }
98
View as plain text