"""
filesystem.py
==============
This module contains functions for creating fsspec filesystems and
authentication environments from filesystem configuration models.
"""
import os
import pathlib
import contextlib
from typing import Callable, Dict, Iterator
import fsspec
from fsspec.implementations.local import LocalFileSystem
from s3fs import S3FileSystem
from swiftspec import SWIFTFileSystem
from sshfs import SSHFileSystem
from urllib.parse import urlparse
from swiftclient import Connection
from .model import FilesystemConfig, FilesystemType
def _get_swift_authuthentication_environment(
    config: FilesystemConfig,
) -> Dict[str, str]:
    """Authenticate against Swift and build the matching environment variables.

    A ``swiftclient`` connection is created from the values in ``config.swift``
    and asked for its authentication result. The returned pair is exported as
    storage URL and auth token under both the ``OS_*`` and ``SWIFT_*`` names.

    Args:
        config (FilesystemConfig): Filesystem configuration with a ``swift`` section

    Raises:
        TypeError: If ``config.swift`` is not set

    Returns:
        Dict[str, str]: Environment variable names mapped to their values
    """
    swift = config.swift
    if not swift:
        raise TypeError("FilesystemConfig.swift cannot be None")

    connection_options = {
        "authurl": swift.authurl,
        "user": swift.user,
        "key": swift.key,
        "tenant_name": swift.tenant_name,
        "auth_version": swift.auth_version,
        "os_options": {"region_name": swift.region_name},
    }
    storage_url, auth_token = Connection(**connection_options).get_auth()

    # The same URL/token pair is published under two naming schemes.
    environment: Dict[str, str] = {}
    environment["OS_STORAGE_URL"] = storage_url
    environment["OS_AUTH_TOKEN"] = auth_token
    environment["SWIFT_STORAGE_URL"] = storage_url
    environment["SWIFT_AUTH_TOKEN"] = auth_token
    return environment
def _get_s3_authuthentication_environment(
    config: FilesystemConfig,
) -> Dict[str, str]:
    """Build the S3 authentication environment variables from the config.

    The credentials, region and endpoint in ``config.s3`` are exported under
    several ``AWS_*`` and ``FSSPEC_S3_*`` variable names. When no endpoint URL
    is configured, ``https://s3.amazonaws.com`` is used.

    Args:
        config (FilesystemConfig): Filesystem configuration with an ``s3`` section

    Raises:
        TypeError: If ``config.s3`` is not set

    Returns:
        Dict[str, str]: Environment variable names mapped to their values
    """
    s3 = config.s3
    if not s3:
        raise TypeError("FilesystemConfig.s3 cannot be None")

    # Resolve the shared values once; each is exported under several names.
    endpoint = s3.endpoint_url or "https://s3.amazonaws.com"
    access_key = s3.access_key_id
    secret_key = s3.secret_access_key
    unsigned = "YES" if s3.public else "NO"

    return {
        "AWS_ACCESS_KEY_ID": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
        "AWS_DEFAULT_REGION": s3.region,
        "AWS_S3_ENDPOINT": endpoint,
        "AWS_ENDPOINT_URL": endpoint,
        "AWS_ENDPOINT_URL_S3": endpoint,
        "AWS_NO_SIGN_REQUEST": unsigned,
        "FSSPEC_S3_ENDPOINT_URL": endpoint,
        "FSSPEC_S3_KEY": access_key,
        "FSSPEC_S3_SECRET": secret_key,
    }
def _get_null_authuthentication_environment(
    config: FilesystemConfig,
) -> Dict[str, str]:
    """Return an empty environment; ``config`` is not used.

    Args:
        config (FilesystemConfig): Filesystem configuration (ignored)

    Returns:
        Dict[str, str]: A new, empty dictionary
    """
    return dict()
# Signature shared by the functions that build authentication environment
# variables from a filesystem configuration.
GetAuthVarFunction = Callable[[FilesystemConfig], Dict[str, str]]
# Authentication environment builder for each filesystem type. ``file`` and
# ``sshfs`` map to the builder that returns no variables.
AUTH_MAP: Dict[FilesystemType, GetAuthVarFunction] = {
    FilesystemType.swift: _get_swift_authuthentication_environment,
    FilesystemType.s3: _get_s3_authuthentication_environment,
    FilesystemType.file: _get_null_authuthentication_environment,
    FilesystemType.sshfs: _get_null_authuthentication_environment,
}
def _set_environment_variables(env: Dict[str, str]) -> Dict[str, str]:
    """Export the given variables to ``os.environ`` and record what they replaced.

    Args:
        env (Dict[str, str]): Variable names mapped to the values to export.
            Entries whose value is ``None`` are skipped, because ``os.environ``
            only accepts strings and would otherwise raise part-way through,
            leaving the environment half-modified.

    Returns:
        Dict[str, str]: Previous value of every exported variable, with an
        empty string standing for "was not set"
    """
    old_values: Dict[str, str] = {}
    for key, value in env.items():
        if value is None:
            # Nothing to export for this name; leave the environment untouched.
            continue
        old_values[key] = os.environ.get(key, "")
        os.environ[key] = value
    return old_values
def _clean_environment_variables(env: Dict[str, str]) -> None:
    """Restore environment variables to previously recorded values.

    Args:
        env (Dict[str, str]): Variable names mapped to the values to restore.
            An empty value means the variable should be removed.
    """
    for key, value in env.items():
        if value:
            os.environ[key] = value
        else:
            # The variable may already be gone; removing it must not fail.
            os.environ.pop(key, None)
