Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.MeterConfig;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilterInternal;
Expand Down Expand Up @@ -94,10 +95,12 @@ public static SdkMeterProviderBuilder builder() {
for (RegisteredReader registeredReader : registeredReaders) {
List<MetricProducer> readerMetricProducers = new ArrayList<>(metricProducers);
readerMetricProducers.add(new LeasedMetricProducer(registry, sharedState, registeredReader));
registeredReader
.getReader()
.register(new SdkCollectionRegistration(readerMetricProducers, sharedState));
MetricReader reader = registeredReader.getReader();
reader.register(new SdkCollectionRegistration(readerMetricProducers, sharedState));
registeredReader.setLastCollectEpochNanos(startEpochNanos);
if (reader instanceof PeriodicMetricReader) {
SdkMeterProviderUtil.setMeterProvider((PeriodicMetricReader) reader, this);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.export;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.sdk.common.internal.ComponentId;
import io.opentelemetry.sdk.common.internal.SemConvAttributes;
import java.util.Collections;
import javax.annotation.Nullable;

final class MetricReaderInstrumentation {

private final DoubleHistogram collectionDuration;
private final Attributes standardAttrs;

MetricReaderInstrumentation(ComponentId componentId, MeterProvider meterProvider) {
Meter meter = meterProvider.get("io.opentelemetry.sdk.metrics");

standardAttrs =
Attributes.of(
SemConvAttributes.OTEL_COMPONENT_TYPE,
componentId.getTypeName(),
SemConvAttributes.OTEL_COMPONENT_NAME,
componentId.getComponentName());

collectionDuration =
meter
.histogramBuilder("otel.sdk.metric_reader.collection.duration")
.setUnit("s")
.setDescription("The duration of the collect operation of the metric reader.")
.setExplicitBucketBoundariesAdvice(Collections.emptyList())
.build();
}

void recordCollection(double seconds, @Nullable String error) {
Attributes attrs = standardAttrs;
if (error != null) {
attrs = attrs.toBuilder().put(SemConvAttributes.ERROR_TYPE, error).build();
}

collectionDuration.record(seconds, attrs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package io.opentelemetry.sdk.metrics.export;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.internal.ComponentId;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
Expand Down Expand Up @@ -34,11 +37,17 @@
public final class PeriodicMetricReader implements MetricReader {
private static final Logger logger = Logger.getLogger(PeriodicMetricReader.class.getName());

private static final Clock CLOCK = Clock.getDefault();

private static final ComponentId COMPONENT_ID =
ComponentId.generateLazy("periodic_metric_reader");

private final MetricExporter exporter;
private final long intervalNanos;
private final ScheduledExecutorService scheduler;
private final Scheduled scheduled;
private final Object lock = new Object();

private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop();

@Nullable private volatile ScheduledFuture<?> scheduledFuture;
Expand Down Expand Up @@ -135,6 +144,14 @@ public void register(CollectionRegistration collectionRegistration) {
start();
}

/**
* Sets the {@link MeterProvider} to export metrics about this {@link PeriodicMetricReader} to.
* Automatically called by the meter provider the reader is registered to.
*/
void setMeterProvider(MeterProvider meterProvider) {
this.scheduled.setMeterProvider(meterProvider);
}

@Override
public String toString() {
return "PeriodicMetricReader{"
Expand All @@ -157,10 +174,18 @@ void start() {
}

private final class Scheduled implements Runnable {

private final AtomicBoolean exportAvailable = new AtomicBoolean(true);

private MetricReaderInstrumentation instrumentation =
new MetricReaderInstrumentation(COMPONENT_ID, MeterProvider.noop());

private Scheduled() {}

void setMeterProvider(MeterProvider meterProvider) {
instrumentation = new MetricReaderInstrumentation(COMPONENT_ID, meterProvider);
}

@Override
public void run() {
// Ignore the CompletableResultCode from doRun() in order to keep run() asynchronous
Expand All @@ -172,7 +197,18 @@ CompletableResultCode doRun() {
CompletableResultCode flushResult = new CompletableResultCode();
if (exportAvailable.compareAndSet(true, false)) {
try {
Collection<MetricData> metricData = collectionRegistration.collectAllMetrics();
long startNanoTime = CLOCK.nanoTime();
String error = null;
Collection<MetricData> metricData;
try {
metricData = collectionRegistration.collectAllMetrics();
} catch (Throwable t) {
error = t.getClass().getName();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is actually possible in practice

throw t;
} finally {
long durationNanos = CLOCK.nanoTime() - startNanoTime;
instrumentation.recordCollection(durationNanos / 1_000_000_000.0, error);
}
if (metricData.isEmpty()) {
logger.log(Level.FINE, "No metric data to export - skipping export.");
flushResult.succeed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

package io.opentelemetry.sdk.metrics.internal;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.internal.ScopeConfigurator;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.ViewBuilder;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
import io.opentelemetry.sdk.metrics.internal.view.StringPredicates;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -76,6 +78,19 @@ public static SdkMeterProviderBuilder addMeterConfiguratorCondition(
return sdkMeterProviderBuilder;
}

/** Reflectively sets the meter provider for a PeriodicMetricReader to export metrics to. */
public static void setMeterProvider(
PeriodicMetricReader metricReader, SdkMeterProvider meterProvider) {
try {
Method method =
PeriodicMetricReader.class.getDeclaredMethod("setMeterProvider", MeterProvider.class);
method.setAccessible(true);
method.invoke(metricReader, meterProvider);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException("Error calling setMeterProvider on PeriodicMetricReader", e);
}
}

/**
* Reflectively add an {@link AttributesProcessor} to the {@link ViewBuilder} which appends
* key-values from baggage to all measurements.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;

import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.internal.SemConvAttributes;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

class SdkMeterProviderMetricsTest {
@Test
void simple() {
InMemoryMetricExporter metricExporter = InMemoryMetricExporter.create();
try (SdkMeterProvider meterProvider =
SdkMeterProvider.builder()
.registerMetricReader(PeriodicMetricReader.create(metricExporter))
.build()) {
Meter meter = meterProvider.get("test");

LongCounter counter = meter.counterBuilder("counter").build();

counter.add(1);

meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
metricExporter.reset();
// Export again to export the metric reader's metric.
meterProvider.forceFlush().join(10, TimeUnit.SECONDS);

List<MetricData> metrics = metricExporter.getFinishedMetricItems();
assertThat(metrics)
.satisfiesExactlyInAnyOrder(
m -> assertThat(m).hasName("counter"),
m -> {
assertThat(m)
.hasName("otel.sdk.metric_reader.collection.duration")
.hasHistogramSatisfying(
h ->
h.hasPointsSatisfying(
p ->
p.hasCount(1)
.hasAttributesSatisfying(
equalTo(
SemConvAttributes.OTEL_COMPONENT_TYPE,
"periodic_metric_reader"),
equalTo(
SemConvAttributes.OTEL_COMPONENT_NAME,
"periodic_metric_reader/0"))));
});
}
}
}
Loading