Source code for pain001.xml.generate_xml

# Copyright (C) 2023-2026 Sebastien Rousseau.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# XML generator functions that create the XML file from the CSV data
# and the mapping between XML tags and CSV column names, and write it
# to a file in the same directory as the CSV file.

# pylint: disable=duplicate-code

# Standard-library imports
import os
from typing import Any

from jinja2 import Environment, FileSystemLoader

from pain001.security import validate_path
from pain001.xml.generate_updated_xml_file_path import (
    generate_updated_xml_file_path,
)
from pain001.xml.validate_via_xsd import validate_xml_string_via_xsd


def _prepare_xml_data_v03(data: list[dict[str, Any]]) -> dict[str, Any]:
    """Prepare XML data for pain.001.001.03 message type."""
    return {
        "id": data[0]["id"],
        "date": data[0]["date"],
        "nb_of_txs": data[0]["nb_of_txs"],
        "initiator_name": data[0]["initiator_name"],
        "initiator_street_name": data[0]["initiator_street_name"],
        "initiator_building_number": data[0]["initiator_building_number"],
        "initiator_postal_code": data[0]["initiator_postal_code"],
        "initiator_town_name": data[0]["initiator_town_name"],
        "initiator_country_code": data[0]["initiator_country_code"],
        "payment_id": data[0]["payment_id"],
        "payment_method": data[0]["payment_method"],
        "batch_booking": data[0]["batch_booking"],
        "requested_execution_date": data[0]["requested_execution_date"],
        "debtor_name": data[0]["debtor_name"],
        "debtor_street_name": data[0]["debtor_street_name"],
        "debtor_building_number": data[0]["debtor_building_number"],
        "debtor_postal_code": data[0]["debtor_postal_code"],
        "debtor_town_name": data[0]["debtor_town_name"],
        "debtor_country_code": data[0]["debtor_country_code"],
        "debtor_account_IBAN": data[0]["debtor_account_IBAN"],
        "debtor_agent_BIC": data[0]["debtor_agent_BIC"],
        "charge_bearer": data[0]["charge_bearer"],
        "transactions": [
            {
                "payment_id": row["payment_id"],
                "payment_amount": row.get("payment_amount", ""),
                "payment_currency": row.get("payment_currency", ""),
                "charge_bearer": row["charge_bearer"],
                "creditor_agent_BIC": row["creditor_agent_BIC"],
                "creditor_name": row["creditor_name"],
                "creditor_street_name": row["creditor_street_name"],
                "creditor_building_number": row["creditor_building_number"],
                "creditor_postal_code": row["creditor_postal_code"],
                "creditor_town_name": row["creditor_town_name"],
                "creditor_country_code": row["creditor_country_code"],
                "creditor_account_IBAN": row["creditor_account_IBAN"],
                "purpose_code": row["purpose_code"],
                "reference_number": row["reference_number"],
                "reference_date": row["reference_date"],
            }
            for row in data[0:]
        ],
    }


