Skip to content

Commit

Permalink
[COMMON] add more logs to perf and server.metric.fetcher.interval.sec…
Browse files Browse the repository at this point in the history
…onds to metrics collector
  • Loading branch information
chia7712 committed Oct 26, 2024
1 parent 7276b28 commit 629bd2d
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 57 deletions.
121 changes: 76 additions & 45 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.astraea.app.argument.DataRateField;
import org.astraea.app.argument.DataSizeField;
import org.astraea.app.argument.DistributionTypeField;
Expand All @@ -58,6 +59,7 @@
import org.astraea.common.admin.Partition;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.admin.TopicPartitionPath;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.partitioner.Partitioner;
Expand Down Expand Up @@ -103,52 +105,75 @@ public static List<String> execute(final Argument param) {
() -> producerThreads.stream().allMatch(AbstractThread::closed),
() -> consumerThreads.stream().allMatch(AbstractThread::closed));

var fileWriterTask =
CompletableFuture.completedFuture(
param.CSVPath == null
? (Runnable) (() -> {})
: ReportFormat.createFileWriter(
param.reportFormat,
param.CSVPath,
() -> consumerThreads.stream().allMatch(AbstractThread::closed),
() -> producerThreads.stream().allMatch(AbstractThread::closed)))
.thenAcceptAsync(Runnable::run);

var monkeys = MonkeyThread.play(consumerThreads, param);

CompletableFuture.runAsync(
() -> {
dataGenerator.waitForDone();
var last = 0L;
var lastChange = System.currentTimeMillis();
while (true) {
var current = Report.recordsConsumedTotal();

if (blockingQueues.stream().allMatch(Collection::isEmpty)) {
var unfinishedProducers = producerThreads.stream().filter(p -> !p.closed()).toList();
unfinishedProducers.forEach(AbstractThread::close);
try (var admin = Admin.of(param.bootstrapServers())) {
Supplier<LongStream> sizes =
() ->
admin.brokers().toCompletableFuture().join().stream()
.filter(
b ->
b.topicPartitionPaths().stream()
.anyMatch(p -> param.topics.contains(p.topic())))
.mapToLong(
b ->
b.topicPartitionPaths().stream()
.mapToLong(TopicPartitionPath::size)
.sum());

var fileWriterTask =
CompletableFuture.completedFuture(
param.CSVPath == null
? (Runnable) (() -> {})
: ReportFormat.createFileWriter(
param.reportFormat,
param.CSVPath,
() -> consumerThreads.stream().allMatch(AbstractThread::closed),
() -> producerThreads.stream().allMatch(AbstractThread::closed),
param.logInterval,
List.of(
ReportFormat.CSVContentElement.create(
"max size", () -> String.valueOf(sizes.get().max().getAsLong())),
ReportFormat.CSVContentElement.create(
"min size",
() -> String.valueOf(sizes.get().min().getAsLong())))))
.thenAcceptAsync(Runnable::run);

var monkeys = MonkeyThread.play(consumerThreads, param);

CompletableFuture.runAsync(
() -> {
dataGenerator.waitForDone();
var last = 0L;
var lastChange = System.currentTimeMillis();
while (true) {
var current = Report.recordsConsumedTotal();

if (blockingQueues.stream().allMatch(Collection::isEmpty)) {
var unfinishedProducers =
producerThreads.stream().filter(p -> !p.closed()).toList();
unfinishedProducers.forEach(AbstractThread::close);
}

if (current != last) {
last = current;
lastChange = System.currentTimeMillis();
}
if (System.currentTimeMillis() - lastChange >= param.readIdle.toMillis()) {
consumerThreads.forEach(AbstractThread::close);
monkeys.forEach(AbstractThread::close);
}
if (consumerThreads.stream().allMatch(AbstractThread::closed)
&& monkeys.stream().allMatch(AbstractThread::closed)
&& producerThreads.stream().allMatch(AbstractThread::closed)) return;
Utils.sleep(Duration.ofSeconds(1));
}

if (current != last) {
last = current;
lastChange = System.currentTimeMillis();
}
if (System.currentTimeMillis() - lastChange >= param.readIdle.toMillis()) {
consumerThreads.forEach(AbstractThread::close);
monkeys.forEach(AbstractThread::close);
}
if (consumerThreads.stream().allMatch(AbstractThread::closed)
&& monkeys.stream().allMatch(AbstractThread::closed)
&& producerThreads.stream().allMatch(AbstractThread::closed)) return;
Utils.sleep(Duration.ofSeconds(1));
}
});
producerThreads.forEach(AbstractThread::waitForDone);
monkeys.forEach(AbstractThread::waitForDone);
consumerThreads.forEach(AbstractThread::waitForDone);
tracker.waitForDone();
fileWriterTask.join();
return param.topics;
});
producerThreads.forEach(AbstractThread::waitForDone);
monkeys.forEach(AbstractThread::waitForDone);
consumerThreads.forEach(AbstractThread::waitForDone);
tracker.waitForDone();
fileWriterTask.join();
return param.topics;
}
}

