Skip to content

Commit 3e9d2c9

Browse files
author
begoldsm
committed
Update to version 0.0.6
This includes: Performance fixes for upload and download of large folders Authentication fixes for token refresh using lib.auth()
1 parent 9e35e75 commit 3e9d2c9

File tree

83 files changed

+3492
-253262
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+3492
-253262
lines changed

HISTORY.rst

+4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
33
Release History
44
===============
5+
0.0.6 (2017-03-15)
6+
------------------
7+
* Fix an issue with path caching that should drastically improve performance for download
8+
59
0.0.5 (2017-03-01)
610
------------------
711
* Fix for downloader to ensure there is access to the source path before creating destination files
+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="utf-8"?>
22
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
33
<PropertyGroup>
4-
<ProjectView>ShowAllFiles</ProjectView>
4+
<ProjectView>ProjectFiles</ProjectView>
55
</PropertyGroup>
66
</Project>

azure/datalake/store/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# license information.
77
# --------------------------------------------------------------------------
88

9-
__version__ = "0.0.5"
9+
__version__ = "0.0.6"
1010

1111
from .core import AzureDLFileSystem
1212
from .multithread import ADLDownloader

azure/datalake/store/core.py

+36-18
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ def _ls(self, path):
111111
""" List files at given path """
112112
path = AzureDLPath(path).trim()
113113
key = path.as_posix()
114-
if path not in self.dirs:
115-
out = self.azure.call('LISTSTATUS', path.as_posix())
114+
if key not in self.dirs:
115+
out = self.azure.call('LISTSTATUS', key)
116116
self.dirs[key] = out['FileStatuses']['FileStatus']
117117
for f in self.dirs[key]:
118118
f['name'] = (path / f['pathSuffix']).as_posix()
@@ -137,10 +137,12 @@ def info(self, path):
137137
"""
138138
path = AzureDLPath(path).trim()
139139
root = path.parent
140-
myfile = [f for f in self._ls(root) if f['name'] == path.as_posix()]
141-
if len(myfile) == 1:
142-
return myfile[0]
143-
raise FileNotFoundError(path)
140+
path_as_posix = path.as_posix()
141+
for f in self._ls(root):
142+
if f['name'] == path_as_posix:
143+
return f
144+
else:
145+
raise FileNotFoundError(path)
144146

145147
def _walk(self, path):
146148
fi = list(self._ls(path))
@@ -149,21 +151,22 @@ def _walk(self, path):
149151
fi.extend(self._ls(apath['name']))
150152
return [f for f in fi if f['type'] == 'FILE']
151153

152-
def walk(self, path=''):
154+
def walk(self, path='', details=False):
153155
""" Get all files below given path
154156
"""
155-
return [f['name'] for f in self._walk(path)]
157+
return [f if details else f['name'] for f in self._walk(path)]
156158

157-
def glob(self, path):
159+
def glob(self, path, details=False):
158160
"""
159161
Find files (not directories) by glob-matching.
160162
"""
161163
path = AzureDLPath(path).trim()
164+
path_as_posix = path.as_posix()
162165
prefix = path.globless_prefix
163-
allfiles = self.walk(prefix)
166+
allfiles = self.walk(prefix, details)
164167
if prefix == path:
165168
return allfiles
166-
return [f for f in allfiles if AzureDLPath(f).match(path.as_posix())]
169+
return [f for f in allfiles if AzureDLPath(f['name'] if details else f).match(path_as_posix)]
167170

168171
def du(self, path, total=False, deep=False):
169172
""" Bytes in keys at path """
@@ -233,8 +236,9 @@ def set_expiry(self, path, expiry_option, expire_time=None):
233236
parms['expireTime'] = int(expire_time)
234237

235238
self.azure.call('SETEXPIRY', path.as_posix(), is_extended=True, **parms)
239+
self.invalidate_cache(path.as_posix())
236240

237-
def _acl_call(self, action, path, acl_spec=None):
241+
def _acl_call(self, action, path, acl_spec=None, invalidate_cache=False):
238242
"""
239243
Helper method for ACL calls to reduce code repetition
240244
@@ -249,13 +253,21 @@ def _acl_call(self, action, path, acl_spec=None):
249253
'[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,...'
250254
251255
Note that for remove acl entries the permission (rwx) portion is not required.
256+
invalidate_cache: bool
257+
optionally indicates that the cache of files should be invalidated after this operation
258+
This should always be done for set and remove operations, since the state of the file or folder has changed.
252259
"""
253260
parms = {}
254261
path = AzureDLPath(path).trim()
262+
posix_path = path.as_posix()
255263
if acl_spec:
256264
parms['aclSpec'] = acl_spec
257265

258-
return self.azure.call(action, path.as_posix(), **parms)
266+
to_return = self.azure.call(action, posix_path, **parms)
267+
if invalidate_cache:
268+
self.invalidate_cache(posix_path)
269+
270+
return to_return
259271

260272
def set_acl(self, path, acl_spec):
261273
"""
@@ -272,7 +284,8 @@ def set_acl(self, path, acl_spec):
272284
'[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,...'
273285
"""
274286

275-
self._acl_call('SETACL', path, acl_spec)
287+
self._acl_call('SETACL', path, acl_spec, invalidate_cache=True)
288+
276289

277290
def modify_acl_entries(self, path, acl_spec):
278291
"""
@@ -290,7 +303,8 @@ def modify_acl_entries(self, path, acl_spec):
290303
The ACL specification to use in modifying the ACL at the path in the format
291304
'[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,...'
292305
"""
293-
self._acl_call('MODIFYACLENTRIES', path, acl_spec)
306+
self._acl_call('MODIFYACLENTRIES', path, acl_spec, invalidate_cache=True)
307+
294308

295309
def remove_acl_entries(self, path, acl_spec):
296310
"""
@@ -309,7 +323,8 @@ def remove_acl_entries(self, path, acl_spec):
309323
The ACL specification to remove from the ACL at the path in the format (note that the permission portion is missing)
310324
'[default:]user|group|other:[entity id or UPN],[default:]user|group|other:[entity id or UPN],...'
311325
"""
312-
self._acl_call('REMOVEACLENTRIES', path, acl_spec)
326+
self._acl_call('REMOVEACLENTRIES', path, acl_spec, invalidate_cache=True)
327+
313328

314329
def get_acl_status(self, path):
315330
"""
@@ -334,7 +349,8 @@ def remove_acl(self, path):
334349
path: str
335350
Location to remove the ACL.
336351
"""
337-
self._acl_call('REMOVEACL', path)
352+
self._acl_call('REMOVEACL', path, invalidate_cache=True)
353+
338354

339355
def remove_default_acl(self, path):
340356
"""
@@ -349,7 +365,8 @@ def remove_default_acl(self, path):
349365
path: str
350366
Location to set the ACL on.
351367
"""
352-
self._acl_call('REMOVEDEFAULTACL', path)
368+
self._acl_call('REMOVEDEFAULTACL', path, invalidate_cache=True)
369+
353370

354371
def chown(self, path, owner=None, group=None):
355372
"""
@@ -375,6 +392,7 @@ def chown(self, path, owner=None, group=None):
375392
parms['group'] = group
376393
path = AzureDLPath(path).trim()
377394
self.azure.call('SETOWNER', path.as_posix(), **parms)
395+
self.invalidate_cache(path.as_posix())
378396

