From ec94553399c5f8b9339f330d5da75f8ee8cfa3cd Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 8 Nov 2024 02:33:32 +0800 Subject: [PATCH] [COMMON] add hw0 --- app/src/main/java/org/astraea/app/App.java | 3 + .../org/astraea/app/performance/Prepare.java | 121 ++++++++++++++++++ .../common/partitioner/YourPartitioner.java | 41 ++++++ 3 files changed, 165 insertions(+) create mode 100644 app/src/main/java/org/astraea/app/performance/Prepare.java create mode 100644 common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index 53c3b0b36f..4180d4c246 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -24,6 +24,7 @@ import org.astraea.app.automation.Automation; import org.astraea.app.benchmark.BalancerBenchmarkApp; import org.astraea.app.performance.Performance; +import org.astraea.app.performance.Prepare; import org.astraea.app.publisher.MetricPublisher; import org.astraea.app.version.Version; import org.astraea.app.web.WebService; @@ -33,6 +34,8 @@ public class App { Map.of( "performance", Performance.class, + "prepare", + Prepare.class, "automation", Automation.class, "web", diff --git a/app/src/main/java/org/astraea/app/performance/Prepare.java b/app/src/main/java/org/astraea/app/performance/Prepare.java new file mode 100644 index 0000000000..8b0d35ffc0 --- /dev/null +++ b/app/src/main/java/org/astraea/app/performance/Prepare.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.performance; + +import com.beust.jcommander.Parameter; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.astraea.app.argument.StringListField; +import org.astraea.common.DataSize; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.Broker; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartitionPath; + +public class Prepare { + + public static void main(String[] args) { + execute(Argument.parse(new Argument(), args)); + } + + public static void execute(final Argument param) { + try (var admin = Admin.of(param.bootstrapServers())) { + if (param.topics == null || param.topics.isEmpty()) { + var cluster = + admin + .topicNames(true) + .thenComposeAsync(admin::clusterInfo) + .toCompletableFuture() + .join(); + var brokerSize = + cluster.brokers().stream() + .collect( + Collectors.toMap( + Broker::id, + b -> + b.topicPartitionPaths().stream() + .mapToLong(TopicPartitionPath::size) + .sum())); + System.out.println("id,role,value"); + brokerSize.forEach( + (id, size) -> System.out.println(id + ",broker," + DataSize.Byte.of(size))); + + var partitionSize = + cluster.replicas().stream() + .filter(Replica::isLeader) + .collect(Collectors.toMap(Replica::topicPartition, Replica::size)); + partitionSize.forEach( + (tp, size) -> System.out.println(tp + ",partition," + DataSize.Byte.of(size))); + + var brokerAvg = + brokerSize.values().stream().mapToLong(i -> i).filter(i -> i > 0).average().orElse(0); + var avedevBroker = + brokerSize.values().stream() + .mapToDouble(i -> Math.abs(i - brokerAvg)) + .average() + .orElse(0.0D); + System.out.println("avedev,broker," + DataSize.Byte.of((long) avedevBroker)); + + var partitionAvg = + partitionSize.values().stream() + .mapToLong(i -> i) + .filter(i -> i > 0) + .average() + .orElse(0); + var avedevPartition = + partitionSize.values().stream() + .mapToDouble(i -> Math.abs(i - partitionAvg)) + .average() + .orElse(0.0D); + System.out.println("avedev,partition," + DataSize.Byte.of((long) avedevPartition)); + return; + } + var brokerIds = + admin.brokers().toCompletableFuture().join().stream().map(Broker::id).toList(); + Function> ids = + index -> { + if (index < brokerIds.size()) return List.of(brokerIds.get(index)); + return List.of(brokerIds.get((int) (Math.random() * brokerIds.size()))); + }; + param.topics.forEach( + topic -> + admin + .creator() + .topic(topic) + .replicasAssignments( + IntStream.range(0, brokerIds.size() + 2) + .boxed() + .collect(Collectors.toMap(i -> i, ids))) + .run() + .toCompletableFuture() + .join()); + System.out.println("succeed to create topics: " + param.topics); + } + } + + public static class Argument extends org.astraea.app.argument.Argument { + @Parameter( + names = {"--topics"}, + description = "List: topic names which you subscribed", + validateWith = StringListField.class, + listConverter = StringListField.class, + required = false) + List topics; + } +} diff --git a/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java b/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java new file mode 100644 index 0000000000..8ae5bc82e8 --- /dev/null +++ b/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.partitioner; + +import java.util.Map; +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +public class YourPartitioner implements Partitioner { + + // get your magic configs + @Override + public void configure(Map configs) {} + + // write your magic code + @Override + public int partition( + String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + var partitions = cluster.availablePartitionsForTopic(topic); + // no available partition so we return -1 + if (partitions.isEmpty()) return -1; + return partitions.get(0).partition(); + } + + @Override + public void close() {} +}