# Copyright (C) 2023-2026 Sebastien Rousseau.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Validate the CSV data before processing it. The CSV data must contain
# the following columns:
#
# - id (int) - unique identifier
# - date (str) - date of the payment
# - nb_of_txs (int) - number of transactions
# - initiator_name (str) - name of the initiator
# - payment_information_id (str) - payment information identifier
# - payment_method (str) - payment method
# - batch_booking (bool) - batch booking
# - ctrl_sum (int) - control sum
# - service_level_code (str) - service level code
# - requested_execution_date (str) - requested execution date
# - debtor_name (str) - debtor name
# - debtor_account_IBAN (str) - debtor account IBAN
# - debtor_agent_BIC (str) - debtor agent BIC
# - forwarding_agent_BIC (str) - forwarding agent BIC
# - charge_bearer (str) - charge bearer
# - payment_id (str) - payment identifier
# - payment_amount (str) - payment amount
# - currency (str) - currency
# - creditor_agent_BIC (str) - creditor agent BIC
# - creditor_name (str) - creditor name
# - creditor_account_IBAN (str) - creditor account IBAN
# - remittance_information (str) - remittance information
from datetime import datetime
from typing import Any
def _validate_datetime(value: str) -> bool:
"""Validate datetime field.
Args:
value: The datetime string to validate.
Returns:
bool: True if valid, False otherwise.
"""
# Handle the "Z" suffix for UTC
cleaned_value = value
if value.endswith("Z"):
cleaned_value = value[:-1] + "+00:00"
try:
datetime.fromisoformat(cleaned_value)
return True
except ValueError:
try:
datetime.strptime(cleaned_value, "%Y-%m-%d")
return True
except ValueError:
return False
def _validate_field_type(value: str, data_type: type) -> bool:
"""Validate a single field against its expected type.
Args:
value: The field value to validate.
data_type: The expected data type.
Returns:
bool: True if valid, False otherwise.
"""
try:
if data_type is int:
int(value)
elif data_type is float:
float(value)
elif data_type is bool:
if value.lower() not in ("true", "false"):
return False
elif data_type is datetime:
return _validate_datetime(value)
# str type always passes if not empty
return True
except ValueError:
return False
def _validate_row(
row: dict[str, Any], required_columns: dict[str, type]
) -> tuple[list[str], list[str]]:
"""Validate a single row of CSV data.
Args:
row: A dictionary containing row data.
required_columns: Dictionary of required column names and types.
Returns:
tuple: (missing_columns, invalid_columns)
"""
missing_columns = []
invalid_columns = []
for column, data_type in required_columns.items():
raw_value = row.get(column)
# Single strip operation, cached result
if raw_value is None:
missing_columns.append(column)
continue
value = raw_value.strip()
if not value:
missing_columns.append(column)
continue
# Validate type
if not _validate_field_type(value, data_type):
invalid_columns.append(column)
return missing_columns, invalid_columns
def _format_errors(
row: dict[str, Any],
missing_columns: list[str],
invalid_columns: list[str],
required_columns: dict[str, type],
) -> list[str]:
"""Format error messages for a row.
Args:
row: The row with errors.
missing_columns: List of missing column names.
invalid_columns: List of invalid column names.
required_columns: Dictionary of required column types.
Returns:
list: List of formatted error messages.
"""
errors = []
if missing_columns:
errors.append(
f"Error: Missing value(s) for column(s) {missing_columns} in row: {row}"
)
if invalid_columns:
expected_types = [
required_columns[col].__name__ for col in invalid_columns
]
errors.append(
f"Error: Invalid data type for column(s) "
f"{invalid_columns}, expected {expected_types} in row: {row}"
)
return errors
[docs]
def validate_csv_data(data: list[dict[str, Any]]) -> bool:
"""Validate the CSV data before processing it.
Args:
data (list): A list of dictionaries containing the CSV data.
Returns:
bool: True if the data is valid, False otherwise.
"""
required_columns = {
"id": int,
"date": datetime,
"nb_of_txs": int,
"ctrl_sum": float,
"initiator_name": str,
"payment_information_id": str,
"payment_method": str,
"batch_booking": bool,
"service_level_code": str,
"requested_execution_date": datetime,
"debtor_name": str,
"debtor_account_IBAN": str,
"debtor_agent_BIC": str,
"forwarding_agent_BIC": str,
"charge_bearer": str,
"payment_id": str,
"payment_amount": float,
"currency": str,
"creditor_agent_BIC": str,
"creditor_name": str,
"creditor_account_IBAN": str,
"remittance_information": str,
}
if not data:
print("Error: The CSV data is empty.")
return False
is_valid = True
all_errors = [] # Batch error messages for better performance
for row in data:
missing_columns, invalid_columns = _validate_row(row, required_columns)
if missing_columns or invalid_columns:
is_valid = False
all_errors.extend(
_format_errors(
row, missing_columns, invalid_columns, required_columns
)
)
# Single print operation for all errors
if all_errors:
print("\n".join(all_errors))
return is_valid