Skip to content

Commit 5a179ed

Browse files
committed
Add a replacement blosc.py
Unfortunately, this wasn't immediately put under revision control and has undergone a number of changes. Original versions were largely lifted out of blosc.pyx.
1 parent dfd20f5 commit 5a179ed

File tree

1 file changed

+213
-0
lines changed

1 file changed

+213
-0
lines changed

numcodecs/blosc.py

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
# native python wrapper around blosc
2+
# cython: embedsignature=True
3+
# cython: profile=False
4+
# cython: linetrace=False
5+
# cython: binding=False
6+
# cython: language_level=3
7+
import threading
8+
import multiprocessing
9+
import os
10+
import blosc
11+
12+
13+
from .compat import ensure_contiguous_ndarray
14+
from .abc import Codec
15+
16+
17+
# automatic shuffle
18+
AUTOSHUFFLE = -1
19+
# automatic block size - let blosc decide
20+
AUTOBLOCKS = 0
21+
22+
# synchronization
23+
try:
24+
mutex = multiprocessing.Lock()
25+
except OSError:
26+
mutex = None
27+
28+
# store ID of process that first loads the module, so we can detect a fork later
29+
_importer_pid = os.getpid()
30+
31+
32+
def init():
33+
"""Initialize the Blosc library environment."""
34+
blosc_init()
35+
36+
37+
def destroy():
38+
"""Destroy the Blosc library environment."""
39+
blosc_destroy()
40+
41+
42+
def compname_to_compcode(cname):
43+
"""Return the compressor code associated with the compressor name. If the compressor
44+
name is not recognized, or there is not support for it in this build, -1 is returned
45+
instead."""
46+
if isinstance(cname, str):
47+
cname = cname.encode('ascii')
48+
return blosc_compname_to_compcode(cname)
49+
50+
51+
def list_compressors():
52+
"""Get a list of compressors supported in the current build."""
53+
s = blosc_list_compressors()
54+
s = s.decode('ascii')
55+
return s.split(',')
56+
57+
58+
def get_nthreads():
59+
"""Get the number of threads that Blosc uses internally for compression and
60+
decompression."""
61+
return blosc_get_nthreads()
62+
63+
64+
def err_bad_cname(cname):
65+
raise ValueError('bad compressor or compressor not supported: %r; expected one of '
66+
'%s' % (cname, list_compressors()))
67+
68+
69+
# set the value of this variable to True or False to override the
70+
# default adaptive behaviour
71+
use_threads = None
72+
73+
74+
def _get_use_threads():
75+
global use_threads
76+
proc = multiprocessing.current_process()
77+
78+
# check if locks are available, and if not no threads
79+
if not mutex:
80+
return False
81+
82+
# check for fork
83+
if proc.pid != _importer_pid:
84+
# If this module has been imported in the parent process, and the current process
85+
# is a fork, attempting to use blosc in multi-threaded mode will cause a
86+
# program hang, so we force use of blosc ctx functions, i.e., no threads.
87+
return False
88+
89+
if use_threads in [True, False]:
90+
# user has manually overridden the default behaviour
91+
_use_threads = use_threads
92+
93+
else:
94+
# Adaptive behaviour: allow blosc to use threads if it is being called from the
95+
# main Python thread in the main Python process, inferring that it is being run
96+
# from within a single-threaded, single-process program; otherwise do not allow
97+
# blosc to use threads, inferring it is being run from within a multi-threaded
98+
# program or multi-process program
99+
100+
if proc.name != 'MainProcess':
101+
_use_threads = False
102+
elif hasattr(threading, 'main_thread'):
103+
_use_threads = (threading.main_thread() == threading.current_thread())
104+
else:
105+
_use_threads = threading.current_thread().name == 'MainThread'
106+
107+
return _use_threads
108+
109+
110+
_shuffle_repr = ['AUTOSHUFFLE', 'NOSHUFFLE', 'SHUFFLE', 'BITSHUFFLE']
111+
112+
113+
class Blosc(Codec):
114+
"""Codec providing compression using the Blosc meta-compressor.
115+
116+
Parameters
117+
----------
118+
cname : string, optional
119+
A string naming one of the compression algorithms available within blosc, e.g.,
120+
'zstd', 'blosclz', 'lz4', 'lz4hc', 'zlib' or 'snappy'.
121+
clevel : integer, optional
122+
An integer between 0 and 9 specifying the compression level.
123+
shuffle : integer, optional
124+
Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If -1
125+
(default), bit-shuffle will be used for buffers with itemsize 1,
126+
and byte-shuffle will be used otherwise.
127+
blocksize : int
128+
The requested size of the compressed blocks. If 0 (default), an automatic
129+
blocksize will be used.
130+
131+
See Also
132+
--------
133+
numcodecs.zstd.Zstd, numcodecs.lz4.LZ4
134+
135+
"""
136+
137+
codec_id = 'blosc'
138+
NOSHUFFLE = 0
139+
SHUFFLE = 1
140+
BITSHUFFLE = 2
141+
AUTOSHUFFLE = -1
142+
max_buffer_size = 2**31 - 1
143+
144+
def __init__(self, cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=AUTOBLOCKS):
145+
self.cname = cname
146+
if isinstance(cname, str):
147+
self._cname_bytes = cname.encode('ascii')
148+
else:
149+
self._cname_bytes = cname
150+
self.clevel = clevel
151+
self.shuffle = shuffle
152+
if shuffle == Blosc.AUTOSHUFFLE:
153+
# FIXME: where to find itemsize?
154+
self.shuffle = Blosc.NOSHUFFLE
155+
self.blocksize = blocksize
156+
157+
def encode(self, buf):
158+
buf = ensure_contiguous_ndarray(buf, self.max_buffer_size)
159+
return blosc.compress(
160+
buf,
161+
# typesize=self.blocksize, FIXME
162+
clevel=self.clevel,
163+
shuffle=self.shuffle,
164+
cname=self.cname,
165+
)
166+
167+
def decode(self, buf): # FIXME , out=None):
168+
buf = ensure_contiguous_ndarray(buf, self.max_buffer_size)
169+
return blosc.decompress(buf)
170+
171+
def decode_partial(self, buf, start, nitems, out=None):
172+
'''**Experimental**'''
173+
buf = ensure_contiguous_ndarray(buf, self.max_buffer_size)
174+
# return blosc.decompress_partial(buf, start, nitems, dest=out)
175+
raise Exception("FIXME")
176+
177+
178+
def __repr__(self):
179+
r = '%s(cname=%r, clevel=%r, shuffle=%s, blocksize=%s)' % \
180+
(type(self).__name__,
181+
self.cname,
182+
self.clevel,
183+
_shuffle_repr[self.shuffle + 1],
184+
self.blocksize)
185+
return r
186+
187+
def compress(source, cname, clevel: int, shuffle:int = SHUFFLE,
188+
blocksize:int = AUTOBLOCKS):
189+
"""Compress data.
190+
191+
Parameters
192+
----------
193+
source : bytes-like
194+
Data to be compressed. Can be any object supporting the buffer
195+
protocol.
196+
cname : bytes
197+
Name of compression library to use.
198+
clevel : int
199+
Compression level.
200+
shuffle : int
201+
Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If -1
202+
(default), bit-shuffle will be used for buffers with itemsize 1,
203+
and byte-shuffle will be used otherwise.
204+
blocksize : int
205+
The requested size of the compressed blocks. If 0, an automatic blocksize will
206+
be used.
207+
208+
Returns
209+
-------
210+
dest : bytes
211+
Compressed data.
212+
213+
"""

0 commit comments

Comments
 (0)