Skip to content

Commit

Permalink
Merge pull request #54 from elandau/feature/gradient_tweak
Browse files Browse the repository at this point in the history
Enforce min_rtt_threshold at sample collector
  • Loading branch information
elandau authored Jun 14, 2018
2 parents 677fff4 + bbef52a commit 0bfc4a8
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,36 +1,39 @@
package com.netflix.concurrency.limits.limit;

public class ExpAvgMeasurement implements Measurement {
private final int window;
private final double ratio;
private final double filter;

private double value;
private int count;

public ExpAvgMeasurement(int window, double filter) {
this.window = window;
this.ratio = 1.0 / window;
this.filter = filter;
this.value = 0.0;
this.count = 0;
}

@Override
public Number add(Number sample) {
// First sample seen
if (count == 0) {
if (value == 0) {
value = sample.doubleValue();
} else if (sample.doubleValue() < value) {
value = sample.doubleValue();
count = 1;
// Adaptive average for the first <window> samples
} else if (count < window) {
count++;
double tempRatio = 1.0 / count;
value = (1-tempRatio) * value + tempRatio * Math.min(value*filter, sample.doubleValue());
// Steady state
} else {
value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue());
}
// // First sample seen
// if (count == 0) {
// value = sample.doubleValue();
// count = 1;
// // Adaptive average for the first <window> samples
// } else if (count < window) {
// count++;
// double tempRatio = 1.0 / count;
// value = (1-tempRatio) * value + tempRatio * Math.min(value*filter, sample.doubleValue());
// // Steady state
// } else {
// value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue());
// }
return value;
}

Expand All @@ -42,7 +45,6 @@ public Number get() {
@Override
public void reset() {
value = 0.0;
count = 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public static class Builder {
private int initialLimit = 50;
private int minLimit = 1;
private int maxLimit = 1000;
private long minRttThreshold = TimeUnit.MICROSECONDS.toNanos(1);

private double smoothing = 0.1;
private double smoothing = 0.2;
private Function<Integer, Integer> queueSize = SquareRootFunction.create(4);
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
private int noLoadRttWindow = 1000;
Expand All @@ -44,8 +43,8 @@ public static class Builder {
* @param units
* @return Chainable builder
*/
@Deprecated
public Builder minRttThreshold(long minRttTreshold, TimeUnit units) {
this.minRttThreshold = units.toNanos(minRttTreshold);
return this;
}

Expand Down Expand Up @@ -212,8 +211,6 @@ public static GradientLimit newDefault() {

private final Function<Integer, Integer> queueSize;

private final long minRttThreshold;

private final SampleListener minRttSampleListener;

private final SampleListener minWindowRttSampleListener;
Expand All @@ -226,7 +223,6 @@ private GradientLimit(Builder builder) {
this.minLimit = builder.minLimit;
this.queueSize = builder.queueSize;
this.smoothing = builder.smoothing;
this.minRttThreshold = builder.minRttThreshold;
this.rttNoLoadAccumulator = new ExpAvgMeasurement(builder.noLoadRttWindow, builder.noLoadRttFilter);

this.minRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
Expand All @@ -240,9 +236,6 @@ public synchronized void update(SampleWindow sample) {

final long rttSample = sample.getCandidateRttNanos();
minWindowRttSampleListener.addSample(rttSample);
if (rttSample < minRttThreshold) {
return;
}

final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
queueSizeSampleListener.addSample(queueSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class DefaultLimiter<ContextT> implements Limiter<ContextT> {

private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1);
private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1);
private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(100);

/**
* Minimum observed samples to filter out sample windows with not enough significant samples
Expand All @@ -42,6 +43,8 @@ public final class DefaultLimiter<ContextT> implements Limiter<ContextT> {

private final long maxWindowTime;

private final long minRttThreshold;

private final int windowSize;

/**
Expand All @@ -61,6 +64,7 @@ public static class Builder {
private long maxWindowTime = DEFAULT_MAX_WINDOW_TIME;
private long minWindowTime = DEFAULT_MIN_WINDOW_TIME;
private int windowSize = DEFAULT_WINDOW_SIZE;
private long minRttThreshold = DEFAULT_MIN_RTT_THRESHOLD;

/**
* Algorithm used to determine the new limit based on the current limit and minimum
Expand Down Expand Up @@ -99,6 +103,11 @@ public Builder windowSize(int windowSize) {
return this;
}

public Builder minRttThreshold(long minRttThreshold, TimeUnit units) {
this.minRttThreshold = units.toNanos(minRttThreshold);
return this;
}

/**
* @param strategy Strategy for enforcing the limit
*/
Expand Down Expand Up @@ -126,13 +135,15 @@ public DefaultLimiter(Limit limit, Strategy<ContextT> strategy) {
this.windowSize = DEFAULT_WINDOW_SIZE;
this.minWindowTime = DEFAULT_MIN_WINDOW_TIME;
this.maxWindowTime = DEFAULT_MAX_WINDOW_TIME;
this.minRttThreshold = DEFAULT_MIN_RTT_THRESHOLD;
strategy.setLimit(limit.getLimit());
}

private DefaultLimiter(Builder builder, Strategy<ContextT> strategy) {
this.limit = builder.limit;
this.minWindowTime = builder.minWindowTime;
this.maxWindowTime = builder.maxWindowTime;
this.minRttThreshold = DEFAULT_MIN_RTT_THRESHOLD;
this.windowSize = builder.windowSize;
this.strategy = strategy;
strategy.setLimit(limit.getLimit());
Expand All @@ -158,7 +169,9 @@ public void onSuccess() {
final long endTime = nanoClock.get();
final long rtt = endTime - startTime;

sample.updateAndGet(window -> window.addSample(rtt, currentMaxInFlight));
if (rtt > minRttThreshold) {
sample.updateAndGet(window -> window.addSample(rtt, currentMaxInFlight));
}

if (endTime > nextUpdateTime) {
synchronized (lock) {
Expand Down

0 comments on commit 0bfc4a8

Please sign in to comment.