-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathedfWriter.py
More file actions
executable file
·146 lines (121 loc) · 5.24 KB
/
edfWriter.py
File metadata and controls
executable file
·146 lines (121 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from __future__ import print_function

import argparse
import glob
import json
import os
import sys

import numpy as np
from natsort import natsorted, ns

import writeToFile
# Module-level accumulators filled in by process_files(): every list receives
# one entry per channel, in the order the channels are processed.

# combined sample array of each channel
channel_files = []

# header fields copied from each channel's chunk header
channel_name_list = []
sampsPerRecord_list = []
phyDimension_list = []
phyMinimum_list, phyMaximum_list = [], []
digMinimum_list, digMaximum_list = [], []
def main(chunk_folder, edf_file, num_chunks):
    """Group the chunk files of a folder per channel and process each group.

    Every ``*.chn`` file in ``chunk_folder`` is collected in natural,
    case-insensitive sort order and split into consecutive groups of
    ``num_chunks`` files. Each group is handed to ``process_files``.

    Args:
        chunk_folder: Directory holding the ``.chn`` chunk files; a trailing
            path separator is optional.
        edf_file: Output file path, passed through to ``process_files``.
        num_chunks: Number of chunk files that make up one channel.

    Raises:
        ValueError: If the number of chunk files found is not a multiple of
            ``num_chunks``; the files could then not be grouped reliably.
    """
    # os.path.join also works when the folder is given without a trailing
    # separator (plain string concatenation would glob "folder*.chn").
    pattern = os.path.join(chunk_folder, "*.chn")
    files = natsorted(glob.glob(pattern), alg=ns.IGNORECASE)

    # Fail before anything is processed: with a missing or surplus chunk the
    # consecutive grouping below would mix files of different channels.
    if len(files) % num_chunks:
        raise ValueError(
            'Found %d chunk files, which is not a multiple of %d chunks per channel'
            % (len(files), num_chunks))

    # number of channels to be included in the edf file
    num_channels = len(files) // num_chunks

    # hand over one group of num_chunks consecutive files at a time
    for start in range(0, len(files), num_chunks):
        filelist = files[start:start + num_chunks]
        process_files(filelist, num_channels, edf_file, num_chunks)
# read the information from the files
def process_files(filelist, num_channels, edf_file, num_chunks):
    """Collect header and sample data of one channel from its chunk files.

    The JSON header on the first line of the last file in ``filelist`` is
    parsed and its fields are appended to the module-level header lists. The
    samples of all files in ``filelist`` are read, concatenated and appended
    to ``channel_files``. Once ``channel_files`` holds ``num_channels``
    entries, everything is passed on to ``arrange_data``.

    Args:
        filelist: Paths of the chunk files belonging to one channel.
        num_channels: Total number of channels expected.
        edf_file: Output file path, passed through to ``arrange_data``.
        num_chunks: Number of chunk files per channel.
    """
    # the header of the last chunk is the one used for the record count
    filepath = filelist[-1]
    print(filelist)

    # Only the first line is the header. readline() stops there, so the binary
    # sample payload is neither loaded nor decoded here; read_data() skips the
    # header with the same readline() call, so both agree on where it ends.
    with open(filepath, 'rb') as open_data:
        header_line = open_data.readline()
    text = json.loads(header_line.decode('utf-8', 'ignore'))

    recsRemaining = int(text["recsRemaining"])
    recsPerChunk = int(text["recsPerChunk"])
    recDur = float(text["recDur"])
    chunkDuration = float(text["chunkDuration"])

    # add header information to lists
    channel_name_list.append(text["sigLabel"])
    phyDimension_list.append(text["phyDimension"])
    phyMaximum_list.append(text["phyMaximum"])
    phyMinimum_list.append(text["phyMinimum"])
    digMaximum_list.append(text["digMaximum"])
    digMinimum_list.append(text["digMinimum"])
    sampsPerRecord_list.append(text["sampsPerRecord"])

    # number of data records
    if recsRemaining >= recsPerChunk:
        # NOTE(review): int() truncates, so a ratio that lands just below a
        # whole number through float rounding loses one record - confirm
        # whether recsPerChunk * num_chunks is the intended value.
        numRecs = int((chunkDuration / recDur) * num_chunks)
    else:
        # Presumably the last chunk is partial (recsRemaining records) and the
        # num_chunks - 1 chunks before it are full - TODO confirm. This equals
        # the former special cases for one and two chunks and extends them to
        # any chunk count.
        numRecs = recsRemaining + recsPerChunk * (num_chunks - 1)

    # header information related to data record
    recInfoList = [numRecs, recDur]

    # include all the lists for header information in one big list
    header_output = [channel_name_list, phyDimension_list, phyMinimum_list,
                     phyMaximum_list, digMinimum_list, digMaximum_list,
                     sampsPerRecord_list]

    # read the data (content) of every chunk and join it into one array
    file_data = read_data(filelist)
    channel_files.append(combining_chunks(file_data))

    # pass the file information and data if and only if all the files are read
    if len(channel_files) == num_channels:
        arrange_data(num_channels, channel_files, header_output, recInfoList, edf_file)
# read the main data (content) for each chunk files
def read_data(filepath_list):
    """Read the sample payload of every chunk file.

    For each path the first line is skipped and the remaining bytes are read
    as little-endian 16-bit integers.

    Args:
        filepath_list: Paths of the chunk files to read.

    Returns:
        A list with one int16 array of shape ``(1, n_samples)`` per file, in
        the order of ``filepath_list``.
    """
    rows = []
    for path in filepath_list:
        with open(path, 'rb') as chunk:
            # move past the first line so only the samples are left to read
            chunk.readline()
            samples = np.fromfile(chunk, dtype='<i2').astype('int16')
        rows.append(samples.reshape(1, len(samples)))
    return rows
# combining the data arrays of the chunk files
def combining_chunks(data_array_list):
    """Join the given arrays along axis 1 and return the combined array."""
    return np.concatenate(data_array_list, axis=1)
# arrange main content data (numpy array)
def arrange_data(num_channels, data_num_list, header, recHeader, edf_file):
    """Stack the channel arrays vertically and hand everything to writeToFile.

    Args:
        num_channels: Number of channels, passed through to the writer.
        data_num_list: Sequence with one array per channel.
        header: Header lists, passed through to the writer.
        recHeader: Data-record header information, passed through.
        edf_file: Output file path, passed through.
    """
    # copy every entry into its own array, then stack them row-wise
    channel_arrays = tuple([np.array(channel) for channel in data_num_list])
    final_array = np.vstack(channel_arrays)
    print("converting to edf - passing number of channels, data array and header information to writeFile")
    writeToFile.write(num_channels, final_array, header, recHeader, edf_file)
# main
if __name__ == '__main__':
    # setting up parse options
    parser = argparse.ArgumentParser(
        description='Combine 2 chunk files for each signals to form a edf file. ' +
                    'Must specify location for folder with chunk files and the final edf file name and destination',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('edfFileLocation', help='File name and location for final edf file , eg:(directory/filename.edf)')
    parser.add_argument('chunkFolderLocation', help='Location of folder with chunk files to convert, eg:(directory)')
    # type=int makes argparse reject non-numeric values with a usage message
    # and leaves the value as None when the option is omitted, so the check
    # below is reached instead of int(None) raising a TypeError.
    parser.add_argument('--numChunks', type=int, help='Number of chunks for each channel')
    args = parser.parse_args()

    edf_file = args.edfFileLocation
    chunk_folder = args.chunkFolderLocation
    num_chunks = args.numChunks

    # checking if all the arguments are given
    if not chunk_folder and not edf_file:
        sys.exit('Must provide input folder location for chunk files and an output file location.')
    if not chunk_folder:
        sys.exit('Must provide input chunk folder location.')
    if not edf_file:
        sys.exit('Must provide output edf file location.')
    # a missing, zero or negative chunk count cannot be used to group the files
    if num_chunks is None or num_chunks < 1:
        sys.exit('Must provide number of chunks in each channel')

    main(chunk_folder, edf_file, num_chunks)