-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
90 lines (77 loc) · 2.26 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import aioactioncable
import asyncio
import json
import ssl
import time
import serial
import concurrent.futures
import threading
import RPi.GPIO as GPIO
# 5615-2607-f140-400-7-91a-c23c-2631-d75e.ngrok-free.app
# Most recent value of every control, keyed by control name. Entries are
# overwritten as control messages arrive.
state = dict(dial_6=0, dial_7=0, checkbox_6=0)

# Last value already acted upon for each control; compared against `state`
# to detect changes. Starts out identical to `state`.
stepperPositions = dict(state)

# Controls that correspond to stepper motors.
steppers = ["dial_6", "dial_7"]

# Experiment this device subscribes to on the channel.
experiment_id = '6'

# Output pin (physical BOARD numbering) switched by the power worker.
power_pin = 40
GPIO.setmode(GPIO.BOARD)
GPIO.setup(power_pin, GPIO.OUT)

# Shared flag between threads: set while the power output should be on.
power = threading.Event()
def worker(power):
    """Drive the power pin from the ``power`` event; never returns.

    While ``power`` is set the pin is held HIGH. Once it is cleared the pin
    is driven LOW only after the event has stayed clear for 5 seconds, so a
    quick off/on toggle does not interrupt the output.

    Args:
        power: ``threading.Event`` that is set while the output should be on.
    """
    off_delay = 5        # seconds the event must stay clear before going LOW
    poll_interval = 0.1  # seconds between checks while the output is on

    while True:
        if power.is_set():
            GPIO.output(power_pin, GPIO.HIGH)
            # Event has no "wait until cleared", so poll -- but sleep between
            # polls instead of spinning a CPU core at 100%.
            time.sleep(poll_interval)
        elif not power.wait(timeout=off_delay):
            # wait() returned False: the event stayed clear for the whole
            # delay, so it is safe to switch the output off. If it was set
            # meanwhile, wait() returns True at once and the next iteration
            # drives the pin HIGH without a spurious LOW pulse.
            GPIO.output(power_pin, GPIO.LOW)
# Open the USB serial port at 9600 baud with a 1 s read timeout and drop
# anything already waiting in the input buffer.
ser = serial.Serial(port='/dev/ttyUSB0', baudrate=9600, timeout=1)
ser.reset_input_buffer()

# Run the power-pin worker in a background thread.
t1 = threading.Thread(target=worker, kwargs={"power": power})
t1.start()
def fetcher(power):
    """Receive control messages over ActionCable and apply them.

    Subscribes to the experiment channel and, for every message whose
    ``location`` is ``'pi'``, stores the new control value in ``state``,
    reports changed stepper targets, syncs ``power`` with the checkbox and
    echoes the message back with ``location`` set to ``'controls'``.
    Runs until the connection ends or an exception is raised.

    Args:
        power: ``threading.Event`` set while the checkbox control is on.
    """

    def update():
        """Report changed stepper targets and sync ``power`` with the checkbox."""
        for stepper in steppers:
            target = state[stepper]
            if target == stepperPositions[stepper]:
                continue  # nothing new for this stepper
            print("stepperID: {}, stepperPos: {}".format(stepper, target))
            stepperPositions[stepper] = target
            stepper_id = 1 if stepper == "dial_6" else 2
            inputString = "M{0}:{1}\n".format(stepper_id, target)
            print(inputString)
            # NOTE: sending the command to the serial port is disabled:
            # ser.write(inputString.encode('utf-8'))
            time.sleep(1)
        print(state)

        # Equality (not truthiness) is deliberate: the value may be a JSON
        # bool or 0/1, and any other value must leave the event untouched.
        # set()/clear() are no-ops when the event is already in that state.
        checkbox = state["checkbox_6"]
        if checkbox == True:  # noqa: E712
            power.set()
        elif checkbox == False:  # noqa: E712
            power.clear()

    def process(msg_json):
        """Store the control value carried by a decoded message and apply it."""
        print(f'Processing {msg_json}')
        print(msg_json['value'])
        state[msg_json['control']] = msg_json['value']
        update()

    async def ac_recv(uri, identifier):
        """Consume the subscription, handling messages addressed to the Pi."""
        async with aioactioncable.connect(uri) as acconnect:
            subscription = await acconnect.subscribe(identifier)
            async for msg in subscription:
                # Decode once and reuse the result for filtering, processing
                # and the echo.
                msg_json = json.loads(msg)
                if msg_json['location'] != 'pi':
                    continue
                print(msg)
                process(msg_json)
                await subscription.send({**msg_json, 'location': 'controls'})
                # process() has already applied the change, so this second
                # pass should only re-print the state.
                update()

    asyncio.run(ac_recv('ws://2.tcp.ngrok.io:11502/cable', {'channel': 'ExperimentChannel', 'experiment': experiment_id, "location": 'pi'}))
# Run the websocket receiver in its own thread.
t2 = threading.Thread(target=fetcher, kwargs={"power": power})
t2.start()