Source code for pain001.data.loader

# Copyright (C) 2023-2026 Sebastien Rousseau.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Universal data loader supporting multiple input sources."""

from collections.abc import Generator
from typing import Any, Union

# pylint: disable=duplicate-code
from pain001.csv.load_csv_data import load_csv_data, load_csv_data_streaming
from pain001.csv.validate_csv_data import validate_csv_data
from pain001.db.load_db_data import load_db_data
from pain001.db.load_db_data_streaming import load_db_data_streaming
from pain001.db.validate_db_data import validate_db_data
from pain001.exceptions import DataSourceError, PaymentValidationError
from pain001.json.load_json_data import (
    load_json_data,
    load_json_data_streaming,
    load_jsonl_data,
    load_jsonl_data_streaming,
)
from pain001.parquet.load_parquet_data import (
    load_parquet_data,
    load_parquet_data_streaming,
)


def _get_file_loaders() -> dict[str, tuple[Any, Any, str]]:
    """Return the eager-loading dispatch table, keyed by file extension.

    Each value is a ``(loader, validator, display_name)`` triple. The table
    is rebuilt on every call (instead of being a module-level constant) so
    that loader/validator functions patched by tests are picked up.
    """

    def _load_pain001_table(path: Any) -> Any:
        # Database sources are always read from the fixed "pain001" table.
        return load_db_data(path, table_name="pain001")

    table: dict[str, tuple[Any, Any, str]] = {}
    table[".csv"] = (load_csv_data, validate_csv_data, "CSV")
    table[".db"] = (_load_pain001_table, validate_db_data, "Database")
    # JSON, JSONL and Parquet inputs share the CSV validator.
    table[".json"] = (load_json_data, validate_csv_data, "JSON")
    table[".jsonl"] = (load_jsonl_data, validate_csv_data, "JSONL")
    table[".parquet"] = (load_parquet_data, validate_csv_data, "Parquet")
    return table


def _get_file_stream_loaders() -> dict[str, tuple[Any, Any, str]]:
    """Return the streaming dispatch table, keyed by file extension.

    Each value is a ``(stream_loader, validator, display_name)`` triple,
    where the stream loader is called as ``stream_loader(path, chunk_size)``.
    The table is rebuilt on every call (instead of being a module-level
    constant) so that functions patched by tests are picked up.
    """

    def _stream_pain001_table(path: Any, size: Any) -> Any:
        # Database sources are always streamed from the fixed "pain001" table.
        return load_db_data_streaming(path, "pain001", size)

    table: dict[str, tuple[Any, Any, str]] = {}
    table[".csv"] = (load_csv_data_streaming, validate_csv_data, "CSV")
    table[".db"] = (_stream_pain001_table, validate_db_data, "Database")
    # JSON, JSONL and Parquet inputs share the CSV validator.
    table[".json"] = (load_json_data_streaming, validate_csv_data, "JSON")
    table[".jsonl"] = (load_jsonl_data_streaming, validate_csv_data, "JSONL")
    table[".parquet"] = (
        load_parquet_data_streaming,
        validate_csv_data,
        "Parquet",
    )
    return table


