Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[+] Multiplayer #10

Draft
wants to merge 80 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
8b163f0
[+] Copy code
hykilpikonna Jan 13, 2025
6cc5d4d
[-] Remove json and use manual serailization
hykilpikonna Jan 14, 2025
6cd386b
[+] Patch?
hykilpikonna Jan 14, 2025
595a293
[F] Fix patch?
hykilpikonna Jan 14, 2025
3205d29
[O] Better logging
hykilpikonna Jan 14, 2025
435f29d
[O] Reconnect
hykilpikonna Jan 14, 2025
7db889f
[F] Fix keychip
hykilpikonna Jan 14, 2025
cd1a875
[F] Possible lifecycle iteration corrections
NekoShatoo Jan 14, 2025
7793977
[+] Connect Failed Log
NekoShatoo Jan 14, 2025
60978ad
[+] Patch send enable
hykilpikonna Jan 14, 2025
83b7e43
[+]RM Packet Encrypt
NekoShatoo Jan 14, 2025
4d218d5
[+]Skip DeliveryChecker
NekoShatoo Jan 14, 2025
0cc6186
[F] Fix return
hykilpikonna Jan 14, 2025
1d168e9
[O] Use consts
hykilpikonna Jan 14, 2025
d561ca2
[F] Fix decrypt
hykilpikonna Jan 14, 2025
7ca4eac
[+] StartupNetworkChecker
NekoShatoo Jan 14, 2025
c8f8ffe
Merge branch 'futari' of https://github.com/MewoLab/AquaMai into futari
NekoShatoo Jan 14, 2025
64519b6
[+] Log to readable string
hykilpikonna Jan 15, 2025
2feccb1
[O] Log with locking
hykilpikonna Jan 15, 2025
8285f70
[+] ToIP
hykilpikonna Jan 15, 2025
d339f8c
[-] Remove redundant logs
hykilpikonna Jan 15, 2025
71ec739
[F] Fix accept
hykilpikonna Jan 15, 2025
8d5cf0d
[+] Regions
hykilpikonna Jan 15, 2025
1935d03
[F] Fix my ip
hykilpikonna Jan 15, 2025
6b9468d
[F] Fix loopback IP translation
hykilpikonna Jan 15, 2025
ea96423
[+] More logging
hykilpikonna Jan 15, 2025
b031fed
[F] Fix SYN ACK callback
hykilpikonna Jan 15, 2025
7211200
[F] Fix TCP Multiport
hykilpikonna Jan 15, 2025
3898fa6
[O] Block SettingHostAddress
hykilpikonna Jan 15, 2025
2cc8558
[F] Fix receiveFrom remoteEP
hykilpikonna Jan 15, 2025
926c025
[F] Fix receivefrom
hykilpikonna Jan 15, 2025
f058224
[F] Fix IP reverting
hykilpikonna Jan 15, 2025
1320afb
[M] Reorder files
hykilpikonna Jan 15, 2025
a31f004
[-] Removeu nused
hykilpikonna Jan 15, 2025
1c1e0da
[O] Config options
hykilpikonna Jan 15, 2025
b662e34
[F] Fix debug flag
hykilpikonna Jan 15, 2025
629a14a
[+] Delay
hykilpikonna Jan 15, 2025
56c72ae
[F] Fix delay calculation
hykilpikonna Jan 15, 2025
4b7f6d0
[+] Start Process Info [F] Fix Skip Origin
NekoShatoo Jan 15, 2025
894b4bd
Merge branch 'futari' of https://github.com/MewoLab/AquaMai into futari
NekoShatoo Jan 15, 2025
edac6d4
[O] Retry
hykilpikonna Jan 15, 2025
79998f9
[+]Startup Check Text && Mulit Connect Room
NekoShatoo Jan 15, 2025
04254f3
Merge branch 'futari' of https://github.com/MewoLab/AquaMai into futari
NekoShatoo Jan 15, 2025
10d7bcb
[O] Meow :3
hykilpikonna Jan 15, 2025
ec99646
[F] Fix unknown fumen crash game
hykilpikonna Jan 15, 2025
7aca93d
[+] Room Info Displayer
NekoShatoo Jan 15, 2025
dba56dc
Merge branch 'futari' of https://github.com/MewoLab/AquaMai into futari
NekoShatoo Jan 15, 2025
3321de2
[F] Fix RecruitInfo
NekoShatoo Jan 15, 2025
574efb1
[O] Style
hykilpikonna Jan 15, 2025
6ed7888
[O] Flashing meow?
hykilpikonna Jan 15, 2025
e07a3d3
[F] Fix utage
hykilpikonna Jan 15, 2025
d95210d
[F] Fix return logic
hykilpikonna Jan 15, 2025
1a02679
[U] Update spacing
hykilpikonna Jan 15, 2025
40d2c5f
[+] Online Player Displayer
NekoShatoo Jan 15, 2025
0e0de71
Merge branch 'futari' of https://github.com/MewoLab/AquaMai into futari
NekoShatoo Jan 15, 2025
b57e156
[*]StartupProcess Text
NekoShatoo Jan 16, 2025
e1ae587
[O] No leaks
hykilpikonna Jan 16, 2025
c450d77
[+]Force LanAvailable
NekoShatoo Jan 16, 2025
127a8cb
[F] Fix IsLanAvailable patch
hykilpikonna Jan 16, 2025
6fcab08
[+] Fetch recruits from http
hykilpikonna Jan 16, 2025
c21c046
[O] Style
hykilpikonna Jan 16, 2025
213ee60
[U] Use official lobby
hykilpikonna Jan 16, 2025
e50219b
[F] Fix sleepy code
hykilpikonna Jan 16, 2025
400c1bf
[+] Run and block blocks
hykilpikonna Jan 16, 2025
669be44
[O] Loopback local traffic
hykilpikonna Jan 16, 2025
4a959e1
[O] Bind can be local
hykilpikonna Jan 16, 2025
bbe260f
[+] Call finish recruit
hykilpikonna Jan 16, 2025
37778a6
[F] Fix order: Finsih recruit before start
hykilpikonna Jan 16, 2025
816865d
[F] Fix invocation
hykilpikonna Jan 16, 2025
899942f
[F] Fix type mismatch
hykilpikonna Jan 16, 2025
6ee5847
[F] Fix bind
hykilpikonna Jan 16, 2025
4ad6e99
[O] Final changes
hykilpikonna Jan 17, 2025
6762e10
[F] fix
hykilpikonna Jan 17, 2025
e56788d
[+] Copy code
hykilpikonna Jan 13, 2025
c6cf6ff
[-] Remove json and use manual serailization
hykilpikonna Jan 14, 2025
4799b3a
[+] Patch?
hykilpikonna Jan 14, 2025
1160ec2
[F] Fix patch?
hykilpikonna Jan 14, 2025
fecb440
[F] Fix null
hykilpikonna Jan 17, 2025
5b47098
[F] Fix windows bell sounds
hykilpikonna Jan 17, 2025
c988ece
Merge branch 'main' into futari
hykilpikonna Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions AquaMai.Mods/WorldsLink/FutariClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using AMDaemon;
using PartyLink;
using static Manager.Accounting;

