Database handler abstractions for MADSci.
This package provides ABC interfaces and implementations for database access, allowing managers to use fast in-memory mocks in tests while using real database connections in production.
Handler types:
DocumentStorageHandler: MongoDB-compatible document database access (PyDocumentStorageHandler, InMemoryDocumentStorageHandler)
CacheHandler: Redis/Valkey-compatible cache access (PyCacheHandler, InMemoryCacheHandler)
PostgresHandler: PostgreSQL/SQLite via SQLAlchemy (SQLAlchemyHandler, SQLiteHandler)
ObjectStorageHandler: S3-compatible object storage access (RealObjectStorageHandler, InMemoryObjectStorageHandler)
Sub-modules¶
madsci.common.db_handlers.cache_handler
madsci.common.db_handlers.document_storage_handler
madsci.common.db_handlers.object_storage_handler
madsci.common.db_handlers.postgres_handler
Classes¶
CacheHandler()Abstract interface for cache access (Redis/Valkey compatible).
Managers use this interface instead of directly depending on
redis.Redisandpotterydata structures, enabling in-memory substitution for tests.Ancestors (in MRO)¶
Descendants¶
madsci.common.db_handlers.cache_handler.InMemoryCacheHandler
madsci.common.db_handlers.cache_handler.PyCacheHandler
Methods¶
close(self) ‑> NoneRelease cache connections and resources.
create_dict(self, key: str) ‑> <class 'collections.abc.MutableMapping'>Create a dict-like object backed by the cache.
Returns an object supporting
__getitem__,__setitem__,__delitem__,__contains__,__iter__,__len__,get,items,update,clear,to_dict.create_list(self, key: str) ‑> AnyCreate a list-like object backed by the cache.
Returns an object supporting
append,remove,__iter__,__len__,__contains__.create_lock(self, key: str, auto_release_time: int = 60) ‑> ContextManagerCreate a distributed lock.
Returns an object supporting context manager protocol (
__enter__/__exit__) andacquire/release.Args: key: Lock identifier. auto_release_time: Seconds before auto-release.
get(self, key: str) ‑> str | NoneReturn the value for key, or
Noneif missing.incr(self, key: str, amount: int = 1) ‑> intIncrement a key by amount and return the new value.
ping(self) ‑> boolCheck connectivity to the cache server.
Returns: True if the cache server is reachable, False otherwise.
set(self, key: str, value: Any) ‑> NoneSet key to value.
DocumentStorageHandler()Abstract interface for MongoDB-compatible document database access.
Managers use this interface instead of directly depending on
pymongo.Database, enabling in-memory substitution for tests.Ancestors (in MRO)¶
Descendants¶
madsci.common.db_handlers.document_storage_handler.InMemoryDocumentStorageHandler
madsci.common.db_handlers.document_storage_handler.PyDocumentStorageHandler
Methods¶
close(self) ‑> NoneRelease database connections and resources.
command(self, cmd: str, **kwargs: Any) ‑> dict[str, typing.Any]Execute a database command (e.g.
ping).get_collection(self, name: str) ‑> Union[Collection, Any]Return a collection-like object for the given name.
The returned object supports the pymongo Collection interface: insert_one, find_one, find, update_one, update_many, delete_one, delete_many, count_documents, create_index, drop_index, index_information.
list_collection_names(self) ‑> list[str]Return the names of all collections in the database.
ping(self) ‑> boolCheck connectivity to the database.
Returns: True if the database is reachable, False otherwise.
InMemoryCacheHandler(client: Optional[Any] = None)Cache handler backed by in-memory data structures for testing.
Usage::
handler = InMemoryCacheHandler() d = handler.create_dict("my:key") d["foo"] = "bar"Initialize with an optional InMemoryRedisClient.
If no client is provided, a new one is created and all registries are cleared for test isolation.
Args: client: An existing
InMemoryRedisClientinstance.Ancestors (in MRO)¶
madsci.common.db_handlers.cache_handler.CacheHandler
Methods¶
close(self) ‑> None- No-op for in-memory cache.
create_dict(self, key: str) ‑> Any- Create an InMemoryRedisDict.
create_list(self, key: str) ‑> Any- Create an InMemoryRedisList.
create_lock(self, key: str, auto_release_time: int = 60) ‑> Any- Create an InMemoryRedlock.
get(self, key: str) ‑> str | None- Get a value from the in-memory store.
incr(self, key: str, amount: int = 1) ‑> int- Increment a key in the in-memory store.
ping(self) ‑> bool- Always returns True for in-memory cache.
set(self, key: str, value: Any) ‑> None- Set a value in the in-memory store.
InMemoryDocumentStorageHandler(database: Optional[Any] = None, database_name: str = 'test')Document storage handler backed by in-memory collections for testing.
Usage::
handler = InMemoryDocumentStorageHandler() collection = handler.get_collection("events") collection.insert_one({"key": "value"})Initialize with an optional InMemoryDatabase.
Args: database: An existing
InMemoryDatabaseinstance. If not provided, a newInMemoryMongoClientis created. database_name: Name for the database (used when creating a new client).Ancestors (in MRO)¶
madsci.common.db_handlers.document_storage_handler.DocumentStorageHandler
Methods¶
close(self) ‑> None- No-op for in-memory databases.
command(self, cmd: str, **kwargs: Any) ‑> dict[str, typing.Any]- Execute a database command (only
pingis supported). get_collection(self, name: str) ‑> Any- Return an InMemoryCollection.
list_collection_names(self) ‑> list[str]- Return collection names from the in-memory database.
ping(self) ‑> bool- Always returns True for in-memory databases.
InMemoryObjectStorageHandler()Object storage handler backed by in-memory storage for testing.
Stores files as bytes in a dictionary keyed by
(bucket, object_name).Usage::
handler = InMemoryObjectStorageHandler() handler.make_bucket("test-bucket") handler.upload_file("test-bucket", "data.csv", "/path/to/data.csv") data = handler.get_object_data("test-bucket", "data.csv")Initialize with empty in-memory storage.
Ancestors (in MRO)¶
madsci.common.db_handlers.object_storage_handler.ObjectStorageHandler
Methods¶
bucket_exists(self, bucket: str) ‑> bool- Check if a bucket exists in in-memory storage.
close(self) ‑> None- No-op for in-memory storage.
download_file(self, bucket: str, object_name: str, output_path: Union[str, Path]) ‑> None- Download an object from in-memory storage to a local file.
get_object_data(self, bucket: str, object_name: str) ‑> bytes- Get object contents from in-memory storage.
make_bucket(self, bucket: str) ‑> None- Create a bucket in in-memory storage.
ping(self) ‑> bool- Always returns True for in-memory storage.
upload_file(self, bucket: str, object_name: str, file_path: Union[str, Path], content_type: Optional[str] = None, metadata: Optional[dict[str, str]] = None) ‑> dict[str, typing.Any]- Upload a file to in-memory storage.
ObjectStorageHandler()Abstract interface for S3-compatible object storage access.
Managers use this interface instead of directly depending on
minio.Minio, enabling in-memory substitution for tests.Ancestors (in MRO)¶
Descendants¶
madsci.common.db_handlers.object_storage_handler.InMemoryObjectStorageHandler
madsci.common.db_handlers.object_storage_handler.RealObjectStorageHandler
Methods¶
bucket_exists(self, bucket: str) ‑> boolCheck if a bucket exists.
Args: bucket: Bucket name.
Returns: True if the bucket exists.
close(self) ‑> NoneRelease any connections or resources.
download_file(self, bucket: str, object_name: str, output_path: Union[str, Path]) ‑> NoneDownload an object to a local file.
Args: bucket: Bucket name. object_name: Name/key of the object. output_path: Local path to write the file to.
ensure_bucket(self, bucket: str) ‑> NoneEnsure a bucket exists, creating it if necessary.
get_object_data(self, bucket: str, object_name: str) ‑> bytesGet object contents as bytes.
Args: bucket: Bucket name. object_name: Name/key of the object.
Returns: The object contents as bytes.
list_buckets(self) ‑> list[str]Return names of all buckets.
make_bucket(self, bucket: str) ‑> NoneCreate a bucket.
Args: bucket: Bucket name to create.
ping(self) ‑> boolCheck connectivity to the object storage service.
Returns: True if the service is reachable, False otherwise.
upload_file(self, bucket: str, object_name: str, file_path: Union[str, Path], content_type: Optional[str] = None, metadata: Optional[dict[str, str]] = None) ‑> dict[str, typing.Any]Upload a file to object storage.
Args: bucket: Bucket name. object_name: Name/key for the object. file_path: Local file path to upload. content_type: MIME type (auto-detected if not provided). metadata: Additional metadata to attach.
Returns: Dictionary with storage information including bucket_name, object_name, etag, size_bytes, and content_type.
PostgresHandler()Abstract interface for relational database access via SQLAlchemy.
Managers use this interface instead of directly depending on
sqlalchemy.Engine, enabling SQLite substitution for tests.Ancestors (in MRO)¶
Descendants¶
madsci.common.db_handlers.postgres_handler.SQLAlchemyHandler
madsci.common.db_handlers.postgres_handler.SQLiteHandler
Methods¶
close(self) ‑> NoneRelease database connections and resources.
create_all_tables(self, metadata: Union[MetaData, Any]) ‑> NoneCreate all tables defined in the given metadata.
Args: metadata: A
sqlalchemy.MetaDataorsqlmodel.SQLModel.metadataobject.get_engine(self) ‑> Union[Engine, Any]Return the SQLAlchemy Engine instance.
ping(self) ‑> boolCheck connectivity to the database.
Returns: True if the database is reachable, False otherwise.
PyCacheHandler(client: Any)Cache handler backed by a real Redis/Valkey server.
Uses
redis.Redisfor basic operations andpotteryfor RedisDict, RedisList, and Redlock data structures.Usage::
handler = PyCacheHandler.from_settings(host="localhost", port=6379) d = handler.create_dict("my:key") d["foo"] = "bar"Initialize with an existing redis.Redis client.
Args: client: A
redis.Redisinstance.Ancestors (in MRO)¶
madsci.common.db_handlers.cache_handler.CacheHandler
Static methods¶
from_settings(host: str = 'localhost', port: int = 6379, password: Optional[str] = None) ‑> madsci.common.db_handlers.cache_handler.PyCacheHandlerCreate a handler by connecting to a cache server.
Args: host: Cache server hostname. port: Cache server port. password: Optional cache server password.
Returns: A new PyCacheHandler instance.
Methods¶
close(self) ‑> None- Close the cache connection.
create_dict(self, key: str) ‑> Any- Create a pottery RedisDict.
create_list(self, key: str) ‑> Any- Create a pottery RedisList.
create_lock(self, key: str, auto_release_time: int = 60) ‑> Any- Create a pottery Redlock.
get(self, key: str) ‑> str | None- Get a value from the cache.
incr(self, key: str, amount: int = 1) ‑> int- Increment a key in the cache.
ping(self) ‑> bool- Ping the cache server.
set(self, key: str, value: Any) ‑> None- Set a value in the cache.
PyDocumentStorageHandler(database: Any)Document storage handler backed by a real pymongo connection (MongoDB, FerretDB, etc.).
Usage::
handler = PyDocumentStorageHandler.from_url("mongodb://localhost:27017", "my_db") collection = handler.get_collection("events") collection.insert_one({"key": "value"})Initialize with an existing pymongo Database object.
Args: database: A
pymongo.database.Databaseinstance.Ancestors (in MRO)¶
madsci.common.db_handlers.document_storage_handler.DocumentStorageHandler
Static methods¶
from_url(url: str, database_name: str) ‑> madsci.common.db_handlers.document_storage_handler.PyDocumentStorageHandlerCreate a handler by connecting to a MongoDB-compatible server.
Args: url: MongoDB-compatible connection URL. database_name: Name of the database to use.
Returns: A new PyDocumentStorageHandler instance.
Methods¶
close(self) ‑> None- Close the underlying MongoClient.
command(self, cmd: str, **kwargs: Any) ‑> dict[str, typing.Any]- Execute a database command.
get_collection(self, name: str) ‑> Any- Return a pymongo Collection.
list_collection_names(self) ‑> list[str]- Return collection names from the database.
ping(self) ‑> bool- Ping the document database server.
RealObjectStorageHandler(client: Any)Object storage handler backed by a real S3-compatible server (MinIO, SeaweedFS, etc.).
Usage::
handler = RealObjectStorageHandler.from_settings(object_storage_settings) handler.upload_file("my-bucket", "data.csv", "/path/to/data.csv")Initialize with an existing Minio client.
Args: client: A
minio.Minioinstance.Ancestors (in MRO)¶
madsci.common.db_handlers.object_storage_handler.ObjectStorageHandler
Static methods¶
from_settings(settings: Any) ‑> madsci.common.db_handlers.object_storage_handler.RealObjectStorageHandlerCreate a handler from ObjectStorageSettings.
Args: settings: An
ObjectStorageSettingsinstance with endpoint, access_key, secret_key, secure, and region fields.Returns: A new RealObjectStorageHandler instance.
Methods¶
bucket_exists(self, bucket: str) ‑> bool- Check if a bucket exists in object storage.
close(self) ‑> None- No-op for S3 client (uses HTTP, no persistent connection).
download_file(self, bucket: str, object_name: str, output_path: Union[str, Path]) ‑> None- Download an object from S3-compatible object storage.
get_object_data(self, bucket: str, object_name: str) ‑> bytes- Get object contents from S3-compatible object storage.
make_bucket(self, bucket: str) ‑> None- Create a bucket in object storage.
ping(self) ‑> bool- Check connectivity by listing buckets.
upload_file(self, bucket: str, object_name: str, file_path: Union[str, Path], content_type: Optional[str] = None, metadata: Optional[dict[str, str]] = None) ‑> dict[str, typing.Any]- Upload a file to S3-compatible object storage.
SQLAlchemyHandler(engine: Any)PostgreSQL handler backed by a real SQLAlchemy engine.
Usage::
handler = SQLAlchemyHandler.from_url("postgresql://localhost/mydb") engine = handler.get_engine()Initialize with an existing SQLAlchemy Engine.
Args: engine: A
sqlalchemy.Engineinstance.Ancestors (in MRO)¶
madsci.common.db_handlers.postgres_handler.PostgresHandler
Static methods¶
from_url(url: str, pool_size: int = 20, pool_pre_ping: bool = True) ‑> madsci.common.db_handlers.postgres_handler.SQLAlchemyHandlerCreate a handler by connecting to a PostgreSQL database.
Args: url: SQLAlchemy database URL. pool_size: Connection pool size. pool_pre_ping: Whether to verify connections before use.
Returns: A new SQLAlchemyHandler instance.
Methods¶
close(self) ‑> None- Dispose of the engine’s connection pool.
create_all_tables(self, metadata: Any) ‑> None- Create all tables using the engine.
get_engine(self) ‑> Any- Return the SQLAlchemy Engine.
ping(self) ‑> bool- Execute
SELECT 1to verify connectivity.
SQLiteHandler(url: Optional[str] = None)SQLite handler for testing, using SQLAlchemy with a SQLite backend.
Defaults to an in-memory database (
sqlite:///:memory:) but can also use a file-based SQLite database. In-memory databases useStaticPoolso the same connection is shared across threads (required forTestClientand similar multi-threaded test harnesses).Usage::
handler = SQLiteHandler() # in-memory handler = SQLiteHandler("sqlite:///path/to/db.sqlite") # file-basedInitialize with an optional SQLite URL.
Args: url: SQLite connection URL. Defaults to
sqlite:///:memory:.Ancestors (in MRO)¶
madsci.common.db_handlers.postgres_handler.PostgresHandler
Methods¶
close(self) ‑> NoneDispose of the engine’s connection pool.
create_all_tables(self, metadata: Any) ‑> NoneCreate compatible tables using the SQLite engine.
When a table uses PostgreSQL-specific features (e.g.
AUTOINCREMENTon a composite primary key), the handler builds SQLite-compatible DDL where the autoincrement column becomes the soleINTEGER PRIMARY KEY AUTOINCREMENTand other PK columns become regularNOT NULLcolumns.get_engine(self) ‑> AnyReturn the SQLAlchemy Engine.
ping(self) ‑> boolExecute
SELECT 1to verify connectivity.