-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
86 lines (70 loc) · 1.95 KB
/
main.py
File metadata and controls
86 lines (70 loc) · 1.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import os
import re
import socket
import select
global tcp_socket
global udp_socket
def tcp_connect():
# 与机器人控制命令端口建立 TCP 连接
tcp_address = ("192.168.42.2", int(40923))
global tcp_socket
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("TCP Connecting...")
tcp_socket.connect(tcp_address)
print("TCP Connected!")
def udp_connect():
# 与机器人控制命令端口建立 UDP 连接
udp_address = ("0.0.0.0", int(40924))
global udp_socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("UDP Connecting...")
udp_socket.bind(udp_address)
print("UDP Connected!")
def start_get_game_msg():
# 进入明文模式
msg = "command;"
tcp_socket.send(msg.encode('utf-8'))
buf = tcp_socket.recv(1024)
print(buf.decode('utf-8'))
msg = "quit;"
tcp_socket.send(msg.encode('utf-8'))
buf = tcp_socket.recv(1024)
print(buf.decode('utf-8'))
msg = "command;"
tcp_socket.send(msg.encode('utf-8'))
buf = tcp_socket.recv(1024)
print(buf.decode('utf-8'))
# 开始赛事推送
msg = "game_msg on;"
tcp_socket.send(msg.encode('utf-8'))
buf = tcp_socket.recv(1024)
print(buf.decode('utf-8'))
def main():
tcp_connect()
udp_connect()
start_get_game_msg()
while True:
# print("Waiting msg")
# 设置超时时间
ready = select.select([udp_socket], [], [], 5)
if ready[0]:
# 接收数据并转为数组
buf = udp_socket.recv(1024)
game_msg = re.findall(r"\d+", buf.decode('utf-8'))
print(game_msg)
if int(game_msg[6]) > 0:
key_num = [0] * 3
for i in range(7, 7 + int(game_msg[6]), 1):
key_num[i - 7] = int(game_msg[i])
print(key_num)
# 判断按下的按键并触发对应函数
if 81 in key_num: # 按下Q
os.system('cd ~/RM-yolo/RMSDK && python3 06_final.py')
# 关闭端口连接
tcp_socket.shutdown(socket.SHUT_WR)
tcp_socket.close()
udp_socket.shutdown(socket.SHUT_WR)
udp_socket.close()
print("Socket Closed")
if __name__ == '__main__':
main()