# File: src/fitzzftw/patch/container.py
# Author: Fitzz TeXnik Welt
# Email: FitzzTeXnikWelt@t-online.de
# License: LGPLv2 or above
"""
container
===============================
This module provides stateful containers for structural patch data. It models
the hierarchy of a unified diff, ensuring logical integrity during the
parsing and application phases.
Core Components:
----------------
* **Hunk**:
The atomic unit of change. It contains a header (range information)
and a collection of `HunkLine` objects (additions, deletions, or context).
* **DiffCodeFile**:
A file-level container that manages multiple hunks and
file headers (--- / +++). It provides the logic to resolve paths and
sequentially apply changes to file content.
* **Structural Integrity**:
Containers enforce strict validation (e.g.,
mandatory headers) to prevent the processing of malformed patch data.
These classes act as the "Source of Truth" for the patch state, offering
list-like access (indexing, iteration) to their internal elements for
seamless integration with the rest of the framework.
"""
import tempfile
import weakref
from pathlib import Path
from typing import Iterator, cast
from fitzzftw.patch.exceptions import FtwPatchError, PatchParseError
from fitzzftw.patch.lines import FileLine, HeadLine, HunkHeadLine, HunkLine
from fitzzftw.patch.protocols import DiffCodeOptions, HunkCompareOptions
# CLASS - Hunks
class Hunk:
"""
Container for a single change block within a file.
This class stores the coordinate information from the hunk header and
the actual content lines. It ensures that only valid HunkHeadLine objects
are used as headers. It tracks added and deleted lines and reports
them to a parent DiffCodeFile if associated.
"""
[docs]
def __init__(
self,
header: HunkHeadLine,
parent: "DiffCodeFile|None" = None,
) -> None:
"""
Initializes a new Hunk with coordinate metadata.
:param header: The '@@' coordinate header line object.
:param parent: Optional parent DiffCodeFile for statistics aggregation.
:raises TypeError: If header is not a HunkHeadLine instance.
"""
if not isinstance(header, HunkHeadLine):
raise TypeError(f"header must be HunkHeadLine, got {type(header).__name__}")
self._parent:"DiffCodeFile|None" = weakref.proxy(parent) if parent else None
self._header = header
self._lines: list[HunkLine] = []
self._lines_added:int = 0
self._lines_deleted:int = 0
@property
def parent(self) -> 'DiffCodeFile | None':
"""
The parent DiffCodeFile (via weakref proxy) **(rw)**.
This can only be set once to maintain statistical integrity. If the
hunk already contains lines, they are reported to the new parent.
Note: Accessing this property may raise a ReferenceError if the
referenced object has been garbage collected.
:param value: The DiffCodeFile instance to link.
:type value: 'DiffCodeFile'
"""
return self._parent
@parent.setter
def parent(self, value: 'DiffCodeFile') -> None:
"""
Sets the parent DiffCodeFile as a weak reference.
This can only be set once to maintain statistical integrity. If the
hunk already contains lines, they are reported to the new parent.
:param value: The DiffCodeFile instance to link.
:raises FtwPatchError: If parent is already set or value is None.
"""
if value is None:
raise FtwPatchError("Hunk parent cannot be set to None!")
if self._parent is not None:
raise FtwPatchError("Hunk parent is already set and cannot be changed!")
self._parent = cast("DiffCodeFile", weakref.proxy(value))
# Sync existing counts to the new parent
if self._lines_added > 0 or self._lines_deleted > 0:
self._parent.update_line_counts(self._lines_added, self._lines_deleted)
@property
def lines(self) -> list[HunkLine]:
"""
Returns a list of all HunkLine objects within this hunk **(ro)**.
:returns: A list containing specialized HunkLine objects.
"""
return self._lines
@property
def old_start(self) -> int:
"""
The starting line number in the original file **(ro)**.
:returns: The 1-based start line index from the header.
"""
return self._header.old_start
@property
def new_start(self) -> int:
"""
The starting line number in the new file **(ro)**.
:returns: The 1-based start line index for the target state.
"""
return self._header.new_start
@property
def addedlines(self) -> int:
"""Total number of added lines (+) in this hunk **(ro)**.
:returns: Total number of added lines.
"""
return self._lines_added
@property
def deletedlines(self)->int:
"""Total number of deleted lines (-) in this hunk **(ro)**.
:returns: Total number of deleted lines.
"""
return self._lines_deleted
[docs]
def add_line(self, line: HunkLine) -> None:
"""
Adds a single content line to the hunk and updates statistics.
:param line: The HunkLine object to append.
"""
lines_add = 1 if line.is_addition else 0
lines_del = 1 if line.is_deletion else 0
self._lines_added += lines_add
self._lines_deleted += lines_del
self.lines.append(line)
if self._parent is not None:
try:
self._parent.update_line_counts(lines_add, lines_del)
except ReferenceError:
self._parent = None
def _compare_context( self,
expected: list[HunkLine],
actual: list[FileLine],
options: HunkCompareOptions
) -> bool:
"""
Compare hunk context against file content using specialized properties.
This method acts as a dispatcher that selects the appropriate comparison
property from the FileLine/HunkLine objects based on the provided
whitespace options.
:param expected: Lines from the hunk (context or deleted).
:param actual: Corresponding lines from the actual file.
:param options: Configuration for whitespace and blankline handling.
:returns: True if all lines match according to the selected rules.
"""
if len(expected) != len(actual):
return False
for exp, act in zip(expected, actual, strict=False):
# 1. Option: --ignore-blank-lines
if getattr(options, "ignore_blank_lines", False):
if exp.is_empty and act.is_empty:
continue
# 2. Vergleich basierend auf den Whitespace-Regeln
if getattr(options, "ignore_all_space", False):
if exp.ignore_all_ws_content != act.ignore_all_ws_content:
return False
elif getattr(options, "ignore_space_change", False):
if exp.normalized_ws_content != act.normalized_ws_content:
return False
else:
if exp.content != act.content:
return False
return True
[docs]
def apply(self, lines: list[FileLine], options: HunkCompareOptions) -> list[FileLine]:
"""
Apply the hunk's changes to a list of FileLine objects.
This method validates the context of the target lines against the
expected hunk context. If the validation passes (considering whitespace
options), it performs the replacement/insertion and returns the new
state of the lines.
:param lines: Current file content as a list of FileLine objects.
:param options: Command line arguments for whitespace and comparison.
:raises FtwPatchError: If the context check fails or the index is out of bounds.
:returns: A modified list of FileLine objects.
"""
# 1. Indizierung vorbereiten (old_start aus dem Unified Diff Header)
# Wir korrigieren auf 0-basierten Index
start_idx = self.old_start - 1 if self.old_start else 0
# 2. Erwarteten Kontext extrahieren
expected_hunk_lines = [lin for lin in self.lines if not lin.is_addition]
# 3. Validierung der Grenzen
if start_idx < 0 or (start_idx + len(expected_hunk_lines)) > len(lines):
raise PatchParseError(
f"Hunk starting at line {self.old_start} exceeds file bounds. "
f"File has {len(lines)} lines."
)
actual_file_lines = lines[start_idx : start_idx + len(expected_hunk_lines)]
# 4. Inhalts-Check mit Whitespace-Logik (ruft interne Methode auf)
if not self._compare_context(expected_hunk_lines, actual_file_lines, options):
raise PatchParseError(
f"Hunk mismatch at line {self.old_start}. "
"The actual file content does not match the hunk's context."
)
# 5. Rekonstruktion der Zeilenliste
new_lines = lines[:start_idx]
for h_line in self.lines:
# Kontext behalten, Additions einfügen, Deletions weglassen
if h_line.is_context or h_line.is_addition:
new_lines.append(FileLine(h_line.line_string))
new_lines.extend(lines[start_idx + len(expected_hunk_lines) :])
return new_lines
def __getitem__(self, index: int) -> HunkLine:
"""
Provides access to hunk lines by their position.
:param index: Zero-based index of the line.
:returns: The HunkLine object at the given index.
"""
return self.lines[index]
def __len__(self) -> int:
"""
Returns the count of lines in this hunk.
:returns: The total number of HunkLine objects.
"""
return len(self.lines)
def __iter__(self) -> Iterator[HunkLine]:
"""
Provides an iterator for the hunk's lines.
:returns: A list iterator for the internal line collection.
"""
return iter(self.lines)
def __repr__(self) -> str:
return (f"{self.__class__.__name__}(header={self._header.coords}, "
f"lines={len(self.lines)})")
#!CLASS - Hunks
# CLASS - DiffCodeFile
class DiffCodeFile:
    """
    Stateful container for a single file's modifications within a patch.

    This class serves as the central assembly point for a file-level patch. It
    ensures that only valid HeadLine objects are used for headers. It manages
    a collection of Hunks, provides indexed and iterative access, and aggregates
    line-level statistics (added/deleted) from its child hunks.

    Note: because ``__len__`` returns the hunk count, an instance without
    hunks is falsy; test for ``None`` explicitly instead of using truthiness.

    Attributes:
        hunks (list[Hunk]): The collection of change blocks for this file.
    """

    def __init__(self, orig_header: HeadLine) -> None:
        """
        Initializes a new DiffCodeFile with a mandatory original file header.

        :param orig_header: The '---' header line object.
        :raises TypeError: If orig_header is not a HeadLine instance.
        :raises PatchParseError: If the header is not an original (---) header.
        """
        if not isinstance(orig_header, HeadLine):
            raise TypeError(f"orig_header must be HeadLine, got {type(orig_header).__name__}")
        if not orig_header.is_orig:
            # Reassemble the offending header text for the error message.
            msg_info = "".join(
                [
                    f"{orig_header.prefix}",
                    f"{orig_header.content}",
                    f"{orig_header.info if orig_header.info else ''}",
                ]
            )
            raise PatchParseError(
                f"DiffCodeFile must start with an original header (---), got '{msg_info}'"
            )
        self._orig_header = orig_header
        self._new_header: HeadLine | None = None
        self._hunks: list[Hunk] = []
        self._lines_added: int = 0
        self._lines_deleted: int = 0

    @property
    def hunks(self) -> list[Hunk]:
        """
        Returns a list of all Hunk objects associated with this file **(ro)**.

        The internal list itself is returned, not a copy.

        :returns: A list containing specialized Hunk containers.
        """
        return self._hunks

    @property
    def orig_header(self) -> HeadLine:
        """
        The original file header (---) **(ro)**.

        :returns: The HeadLine object representing the source file state.
        """
        return self._orig_header

    @property
    def new_header(self) -> HeadLine | None:
        """
        The new file header (+++) **(rw)**.

        :param value: The HeadLine object to set as the target file state.
        :raises TypeError: If the value is not a HeadLine instance (Setter).
        :returns: The HeadLine object representing the target file state or None.
        """
        return self._new_header

    @new_header.setter
    def new_header(self, value: HeadLine) -> None:
        """
        The new file header (+++) **(rw)**.

        :param value: The HeadLine object to set as the target file state.
        :raises TypeError: If the value is not a HeadLine instance.
        """
        if not isinstance(value, HeadLine):
            raise TypeError(f"new_header must be HeadLine, got {type(value).__name__}")
        self._new_header = value

    @property
    def addedlines(self) -> int:
        """Total number of added lines (+) across all hunks **(ro)**."""
        return self._lines_added

    @property
    def deletedlines(self) -> int:
        """Total number of deleted lines (-) across all hunks **(ro)**."""
        return self._lines_deleted

    def __getitem__(self, index: int) -> "Hunk":
        """
        Enables indexed access to the stored hunks.

        :param index: Zero-based index of the hunk.
        :returns: The Hunk object at the specified position.
        """
        return self.hunks[index]

    def __len__(self) -> int:
        """
        Provides the total count of hunks.

        :returns: The number of hunks in the internal list.
        """
        return len(self.hunks)

    def __iter__(self) -> Iterator[Hunk]:
        """
        Returns an independent iterator over the hunks.

        :returns: A list iterator object for the hunk collection.
        """
        return iter(self.hunks)

    def add_hunk(self, hunk: "Hunk") -> None:
        """
        Appends a Hunk to the internal collection and establishes
        the parent-child link for statistics.

        A hunk that already has a parent keeps it; only unlinked hunks are
        attached to this file.

        :param hunk: The Hunk object to add.
        """
        if hunk.parent is None:
            hunk.parent = self
        self.hunks.append(hunk)

    def apply(self, options: DiffCodeOptions) -> list[FileLine]:
        """
        Apply hunks to the file content and return the resulting lines.

        This method does not perform any write operations; it only reads the
        source file (unless the original header is a null path).

        :param options: Command line arguments for comparison.
        :returns: A new list of FileLine objects representing the patched state.
        :raises PatchParseError: If the source cannot be read or any hunk
            fails to apply.
        """
        # 1. Load the current content; a null-path original means "no file yet".
        current_lines: list[FileLine] = []
        if not self.orig_header.is_null_path:
            current_lines = self._read_file(self.get_source_path(strip=options.strip_count))

        # 2. Apply from the bottom of the file upwards so that earlier hunks'
        #    original line numbers stay valid while later ones are spliced in.
        for hunk in sorted(self.hunks, key=lambda h: h.old_start, reverse=True):
            current_lines = hunk.apply(current_lines, options)

        # 3. Hand the finished line objects back to the caller.
        return current_lines

    def get_source_path(self, strip: int = 0) -> Path:
        """
        Determine the source file path based on the header and strip level.

        :param strip: Number of path components to remove from the start.
        :returns: A Path object for the source file.
        """
        # The path resolution itself is delegated to the HeadLine object.
        return Path(self.orig_header.get_path(strip))

    def get_target_path(self, strip: int = 0) -> Path:
        """
        Determine the target file path based on the new header and strip level.

        :param strip: Number of path components to remove from the start.
        :returns: A Path object for the target file.
        :raises ValueError: If no new (+++) header has been set yet.
        """
        new_header = self.new_header
        if new_header is None:
            raise ValueError("New Header is None")
        # The path resolution itself is delegated to the HeadLine object.
        return Path(new_header.get_path(strip))

    def update_line_counts(self, lines_added: int, lines_deleleted: int) -> None:
        """
        Increment the file-wide line counters.

        Usually called by child Hunk objects.

        :param lines_added: Number of '+' lines to add.
        :param lines_deleleted: Number of '-' lines to add. The misspelled
            name is kept so existing keyword callers continue to work.
        """
        self._lines_added += lines_added
        self._lines_deleted += lines_deleleted

    @property
    def _temp_path(self) -> Path:
        """
        Generates a path for the staging file **(ro)**.

        The path is located in the system's temporary directory and includes
        the object's ID, which distinguishes live objects within one process.

        NOTE(review): ``id(self)`` is neither unique across processes nor
        unpredictable; confirm this is acceptable for a shared temp directory.

        :returns: A Path object for a temporary staging file.
        """
        return Path(tempfile.gettempdir()) / f"ftw_patch_{id(self)}.tmp"

    def _read_file(self, path: Path) -> list[FileLine]:
        """
        Read a file and convert its lines into FileLine objects.

        The file is opened in UTF-8 text mode with Python's default newline
        handling, so line endings are normalized while reading.

        :param path: The path to the source file.
        :returns: A list of FileLine instances.
        :raises PatchParseError: If the file cannot be read or is not valid
            UTF-8; the original error is chained as the cause.
        """
        try:
            with path.open("r", encoding="utf-8") as f:
                return [FileLine(line) for line in f]
        except (OSError, UnicodeDecodeError) as e:
            raise PatchParseError(f"Could not read file {path}: {e}") from e

    def _write_to_staging(self, lines: list[FileLine]) -> Path:
        """
        Write the patched lines to a temporary file in the staging area.

        Reconstructs the file by writing each FileLine's ``line_string`` in
        UTF-8 text mode, so newlines are written in the platform's default form.

        :param lines: List of patched FileLine objects.
        :returns: The Path to the generated temporary file.
        :raises PatchParseError: If writing to the staging area fails; the
            original error is chained as the cause.
        """
        temp_file = self._temp_path
        try:
            with temp_file.open("w", encoding="utf-8") as f:
                f.writelines(line.line_string for line in lines)
        except OSError as e:
            raise PatchParseError(f"Could not write to staging file {temp_file}: {e}") from e
        return temp_file

    def __repr__(self) -> str:
        """Return a short debug representation with original path and hunk count."""
        return " ".join(
            [
                f"{self.__class__.__name__}(orig={self._orig_header.content},",
                f"hunks={len(self.hunks)})",
            ]
        )
#!CLASS - DiffCodeFile
# Insert code here
if __name__ == "__main__":  # pragma: no cover
    # Developer entry point: runs the documenting doctests for this module.
    from doctest import FAIL_FAST, testfile

    # Effective runner settings: verbose output, stop at the first failure.
    be_verbose = True
    option_flags = FAIL_FAST

    # Running totals reported after the run.
    test_sum = 0
    test_failed = 0

    # Location of the documenting tests, relative to the repository root.
    testfiles_dir = Path(__file__).parents[3] / "doc/source/devel"
    test_file = testfiles_dir / "get_started_container.rst"
    # Alternative target: testfiles_dir / "get_started_ftw_patch.rst"

    if not test_file.exists():
        print(f"⚠️ Warning: Test file {test_file.name} not found.")
    else:
        print(f"--- Running Doctest for {test_file.name} ---")
        doctestresult = testfile(
            str(test_file),
            module_relative=False,
            verbose=be_verbose,
            optionflags=option_flags,
        )
        test_failed += doctestresult.failed
        test_sum += doctestresult.attempted

        if test_failed != 0:
            print(f"\nDocTests failed: {test_failed} tests.")
        else:
            print(f"\nDocTests passed without errors, {test_sum} tests.")