Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [FEATURE] Tenant Federation: Add experimental support for partial responses using the `-tenant-federation.allow-partial-data` flag. When enabled, failures from individual tenants during a federated query are treated as warnings, allowing results from successful tenants to be returned. #7232
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
* [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ tenant_federation:
# CLI flag: -tenant-federation.user-sync-interval
[user_sync_interval: <duration> | default = 5m]

# [Experimental] If enabled, query errors from individual tenants are treated
# as warnings, allowing partial results to be returned.
# CLI flag: -tenant-federation.allow-partial-data
[allow_partial_data: <boolean> | default = false]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]

Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ Currently experimental features are:
- `-tenant-federation.enabled`
- `-tenant-federation.regex-matcher-enabled`
- `-tenant-federation.user-sync-interval`
- `-tenant-federation.allow-partial-data`
- The thanosconvert tool for converting Thanos block metadata to Cortex
- HA Tracker: cleanup of old replicas from KV Store.
- Instance limits in ingester and distributor
Expand Down
74 changes: 74 additions & 0 deletions integration/querier_tenant_federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,80 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon
// TODO: check fairness in queryfrontend
}

// TestQuerierTenantFederation_PartialData verifies that when
// -tenant-federation.allow-partial-data is enabled, a federated query still
// succeeds when one of the tenants fails: tenant "user-pass" holds a single
// series that queries cleanly, tenant "user-fail" holds more series than
// -querier.max-fetched-series-per-query allows (so its sub-query errors),
// and the federated result is expected to contain only the healthy tenant's
// data.
func TestQuerierTenantFederation_PartialData(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Shared external dependencies: memcached for the query-frontend results
// cache, consul as the ring KV store.
memcached := e2ecache.NewMemcached()
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-querier.cache-results": "true",
"-querier.split-queries-by-interval": "24h",
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-tenant-federation.enabled": "true",
// Treat per-tenant query failures as warnings so results from the
// remaining tenants are still returned (the feature under test).
"-tenant-federation.allow-partial-data": "true",
"-querier.max-fetched-series-per-query": "5", // low limit so the "user-fail" tenant's query is rejected
})

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
require.NoError(t, s.Start(queryFrontend))

// Point the querier at the frontend before starting the remaining services.
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor))
require.NoError(t, s.WaitReady(queryFrontend))

// Wait until distributor and queriers have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

now := time.Now()

// Tenant whose query is expected to succeed: a single series, well under
// the per-query series limit.
userPassID := "user-pass"
cPass, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userPassID)
require.NoError(t, err)

seriesPass, expectedVectorPass := generateSeries("series_good", now)
res, err := cPass.Push(seriesPass)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Tenant whose query is expected to fail: push more distinct series than
// the configured max-fetched-series-per-query limit of 5.
userFailID := "user-fail"
cFail, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userFailID)
require.NoError(t, err)

var seriesFail []prompb.TimeSeries
seriesNum := 10 // exceeds the limit of 5, so this tenant's sub-query errors
for i := range seriesNum {
s, _ := generateSeries(fmt.Sprintf("series_bad_%d", i), now)
seriesFail = append(seriesFail, s...)
}
res, err = cFail.Push(seriesFail)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Run a federated query across both tenants through the query-frontend.
multiTenantID := userPassID + "|" + userFailID
cFederated, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", multiTenantID)
require.NoError(t, err)

// With partial data enabled the query must succeed and return only the
// healthy tenant's series (labeled with its __tenant_id__).
result, err := cFederated.Query("series_good", now)
require.NoError(t, err)
expectedResult := mergeResults([]string{userPassID}, []model.Vector{expectedVectorPass})

require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedResult, result.(model.Vector))
}

func mergeResults(tenantIDs []string, resultsPerTenant []model.Vector) model.Vector {
var v model.Vector
for pos, tenantID := range tenantIDs {
Expand Down
11 changes: 6 additions & 5 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,18 +288,19 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
// federation.
byPassForSingleQuerier := true

t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer)
t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)
reg := prometheus.DefaultRegisterer
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation, byPassForSingleQuerier, reg))
t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation, reg)
t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation, byPassForSingleQuerier, reg)

if t.Cfg.TenantFederation.RegexMatcherEnabled {
util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled")

bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, prometheus.DefaultRegisterer)
return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, reg)
}

regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, t.Cfg.TenantFederation, prometheus.DefaultRegisterer, bucketClientFactory, util_log.Logger)
regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, t.Cfg.TenantFederation, reg, bucketClientFactory, util_log.Logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize regex resolver: %v", err)
}
Expand Down
20 changes: 17 additions & 3 deletions pkg/querier/tenantfederation/exemplar_merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,15 @@ import (
// If the label "__tenant_id__" is already existing, its value is overwritten
// by the tenant ID and the previous value is exposed through a new label
// prefixed with "original_". This behaviour is not implemented recursively.
func NewExemplarQueryable(upstream storage.ExemplarQueryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable {
return NewMergeExemplarQueryable(defaultTenantLabel, maxConcurrent, tenantExemplarQuerierCallback(upstream), byPassWithSingleQuerier, reg)
func NewExemplarQueryable(upstream storage.ExemplarQueryable, cfg Config, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable {
	// Build the per-tenant querier callback once, then delegate to the
	// generic merge queryable using the tenant-federation settings from cfg.
	callback := tenantExemplarQuerierCallback(upstream)
	return NewMergeExemplarQueryable(defaultTenantLabel, cfg.MaxConcurrent, callback, byPassWithSingleQuerier, cfg.AllowPartialData, reg)
}

func tenantExemplarQuerierCallback(exemplarQueryable storage.ExemplarQueryable) MergeExemplarQuerierCallback {
Expand Down Expand Up @@ -68,10 +75,11 @@ type MergeExemplarQuerierCallback func(ctx context.Context) (ids []string, queri
// If the label `idLabelName` is already existing, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
func NewMergeExemplarQueryable(idLabelName string, maxConcurrent int, callback MergeExemplarQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable {
func NewMergeExemplarQueryable(idLabelName string, maxConcurrent int, callback MergeExemplarQuerierCallback, byPassWithSingleQuerier, allowPartialData bool, reg prometheus.Registerer) storage.ExemplarQueryable {
return &mergeExemplarQueryable{
idLabelName: idLabelName,
byPassWithSingleQuerier: byPassWithSingleQuerier,
allowPartialData: allowPartialData,
callback: callback,
maxConcurrent: maxConcurrent,

Expand All @@ -88,6 +96,7 @@ type mergeExemplarQueryable struct {
idLabelName string
maxConcurrent int
byPassWithSingleQuerier bool
allowPartialData bool
callback MergeExemplarQuerierCallback
tenantsPerExemplarQuery prometheus.Histogram
}
Expand All @@ -113,6 +122,7 @@ func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.E
tenantIds: ids,
queriers: queriers,
byPassWithSingleQuerier: m.byPassWithSingleQuerier,
allowPartialData: m.allowPartialData,
}, nil
}

Expand All @@ -129,6 +139,7 @@ type mergeExemplarQuerier struct {
tenantIds []string
queriers []storage.ExemplarQuerier
byPassWithSingleQuerier bool
allowPartialData bool
}

type exemplarSelectJob struct {
Expand Down Expand Up @@ -170,6 +181,9 @@ func (m mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Mat

res, err := job.querier.Select(start, end, allUnrelatedMatchers...)
if err != nil {
if m.allowPartialData {
return nil
}
return errors.Wrapf(err, "error exemplars querying %s %s", rewriteLabelName(m.idLabelName), job.id)
}

Expand Down
94 changes: 78 additions & 16 deletions pkg/querier/tenantfederation/exemplar_merge_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,14 @@ func Test_MergeExemplarQuerier_Select(t *testing.T) {
users.WithDefaultResolver(users.NewMultiResolver())

tests := []struct {
name string
upstream mockExemplarQueryable
matcher [][]*labels.Matcher
orgId string
expectedResult []exemplar.QueryResult
expectedErr error
expectedMetrics string
name string
upstream mockExemplarQueryable
matcher [][]*labels.Matcher
orgId string
allowPartialData bool
expectedResult []exemplar.QueryResult
expectedErr error
expectedMetrics string
}{
{
name: "should be treated as single tenant",
Expand Down Expand Up @@ -295,12 +296,42 @@ func Test_MergeExemplarQuerier_Select(t *testing.T) {
},
expectedErr: errors.New("some error"),
},
{
name: "get error from one querier (partial data enabled)",
upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{
"user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()},
"user-2": &mockExemplarQuerier{err: errors.New("some error")},
}},
matcher: [][]*labels.Matcher{{
labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"),
}},
orgId: "user-1|user-2",
allowPartialData: true,
expectedResult: []exemplar.QueryResult{
{
SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"),
Exemplars: []exemplar.Exemplar{
{
Labels: labels.FromStrings("traceID", "123"),
Value: 123,
Ts: 1734942337900,
},
},
},
},
expectedMetrics: expectedTwoTenantsExemplarMetrics,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
exemplarQueryable := NewExemplarQueryable(&test.upstream, defaultMaxConcurrency, true, reg)

cfg := Config{
MaxConcurrent: defaultMaxConcurrency,
AllowPartialData: test.allowPartialData,
}
exemplarQueryable := NewExemplarQueryable(&test.upstream, cfg, true, reg)
ctx := user.InjectOrgID(context.Background(), test.orgId)
q, err := exemplarQueryable.ExemplarQuerier(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -346,13 +377,14 @@ func Test_MergeExemplarQuerier_Select_WhenUseRegexResolver(t *testing.T) {
})

tests := []struct {
name string
upstream mockExemplarQueryable
matcher [][]*labels.Matcher
orgId string
expectedResult []exemplar.QueryResult
expectedErr error
expectedMetrics string
name string
upstream mockExemplarQueryable
matcher [][]*labels.Matcher
orgId string
allowPartialData bool
expectedResult []exemplar.QueryResult
expectedErr error
expectedMetrics string
}{
{
name: "result labels should contains __tenant_id__ even if one tenant is queried",
Expand Down Expand Up @@ -412,12 +444,42 @@ func Test_MergeExemplarQuerier_Select_WhenUseRegexResolver(t *testing.T) {
},
expectedMetrics: expectedTwoTenantsExemplarMetrics,
},
{
name: "get error from one querier (partial data enabled)",
upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{
"user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()},
"user-2": &mockExemplarQuerier{err: errors.New("some error")},
}},
matcher: [][]*labels.Matcher{{
labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"),
}},
orgId: "user-.+",
allowPartialData: true,
expectedResult: []exemplar.QueryResult{
{
SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"),
Exemplars: []exemplar.Exemplar{
{
Labels: labels.FromStrings("traceID", "123"),
Value: 123,
Ts: 1734942337900,
},
},
},
},
expectedMetrics: expectedTwoTenantsExemplarMetrics,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
exemplarQueryable := NewExemplarQueryable(&test.upstream, defaultMaxConcurrency, false, reg)

cfg := Config{
MaxConcurrent: defaultMaxConcurrency,
AllowPartialData: test.allowPartialData,
}
exemplarQueryable := NewExemplarQueryable(&test.upstream, cfg, false, reg)
ctx := user.InjectOrgID(context.Background(), test.orgId)
q, err := exemplarQueryable.ExemplarQuerier(ctx)
require.NoError(t, err)
Expand Down
Loading
Loading