Source file src/internal/trace/batch.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"bytes"
     9  	"encoding/binary"
    10  	"fmt"
    11  	"io"
    12  
    13  	"internal/trace/tracev2"
    14  )
    15  
// timestamp is an unprocessed timestamp.
//
// It holds the raw unsigned value exactly as decoded from the trace
// (for example, the base timestamp that readBatch reads from a batch
// header); no conversion is applied to it in this file.
type timestamp uint64
    18  
// batch represents a batch of trace events.
// It is unparsed except for its header.
type batch struct {
	// m is the thread (M) ID read from the batch header.
	m ThreadID
	// time is the base timestamp for the batch, as read from the header.
	time timestamp
	// data is the raw, still-encoded payload that followed the header.
	data []byte
	// exp is the experiment named in the header of an experimental batch,
	// or tracev2.NoExperiment for a regular event batch.
	exp tracev2.Experiment
}
    27  
    28  func (b *batch) isStringsBatch() bool {
    29  	return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStrings
    30  }
    31  
    32  func (b *batch) isStacksBatch() bool {
    33  	return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStacks
    34  }
    35  
    36  func (b *batch) isCPUSamplesBatch() bool {
    37  	return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvCPUSamples
    38  }
    39  
    40  func (b *batch) isFreqBatch() bool {
    41  	return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvFrequency
    42  }
    43  
    44  // readBatch reads the next full batch from r.
    45  func readBatch(r interface {
    46  	io.Reader
    47  	io.ByteReader
    48  }) (batch, uint64, error) {
    49  	// Read batch header byte.
    50  	b, err := r.ReadByte()
    51  	if err != nil {
    52  		return batch{}, 0, err
    53  	}
    54  	if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
    55  		return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
    56  	}
    57  
    58  	// Read the experiment of we have one.
    59  	exp := tracev2.NoExperiment
    60  	if tracev2.EventType(b) == tracev2.EvExperimentalBatch {
    61  		e, err := r.ReadByte()
    62  		if err != nil {
    63  			return batch{}, 0, err
    64  		}
    65  		exp = tracev2.Experiment(e)
    66  	}
    67  
    68  	// Read the batch header: gen (generation), thread (M) ID, base timestamp
    69  	// for the batch.
    70  	gen, err := binary.ReadUvarint(r)
    71  	if err != nil {
    72  		return batch{}, gen, fmt.Errorf("error reading batch gen: %w", err)
    73  	}
    74  	m, err := binary.ReadUvarint(r)
    75  	if err != nil {
    76  		return batch{}, gen, fmt.Errorf("error reading batch M ID: %w", err)
    77  	}
    78  	ts, err := binary.ReadUvarint(r)
    79  	if err != nil {
    80  		return batch{}, gen, fmt.Errorf("error reading batch timestamp: %w", err)
    81  	}
    82  
    83  	// Read in the size of the batch to follow.
    84  	size, err := binary.ReadUvarint(r)
    85  	if err != nil {
    86  		return batch{}, gen, fmt.Errorf("error reading batch size: %w", err)
    87  	}
    88  	if size > tracev2.MaxBatchSize {
    89  		return batch{}, gen, fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
    90  	}
    91  
    92  	// Copy out the batch for later processing.
    93  	var data bytes.Buffer
    94  	data.Grow(int(size))
    95  	n, err := io.CopyN(&data, r, int64(size))
    96  	if n != int64(size) {
    97  		return batch{}, gen, fmt.Errorf("failed to read full batch: read %d but wanted %d", n, size)
    98  	}
    99  	if err != nil {
   100  		return batch{}, gen, fmt.Errorf("copying batch data: %w", err)
   101  	}
   102  
   103  	// Return the batch.
   104  	return batch{
   105  		m:    ThreadID(m),
   106  		time: timestamp(ts),
   107  		data: data.Bytes(),
   108  		exp:  exp,
   109  	}, gen, nil
   110  }
   111  

View as plain text