-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreceive.py
62 lines (50 loc) · 1.54 KB
/
receive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""Example program to show how to read a multi-channel time series from LSL."""
from pylsl import StreamInlet, resolve_stream
from __future__ import print_function
try:
import pyaudio
except:
print("This demo requires pyaudio installed to function")
import sys
sys.exit(1)
import numpy as np
import scipy as sp
from scipy.integrate import simps
NUM_SAMPLES = 1024
SAMPLING_RATE = 44100
MAX_FREQ = SAMPLING_RATE / 2
FREQ_SAMPLES = NUM_SAMPLES / 8
TIMESLICE = 100 # ms
NUM_BINS = 16
data = {'values': None}
def _get_audio_data():
pa = pyaudio.PyAudio()
stream = pa.open(
format=pyaudio.paInt16,
channels=1,
rate=SAMPLING_RATE,
input=True,
frames_per_buffer=NUM_SAMPLES
)
while True:
try:
raw_data = np.fromstring(stream.read(NUM_SAMPLES), dtype=np.int16)
signal = raw_data / 32768.0
fft = sp.fft(signal)
spectrum = abs(fft)[:NUM_SAMPLES/2]
power = spectrum**2
bins = simps(np.split(power, NUM_BINS))
data['values'] = signal, spectrum, bins
except:
continue
# first resolve an EEG stream on the lab network
print("looking for an EEG stream...")
streams = resolve_stream('type', 'EEG')
print("resolved an EEG stream...")
# create a new inlet to read from the stream
inlet = StreamInlet(streams[0])
while True:
# get a new sample (you can also omit the timestamp part if you're not
# interested in it)
sample, timestamp = inlet.pull_sample()
print(timestamp, sample)