namespace AquaMai.Mods.WorldsLink;

public class FutariClient(string keychip, string host, int port, int _)
{
// public static string LOBBY_BASE = "http://localhost/mai2-futari/recruit";
public const string LOBBY_BASE = "https://aquadx.net/aqua/mai2-futari/recruit";
public static FutariClient Instance { get; private set; }

public FutariClient(string keychip, string host, int port) : this(keychip, host, port, 0)
{
Instance = this;
}

public string keychip { get; set; } = keychip;

private TcpClient _tcpClient;
private StreamWriter _writer;
private StreamReader _reader;

public readonly ConcurrentQueue<Msg> sendQ = new();
// <Port + Stream ID, Message Queue>
public readonly ConcurrentDictionary<int, ConcurrentQueue<Msg>> tcpRecvQ = new();
// <Port, Message Queue>
public readonly ConcurrentDictionary<int, ConcurrentQueue<Msg>> udpRecvQ = new();
// <Port, Accept Queue>
public readonly ConcurrentDictionary<int, ConcurrentQueue<Msg>> acceptQ = new();
// <Port + Stream ID, Callback>
public readonly ConcurrentDictionary<int, Action<Msg>> acceptCallbacks = new();

private Thread _sendThread;
private Thread _recvThread;

private bool _reconnecting = false;

private readonly Stopwatch _heartbeat = new Stopwatch().Also(it => it.Start());
private readonly long[] _delayWindow = new int[20].Select(_ => -1L).ToArray();
public int _delayIndex = 0;
public long _delayAvg = 0;

public IPAddress StubIP => FutariExt.KeychipToStubIp(keychip).ToIP();

/// <summary>
/// -1: Failed to connect
/// 0: Not connect
/// 1: Connecting
/// 2: Connected
/// </summary>
public int StatusCode { get; private set; } = 0;
public string ErrorMsg { get; private set; } = "";

public void ConnectAsync() => new Thread(Connect) { IsBackground = true }.Start();

private void Connect()
{
_tcpClient = new TcpClient();

try
{
StatusCode = 1;
_tcpClient.Connect(host, port);
StatusCode = 2;
}
catch (Exception ex)
{
StatusCode = -1;
ErrorMsg = ex.Message;
Log.Error($"Error connecting to server:\nHost:{host}:{port}\n{ex.Message}");
ConnectAsync();
return;
}
var networkStream = _tcpClient.GetStream();
_writer = new StreamWriter(networkStream, Encoding.UTF8) { AutoFlush = true };
_reader = new StreamReader(networkStream, Encoding.UTF8);
_reconnecting = false;

// Register
Send(new Msg { cmd = Cmd.CTL_START, data = keychip });
Log.Info($"Connected to server at {host}:{port}");

// Start communication and message receiving in separate threads
_sendThread = 10.Interval(() =>
{
if (_heartbeat.ElapsedMilliseconds > 1000)
{
_heartbeat.Restart();
Send(new Msg { cmd = Cmd.CTL_HEARTBEAT });
}

// Send any data in the send queue
while (sendQ.TryDequeue(out var msg)) Send(msg);

}, final: Reconnect, name: "SendThread", stopOnError: true);

_recvThread = 10.Interval(() =>
{
var line = _reader.ReadLine();
if (line == null) return;

var message = Msg.FromString(line);
HandleIncomingMessage(message);

}, final: Reconnect, name: "RecvThread", stopOnError: true);
}

public void Bind(int bindPort, ProtocolType proto)
{
if (proto == ProtocolType.Tcp)
acceptQ.TryAdd(bindPort, new ConcurrentQueue<Msg>());
else if (proto == ProtocolType.Udp)
udpRecvQ.TryAdd(bindPort, new ConcurrentQueue<Msg>());
}

private void Reconnect()
{
Log.Warn("Reconnect Entered");
if (_reconnecting) return;
_reconnecting = true;

try { _tcpClient.Close(); }
catch { /* ignored */ }

try { _sendThread.Abort(); }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 问题 (安全性): 用 CancellationTokens 替换 Thread.Abort() 以实现更安全的线程管理

Thread.Abort() 是危险的,可能会导致共享状态损坏。使用 CancellationTokenSource 和 CancellationToken 来优雅地取消操作。

Original comment in English

🚨 issue (security): Replace Thread.Abort() with CancellationTokens for safer thread management

Thread.Abort() is dangerous and can leave shared state corrupted. Use CancellationTokenSource and CancellationToken to gracefully cancel operations.

catch { /* ignored */ }

try { _recvThread.Abort(); }
catch { /* ignored */ }

_sendThread = null;
_recvThread = null;
_tcpClient = null;

// Reconnect
Log.Warn("Reconnecting...");
ConnectAsync();
}

private void HandleIncomingMessage(Msg msg)
{
if (msg.cmd != Cmd.CTL_HEARTBEAT)
Log.Info($"{StubIP} <<< {msg.ToReadableString()}");

switch (msg.cmd)
{
// Heartbeat
case Cmd.CTL_HEARTBEAT:
var delay = _heartbeat.ElapsedMilliseconds;
_delayWindow[_delayIndex] = delay;
_delayIndex = (_delayIndex + 1) % _delayWindow.Length;
_delayAvg = (long) _delayWindow.Where(x => x != -1).Average();
Log.Info($"Heartbeat: {delay}ms, Avg: {_delayAvg}ms");
break;

// UDP message
case Cmd.DATA_SEND or Cmd.DATA_BROADCAST when msg is { proto: ProtocolType.Udp, dPort: not null }:
udpRecvQ.Get(msg.dPort.Value)?.Also(q =>
{
Log.Info($"+ Added to UDP queue, there are {q.Count + 1} messages in queue");
})?.Enqueue(msg);
break;

// TCP message
case Cmd.DATA_SEND when msg.proto == ProtocolType.Tcp && msg is { sid: not null, dPort: not null }:
tcpRecvQ.Get(msg.sid.Value + msg.dPort.Value)?.Also(q =>
{
Log.Info($"+ Added to TCP queue, there are {q.Count + 1} messages in queue for port {msg.dPort}");
})?.Enqueue(msg);
break;

// TCP connection request
case Cmd.CTL_TCP_CONNECT when msg.dPort != null:
acceptQ.Get(msg.dPort.Value)?.Also(q =>
{
Log.Info($"+ Added to Accept queue, there are {q.Count + 1} messages in queue");
})?.Enqueue(msg);
break;

// TCP connection accept
case Cmd.CTL_TCP_ACCEPT when msg is { sid: not null, dPort: not null }:
acceptCallbacks.Get(msg.sid.Value + msg.dPort.Value)?.Invoke(msg);
break;
}
}

private void Send(Msg msg)
{
// Check if msg's destination ip is the same as my local ip. If so, handle it locally
if (msg.dst == StubIP.ToU32())
{
Log.Debug($"Loopback @@@ {msg.ToReadableString()}");
HandleIncomingMessage(msg);
return;
}

_writer.WriteLine(msg);
if (msg.cmd != Cmd.CTL_HEARTBEAT)
Log.Info($"{StubIP} >>> {msg.ToReadableString()}");
}
}
108 changes: 108 additions & 0 deletions AquaMai.Mods/WorldsLink/FutariExt.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using Manager.Party.Party;
using PartyLink;

