Source file src/internal/trace/reader.go

// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package trace

import (
	"bufio"
	"fmt"
	"io"
	"slices"
	"strings"

	"internal/trace/internal/tracev1"
	"internal/trace/tracev2"
	"internal/trace/version"
)

// Reader reads a byte stream, validates it, and produces trace events.
//
// Provided the trace is non-empty, the Reader always produces a Sync
// event as the first event, and a Sync event as the last event.
// (There may be any number of Sync events in the middle as well.)
type Reader struct {
	version      version.Version
	r            *bufio.Reader
	lastTs       Time
	gen          *generation
	spill        *spilledBatch
	spillErr     error // error from reading spill
	spillErrSync bool  // whether we emitted a Sync before reporting spillErr
	frontier     []*batchCursor
	cpuSamples   []cpuSample
	order        ordering
	syncs        int
	done         bool

	v1Events *traceV1Converter
}

// NewReader creates a new trace reader.
func NewReader(r io.Reader) (*Reader, error) {
	br := bufio.NewReader(r)
	v, err := version.ReadHeader(br)
	if err != nil {
		return nil, err
	}
	switch v {
	case version.Go111, version.Go119, version.Go121:
		tr, err := tracev1.Parse(br, v)
		if err != nil {
			return nil, err
		}
		return &Reader{
			v1Events: convertV1Trace(tr),
		}, nil
	case version.Go122, version.Go123:
		return &Reader{
			version: v,
			r:       br,
			order: ordering{
				traceVer:    v,
				mStates:     make(map[ThreadID]*mState),
				pStates:     make(map[ProcID]*pState),
				gStates:     make(map[GoID]*gState),
				activeTasks: make(map[TaskID]taskState),
			},
		}, nil
	default:
		return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
	}
}
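
// A minimal usage sketch (hypothetical caller code: the file name, the
// os.Open/log.Fatal plumbing, and the import path are illustrative; callers
// outside the standard library can use, e.g., the exported copy of this API
// in golang.org/x/exp/trace). It shows the read loop implied by the Reader
// documentation above: a Sync event comes first, a Sync event comes last, and
// io.EOF is returned once the stream is exhausted.
//
//	f, err := os.Open("trace.out")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//	r, err := trace.NewReader(f)
//	if err != nil {
//		log.Fatal(err)
//	}
//	for {
//		ev, err := r.ReadEvent()
//		if err == io.EOF {
//			break // stream exhausted; the previous event was the final Sync
//		}
//		if err != nil {
//			log.Fatal(err)
//		}
//		_ = ev // process the event, e.g. switch on ev.Kind()
//	}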

// ReadEvent reads a single event from the stream.
//
// If the stream has been exhausted, it returns an invalid event and io.EOF.
func (r *Reader) ReadEvent() (e Event, err error) {
	// Return only io.EOF if we're done.
	if r.done {
		return Event{}, io.EOF
	}

	// Handle v1 execution traces.
	if r.v1Events != nil {
		if r.syncs == 0 {
			// Always emit a sync event first, if we have any events at all.
			ev, ok := r.v1Events.events.Peek()
			if ok {
				r.syncs++
				return syncEvent(r.v1Events.evt, Time(ev.Ts-1), r.syncs), nil
			}
		}
		ev, err := r.v1Events.next()
		if err == io.EOF {
			// Always emit a sync event at the end.
			r.done = true
			r.syncs++
			return syncEvent(nil, r.v1Events.lastTs+1, r.syncs), nil
		} else if err != nil {
			return Event{}, err
		}
		return ev, nil
	}

	// Trace v2 parsing algorithm.
	//
	// (1) Read in all the batches for the next generation from the stream.
	//   (a) Use the size field in the header to quickly find all batches.
	// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
	// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
	// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
	// (5) Try to advance the next event for the M at the top of the min-heap.
	//   (a) On success, select that M.
	//   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
	//   (c) If there's nothing left to advance, goto (1).
	// (6) Select the latest event for the selected M and get it ready to be returned.
	// (7) Read the next event for the selected M and update the min-heap.
	// (8) Return the selected event, goto (5) on the next call.

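	// As a hypothetical illustration of step (4) above (the real helpers,
	// heapInsert, heapUpdate, and heapRemove, are defined elsewhere in this
	// package and may differ), inserting a cursor into a min-heap keyed by
	// the timestamp of its next event needs only a sift-up:
	//
	//	heap = append(heap, bc)
	//	for i := len(heap) - 1; i > 0; {
	//		parent := (i - 1) / 2
	//		if heap[parent].ev.time <= heap[i].ev.time {
	//			break
	//		}
	//		heap[parent], heap[i] = heap[i], heap[parent]
	//		i = parent
	//	}
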
	// Set us up to track the last timestamp and fix up
	// the timestamp of any event that comes through.
	defer func() {
		if err != nil {
			return
		}
		if err = e.validateTableIDs(); err != nil {
			return
		}
		if e.base.time <= r.lastTs {
			e.base.time = r.lastTs + 1
		}
		r.lastTs = e.base.time
	}()

	// Consume any events in the ordering first.
	if ev, ok := r.order.Next(); ok {
		return ev, nil
	}

	// Check if we need to refresh the generation.
	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
		if r.spillErr != nil {
			if r.spillErrSync {
				return Event{}, r.spillErr
			}
			r.spillErrSync = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}
		if r.gen != nil && r.spill == nil {
			// We have a generation from the last read, there's
			// nothing left in the frontier, and there's no spilled
			// batch indicating a further generation, so we're done.
			// Emit the final sync event.
			r.done = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}
		// Read the next generation.
		r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill)
		if r.gen == nil {
			r.spillErrSync = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}

		// Reset CPU samples cursor.
		r.cpuSamples = r.gen.cpuSamples

		// Reset frontier.
		for _, m := range r.gen.batchMs {
			batches := r.gen.batches[m]
			bc := &batchCursor{m: m}
			ok, err := bc.nextEvent(batches, r.gen.freq)
			if err != nil {
				return Event{}, err
			}
			if !ok {
				// Turns out there aren't actually any events in these batches.
				continue
			}
			r.frontier = heapInsert(r.frontier, bc)
		}
		r.syncs++
		if r.lastTs == 0 {
			r.lastTs = r.gen.freq.mul(r.gen.minTs)
		}
		// Always emit a sync event at the beginning of the generation.
		return syncEvent(r.gen.evTable, r.lastTs, r.syncs), nil
	}
	tryAdvance := func(i int) (bool, error) {
		bc := r.frontier[i]

		if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
			return ok, err
		}

		// Refresh the cursor's event.
		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
		if err != nil {
			return false, err
		}
		if ok {
			// If we successfully refreshed, update the heap.
			heapUpdate(r.frontier, i)
		} else {
			// There's nothing else to read. Delete this cursor from the frontier.
			r.frontier = heapRemove(r.frontier, i)
		}
		return true, nil
	}
	// Inject a CPU sample if it comes next.
	if len(r.cpuSamples) != 0 {
		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
			e := r.cpuSamples[0].asEvent(r.gen.evTable)
			r.cpuSamples = r.cpuSamples[1:]
			return e, nil
		}
	}
	// Try to advance the head of the frontier, which should have the minimum timestamp.
	// This should be by far the most common case.
	if len(r.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
	}
	if ok, err := tryAdvance(0); err != nil {
		return Event{}, err
	} else if !ok {
		// Try to advance the rest of the frontier, in timestamp order.
		//
		// To do this, sort the min-heap. A sorted min-heap is still a
		// min-heap, but now we can iterate over the rest and try to
		// advance in order. This path should be rare.
		slices.SortFunc(r.frontier, (*batchCursor).compare)
		success := false
		for i := 1; i < len(r.frontier); i++ {
			if ok, err = tryAdvance(i); err != nil {
				return Event{}, err
			} else if ok {
				success = true
				break
			}
		}
		if !success {
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
		}
	}

	// Pick off the next event on the queue. At this point, one must exist.
	ev, ok := r.order.Next()
	if !ok {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return ev, nil
}

func dumpFrontier(frontier []*batchCursor) string {
	var sb strings.Builder
	for _, bc := range frontier {
		spec := tracev2.Specs()[bc.ev.typ]
		fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
		for i, arg := range spec.Args[1:] {
			fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
		}
		fmt.Fprintf(&sb, "]\n")
	}
	return sb.String()
}
