Skip to content

Commit

Permalink
feat: Redis stream accept task to synchronize player data requests ac…
Browse files Browse the repository at this point in the history
…ross servers.
  • Loading branch information
QwQ-dev committed Jan 5, 2025
1 parent fb3c10a commit a1229a6
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package net.legacy.library.player.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* @author qwq-dev
* @since 2025-01-04 20:19
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisStreamAccept {
String[] redisStreamNames();
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public LegacyPlayerDataService(String name, MongoDBConnectionConfig mongoDBConne
this.mongoDBConnectionConfig = mongoDBConnectionConfig;

// Create L1 cache using Caffeine
CacheServiceInterface<Cache<String, LegacyPlayerData>, LegacyPlayerData> cacheStringCacheServiceInterface =
CacheServiceInterface<Cache<UUID, LegacyPlayerData>, LegacyPlayerData> cacheStringCacheServiceInterface =
CacheServiceFactory.createCaffeineCache();

// Create L2 cache using Redis
Expand Down Expand Up @@ -151,7 +151,7 @@ public <K, V> ScheduledTask<?> redisStreamPubTask(Map<K, V> data) {
*
* @return the {@link CacheServiceInterface} used for the first-level cache (L1)
*/
public CacheServiceInterface<Cache<String, LegacyPlayerData>, LegacyPlayerData> getL1Cache() {
public CacheServiceInterface<Cache<UUID, LegacyPlayerData>, LegacyPlayerData> getL1Cache() {
return flexibleMultiLevelCacheService.getCacheLevelElseThrow(1, () -> new IllegalStateException("L1 cache not found"))
.getCacheWithType();
}
Expand All @@ -163,14 +163,12 @@ public CacheServiceInterface<Cache<String, LegacyPlayerData>, LegacyPlayerData>
* @return an {@link Optional} containing the {@link LegacyPlayerData} if found, or empty if not found
*/
public Optional<LegacyPlayerData> getFromL1Cache(UUID uuid) {
String uuidString = uuid.toString();

// Get L1 cache
CacheServiceInterface<Cache<String, LegacyPlayerData>, LegacyPlayerData> l1Cache = getL1Cache();
Cache<String, LegacyPlayerData> l1CacheImpl = l1Cache.getCache();
CacheServiceInterface<Cache<UUID, LegacyPlayerData>, LegacyPlayerData> l1Cache = getL1Cache();
Cache<UUID, LegacyPlayerData> l1CacheImpl = l1Cache.getCache();

// Retrieve data from L1 cache
return Optional.ofNullable(l1CacheImpl.getIfPresent(uuidString));
return Optional.ofNullable(l1CacheImpl.getIfPresent(uuid));
}

/**
Expand Down Expand Up @@ -239,11 +237,11 @@ public LegacyPlayerData getLegacyPlayerData(UUID uuid) {
return dataFromL1Cache.get();
}

CacheServiceInterface<Cache<String, LegacyPlayerData>, LegacyPlayerData> l1Cache = getL1Cache();
Cache<String, LegacyPlayerData> l1CacheImpl = l1Cache.getCache();
CacheServiceInterface<Cache<UUID, LegacyPlayerData>, LegacyPlayerData> l1Cache = getL1Cache();
Cache<UUID, LegacyPlayerData> l1CacheImpl = l1Cache.getCache();
LegacyPlayerData legacyPlayerData = getFromL2Cache(uuid).orElseGet(() -> getFromDatabase(uuid));

l1CacheImpl.put(uuidString, legacyPlayerData);
l1CacheImpl.put(uuid, legacyPlayerData);
return legacyPlayerData;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import net.legacy.library.commons.task.TaskInterface;
import net.legacy.library.player.model.LegacyPlayerData;
import net.legacy.library.player.service.LegacyPlayerDataService;
import net.legacy.library.player.task.redis.L1ToL2DataSyncTask;
import net.legacy.library.player.util.KeyUtil;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
Expand All @@ -19,7 +20,6 @@
import java.time.Duration;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* @author qwq-dev
Expand All @@ -39,28 +39,15 @@ public static PlayerDataPersistenceTask of(Duration delay, Duration interval, Lo
@Override
public ScheduledTask<?> start() {
Runnable runnable = () -> {
CacheServiceInterface<Cache<String, LegacyPlayerData>, LegacyPlayerData> l1Cache =
CacheServiceInterface<Cache<UUID, LegacyPlayerData>, LegacyPlayerData> l1Cache =
legacyPlayerDataService.getL1Cache();
RedisCacheServiceInterface l2Cache = legacyPlayerDataService.getL2Cache();
RedissonClient redissonClient = l2Cache.getCache();

// Sync L1 cache to L2 cache
l1Cache.getCache().asMap().forEach((key, legacyPlayerData) -> {
UUID uuid = legacyPlayerData.getUuid();
String serialized = SimplixSerializer.serialize(legacyPlayerData).toString();
String bucketKey = KeyUtil.getLegacyPlayerDataServiceKey(uuid, legacyPlayerDataService, "bucket-key");
String syncLockKey = KeyUtil.getLegacyPlayerDataServiceKey(uuid, legacyPlayerDataService, "persistence-l1-sync");

l2Cache.execute(
client -> client.getLock(syncLockKey),
client -> {
client.getBucket(bucketKey).set(serialized);
return null;
},
LockSettings.of(0, 0, TimeUnit.MILLISECONDS)
);
});
L1ToL2DataSyncTask.of(legacyPlayerDataService).start();

// Persistence
String lockKey = KeyUtil.getLegacyPlayerDataServiceKey(legacyPlayerDataService) + "-persistence-lock";
RLock lock = redissonClient.getLock(lockKey);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package net.legacy.library.player.task.redis;

import com.github.benmanes.caffeine.cache.Cache;
import de.leonhard.storage.internal.serialize.SimplixSerializer;
import io.fairyproject.scheduler.ScheduledTask;
import lombok.RequiredArgsConstructor;
import net.legacy.library.cache.model.LockSettings;
import net.legacy.library.cache.service.CacheServiceInterface;
import net.legacy.library.cache.service.redis.RedisCacheServiceInterface;
import net.legacy.library.commons.task.TaskInterface;
import net.legacy.library.player.model.LegacyPlayerData;
import net.legacy.library.player.service.LegacyPlayerDataService;
import net.legacy.library.player.util.KeyUtil;
import org.redisson.api.RedissonClient;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* @author qwq-dev
* @since 2025-01-05 12:10
*/
@RequiredArgsConstructor
public class L1ToL2DataSyncTask implements TaskInterface {
private final UUID uuid;
private final LegacyPlayerDataService legacyPlayerDataService;

public static L1ToL2DataSyncTask of(LegacyPlayerDataService legacyPlayerDataService) {
return new L1ToL2DataSyncTask(null, legacyPlayerDataService);
}

public static L1ToL2DataSyncTask of(UUID uuid, LegacyPlayerDataService legacyPlayerDataService) {
return new L1ToL2DataSyncTask(uuid, legacyPlayerDataService);
}

@Override
public ScheduledTask<?> start() {
return schedule(() -> {
CacheServiceInterface<Cache<UUID, LegacyPlayerData>, LegacyPlayerData> l1Cache =
legacyPlayerDataService.getL1Cache();
RedisCacheServiceInterface l2Cache = legacyPlayerDataService.getL2Cache();
RedissonClient redissonClient = l2Cache.getCache();

l1Cache.getCache().asMap().forEach((key, legacyPlayerData) -> {
UUID uuid = legacyPlayerData.getUuid();

if (this.uuid != null && !this.uuid.equals(key)) {
return;
}

String serialized = SimplixSerializer.serialize(legacyPlayerData).toString();
String bucketKey = KeyUtil.getLegacyPlayerDataServiceKey(key, legacyPlayerDataService, "bucket-key");
String syncLockKey = KeyUtil.getLegacyPlayerDataServiceKey(key, legacyPlayerDataService, "l1-l2-sync-lock");

l2Cache.execute(
client -> client.getLock(syncLockKey),
client -> {
client.getBucket(bucketKey).set(serialized);
return null;
},
LockSettings.of(0, 0, TimeUnit.MILLISECONDS)
);
});
});
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package net.legacy.library.player.task.redis;

import org.redisson.api.StreamMessageId;

import java.util.Map;

/**
* @author qwq-dev
* @since 2025-01-04 20:30
*/
public interface RedisStreamAcceptInterface {
boolean canAccept(StreamMessageId streamMessageId);

void accept(Map<Object, Object> message);
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package net.legacy.library.player.task.redis;

import io.fairyproject.container.Autowired;
import io.fairyproject.container.InjectableComponent;
import io.fairyproject.container.scope.InjectableScope;
import io.fairyproject.log.Log;
import io.fairyproject.scheduler.ScheduledTask;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import net.legacy.library.annotation.service.AnnotationProcessingServiceInterface;
import net.legacy.library.annotation.util.AnnotationScanner;
import net.legacy.library.annotation.util.ReflectUtil;
import net.legacy.library.cache.service.redis.RedisCacheServiceInterface;
Expand All @@ -30,17 +26,13 @@
*/
@Getter
@RequiredArgsConstructor
@InjectableComponent(scope = InjectableScope.PROTOTYPE)
public class RedisStreamAcceptTask implements TaskInterface {
private final LegacyPlayerDataService legacyPlayerDataService;
private final String streamName;
private final List<String> basePackages;
private final List<ClassLoader> classLoaders;
private final Duration interval;

@Autowired
private AnnotationProcessingServiceInterface annotationProcessingService;

public static RedisStreamAcceptTask of(LegacyPlayerDataService legacyPlayerDataService, String streamName, List<String> basePackages, List<ClassLoader> classLoaders, Duration interval) {
return new RedisStreamAcceptTask(legacyPlayerDataService, streamName, basePackages, classLoaders, interval);
}
Expand All @@ -64,13 +56,12 @@ public ScheduledTask<?> start() {
try {
for (Map.Entry<StreamMessageId, Map<Object, Object>> streamMessageIdMapEntry : messages.entrySet()) {
StreamMessageId key = streamMessageIdMapEntry.getKey();
Map<Object, Object> value = streamMessageIdMapEntry.getValue();

RedisStreamAcceptInterface redisStreamAcceptInterface =
(RedisStreamAcceptInterface) clazz.getDeclaredConstructor().newInstance();

if (redisStreamAcceptInterface.canAccept(key)) {
redisStreamAcceptInterface.accept(streamMessageIdMapEntry.getValue());
}
redisStreamAcceptInterface.accept(value);
}
} catch (Exception exception) {
Log.error("Failed to process Redis stream message", exception);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,44 @@
package net.legacy.library.player.task.redis.impl;

import com.google.common.reflect.TypeToken;
import net.legacy.library.commons.util.GsonUtil;
import net.legacy.library.player.annotation.RedisStreamAccept;
import net.legacy.library.player.service.LegacyPlayerDataService;
import net.legacy.library.player.task.redis.L1ToL2DataSyncTask;
import net.legacy.library.player.task.redis.RedisStreamAcceptInterface;
import org.redisson.api.StreamMessageId;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/**
* @author qwq-dev
* @since 2025-01-04 20:59
*/
@RedisStreamAccept(redisStreamNames = "player-data-sync")
@RedisStreamAccept
public class PlayerDataSyncRedisStreamAccept implements RedisStreamAcceptInterface {
@Override
public boolean canAccept(StreamMessageId streamMessageId) {
return false;
}

@Override
public void accept(Map<Object, Object> message) {
// TODO: player data sync (l1 -> l2)
for (Map.Entry<Object, Object> entry : message.entrySet()) {
// If the key is player-data-sync, cast the value to a map
if (entry.getKey().toString().equals("player-data-sync")) {
// This map key is LPDS name, value is player uuid
Map<String, String> value = GsonUtil.GSON.fromJson(
entry.getValue().toString(), new TypeToken<Map<String, String>>() {
}.getType()
);

// L1 -> L2
for (Map.Entry<String, String> dataSyncEntry : value.entrySet()) {
String lpdsName = dataSyncEntry.getValue();
String playerUuid = dataSyncEntry.getKey();
UUID uuid = UUID.fromString(playerUuid);

Optional<LegacyPlayerDataService> legacyPlayerDataService =
LegacyPlayerDataService.getLegacyPlayerDataService(lpdsName);
legacyPlayerDataService.ifPresent(service -> L1ToL2DataSyncTask.of(uuid, service).start());
}
}
}
}
}
}

0 comments on commit a1229a6

Please sign in to comment.