diff --git a/nibabel/streamlines/tck.py b/nibabel/streamlines/tck.py index ff12bc2322..823a88b5cf 100644 --- a/nibabel/streamlines/tck.py +++ b/nibabel/streamlines/tck.py @@ -91,10 +91,10 @@ def is_correct_format(cls, fileobj): otherwise returns False. """ with Opener(fileobj) as f: - magic_number = asstr(f.fobj.readline()) - f.seek(-len(magic_number), os.SEEK_CUR) + magic_number = f.read(len(cls.MAGIC_NUMBER)) + f.seek(-len(cls.MAGIC_NUMBER), os.SEEK_CUR) - return magic_number.strip() == cls.MAGIC_NUMBER + return asstr(magic_number) == cls.MAGIC_NUMBER @classmethod def create_empty_header(cls): @@ -287,8 +287,8 @@ def _write_header(fileobj, header): fileobj.write(asbytes(str(new_offset) + "\n")) fileobj.write(asbytes("END\n")) - @staticmethod - def _read_header(fileobj): + @classmethod + def _read_header(cls, fileobj): """ Reads a TCK header from a file. Parameters @@ -304,23 +304,56 @@ def _read_header(fileobj): header : dict Metadata associated with this tractogram file. """ - # Record start position if this is a file-like object - start_position = fileobj.tell() if hasattr(fileobj, 'tell') else None + + # Build header dictionary from the buffer + hdr = {} + offset_data = 0 with Opener(fileobj) as f: + + # Record start position + start_position = f.tell() + + # Make sure we are at the beginning of the file + f.seek(0, os.SEEK_SET) + # Read magic number - magic_number = f.fobj.readline().strip() + magic_number = f.read(len(cls.MAGIC_NUMBER)) + + if asstr(magic_number) != cls.MAGIC_NUMBER: + raise HeaderError(f"Invalid magic number: {magic_number}") + + hdr[Field.MAGIC_NUMBER] = magic_number - # Read all key-value pairs contained in the header. - buf = asstr(f.fobj.readline()) - while not buf.rstrip().endswith("END"): - buf += asstr(f.fobj.readline()) + f.seek(1, os.SEEK_CUR) # Skip \n + + found_end = False + + # Read all key-value pairs contained in the header, stop at EOF + for n_line, line in enumerate(f, 1): + line = asstr(line).strip() + + if not line: # Skip empty lines + continue + + if line == "END": # End of the header + found_end = True + break + + if ':' not in line: # Invalid header line + raise HeaderError(f"Invalid header (line {n_line}): {line}") + + key, value = line.split(":", 1) + hdr[key.strip()] = value.strip() + + if not found_end: + raise HeaderError("Missing END in the header.") offset_data = f.tell() - # Build header dictionary from the buffer. - hdr = dict(item.split(': ') for item in buf.rstrip().split('\n')[:-1]) - hdr[Field.MAGIC_NUMBER] = magic_number + # Set the file position where it was, in case it was previously open + if start_position is not None: + f.seek(start_position, os.SEEK_SET) # Check integrity of TCK header. if 'datatype' not in hdr: @@ -352,10 +385,6 @@ def _read_header(fileobj): # Keep the file position where the data begin. hdr['_offset_data'] = int(hdr['file'].split()[1]) - # Set the file position where it was, if it was previously open. - if start_position is not None: - fileobj.seek(start_position, os.SEEK_SET) - return hdr @classmethod diff --git a/nibabel/streamlines/tests/test_tck.py b/nibabel/streamlines/tests/test_tck.py index e2c6cf119a..1cdda4b44e 100644 --- a/nibabel/streamlines/tests/test_tck.py +++ b/nibabel/streamlines/tests/test_tck.py @@ -25,6 +25,9 @@ def setup_module(): global DATA DATA['empty_tck_fname'] = pjoin(data_path, "empty.tck") + DATA['no_magic_number_tck_fname'] = pjoin(data_path, "no_magic_number.tck") + DATA['no_header_end_tck_fname'] = pjoin(data_path, "no_header_end.tck") + DATA['no_header_end_eof_tck_fname'] = pjoin(data_path, "no_header_end_eof.tck") # simple.tck contains only streamlines DATA['simple_tck_fname'] = pjoin(data_path, "simple.tck") DATA['simple_tck_big_endian_fname'] = pjoin(data_path, @@ -50,6 +53,30 @@ def test_load_empty_file(self): with pytest.warns(Warning) if lazy_load else error_warnings(): assert_tractogram_equal(tck.tractogram, DATA['empty_tractogram']) + def test_load_no_magic_number_file(self): + for lazy_load in [False, True]: + with pytest.raises(HeaderError): + TckFile.load( + DATA['no_magic_number_tck_fname'], + lazy_load=lazy_load + ) + + def test_load_no_header_end_file(self): + for lazy_load in [False, True]: + with pytest.raises(HeaderError): + TckFile.load( + DATA['no_header_end_tck_fname'], + lazy_load=lazy_load + ) + + def test_load_no_header_end_eof_file(self): + for lazy_load in [False, True]: + with pytest.raises(HeaderError): + TckFile.load( + DATA['no_header_end_eof_tck_fname'], + lazy_load=lazy_load + ) + def test_load_simple_file(self): for lazy_load in [False, True]: tck = TckFile.load(DATA['simple_tck_fname'], lazy_load=lazy_load) diff --git a/nibabel/tests/data/no_header_end.tck b/nibabel/tests/data/no_header_end.tck new file mode 100644 index 0000000000..2304f41921 Binary files /dev/null and b/nibabel/tests/data/no_header_end.tck differ diff --git a/nibabel/tests/data/no_header_end_eof.tck b/nibabel/tests/data/no_header_end_eof.tck new file mode 100644 index 0000000000..ceb79ebfb7 --- /dev/null +++ b/nibabel/tests/data/no_header_end_eof.tck @@ -0,0 +1,4 @@ +mrtrix tracks +count: 0000000000 +datatype: Float32LE +file: . 67 \ No newline at end of file diff --git a/nibabel/tests/data/no_magic_number.tck b/nibabel/tests/data/no_magic_number.tck new file mode 100644 index 0000000000..3a4fe7de72 Binary files /dev/null and b/nibabel/tests/data/no_magic_number.tck differ