forked from andybi7676/BioExpFinal-InvisibleHand
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
149 lines (120 loc) · 4.92 KB
/
main.py
File metadata and controls
149 lines (120 loc) · 4.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import sys
import os
import random
from PyQt5.QtWidgets import QApplication, QWidget
from PyQt5.QtCore import Qt
from PyQt5 import QtBluetooth
class SignalReaderBthConn(QWidget):
def __init__(self, addr, decisionMaker=None):
self.addr = addr
super().__init__()
self.connectToRobot()
self.win = QWidget()
self.win.show()
self.dataBuffer = []
self.decisionMaker = decisionMaker
def connectToRobot(self):
self.sock = QtBluetooth.QBluetoothSocket(QtBluetooth.QBluetoothServiceInfo.RfcommProtocol)
self.sock.connected.connect(self.connectedToBluetooth)
self.sock.readyRead.connect(self.receivedBluetoothMessage)
self.sock.disconnected.connect(self.disconnectedFromBluetooth)
self.sock.error.connect(self.socketError)
port = 1
self.sock.connectToService(QtBluetooth.QBluetoothAddress(self.addr),port)
def log(self, content):
print(f"[ SIGNAL_RECEIVER ] - {content}")
def socketError(self,error):
self.log(self.sock.errorString())
def connectedToBluetooth(self):
self.log("connected")
self.sock.write('connnection from pc'.encode())
def disconnectedFromBluetooth(self):
self.log('Disconnected from bluetooth')
def receivedBluetoothMessage(self):
while self.sock.canReadLine():
signal = str(self.sock.readLine())[2:-3].strip()
if not self.decisionMaker.makingDecision:
self.log(f'received signal: {signal}, sent to decisionMaker')
self.decisionMaker.makeDecision(signal)
else:
self.log(f"received signal: {signal}, but cannot send to decisionMaker")
class DecisionSenderBthConn(QWidget):
def __init__(self, addr):
super().__init__()
self.addr = addr
self.connectToRobot()
self.win = QWidget()
self.win.show()
self.connected = False
def log(self, content):
print(f"[ DICISION_SENDER ] - {content}")
def sendDecision(self, decision: str):
if (self.connected):
self.sock.write(decision.encode())
self.log(f"sent decision: \'{decision}\'")
def connectToRobot(self):
self.sock = QtBluetooth.QBluetoothSocket(QtBluetooth.QBluetoothServiceInfo.RfcommProtocol)
self.sock.connected.connect(self.connectedToBluetooth)
self.sock.readyRead.connect(self.receivedBluetoothMessage)
self.sock.disconnected.connect(self.disconnectedFromBluetooth)
self.sock.error.connect(self.socketError)
port = 1
self.sock.connectToService(QtBluetooth.QBluetoothAddress(self.addr),port)
def socketError(self,error):
self.log(self.sock.errorString())
def connectedToBluetooth(self):
self.connected = True
self.log("connected")
self.sock.write('connnection from pc'.encode())
def disconnectedFromBluetooth(self):
self.log('Disconnected from bluetooth')
def receivedBluetoothMessage(self):
while self.sock.canReadLine():
line = str(self.sock.readLine(), encoding='utf-8').strip()
self.log(line)
class DecisionMaker():
def __init__(self, decisionSender=None) -> None:
super().__init__()
self.decisionSender = decisionSender
self.makingDecision = False
self.decisions = ['F', 'B', 'L', 'R', 'A', 'D', 'S']
self.prevDecision = 'S'
pass
def log(self, content):
print(f"[ DICISION_MAKER ] - {content}")
def makeDecision(self, signal):
roll = int(signal.split(' ')[0])
pitch = int(signal.split(' ')[1])
self.makingDecision = True
decision = ''
threshold = 25
decision = 'S'
srange = range(-threshold, threshold)
if roll > threshold and pitch in srange:
decision = 'B'
elif roll < -threshold and pitch in srange:
decision = 'F'
elif pitch > threshold and roll in srange:
decision = 'L'
elif pitch < -threshold and roll in srange:
decision = 'R'
# if (signal.upper() in self.decisions):
# decision = signal.upper()
# else:
# decision = random.choice(self.decisions)
self.log(f'made decision: \'{decision}\'')
if self.prevDecision != decision:
self.decisionSender.sendDecision(decision)
self.prevDecision = decision
self.makingDecision = False
def main():
# deal with a bluetooth bug on mac
if sys.platform == 'darwin':
os.environ['QT_EVENT_DISPATCHER_CORE_FOUNDATION'] = '1'
app = QApplication(sys.argv)
decisionSender = DecisionSenderBthConn("00:13:EF:00:27:9D") # BioExpG5-2
decisionMaker = DecisionMaker(decisionSender=decisionSender)
signalReceiver = SignalReaderBthConn("98:D3:81:FD:46:F2", decisionMaker=decisionMaker) # BioExpG5-1
sys.exit(app.exec_())
if __name__ == '__main__':
main()