|
1 | 1 |
|
2 | 2 | from io import BytesIO
|
3 | 3 | from math import ceil
|
| 4 | +import os |
4 | 5 | from pathlib import Path
|
| 6 | +from typing import BinaryIO, Tuple, Union, TYPE_CHECKING, Any, Dict, cast |
5 | 7 | import warnings
|
6 | 8 |
|
| 9 | +import numpy as np |
| 10 | + |
7 | 11 | import _openjpeg
|
8 | 12 |
|
9 | 13 |
|
10 |
| -def _get_format(stream): |
| 14 | +if TYPE_CHECKING: # pragma: no cover |
| 15 | + from pydicom.dataset import Dataset |
| 16 | + |
| 17 | + |
| 18 | +def _get_format(stream: BinaryIO) -> int: |
11 | 19 | """Return the JPEG 2000 format for the encoded data in `stream`.
|
12 | 20 |
|
13 | 21 | Parameters
|
14 | 22 | ----------
|
15 |
| - stream : bytes or file-like |
| 23 | + stream : file-like |
16 | 24 | A Python object containing the encoded JPEG 2000 data. If not
|
17 | 25 | :class:`bytes` then the object must have ``tell()``, ``seek()`` and
|
18 | 26 | ``read()`` methods.
|
@@ -56,13 +64,17 @@ def _get_format(stream):
|
56 | 64 | raise ValueError("No matching JPEG 2000 format found")
|
57 | 65 |
|
58 | 66 |
|
59 |
| -def get_openjpeg_version(): |
| 67 | +def get_openjpeg_version() -> Tuple[int, ...]: |
60 | 68 | """Return the openjpeg version as tuple of int."""
|
61 | 69 | version = _openjpeg.get_version().decode("ascii").split(".")
|
62 | 70 | return tuple([int(ii) for ii in version])
|
63 | 71 |
|
64 | 72 |
|
65 |
| -def decode(stream, j2k_format=None, reshape=True): |
| 73 | +def decode( |
| 74 | + stream: Union[str, os.PathLike, bytes, bytearray, BinaryIO], |
| 75 | + j2k_format: Union[int, None] = None, |
| 76 | + reshape: bool = True, |
| 77 | +) -> np.ndarray: |
66 | 78 | """Return the decoded JPEG2000 data from `stream` as a
|
67 | 79 | :class:`numpy.ndarray`.
|
68 | 80 |
|
@@ -98,42 +110,53 @@ def decode(stream, j2k_format=None, reshape=True):
|
98 | 110 | """
|
99 | 111 | if isinstance(stream, (str, Path)):
|
100 | 112 | with open(stream, 'rb') as f:
|
101 |
| - stream = f.read() |
102 |
| - |
103 |
| - if isinstance(stream, (bytes, bytearray)): |
104 |
| - stream = BytesIO(stream) |
105 |
| - |
106 |
| - required_methods = ["read", "tell", "seek"] |
107 |
| - if not all([hasattr(stream, meth) for meth in required_methods]): |
108 |
| - raise TypeError( |
109 |
| - "The Python object containing the encoded JPEG 2000 data must " |
110 |
| - "either be bytes or have read(), tell() and seek() methods." |
111 |
| - ) |
| 113 | + buffer: BinaryIO = BytesIO(f.read()) |
| 114 | + buffer.seek(0) |
| 115 | + elif isinstance(stream, (bytes, bytearray)): |
| 116 | + buffer = BytesIO(stream) |
| 117 | + else: |
| 118 | + # BinaryIO |
| 119 | + required_methods = ["read", "tell", "seek"] |
| 120 | + if not all([hasattr(stream, meth) for meth in required_methods]): |
| 121 | + raise TypeError( |
| 122 | + "The Python object containing the encoded JPEG 2000 data must " |
| 123 | + "either be bytes or have read(), tell() and seek() methods." |
| 124 | + ) |
| 125 | + buffer = stream |
112 | 126 |
|
113 | 127 | if j2k_format is None:
|
114 |
| - j2k_format = _get_format(stream) |
| 128 | + j2k_format = _get_format(buffer) |
115 | 129 |
|
116 | 130 | if j2k_format not in [0, 1, 2]:
|
117 | 131 | raise ValueError(f"Unsupported 'j2k_format' value: {j2k_format}")
|
118 | 132 |
|
119 |
| - arr = _openjpeg.decode(stream, j2k_format) |
| 133 | + arr = cast(np.ndarray, _openjpeg.decode(buffer, j2k_format)) |
120 | 134 | if not reshape:
|
121 | 135 | return arr
|
122 | 136 |
|
123 |
| - meta = get_parameters(stream, j2k_format) |
124 |
| - bpp = ceil(meta["precision"] / 8) |
| 137 | + meta = get_parameters(buffer, j2k_format) |
| 138 | + precision = cast(int, meta["precision"]) |
| 139 | + rows = cast(int, meta["rows"]) |
| 140 | + columns = cast(int, meta["columns"]) |
| 141 | + pixels_per_sample = cast(int, meta["nr_components"]) |
| 142 | + pixel_representation = cast(bool, meta["is_signed"]) |
| 143 | + bpp = ceil(precision / 8) |
125 | 144 |
|
126 |
| - dtype = f"uint{8 * bpp}" if not meta["is_signed"] else f"int{8 * bpp}" |
| 145 | + dtype = f"u{bpp}" if not pixel_representation else f"i{bpp}" |
127 | 146 | arr = arr.view(dtype)
|
128 | 147 |
|
129 |
| - shape = [meta["rows"], meta["columns"]] |
130 |
| - if meta["nr_components"] > 1: |
131 |
| - shape.append(meta["nr_components"]) |
| 148 | + shape = [rows, columns] |
| 149 | + if pixels_per_sample> 1: |
| 150 | + shape.append(pixels_per_sample) |
132 | 151 |
|
133 | 152 | return arr.reshape(*shape)
|
134 | 153 |
|
135 | 154 |
|
136 |
| -def decode_pixel_data(stream, ds=None, **kwargs): |
| 155 | +def decode_pixel_data( |
| 156 | + stream: Union[bytes, bytearray, BinaryIO], |
| 157 | + ds: "Dataset" = None, |
| 158 | + **kwargs: Any |
| 159 | +) -> np.ndarray: |
137 | 160 | """Return the decoded JPEG 2000 data as a :class:`numpy.ndarray`.
|
138 | 161 |
|
139 | 162 | Intended for use with *pydicom* ``Dataset`` objects.
|
@@ -189,7 +212,7 @@ def decode_pixel_data(stream, ds=None, **kwargs):
|
189 | 212 | )
|
190 | 213 |
|
191 | 214 | if not ds and no_kwargs:
|
192 |
| - return arr |
| 215 | + return cast(np.ndarray, arr) |
193 | 216 |
|
194 | 217 | samples_per_pixel = ds.get("SamplesPerPixel", samples_per_pixel)
|
195 | 218 | bits_stored = ds.get("BitsStored", bits_stored)
|
@@ -224,10 +247,13 @@ def decode_pixel_data(stream, ds=None, **kwargs):
|
224 | 247 | f"JPEG 2000 data '{val}'"
|
225 | 248 | )
|
226 | 249 |
|
227 |
| - return arr |
| 250 | + return cast(np.ndarray, arr) |
228 | 251 |
|
229 | 252 |
|
230 |
| -def get_parameters(stream, j2k_format=None): |
| 253 | +def get_parameters( |
| 254 | + stream: Union[str, os.PathLike, bytes, bytearray, BinaryIO], |
| 255 | + j2k_format: Union[int, None] = None, |
| 256 | +) -> Dict[str, Union[int, str, bool]]: |
231 | 257 | """Return a :class:`dict` containing the JPEG2000 image parameters.
|
232 | 258 |
|
233 | 259 | .. versionchanged:: 1.1
|
@@ -263,22 +289,27 @@ def get_parameters(stream, j2k_format=None):
|
263 | 289 | """
|
264 | 290 | if isinstance(stream, (str, Path)):
|
265 | 291 | with open(stream, 'rb') as f:
|
266 |
| - stream = f.read() |
267 |
| - |
268 |
| - if isinstance(stream, (bytes, bytearray)): |
269 |
| - stream = BytesIO(stream) |
270 |
| - |
271 |
| - required_methods = ["read", "tell", "seek"] |
272 |
| - if not all([hasattr(stream, func) for func in required_methods]): |
273 |
| - raise TypeError( |
274 |
| - "The Python object containing the encoded JPEG 2000 data must " |
275 |
| - "either be bytes or have read(), tell() and seek() methods." |
276 |
| - ) |
| 292 | + buffer: BinaryIO = BytesIO(f.read()) |
| 293 | + buffer.seek(0) |
| 294 | + elif isinstance(stream, (bytes, bytearray)): |
| 295 | + buffer = BytesIO(stream) |
| 296 | + else: |
| 297 | + # BinaryIO |
| 298 | + required_methods = ["read", "tell", "seek"] |
| 299 | + if not all([hasattr(stream, meth) for meth in required_methods]): |
| 300 | + raise TypeError( |
| 301 | + "The Python object containing the encoded JPEG 2000 data must " |
| 302 | + "either be bytes or have read(), tell() and seek() methods." |
| 303 | + ) |
| 304 | + buffer = stream |
277 | 305 |
|
278 | 306 | if j2k_format is None:
|
279 |
| - j2k_format = _get_format(stream) |
| 307 | + j2k_format = _get_format(buffer) |
280 | 308 |
|
281 | 309 | if j2k_format not in [0, 1, 2]:
|
282 | 310 | raise ValueError(f"Unsupported 'j2k_format' value: {j2k_format}")
|
283 | 311 |
|
284 |
| - return _openjpeg.get_parameters(stream, j2k_format) |
| 312 | + return cast( |
| 313 | + Dict[str, Union[str, int, bool]], |
| 314 | + _openjpeg.get_parameters(buffer, j2k_format), |
| 315 | + ) |
0 commit comments