379397
def exists(self, path):
380398
""" Does such a file/directory exist? """

azure/datalake/store/lib.py

+26-29
Original file line numberDiff line numberDiff line change
@@ -46,34 +46,6 @@
4646
# This ensures that no connections are prematurely evicted, which has negative performance implications.
4747
MAX_POOL_CONNECTIONS = 1024
4848

49-
def refresh_token(token, authority=None):
50-
""" Refresh an expired authorization token
51-
52-
Parameters
53-
----------
54-
token : dict
55-
Produced by `auth()` or `refresh_token`.
56-
authority: string
57-
The full URI of the authentication authority to authenticate against (such as https://login.microsoftonline.com/)
58-
"""
59-
if token.get('refresh', False) is False:
60-
raise ValueError("Token cannot be auto-refreshed.")
61-
62-
if not authority:
63-
authority = 'https://login.microsoftonline.com/'
64-
65-
context = adal.AuthenticationContext(authority +
66-
token['tenant'])
67-
out = context.acquire_token_with_refresh_token(token['refresh'],
68-
client_id=token['client'],
69-
resource=token['resource'])
70-
out.update({'access': out['accessToken'], 'refresh': out['refreshToken'],
71-
'time': time.time(), 'tenant': token['tenant'],
72-
'resource': token['resource'], 'client': token['client']})
73-
74-
return DataLakeCredential(out)
75-
76-
7749
def auth(tenant_id=None, username=None,
7850
password=None, client_id=default_client,
7951
client_secret=None, resource=DEFAULT_RESOURCE_ENDPOINT,
@@ -157,12 +129,37 @@ def __init__(self, token):
157129
def signed_session(self):
158130
session = super(DataLakeCredential, self).signed_session()
159131
if time.time() - self.token['time'] > self.token['expiresIn'] - 100:
160-
self.token = refresh_token(self.token)
132+
self.refresh_token()
161133

162134
scheme, token = self.token['tokenType'], self.token['access']
163135
header = "{} {}".format(scheme, token)
164136
session.headers['Authorization'] = header
165137
return session
138+
139+
def refresh_token(self, authority=None):
140+
""" Refresh an expired authorization token
141+
142+
Parameters
143+
----------
144+
authority: string
145+
The full URI of the authentication authority to authenticate against (such as https://login.microsoftonline.com/)
146+
"""
147+
if self.token.get('refresh', False) is False:
148+
raise ValueError("Token cannot be auto-refreshed.")
149+
150+
if not authority:
151+
authority = 'https://login.microsoftonline.com/'
152+
153+
context = adal.AuthenticationContext(authority +
154+
self.token['tenant'])
155+
out = context.acquire_token_with_refresh_token(self.token['refresh'],
156+
client_id=self.token['client'],
157+
resource=self.token['resource'])
158+
out.update({'access': out['accessToken'], 'refresh': out['refreshToken'],
159+
'time': time.time(), 'tenant': self.token['tenant'],
160+
'resource': self.token['resource'], 'client': self.token['client']})
161+
162+
self.token = out
166163

167164
class DatalakeRESTInterface:
168165
""" Call factory for webHDFS endpoints on ADLS

azure/datalake/store/multithread.py

+24-20
Original file line numberDiff line numberDiff line change
@@ -181,26 +181,28 @@ def _setup(self):
181181
""" Create set of parameters to loop over
182182
"""
183183
if "*" not in self.rpath:
184-
rfiles = self.client._adlfs.walk(self.rpath)
184+
rfiles = self.client._adlfs.walk(self.rpath, details=True)
185185
else:
186-
rfiles = self.client._adlfs.glob(self.rpath)
186+
rfiles = self.client._adlfs.glob(self.rpath, details=True)
187187
if len(rfiles) > 1:
188-
prefix = commonprefix(rfiles)
189-
lfiles = [os.path.join(self.lpath, os.path.relpath(f, prefix))
190-
for f in rfiles]
191-
elif rfiles:
188+
prefix = commonprefix([f['name'] for f in rfiles])
189+
file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'], prefix)), f)
190+
for f in rfiles]
191+
elif len(rfiles) == 1:
192192
if os.path.exists(self.lpath) and os.path.isdir(self.lpath):
193-
lfiles = [os.path.join(self.lpath, os.path.basename(rfiles[0]))]
193+
file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'])),
194+
rfiles[0])]
194195
else:
195-
lfiles = [self.lpath]
196+
file_pairs = [(self.lpath, rfiles[0])]
196197
else:
197198
raise ValueError('No files to download')
198-
self.rfiles = rfiles
199-
self.lfiles = lfiles
200199

201-
for lfile, rfile in zip(lfiles, rfiles):
202-
fsize = self.client._adlfs.info(rfile)['length']
203-
self.client.submit(rfile, lfile, fsize)
200+
# this property is used for internal validation
201+
# and should not be referenced directly by public callers
202+
self._file_pairs = file_pairs
203+
204+
for lfile, rfile in file_pairs:
205+
self.client.submit(rfile['name'], lfile, rfile['length'])
204206

205207
def run(self, nthreads=None, monitor=True):
206208
""" Populate transfer queue and execute downloads
@@ -412,22 +414,24 @@ def _setup(self):
412414
lfiles = [self.lpath]
413415
else:
414416
lfiles = glob.glob(self.lpath)
417+
415418
if len(lfiles) > 1:
416419
prefix = commonprefix(lfiles)
417-
rfiles = [self.rpath / AzureDLPath(f).relative_to(prefix)
418-
for f in lfiles]
420+
file_pairs = [(f, self.rpath / AzureDLPath(f).relative_to(prefix)) for f in lfiles]
419421
elif lfiles:
420422
if self.client._adlfs.exists(self.rpath) and \
421423
self.client._adlfs.info(self.rpath)['type'] == "DIRECTORY":
422-
rfiles = [self.rpath / AzureDLPath(lfiles[0]).name]
424+
file_pairs = [(lfiles[0], self.rpath / AzureDLPath(lfiles[0]).name)]
423425
else:
424-
rfiles = [self.rpath]
426+
file_pairs = [(lfiles[0], self.rpath)]
425427
else:
426428
raise ValueError('No files to upload')
427-
self.rfiles = rfiles
428-
self.lfiles = lfiles
429429

430-
for lfile, rfile in zip(lfiles, rfiles):
430+
# this property is used for internal validation
431+
# and should not be referenced directly by public callers
432+
self._file_pairs = file_pairs
433+
434+
for lfile, rfile in file_pairs:
431435
fsize = os.stat(lfile).st_size
432436
self.client.submit(lfile, rfile, fsize)
433437

azure/datalake/store/transfer.py

+21-16
Original file line numberDiff line numberDiff line change
@@ -261,40 +261,45 @@ def submit(self, src, dst, length):
261261
'pending', 'running', 'finished', 'cancelled', 'errored')
262262

