-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgetTriggers.py
More file actions
executable file
·218 lines (167 loc) · 6.4 KB
/
getTriggers.py
File metadata and controls
executable file
·218 lines (167 loc) · 6.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from __future__ import print_function
import numpy as np
from collections import Counter
import datetime
import pandas as pd
import mne
import os
import glob
import argparse
from natsort import natsorted, ns
# defining constants
fs = 2000 # sampling freq for edf files
numframes = 1 #frames per bit on display
refresh = 1/60 #refresh period of monitor --- frame rate
bitperiod = numframes*refresh*fs
bitsperpacket = 8 #number of bits for each data packet
# defining lists for creating dataframe later to
trigger = []
trigger_time = []
trigger_index = []
def main(array, patient_id):
data = array[0]
# function to clear noise - the values are file specific
def clearing_noise():
data[0:3600000] = 0
# data = data *-1
x = -13750
data[data > x] = x #filtering out the noise #(data < 15000 in other cases)
clearing_noise()
# getting the most common value, which will be either zero or the maximum value of trigger
b = Counter(data)
peak = b.most_common()[1][0]
print("peak: ", peak)
print("data: ", data)
# cutoff to take care of noise due to decay in trigger signal
cutoff = peak + 500
print("cutoff: ", cutoff)
# getting everything in terms of ones and zeros - everything above the cutoff = 1, else 0
# cutoff = 0
filtereddata=np.zeros((1,len(data)))
ii = np.where(data < cutoff) # greater than (>) when upright triggers
filtereddata[0][ii[0]] = 1
risingedgetimes = np.where(np.diff(filtereddata[0]) > 0)
risingedges = np.zeros((1,len(data)))
risingedges[0][risingedgetimes[0]] = 1
print(risingedges)
# the start bit time!
startbittime = risingedgetimes[0][0] #prints the occurance of first one
print("start: ", startbittime)
# loop for extracting data
while (1):
testval = np.zeros([bitsperpacket])
testval_time = np.zeros([1])
testval_time[0]=startbittime
# extract one byte(arbitrary size, packet) at a time flanked by start/stop bits
# uses average value of signal during each bit period to allow for error
for n in range(1,bitsperpacket+1):
nd1 = int(startbittime+((n)*bitperiod)) #start after the start bit
nd2 = int(startbittime+((n+1)*bitperiod)) #end 8 bits after start bit
arr = filtereddata[0][range(nd1, nd2)]
# getting average value (85%) of signal for each bit period
mean_val = np.mean(arr)
if(mean_val > 0.95):
val = 1
else:
val = 0
testval[n-1] = val
# print triggers in terminal with trigger index and trigger time
def show_trigger():
if(bi2de(testval) > 0 and (bi2de(testval) != 1 or bi2de(testval) != 4 or bi2de(testval) != 16)):#==170 or bi2de(testval) == 1 or bi2de(testval) == 4 or bi2de(testval) == 16):# or bi2de(testval)!=0):
trigger.append(bi2de(testval))
trigger_time.append(testval_time[0]/2000)
trigger_index.append(testval_time[0])
print(testval)
print("trigger: ", bi2de(testval), end=" time: ")
print(secConversion(testval_time[0]/2000), end= " seconds: ")
print(testval_time[0]/2000, end=" index: ")
print(testval_time[0])
print()
show_trigger()
# Getting the next start bit after the end of the previous stop bit
newrisingedges = risingedgetimes[0][np.where(risingedgetimes[0]>(startbittime+(bitsperpacket+2)*bitperiod))[0]]
if (len(newrisingedges) == 0):
break
startbittime = newrisingedges[0]
print("new start bit time: ", startbittime)
# save to CSV
saveToCSV(trigger, trigger_time, trigger_index, patient_id)
# plotting for visualization
mne_array(array, 'trigger_ch')
def secConversion(sec):
time = datetime.timedelta(seconds=sec)
return(time)
def bi2de(array):
num = 0
for i in range(array.shape[0]):
num = num + (array[i]*(2**(7-i)))
# if(num==129):
# num=1
# if(num==134 or num==132):
# num = 4
# if(num==152 or num==144 or num==24):
# num=16
# for some files:
if(num==129 or num==128):
num=1
if(num==134 or num==132):
num = 4
if(num==152 or num==144 or num==24 or num==136):
num=16
if(num==255):
num=170
if(num==176):
num=32
return(num)
def saveToCSV(trigger, trigger_time, trigger_index, patientID):
cwd = '/data/Trigger_CSV_Files'
filename = str(patientID)+"_"+"trigger_ecog.csv"
download_destination = os.path.join(cwd, filename)
if not os.path.exists(os.path.dirname(download_destination)):
os.makedirs(os.path.dirname(download_destination))
trigger_dict = pd.DataFrame({'trigger' : trigger, 'trigger_time' : trigger_time, 'trigger_index' : trigger_index})
trigger_dict.to_csv(download_destination, sep=',')
else:
trigger_dict = pd.DataFrame({'trigger' : trigger, 'trigger_time' : trigger_time, 'trigger_index' : trigger_index})
trigger_dict.to_csv(download_destination, sep=',')
def file_process(trig_folder_loc, patient_id):
# read all the files in the given folder
files = natsorted(glob.glob(trig_folder_loc+"*.chn"), alg=ns.IGNORECASE)
data_list = []
for i in range(len(files)):
data_list.append(read_file(files[i]))
main(combining_chunks(data_list), patient_id)
def read_file(filepath):
with open(filepath, 'rb') as data:
data.readline()
num = np.fromfile(data, dtype='<i2').astype('int64') # read the data into numpy
data_num = np.reshape(num,(1,len(num)))
return(data_num)
def combining_chunks(file_list):
concate = np.concatenate(file_list, axis=1)
return(concate)
def mne_array(data, ch_name1):
ch_types = ['ecog']
ch_names = [ch_name1]
info = mne.create_info(ch_names=ch_names, sfreq=2000, ch_types=ch_types)
raw = mne.io.RawArray(data, info, first_samp=0, verbose=None).load_data()
plot(raw, 'Auto scaled data from original arrays')
def plot(raw, title):
scalings = {'ecog': 1}
color = {'ecog': 'g'}
print("plotting ... ")
raw.plot(n_channels=1, duration=10.0, color = color, scalings='auto', show=True, block=True)
if __name__ == '__main__':
# setting up parse options
parser = argparse.ArgumentParser(description='Get trigger time and trigger values for binary signals ',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('trigFolderLocation', help='Folder location with trigger channel files')
parser.add_argument('--patientID', help='Patient ID for naming the CSV file; eg: year')
args = parser.parse_args()
trig_folder_loc = args.trigFolderLocation
patient_id = str(args.patientID)
# checking if all the arguments are given
if not trig_folder_loc and not patient_id:
sys.exit('Must provide folder location with trigger channels (chunk files) and patient ID')
else:
file_process(trig_folder_loc, patient_id)