def load_payment_data(
    data_source: Union[str, list[dict[str, Any]], dict[str, Any]],
) -> list[dict[str, Any]]:
    """
    Load payment data from a file path or from in-memory Python data.

    This is the single entry point for eager (non-streaming) loading. The
    type of ``data_source`` selects the loading strategy; for chunked
    loading of large inputs use ``load_payment_data_streaming``.

    Args:
        data_source: The payment data source. Supports:
            - str: File path to a CSV (.csv), SQLite (.db), JSON
              (.json/.jsonl), or Parquet (.parquet) file
            - list: List of dictionaries with payment data
            - dict: Single payment transaction as a dictionary

    Returns:
        The loaded payment records as a list of dictionaries. A single
        ``dict`` input is returned wrapped in a one-element list.

    Raises:
        DataSourceError: If ``data_source`` is not a str, list, or dict,
            if a list/dict input is empty, or if the file extension is
            not supported.
        PaymentValidationError: If the loaded data fails validation.
        FileNotFoundError: If validation of the file path fails.

    Examples:
        # File-based usage
        >>> data = load_payment_data('payments.csv')
        >>> data = load_payment_data('payments.db')

        # JSON formats
        >>> data = load_payment_data('payments.json')
        >>> data = load_payment_data('payments.jsonl')  # JSON Lines

        # Parquet format
        >>> data = load_payment_data('payments.parquet')

        # Direct Python data
        >>> data = load_payment_data([
        ...     {'id': 'MSG001', 'amount': '1000.00', ...},
        ...     {'id': 'MSG002', 'amount': '500.00', ...}
        ... ])

        # Single transaction
        >>> data = load_payment_data({
        ...     'id': 'MSG001', 'amount': '1000.00', ...
        ... })
    """
    # A string is always interpreted as a file path.
    if isinstance(data_source, str):
        return _load_from_file(data_source)

    # In-memory inputs: a list of records, or one record as a dict.
    if isinstance(data_source, list):
        return _load_from_list(data_source)
    if isinstance(data_source, dict):
        return _load_from_dict(data_source)

    # Anything else is rejected with the offending type name.
    raise DataSourceError(
        f"Unsupported data source type: {type(data_source).__name__}. "
        f"Expected str (file path), list, or dict."
    )
def _load_from_file(file_path: str) -> list[dict[str, Any]]:
    """
    Load and validate payment data from a file.

    Supports CSV, SQLite, JSON, JSONL, and Parquet files, selected by file
    extension. The path is validated against the current working directory
    before any loader is invoked.

    Args:
        file_path: Path to the data file.

    Returns:
        The records produced by the format-specific loader.

    Raises:
        DataSourceError: If the file extension is not supported.
        FileNotFoundError: If path validation fails for any reason (every
            exception raised during validation is re-raised as this type).
        PaymentValidationError: If the loaded data fails validation.
    """
    import os

    unsupported_msg = (
        f"Unsupported file type: {file_path}. "
        f"Expected .csv, .db, .json, .jsonl, or .parquet file."
    )

    # Reject unknown extensions first so the caller gets a format error
    # rather than a path-validation error.
    known_suffixes = (".csv", ".db", ".json", ".jsonl", ".parquet")
    if not file_path.endswith(known_suffixes):
        raise DataSourceError(unsupported_msg)

    # CodeQL: prevent path traversal by anchoring to the current working
    # directory.
    try:
        from pain001.security import validate_path

        resolved_path = validate_path(
            file_path, must_exist=True, base_dir=os.getcwd()
        )
    except Exception as exc:
        # Covers PathValidationError, SecurityError and FileNotFoundError.
        raise FileNotFoundError(
            f"Data file validation failed: {file_path}\nError: {exc}"
        ) from exc

    # Only the validated path is used from here on.
    suffix = os.path.splitext(resolved_path)[1]
    entry = _get_file_loaders().get(suffix)
    if entry is None:
        raise DataSourceError(unsupported_msg)

    load, is_valid, label = entry
    records = load(resolved_path)
    if is_valid(records):
        return records
    raise PaymentValidationError(
        f"{label} data validation failed for {file_path}"
    )