def _prepare_xml_data_v04(data: list[dict[str, Any]]) -> dict[str, Any]:
    """Prepare XML data for pain.001.001.04 message type."""
    return {
        "id": data[0].get("id", ""),
        "date": data[0].get("date", ""),
        "nb_of_txs": data[0].get("nb_of_txs", ""),
        "initiator_name": data[0].get("initiator_name", ""),
        "initiator_street": data[0].get("initiator_street_name", ""),
        "initiator_building_number": data[0].get(
            "initiator_building_number", ""
        ),
        "initiator_postal_code": data[0].get("initiator_postal_code", ""),
        "initiator_town": data[0].get("initiator_town_name", ""),
        "initiator_country": data[0].get("initiator_country_code", ""),
        "payment_information_id": data[0].get("payment_id", ""),
        "payment_method": data[0].get("payment_method", ""),
        "batch_booking": data[0].get("batch_booking", ""),
        "requested_execution_date": data[0].get(
            "requested_execution_date", ""
        ),
        "debtor_name": data[0].get("debtor_name", ""),
        "debtor_street": data[0].get("debtor_street_name", ""),
        "debtor_building_number": data[0].get("debtor_building_number", ""),
        "debtor_postal_code": data[0].get("debtor_postal_code", ""),
        "debtor_town": data[0].get("debtor_town_name", ""),
        "debtor_country": data[0].get("debtor_country_code", ""),
        "debtor_account_IBAN": data[0].get("debtor_account_IBAN", ""),
        "debtor_agent_BIC": data[0].get("debtor_agent_BIC", ""),
        "debtor_agent_account_IBAN": data[0].get(
            "debtor_agent_account_IBAN", ""
        ),
        "instruction_for_debtor_agent": data[0].get(
            "instruction_for_debtor_agent", ""
        ),
        "charge_bearer": data[0].get("charge_bearer", ""),
        "charge_account_IBAN": data[0].get("charge_account_IBAN", ""),
        "charge_agent_BICFI": data[0].get("charge_agent_BICFI", ""),
        "payment_instruction_id": data[0].get(
            "payment_instruction_id", data[0].get("payment_id", "")
        ),
        "payment_end_to_end_id": data[0].get(
            "payment_end_to_end_id", data[0].get("reference_number", "")
        ),
        "payment_currency": data[0].get("payment_currency", ""),
        "payment_amount": data[0].get("payment_amount", ""),
        "creditor_agent_BIC": data[0].get("creditor_agent_BIC", ""),
        "creditor_name": data[0].get("creditor_name", ""),
        "creditor_street": data[0].get("creditor_street_name", ""),
        "creditor_building_number": data[0].get(
            "creditor_building_number", ""
        ),
        "creditor_postal_code": data[0].get("creditor_postal_code", ""),
        "creditor_town": data[0].get("creditor_town_name", ""),
        "creditor_account_IBAN": data[0].get("creditor_account_IBAN", ""),
        "purpose_code": data[0].get("purpose_code", ""),
        "reference_number": data[0].get("reference_number", ""),
        "reference_date": data[0].get("reference_date", ""),
        "transactions": [
            {
                "payment_instruction_id": row.get("payment_id", ""),
                "payment_end_to_end_id": row.get("reference_number", ""),
                "payment_currency": row.get("payment_currency", "EUR"),
                "payment_amount": row.get("payment_amount", ""),
                "charge_bearer": row.get("charge_bearer", ""),
                "creditor_agent_BIC": row.get("creditor_agent_BIC", ""),
                "creditor_name": row.get("creditor_name", ""),
                "creditor_street": row.get("creditor_street_name", ""),
                "creditor_building_number": row.get(
                    "creditor_building_number", ""
                ),
                "creditor_postal_code": row.get("creditor_postal_code", ""),
                "creditor_town": row.get("creditor_town_name", ""),
                "creditor_account_IBAN": row.get("creditor_account_IBAN", ""),
                "purpose_code": row.get("purpose_code", ""),
                "reference_number": row.get("reference_number", ""),
                "reference_date": row.get("reference_date", ""),
            }
            for row in data[0:]
        ],
    }


def _prepare_xml_data_v05_to_v08(data: list[dict[str, Any]]) -> dict[str, Any]:
    """Prepare XML data for pain.001.001.05-08 message types."""
    return {
        "id": data[0].get("id", ""),
        "date": data[0].get("date", ""),
        "nb_of_txs": data[0].get("nb_of_txs", ""),
        "ctrl_sum": data[0].get("ctrl_sum", ""),
        "initiator_name": data[0].get("initiator_name", ""),
        "initiator_street_name": data[0].get("initiator_street_name", ""),
        "initiator_building_number": data[0].get(
            "initiator_building_number", ""
        ),
        "initiator_postal_code": data[0].get("initiator_postal_code", ""),
        "initiator_town": data[0].get(
            "initiator_town_name", data[0].get("initiator_town", "")
        ),
        "initiator_country": data[0].get(
            "initiator_country_code", data[0].get("initiator_country", "")
        ),
        "ultimate_debtor_name": data[0].get(
            "ultimate_debtor_name", data[0].get("debtor_name", "")
        ),
        "service_level_code": data[0].get("service_level_code", "SEPA"),
        "requested_execution_date": data[0].get(
            "requested_execution_date", ""
        ),
        "payment_information_id": data[0].get("payment_information_id", ""),
        "payment_method": data[0].get("payment_method", "TRF"),
        "batch_booking": data[0].get("batch_booking", "false"),
        "debtor_name": data[0].get("debtor_name", ""),
        "debtor_street": data[0].get("debtor_street_name", ""),
        "debtor_building_number": data[0].get("debtor_building_number", ""),
        "debtor_postal_code": data[0].get("debtor_postal_code", ""),
        "debtor_town": data[0].get("debtor_town_name", ""),
        "debtor_country": data[0].get(
            "debtor_country_code", data[0].get("debtor_country", "")
        ),
        "debtor_account_IBAN": data[0].get("debtor_account_IBAN", ""),
        "debtor_agent_BIC": data[0].get("debtor_agent_BIC", ""),
        "transactions": [
            {
                "payment_id": row.get("payment_id", ""),
                "payment_instruction_id": row.get(
                    "payment_instruction_id", row.get("payment_id", "")
                ),
                "payment_end_to_end_id": row.get(
                    "payment_end_to_end_id", row.get("reference_number", "")
                ),
                "payment_amount": row.get("payment_amount", ""),
                "payment_currency": row.get("payment_currency", ""),
                "charge_bearer": row.get("charge_bearer", "SLEV"),
                "creditor_agent_BIC": row.get(
                    "creditor_agent_BIC", row.get("creditor_agent_BICFI", "")
                ),
                "creditor_name": row.get("creditor_name", ""),
                "creditor_street": row.get("creditor_street_name", ""),
                "creditor_building_number": row.get(
                    "creditor_building_number", ""
                ),
                "creditor_postal_code": row.get("creditor_postal_code", ""),
                "creditor_town": row.get("creditor_town_name", ""),
                "creditor_country": row.get(
                    "creditor_country_code", row.get("creditor_country", "")
                ),
                "creditor_account_IBAN": row.get("creditor_account_IBAN", ""),
                "creditor_agent_BICFI": row.get("creditor_agent_BICFI", ""),
                "purpose_code": row.get("purpose_code", ""),
                "reference_number": row.get("reference_number", ""),
                "reference_date": row.get("reference_date", ""),
                "remittance_information": row.get(
                    "remittance_information", ""
                ),
            }
            for row in data
        ],
    }


