1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "fmt"
10 "io"
11 "slices"
12 "strings"
13
14 "internal/trace/internal/tracev1"
15 "internal/trace/tracev2"
16 "internal/trace/version"
17 )
18
19
20
21
22
23
24 type Reader struct {
25 version version.Version
26 r *bufio.Reader
27 lastTs Time
28 gen *generation
29 spill *spilledBatch
30 spillErr error
31 spillErrSync bool
32 frontier []*batchCursor
33 cpuSamples []cpuSample
34 order ordering
35 syncs int
36 done bool
37
38 v1Events *traceV1Converter
39 }
40
41
42 func NewReader(r io.Reader) (*Reader, error) {
43 br := bufio.NewReader(r)
44 v, err := version.ReadHeader(br)
45 if err != nil {
46 return nil, err
47 }
48 switch v {
49 case version.Go111, version.Go119, version.Go121:
50 tr, err := tracev1.Parse(br, v)
51 if err != nil {
52 return nil, err
53 }
54 return &Reader{
55 v1Events: convertV1Trace(tr),
56 }, nil
57 case version.Go122, version.Go123:
58 return &Reader{
59 version: v,
60 r: br,
61 order: ordering{
62 traceVer: v,
63 mStates: make(map[ThreadID]*mState),
64 pStates: make(map[ProcID]*pState),
65 gStates: make(map[GoID]*gState),
66 activeTasks: make(map[TaskID]taskState),
67 },
68 }, nil
69 default:
70 return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
71 }
72 }
73
74
75
76
77 func (r *Reader) ReadEvent() (e Event, err error) {
78
79 if r.done {
80 return Event{}, io.EOF
81 }
82
83
84 if r.v1Events != nil {
85 if r.syncs == 0 {
86
87 ev, ok := r.v1Events.events.Peek()
88 if ok {
89 r.syncs++
90 return syncEvent(r.v1Events.evt, Time(ev.Ts-1), r.syncs), nil
91 }
92 }
93 ev, err := r.v1Events.next()
94 if err == io.EOF {
95
96 r.done = true
97 r.syncs++
98 return syncEvent(nil, r.v1Events.lastTs+1, r.syncs), nil
99 } else if err != nil {
100 return Event{}, err
101 }
102 return ev, nil
103 }
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 defer func() {
123 if err != nil {
124 return
125 }
126 if err = e.validateTableIDs(); err != nil {
127 return
128 }
129 if e.base.time <= r.lastTs {
130 e.base.time = r.lastTs + 1
131 }
132 r.lastTs = e.base.time
133 }()
134
135
136 if ev, ok := r.order.Next(); ok {
137 return ev, nil
138 }
139
140
141 if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
142 if r.spillErr != nil {
143 if r.spillErrSync {
144 return Event{}, r.spillErr
145 }
146 r.spillErrSync = true
147 r.syncs++
148 return syncEvent(nil, r.lastTs, r.syncs), nil
149 }
150 if r.gen != nil && r.spill == nil {
151
152
153
154
155
156 r.done = true
157 r.syncs++
158 return syncEvent(nil, r.lastTs, r.syncs), nil
159 }
160
161 r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill)
162 if r.gen == nil {
163 r.spillErrSync = true
164 r.syncs++
165 return syncEvent(nil, r.lastTs, r.syncs), nil
166 }
167
168
169 r.cpuSamples = r.gen.cpuSamples
170
171
172 for _, m := range r.gen.batchMs {
173 batches := r.gen.batches[m]
174 bc := &batchCursor{m: m}
175 ok, err := bc.nextEvent(batches, r.gen.freq)
176 if err != nil {
177 return Event{}, err
178 }
179 if !ok {
180
181 continue
182 }
183 r.frontier = heapInsert(r.frontier, bc)
184 }
185 r.syncs++
186 if r.lastTs == 0 {
187 r.lastTs = r.gen.freq.mul(r.gen.minTs)
188 }
189
190 return syncEvent(r.gen.evTable, r.lastTs, r.syncs), nil
191 }
192 tryAdvance := func(i int) (bool, error) {
193 bc := r.frontier[i]
194
195 if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
196 return ok, err
197 }
198
199
200 ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
201 if err != nil {
202 return false, err
203 }
204 if ok {
205
206 heapUpdate(r.frontier, i)
207 } else {
208
209 r.frontier = heapRemove(r.frontier, i)
210 }
211 return true, nil
212 }
213
214 if len(r.cpuSamples) != 0 {
215 if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
216 e := r.cpuSamples[0].asEvent(r.gen.evTable)
217 r.cpuSamples = r.cpuSamples[1:]
218 return e, nil
219 }
220 }
221
222
223 if len(r.frontier) == 0 {
224 return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
225 }
226 if ok, err := tryAdvance(0); err != nil {
227 return Event{}, err
228 } else if !ok {
229
230
231
232
233
234 slices.SortFunc(r.frontier, (*batchCursor).compare)
235 success := false
236 for i := 1; i < len(r.frontier); i++ {
237 if ok, err = tryAdvance(i); err != nil {
238 return Event{}, err
239 } else if ok {
240 success = true
241 break
242 }
243 }
244 if !success {
245 return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
246 }
247 }
248
249
250 ev, ok := r.order.Next()
251 if !ok {
252 panic("invariant violation: advance successful, but queue is empty")
253 }
254 return ev, nil
255 }
256
257 func dumpFrontier(frontier []*batchCursor) string {
258 var sb strings.Builder
259 for _, bc := range frontier {
260 spec := tracev2.Specs()[bc.ev.typ]
261 fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
262 for i, arg := range spec.Args[1:] {
263 fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
264 }
265 fmt.Fprintf(&sb, "]\n")
266 }
267 return sb.String()
268 }
269
View as plain text