# [docs]  (stray documentation-link marker from extraction; not code)
@contextlib.contextmanager
def authentication_environment(
    filesystem_config: FilesystemConfig,
) -> Iterator[None]:
    """Set appropriate authentication environment variables from the given config

    The variables are exported on entry and the previous values are restored
    on exit, including when the body of the ``with`` statement raises.

    Args:
        filesystem_config (FilesystemConfig): Storage authentication configuration

    Raises:
        KeyError: If ``filesystem_config.type`` has no entry in ``AUTH_MAP``

    Yields:
        None
    """
    env = AUTH_MAP[filesystem_config.type](filesystem_config)
    old_env = _set_environment_variables(env)
    try:
        yield
    finally:
        # Always restore, so credentials do not stay in the process
        # environment after an exception inside the managed block.
        _clean_environment_variables(old_env)
# [docs]  (stray documentation-link marker from extraction; not code)
def upload(from_path: str, to_path: str) -> str:
    """Uploads file to given path

    The target filesystem is chosen from the URL scheme of ``to_path``; a
    destination without a scheme is treated as a local path. The uploaded
    file keeps its own name and is placed under ``to_path``.

    Args:
        from_path (str): File path to upload
        to_path (str): Destination path

    Returns:
        str: Final path of file
    """
    import posixpath

    scheme = urlparse(to_path).scheme
    protocol = scheme or "file"
    file_name = pathlib.Path(from_path).name
    storage_options = {}
    if protocol == "file":
        storage_options["auto_mkdir"] = True
    fs: fsspec.AbstractFileSystem = fsspec.filesystem(protocol, **storage_options)
    # URLs always use "/" as separator; os.path.join would insert "\\" on
    # Windows. Plain local paths keep the platform's own separator.
    join = posixpath.join if scheme else os.path.join
    final_path = join(to_path, file_name)
    fs.upload(from_path, final_path, recursive=True)
    return final_path
# Signature shared by the functions that create an fsspec filesystem from a
# filesystem configuration.
FilesystemCreationFunction = Callable[[FilesystemConfig], fsspec.AbstractFileSystem]
def _create_local_filesystem(
    filesystem_config: FilesystemConfig,
) -> fsspec.AbstractFileSystem:
    """Create a local filesystem; ``filesystem_config`` is not used.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration (ignored)

    Returns:
        fsspec.AbstractFileSystem: ``LocalFileSystem`` created with ``auto_mkdir=True``
    """
    local_options = {"auto_mkdir": True}
    return LocalFileSystem(**local_options)
def _create_s3_filesystem(
    filesystem_config: FilesystemConfig,
) -> fsspec.AbstractFileSystem:
    """Create an S3 filesystem from the ``s3`` section of the config.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration with an
            ``s3`` section

    Raises:
        TypeError: If ``filesystem_config.s3`` is not set

    Returns:
        fsspec.AbstractFileSystem: ``S3FileSystem`` using the configured key,
        secret and endpoint URL
    """
    settings = filesystem_config.s3
    if settings:
        return S3FileSystem(
            key=settings.access_key_id,
            secret=settings.secret_access_key,
            endpoint_url=settings.endpoint_url,
        )
    raise TypeError("FilesystemConfig.s3 cannot be None")
def _create_swift_filesystem(
    filesystem_config: FilesystemConfig,
) -> fsspec.AbstractFileSystem:
    """Create a Swift filesystem from the ``swift`` section of the config.

    The storage URL and token are taken from the Swift authentication
    environment built for the same configuration.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration with a
            ``swift`` section

    Raises:
        TypeError: If ``filesystem_config.swift`` is not set

    Returns:
        fsspec.AbstractFileSystem: ``SWIFTFileSystem`` with one auth entry
    """
    if not filesystem_config.swift:
        raise TypeError("FilesystemConfig.swift cannot be None")

    swift_env = _get_swift_authuthentication_environment(filesystem_config)
    credentials = {
        "token": swift_env["SWIFT_AUTH_TOKEN"],
        "url": swift_env["SWIFT_STORAGE_URL"],
    }
    return SWIFTFileSystem(auth=[credentials])
def _create_sshfs_filesystem(
    filesystem_config: FilesystemConfig,
) -> fsspec.AbstractFileSystem:
    """Create an SSH filesystem from the ``sshfs`` section of the config.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration with an
            ``sshfs`` section

    Raises:
        TypeError: If ``filesystem_config.sshfs`` is not set

    Returns:
        fsspec.AbstractFileSystem: ``SSHFileSystem`` using the configured host,
        username and password
    """
    sshfs_config = filesystem_config.sshfs
    if not sshfs_config:
        # Name the attribute that is actually checked (it is ``sshfs``, not ``sftp``).
        raise TypeError("FilesystemConfig.sshfs cannot be None")
    return SSHFileSystem(
        host=sshfs_config.host,
        username=sshfs_config.username,
        password=sshfs_config.password,
    )
# Filesystem creation function for each filesystem type.
FILESYSTEM_MAP: Dict[FilesystemType, FilesystemCreationFunction] = {
    FilesystemType.file: _create_local_filesystem,
    FilesystemType.s3: _create_s3_filesystem,
    FilesystemType.swift: _create_swift_filesystem,
    FilesystemType.sshfs: _create_sshfs_filesystem,
}
# [docs]  (stray documentation-link marker from extraction; not code)
def get_filesystem(filesystem_config: FilesystemConfig) -> fsspec.AbstractFileSystem:
    """Create the fsspec filesystem that matches the given config

    The creation function is looked up in ``FILESYSTEM_MAP`` by
    ``filesystem_config.type`` and called with the config.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration

    Raises:
        KeyError: If ``filesystem_config.type`` has no entry in ``FILESYSTEM_MAP``

    Returns:
        fsspec.AbstractFileSystem: fsspec filesystem
    """
    create = FILESYSTEM_MAP[filesystem_config.type]
    return create(filesystem_config)