def _prepare_xml_data_v09_to_v11(data: list[dict[str, Any]]) -> dict[str, Any]:
    """Prepare XML data for pain.001.001.09-11 message types."""
    return {
        "id": data[0]["id"],
        "date": data[0]["date"],
        "nb_of_txs": data[0]["nb_of_txs"],
        "initiator_name": data[0]["initiator_name"],
        "payment_id": data[0]["payment_id"],
        "payment_method": data[0]["payment_method"],
        "payment_nb_of_txs": data[0]["nb_of_txs"],
        "requested_execution_date": data[0]["requested_execution_date"],
        "debtor_name": data[0]["debtor_name"],
        "debtor_account_IBAN": data[0]["debtor_account_IBAN"],
        "debtor_agent_BIC": data[0]["debtor_agent_BIC"],
        "charge_bearer": data[0]["charge_bearer"],
        "transactions": [
            {
                "payment_id": row["payment_id"],
                "payment_amount": row["payment_amount"],
                "payment_currency": row.get("payment_currency", ""),
                "charge_bearer": row["charge_bearer"],
                "creditor_agent_BIC": row["creditor_agent_BIC"],
                "creditor_name": row["creditor_name"],
                "creditor_account_IBAN": row["creditor_account_IBAN"],
                "creditor_remittance_information": row[
                    "remittance_information"
                ],
            }
            for row in data[0:]
        ],
    }


