本文主要研究一下cortex的Distributor
cortex/pkg/distributor/distributor.go
// Distributor is a storage.SampleAppender and a client.Querier which
// forwards appends and queries to individual ingesters.
//
// Push validates, deduplicates (HA) and rate-limits incoming series/metadata
// per tenant before fanning them out to ingesters picked from ingestersRing;
// Query reads from the ingesters returned by GetIngestersForQuery.
type Distributor struct {
// Embedded service. NOTE(review): presumably drives the distributor's
// start/stop lifecycle — confirm where it is constructed.
services.Service
// Distributor configuration; Push reads ShardingStrategy, ExtendWrites and
// RemoteTimeout from it.
cfg Config
// Ring of ingesters. Push uses it directly or, with the shuffle sharding
// strategy, derives a per-tenant subring from it via ShuffleShard.
ingestersRing ring.ReadRing
// Pool of ingester clients. NOTE(review): presumably used by send to reach
// individual ingesters — confirm.
ingesterPool *ring_client.Pool
// Per-tenant limits/overrides (HA labels, relabel configs, dropped labels,
// tenant shard size, ...), always looked up by userID.
limits *validation.Overrides
// The global rate limiter requires a distributors ring to count
// the number of healthy instances
distributorsRing *ring.Lifecycler
// For handling HA replicas.
HATracker *haTracker
// Per-user rate limiter; Push calls AllowN with the number of validated
// samples plus validated metadata.
ingestionRateLimiter *limiter.RateLimiter
// Manager for subservices (HA Tracker, distributor ring and client pool)
subservices *services.Manager
// FailureWatcher for the subservices above.
subservicesWatcher *services.FailureWatcher
}
Distributor负责将写入(append)和查询请求转发给各个ingester
cortex/pkg/distributor/distributor.go
// Push implements client.IngesterServer.
//
// Push counts the incoming samples/metadata, applies HA deduplication (when
// enabled for the tenant), metric relabeling and label dropping, validates
// every series and metadata entry, enforces the tenant's ingestion rate limit
// and finally fans the surviving items out to the ingesters via ring.DoBatch.
//
// Validation failures of individual series/metadata are non-fatal: the
// remaining items are still sent and the first validation error is returned
// together with a non-nil response. Fatal errors (tenant lookup, HA
// mismatch / too many clusters, rate limiting, ring errors) return a nil
// response.
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	source := util.GetSourceIPsFromOutgoingCtx(ctx)

	var firstPartialErr error
	removeReplica := false

	numSamples := 0
	for _, ts := range req.Timeseries {
		numSamples += len(ts.Samples)
	}
	// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
	incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
	// Count the total number of metadata in.
	incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

	// A WriteRequest can only contain series or metadata but not both. This might change in the future.
	// For each timeseries or samples, we compute a hash to distribute across ingesters;
	// check each sample/metadata and discard if outside limits.
	validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
	validatedMetadata := make([]*client.MetricMetadata, 0, len(req.Metadata))
	metadataKeys := make([]uint32, 0, len(req.Metadata))
	seriesKeys := make([]uint32, 0, len(req.Timeseries))
	validatedSamples := 0

	// The HA replica label is resolved once and reused when stripping it from
	// each series below, so detection and removal always agree on the label
	// name even if the tenant's limits change while this request is in flight.
	var haReplicaLabel string
	if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
		haReplicaLabel = d.limits.HAReplicaLabel(userID)
		cluster, replica := findHALabels(haReplicaLabel, d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
		removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
		if err != nil {
			// Ensure the request slice is reused if the series get deduped.
			client.ReuseSlice(req.Timeseries)

			if errors.Is(err, replicasNotMatchError{}) {
				// These samples have been deduped.
				dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
				// The message is passed as an argument, not as the format
				// string, so a '%' in it cannot garble the reply.
				return nil, httpgrpc.Errorf(http.StatusAccepted, "%s", err.Error())
			}

			if errors.Is(err, tooManyClustersError{}) {
				validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
				return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
			}

			return nil, err
		}

		// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
		if !removeReplica {
			nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
		}
	}

	latestSampleTimestampMs := int64(0)
	defer func() {
		// Update this metric even in case of errors.
		if latestSampleTimestampMs > 0 {
			latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
		}
	}()

	// Per-tenant relabel rules and dropped label names do not depend on the
	// series, so look them up once instead of once per series.
	relabelConfigs := d.limits.MetricRelabelConfigs(userID)
	dropLabels := d.limits.DropLabels(userID)

	// For each timeseries, compute a hash to distribute across ingesters;
	// check each sample and discard if outside limits.
	for _, ts := range req.Timeseries {
		// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
		if len(ts.Samples) > 0 {
			latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
		}

		if len(relabelConfigs) > 0 {
			l := relabel.Process(client.FromLabelAdaptersToLabels(ts.Labels), relabelConfigs...)
			ts.Labels = client.FromLabelsToLabelAdapters(l)
		}

		// If we found both the cluster and replica labels, we only want to include the cluster label when
		// storing series in Cortex. If we kept the replica label we would end up with another series for the same
		// series we're trying to dedupe when HA tracking moves over to a different replica.
		if removeReplica {
			removeLabel(haReplicaLabel, &ts.Labels)
		}

		for _, labelName := range dropLabels {
			removeLabel(labelName, &ts.Labels)
		}

		// Relabeling and label dropping may leave nothing to store.
		if len(ts.Labels) == 0 {
			continue
		}

		// We rely on sorted labels in different places:
		// 1) When computing token for labels, and sharding by all labels. Here different order of labels returns
		// different tokens, which is bad.
		// 2) In validation code, when checking for duplicate label names. As duplicate label names are rejected
		// later in the validation phase, we ignore them here.
		sortLabelsIfNeeded(ts.Labels)

		// Generate the sharding token based on the series labels without the HA replica
		// label and dropped labels (if any)
		key, err := d.tokenForLabels(userID, ts.Labels)
		if err != nil {
			// Ensure the request slice is reused, as on every other early-return path.
			client.ReuseSlice(req.Timeseries)
			return nil, err
		}

		validatedSeries, err := d.validateSeries(ts, userID)

		// Errors in validation are considered non-fatal, as one series in a request may contain
		// invalid data but all the remaining series could be perfectly valid.
		if err != nil && firstPartialErr == nil {
			firstPartialErr = err
		}

		// validateSeries would have returned an emptyPreallocSeries if there were no valid samples.
		if validatedSeries == emptyPreallocSeries {
			continue
		}

		seriesKeys = append(seriesKeys, key)
		validatedTimeseries = append(validatedTimeseries, validatedSeries)
		validatedSamples += len(ts.Samples)
	}

	for _, m := range req.Metadata {
		err := validation.ValidateMetadata(d.limits, userID, m)
		if err != nil {
			// Invalid metadata is non-fatal, like invalid series above.
			if firstPartialErr == nil {
				firstPartialErr = err
			}
			continue
		}

		metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
		validatedMetadata = append(validatedMetadata, m)
	}

	receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
	receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

	if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
		// Ensure the request slice is reused if there's no series or metadata passing the validation.
		client.ReuseSlice(req.Timeseries)
		return &client.WriteResponse{}, firstPartialErr
	}

	now := time.Now()
	totalN := validatedSamples + len(validatedMetadata)
	if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
		// Ensure the request slice is reused if the request is rate limited.
		client.ReuseSlice(req.Timeseries)

		// Return a 4xx here to have the client discard the data and not retry. If a client
		// is sending too much data consistently we will unlikely ever catch up otherwise.
		validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
		validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
	}

	subRing := d.ingestersRing

	// Obtain a subring if required.
	if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
		subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
	}

	// Series keys come first, metadata keys after; the callback below uses
	// initialMetadataIndex to tell them apart. seriesKeys is not used after
	// this point, so appending into its spare capacity is safe.
	keys := append(seriesKeys, metadataKeys...)
	initialMetadataIndex := len(seriesKeys)

	op := ring.WriteNoExtend
	if d.cfg.ExtendWrites {
		op = ring.Write
	}

	err = ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
		timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
		var metadata []*client.MetricMetadata

		for _, i := range indexes {
			if i >= initialMetadataIndex {
				metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
			} else {
				timeseries = append(timeseries, validatedTimeseries[i])
			}
		}

		// Use a background context to make sure all ingesters get samples even if we return early
		localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
		defer cancel()
		localCtx = user.InjectOrgID(localCtx, userID)
		if sp := opentracing.SpanFromContext(ctx); sp != nil {
			localCtx = opentracing.ContextWithSpan(localCtx, sp)
		}

		// Get clientIP(s) from Context and add it to localCtx
		localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)

		return d.send(localCtx, ingester, timeseries, metadata, req.Source)
	}, func() { client.ReuseSlice(req.Timeseries) })
	if err != nil {
		return nil, err
	}
	return &client.WriteResponse{}, firstPartialErr
}
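Push方法先统计并校验series与metadata(包括HA去重、relabel、丢弃指定label以及按租户限流);在d.cfg.ShardingStrategy为util.ShardingStrategyShuffle时,会通过d.ingestersRing.ShuffleShard确定subRing(否则直接使用d.ingestersRing);之后通过ring.DoBatch提交keys,其callback函数为每个ingester组装对应的timeseries与metadata,并执行d.send(localCtx, ingester, timeseries, metadata, req.Source)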
cortex/pkg/ring/batch.go
// DoBatch issues a request against a set of keys in the ring, handling
// replication and failures. For example if we want to write N items where
// they may all hit different ingesters, and we want them all replicated R ways
// with quorum writes, we track the relationship between batch RPCs and the
// items within them.
//
// Callback is invoked once per ingester, concurrently, with the ingester to
// target and the indexes (into keys) of the items to send to that ingester.
// DoBatch returns as soon as the batch tracker reports success or an error,
// or ctx is done — possibly before every callback has finished — so callbacks
// should not rely on ctx outliving DoBatch.
//
// cleanup is called exactly once: after all callbacks have returned, or
// before DoBatch returns if it fails before dispatching any callback.
//
// Not implemented as a method on Ring so we can test separately.
func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
	if r.IngesterCount() <= 0 {
		// No callback will ever run, so release the caller's resources now.
		cleanup()
		return fmt.Errorf("DoBatch: IngesterCount <= 0")
	}

	// Capacity hint for the per-ingester slices: the keys are spread over
	// IngesterCount ingesters, each replicated roughly ReplicationFactor(+1) times.
	expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount()
	itemTrackers := make([]itemTracker, len(keys))
	ingesters := make(map[string]ingester, r.IngesterCount())

	// Scratch buffers reused by every r.Get call to avoid per-key allocations.
	var (
		bufDescs [GetBufferSize]IngesterDesc
		bufHosts [GetBufferSize]string
		bufZones [GetBufferSize]string
	)
	for i, key := range keys {
		replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
		if err != nil {
			// Nothing has been dispatched yet; release the caller's resources.
			cleanup()
			return err
		}
		itemTrackers[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
		itemTrackers[i].maxFailures = replicationSet.MaxErrors

		// Group item i under every ingester of its replication set, so each
		// ingester receives a single batched callback.
		for _, desc := range replicationSet.Ingesters {
			curr, found := ingesters[desc.Addr]
			if !found {
				curr.itemTrackers = make([]*itemTracker, 0, expectedTrackers)
				curr.indexes = make([]int, 0, expectedTrackers)
			}
			ingesters[desc.Addr] = ingester{
				desc:         desc,
				itemTrackers: append(curr.itemTrackers, &itemTrackers[i]),
				indexes:      append(curr.indexes, i),
			}
		}
	}

	tracker := batchTracker{
		done: make(chan struct{}, 1),
		err:  make(chan error, 1),
	}
	// Initialized to the number of items (keys), not RPCs.
	// NOTE(review): presumably decremented by tracker.record as each item
	// reaches its quorum — confirm against batchTracker.record.
	tracker.rpcsPending.Store(int32(len(itemTrackers)))

	var wg sync.WaitGroup
	wg.Add(len(ingesters))
	for _, i := range ingesters {
		go func(i ingester) {
			defer wg.Done()
			err := callback(i.desc, i.indexes)
			tracker.record(i.itemTrackers, err)
		}(i)
	}

	// Perform cleanup at the end, once every callback has returned, even if
	// DoBatch itself has already returned to the caller.
	go func() {
		wg.Wait()
		cleanup()
	}()

	select {
	case err := <-tracker.err:
		return err
	case <-tracker.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
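DoBatch方法接收一个callback函数:它先按key在ring中找到对应的副本集(replicationSet),把每个key归组到对应的ingester,然后为每个ingester并发调用一次callback,传入该ingester以及应发送给它的keys的indexes;当tracker报告成功或错误、或ctx结束时返回,并在所有callback执行完毕后调用cleanup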
cortex/pkg/distributor/query.go
// Query fetches the series matching matchers between from and to from the
// ingesters selected by GetIngestersForQuery, and returns them as a Matrix.
// The whole operation is instrumented as "Distributor.Query" in queryDuration.
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
	var result model.Matrix

	run := func(ctx context.Context) error {
		queryReq, err := ingester_client.ToQueryRequest(from, to, matchers)
		if err != nil {
			return err
		}

		replicas, err := d.GetIngestersForQuery(ctx, matchers...)
		if err != nil {
			return err
		}

		if result, err = d.queryIngesters(ctx, replicas, queryReq); err != nil {
			return err
		}

		// Record how many series came back on the active trace, if any.
		if span := opentracing.SpanFromContext(ctx); span != nil {
			span.LogKV("series", len(result))
		}
		return nil
	}

	err := instrument.CollectedRequest(ctx, "Distributor.Query", queryDuration, instrument.ErrorCode, run)
	return result, err
}
Query方法通过d.GetIngestersForQuery获取replicationSet,再通过d.queryIngesters获取matrix
cortex的Distributor提供了Push、Query方法;Push方法先校验、去重并限流,在采用shuffle sharding策略时通过d.ingestersRing.ShuffleShard确定subRing(否则使用完整的ingestersRing),之后通过ring.DoBatch提交keys;Query方法通过d.GetIngestersForQuery获取replicationSet,再通过d.queryIngesters获取matrix。