namespace AquaMai.Mods.WorldsLink;

public static class FutariExt
{
private static uint HashStringToUInt(string input)
{
using var md5 = MD5.Create();
var hashBytes = md5.ComputeHash(Encoding.UTF8.GetBytes(input));
return ((uint)(hashBytes[0] & 0xFF) << 24) |
((uint)(hashBytes[1] & 0xFF) << 16) |
((uint)(hashBytes[2] & 0xFF) << 8) |
((uint)(hashBytes[3] & 0xFF));
}

public static uint KeychipToStubIp(string keychip) => HashStringToUInt(keychip);

public static IPAddress ToIP(this uint val) => new(new IpAddress(val).GetAddressBytes());
public static uint ToU32(this IPAddress ip) => ip.ToNetworkByteOrderU32();

public static void Do<T>(this T x, Action<T> f) => f(x);
public static R Let<T, R>(this T x, Func<T, R> f) => f(x);
public static T Also<T>(this T x, Action<T> f) { f(x); return x; }

public static List<T> Each<T>(this IEnumerable<T> enu, Action<T> f) =>
enu.ToList().Also(x => x.ForEach(f));

public static byte[] View(this byte[] buffer, int offset, int size)
{
var array = new byte[size];
Array.Copy(buffer, offset, array, 0, size);
return array;
}

public static string B64(this byte[] buffer) => Convert.ToBase64String(buffer);
public static byte[] B64(this string str) => Convert.FromBase64String(str);

public static V? Get<K, V>(this ConcurrentDictionary<K, V> dict, K key) where V : class
{
return dict.GetValueOrDefault(key);
}

// Call a function using reflection
public static void Call(this object obj, string method, params object[] args)
{
obj.GetType().GetMethod(method)?.Invoke(obj, args);
}

public static uint MyStubIP() => KeychipToStubIp(AMDaemon.System.KeychipId.ShortValue);

public static string Post(this string url, string body) => new WebClient().UploadString(url, body);
public static void PostAsync(this string url, string body, UploadStringCompletedEventHandler? callback = null) =>
new WebClient().Also(web =>
{
callback?.Do(it => web.UploadStringCompleted += it);
web.UploadStringAsync(new Uri(url), body);
});

public static Thread Interval(
this int delay, Action action, bool stopOnError = false,
Action<Exception>? error = null, Action? final = null, string? name = null
) => new Thread(() =>
{
name ??= $"Interval {Thread.CurrentThread.ManagedThreadId} for {action}";
try
{
while (true)
{
try
{
Thread.Sleep(delay);
action();
}
catch (ThreadInterruptedException)
{
break;
}
catch (Exception e)
{
if (stopOnError) throw;
Log.Error($"Error in {name}: {e}");
}
}
}
catch (Exception e)
{
Log.Error($"Fatal error in {name}: {e}");
error?.Invoke(e);
}
finally
{
Log.Warn($"{name} stopped");
final?.Invoke();
}
}).Also(x => x.Start());

}
Loading