Skip to content

Commit

Permalink
feat: redis stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
QwQ-dev committed Jan 5, 2025
1 parent 631154f commit 88fa6df
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@
* @since 2025-01-04 20:30
*/
public interface RStreamAcceptInterface {
/**
* Get the action name.
*
* @return action name
*/
String getActionName();

/**
* Get the target legacy player data service name.
*
* @return target legacy player data service name
*/
String getTargetLegacyPlayerDataServiceName();

/**
* Handle the data.
*
* <p>If the data is processed as expected, the data can be deleted in this method.
* If it is different from the expected, it will not be processed and handed over to other connections for processing. It is not exclusive.
* Unless it expires or is processed correctly by a connection, the data will always exist.
*
* @param rStream rStream
* @param entry entry
*/
void accept(RStream<Object, Object> rStream, Map.Entry<StreamMessageId, Map<Object, Object>> entry);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,35 @@ public ScheduledTask<?> start() {
RedisStreamAccepter.class
);

for (Class<?> clazz : annotatedClasses) {
try {
for (Map.Entry<StreamMessageId, Map<Object, Object>> streamMessageIdMapEntry : messages.entrySet()) {
// Get all msg
for (Map.Entry<StreamMessageId, Map<Object, Object>> streamMessageIdMapEntry : messages.entrySet()) {
// Get the class that uses @RedisStreamAccepter
for (Class<?> clazz : annotatedClasses) {
try {
// get key (action name)
StreamMessageId key = streamMessageIdMapEntry.getKey();

// LPDS name and data
Map<Object, Object> value = streamMessageIdMapEntry.getValue();

RStreamAcceptInterface redisStreamAcceptInterface =
(RStreamAcceptInterface) clazz.getDeclaredConstructor().newInstance();
// handle
for (Map.Entry<Object, Object> entry : value.entrySet()) {
RStreamAcceptInterface redisStreamAcceptInterface =
(RStreamAcceptInterface) clazz.getDeclaredConstructor().newInstance();

Object key1 = entry.getKey();
Object value1 = entry.getValue();

if (!redisStreamAcceptInterface.getActionName().equals(key1) ||
!redisStreamAcceptInterface.getTargetLegacyPlayerDataServiceName().equals(legacyPlayerDataService.getName())) {
continue;
}

redisStreamAcceptInterface.accept(rStream, streamMessageIdMapEntry);
redisStreamAcceptInterface.accept(rStream, streamMessageIdMapEntry);
}
} catch (Exception exception) {
Log.error("Failed to process Redis stream message", exception);
}
} catch (Exception exception) {
// DEBUG print
exception.printStackTrace();
Log.error("Failed to process Redis stream message", exception);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,45 +27,43 @@ public String getActionName() {
return "player-data-sync-name";
}

public String getTargetLegacyPlayerDataServiceName() {
return "player-data-service";
}

@Override
public void accept(RStream<Object, Object> rStream, Map.Entry<StreamMessageId, Map<Object, Object>> streamMessageIdMapEntry) {
for (Map.Entry<Object, Object> entry : streamMessageIdMapEntry.getValue().entrySet()) {
Object key = entry.getKey();
String keyString = key.toString();
Object value = entry.getValue();

System.out.println(keyString);
System.out.println(value.toString());
if (keyString.equals(getActionName())) {
Pair<String, String> data = GsonUtil.getGson().fromJson(
value.toString(), new TypeToken<Pair<String, String>>() {
}.getType()
);

String first = data.getKey();
String second = data.getValue();
Pair<String, String> data = GsonUtil.getGson().fromJson(
value.toString(), new TypeToken<Pair<String, String>>() {
}.getType()
);

System.out.println(first);
System.out.println(second);
String first = data.getKey();
String second = data.getValue();

// Very slow, but it's async so it's fine
OfflinePlayer offlinePlayer =
Bukkit.getOfflinePlayer(second);
// Very slow, but it's async so it's fine
OfflinePlayer offlinePlayer =
Bukkit.getOfflinePlayer(second);

Optional<LegacyPlayerDataService> legacyPlayerDataService =
LegacyPlayerDataService.getLegacyPlayerDataService(first);
Optional<LegacyPlayerDataService> legacyPlayerDataService =
LegacyPlayerDataService.getLegacyPlayerDataService(first);

legacyPlayerDataService.ifPresent(service -> {
L1ToL2DataSyncTask.of(offlinePlayer.getUniqueId(), service).start().getFuture().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
Log.error("Error while syncing player data", throwable);
return;
}
legacyPlayerDataService.ifPresent(service -> {
L1ToL2DataSyncTask.of(offlinePlayer.getUniqueId(), service).start().getFuture().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
Log.error("Error while syncing player data", throwable);
return;
}

rStream.ack(getActionName(), streamMessageIdMapEntry.getKey());
});
rStream.remove(streamMessageIdMapEntry.getKey());
System.out.println("success");
});
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public String getActionName() {
return "player-data-sync-uuid";
}

@Override
public String getTargetLegacyPlayerDataServiceName() {
return "";
}

@Override
public void accept(RStream<Object, Object> rStream, Map.Entry<StreamMessageId, Map<Object, Object>> streamMessageIdMapEntry) {
for (Map.Entry<Object, Object> entry : streamMessageIdMapEntry.getValue().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public static String getRMapCacheKey(LegacyPlayerDataService legacyPlayerDataSer
return legacyPlayerDataService.getName() + "-rmapcache";
}

public static String getRStreamGroupKey(LegacyPlayerDataService legacyPlayerDataService) {
return legacyPlayerDataService.getName() + "-rstreamgroup";
}

/**
* Generates a key for a {@link LegacyPlayerDataService} and a {@link UUID}.
*
Expand Down

0 comments on commit 88fa6df

Please sign in to comment.