1
2
3
4
5 package trace
6
7 import (
8 "bytes"
9 "encoding/binary"
10 "fmt"
11 "io"
12
13 "internal/trace/tracev2"
14 )
15
16
// timestamp is a time value held as an unsigned 64-bit integer. readBatch
// produces one from the uvarint it reads out of a batch header.
//
// NOTE(review): the unit and epoch are not visible in this file — confirm
// before comparing a timestamp against wall-clock time.
type timestamp uint64
18
19
20
// batch is one batch as returned by readBatch: the values read from the
// batch header together with the payload bytes that followed it.
type batch struct {
	// m is the M ID read from the batch header.
	m ThreadID
	// time is the timestamp read from the batch header.
	time timestamp
	// data holds the payload bytes that follow the header. The is*Batch
	// methods inspect its first byte as a tracev2.EventType.
	data []byte
	// exp is the experiment byte of an experimental batch, or
	// tracev2.NoExperiment for a regular batch.
	exp tracev2.Experiment
}
27
28 func (b *batch) isStringsBatch() bool {
29 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStrings
30 }
31
32 func (b *batch) isStacksBatch() bool {
33 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStacks
34 }
35
36 func (b *batch) isCPUSamplesBatch() bool {
37 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvCPUSamples
38 }
39
40 func (b *batch) isFreqBatch() bool {
41 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvFrequency
42 }
43
44
45 func readBatch(r interface {
46 io.Reader
47 io.ByteReader
48 }) (batch, uint64, error) {
49
50 b, err := r.ReadByte()
51 if err != nil {
52 return batch{}, 0, err
53 }
54 if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
55 return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
56 }
57
58
59 exp := tracev2.NoExperiment
60 if tracev2.EventType(b) == tracev2.EvExperimentalBatch {
61 e, err := r.ReadByte()
62 if err != nil {
63 return batch{}, 0, err
64 }
65 exp = tracev2.Experiment(e)
66 }
67
68
69
70 gen, err := binary.ReadUvarint(r)
71 if err != nil {
72 return batch{}, gen, fmt.Errorf("error reading batch gen: %w", err)
73 }
74 m, err := binary.ReadUvarint(r)
75 if err != nil {
76 return batch{}, gen, fmt.Errorf("error reading batch M ID: %w", err)
77 }
78 ts, err := binary.ReadUvarint(r)
79 if err != nil {
80 return batch{}, gen, fmt.Errorf("error reading batch timestamp: %w", err)
81 }
82
83
84 size, err := binary.ReadUvarint(r)
85 if err != nil {
86 return batch{}, gen, fmt.Errorf("error reading batch size: %w", err)
87 }
88 if size > tracev2.MaxBatchSize {
89 return batch{}, gen, fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
90 }
91
92
93 var data bytes.Buffer
94 data.Grow(int(size))
95 n, err := io.CopyN(&data, r, int64(size))
96 if n != int64(size) {
97 return batch{}, gen, fmt.Errorf("failed to read full batch: read %d but wanted %d", n, size)
98 }
99 if err != nil {
100 return batch{}, gen, fmt.Errorf("copying batch data: %w", err)
101 }
102
103
104 return batch{
105 m: ThreadID(m),
106 time: timestamp(ts),
107 data: data.Bytes(),
108 exp: exp,
109 }, gen, nil
110 }
111
View as plain text