ovo.core.storage
Module Contents
Classes
Storage – Class for managing file storage in the local filesystem or an S3 bucket
ZipWriteContext
ZipReadContext
API
- class ovo.core.storage.Storage(storage_root: str, aws: ovo.core.aws.AWSSessionManager | None, verbose: bool = False, num_copy_threads: int | None = None, archive_method: Literal['zip', None] = None, memory_cache_limit_bytes=50 * 1024 * 1024, disk_cache_limit_bytes=200 * 1024 * 1024, memory_cache_limit_per_file_bytes=5 * 1024 * 1024)
Class for managing file storage in the local filesystem or an S3 bucket.
- archive_context(*extensions: str, delete_if_exists=False)
Activate an archive context that applies to all store_…() calls for files whose names end with any of the provided extensions (or to all files if no extensions are provided). Only one context can be active at a time.
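The following is a minimal, standalone sketch of the one-active-context rule and the extension filter. `ArchiveContextHolder` and `should_archive` are hypothetical names and are not part of ovo. The real archive_context also routes writes into zip files, which this sketch leaves out.

```python
from contextlib import contextmanager
from typing import Iterator, Optional, Tuple


class ArchiveContextHolder:
    """Hypothetical stand-in for the part of Storage that tracks the active archive context."""

    def __init__(self) -> None:
        self._active_extensions: Optional[Tuple[str, ...]] = None

    @contextmanager
    def archive_context(self, *extensions: str) -> Iterator[None]:
        # Refuse to nest: only one archive context may be active at a time.
        if self._active_extensions is not None:
            raise RuntimeError("An archive context is already active")
        self._active_extensions = extensions
        try:
            yield
        finally:
            # Always deactivate, even if the body raised.
            self._active_extensions = None

    def should_archive(self, path: str) -> bool:
        # Outside a context, nothing is archived.
        if self._active_extensions is None:
            return False
        # No extensions given: archive every file.
        if not self._active_extensions:
            return True
        # str.endswith accepts a tuple of suffixes.
        return path.endswith(self._active_extensions)
```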
- _get_zip_write_context(storage_abs_path: str) Optional[ovo.core.storage.ZipWriteContext]
Return a ZipWriteContext to write to, else None. Raises RuntimeError if called from a different process.
- bulk_read_context()
Context manager for speeding up reads of files stored in zip and other archives.
Each zip file is opened only once, reused for all reads within the context, and closed when the context exits.
- _get_zip_read_file(zip_abs_path: str)
Return a ZipFile for reading, given its absolute path.
If bulk_read_context is active, reuse the same ZipFile for the same zip_abs_path; otherwise, open and close it for each read.
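The open-once-then-reuse behaviour of bulk_read_context and _get_zip_read_file can be sketched with the standard-library zipfile module. `ZipReader`, `read_member` and the `open_count` counter are hypothetical. They only illustrate the idea and are not ovo's implementation.

```python
import zipfile
from contextlib import contextmanager
from typing import Dict, Iterator, Optional


class ZipReader:
    """Hypothetical sketch: reuse open ZipFile handles while a bulk-read context is active."""

    def __init__(self) -> None:
        self._open: Optional[Dict[str, zipfile.ZipFile]] = None
        self.open_count = 0  # how many times an archive was actually opened

    def _open_zip(self, zip_abs_path: str) -> zipfile.ZipFile:
        self.open_count += 1
        return zipfile.ZipFile(zip_abs_path, "r")

    @contextmanager
    def bulk_read_context(self) -> Iterator[None]:
        self._open = {}
        try:
            yield
        finally:
            # Close every archive opened during the context.
            for zf in self._open.values():
                zf.close()
            self._open = None

    def read_member(self, zip_abs_path: str, member: str) -> bytes:
        if self._open is not None:
            # Inside the context: open each archive once, then reuse it.
            if zip_abs_path not in self._open:
                self._open[zip_abs_path] = self._open_zip(zip_abs_path)
            return self._open[zip_abs_path].read(member)
        # Outside the context: open and close the archive for every read.
        with self._open_zip(zip_abs_path) as zf:
            return zf.read(member)
```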
- _clear_temp_dir()
- _cache_exists(file_path)
- _cache_read(file_path)
- _cache_store(file_path, content: str | bytes | None)
- clear_cache(storage_path: str)
Clear the cache for a specific file path.
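The constructor parameters memory_cache_limit_bytes and memory_cache_limit_per_file_bytes suggest a size-bounded in-memory cache. The sketch below assumes least-recently-used eviction and treats `None` content as "drop the entry". Both choices are assumptions, and `MemoryCache` is a hypothetical class.

```python
from collections import OrderedDict
from typing import Optional, Union

Content = Union[str, bytes]


class MemoryCache:
    """Hypothetical LRU cache bounded by total size and per-file size (eviction policy assumed)."""

    def __init__(self, limit_bytes: int = 50 * 1024 * 1024,
                 limit_per_file_bytes: int = 5 * 1024 * 1024) -> None:
        self.limit_bytes = limit_bytes
        self.limit_per_file_bytes = limit_per_file_bytes
        self._items: "OrderedDict[str, Content]" = OrderedDict()
        self._size = 0

    @staticmethod
    def _nbytes(content: Content) -> int:
        return len(content.encode() if isinstance(content, str) else content)

    def exists(self, path: str) -> bool:
        return path in self._items

    def read(self, path: str) -> Optional[Content]:
        if path not in self._items:
            return None
        self._items.move_to_end(path)  # mark as most recently used
        return self._items[path]

    def store(self, path: str, content: Optional[Content]) -> None:
        self.clear(path)  # drop any previous version first
        if content is None:
            return  # assumption: None means "do not cache"
        size = self._nbytes(content)
        if size > self.limit_per_file_bytes:
            return  # too large to cache
        self._items[path] = content
        self._size += size
        # Evict least recently used entries until under the total limit.
        while self._size > self.limit_bytes:
            _, old = self._items.popitem(last=False)
            self._size -= self._nbytes(old)

    def clear(self, path: str) -> None:
        old = self._items.pop(path, None)
        if old is not None:
            self._size -= self._nbytes(old)
```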
- static parse_path(path: str) tuple[str, str, str]
- static _parse_zip_path(abs_path: str) tuple[str, str]
- list_dir(abs_path: str, only_dir=False, recursive=False) list[str]
List all files in the directory and return their paths relative to the provided path.
- Parameters:
abs_path – path to the source directory
only_dir – if True, list only directories; if False, list all files
recursive – if True, list all files in the directory recursively (paths remain relative to the provided path)
- Returns:
list of files in the directory
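For a local storage root, list_dir can be approximated with pathlib. The hypothetical `list_dir_local` assumes two things: with only_dir=False, subdirectory entries are returned alongside files, and results are sorted. It does not cover ovo's S3 listing.

```python
from pathlib import Path
from typing import List


def list_dir_local(abs_path: str, only_dir: bool = False, recursive: bool = False) -> List[str]:
    """Hypothetical local sketch of list_dir; returns POSIX-style paths relative to abs_path."""
    root = Path(abs_path)
    # rglob("*") walks the whole tree; iterdir() only looks at direct children.
    entries = root.rglob("*") if recursive else root.iterdir()
    result = []
    for entry in entries:
        if only_dir and not entry.is_dir():
            continue
        result.append(entry.relative_to(root).as_posix())
    return sorted(result)
```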
- file_exists(storage_path) bool
Check whether the file exists in the local filesystem or in the S3 bucket.
- Parameters:
storage_path – relative (within storage root) or absolute path to the source file
- Returns:
True if the file exists, False otherwise
- read_file_bytes(storage_path, cache_store: bool = True) bytes
- read_file_str(storage_path: str, cache_store: bool = True) str
Read the file from the source filesystem or from the source S3 bucket and return its contents as a string.
- read_file_pickle(storage_path: str)
Read the file from the source filesystem or from the source S3 bucket and unpickle it.
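read_file_str and read_file_pickle can be understood as thin layers over read_file_bytes. In the sketch below, `InMemoryReader` is a hypothetical stand-in for the storage backend, and UTF-8 decoding is an assumption.

```python
import pickle
from typing import Any, Dict


class InMemoryReader:
    """Hypothetical sketch: read_file_str and read_file_pickle layered on read_file_bytes."""

    def __init__(self, files: Dict[str, bytes]) -> None:
        self._files = files

    def read_file_bytes(self, storage_path: str) -> bytes:
        return self._files[storage_path]

    def read_file_str(self, storage_path: str) -> str:
        # UTF-8 decoding is an assumption.
        return self.read_file_bytes(storage_path).decode("utf-8")

    def read_file_pickle(self, storage_path: str) -> Any:
        # Only unpickle trusted data: pickle can execute arbitrary code while loading.
        return pickle.loads(self.read_file_bytes(storage_path))
```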
- resolve_path(storage_path: str) str
Resolve the storage path to an absolute path in the local filesystem or S3 bucket.
- Parameters:
storage_path – relative (within storage root) or absolute path to the source file
- Returns:
absolute path to the file in the local filesystem or S3 bucket
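Here is a rough sketch of how a relative storage path might be resolved against a local or S3 storage root. `resolve_path_sketch` is hypothetical. It assumes absolute paths start with "/" or "s3://" and does no normalization; posixpath.normpath would collapse the "//" in "s3://", so it is avoided.

```python
def resolve_path_sketch(storage_root: str, storage_path: str) -> str:
    """Hypothetical sketch: resolve a relative or absolute path against a storage root."""
    # Absolute local paths and S3 URIs are returned unchanged.
    if storage_path.startswith(("/", "s3://")):
        return storage_path
    # Relative paths are joined onto the root, which may itself be an S3 URI.
    return storage_root.rstrip("/") + "/" + storage_path
```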
- _basename(path: str) str
Get the base name (filename) from a path, supporting both regular paths and zip paths.
- store_file_path(source_abs_path: str, storage_rel_path: str, overwrite: bool = True) str
Store the file in the local filesystem or in the S3 bucket.
- Parameters:
source_abs_path – absolute path (or S3 URI) to the source file
storage_rel_path – relative path to the storage file
overwrite – if True, overwrite the file if it exists
- Returns:
relative path to the stored file
- store_file_bytes(file_bytes: bytes, storage_rel_path: str, overwrite: bool = True) str
Store the file bytes in the local filesystem or in the S3 bucket.
- Parameters:
file_bytes – file bytes to store
storage_rel_path – relative path to the storage file
overwrite – if True, overwrite the file if it exists
- Returns:
relative path to the stored file
- store_file_str(file_str: str, storage_rel_path: str, overwrite: bool = True) str
Store the file string in the local filesystem or in the S3 bucket.
- Parameters:
file_str – file string to store
storage_rel_path – relative path to the storage file
overwrite – if True, overwrite the file if it exists
- Returns:
relative path to the stored file
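Below is a local-filesystem sketch of the store_file_bytes / store_file_str pair. The helper names are hypothetical and UTF-8 encoding is assumed. Raising FileExistsError when overwrite=False and the file already exists is also an assumption; ovo may skip the write or raise a different error.

```python
import os


def store_file_bytes_local(storage_root: str, file_bytes: bytes, storage_rel_path: str,
                           overwrite: bool = True) -> str:
    """Hypothetical local sketch of store_file_bytes; returns the relative path."""
    dest = os.path.join(storage_root, storage_rel_path)
    if not overwrite and os.path.exists(dest):
        # Assumption: refuse to overwrite by raising; ovo may behave differently.
        raise FileExistsError(dest)
    # Create intermediate directories inside the storage root as needed.
    os.makedirs(os.path.dirname(dest), exist_ok=True)
    with open(dest, "wb") as fh:
        fh.write(file_bytes)
    return storage_rel_path


def store_file_str_local(storage_root: str, file_str: str, storage_rel_path: str,
                         overwrite: bool = True) -> str:
    # The str variant encodes (UTF-8 assumed) and delegates to the bytes variant.
    return store_file_bytes_local(storage_root, file_str.encode("utf-8"), storage_rel_path, overwrite)
```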
- get_project_path(project_id: str, pool_id: str = None, input_bytes: bytes = None, absolute: bool = False) str
Get the storage path for a project or a specific section within a project.
- store_input(project_id: str, file_path: str = None, file_bytes: bytes = None, file_str: str = None, filename: str = None) str
Store an input file for a project in the local filesystem or in the S3 bucket, loading it from a path, from bytes, or from a string.
- Parameters:
project_id – project ID
file_path – path to the source file (when loading from path)
file_bytes – file bytes to store (when loading from bytes)
file_str – file string to store (when loading from string)
filename – filename to use when storing the file (required when loading from bytes or string)
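The documented argument rule for store_input is that filename is required for bytes or string input. That rule fits in a small validation helper. `normalize_input` is hypothetical. Requiring exactly one of file_path, file_bytes or file_str is an assumption, and so is falling back to the source file name for path input.

```python
import os
from typing import Optional, Tuple


def normalize_input(file_path: Optional[str] = None, file_bytes: Optional[bytes] = None,
                    file_str: Optional[str] = None, filename: Optional[str] = None) -> Tuple[bytes, str]:
    """Hypothetical helper: turn store_input-style arguments into (content, filename)."""
    provided = sum(x is not None for x in (file_path, file_bytes, file_str))
    if provided != 1:
        # Assumption: exactly one source must be given.
        raise ValueError("Provide exactly one of file_path, file_bytes or file_str")
    if file_path is not None:
        with open(file_path, "rb") as fh:
            content = fh.read()
        # Assumption: fall back to the source file's own name.
        return content, filename or os.path.basename(file_path)
    if filename is None:
        # Documented: filename is required for bytes or string input.
        raise ValueError("filename is required when storing bytes or a string")
    content = file_bytes if file_bytes is not None else file_str.encode("utf-8")
    return content, filename
```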
- create_zip(storage_paths_by_dir: dict[str, list[str]] | list[str]) bytes
Create a zip archive from stored files.
- Parameters:
storage_paths_by_dir – dictionary mapping each directory inside the zip ("" or None for the root) to a list of storage paths, or a flat list of storage paths (stored in the root of the zip)
- Returns:
content of the zip archive (bytes) containing the stored files
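The following sketch implements create_zip on top of zipfile and an in-memory buffer. `create_zip_sketch` is hypothetical, and so is its `read_bytes` callback, which stands in for Storage.read_file_bytes. Storing each file under its base name within its directory is an assumption.

```python
import io
import posixpath
import zipfile
from typing import Callable, Dict, List, Optional, Union


def create_zip_sketch(storage_paths_by_dir: Union[Dict[Optional[str], List[str]], List[str]],
                      read_bytes: Callable[[str], bytes]) -> bytes:
    """Hypothetical sketch of create_zip; read_bytes stands in for Storage.read_file_bytes."""
    if isinstance(storage_paths_by_dir, list):
        # A flat list goes into the root of the archive.
        storage_paths_by_dir = {"": storage_paths_by_dir}
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for directory, paths in storage_paths_by_dir.items():
            for path in paths:
                name = posixpath.basename(path)
                # "" or None means the root of the archive.
                arcname = posixpath.join(directory, name) if directory else name
                zf.writestr(arcname, read_bytes(path))
    return buffer.getvalue()
```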
- sync_file(storage_path: str, local_destination_path: str, link: bool = False)
Download or copy a stored file to a local destination path.
- Parameters:
storage_path – path to the stored file
local_destination_path – path to the destination file
link – if True, and if supported, create a symbolic link to the file instead of copying it
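Here is a local sketch of sync_file. It creates a symbolic link when link=True and the platform allows it, and copies the file otherwise. `sync_file_local` is hypothetical, and replacing an existing destination is an assumption. S3 downloads are not shown; a symbolic link is presumably only possible for local storage.

```python
import os
import shutil


def sync_file_local(source_abs_path: str, local_destination_path: str, link: bool = False) -> None:
    """Hypothetical local sketch of sync_file: symlink if requested and possible, else copy."""
    os.makedirs(os.path.dirname(os.path.abspath(local_destination_path)), exist_ok=True)
    if os.path.lexists(local_destination_path):
        os.remove(local_destination_path)  # assumption: replace a stale file or link
    if link:
        try:
            os.symlink(os.path.abspath(source_abs_path), local_destination_path)
            return
        except (OSError, NotImplementedError):
            pass  # symlinks unsupported here; fall back to copying
    shutil.copy2(source_abs_path, local_destination_path)
```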
- sync_files(storage_paths: list[str], local_destination_dir: str, preserve_subdirs=False, skip_outside_root=False, copy_full_archives=False)
Download or copy a list of stored files to a local directory.
- Parameters:
storage_paths – list of stored files to download to the destination directory
local_destination_dir – path to the destination directory (created if it does not exist)
preserve_subdirs – if True, preserve subdirectories in the local destination directory
skip_outside_root – if True, skip files located outside the storage root instead of raising an error (only applies when preserve_subdirs=True)
copy_full_archives – if True, when a file is found inside a zip archive, copy the entire archive to the destination directory instead of extracting only that file (only applies when preserve_subdirs=True)
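The destination layout implied by preserve_subdirs and skip_outside_root can be sketched as a pure function. `plan_destinations` is hypothetical. It returns None for skipped files, ignores copy_full_archives, and uses a local stand-in for FileOutsideStorageRootError.

```python
import posixpath
from typing import List, Optional


class FileOutsideStorageRootError(Exception):
    """Local stand-in for ovo.core.storage.FileOutsideStorageRootError."""


def plan_destinations(storage_root: str, storage_paths: List[str], local_destination_dir: str,
                      preserve_subdirs: bool = False, skip_outside_root: bool = False) -> List[Optional[str]]:
    """Hypothetical sketch: compute where each stored file would land (None means skipped)."""
    root = storage_root.rstrip("/") + "/"
    destinations: List[Optional[str]] = []
    for path in storage_paths:
        if not preserve_subdirs:
            # Flatten: keep only the file name.
            destinations.append(posixpath.join(local_destination_dir, posixpath.basename(path)))
            continue
        if path.startswith(root):
            rel = path[len(root):]
        elif not path.startswith(("/", "s3://")):
            rel = path  # already relative to the storage root
        elif skip_outside_root:
            destinations.append(None)
            continue
        else:
            raise FileOutsideStorageRootError(path)
        destinations.append(posixpath.join(local_destination_dir, rel))
    return destinations
```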
- is_local_path(path)
- sync_directory(source_dir: str, local_destination_dir: str, link=False)
Download or copy a directory to a local directory.
- Parameters:
source_dir – source directory; the path should be absolute or relative to the storage root
local_destination_dir – path to the destination directory (created if it does not exist)
link – if True, use symbolic links instead of copying files when possible
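Here is a local sketch of sync_directory. It mirrors the directory tree and either symlinks or copies each file. `sync_directory_local` is hypothetical and does not cover S3 sources.

```python
import os
import shutil


def sync_directory_local(source_dir: str, local_destination_dir: str, link: bool = False) -> None:
    """Hypothetical local sketch of sync_directory: mirror the tree, linking or copying files."""
    for dirpath, _dirnames, filenames in os.walk(source_dir):
        # Recreate each source subdirectory under the destination.
        rel_dir = os.path.relpath(dirpath, source_dir)
        target_dir = os.path.normpath(os.path.join(local_destination_dir, rel_dir))
        os.makedirs(target_dir, exist_ok=True)
        for name in filenames:
            src = os.path.join(dirpath, name)
            dst = os.path.join(target_dir, name)
            if link:
                try:
                    os.symlink(os.path.abspath(src), dst)
                    continue
                except (OSError, NotImplementedError):
                    pass  # symlinks unavailable; fall back to copying
            shutil.copy2(src, dst)
```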
- prepare_workflow_input(storage_path: str, workdir: str, input_bytes: bytes = None, name: str = None, custom_hash: str = None) str
Prepare a workflow input file for submission by storing it in the provided scheduler workdir under a unique hash based on the file’s contents.
- Parameters:
storage_path – path to the stored file
workdir – destination (working directory of the scheduler)
input_bytes – optional; use the provided bytes instead of reading storage_path
name – optional; rename the file to the provided name (without extension; the original extension will be appended)
custom_hash – optional; use a custom hash as the subdirectory name instead of a hash generated from the file contents
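The following sketches the content-addressed layout used by prepare_workflow_input. Identical content maps to the same subdirectory, so it is stored only once. `prepare_input_sketch` is hypothetical, and SHA-256 is an assumed hash function.

```python
import hashlib
import os
from typing import Optional


def prepare_input_sketch(content: bytes, original_name: str, workdir: str,
                         name: Optional[str] = None, custom_hash: Optional[str] = None) -> str:
    """Hypothetical sketch: store content under workdir/<hash>/<file name>; returns the path."""
    # SHA-256 is an assumption; ovo may use a different digest.
    subdir = custom_hash or hashlib.sha256(content).hexdigest()
    _, ext = os.path.splitext(original_name)
    # Renaming keeps the original extension, as documented.
    filename = (name + ext) if name else original_name
    target_dir = os.path.join(workdir, subdir)
    os.makedirs(target_dir, exist_ok=True)
    path = os.path.join(target_dir, filename)
    if not os.path.exists(path):
        # Identical content maps to the same path, so it is written only once.
        with open(path, "wb") as fh:
            fh.write(content)
    return path
```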
- prepare_workflow_inputs(storage_paths: list[str], workdir: str, names: List[str] = None, return_paths=False, single_directory=False) str | list[str]
Prepare workflow input files for submission by storing them in the provided scheduler workdir under unique hashes based on each file’s contents.
Each file is stored in a separate subdirectory named after a hash of its contents. This is useful when the same file can be submitted multiple times as part of different sets of files but should be stored only once.
When return_paths=False (default), returns the path to a txt file listing the final file paths (or the file path itself when there is a single input file). When return_paths=True, returns a list of file paths. When single_directory=True, creates a single directory and returns its path; this requires filenames to be unique.
- Parameters:
storage_paths – list of files to be downloaded to the destination directory
workdir – destination (working directory of the scheduler)
names – rename each file to the provided name (a list of the same length as storage_paths; names are given without file extension, and the original extension will be added)
return_paths – if True, return a list of file paths instead of a txt file
single_directory – if True, create a single directory and return its path (requires unique filenames)
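The return-value rules of prepare_workflow_inputs can be sketched with two helpers. `finalize_inputs` and `check_unique_names` are hypothetical. The one-path-per-line listing format and the `inputs_<digest>.txt` file name are assumptions.

```python
import hashlib
import os
from typing import List, Union


def finalize_inputs(paths: List[str], workdir: str, return_paths: bool = False) -> Union[str, List[str]]:
    """Hypothetical sketch of the return-value rules for prepared input paths."""
    if return_paths:
        return paths
    if len(paths) == 1:
        return paths[0]  # a single input is returned directly
    listing = "\n".join(paths) + "\n"
    # Assumption: name the listing after its content so repeated calls reuse one file.
    digest = hashlib.sha256(listing.encode("utf-8")).hexdigest()[:16]
    list_path = os.path.join(workdir, f"inputs_{digest}.txt")
    with open(list_path, "w") as fh:
        fh.write(listing)
    return list_path


def check_unique_names(paths: List[str]) -> None:
    """single_directory requires unique filenames; raise if any base name repeats."""
    names = [os.path.basename(p) for p in paths]
    duplicates = sorted({n for n in names if names.count(n) > 1})
    if duplicates:
        raise ValueError(f"single_directory requires unique filenames, got duplicates: {duplicates}")
```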
- remove(storage_path: str)
Remove the file from storage.
- class ovo.core.storage.ZipWriteContext(*extensions: str, delete_if_exists: bool = False)
- close()
- get_zip_file(storage_abs_path: str)
Return a ZipFile for writing to the provided storage path.
Will keep the file open until the whole context is closed, allowing multiple files to be written to the same zip file without reopening it.
Given a storage path (e.g. “/path/to/storage/dir/file.txt”), return a ZipFile object for writing to “/path/to/storage/dir.zip”.
Raises RuntimeError if called from a different process (please use a thread pool for parallelism).
- accepts_path(storage_abs_path: str) bool
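The sketch below shows the ZipWriteContext idea: one ZipFile per target archive, kept open until close(), plus a check that rejects use from a different process. `ZipWriteContextSketch` is hypothetical. Three behaviours are assumptions: a file in dir/ is written to a sibling dir.zip archive, archives are opened in append mode, and an existing archive is deleted when delete_if_exists=True.

```python
import os
import zipfile
from typing import Dict, Tuple


class ZipWriteContextSketch:
    """Hypothetical sketch of ZipWriteContext: one open ZipFile per target archive."""

    def __init__(self, *extensions: str, delete_if_exists: bool = False) -> None:
        self.extensions: Tuple[str, ...] = extensions
        self.delete_if_exists = delete_if_exists
        self._pid = os.getpid()  # remember the creating process
        self._zips: Dict[str, zipfile.ZipFile] = {}

    def accepts_path(self, storage_abs_path: str) -> bool:
        # No extensions means every file is accepted.
        return not self.extensions or storage_abs_path.endswith(self.extensions)

    def get_zip_file(self, storage_abs_path: str) -> zipfile.ZipFile:
        if os.getpid() != self._pid:
            raise RuntimeError("ZipWriteContext used from a different process; use a thread pool instead")
        # Assumption: "<root>/dir/file.txt" is written into "<root>/dir.zip".
        zip_path = os.path.dirname(storage_abs_path) + ".zip"
        if zip_path not in self._zips:
            if self.delete_if_exists and os.path.exists(zip_path):
                os.remove(zip_path)
            # Append mode creates the archive if it does not exist yet.
            self._zips[zip_path] = zipfile.ZipFile(zip_path, "a", compression=zipfile.ZIP_DEFLATED)
        return self._zips[zip_path]

    def close(self) -> None:
        # Closing writes each archive's central directory.
        for zf in self._zips.values():
            zf.close()
        self._zips.clear()
```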
- class ovo.core.storage.ZipReadContext
- close()
- get_zip_file(zip_abs_path: str)
Return a ZipFile for reading from the provided zip file path.
Will keep the file open until the whole context is closed, allowing multiple files to be read from the same zip file without reopening it.
Raises RuntimeError if called from a different process (please use a thread pool for parallelism).
- exception ovo.core.storage.ZipIsBeingWrittenError
Bases: Exception
- exception ovo.core.storage.FileOutsideStorageRootError
Bases: Exception