Skip to content

Commit

Permalink
add RpcGuard
Browse files Browse the repository at this point in the history
  • Loading branch information
gongxuanzhang committed Jan 3, 2025
1 parent cec3985 commit 960b3df
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 121 deletions.
6 changes: 2 additions & 4 deletions app/src/main/java/org/astraea/app/checker/Checker.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@

public class Checker {

private static final List<Guard> GUARDS = List.of(
new ProduceRpcGuard(),
new FetchRpcGuard());
private static final List<Guard> GUARDS = List.of(new RpcGuard());

public static void main(String[] args) throws Exception {
execute(Argument.parse(new Argument(), args));
Expand All @@ -47,7 +45,7 @@ public static void execute(final Argument param) throws Exception {
try (var admin = Admin.create(Map.of("bootstrap.servers", param.bootstrapServers()))) {
for (var guard : GUARDS) {
var result = guard.run(admin, param.mBeanClientFunction(), param.readChangelog());
System.out.println(result);
result.forEach(System.out::println);
}
}
}
Expand Down
45 changes: 0 additions & 45 deletions app/src/main/java/org/astraea/app/checker/FetchRpcGuard.java

This file was deleted.

44 changes: 0 additions & 44 deletions app/src/main/java/org/astraea/app/checker/ProduceRpcGuard.java

This file was deleted.

65 changes: 37 additions & 28 deletions app/src/main/java/org/astraea/app/checker/Report.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,46 @@
*/
package org.astraea.app.checker;

import org.apache.kafka.common.Node;

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Node;

public record Report(Node node, String why) {
static Report noMetrics(Node node) {
return new Report(node, "failed to get metrics from");
}

static Report of(Node node, String why) {
return new Report(node, why);
}

static Report empty() {
return new Report(null, "");
}

static Report of(Node node, Protocol protocol, Set<Integer> versions) {
var unsupportedVersions =
versions.stream().filter(v -> v < protocol.base()).collect(Collectors.toSet());
if (unsupportedVersions.isEmpty()) return empty();
return new Report(
node,
String.format(
"there are unsupported %s versions: %s due to new baseline: %s",
protocol.name(), unsupportedVersions, protocol.base()));
}

Stream<Report> stream() {
if (why.isEmpty()) return Stream.empty();
return Stream.of(this);
}
static Report noMetrics(Node node) {
return new Report(node, "failed to get metrics from");
}

static Report of(Node node, String why) {
return new Report(node, why);
}

static Report empty() {
return new Report(null, "");
}

static Report of(Node node, Protocol protocol, Set<Integer> versions) {
var unsupportedVersions =
versions.stream().filter(v -> v < protocol.base()).collect(Collectors.toSet());
if (unsupportedVersions.isEmpty()) return empty();
return new Report(
node,
String.format(
"there are unsupported %s versions: %s due to new baseline: %s",
protocol.name(), unsupportedVersions, protocol.base()));
}

Stream<Report> stream() {
if (why.isEmpty()) return Stream.empty();
return Stream.of(this);
}

@Override
public String toString() {
if (node == null) {
return "Report[pass]";
}
return "Report[" + node+ "] why = " + why;
}
}
50 changes: 50 additions & 0 deletions app/src/main/java/org/astraea/app/checker/RpcGuard.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.checker;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.broker.NetworkMetrics;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;

public class RpcGuard implements Guard {
@Override
public Collection<Report> run(
Admin admin, Function<Node, MBeanClient> clients, Changelog changelog) throws Exception {
Map<String, Protocol> protocols = changelog.protocols();
return admin.describeCluster().nodes().get().stream()
.map(node -> checkNode(node, protocols, clients))
.flatMap(Collection::stream)
.toList();
}

private Collection<Report> checkNode(Node node, Map<String, Protocol> protocols,Function<Node, MBeanClient> clients) {
return Arrays.stream(NetworkMetrics.Request.values())
.filter(request -> protocols.containsKey(request.metricName().toLowerCase()))
.map(request -> {
var protocol = protocols.get(request.metricName().toLowerCase());
var versions = NetworkMetrics.Request.PRODUCE.versions(clients.apply(node));
return Report.of(node, protocol, versions);
}).toList();
}

}

0 comments on commit 960b3df

Please sign in to comment.