def _load_from_list(data_list: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Validate and return payment data supplied as a list of dictionaries.

    Args:
        data_list: The payment records.

    Returns:
        The same list object, once it has passed validation.

    Raises:
        DataSourceError: If the list is empty.
        PaymentValidationError: If any item is not a dict, or if the data
            fails validation.
    """
    if not data_list:
        raise DataSourceError("Empty data list provided.")

    # Collect the type names of every non-dict item for the error message.
    offending_types = [
        type(item).__name__
        for item in data_list
        if not isinstance(item, dict)
    ]
    if offending_types:
        raise PaymentValidationError(
            "All items in data list must be dictionaries. "
            f"Found: {offending_types}"
        )

    # Mandatory validation for data integrity.
    if validate_csv_data(data_list):
        return data_list
    raise PaymentValidationError("Data list validation failed")


def _load_from_dict(data_dict: dict[str, Any]) -> list[dict[str, Any]]:
    """
    Validate a single transaction dictionary and return it as a list.

    Args:
        data_dict: One payment record.

    Returns:
        A one-element list containing ``data_dict``.

    Raises:
        DataSourceError: If the dictionary is empty.
        PaymentValidationError: If the record fails validation.
    """
    if not data_dict:
        raise DataSourceError("Empty data dictionary provided.")

    # The validator works on lists of records, so wrap the single record.
    wrapped = [data_dict]
    if validate_csv_data(wrapped):
        return wrapped
    raise PaymentValidationError("Data dictionary validation failed")
def load_payment_data_streaming(
    data_source: Union[str, list[dict[str, Any]]],
    chunk_size: int = 1000,
    validate: bool = True,
) -> Generator[list[dict[str, Any]], None, None]:
    """
    Yield payment data in chunks instead of returning it all at once.

    This is the streaming counterpart of ``load_payment_data``. Because it
    is a generator, nothing is loaded and no error is raised until the
    result is first iterated.

    Args:
        data_source: The payment data source. Supports:
            - str: File path to a CSV (.csv), SQLite (.db), JSON
              (.json/.jsonl), or Parquet (.parquet) file
            - list: List of dictionaries with payment data
        chunk_size: Number of records to yield per chunk. Default is 1000.
        validate: If True (the default), validate each chunk before it is
            yielded. Set False for testing or when data is pre-validated.

    Yields:
        Chunks of payment data, each a list of dictionaries.

    Raises:
        DataSourceError: If ``data_source`` is neither a str nor a list,
            if a list input is empty, or if the file extension is not
            supported.
        PaymentValidationError: If a chunk fails validation, or if a list
            input contains non-dict items.

    Examples:
        # Streaming from a large CSV file
        >>> for chunk in load_payment_data_streaming('large_payments.csv', chunk_size=500):
        ...     process_batch(chunk)

        # Streaming from a large SQLite database
        >>> for chunk in load_payment_data_streaming('payments.db', chunk_size=1000):
        ...     generate_xml_batch(chunk)

        # Streaming from a large Python list (useful for APIs)
        >>> large_data = [{'id': f'TX{i}', ...} for i in range(100000)]
        >>> for chunk in load_payment_data_streaming(large_data, chunk_size=500):
        ...     validate_and_process(chunk)

    Note:
        Single dict input is not supported in streaming mode. Wrap it in a
        list first.
    """
    # A string is always interpreted as a file path.
    if isinstance(data_source, str):
        yield from _load_from_file_streaming(data_source, chunk_size, validate)
        return

    # In-memory list of records.
    if isinstance(data_source, list):
        yield from _load_from_list_streaming(data_source, chunk_size, validate)
        return

    # Anything else (including a bare dict) is rejected.
    raise DataSourceError(
        f"Unsupported data source type for streaming: {type(data_source).__name__}. "
        f"Expected str (file path) or list. "
        f"For single dict, wrap in list: [your_dict]"
    )
def _load_from_file_streaming( file_path: str, chunk_size: int, validate: bool = True ) -> Generator[list[dict[str, Any]], None, None]: """ Stream data from file (CSV, SQLite, JSON, or Parquet) in chunks. Memory-efficient for large files. """ import os ext = os.path.splitext(file_path)[1] entry = _get_file_stream_loaders().get(ext) if entry is None: raise DataSourceError( f"Unsupported file type: {file_path}. " f"Expected .csv, .db, .json, .jsonl, or .parquet file." ) stream_loader_fn, validator_fn, format_name = entry for chunk in stream_loader_fn(file_path, chunk_size): if validate and not validator_fn(chunk): raise PaymentValidationError( f"{format_name} data validation failed for chunk in {file_path}" ) yield chunk def _load_from_list_streaming( data_list: list[dict[str, Any]], chunk_size: int, validate: bool = True ) -> Generator[list[dict[str, Any]], None, None]: """ Stream data from Python list in chunks. Useful for API inputs or in-memory data processing. """ if not data_list: raise DataSourceError("Empty data list provided.") if not all(isinstance(item, dict) for item in data_list): raise PaymentValidationError( "All items in data list must be dictionaries. " f"Found: {[type(item).__name__ for item in data_list if not isinstance(item, dict)]}" ) # Yield data in chunks for i in range(0, len(data_list), chunk_size): chunk = data_list[i : i + chunk_size] if validate and not validate_csv_data(chunk): raise PaymentValidationError( f"Data validation failed for chunk starting at index {i}" ) yield chunk