Skip to content

Commit

Permalink
Merge pull request #101 from Foxikle/fix/redis-reliablility
Browse files Browse the repository at this point in the history
Fix redis reliablility
  • Loading branch information
Foxikle authored Jun 16, 2024
2 parents a1bdf94 + 4c2f2c8 commit a0a6b90
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 86 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
api("com.rabbitmq:amqp-client:5.21.0") // Message broker
api("dev.hollowcube:polar:1.10.0") // Polar
api("redis.clients:jedis:5.1.3") // redis client
api("com.google.guava:guava:33.2.1-jre")
implementation("org.reflections:reflections:0.10.2") // reflection utils
}

Expand Down
14 changes: 10 additions & 4 deletions src/main/java/net/cytonic/cytosis/CytonicNetwork.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import lombok.Getter;
import net.cytonic.cytosis.data.RedisDatabase;
import net.cytonic.cytosis.data.obj.CytonicServer;

import java.util.HashSet;
import java.util.Set;
Expand All @@ -15,6 +16,8 @@ public class CytonicNetwork {
private final Set<String> networkPlayers = new HashSet<>();
private final Set<UUID> networkPlayerUUIDs = new HashSet<>();

private final Set<CytonicServer> servers = new HashSet<>(); // online servers

/**
* The default constructor
*/
Expand All @@ -24,13 +27,16 @@ public CytonicNetwork() {
/**
* Imports online player data from redis
*
* @param redisDatabase The redis instance
* @param redis The redis instance
*/
public void importDataFromRedis(RedisDatabase redisDatabase) {
public void importDataFromRedis(RedisDatabase redis) {
networkPlayers.clear();
networkPlayerUUIDs.clear();
networkPlayers.addAll(redisDatabase.getOnlinePlayers());
networkPlayerUUIDs.addAll(redisDatabase.getOnlineUUIDs());
servers.clear();

networkPlayers.addAll(redis.getSet(RedisDatabase.ONLINE_PLAYER_NAME_KEY));
redis.getSet(RedisDatabase.ONLINE_PLAYER_UUID_KEY).forEach(s -> networkPlayerUUIDs.add(UUID.fromString(s)));
redis.getSet(RedisDatabase.ONLINE_SERVER_KEY).forEach(s -> servers.add(CytonicServer.deserialize(s)));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/cytonic/cytosis/Cytosis.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public static void completeNonEssentialTasks(long start) {
Logger.info("Initializing Plugin Manager!");
pluginManager = new PluginManager();
Logger.info("Loading plugins!");
pluginManager.loadPlugins();
Thread.ofVirtual().name("CytosisPluginLoader").start(pluginManager::loadPlugins);

Logger.info("Initializing Rank Manager");
rankManager = new RankManager();
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/net/cytonic/cytosis/commands/RankCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class RankCommand extends Command {
public RankCommand() {
super("rank");
setCondition((sender, _) -> sender.hasPermission("cytosis.commands.rank"));
// setDefaultExecutor((sender, _) -> sender.sendMessage(MM."<red>You must specify a valid player and rank!"));

var rankArg = ArgumentType.Enum("rank", PlayerRank.class).setFormat(ArgumentEnum.Format.LOWER_CASED);
rankArg.setCallback((sender, exception) -> sender.sendMessage(STR."The rank \{exception.getInput()} is invalid!"));
Expand All @@ -41,14 +40,14 @@ public RankCommand() {
if (sender instanceof Player player) {
player.sendActionBar(MM."<green>Fetching online players...");
}
Cytosis.getDatabaseManager().getRedisDatabase().getOnlinePlayers().forEach(player ->
Cytosis.getCytonicNetwork().getNetworkPlayers().forEach(player ->
suggestion.addEntry(new SuggestionEntry(player)));
});


addSyntax((sender, context) -> {
String name = context.get(playerArg);
if (!Cytosis.getDatabaseManager().getRedisDatabase().getOnlinePlayers().contains(name)) {
if (!Cytosis.getCytonicNetwork().getNetworkPlayers().contains(name)) {
sender.sendMessage(MM."<red>The player \{context.get("player")} doesn't exist!");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public BanCommand() {
if (sender instanceof Player player) {
player.sendActionBar(MM."<green>Fetching online players...");
}
Cytosis.getDatabaseManager().getRedisDatabase().getOnlinePlayers().forEach(player ->
Cytosis.getCytonicNetwork().getNetworkPlayers().forEach(player ->
suggestion.addEntry(new SuggestionEntry(player)));
});
var durationArg = ArgumentType.Word("duration");
Expand All @@ -54,7 +54,7 @@ public BanCommand() {
final String rawDur = context.get(durationArg);
final Instant dur = DurationParser.parse(rawDur);

if (!Cytosis.getDatabaseManager().getRedisDatabase().getOnlinePlayers().contains(player)) {
if (!Cytosis.getCytonicNetwork().getNetworkPlayers().contains(player)) {
sender.sendMessage(MM."<red>The player \{context.get(playerArg)} doesn't exist!");
return;
}
Expand Down
83 changes: 46 additions & 37 deletions src/main/java/net/cytonic/cytosis/data/RedisDatabase.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,51 @@
package net.cytonic.cytosis.data;

import lombok.Getter;
import net.cytonic.cytosis.Cytosis;
import net.cytonic.cytosis.config.CytosisSettings;
import net.cytonic.cytosis.logging.Logger;
import net.cytonic.cytosis.messaging.pubsub.PlayerLoginLogout;
import net.cytonic.cytosis.messaging.pubsub.ServerStatus;
import net.cytonic.cytosis.utils.Utils;
import redis.clients.jedis.*;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* A class that holds the connection to the redis cache
*/
@SuppressWarnings("unused")
public class RedisDatabase extends JedisPubSub {
public class RedisDatabase {

/**
* Cached player names
*/
public static final String ONLINE_PLAYER_NAME_KEY = "online_player_names";
/**
* Cached player UUIDs
*/
public static final String ONLINE_PLAYER_UUID_KEY = "online_player_uuids";
/**
* Cached Servers
*/
public static final String ONLINE_SERVER_KEY = "online_servers";

/**
* Player login/out channel
*/
public static final String PLAYER_STATUS_CHANNEL = "player_status";
/**
* Server startup / shutdown
*/
public static final String SERVER_STATUS_CHANNEL = "server_status";

private final String ONLINE_PLAYER_NAME_KEY = "online_player_names";
private final String ONLINE_PLAYER_UUID_KEY = "online_player_uuids";
private final String PLAYER_STATUS_CHANNEL = "player_status";
private final String SERVER_SHUTDOWN_KEY = "server_shutdown";
private final JedisPooled jedis;
private final JedisPooled jedisPub;
private final JedisPooled jedisSub;
private final ExecutorService worker = Executors.newCachedThreadPool(Thread.ofVirtual().name("CytosisRedisWorker").factory());
@Getter
private final Set<String> onlinePlayers;
@Getter
private final Set<UUID> onlineUUIDs;

// server cache


/**
Expand All @@ -36,42 +55,32 @@ public RedisDatabase() {
HostAndPort hostAndPort = new HostAndPort(CytosisSettings.REDIS_HOST, 6379);
JedisClientConfig config = DefaultJedisClientConfig.builder().password(CytosisSettings.REDIS_PASSWORD).build();
this.jedis = new JedisPooled(hostAndPort, config);
this.jedisPub = new JedisPooled(hostAndPort, config);
this.jedisSub = new JedisPooled(hostAndPort, config);

onlinePlayers = jedis.smembers(ONLINE_PLAYER_NAME_KEY);
Set<UUID> uuids = new HashSet<>();
jedis.smembers(ONLINE_PLAYER_UUID_KEY).forEach(s -> uuids.add(UUID.fromString(s)));
this.onlineUUIDs = uuids;
Logger.info(STR."Loaded \{this.onlineUUIDs.size()} players.");
worker.submit(() -> jedis.subscribe(this, PLAYER_STATUS_CHANNEL));
worker.submit(() -> jedisSub.subscribe(new PlayerLoginLogout(), PLAYER_STATUS_CHANNEL));
worker.submit(() -> jedisSub.subscribe(new ServerStatus(), SERVER_STATUS_CHANNEL));
}

/**
* Sends a server shutdown message to the redis server
*/
public void sendShutdownMessage() {
jedis.set(SERVER_SHUTDOWN_KEY, "");
// formatting: <START/STOP>|:|<SERVER_ID>|:|<SERVER_IP>|:|<SERVER_PORT>
jedisPub.publish(SERVER_STATUS_CHANNEL, STR."STOP|:|\{Cytosis.SERVER_ID}|:|\{Utils.getServerIP()}|:|\{CytosisSettings.SERVER_PORT}");
Logger.info("Server shutdown message sent!");
}

/**
* Consumes messages on the redis pub/sub interface to determine the online players
*
* @param channel The channel that was messaged
* @param message The connent of the message
* Sends a server startup message to the redis server
*/
@Override
public void onMessage(String channel, String message) {
if (!channel.equals(PLAYER_STATUS_CHANNEL)) return;
// <PLAYER_NAME>|:|<PLAYER_UUID>|:|<JOIN/LEAVE>
String[] parts = message.split("\\|:\\|");
if (parts[2].equalsIgnoreCase("JOIN")) {
onlinePlayers.add(parts[0]);
onlineUUIDs.add(UUID.fromString(parts[1]));
} else {
onlinePlayers.remove(parts[0]);
onlineUUIDs.remove(UUID.fromString(parts[1]));
}
public void sendStartupMessage() {
// formatting: <START/STOP>|:|<SERVER_ID>|:|<SERVER_IP>|:|<SERVER_PORT>
jedisPub.publish(SERVER_STATUS_CHANNEL, STR."START|:|\{Cytosis.SERVER_ID}|:|\{Utils.getServerIP()}|:|\{CytosisSettings.SERVER_PORT}");
Logger.info("Server startup message sent!");
}


/**
* Disconnects from the redis server
*/
Expand Down Expand Up @@ -127,7 +136,7 @@ public void removeValue(String key, String... value) {
* @param channel the channel to listen on
*/
public void registerPubSub(JedisPubSub jedisPubSub, String channel) {
worker.submit(() -> jedis.subscribe(jedisPubSub, channel));
worker.submit(() -> jedisSub.subscribe(jedisPubSub, channel));
}

/**
Expand All @@ -137,6 +146,6 @@ public void registerPubSub(JedisPubSub jedisPubSub, String channel) {
* @param message the message
*/
public void publish(String channel, String message) {
jedis.publish(channel, message);
jedisPub.publish(channel, message);
}
}
28 changes: 28 additions & 0 deletions src/main/java/net/cytonic/cytosis/data/obj/CytonicServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package net.cytonic.cytosis.data.obj;

/**
* A class that holds data about a Cytosis server
*/
@SuppressWarnings("unused")
public record CytonicServer(String ip, String id, int port) {

/**
* Converts a serialized string into a CytonicServer
*
* @param serialized The serialized string
* @return the server object
*/
public static CytonicServer deserialize(String serialized) {
String[] parts = serialized.split("\\|");
return new CytonicServer(parts[0], parts[1], Integer.parseInt(parts[2]));
}

/**
* Serializes the server into a string
*
* @return the serialized string
*/
public String serialize() {
return String.format("%s|%s|%d", ip, id, port);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package net.cytonic.cytosis.messaging;

import lombok.Getter;
import net.cytonic.cytosis.Cytosis;
import net.cytonic.cytosis.config.CytosisSettings;
import net.cytonic.cytosis.data.RedisDatabase;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -14,6 +16,7 @@ public class MessagingManager {

@Getter
private final RabbitMQ rabbitMQ;
private final RedisDatabase redis;
private final ExecutorService worker;

/**
Expand All @@ -23,6 +26,7 @@ public MessagingManager() {
worker = Executors.newSingleThreadExecutor(Thread.ofVirtual().name("CytosisMessageWorker").factory());
if (CytosisSettings.RABBITMQ_ENABLED) this.rabbitMQ = new RabbitMQ();
else this.rabbitMQ = null;
this.redis = Cytosis.getDatabaseManager().getRedisDatabase();
}

/**
Expand All @@ -36,8 +40,8 @@ public CompletableFuture<Void> initialize() {
if (rabbitMQ != null) {
rabbitMQ.initializeConnection();
rabbitMQ.initializeQueues();
rabbitMQ.sendServerDeclarationMessage();
rabbitMQ.receiveChatMessages();
redis.sendStartupMessage();
}
future.complete(null);
});
Expand All @@ -49,9 +53,9 @@ public CompletableFuture<Void> initialize() {
*/
public void shutdown() {
if (rabbitMQ != null) {
rabbitMQ.sendServerShutdownMessage();
rabbitMQ.shutdown();
}
redis.sendShutdownMessage();
worker.shutdown();
}
}
33 changes: 2 additions & 31 deletions src/main/java/net/cytonic/cytosis/messaging/RabbitMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import net.cytonic.cytosis.data.enums.KickReason;
import net.cytonic.cytosis.logging.Logger;
import net.cytonic.cytosis.utils.OfflinePlayer;
import net.cytonic.cytosis.utils.Utils;
import net.kyori.adventure.sound.Sound;
import net.kyori.adventure.text.Component;
import net.kyori.adventure.text.serializer.json.JSONComponentSerializer;
Expand Down Expand Up @@ -56,12 +55,13 @@ public void initializeConnection() {
} catch (IOException | TimeoutException e) {
Logger.error("An error occurred whilst connecting to RabbitMQ!", e);
}
Logger.info("Connected to RabbitMQ!");

try {
channel = connection.createChannel();
} catch (IOException e) {
Logger.error("An error occurred whilst connecting to RabbitMQ!", e);
}
Logger.info("Connected to RabbitMQ!");
}

/**
Expand Down Expand Up @@ -93,35 +93,6 @@ public void initializeQueues() {
}
}

/**
* Sends a message to proxies to register the server
*/
public void sendServerDeclarationMessage() {
//formatting: {server-name}|:|{server-ip}|:|{server-port}

String message = STR."\{Cytosis.SERVER_ID}|:|\{Utils.getServerIP()}|:|\{CytosisSettings.SERVER_PORT}";
try {
channel.basicPublish("", SERVER_DECLARE_QUEUE, null, message.getBytes());
} catch (IOException e) {
Logger.error("An error occurred whilst attempting to send the server declaration message!", e);
}
Logger.info(STR."Server Declaration message sent! '\{message}'.");
}

/**
* Sends a message to proxies to unregister the server
*/
public void sendServerShutdownMessage() {
//formatting: {server-name}|:|{server-ip}|:|{server-port}
String message = STR."\{Cytosis.SERVER_ID}|:|\{Utils.getServerIP()}|:|\{CytosisSettings.SERVER_PORT}";
try {
channel.basicPublish("", SHUTDOWN_QUEUE, null, message.getBytes());
} catch (IOException e) {
Logger.error("An error occurred whilst attempting to send the server shutdown message!", e);
}
Logger.info(STR."Server Shutdown message sent! '\{message}'.");
}

/**
* Closes the RabbitMQ connection
*/
Expand Down
Loading

0 comments on commit a0a6b90

Please sign in to comment.