Skip to content

Commit 6e02b03

Browse files
feat(*): 添加tcp客户端封装
1 parent c4517e5 commit 6e02b03

File tree

5 files changed

+275
-0
lines changed

5 files changed

+275
-0
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.pengxh.kt.lite.utils.socket.tcp
2+
3+
sealed class ConnectState {
4+
/**
5+
* 连接成功
6+
* */
7+
object SUCCESS : ConnectState()
8+
9+
/**
10+
* 关闭连接
11+
* */
12+
object CLOSED : ConnectState()
13+
14+
/**
15+
* 连接失败
16+
* */
17+
object ERROR : ConnectState()
18+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.pengxh.kt.lite.utils.socket.tcp
2+
3+
interface ISocketListener {
4+
/**
5+
* 当接收到系统消息
6+
*/
7+
fun onMessageResponse(data: ByteArray?)
8+
9+
/**
10+
* 当连接状态发生变化时调用
11+
*/
12+
fun onConnectStatusChanged(state: ConnectState)
13+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.pengxh.kt.lite.utils.socket.tcp
2+
3+
import android.util.Log
4+
import io.netty.channel.ChannelHandlerContext
5+
import io.netty.channel.SimpleChannelInboundHandler
6+
7+
class SocketChannelHandle(private val listener: ISocketListener?) :
8+
SimpleChannelInboundHandler<ByteArray>() {
9+
10+
private val kTag = "SocketChannelHandle"
11+
12+
override fun channelActive(ctx: ChannelHandlerContext) {
13+
super.channelActive(ctx)
14+
Log.d(kTag, "channelActive ===> 连接成功")
15+
listener?.onConnectStatusChanged(ConnectState.SUCCESS)
16+
}
17+
18+
override fun channelInactive(ctx: ChannelHandlerContext) {
19+
super.channelInactive(ctx)
20+
Log.e(kTag, "channelInactive: 连接断开")
21+
listener?.onConnectStatusChanged(ConnectState.CLOSED)
22+
}
23+
24+
override fun channelRead0(ctx: ChannelHandlerContext, data: ByteArray?) {
25+
listener?.onMessageResponse(data)
26+
}
27+
28+
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
29+
super.exceptionCaught(ctx, cause)
30+
Log.d(kTag, "exceptionCaught ===> $cause")
31+
listener?.onConnectStatusChanged(ConnectState.ERROR)
32+
ctx.close()
33+
}
34+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package com.pengxh.kt.lite.utils.socket.tcp
2+
3+
import android.os.SystemClock
4+
import android.util.Log
5+
import io.netty.bootstrap.Bootstrap
6+
import io.netty.channel.AdaptiveRecvByteBufAllocator
7+
import io.netty.channel.Channel
8+
import io.netty.channel.ChannelFuture
9+
import io.netty.channel.ChannelFutureListener
10+
import io.netty.channel.ChannelInitializer
11+
import io.netty.channel.ChannelOption
12+
import io.netty.channel.nio.NioEventLoopGroup
13+
import io.netty.channel.socket.SocketChannel
14+
import io.netty.channel.socket.nio.NioSocketChannel
15+
import io.netty.handler.codec.bytes.ByteArrayDecoder
16+
import io.netty.handler.codec.bytes.ByteArrayEncoder
17+
import io.netty.handler.timeout.IdleStateHandler
18+
19+
class SocketClient {
20+
21+
private val kTag = "SocketClient"
22+
private var host = ""
23+
private var port = 8888
24+
private var nioEventLoopGroup: NioEventLoopGroup? = null
25+
private var channel: Channel? = null
26+
private var listener: ISocketListener? = null
27+
28+
//现在连接的状态
29+
var connectStatus = false //判断是否已连接
30+
private var reconnectNum = Int.MAX_VALUE //定义的重连到时候用
31+
private var isNeedReconnect = true //是否需要重连
32+
var isConnecting = false //是否正在连接
33+
private set
34+
private var reconnectIntervalTime: Long = 15000 //重连的时间
35+
36+
//重连时间
37+
fun setReconnectNum(reconnectNum: Int) {
38+
this.reconnectNum = reconnectNum
39+
}
40+
41+
fun setReconnectIntervalTime(reconnectIntervalTime: Long) {
42+
this.reconnectIntervalTime = reconnectIntervalTime
43+
}
44+
45+
fun setSocketListener(listener: ISocketListener?) {
46+
this.listener = listener
47+
}
48+
49+
fun connect(host: String, port: Int) {
50+
this.host = host
51+
this.port = port
52+
Log.d(kTag, "connect ===> 开始连接TCP服务器")
53+
if (isConnecting) {
54+
return
55+
}
56+
//起个线程
57+
val clientThread = object : Thread("client-Netty") {
58+
override fun run() {
59+
super.run()
60+
isNeedReconnect = true
61+
reconnectNum = Int.MAX_VALUE
62+
connectServer()
63+
}
64+
}
65+
clientThread.start()
66+
}
67+
68+
private fun connectServer() {
69+
synchronized(this@SocketClient) {
70+
var channelFuture: ChannelFuture? = null //连接管理对象
71+
if (!connectStatus) {
72+
isConnecting = true
73+
nioEventLoopGroup = NioEventLoopGroup() //设置的连接group
74+
val bootstrap = Bootstrap()
75+
bootstrap.group(nioEventLoopGroup) //设置的一系列连接参数操作等
76+
.channel(NioSocketChannel::class.java)
77+
.option(ChannelOption.TCP_NODELAY, true) //无阻塞
78+
.option(ChannelOption.SO_KEEPALIVE, true) //长连接
79+
.option(
80+
ChannelOption.RCVBUF_ALLOCATOR,
81+
AdaptiveRecvByteBufAllocator(5000, 5000, 8000)
82+
) //接收缓冲区 最小值太小时数据接收不全
83+
.handler(object : ChannelInitializer<SocketChannel>() {
84+
override fun initChannel(channel: SocketChannel) {
85+
val pipeline = channel.pipeline()
86+
//参数1:代表读套接字超时的时间,没收到数据会触发读超时回调;
87+
//参数2:代表写套接字超时时间,没进行写会触发写超时回调;
88+
//参数3:将在未执行读取或写入时触发超时回调,0代表不处理;
89+
//读超时尽量设置大于写超时,代表多次写超时时写心跳包,多次写了心跳数据仍然读超时代表当前连接错误,即可断开连接重新连接
90+
pipeline.addLast(IdleStateHandler(60, 10, 0))
91+
pipeline.addLast(ByteArrayDecoder())
92+
pipeline.addLast(ByteArrayEncoder())
93+
pipeline.addLast(SocketChannelHandle(listener))
94+
}
95+
})
96+
try {
97+
//连接监听
98+
channelFuture = bootstrap.connect(host, port)
99+
.addListener(object : ChannelFutureListener {
100+
override fun operationComplete(channelFuture: ChannelFuture) {
101+
if (channelFuture.isSuccess) {
102+
connectStatus = true
103+
channel = channelFuture.channel()
104+
} else {
105+
Log.e(kTag, "operationComplete: 连接失败")
106+
connectStatus = false
107+
}
108+
isConnecting = false
109+
}
110+
}).sync()
111+
// 等待连接关闭
112+
channelFuture.channel().closeFuture().sync()
113+
} catch (e: Exception) {
114+
e.printStackTrace()
115+
} finally {
116+
connectStatus = false
117+
listener?.onConnectStatusChanged(ConnectState.CLOSED)
118+
channelFuture?.apply {
119+
channel?.apply {
120+
if (isOpen) {
121+
close()
122+
}
123+
}
124+
}
125+
nioEventLoopGroup?.shutdownGracefully()
126+
reconnect() //重新连接
127+
}
128+
}
129+
}
130+
}
131+
132+
//断开连接
133+
fun disconnect() {
134+
Log.d(kTag, "disconnect ===> 断开连接")
135+
isNeedReconnect = false
136+
nioEventLoopGroup?.shutdownGracefully()
137+
}
138+
139+
//重新连接
140+
private fun reconnect() {
141+
if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
142+
reconnectNum--
143+
SystemClock.sleep(reconnectIntervalTime)
144+
if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
145+
Log.d(kTag, "reconnect ===> 重新连接")
146+
connectServer()
147+
}
148+
}
149+
}
150+
151+
fun sendData(bytes: ByteArray) {
152+
channel?.writeAndFlush(bytes)?.addListener(ChannelFutureListener { future ->
153+
if (!future.isSuccess) {
154+
future.channel().close()
155+
nioEventLoopGroup!!.shutdownGracefully()
156+
}
157+
})
158+
}
159+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.pengxh.kt.lite.utils.socket.tcp
2+
3+
import android.util.Log
4+
5+
class SocketManager private constructor() : ISocketListener {
6+
7+
private val kTag = "SocketManager"
8+
private var nettyClient = SocketClient()
9+
10+
companion object {
11+
val get: SocketManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { SocketManager() }
12+
}
13+
14+
fun connectServer(hostname: String, port: Int) {
15+
Thread {
16+
if (!nettyClient.connectStatus) {
17+
nettyClient.setSocketListener(this)
18+
nettyClient.connect(hostname, port)
19+
} else {
20+
nettyClient.disconnect()
21+
}
22+
}.start()
23+
}
24+
25+
/**
26+
* 处理接收消息
27+
* */
28+
override fun onMessageResponse(data: ByteArray?) {
29+
30+
}
31+
32+
override fun onConnectStatusChanged(state: ConnectState) {
33+
if (state == ConnectState.SUCCESS) {
34+
if (nettyClient.connectStatus) {
35+
Log.d(kTag, "连接成功")
36+
}
37+
} else {
38+
if (!nettyClient.connectStatus) {
39+
Log.e(kTag, "onConnectStatusChanged: 连接断开,正在重连")
40+
}
41+
}
42+
}
43+
44+
fun sendData(bytes: ByteArray) {
45+
nettyClient.sendData(bytes)
46+
}
47+
48+
fun close() {
49+
nettyClient.disconnect()
50+
}
51+
}

0 commit comments

Comments
 (0)