def generate_xml_string(
    data: list[dict[str, Any]],
    payment_initiation_message_type: str,
    xml_template_path: str,
    xsd_schema_path: str,
) -> str:
    """Render and validate ISO 20022 pain.001 XML entirely in memory.

    Nothing is written to disk, which suits serverless functions, REST
    APIs and microservices that need to hand the XML straight back to
    the caller.

    Checks run in this order: template path, schema path, message type,
    then non-empty data. The first failing check raises.

    Args:
        data: List of dictionaries containing payment data.
        payment_initiation_message_type: Message type, one of
            "pain.001.001.03" through "pain.001.001.11".
        xml_template_path: Path to the Jinja2 XML template file.
        xsd_schema_path: Path to the XSD schema used for validation.

    Returns:
        str: The rendered XML, after it has passed XSD validation.

    Raises:
        ValueError: If either path is rejected by ``validate_path``,
            the message type is not supported, or ``data`` is empty.
        RuntimeError: If the rendered XML fails XSD validation.

    Examples:
        >>> data = [{"id": "MSG001", "date": "2026-01-15"}]
        >>> xml_str = generate_xml_string(
        ...     data,
        ...     "pain.001.001.03",
        ...     "templates/pain.001.001.03/template.xml",
        ...     "templates/pain.001.001.03/pain.001.001.03.xsd"
        ... )  # doctest: +SKIP
        >>> xml_str.startswith('<?xml')  # doctest: +SKIP
        True
    """
    # Each preparer serves one family of message versions; flatten the
    # families into a lookup keyed by message type.
    preparer_families = (
        (("pain.001.001.03",), _prepare_xml_data_v03),
        (("pain.001.001.04",), _prepare_xml_data_v04),
        (
            (
                "pain.001.001.05",
                "pain.001.001.06",
                "pain.001.001.07",
                "pain.001.001.08",
            ),
            _prepare_xml_data_v05_to_v08,
        ),
        (
            ("pain.001.001.09", "pain.001.001.10", "pain.001.001.11"),
            _prepare_xml_data_v09_to_v11,
        ),
    )
    xml_data_preparers = {
        message_type: prepare
        for message_types, prepare in preparer_families
        for message_type in message_types
    }

    # Both paths are validated before anything else; any failure is
    # surfaced to the caller as a ValueError.
    try:
        xml_template_path = validate_path(xml_template_path)
    except Exception as e:
        raise ValueError(f"Invalid template path: {e}") from e

    try:
        xsd_schema_path = validate_path(xsd_schema_path)
    except Exception as e:
        raise ValueError(f"Invalid schema path: {e}") from e

    preparer = xml_data_preparers.get(payment_initiation_message_type)
    if preparer is None:
        raise ValueError(
            f"Invalid XML message type: {payment_initiation_message_type}"
        )

    if not data:
        raise ValueError("No data to process - data list is empty")

    xml_data = preparer(data)

    # Load the template from its own directory; a bare file name has no
    # directory component, so fall back to the current directory.
    template_dir, template_file = os.path.split(xml_template_path)
    env = Environment(
        loader=FileSystemLoader(template_dir or "."), autoescape=True
    )
    xml_content = env.get_template(template_file).render(**xml_data)

    # Refuse to return XML that does not conform to the schema.
    if not validate_xml_string_via_xsd(xml_content, xsd_schema_path):
        raise RuntimeError(
            f"Generated XML failed validation against {xsd_schema_path}"
        )

    return xml_content


def generate_xml(
    data: list[dict[str, Any]],
    payment_initiation_message_type: str,
    xml_file_path: str,
    xsd_file_path: str,
) -> None:
    """Generate an ISO 20022 pain.001 XML file from input data.

    The XML is produced by ``generate_xml_string`` and then written to
    disk. For in-memory generation (serverless/API use cases), call
    ``generate_xml_string`` directly instead.

    Args:
        data: List of dictionaries containing payment data.
        payment_initiation_message_type: Message type such as
            "pain.001.001.03", "pain.001.001.04", ... "pain.001.001.08".
        xml_file_path: Passed to ``generate_xml_string`` as the template
            path, and to ``generate_updated_xml_file_path`` to derive
            the path the generated XML is written to.
        xsd_file_path: Path to the XML schema file used for validation.

    Returns:
        None

    Raises:
        ValueError: If the message type is invalid, ``data`` is empty,
            or the output path fails validation or lies outside the
            current working directory.
        RuntimeError: If XML validation fails.
    """
    # Render and validate first so nothing is written on failure.
    xml_content = generate_xml_string(
        data, payment_initiation_message_type, xml_file_path, xsd_file_path
    )

    output_path = generate_updated_xml_file_path(
        xml_file_path, payment_initiation_message_type
    )

    # Validate the derived path to prevent traversal attacks.
    try:
        safe_xml_path = validate_path(output_path)  # nosec B108
    except Exception as e:
        raise ValueError(f"Path validation failed: {e}") from e

    # Explicit startswith guard for CodeQL CWE-22 sanitiser recognition.
    # validate_path already enforces this, but CodeQL requires the guard
    # at the call site for interprocedural taint tracking.
    allowed_prefix = str(os.path.realpath(os.getcwd())) + os.sep
    if not safe_xml_path.startswith(allowed_prefix):
        raise ValueError(
            f"Output path outside working directory: {safe_xml_path}"
        )

    # The path is safe to open now that both checks have passed.
    with open(safe_xml_path, "w", encoding="utf-8") as xml_file:  # nosec B108
        xml_file.write(xml_content)

    print(f"A new XML file has been created at `{safe_xml_path}`")
    print(f"The XML has been validated against `{xsd_file_path}`")