Merge pull request #876 from girder/caches
Abstract caching and support entrypoints
manthey authored Jun 17, 2022
2 parents 7144a9b + 83dc6f1 commit 58fb8a2
Showing 6 changed files with 202 additions and 60 deletions.
81 changes: 81 additions & 0 deletions large_image/cache_util/base.py
@@ -0,0 +1,81 @@
import hashlib
import threading
import time
from typing import Tuple

import cachetools


class BaseCache(cachetools.Cache):
"""Base interface to cachetools.Cache for use with large-image."""

def __init__(self, *args, getsizeof=None, **kwargs):
super().__init__(*args, getsizeof=getsizeof, **kwargs)
self.lastError = {}
self.throttleErrors = 10 # seconds between logging errors

def logError(self, err, func, msg):
"""
Log errors, but throttle them so as not to spam the logs.
:param err: error to log.
:param func: function to use for logging. This is something like
logprint.exception or logger.error.
:param msg: the message to log.
"""
curtime = time.time()
key = (err, func)
if (curtime - self.lastError.get(key, {}).get('time', 0) > self.throttleErrors):
skipped = self.lastError.get(key, {}).get('skipped', 0)
if skipped:
msg += ' (%d similar messages)' % skipped
self.lastError[key] = {'time': curtime, 'skipped': 0}
func(msg)
else:
self.lastError[key]['skipped'] += 1

def __repr__(self):
raise NotImplementedError

def __iter__(self):
raise NotImplementedError

def __len__(self):
raise NotImplementedError

def __contains__(self, key):
raise NotImplementedError

def __delitem__(self, key):
raise NotImplementedError

def _hashKey(self, key):
return hashlib.sha256(key.encode()).hexdigest()

def __getitem__(self, key):
# hashedKey = self._hashKey(key)
raise NotImplementedError

def __setitem__(self, key, value):
# hashedKey = self._hashKey(key)
raise NotImplementedError

@property
def curritems(self):
raise NotImplementedError

@property
def currsize(self):
raise NotImplementedError

@property
def maxsize(self):
raise NotImplementedError

def clear(self):
raise NotImplementedError

@staticmethod
def getCache() -> Tuple['BaseCache', threading.Lock]:
# return cache, cacheLock
raise NotImplementedError
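
BaseCache leaves every container method abstract, so a concrete backend only needs to map those methods onto its own store and expose a getCache() factory. The sketch below shows what a minimal subclass might look like; the RAMCache name and its dict-backed store are purely illustrative and not part of this change, and a real backend (like MemCache below) would talk to an external service instead.

import threading
from typing import Tuple

from large_image.cache_util.base import BaseCache


class RAMCache(BaseCache):
    """Illustrative dict-backed cache implementing the BaseCache interface."""

    def __init__(self, maxsize=1000, **kwargs):
        super().__init__(maxsize, **kwargs)
        self._maxsize = maxsize
        self._store = {}

    def __repr__(self):
        return 'RAMCache(%d items)' % len(self._store)

    def __iter__(self):
        return iter(self._store)

    def __len__(self):
        return len(self._store)

    def __contains__(self, key):
        return self._hashKey(key) in self._store

    def __delitem__(self, key):
        del self._store[self._hashKey(key)]

    def __getitem__(self, key):
        try:
            return self._store[self._hashKey(key)]
        except KeyError:
            return self.__missing__(key)

    def __setitem__(self, key, value):
        self._store[self._hashKey(key)] = value

    @property
    def curritems(self):
        return len(self._store)

    @property
    def currsize(self):
        return len(self._store)

    @property
    def maxsize(self):
        return self._maxsize

    def clear(self):
        self._store.clear()

    @staticmethod
    def getCache() -> Tuple['RAMCache', threading.Lock]:
        # A backend that cannot reach its store could raise
        # large_image.exceptions.TileCacheError here so the factory
        # falls through to the next available backend.
        return RAMCache(), threading.Lock()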
6 changes: 5 additions & 1 deletion large_image/cache_util/cache.py
@@ -142,7 +142,11 @@ def __new__(metacls, name, bases, namespace, **kwargs): # noqa - N804
cacheName = cls

if LruCacheMetaclass.namedCaches.get(cacheName) is None:
cache, cacheLock = CacheFactory().getCache(maxSize, cacheName=cacheName)
cache, cacheLock = CacheFactory().getCache(
numItems=maxSize,
cacheName=cacheName,
inProcess=True,
)
LruCacheMetaclass.namedCaches[cacheName] = (cache, cacheLock)
config.getConfig('logger').debug(
'Created LRU Cache for %r with %d maximum size' % (cacheName, cache.maxsize))
105 changes: 76 additions & 29 deletions large_image/cache_util/cachefactory.py
@@ -14,7 +14,6 @@
# limitations under the License.
#############################################################################


import math
import threading

@@ -25,13 +24,51 @@
except ImportError:
psutil = None

try:
from importlib.metadata import entry_points
except ImportError:
from importlib_metadata import entry_points

from .. import config
from ..exceptions import TileCacheError

try:
from .memcache import MemCache
except ImportError:
MemCache = None

# DO NOT MANUALLY ADD ANYTHING TO `_availableCaches`
# use entrypoints and let loadCaches fill in `_availableCaches`
_availableCaches = {}


def loadCaches(entryPointName='large_image.cache', sourceDict=_availableCaches):
"""
Load all caches from entrypoints and add them to the
availableCaches dictionary.
:param entryPointName: the name of the entry points to load.
:param sourceDict: a dictionary to populate with the loaded caches.
"""
if len(_availableCaches):
return
epoints = entry_points()
if entryPointName in epoints:
for entryPoint in epoints[entryPointName]:
try:
cacheClass = entryPoint.load()
sourceDict[entryPoint.name.lower()] = cacheClass
config.getConfig('logprint').debug(f'Loaded cache {entryPoint.name}')
except Exception:
config.getConfig('logprint').exception(
f'Failed to load cache {entryPoint.name}'
)
# Load memcached last for now
if MemCache is not None:
# TODO: put this in an entry point for a new package
_availableCaches['memcached'] = MemCache
# NOTE: `python` cache is viewed as a fallback and isn't listed in `availableCaches`
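
loadCaches() discovers backends through the large_image.cache entry point group, so an external package can make its cache selectable by name without modifying this module. A hedged sketch of how such a package might register itself (the distribution, module, and class names are hypothetical; only the 'large_image.cache' group name comes from loadCaches() above):

# setup.py of a hypothetical external cache plugin.
from setuptools import setup

setup(
    name='large-image-cache-redis',          # hypothetical distribution name
    py_modules=['large_image_cache_redis'],  # module that defines RedisCache
    install_requires=['large_image'],
    entry_points={
        'large_image.cache': [
            # loadCaches() lower-cases the entry point name, so this backend
            # becomes selectable with cache_backend set to 'redis'.
            'redis = large_image_cache_redis:RedisCache',
        ],
    },
)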


def pickAvailableCache(sizeEach, portion=8, maxItems=None, cacheName=None):
"""
@@ -64,6 +101,26 @@ def pickAvailableCache(sizeEach, portion=8, maxItems=None, cacheName=None):
return numItems


def getFirstAvailableCache():
cacheBackend = config.getConfig('cache_backend', None)
if cacheBackend is not None:
raise ValueError('cache_backend already set')
loadCaches()
cache, cacheLock = None, None
for cacheBackend in _availableCaches:
try:
cache, cacheLock = _availableCaches[cacheBackend].getCache()
break
except TileCacheError:
continue
if cache is not None:
config.getConfig('logprint').info(
f'Automatically setting `{cacheBackend}` as cache_backend from availableCaches'
)
config.setConfig('cache_backend', cacheBackend)
return cache, cacheLock


class CacheFactory:
logged = False

@@ -88,38 +145,28 @@ def getCacheSize(self, numItems, cacheName=None):
pass
return numItems

def getCache(self, numItems=None, cacheName=None):
# memcached is the fallback default, if available.
cacheBackend = config.getConfig('cache_backend', 'python')
if cacheBackend:
cacheBackend = str(cacheBackend).lower()
def getCache(self, numItems=None, cacheName=None, inProcess=False):
loadCaches()

# Default to `python` cache for inProcess
cacheBackend = config.getConfig('cache_backend', 'python' if inProcess else None)

if isinstance(cacheBackend, str):
cacheBackend = cacheBackend.lower()

cache = None
if cacheBackend == 'memcached' and MemCache and numItems is None:
# lock needed because pylibmc(memcached client) is not threadsafe
cacheLock = threading.Lock()
if not inProcess and cacheBackend in _availableCaches:
cache, cacheLock = _availableCaches[cacheBackend].getCache()
elif not inProcess and cacheBackend is None:
cache, cacheLock = getFirstAvailableCache()

# check if credentials and location exist, otherwise assume
# location is 127.0.0.1 (localhost) with no password
url = config.getConfig('cache_memcached_url')
if not url:
url = '127.0.0.1'
memcachedUsername = config.getConfig('cache_memcached_username')
if not memcachedUsername:
memcachedUsername = None
memcachedPassword = config.getConfig('cache_memcached_password')
if not memcachedPassword:
memcachedPassword = None
try:
cache = MemCache(url, memcachedUsername, memcachedPassword,
mustBeAvailable=True)
except Exception:
config.getConfig('logger').info('Cannot use memcached for caching.')
cache = None
if cache is None: # fallback backend
if cache is None: # fallback backend or inProcess
cacheBackend = 'python'
cache = cachetools.LRUCache(self.getCacheSize(numItems, cacheName=cacheName))
cacheLock = threading.Lock()
if numItems is None and not CacheFactory.logged:
config.getConfig('logprint').info('Using %s for large_image caching' % cacheBackend)

if not inProcess and not CacheFactory.logged:
config.getConfig('logprint').info(f'Using {cacheBackend} for large_image caching')
CacheFactory.logged = True

return cache, cacheLock
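
With the reworked getCache() above, callers distinguish the shared tile cache from per-process caches via the new inProcess flag, mirroring the cache.py change earlier in this diff. A rough usage sketch under the new defaults (the numItems value and cacheName are illustrative):

from large_image.cache_util.cachefactory import CacheFactory

# Shared cache: with cache_backend left at its new default of None, the
# factory calls getFirstAvailableCache() to pick the first working backend
# loaded from entry points, and falls back to a python LRU cache if none work.
tileCache, tileCacheLock = CacheFactory().getCache()

# In-process cache: always a cachetools LRUCache sized from numItems,
# regardless of which external backend is configured.
sourceCache, sourceCacheLock = CacheFactory().getCache(
    numItems=1000, cacheName='tilesource', inProcess=True)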
60 changes: 31 additions & 29 deletions large_image/cache_util/memcache.py
@@ -14,15 +14,15 @@
# limitations under the License.
#############################################################################

import hashlib
import threading
import time

import cachetools
from typing import Tuple

from .. import config
from .base import BaseCache


class MemCache(cachetools.Cache):
class MemCache(BaseCache):
"""Use memcached as the backing cache."""

def __init__(self, url='127.0.0.1', username=None, password=None,
@@ -56,8 +56,6 @@ def __init__(self, url='127.0.0.1', username=None, password=None,
self._client['large_image_cache_test'] = time.time()
self._clientParams = (url, dict(
binary=True, username=username, password=password, behaviors=behaviors))
self.lastError = {}
self.throttleErrors = 10 # seconds between logging errors

def __repr__(self):
return "Memcache doesn't list its keys"
@@ -75,31 +73,11 @@ def __contains__(self, key):
return None

def __delitem__(self, key):
hashedKey = hashlib.sha256(key.encode()).hexdigest()
hashedKey = self._hashKey(key)
del self._client[hashedKey]

def logError(self, err, func, msg):
"""
Log errors, but throttle them so as not to spam the logs.
:param err: error to log.
:param func: function to use for logging. This is something like
logprint.exception or logger.error.
:param msg: the message to log.
"""
curtime = time.time()
key = (err, func)
if (curtime - self.lastError.get(key, {}).get('time', 0) > self.throttleErrors):
skipped = self.lastError.get(key, {}).get('skipped', 0)
if skipped:
msg += ' (%d similar messages)' % skipped
self.lastError[key] = {'time': curtime, 'skipped': 0}
func(msg)
else:
self.lastError[key]['skipped'] += 1

def __getitem__(self, key):
hashedKey = hashlib.sha256(key.encode()).hexdigest()
hashedKey = self._hashKey(key)
try:
return self._client[hashedKey]
except KeyError:
@@ -114,7 +92,7 @@ def __getitem__(self, key):
return self.__missing__(key)

def __setitem__(self, key, value):
hashedKey = hashlib.sha256(key.encode()).hexdigest()
hashedKey = self._hashKey(key)
try:
self._client[hashedKey] = value
except (TypeError, KeyError) as exc:
@@ -166,3 +144,27 @@ def _getStat(self, key):

def clear(self):
self._client.flush_all()

@staticmethod
def getCache() -> Tuple['MemCache', threading.Lock]:
# lock needed because pylibmc(memcached client) is not threadsafe
cacheLock = threading.Lock()

# check if credentials and location exist, otherwise assume
# location is 127.0.0.1 (localhost) with no password
url = config.getConfig('cache_memcached_url')
if not url:
url = '127.0.0.1'
memcachedUsername = config.getConfig('cache_memcached_username')
if not memcachedUsername:
memcachedUsername = None
memcachedPassword = config.getConfig('cache_memcached_password')
if not memcachedPassword:
memcachedPassword = None
try:
cache = MemCache(url, memcachedUsername, memcachedPassword,
mustBeAvailable=True)
except Exception:
config.getConfig('logger').info('Cannot use memcached for caching.')
cache = None
return cache, cacheLock
2 changes: 1 addition & 1 deletion large_image/config.py
@@ -12,7 +12,7 @@
'logprint': fallbackLogger,

# For tiles
'cache_backend': 'python', # 'python' or 'memcached'
'cache_backend': None, # 'python' or 'memcached'
# 'python' cache can use 1/(val) of the available memory
'cache_python_memory_portion': 32,
# cache_memcached_url may be a list
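
Because the default is now None instead of 'python', a deployment that wants a specific backend can still pin one through the config module before any caches are created; a minimal sketch, assuming memcached is the desired backend:

from large_image import config

# Pin the backend explicitly; leaving cache_backend as None lets the factory
# auto-select the first working backend discovered from entry points.
config.setConfig('cache_backend', 'memcached')
config.setConfig('cache_memcached_url', '127.0.0.1')  # may also be a list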
8 changes: 8 additions & 0 deletions large_image/exceptions.py
@@ -26,6 +26,14 @@ def __init__(self, *args, **kwargs):
return super().__init__(errno.ENOENT, *args, **kwargs)


class TileCacheError(TileGeneralError):
pass


class TileCacheConfigurationError(TileCacheError):
pass


TileGeneralException = TileGeneralError
TileSourceException = TileSourceError
TileSourceAssetstoreException = TileSourceAssetstoreError