static List<ConsumerThread> consumers(Argument param, Map<TopicPartition, Long> latestOffsets) {
Expand Down Expand Up @@ -263,6 +288,12 @@ String partitioner() {
return this.partitioner;
}

@Parameter(
names = {"--log.interval"},
description = "integer: seconds to log csv output",
validateWith = PositiveLongField.class)
Duration logInterval = Duration.ofSeconds(2);

@Parameter(
names = {"--transaction.size"},
description =
Expand Down
27 changes: 22 additions & 5 deletions app/src/main/java/org/astraea/app/performance/ReportFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,18 @@ public static Runnable createFileWriter(
ReportFormat reportFormat,
Path path,
Supplier<Boolean> consumerDone,
Supplier<Boolean> producerDone) {
Supplier<Boolean> producerDone,
Duration interval) {
return createFileWriter(reportFormat, path, consumerDone, producerDone, interval, List.of());
}

public static Runnable createFileWriter(
ReportFormat reportFormat,
Path path,
Supplier<Boolean> consumerDone,
Supplier<Boolean> producerDone,
Duration interval,
List<CSVContentElement> extraCsv) {
var filePath =
FileSystems.getDefault()
.getPath(
Expand All @@ -81,12 +92,18 @@ public static Runnable createFileWriter(
var writer = new BufferedWriter(Utils.packException(() -> new FileWriter(filePath.toFile())));
switch (reportFormat) {
case CSV -> {
initCSVFormat(writer, latencyAndIO());
Supplier<List<CSVContentElement>> elements =
() -> {
var e = new ArrayList<>(latencyAndIO());
e.addAll(extraCsv);
return e;
};
initCSVFormat(writer, elements.get());
return () -> {
try {
while (!(producerDone.get() && consumerDone.get())) {
logToCSV(writer, latencyAndIO());
Utils.sleep(Duration.ofSeconds(1));
logToCSV(writer, elements.get());
Utils.sleep(interval);
}
} finally {
Utils.close(writer);
Expand Down Expand Up @@ -138,7 +155,7 @@ static void logToJSON(BufferedWriter writer, List<CSVContentElement> elements) {
}

// Visible for test
interface CSVContentElement {
public interface CSVContentElement {
String title();

String value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import org.astraea.common.metrics.JndiClient;

public class ServerMetricFetcher implements MetricsReporter, ClientTelemetry {
private static final String BOOTSTRAP_SERVERS = "server.metric.fetcher.bootstrap.servers";
private static final String BOOTSTRAP_SERVERS_CONFIG = "server.metric.fetcher.bootstrap.servers";
private static final String INTERVAL_CONFIG = "server.metric.fetcher.interval.seconds";
private String bootstrapServers;
private int nodeId = -1;
private Duration interval = Duration.ofSeconds(10);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>(1);

Expand All @@ -55,22 +57,23 @@ public void close() {

@Override
public void configure(Map<String, ?> map) {
if (!map.containsKey(BOOTSTRAP_SERVERS))
throw new RuntimeException(BOOTSTRAP_SERVERS + " is required");
if (!map.containsKey(BOOTSTRAP_SERVERS_CONFIG))
throw new RuntimeException(BOOTSTRAP_SERVERS_CONFIG + " is required");
if (!map.containsKey("node.id")) throw new RuntimeException("node.id is required");
this.bootstrapServers = map.get(BOOTSTRAP_SERVERS).toString();
this.bootstrapServers = map.get(BOOTSTRAP_SERVERS_CONFIG).toString();
nodeId = Integer.parseInt(map.get("node.id").toString());
if (map.containsKey(INTERVAL_CONFIG))
interval = Duration.ofSeconds(Long.parseLong(map.get(INTERVAL_CONFIG).toString()));
CompletableFuture.runAsync(
() -> {
MetricSender sender = null;
var lastSent = System.currentTimeMillis();
try {
while (!closed.get()) {
var done = queue.poll(3, TimeUnit.SECONDS);
var done = queue.poll(interval.toSeconds(), TimeUnit.SECONDS);
if (done == null) done = false;
if (done) return;
if (System.currentTimeMillis() - lastSent <= Duration.ofSeconds(3).toMillis())
continue;
if (System.currentTimeMillis() - lastSent <= interval.toMillis()) continue;
if (sender == null) sender = MetricSender.topic(bootstrapServers);
var beans = JndiClient.local().beans(BeanQuery.all());
sender.send(nodeId, beans);
Expand Down

0 comments on commit 629bd2d

Please sign in to comment.