Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pull again! #25

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion conf/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ mp {
gateway-client-num=1 //网关客户端连接数

admin-server-port=3002 //控制台服务端口, 内部端口
ws-server-port=0 //websocket对外端口, 公网端口, 0表示禁用websocket
ws-server-port=8008 //websocket对外端口, 公网端口, 0表示禁用websocket
ws-path="/" //websocket path

public-host-mapping { //本机局域网IP和公网IP的映射关系, 该配置后续会被废弃
Expand Down
2 changes: 1 addition & 1 deletion mpush-boot/src/main/java/com/mpush/bootstrap/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void main(String[] args) {
Logs.init();
Logs.Console.info("launch mpush server...");
ServerLauncher launcher = new ServerLauncher();
launcher.init();
// launcher.init();
launcher.start();
addHook(launcher);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void init() {
}

public void start() {
init();
chain.start();
}

Expand Down
5 changes: 3 additions & 2 deletions mpush-boot/src/main/resources/mpush.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ mp.redis { //redis 集群配置
}
mp.net.local-ip="" //本地ip, 默认取第一个网卡的本地IP
mp.net.public-ip="" //外网ip, 默认取第一个网卡的外网IP
mp.net.ws-server-port=0 //websocket对外端口, 0表示禁用websocket
mp.net.ws-server-port=8008 //websocket对外端口, 0表示禁用websocket
mp.net.ws-path="/client/ws"
mp.net.gateway-server-net=tcp // 网关服务使用的网络 udp/tcp
mp.net.connect-server-port=3000 //接入服务的端口号
mp.http.proxy-enabled=true //启用Http代理功能
mp.http.proxy-enabled=false //启用Http代理功能
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/**
* 此线程池可伸缩,线程空闲一定时间后回收,新请求重新创建线程
*/
@Spi(order = 1)
@Spi(order = 2)
public final class ClientExecutorFactory extends CommonExecutorFactory {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public final class MPushClient implements MPushContext {
public MPushClient() {
monitorService = new MonitorService();

//本项目部署在mpns里,避免eventbus初始化两次,故注释掉如下代码。如要独立部署敬请放开一下代码
EventBus.create(monitorService.getThreadPoolManager().getEventBusExecutor());

pushRequestBus = new PushRequestBus(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public final class AdminHandler extends SimpleChannelInboundHandler<String> {

public AdminHandler(MPushServer mPushServer) {
this.mPushServer = mPushServer;
init();
}

public void init() {
Expand Down
16 changes: 12 additions & 4 deletions mpush-core/src/main/java/com/mpush/core/router/RouterCenter.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,18 @@ public RouterCenter(MPushServer mPushServer) {

@Override
protected void doStart(Listener listener) throws Throwable {
localRouterManager = new LocalRouterManager();
remoteRouterManager = new RemoteRouterManager();
routerChangeListener = new RouterChangeListener(mPushServer);
userEventConsumer = new UserEventConsumer(remoteRouterManager);
if (localRouterManager == null){
localRouterManager = new LocalRouterManager();
}
if (remoteRouterManager == null){
remoteRouterManager = new RemoteRouterManager();
}
if (routerChangeListener == null){
routerChangeListener = new RouterChangeListener(mPushServer);
}
if (userEventConsumer == null){
userEventConsumer = new UserEventConsumer(remoteRouterManager);
}
userEventConsumer.getUserManager().clearUserOnlineData();
super.doStart(listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.mpush.core.handler.AckHandler;
import com.mpush.core.handler.BindUserHandler;
import com.mpush.core.handler.HandshakeHandler;
import com.mpush.core.handler.HeartBeatHandler;
import com.mpush.netty.server.NettyTCPServer;
import com.mpush.tools.config.CC;
import io.netty.bootstrap.ServerBootstrap;
Expand Down Expand Up @@ -67,6 +68,7 @@ public WebsocketServer(MPushServer mPushServer) {
public void init() {
super.init();
connectionManager.init();
messageDispatcher.register(Command.HEARTBEAT, HeartBeatHandler::new);
messageDispatcher.register(Command.HANDSHAKE, () -> new HandshakeHandler(mPushServer));
messageDispatcher.register(Command.BIND, () -> new BindUserHandler(mPushServer));
messageDispatcher.register(Command.UNBIND, () -> new BindUserHandler(mPushServer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testPush() throws Exception {

PushContext context = PushContext.build(msg)
.setAckModel(AckModel.AUTO_ACK)
.setUserId("user-" + i)
.setUserId("20")
.setBroadcast(false)
//.setTags(Sets.newHashSet("test"))
//.setCondition("tags&&tags.indexOf('test')!=-1")
Expand Down
3 changes: 2 additions & 1 deletion mpush-test/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mp.zk.server-address="127.0.0.1:2181"
mp.redis {// redis 集群配置
nodes:["127.0.0.1:6379"]//格式是ip:port,密码可以没有ip:port
}
mp.http.proxy-enabled=true
mp.http.proxy-enabled=false

mp.net {
gateway-server-net=tcp //网关服务使用的网络类型tcp/udp
Expand All @@ -17,4 +17,5 @@ mp.net {
gateway-client-port=4000 //UDP客户端端口, 内部端口
admin-server-port=3002 //控制台服务端口, 内部端口
ws-server-port=8008 //websocket对外端口, 0表示禁用websocket
ws-path=/client/ws
}
6 changes: 4 additions & 2 deletions mpush-tools/src/main/java/com/mpush/tools/event/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ public class EventBus {
private static com.google.common.eventbus.EventBus eventBus;

public static void create(Executor executor) {
eventBus = new AsyncEventBus(executor, (exception, context)
-> LOGGER.error("event bus subscriber ex", exception));
if (eventBus == null){
eventBus = new AsyncEventBus(executor, (exception, context)
-> LOGGER.error("event bus subscriber ex", exception));
}
}

public static void post(Event event) {
Expand Down