Source file src/internal/trace/batchcursor.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"cmp"
     9  	"encoding/binary"
    10  	"fmt"
    11  
    12  	"internal/trace/tracev2"
    13  )
    14  
    15  type batchCursor struct {
    16  	m       ThreadID
    17  	lastTs  Time
    18  	idx     int       // next index into []batch
    19  	dataOff int       // next index into batch.data
    20  	ev      baseEvent // last read event
    21  }
    22  
    23  func (b *batchCursor) nextEvent(batches []batch, freq frequency) (ok bool, err error) {
    24  	// Batches should generally always have at least one event,
    25  	// but let's be defensive about that and accept empty batches.
    26  	for b.idx < len(batches) && len(batches[b.idx].data) == b.dataOff {
    27  		b.idx++
    28  		b.dataOff = 0
    29  		b.lastTs = 0
    30  	}
    31  	// Have we reached the end of the batches?
    32  	if b.idx == len(batches) {
    33  		return false, nil
    34  	}
    35  	// Initialize lastTs if it hasn't been yet.
    36  	if b.lastTs == 0 {
    37  		b.lastTs = freq.mul(batches[b.idx].time)
    38  	}
    39  	// Read an event out.
    40  	n, tsdiff, err := readTimedBaseEvent(batches[b.idx].data[b.dataOff:], &b.ev)
    41  	if err != nil {
    42  		return false, err
    43  	}
    44  	// Complete the timestamp from the cursor's last timestamp.
    45  	b.ev.time = freq.mul(tsdiff) + b.lastTs
    46  
    47  	// Move the cursor's timestamp forward.
    48  	b.lastTs = b.ev.time
    49  
    50  	// Move the cursor forward.
    51  	b.dataOff += n
    52  	return true, nil
    53  }
    54  
    55  func (b *batchCursor) compare(a *batchCursor) int {
    56  	return cmp.Compare(b.ev.time, a.ev.time)
    57  }
    58  
    59  // readTimedBaseEvent reads out the raw event data from b
    60  // into e. It does not try to interpret the arguments
    61  // but it does validate that the event is a regular
    62  // event with a timestamp (vs. a structural event).
    63  //
    64  // It requires that the event its reading be timed, which must
    65  // be the case for every event in a plain EventBatch.
    66  func readTimedBaseEvent(b []byte, e *baseEvent) (int, timestamp, error) {
    67  	// Get the event type.
    68  	typ := tracev2.EventType(b[0])
    69  	specs := tracev2.Specs()
    70  	if int(typ) >= len(specs) {
    71  		return 0, 0, fmt.Errorf("found invalid event type: %v", typ)
    72  	}
    73  	e.typ = typ
    74  
    75  	// Get spec.
    76  	spec := &specs[typ]
    77  	if len(spec.Args) == 0 || !spec.IsTimedEvent {
    78  		return 0, 0, fmt.Errorf("found event without a timestamp: type=%v", typ)
    79  	}
    80  	n := 1
    81  
    82  	// Read timestamp diff.
    83  	ts, nb := binary.Uvarint(b[n:])
    84  	if nb <= 0 {
    85  		return 0, 0, fmt.Errorf("found invalid uvarint for timestamp")
    86  	}
    87  	n += nb
    88  
    89  	// Read the rest of the arguments.
    90  	for i := 0; i < len(spec.Args)-1; i++ {
    91  		arg, nb := binary.Uvarint(b[n:])
    92  		if nb <= 0 {
    93  			return 0, 0, fmt.Errorf("found invalid uvarint")
    94  		}
    95  		e.args[i] = arg
    96  		n += nb
    97  	}
    98  	return n, timestamp(ts), nil
    99  }
   100  
   101  func heapInsert(heap []*batchCursor, bc *batchCursor) []*batchCursor {
   102  	// Add the cursor to the end of the heap.
   103  	heap = append(heap, bc)
   104  
   105  	// Sift the new entry up to the right place.
   106  	heapSiftUp(heap, len(heap)-1)
   107  	return heap
   108  }
   109  
   110  func heapUpdate(heap []*batchCursor, i int) {
   111  	// Try to sift up.
   112  	if heapSiftUp(heap, i) != i {
   113  		return
   114  	}
   115  	// Try to sift down, if sifting up failed.
   116  	heapSiftDown(heap, i)
   117  }
   118  
   119  func heapRemove(heap []*batchCursor, i int) []*batchCursor {
   120  	// Sift index i up to the root, ignoring actual values.
   121  	for i > 0 {
   122  		heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
   123  		i = (i - 1) / 2
   124  	}
   125  	// Swap the root with the last element, then remove it.
   126  	heap[0], heap[len(heap)-1] = heap[len(heap)-1], heap[0]
   127  	heap = heap[:len(heap)-1]
   128  	// Sift the root down.
   129  	heapSiftDown(heap, 0)
   130  	return heap
   131  }
   132  
   133  func heapSiftUp(heap []*batchCursor, i int) int {
   134  	for i > 0 && heap[(i-1)/2].ev.time > heap[i].ev.time {
   135  		heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
   136  		i = (i - 1) / 2
   137  	}
   138  	return i
   139  }
   140  
   141  func heapSiftDown(heap []*batchCursor, i int) int {
   142  	for {
   143  		m := min3(heap, i, 2*i+1, 2*i+2)
   144  		if m == i {
   145  			// Heap invariant already applies.
   146  			break
   147  		}
   148  		heap[i], heap[m] = heap[m], heap[i]
   149  		i = m
   150  	}
   151  	return i
   152  }
   153  
   154  func min3(b []*batchCursor, i0, i1, i2 int) int {
   155  	minIdx := i0
   156  	minT := maxTime
   157  	if i0 < len(b) {
   158  		minT = b[i0].ev.time
   159  	}
   160  	if i1 < len(b) {
   161  		if t := b[i1].ev.time; t < minT {
   162  			minT = t
   163  			minIdx = i1
   164  		}
   165  	}
   166  	if i2 < len(b) {
   167  		if t := b[i2].ev.time; t < minT {
   168  			minT = t
   169  			minIdx = i2
   170  		}
   171  	}
   172  	return minIdx
   173  }
   174  

View as plain text