Skip to content

Commit

Permalink
Merge pull request #17 from Netflix/feature/gradient_algo
Browse files Browse the repository at this point in the history
Gradient based algorithm to detect the concurrency limit
  • Loading branch information
elandau authored Feb 9, 2018
2 parents 650e176 + f141547 commit 6ca9013
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package com.netflix.concurrency.limits.limit;

import com.netflix.concurrency.limits.Limit;
import com.netflix.concurrency.limits.MetricIds;
import com.netflix.concurrency.limits.MetricRegistry;
import com.netflix.concurrency.limits.internal.EmptyMetricRegistry;
import com.netflix.concurrency.limits.internal.Preconditions;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Concurrency limit algorithm that adjust the limits based on the gradient of change in the
* samples minimum RTT and absolute minimum RTT allowing for a queue of square root of the
* current limit. Why square root? Because it's better than a fixed queue size that becomes too
* small for large limits but still prevents the limit from growing too much by slowing down
* growth as the limit grows.
*/
public final class GradientLimit implements Limit {
private static final Logger LOG = LoggerFactory.getLogger(GradientLimit.class);

public static class Builder {
private int initialLimit = 20;
private int maxConcurrency = 1000;
private double smoothing = 0.2;
private Function<Integer, Integer> queueSize = (limit) -> (int)Math.max(4, Math.sqrt(limit));
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;

public Builder initialLimit(int initialLimit) {
this.initialLimit = initialLimit;
return this;
}

public Builder maxConcurrency(int maxConcurrency) {
this.maxConcurrency = maxConcurrency;
return this;
}

public Builder queueSize(int queueSize) {
this.queueSize = (ignore) -> queueSize;
return this;
}

public Builder queueSize(Function<Integer, Integer> queueSize) {
this.queueSize = queueSize;
return this;
}

public Builder smoothing(double smoothing) {
this.smoothing = smoothing;
return this;
}

public Builder metricRegistry(MetricRegistry registry) {
this.registry = registry;
return this;
}

public GradientLimit build() {
GradientLimit limit = new GradientLimit(this);
registry.registerGauge(MetricIds.MIN_RTT_GUAGE_NAME, limit::getRttNoLoad);
return limit;
}
}

public static Builder newBuilder() {
return new Builder();
}

public static GradientLimit newDefault() {
return newBuilder().build();
}

/**
* Estimated concurrency limit based on our algorithm
*/
private volatile double estimatedLimit;

private volatile long rtt_noload = 0;

private boolean didDrop = false;

/**
* Maximum allowed limit providing an upper bound failsafe
*/
private final int maxLimit;

private final Function<Integer, Integer> queueSize;

private final double smoothing;

private GradientLimit(Builder builder) {
this.estimatedLimit = builder.initialLimit;
this.maxLimit = builder.maxConcurrency;
this.queueSize = builder.queueSize;
this.smoothing = builder.smoothing;
}

@Override
public synchronized void update(long rtt) {
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);

if (rtt_noload == 0 || rtt < rtt_noload) {
LOG.debug("New MinRTT {}", rtt);
rtt_noload = rtt;
}

final double queueSize = this.queueSize.apply((int) Math.ceil(this.estimatedLimit));
final double gradient = (double)rtt_noload / rtt;
double newLimit;
if (didDrop) {
newLimit = estimatedLimit * (1-smoothing) + smoothing*(estimatedLimit/2);
didDrop = false;
} else {
newLimit = estimatedLimit * (1-smoothing) + smoothing*(gradient * estimatedLimit + queueSize);
}

newLimit = Math.max(1, Math.min(maxLimit, newLimit));
if ((int)newLimit != (int)estimatedLimit) {
estimatedLimit = newLimit;
if (LOG.isDebugEnabled()) {
LOG.debug("New limit={} minRtt={} μs winRtt={} μs queueSize={} gradient={}",
(int)estimatedLimit,
TimeUnit.NANOSECONDS.toMicros(rtt_noload),
TimeUnit.NANOSECONDS.toMicros(rtt),
queueSize,
gradient);
}
}
}

@Override
public synchronized void drop() {
didDrop = true;
}

@Override
public int getLimit() {
return (int)Math.ceil(estimatedLimit);
}

public long getRttNoLoad() {
return rtt_noload;
}

@Override
public String toString() {
return "GradientLimit [limit=" + (int)estimatedLimit +
", rtt_noload=" + TimeUnit.NANOSECONDS.toMillis(rtt_noload) +
"]";
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.netflix.concurrency.limits.limit;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.concurrency.limits.Limit;
import com.netflix.concurrency.limits.MetricIds;
import com.netflix.concurrency.limits.MetricRegistry;
import com.netflix.concurrency.limits.internal.EmptyMetricRegistry;
import com.netflix.concurrency.limits.internal.Preconditions;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is small ({@literal <} alpha)
* and decreases by alpha if the queue_use is large ({@literal >} beta).
Expand Down Expand Up @@ -94,7 +94,9 @@ public Builder fastStartEnabled(boolean fastStartEnabled) {
}

public VegasLimit build() {
return new VegasLimit(this);
VegasLimit limit = new VegasLimit(this);
registry.registerGauge(MetricIds.MIN_RTT_GUAGE_NAME, limit::getRttNoLoad);
return limit;
}
}

Expand Down Expand Up @@ -132,7 +134,6 @@ private VegasLimit(Builder builder) {
this.betaFunc = builder.beta;
this.backoffRatio = builder.backoffRatio;
this.tolerance = builder.tolerance;
builder.registry.registerGauge(MetricIds.MIN_RTT_GUAGE_NAME, () -> rtt_noload);
}

@Override
Expand Down Expand Up @@ -183,6 +184,10 @@ public int getLimit() {
return estimatedLimit;
}

long getRttNoLoad() {
return rtt_noload;
}

@Override
public String toString() {
return "VegasLimit [limit=" + estimatedLimit +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netflix.concurrency.limits.executors.BlockingAdaptiveExecutor;
import com.netflix.concurrency.limits.limit.AIMDLimit;
import com.netflix.concurrency.limits.limit.GradientLimit;
import com.netflix.concurrency.limits.limit.VegasLimit;
import com.netflix.concurrency.limits.limiter.DefaultLimiter;
import com.netflix.concurrency.limits.strategy.SimpleStrategy;
Expand Down Expand Up @@ -34,6 +35,13 @@ public void testVegas() {
run(10000, 50, executor, randomLatency(50, 150));
}

@Test
public void testGradient() {
DefaultLimiter<Void> limiter = DefaultLimiter.newBuilder().limit(GradientLimit.newBuilder().initialLimit(100).build()).build(new SimpleStrategy<>());
Executor executor = new BlockingAdaptiveExecutor(limiter);
run(100000, 50, executor, randomLatency(50, 150));
}

public void run(int iterations, int limit, Executor executor, Supplier<Long> latency) {
AtomicInteger requests = new AtomicInteger();
AtomicInteger busy = new AtomicInteger();
Expand Down
24 changes: 24 additions & 0 deletions concurrency-limits-core/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Copyright 2012 Netflix, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

log4j.rootLogger=ERROR, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d %-5p %c{1}:%L %x %m [%t]%n

log4j.additivity.com.netflix.concurrency.limits=false
log4j.logger.com.netflix.concurrency.limits=DEBUG, console

0 comments on commit 6ca9013

Please sign in to comment.