Source code for pain001.db.load_db_data

# Copyright (C) 2023-2026 Sebastien Rousseau.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# CodeQL: This module uses parameterized queries where possible.
# For table names (which cannot be parameterized), we use strict allowlist validation
# via enable_sanitize_table_name() to prevent SQL injection (CWE-89).

import os
import re
import sqlite3
from typing import Any

from pain001.exceptions import ConfigurationError
from pain001.security import validate_path


[docs] def sanitize_table_name(table_name: str) -> str: """ Validate and sanitize a table name to prevent SQL injection. Uses strict validation: only alphanumeric characters and underscores allowed. MUST start with a letter (SQL identifier rules). Args: table_name (str): The table name to validate. Returns: str: The validated table name (unchanged if valid). Raises: ConfigurationError: If the table name is empty or contains invalid characters. """ if not table_name: raise ConfigurationError("Table name cannot be empty") # Strict validation: only alphanumeric and underscore, must start with letter if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]*$", table_name): raise ConfigurationError( f"Invalid table name '{table_name}'. " "Table names must start with a letter and contain only " "alphanumeric characters and underscores." ) return table_name
[docs] def load_db_data(data_file_path: str, table_name: str) -> list[dict[str, Any]]: """ Load data from an SQLite database table into a list of dictionaries. Args: data_file_path (str): The path to the SQLite database file. table_name (str): The name of the table from which data will be loaded. Returns: list: A list of dictionaries where each dictionary represents a row of data. The keys in each dictionary correspond to the column names, and the values are the column values for that row. Raises: FileNotFoundError: If the SQLite file specified by data_file_path does not exist. sqlite3.OperationalError: If there is an issue with SQLite database operations. Example: data = load_db_data("my_database.db", "my_table") """ # Validate path to prevent traversal attacks try: # must_exist=True ensures both validation and existence check safe_path = validate_path( data_file_path, must_exist=True, ) # nosec B108 - Returns sanitized string except Exception as e: raise FileNotFoundError( f"SQLite file path validation failed: {data_file_path}" ) from e # Check file existence using os.path for string path if not os.path.isfile(safe_path): raise FileNotFoundError( f"SQLite file '{data_file_path}' does not exist." ) # Connect to the SQLite database (now safe after validation) conn = sqlite3.connect(str(safe_path)) # nosec B108 try: cursor = conn.cursor() # Validate the table_name before using it in the query (strict regex validation) table_name = sanitize_table_name(table_name) # Fetch column names from the table # Safe: table_name validated via regex to contain only [a-zA-Z0-9_] cursor.execute(f"PRAGMA table_info({table_name})") # nosec B608 columns = [column[1] for column in cursor.fetchall()] # Use parameterised query to prevent SQL injection # Note: SQLite does not support ? placeholders for table names. # sanitize_table_name() enforces strict validation: ^[a-zA-Z][a-zA-Z0-9_]*$ query = f"SELECT * FROM [{table_name}]" # nosec B608 # CodeQL: py/sql-injection (Sanitized) cursor.execute(query) rows = cursor.fetchall() # Create a list of dictionaries with column names as keys data = [] for row in rows: row_dict = {} for i, value in enumerate(row): row_dict[columns[i]] = value data.append(row_dict) return data finally: # Close the connection to the SQLite database conn.close()