# Source code for vs_common.filesystem

"""
filesystem.py
==============
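This module contains functions for creating fsspec filesystems, and for setting
up their authentication environment variables, from the configuration classes
defined in ``.model``.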
"""

import os
import pathlib
import contextlib
from typing import Callable, Dict, Iterator

import fsspec
from fsspec.implementations.local import LocalFileSystem
from s3fs import S3FileSystem
from swiftspec import SWIFTFileSystem
from sshfs import SSHFileSystem
from urllib.parse import urlparse
from swiftclient import Connection

from .model import FilesystemConfig, FilesystemType


def _get_swift_authuthentication_environment(
    config: FilesystemConfig,
) -> Dict[str, str]:
    """Build the Swift authentication environment variables for a config.

    A ``swiftclient`` ``Connection`` is created from the Swift section of the
    config and ``get_auth()`` is called on it to obtain the storage URL and
    the auth token.

    Args:
        config (FilesystemConfig): Filesystem configuration whose ``swift``
            section must be set.

    Returns:
        Dict[str, str]: The storage URL and token, each stored under both an
            ``OS_``-prefixed and a ``SWIFT_``-prefixed variable name.

    Raises:
        TypeError: If ``config.swift`` is not set.
    """
    swift = config.swift
    if not swift:
        raise TypeError("FilesystemConfig.swift cannot be None")

    connection_kwargs = dict(
        authurl=swift.authurl,
        user=swift.user,
        key=swift.key,
        tenant_name=swift.tenant_name,
        auth_version=swift.auth_version,
        os_options={"region_name": swift.region_name},
    )
    storage_url, auth_token = Connection(**connection_kwargs).get_auth()

    return {
        "OS_STORAGE_URL": storage_url,
        "OS_AUTH_TOKEN": auth_token,
        "SWIFT_STORAGE_URL": storage_url,
        "SWIFT_AUTH_TOKEN": auth_token,
    }


def _get_s3_authuthentication_environment(config: FilesystemConfig) -> Dict[str, str]:
    """Build the S3 authentication environment variables for a config.

    Args:
        config (FilesystemConfig): Filesystem configuration whose ``s3``
            section must be set.

    Returns:
        Dict[str, str]: Credentials, region and endpoint under the ``AWS_*``
            and ``FSSPEC_S3_*`` variable names.

    Raises:
        TypeError: If ``config.s3`` is not set.
    """
    s3 = config.s3
    if not s3:
        raise TypeError("FilesystemConfig.s3 cannot be None")

    key_id = s3.access_key_id
    secret = s3.secret_access_key

    # Every endpoint variable carries the same value; fall back to the
    # default URL when the config does not provide an endpoint.
    endpoint = s3.endpoint_url or "https://s3.amazonaws.com"

    if s3.public:
        no_sign_request = "YES"
    else:
        no_sign_request = "NO"

    return {
        "AWS_ACCESS_KEY_ID": key_id,
        "AWS_SECRET_ACCESS_KEY": secret,
        "AWS_DEFAULT_REGION": s3.region,
        "AWS_S3_ENDPOINT": endpoint,
        "AWS_ENDPOINT_URL": endpoint,
        "AWS_ENDPOINT_URL_S3": endpoint,
        "AWS_NO_SIGN_REQUEST": no_sign_request,
        "FSSPEC_S3_ENDPOINT_URL": endpoint,
        "FSSPEC_S3_KEY": key_id,
        "FSSPEC_S3_SECRET": secret,
    }


def _get_null_authuthentication_environment(config: FilesystemConfig) -> Dict[str, str]:
    """Return an empty environment for filesystems without auth variables.

    Args:
        config (FilesystemConfig): Filesystem configuration (not used).

    Returns:
        Dict[str, str]: Always a new, empty dictionary.
    """
    no_variables: Dict[str, str] = {}
    return no_variables


# Signature shared by the authentication helpers: they take a FilesystemConfig
# and return a mapping of environment variable names to values.
GetAuthVarFunction = Callable[[FilesystemConfig], Dict[str, str]]

# Dispatch table from filesystem type to the helper that builds its
# authentication environment. The ``file`` and ``sshfs`` types map to the
# helper that returns an empty mapping.
AUTH_MAP: Dict[FilesystemType, GetAuthVarFunction] = {
    FilesystemType.swift: _get_swift_authuthentication_environment,
    FilesystemType.s3: _get_s3_authuthentication_environment,
    FilesystemType.file: _get_null_authuthentication_environment,
    FilesystemType.sshfs: _get_null_authuthentication_environment,
}


def _set_environment_variables(env) -> Dict[str, str]:
    """Write the given variables into ``os.environ`` and return the old values.

    Args:
        env: Mapping of environment variable names to the values to set.

    Returns:
        Dict[str, str]: For every name in ``env``, the value it had before
            the call; an empty string when the variable was not set.
    """
    previous: Dict[str, str] = {}
    for name in env:
        # Remember the current value before overwriting it.
        previous[name] = os.getenv(name, "")
        os.environ[name] = env[name]
    return previous


def _clean_environment_variables(env: Dict[str, str]) -> None:
    """Restore environment variables to previously saved values.

    Args:
        env (Dict[str, str]): Mapping of variable names to the values to
            restore. An empty value means the variable is removed from
            ``os.environ`` instead of being set.
    """
    for key, value in env.items():
        if value:
            os.environ[key] = value
        else:
            # The variable may already be absent (e.g. removed by other code
            # in the meantime); removing it again must not raise KeyError.
            os.environ.pop(key, None)


@contextlib.contextmanager
def authentication_environment(
    filesystem_config: FilesystemConfig,
) -> Iterator:
    """Set appropriate authentication environment variables from the given config

    The variables are set when the context is entered. When it is left, the
    values that were present before are restored, also when the body of the
    ``with`` statement raises.

    Args:
        filesystem_config (FilesystemConfig): Storage authentication configuration
    """
    env = AUTH_MAP[filesystem_config.type](filesystem_config)
    old_env = _set_environment_variables(env)
    try:
        yield
    finally:
        # Always restore, so credentials do not stay in the process
        # environment after an exception inside the ``with`` body.
        _clean_environment_variables(old_env)
def upload(from_path: str, to_path: str) -> str:
    """Uploads file to given path

    The filesystem is selected from the URL scheme of ``to_path``; a path
    without a scheme is treated as a local ``file`` path.

    Args:
        from_path (str): File path to upload
        to_path (str): Destination path

    Returns:
        str: Final path of file
    """
    # Local import: keeps the block self-contained without touching the
    # file-level import section.
    import posixpath

    protocol = urlparse(to_path).scheme or "file"
    file_name = pathlib.Path(from_path).name

    storage_options = {}
    if protocol == "file":
        storage_options["auto_mkdir"] = True

    fs: fsspec.AbstractFileSystem = fsspec.filesystem(protocol, **storage_options)

    if protocol == "file":
        final_path = os.path.join(to_path, file_name)
    else:
        # Remote destinations are URLs and always use "/" as separator;
        # os.path.join would insert a backslash on Windows.
        final_path = posixpath.join(to_path, file_name)

    fs.upload(from_path, final_path, recursive=True)
    return final_path
# Signature shared by the filesystem factories below: they take a
# FilesystemConfig and return an fsspec filesystem.
FilesystemCreationFunction = Callable[[FilesystemConfig], fsspec.AbstractFileSystem]


def _create_local_filesystem(
    filesystem_config: FilesystemConfig,
) -> fsspec.AbstractFileSystem:
    """Create a local filesystem with ``auto_mkdir`` enabled.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration (not used).

    Returns:
        fsspec.AbstractFileSystem: A ``LocalFileSystem`` instance.
    """
    return LocalFileSystem(auto_mkdir=True)


def _create_s3_filesystem(
    filesystem_config: FilesystemConfig,
) -> fsspec.AbstractFileSystem:
    """Create an S3 filesystem from the ``s3`` section of the config.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration whose
            ``s3`` section must be set.

    Returns:
        fsspec.AbstractFileSystem: An ``S3FileSystem`` created with the
            configured key, secret and endpoint URL.

    Raises:
        TypeError: If ``filesystem_config.s3`` is not set.
    """
    s3_config = filesystem_config.s3
    if not s3_config:
        raise TypeError("FilesystemConfig.s3 cannot be None")

    return S3FileSystem(
        key=s3_config.access_key_id,
        secret=s3_config.secret_access_key,
        endpoint_url=s3_config.endpoint_url,
    )


def _create_swift_filesystem(
    filesystem_config: FilesystemConfig,
) -> fsspec.AbstractFileSystem:
    """Create a Swift filesystem from the ``swift`` section of the config.

    The storage URL and token are obtained through
    ``_get_swift_authuthentication_environment`` and passed to the
    filesystem as its ``auth`` argument.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration whose
            ``swift`` section must be set.

    Returns:
        fsspec.AbstractFileSystem: A ``SWIFTFileSystem`` instance.

    Raises:
        TypeError: If ``filesystem_config.swift`` is not set.
    """
    swift_config = filesystem_config.swift
    if not swift_config:
        raise TypeError("FilesystemConfig.swift cannot be None")

    auth_env = _get_swift_authuthentication_environment(filesystem_config)
    auth = [
        {"token": auth_env["SWIFT_AUTH_TOKEN"], "url": auth_env["SWIFT_STORAGE_URL"]}
    ]
    return SWIFTFileSystem(auth=auth)


def _create_sshfs_filesystem(
    filesystem_config: FilesystemConfig,
) -> fsspec.AbstractFileSystem:
    """Create an SSH filesystem from the ``sshfs`` section of the config.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration whose
            ``sshfs`` section must be set.

    Returns:
        fsspec.AbstractFileSystem: An ``SSHFileSystem`` created with the
            configured host, username and password.

    Raises:
        TypeError: If ``filesystem_config.sshfs`` is not set.
    """
    sshfs_config = filesystem_config.sshfs
    if not sshfs_config:
        # Name the attribute that is actually checked.
        raise TypeError("FilesystemConfig.sshfs cannot be None")

    return SSHFileSystem(
        host=sshfs_config.host,
        username=sshfs_config.username,
        password=sshfs_config.password,
    )


# Dispatch table from filesystem type to the factory that creates it.
FILESYSTEM_MAP: Dict[FilesystemType, FilesystemCreationFunction] = {
    FilesystemType.file: _create_local_filesystem,
    FilesystemType.s3: _create_s3_filesystem,
    FilesystemType.swift: _create_swift_filesystem,
    FilesystemType.sshfs: _create_sshfs_filesystem,
}
def get_filesystem(filesystem_config: FilesystemConfig) -> fsspec.AbstractFileSystem:
    """Create the fsspec filesystem that matches the given config.

    The factory is looked up in ``FILESYSTEM_MAP`` by the config's ``type``
    and called with the config.

    Args:
        filesystem_config (FilesystemConfig): Filesystem configuration

    Returns:
        fsspec.AbstractFileSystem: fsspec filesystem
    """
    create_filesystem = FILESYSTEM_MAP[filesystem_config.type]
    return create_filesystem(filesystem_config)