263263
# Create unique temporary directory for each file
264-
if self._chunked and self._unique_temporary:
265-
tmpdir = dst.parent / "{}.segments.{}".format(dst.name, self._unique_str)
266-
elif self._chunked:
267-
tmpdir = dst.parent / "{}.segments".format(dst.name)
264+
if self._chunked:
265+
if self._unique_temporary:
266+
filename = "{}.segments.{}".format(dst.name, self._unique_str)
267+
else:
268+
filename = "{}.segments".format(dst.name)
269+
tmpdir = dst.parent/filename
268270
else:
269271
tmpdir = None
270272

271-
offsets = list(range(0, length, self._chunksize))
273+
# TODO: might need xrange support for py2
274+
offsets = range(0, length, self._chunksize)
272275

273276
# in the case of empty files, ensure that the initial offset of 0 is properly added.
274277
if not offsets:
275278
if not length:
276-
offsets.append(0)
279+
offsets = [0]
277280
else:
278281
raise DatalakeIncompleteTransferException('Could not compute offsets for source: {}, with destination: {} and expected length: {}.'.format(src, dst, length))
279282

283+
tmpdir_and_offsets = tmpdir and len(offsets) > 1
280284
for offset in offsets:
281-
if tmpdir and len(offsets) > 1:
285+
if tmpdir_and_offsets:
282286
name = tmpdir / "{}_{}".format(dst.name, offset)
283287
else:
284288
name = dst
285289
cstates[(name, offset)] = 'pending'
286-
self._chunks[(name, offset)] = dict(
287-
parent=(src, dst),
288-
expected=min(length - offset, self._chunksize),
289-
actual=0,
290-
exception=None)
290+
self._chunks[(name, offset)] = {
291+
"parent": (src, dst),
292+
"expected": min(length - offset, self._chunksize),
293+
"actual": 0,
294+
"exception": None}
291295
logger.debug("Submitted %s, byte offset %d", name, offset)
292296

293297
self._fstates[(src, dst)] = 'pending'
294-
self._files[(src, dst)] = dict(
295-
length=length,
296-
cstates=cstates,
297-
exception=None)
298+
self._files[(src, dst)] = {
299+
"length": length,
300+
"cstates": cstates,
301+
"exception": None}
302+
298303

299304
def _submit(self, fn, *args, **kwargs):
300305
kwargs['shutdown_event'] = self._shutdown_event

0 commit comments

Comments
 (0)