diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..17be9f1
Binary files /dev/null and b/.DS_Store differ
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/.idea/alluxiofs.iml b/.idea/alluxiofs.iml
new file mode 100644
index 0000000..fa73b3a
--- /dev/null
+++ b/.idea/alluxiofs.iml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..cc5462d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..baaff83
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..799eb9a
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..dcb6b8c
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/alluxiofs/.DS_Store b/alluxiofs/.DS_Store
new file mode 100644
index 0000000..90129ea
Binary files /dev/null and b/alluxiofs/.DS_Store differ
diff --git a/alluxiofs/client/.DS_Store b/alluxiofs/client/.DS_Store
index 0cce2b1..c53c615 100644
Binary files a/alluxiofs/client/.DS_Store and b/alluxiofs/client/.DS_Store differ
diff --git a/alluxiofs/client/config.py b/alluxiofs/client/config.py
new file mode 100644
index 0000000..8745e6e
--- /dev/null
+++ b/alluxiofs/client/config.py
@@ -0,0 +1,89 @@
+from typing import Optional
+
+import humanfriendly
+
+from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
+from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
+from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
+from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE
+
+
+class AlluxioClientConfig:
+ """
+ Class responsible for creating the configuration for Alluxio Client.
+ """
+
+ def __init__(
+ self,
+ etcd_hosts: Optional[str] = None,
+ worker_hosts: Optional[str] = None,
+ etcd_port=2379,
+ worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
+ etcd_refresh_workers_interval=120,
+ page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE,
+ hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
+ cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE,
+ etcd_username: Optional[str] = None,
+ etcd_password: Optional[str] = None,
+ concurrency=64,
+ **kwargs,
+ ):
+ """
+ Initializes Alluxio client configuration.
+ Args:
+ etcd_hosts (Optional[str], optional): The hostnames of ETCD to get worker addresses from
+ in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both.
+ worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format.
+ Either etcd_hosts or worker_hosts should be provided, not both.
+ concurrency (int, optional): The maximum number of concurrent operations for HTTP requests, default to 64.
+ etcd_port (int, optional): The port of each etcd server.
+ worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node.
+ etcd_refresh_workers_interval (int, optional): The interval to refresh worker list from ETCD membership service periodically.
+ All negative values mean the service is disabled.
+ """
+
+ assert (
+ etcd_hosts or worker_hosts
+ ), "Must supply either 'etcd_hosts' or 'worker_hosts'"
+
+ assert not (
+ etcd_hosts and worker_hosts
+ ), "Supply either 'etcd_hosts' or 'worker_hosts', not both"
+
+ assert isinstance(etcd_port, int) and (
+ 1 <= etcd_port <= 65535
+ ), "'etcd_port' should be an integer in the range 1-65535"
+
+ assert isinstance(worker_http_port, int) and (
+ 1 <= worker_http_port <= 65535
+ ), "'worker_http_port' should be an integer in the range 1-65535"
+
+ assert (
+ isinstance(concurrency, int) and concurrency > 0
+ ), "'concurrency' should be a positive integer"
+
+ assert isinstance(
+ etcd_refresh_workers_interval, int
+ ), "'etcd_refresh_workers_interval' should be an integer"
+
+ self.etcd_hosts = etcd_hosts
+ self.worker_hosts = worker_hosts
+ self.etcd_port = etcd_port
+ self.worker_http_port = worker_http_port
+ self.etcd_refresh_workers_interval = etcd_refresh_workers_interval
+
+ assert (
+ isinstance(hash_node_per_worker, int) and hash_node_per_worker > 0
+ ), "'hash_node_per_worker' should be a positive integer"
+
+ self.hash_node_per_worker = hash_node_per_worker
+ self.page_size = humanfriendly.parse_size(page_size, binary=True)
+ self.cluster_name = cluster_name
+
+ assert (etcd_username is None) == (
+ etcd_password is None
+ ), "Both ETCD username and password must be set or both should be unset."
+
+ self.etcd_username = etcd_username
+ self.etcd_password = etcd_password
+ self.concurrency = concurrency
diff --git a/alluxiofs/client/const.py b/alluxiofs/client/const.py
index a699944..ad13ff5 100644
--- a/alluxiofs/client/const.py
+++ b/alluxiofs/client/const.py
@@ -8,8 +8,6 @@
# See the NOTICE file distributed with this work for information regarding copyright ownership.
ALLUXIO_CLUSTER_NAME_KEY = "alluxio.cluster.name"
ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE = "DefaultAlluxioCluster"
-ALLUXIO_ETCD_USERNAME_KEY = "alluxio.etcd.username"
-ALLUXIO_ETCD_PASSWORD_KEY = "alluxio.etcd.password"
ALLUXIO_PAGE_SIZE_KEY = "alluxio.worker.page.store.page.size"
ALLUXIO_PAGE_SIZE_DEFAULT_VALUE = "1MB"
ALLUXIO_HASH_NODE_PER_WORKER_KEY1 = (
diff --git a/alluxiofs/client/core.py b/alluxiofs/client/core.py
index dafd1c9..4060cb7 100644
--- a/alluxiofs/client/core.py
+++ b/alluxiofs/client/core.py
@@ -23,24 +23,10 @@
import requests
from requests.adapters import HTTPAdapter
-from .utils import set_log_level
-
-try:
- from alluxiocommon import _DataManager
-except ModuleNotFoundError:
- print(
- "[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke."
- )
-
-from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
-from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
-from .const import ALLUXIO_COMMON_EXTENSION_ENABLE
-from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY1
-from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY2
+from .config import AlluxioClientConfig
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from .const import ALLUXIO_PAGE_SIZE_KEY
from .const import ALLUXIO_SUCCESS_IDENTIFIER
-from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE
from .const import FULL_PAGE_URL_FORMAT
from .const import GET_FILE_STATUS_URL_FORMAT
from .const import LIST_URL_FORMAT
@@ -49,8 +35,16 @@
from .const import LOAD_URL_FORMAT
from .const import PAGE_URL_FORMAT
from .const import WRITE_PAGE_URL_FORMAT
+from .utils import set_log_level
from .worker_ring import ConsistentHashProvider
+# try:
+# from alluxiocommon import _DataManager
+# except ModuleNotFoundError:
+# print(
+# "[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke."
+# )
+
logger = logging.getLogger(__name__)
@@ -91,160 +85,27 @@ class OpType(Enum):
class AlluxioClient:
"""
- Access Alluxio file system
-
- Examples
- --------
- >>> # Launch Alluxio with ETCD as service discovery
- >>> alluxio = AlluxioClient(etcd_hosts="localhost")
- >>> # Or launch Alluxio with user provided worker list
- >>> alluxio = AlluxioClient(worker_hosts="host1,host2,host3")
-
- >>> print(alluxio.listdir("s3://mybucket/mypath/dir"))
- [
- {
- type: "file",
- name: "my_file_name",
- path: '/my_file_name',
- ufs_path: 's3://example-bucket/my_file_name',
- last_modification_time_ms: 0,
- length: 77542,
- human_readable_file_size: '75.72KB'
- },
-
- ]
- >>> print(alluxio.read("s3://mybucket/mypath/dir/myfile"))
- my_file_content
+ An AlluxioClient for interacting with Alluxio servers.
"""
def __init__(
self,
- etcd_hosts=None,
- worker_hosts=None,
- options=None,
- concurrency=64,
- etcd_port=2379,
- worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
- etcd_refresh_workers_interval=120,
test_options=None,
+ **kwargs,
):
"""
- Inits Alluxio file system.
-
- Args:
- etcd_hosts (str, optional):
- The hostnames of ETCD to get worker addresses from
- The hostnames in host1,host2,host3 format. Either etcd_hosts or worker_hosts should be provided, not both.
- worker_hosts (str, optional):
- The worker hostnames in host1,host2,host3 format. Either etcd_hosts or worker_hosts should be provided, not both.
- options (dict, optional):
- A dictionary of Alluxio property key and values.
- Note that Alluxio Python API only support a limited set of Alluxio properties.
- concurrency (int, optional):
- The maximum number of concurrent operations for HTTP requests. Default to 64.
- etcd_port (int, optional):
- The port of each etcd server.
- worker_http_port (int, optional):
- The port of the HTTP server on each Alluxio worker node.
- etcd_refresh_workers_interval(int, optional):
- The interval to refresh worker list from ETCD membership service periodically. All negative values mean the service is disabled.
-
+ Inits Alluxio Client with Alluxio client arguments.
+ See AlluxioClientConfig for configurations.
"""
- # TODO(lu/chunxu) change to ETCD endpoints in format of 'http://etcd_host:port, http://etcd_host:port' & worker hosts in 'host:port, host:port' format
- if not (etcd_hosts or worker_hosts):
- raise ValueError(
- "Must supply either 'etcd_hosts' or 'worker_hosts'"
- )
- if etcd_hosts and worker_hosts:
- raise ValueError(
- "Supply either 'etcd_hosts' or 'worker_hosts', not both"
- )
- if not etcd_hosts:
- logger.warning(
- "'etcd_hosts' not supplied. An etcd cluster is required for dynamic cluster changes."
- )
- if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535):
- raise ValueError(
- "'etcd_port' should be an integer in the range 1-65535"
- )
- if not isinstance(worker_http_port, int) or not (
- 1 <= worker_http_port <= 65535
- ):
- raise ValueError(
- "'worker_http_port' should be an integer in the range 1-65535"
- )
- if not isinstance(concurrency, int) or concurrency <= 0:
- raise ValueError("'concurrency' should be a positive integer")
- if concurrency < 10 or concurrency > 128:
- logger.warning(
- f"'concurrency' value of {concurrency} is outside the recommended range (10-128). "
- "This may lead to suboptimal performance or resource utilization.",
- )
- if not isinstance(etcd_refresh_workers_interval, int):
- raise ValueError(
- "'etcd_refresh_workers_interval' should be an integer"
- )
- self.session = self._create_session(concurrency)
+ self.logger = kwargs.get("logger", logging.getLogger("Alluxiofs"))
+ self.config = AlluxioClientConfig(**kwargs)
+ self.session = self._create_session(self.config.concurrency)
+ self.hash_provider = ConsistentHashProvider(self.config, self.logger)
- # parse options
- page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
- hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
- self.data_manager = None
- if options:
- if ALLUXIO_PAGE_SIZE_KEY in options:
- page_size = options[ALLUXIO_PAGE_SIZE_KEY]
- logger.debug(f"Page size is set to {page_size}")
- if ALLUXIO_HASH_NODE_PER_WORKER_KEY1 in options:
- hash_node_per_worker = int(
- options[ALLUXIO_HASH_NODE_PER_WORKER_KEY1]
- )
- logger.debug(
- f"Hash node per worker is set to {hash_node_per_worker}"
- )
- if ALLUXIO_HASH_NODE_PER_WORKER_KEY2 in options:
- hash_node_per_worker = int(
- options[ALLUXIO_HASH_NODE_PER_WORKER_KEY2]
- )
- logger.debug(
- f"Hash node per worker is set to {hash_node_per_worker}"
- )
- if (
- ALLUXIO_COMMON_EXTENSION_ENABLE in options
- and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true"
- ):
- print("alluxiocommon extension enabled.")
- logger.info("alluxiocommon extension enabled.")
- ondemand_pool_disabled = (
- ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options
- and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower()
- == "true"
- )
- self.data_manager = _DataManager(
- concurrency, ondemand_pool_disabled=ondemand_pool_disabled
- )
- if (
- not isinstance(hash_node_per_worker, int)
- or hash_node_per_worker <= 0
- ):
- raise ValueError(
- "'hash_node_per_worker' should be a positive integer"
- )
-
- self.page_size = humanfriendly.parse_size(page_size, binary=True)
test_options = test_options or {}
set_log_level(logger, test_options)
- self.hash_provider = ConsistentHashProvider(
- etcd_hosts=etcd_hosts,
- etcd_port=etcd_port,
- worker_hosts=worker_hosts,
- worker_http_port=worker_http_port,
- hash_node_per_worker=hash_node_per_worker,
- options=options,
- etcd_refresh_workers_interval=etcd_refresh_workers_interval,
- )
-
def listdir(self, path):
"""
Lists the directory.
@@ -683,7 +544,7 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id):
if not page_content:
break
yield page_content
- if len(page_content) < self.page_size: # last page
+ if len(page_content) < self.config.page_size: # last page
break
page_index += 1
@@ -723,23 +584,23 @@ def _range_page_generator_alluxiocommon(
def _range_page_generator(
self, worker_host, worker_http_port, path_id, offset, length
):
- start_page_index = offset // self.page_size
- start_page_offset = offset % self.page_size
+ start_page_index = offset // self.config.page_size
+ start_page_offset = offset % self.config.page_size
- end_page_index = (offset + length - 1) // self.page_size
- end_page_read_to = ((offset + length - 1) % self.page_size) + 1
+ end_page_index = (offset + length - 1) // self.config.page_size
+ end_page_read_to = ((offset + length - 1) % self.config.page_size) + 1
page_index = start_page_index
while True:
try:
read_offset = 0
- read_length = self.page_size
+ read_length = self.config.page_size
if page_index == start_page_index:
read_offset = start_page_offset
if start_page_index == end_page_index:
read_length = end_page_read_to - start_page_offset
else:
- read_length = self.page_size - start_page_offset
+ read_length = self.config.page_size - start_page_offset
elif page_index == end_page_index:
read_length = end_page_read_to
@@ -1004,13 +865,14 @@ def __init__(
logger.debug(f"Page size is set to {page_size}")
self.page_size = humanfriendly.parse_size(page_size, binary=True)
self.hash_provider = ConsistentHashProvider(
- etcd_hosts=etcd_hosts,
- etcd_port=int(etcd_port),
- worker_hosts=worker_hosts,
- worker_http_port=int(http_port),
- hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
- options=options,
- etcd_refresh_workers_interval=120,
+ AlluxioClientConfig(
+ etcd_hosts=etcd_hosts,
+ etcd_port=int(etcd_port),
+ worker_hosts=worker_hosts,
+ worker_http_port=int(http_port),
+ etcd_refresh_workers_interval=120,
+ page_size=page_size,
+ )
)
self.http_port = http_port
self._loop = loop or asyncio.get_event_loop()
diff --git a/alluxiofs/client/worker_ring.py b/alluxiofs/client/worker_ring.py
index 1f0a6de..52ec663 100644
--- a/alluxiofs/client/worker_ring.py
+++ b/alluxiofs/client/worker_ring.py
@@ -21,10 +21,7 @@
import mmh3
from sortedcontainers import SortedDict
-from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
-from .const import ALLUXIO_CLUSTER_NAME_KEY
-from .const import ALLUXIO_ETCD_PASSWORD_KEY
-from .const import ALLUXIO_ETCD_USERNAME_KEY
+from .config import AlluxioClientConfig
from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE
from .const import ETCD_PREFIX_FORMAT
from .utils import set_log_level
@@ -150,7 +147,9 @@ def from_host_and_port(worker_host, worker_http_port):
class EtcdClient:
- def __init__(self, host="localhost", port=2379, options=None):
+ def __init__(
+ self, config: AlluxioClientConfig, host="localhost", port=2379
+ ):
self._host = host
self._port = port
@@ -158,22 +157,11 @@ def __init__(self, host="localhost", port=2379, options=None):
self._etcd_username = None
self._etcd_password = None
self._prefix = ETCD_PREFIX_FORMAT.format(
- cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
+ cluster_name=config.cluster_name
)
- if options:
- if ALLUXIO_ETCD_USERNAME_KEY in options:
- self._etcd_username = options[ALLUXIO_ETCD_USERNAME_KEY]
- if ALLUXIO_ETCD_PASSWORD_KEY in options:
- self._etcd_password = options[ALLUXIO_ETCD_PASSWORD_KEY]
- if ALLUXIO_CLUSTER_NAME_KEY in options:
- self._prefix = ETCD_PREFIX_FORMAT.format(
- cluster_name=options[ALLUXIO_CLUSTER_NAME_KEY]
- )
-
- if (self._etcd_username is None) != (self._etcd_password is None):
- raise ValueError(
- "Both ETCD username and password must be set or both should be unset."
- )
+ if config.etcd_username is not None:
+ self._etcd_username = config.etcd_username
+ self._etcd_password = config.etcd_password
def get_worker_entities(self) -> Set[WorkerEntity]:
"""
@@ -184,7 +172,6 @@ def get_worker_entities(self) -> Set[WorkerEntity]:
"""
# Note that EtcdClient should not be passed through python multiprocessing
etcd = self._get_etcd_client()
- worker_entities: Set[WorkerEntity] = set()
try:
worker_entities = {
WorkerEntity.from_worker_info(worker_info)
@@ -216,36 +203,27 @@ def _get_etcd_client(self):
class ConsistentHashProvider:
def __init__(
self,
- etcd_hosts=None,
- etcd_port=None,
- worker_hosts=None,
- worker_http_port=None,
- options=None,
- etcd_refresh_workers_interval=None,
- hash_node_per_worker=None,
- max_attempts=100,
+ config: AlluxioClientConfig,
test_options=None,
):
- self._etcd_hosts = etcd_hosts
- self._etcd_port = etcd_port
- self._options = options
- self._hash_node_per_worker = hash_node_per_worker
- self._max_attempts = max_attempts
+ self._logger = logger or logging.getLogger("ConsistentHashProvider")
+ self._config = config
self._lock = threading.Lock()
self._is_ring_initialized = False
self._worker_info_map = {}
- self._etcd_refresh_workers_interval = etcd_refresh_workers_interval
- if worker_hosts:
+ if self._config.worker_hosts is not None:
self._update_hash_ring(
- self._generate_worker_info_map(worker_hosts, worker_http_port)
+ self._generate_worker_info_map(
+ self._config.worker_hosts, self._config.worker_http_port
+ )
)
- if self._etcd_hosts:
+ if self._config.etcd_hosts is not None:
self._fetch_workers_and_update_ring()
- if self._etcd_refresh_workers_interval > 0:
+ if self._config.etcd_refresh_workers_interval > 0:
self._shutdown_background_update_ring_event = threading.Event()
self._background_thread = None
self._start_background_update_ring(
- self._etcd_refresh_workers_interval
+ self._config.etcd_refresh_workers_interval
)
test_options = test_options or {}
set_log_level(logger, test_options)
@@ -287,7 +265,7 @@ def _get_multiple_worker_identities(
)
workers = []
attempts = 0
- while len(workers) < count and attempts < self._max_attempts:
+ while len(workers) < count and attempts < 100:
attempts += 1
worker = self._get_ceiling_value(self._hash(key, attempts))
if worker not in workers:
@@ -309,7 +287,10 @@ def update_loop():
self._background_thread.start()
def shutdown_background_update_ring(self):
- if self._etcd_hosts and self._etcd_refresh_workers_interval > 0:
+ if (
+ self._config.etcd_hosts is not None
+ and self._config.etcd_refresh_workers_interval > 0
+ ):
self._shutdown_background_update_ring_event.set()
if self._background_thread:
self._background_thread.join()
@@ -318,13 +299,15 @@ def __del__(self):
self.shutdown_background_update_ring()
def _fetch_workers_and_update_ring(self):
- etcd_hosts_list = self._etcd_hosts.split(",")
+ etcd_hosts_list = self._config.etcd_hosts.split(",")
random.shuffle(etcd_hosts_list)
worker_entities: Set[WorkerEntity] = set()
for host in etcd_hosts_list:
try:
worker_entities = EtcdClient(
- host=host, port=self._etcd_port, options=self._options
+ host=host,
+ port=self._config.etcd_port,
+ config=self._config,
).get_worker_entities()
break
except Exception:
@@ -337,7 +320,7 @@ def _fetch_workers_and_update_ring(self):
return
else:
raise Exception(
- f"Failed to achieve worker info list from ETCD servers:{self._etcd_hosts}"
+ f"Failed to achieve worker info list from ETCD servers:{self._config.etcd_hosts}"
)
worker_info_map = {}
@@ -366,7 +349,7 @@ def _update_hash_ring(
with self._lock:
hash_ring = SortedDict()
for worker_identity in worker_info_map.keys():
- for i in range(self._hash_node_per_worker):
+ for i in range(self._config.hash_node_per_worker):
hash_key = self._hash_worker_identity(worker_identity, i)
hash_ring[hash_key] = worker_identity
self.hash_ring = hash_ring
diff --git a/alluxiofs/core.py b/alluxiofs/core.py
index d74f14f..16142ce 100644
--- a/alluxiofs/core.py
+++ b/alluxiofs/core.py
@@ -46,16 +46,10 @@ class AlluxioFileSystem(AbstractFileSystem):
def __init__(
self,
- etcd_hosts=None,
- worker_hosts=None,
- options=None,
- concurrency=64,
- etcd_port=2379,
- worker_http_port=28080,
- preload_path=None,
target_protocol=None,
target_options=None,
fs=None,
+ alluxio_client=None,
test_options=None,
**kwargs,
):
@@ -63,32 +57,16 @@ def __init__(
Initializes an Alluxio filesystem on top of underlying filesystem
to leveraging the data caching and management features of Alluxio.
- The Alluxio args:
- etcd_hosts (str, optional): A comma-separated list of ETCD server hosts in the format "host1:port1,host2:port2,...".
- ETCD is used for dynamic discovery of Alluxio workers.
- Either `etcd_hosts` or `worker_hosts` must be specified, not both.
- worker_hosts (str, optional): A comma-separated list of Alluxio worker hosts in the format "host1:port1,host2:port2,...".
- Directly specifies workers without using ETCD.
- Either `etcd_hosts` or `worker_hosts` must be specified, not both.
- options (dict, optional): A dictionary of Alluxio configuration options where keys are property names and values are property values.
- These options configure the Alluxio client behavior.
- concurrency (int, optional): The maximum number of concurrent operations (e.g., reads, writes) that the file system interface will allow. Defaults to 64.
- etcd_port (int, optional): The port number used by each etcd server.
- Relevant only if `etcd_hosts` is specified.
- worker_http_port (int, optional): The port number used by the HTTP server on each Alluxio worker.
- This is used for accessing Alluxio's HTTP-based APIs.
- preload_path (str, optional): Specifies a path to preload into the Alluxio file system cache at initialization.
- This can be useful for ensuring that certain critical data is immediately available in the cache.
The underlying filesystem args
target_protocol (str, optional): Specifies the under storage protocol to create the under storage file system object.
Common examples include 's3' for Amazon S3, 'hdfs' for Hadoop Distributed File System, and others.
target_options (dict, optional): Provides a set of configuration options relevant to the `target_protocol`.
These options might include credentials, endpoint URLs, and other protocol-specific settings required to successfully interact with the under storage system.
fs (object, optional): Directly supplies an instance of a file system object for accessing the underlying storage of Alluxio
- Other args:
- test_options (dict, optional): A dictionary of options used exclusively for testing purposes.
- These might include mock interfaces or specific configuration overrides for test scenarios.
- **kwargs: other parameters for core session.
+ The Alluxio client args:
+ alluxio_client (AlluxioClient, Optional): the alluxio client to connects to Alluxio servers.
+ If not provided, please add Alluxio client arguments to init a new Alluxio Client.
+ **kwargs: other parameters for initializing Alluxio client or fsspec.
"""
super().__init__(**kwargs)
if fs and target_protocol:
@@ -102,6 +80,8 @@ def __init__(
"provided. Will not fall back to under file systems when "
"accessed files are not in Alluxiofs"
)
+
+ self.logger = kwargs.get("logger", logging.getLogger("Alluxiofs"))
self.kwargs = target_options or {}
self.fs = None
self.target_protocol = None
@@ -120,22 +100,18 @@ def __init__(
elif target_protocol is not None:
self.fs = filesystem(target_protocol, **self.kwargs)
self.target_protocol = target_protocol
- test_options = test_options or {}
- set_log_level(logger, test_options)
- if test_options.get("skip_alluxio") is True:
+
+ skip_alluxio = kwargs.get("skip_alluxio", False)
+ if skip_alluxio:
self.alluxio = None
+ elif alluxio_client:
+ self.alluxio = alluxio_client
else:
- self.alluxio = AlluxioClient(
- etcd_hosts=etcd_hosts,
- worker_hosts=worker_hosts,
- options=options,
- concurrency=concurrency,
- etcd_port=etcd_port,
- worker_http_port=worker_http_port,
- test_options=test_options,
- )
- if preload_path is not None:
- self.alluxio.load(preload_path)
+ self.alluxio = AlluxioClient(**kwargs)
+
+ # TODO: check test options
+ test_options = test_options or {}
+ set_log_level(logger, test_options)
def _strip_alluxiofs_protocol(path):
def _strip_individual_path(p):
diff --git a/tests/client/test_worker_hash_ring.py b/tests/client/test_worker_hash_ring.py
index a112917..68deb06 100644
--- a/tests/client/test_worker_hash_ring.py
+++ b/tests/client/test_worker_hash_ring.py
@@ -1,6 +1,7 @@
import json
import os
+from alluxiofs.client.config import AlluxioClientConfig
from alluxiofs.client.worker_ring import ConsistentHashProvider
from alluxiofs.client.worker_ring import WorkerIdentity
from alluxiofs.client.worker_ring import WorkerNetAddress
@@ -14,9 +15,10 @@ def test_hash_ring():
worker_hostnames = json.load(file)
hash_provider = ConsistentHashProvider(
- worker_hosts=", ".join(worker_hostnames),
- hash_node_per_worker=5,
- etcd_refresh_workers_interval=100000000,
+ AlluxioClientConfig(
+ worker_hosts=", ".join(worker_hostnames),
+ etcd_refresh_workers_interval=100000000,
+ )
)
hash_ring_path = os.path.join(hash_res_dir, "activeNodesMap.json")
diff --git a/tests/fs/fallback/local/local_fallback_fixtures.py b/tests/fs/fallback/local/local_fallback_fixtures.py
index cf1417f..89dcc37 100644
--- a/tests/fs/fallback/local/local_fallback_fixtures.py
+++ b/tests/fs/fallback/local/local_fallback_fixtures.py
@@ -12,7 +12,7 @@ def fs(self):
etcd_hosts="localhost",
target_protocol="file",
target_options={"auto_mkdir": True},
- test_options={"skip_alluxio": True},
+ skip_alluxio=True,
)
@pytest.fixture
diff --git a/tests/fs/fallback/memory/memory_fallback_fixtures.py b/tests/fs/fallback/memory/memory_fallback_fixtures.py
index 2af5011..2ed5903 100644
--- a/tests/fs/fallback/memory/memory_fallback_fixtures.py
+++ b/tests/fs/fallback/memory/memory_fallback_fixtures.py
@@ -16,7 +16,7 @@ def fs(self):
yield AlluxioFileSystem(
etcd_hosts="localhost",
fs=m,
- test_options={"skip_alluxio": True},
+ skip_alluxio=True,
)
finally:
m.store.clear()
diff --git a/tests/fs/fallback/s3/s3_fallback_fixtures.py b/tests/fs/fallback/s3/s3_fallback_fixtures.py
index fb41a87..b911b18 100644
--- a/tests/fs/fallback/s3/s3_fallback_fixtures.py
+++ b/tests/fs/fallback/s3/s3_fallback_fixtures.py
@@ -10,7 +10,7 @@ def fs(self):
return AlluxioFileSystem(
etcd_hosts="localhost",
target_protocol="s3",
- test_options={"skip_alluxio": True},
+ skip_alluxio=True,
)
@pytest.fixture