Coverage for core/src/version_finder/version_finder.py: 72%
691 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-18 10:30 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-18 10:30 +0000
1"""
2core.py
3====================================
4Core module for version_finder.
5This module contains the core functionality for finding versions in a git repository.
6It includes classes and functions for handling git operations and version finding.
7The module is designed to work with git repositories and provides a user-friendly interface for
8finding and comparing versions.
9"""
10from dataclasses import dataclass
11from pathlib import Path
12import difflib
13import os
14import re
15import time
16from typing import List, Optional, Dict, Callable
17from version_finder.git_executer import GitCommandExecutor, GitConfig, GitCommandError
18from version_finder.logger import get_logger
19from version_finder.common import GIT_CMD_FETCH, GIT_CMD_CHECKOUT, GIT_CMD_SUBMODULE_UPDATE, GIT_CMD_LIST_BRANCHES, GIT_CMD_LIST_SUBMODULES, BRANCH_PATTERN
21# Initialize module logger
22logger = get_logger()
class GitError(Exception):
    """Common base class for the git-related exceptions defined in this module."""
class InvalidGitRepository(GitError):
    """Signals that the given path is not a valid git repository."""
class GitRepositoryNotClean(GitError):
    """Signals that the repository contains uncommitted changes."""
class InvalidCommitError(GitError):
    """Signals that a commit is invalid or its information could not be read."""
class InvalidSubmoduleError(GitError):
    """Signals that a submodule path is not one of the repository's submodules."""
class InvalidBranchError(GitError):
    """Signals that a requested branch is not valid for the repository."""
class VersionNotFoundError(GitError):
    """Signals that no version could be found in the commit messages."""
class InvalidFilepathError(GitError):
    """Signals that a file path supplied as input is not valid."""
class GitNotInstalledError(GitError):
    """Signals that git is unavailable; the message is extended with an installation guide."""

    def __init__(self, message: str):
        # NOTE(review): the whitespace inside this literal is part of the final message text.
        installation_guide = """
        To use version_finder, you need git installed on your system.

        Installation instructions:
        - macOS: Install via Xcode Command Line Tools with 'xcode-select --install'
        - Linux: Use your package manager e.g. 'apt install git' or 'yum install git'
        - Windows: Download from https://git-scm.com/download/win

        After installation, ensure 'git' is available in your system PATH.
        """
        full_message = f"{message}\n{installation_guide}"
        super().__init__(full_message)
class RepositoryNotTaskReady(GitError):
    """Signals that a task was requested before the repository was prepared for tasks."""

    def __init__(self):
        hint = "Please run update_repository(<selected_branch>) first."
        super().__init__(hint)
@dataclass
class Commit:
    """Value object describing a single git commit."""
    sha: str
    subject: str
    message: str
    author: str
    timestamp: int
    # Version string extracted from the commit message, when one was found.
    version: Optional[str] = None

    def __repr__(self):
        # Compact debugging form: only the hash and the subject line.
        return "Commit(sha={} subject={})".format(self.sha, self.subject)

    def __str__(self):
        # Human-readable form: "<sha> <subject>".
        return "{} {}".format(self.sha, self.subject)
@dataclass
class VersionFinderTask:
    """Describes one task offered by VersionFinder.

    ``args`` and ``run`` start out as None and are filled in later through
    ``VersionFinderTaskRegistry.initialize_actions_and_args``.
    """
    # Position of the task in the registry.
    index: int
    # Short, human-readable task title.
    name: str
    # Longer explanation of what the task does.
    description: str
    # Parameters associated with the task's action, if any were registered.
    args: Optional[Dict] = None
    # Callable that performs the task, if one was registered.
    run: Optional[Callable] = None
class VersionFinderTaskRegistry:
    """Registry of the built-in VersionFinder tasks, addressable by name or by index."""

    def __init__(self):
        self._tasks_by_name: Dict[str, VersionFinderTask] = {}
        self._tasks_by_index: Dict[int, VersionFinderTask] = {}
        self._initialize_tasks()

    def _initialize_tasks(self):
        """Create the built-in tasks and register each one under both its name and its index."""
        # NOTE(review): whitespace inside the multi-line descriptions is part of the text shown to users.
        tasks = [
            VersionFinderTask(
                name="Find first version containing commit",
                index=0,
                description="""The most common task is to find the first version that includes a change (=commit).
                Given a commit SHA identifier in a repository, it can be done easily using: `git log --grep=version: <commit_ha>^1..<HEAD>` you now what to scroll down all the way to find the first commit.
                But, when the change is part of a submodule, things can can a little more tricky. Given a submodule with the reposity and the commit SHA identifier, Version Finder
                will iterate over all the commits that change the submodule pointer. It will than apply binary search to find the first ancestor of the change.""",
            ),
            VersionFinderTask(
                name="Find all commits between two versions",
                index=1,
                description="""Trying to identify a commit that may cause an issue, a user would like to seek all the changes between two versions.
                Once again an easy solution is `git log <old_version_tag>..<new_version_tag>`. If a submodule is given than Version Finder will get the submodule pointers at each commit, and log all the commits between them.""",
            ),
            VersionFinderTask(
                name="Find commit by text",
                index=2,
                description="An helper task in-order to identify the correct commit SHA identifier for later",
            )
        ]

        for task in tasks:
            self._tasks_by_name[task.name] = task
            self._tasks_by_index[task.index] = task

    def get_by_name(self, name: str) -> Optional[VersionFinderTask]:
        """Return the task registered under ``name``, or None when there is none."""
        return self._tasks_by_name.get(name)

    def get_by_index(self, index: int) -> Optional[VersionFinderTask]:
        """Return the task registered under ``index``, or None when there is none."""
        return self._tasks_by_index.get(index)

    def get_tasks_by_index(self) -> list[VersionFinderTask]:
        """Return all tasks ordered by ascending index."""
        return [self._tasks_by_index[i] for i in sorted(self._tasks_by_index)]

    def has_index(self, index: int) -> bool:
        """Tell whether a task is registered under ``index``."""
        return index in self._tasks_by_index

    def has_name(self, name: str) -> bool:
        """Tell whether a task is registered under ``name``."""
        return name in self._tasks_by_name

    def _set_task_action(self, index: int, action: Callable):
        """Attach ``action`` as the callable of the task at ``index``.

        Raises:
            ValueError: If no task is registered under ``index``.
        """
        task = self.get_by_index(index)
        if task is None:
            raise ValueError(f"Task with index {index} not found")
        task.run = action

    def _set_task_action_params(self, index: int, params: Optional[Dict]):
        """Attach ``params`` as the arguments of the task at ``index``.

        Raises:
            ValueError: If no task is registered under ``index``.
        """
        task = self.get_by_index(index)
        if task is None:
            raise ValueError(f"Task with index {index} not found")
        task.args = params

    def initialize_actions_and_args(self, actions: Dict[int, Callable], params: Dict[int, List[str]]):
        """
        Bind an action and its parameters to every task index listed in ``actions``.

        Args:
            actions: Mapping of task index to the callable that runs the task.
            params: Mapping of task index to the parameters of that task. An index
                missing from this mapping leaves the task's ``args`` as None.

        Raises:
            ValueError: If an index in ``actions`` does not belong to a registered task.
        """
        for index, action in actions.items():
            self._set_task_action(index, action)
            # .get() keeps a task without parameters usable instead of failing with a bare KeyError.
            self._set_task_action_params(index, params.get(index))
180class VersionFinder:
181 """A class to handle git repository operations and version finding."""
182 repository_path: Path
183 submodules: List[str]
184 branches: List[str]
185 _has_remote: bool
187 # Consolidated version pattern that handles various formats:
188 # - Optional prefixes like "Version:", "VERSION:", "Updated version"
189 # - Optional "XX_" prefix in the version number
190 # - Year formats (e.g., 2023)
191 # - Various separators (., _, -)
192 # - Multiple version components (e.g., 1.2.3, 2023_01_15, etc.)
193 #
194 # Examples of matched versions:
195 # - Version: 1.2.3
196 # - VERSION: XX_2023_01_15
197 # - Updated version 4.5-2
198 # - 2023.01.15
199 version_pattern = r"(?:(?:Version|VERSION|Updated version)\s*:?\s*|[^a-zA-Z0-9][^0-9\s]*)?(?:XX_)?(\d{1,4}(?:[._-]\d+)+)"
201 # Pattern used specifically for git grep searches
202 git_regex_pattern_for_version = "(Version|VERSION|Updated version)(:)? (XX_)?[0-9]+([._-][0-9]+)+"
204 def __init__(self,
205 path: str = '',
206 config: Optional[GitConfig] = None,
207 force: bool = False) -> None:
208 """
209 Initialize the VersionFinder with a repository path and configuration.
211 Args:
212 path: Path to the git repository. Uses current directory if None.
213 config: Configuration settings for git operations.
214 force: If True, allow initialization even if the repository has uncommitted changes.
215 """
216 self.config = config or GitConfig()
217 self.repository_path = Path(path or os.getcwd()).resolve()
218 self.force = force
220 # State tracking
221 self._initial_state = {
222 "branch": None,
223 "has_changes": False
224 }
225 self._state_saved = False
227 try:
228 self._git = GitCommandExecutor(self.repository_path, self.config)
229 except GitCommandError as e:
230 logger.error(f"Error initializing git executor: {e}")
231 raise GitNotInstalledError(e)
233 self.is_task_ready = False
234 self.submodules: List[str] = []
235 self.branches: List[str] = []
237 self.__validate_repository()
238 self.__load_repository_info()
240 def __has_remote(self) -> bool:
241 """Check if the repository has any remotes configured."""
242 try:
243 output: bytes = self._git.execute(["remote"])
244 return bool(output.strip())
245 except GitCommandError:
246 return False
248 def __validate_repository(self) -> None:
249 """Validate the git repository and its state."""
250 try:
251 # Check if directory is a git repository by running git status
252 self._git.execute(["status"])
253 except GitCommandError as e:
254 # Convert GitCommandError to InvalidGitRepository
255 raise InvalidGitRepository(f"Path {self.repository_path} is not a valid git repository: {str(e)}") from e
257 # Store remote status
258 self._has_remote = self.__has_remote()
259 logger.debug(f"Repository has remote: {self._has_remote}")
261 # Check for uncommitted changes
262 has_changes = not self.__is_clean_git_repo()
264 # Only raise an error if force is False
265 if has_changes and not self.force:
266 logger.warning("Repository has uncommitted changes. Use force=True to proceed anyway.")
267 raise GitRepositoryNotClean("Repository has uncommitted changes")
269 def __load_repository_info(self) -> None:
270 """Load repository information including submodules and branches."""
271 if self._has_remote:
272 logger.info(f"Fetching latest changes from remote repository: {self.repository_path}")
273 self.__fetch_repository()
274 self.__load_branches()
275 self.__load_submodules()
276 # If the checked out branch is valid, set it as updated
277 self.updated_branch = self.get_current_branch()
279 # Save initial repository state
280 self.save_repository_state()
282 def __load_submodules(self) -> None:
283 """Load git submodules information."""
284 try:
285 output = self._git.execute(["submodule", "status"])
286 self.submodules = [line.split()[1] for line in output.decode("utf-8").splitlines()]
287 logger.debug(f"Loaded submodules: {self.submodules}")
288 except GitCommandError as e:
289 logger.error(f"Failed to load submodules: {e}")
290 self.submodules = []
292 def __fetch_repository(self) -> None:
293 """Fetch latest changes from remote repository."""
294 try:
295 output = self._git.execute(["fetch", "--all"])
296 logger.debug(f"Fetch output: {output}")
297 except GitCommandError as e:
298 logger.error(f"Failed to fetch repository: {e}")
300 def __load_branches(self) -> None:
301 """Load git branches information."""
302 try:
303 output = self._git.execute(["branch", "-a"])
304 logger.debug(f"Loaded branches output: {output}")
306 start_time = time.time()
307 branch_pattern = re.compile(r'(?:remotes/origin/|\* |HEAD-> )')
308 self.branches = sorted(set(
309 branch_pattern.sub('', branch.strip())
310 for branch in output.decode("utf-8").splitlines()
311 ))
312 filtering_time = time.time()
313 self.branches = list(set(self.branches))
314 remove_duplicates_time = time.time()
315 self.branches.sort()
316 sort_time = time.time()
317 logger.debug(f"Branch filtering took {filtering_time - start_time} seconds")
318 logger.debug(f"Removing duplicates took {remove_duplicates_time - filtering_time} seconds")
319 logger.debug(f"Sorting took {sort_time - remove_duplicates_time} seconds")
320 logger.debug(f"Loaded branches: {self.branches}")
321 except GitCommandError as e:
322 logger.error(f"Failed to load branches: {e}")
323 self.branches = []
325 def __extract_version_from_message(self, commit_message: str) -> Optional[str]:
326 """
327 Extract version from commit message using various patterns.
329 Args:
330 commit_message: The commit message to parse
332 Returns:
333 Optional[str]: Extracted version or None if no version found
334 """
336 match = re.search(self.version_pattern, commit_message)
337 if match:
338 logger.debug(f"match.group(0) = {match.group(0)}")
339 return match.group(1)
340 return None
342 def __is_clean_git_repo(self) -> bool:
343 """Check if the git repository is clean."""
344 try:
345 self._git.execute(["diff", "--quiet", "HEAD"])
346 return True
347 except GitCommandError:
348 return False
350 def list_submodules(self) -> List[str]:
351 """Get list of submodules."""
352 return self.submodules
354 def list_branches(self) -> List[str]:
355 """Get list of branches."""
356 return self.branches
358 def get_commit_info(self, commit_sha: str, submodule: str = '') -> Commit:
359 """Get detailed commit information."""
361 # Verify ready for Tasks
362 if not self.is_task_ready:
363 raise RepositoryNotTaskReady()
365 git_command = ["show", "-s", "--format=%H%x1F%s%x1F%B%x1F%an%x1F%at", commit_sha]
366 if submodule:
367 git_command.insert(0, "--git-dir")
368 git_command.insert(1, f"{submodule}/.git")
369 try:
370 output = self._git.execute(git_command).decode("utf-8").strip()
371 except GitCommandError as e:
372 raise InvalidCommitError(f"Failed to get commit info: {e}")
373 logger.debug(f"Commit info output: {output}")
374 output = output.split('\x1F')
375 for elemetn in output:
376 logger.debug(f"Element: {elemetn}")
377 logger.debug(f"The length of output is: {len(output)}")
378 sha, subject, message, author, timestamp = output
379 version = self.__extract_version_from_message(message)
381 return Commit(
382 sha=sha,
383 subject=subject,
384 message=message,
385 author=author,
386 timestamp=int(timestamp),
387 version=version
388 )
390 def get_current_branch(self) -> str:
391 """Get the current Git branch name.
393 Returns:
394 str: The name of the current branch if successfully determined.
395 Returns None if:
396 - The repository is in a detached HEAD state
397 - There was an error executing the git command
398 - The branch name could not be determined
400 Raises:
401 GitCommandError: May be raised during git command execution, but is caught internally
402 """
403 current_branch = None
404 try:
405 output = self._git.execute(["rev-parse", "--abbrev-ref", "HEAD"])
406 output = output.decode("utf-8").strip()
407 logger.debug(f"Current branch output: {output}")
408 if output not in ["HEAD"]:
409 current_branch = output
410 except GitCommandError as e:
411 logger.error(f"Failed to get current branch: {e}")
412 return current_branch
414 def has_branch(self, branch: str) -> bool:
415 """Check if a branch exists."""
416 return branch in self.branches
418 def save_repository_state(self) -> dict:
419 """
420 Save the current state of the repository.
422 This method:
423 1. Saves the current branch (or commit hash if in detached HEAD state)
424 2. Stashes uncommitted changes if present with a unique identifier
425 3. Recursively saves state for all submodules
427 Returns:
428 dict: A dictionary containing the saved state information
429 """
430 logger.info("Saving repository state")
432 # Generate a unique stash identifier
433 stash_id = f"version_finder_stash_{int(time.time())}"
435 # Get current branch or commit hash if in detached HEAD
436 current_branch = self.updated_branch
437 if not current_branch: # Detached HEAD state
438 try:
439 # Get the current commit hash
440 output = self._git.execute(["rev-parse", "HEAD"]).decode("utf-8").strip()
441 current_branch = f"HEAD:{output}"
442 logger.info(f"Repository is in detached HEAD state at commit {output}")
443 except GitCommandError as e:
444 logger.error(f"Failed to get current commit hash: {e}")
446 # Check for uncommitted changes
447 has_changes = not self.__is_clean_git_repo()
448 stash_created = False
450 # Stash changes if needed
451 if has_changes:
452 logger.info(f"Repository has uncommitted changes, stashing with ID: {stash_id}")
453 try:
454 self._git.execute(["stash", "push", "-m", stash_id])
455 stash_created = True
456 logger.info("Changes stashed successfully")
457 except GitCommandError as e:
458 logger.error(f"Failed to stash changes: {e}")
460 # Save submodule states
461 submodule_states = {}
462 if self.submodules:
463 logger.info(f"Saving state for {len(self.submodules)} submodules")
464 for submodule in self.submodules:
465 try:
466 # Get submodule branch
467 git_command = ["--git-dir", f"{submodule}/.git", "rev-parse", "--abbrev-ref", "HEAD"]
468 submodule_branch = self._git.execute(git_command).decode("utf-8").strip()
470 # Check if submodule has changes
471 git_command = ["--git-dir", f"{submodule}/.git", "diff", "--quiet", "HEAD"]
472 submodule_has_changes = False
473 try:
474 self._git.execute(git_command)
475 except GitCommandError:
476 submodule_has_changes = True
478 # Stash submodule changes if needed
479 submodule_stash_created = False
480 if submodule_has_changes:
481 submodule_stash_id = f"{stash_id}_{submodule}"
482 try:
483 git_command = ["-C", submodule, "stash", "push", "-m", submodule_stash_id]
484 self._git.execute(git_command)
485 submodule_stash_created = True
486 logger.info(f"Stashed changes in submodule {submodule}")
487 except GitCommandError as e:
488 logger.error(f"Failed to stash changes in submodule {submodule}: {e}")
490 # Save submodule state
491 submodule_states[submodule] = {
492 "branch": submodule_branch if submodule_branch != "HEAD" else None,
493 "has_changes": submodule_has_changes,
494 "stash_created": submodule_stash_created,
495 "stash_id": f"{stash_id}_{submodule}" if submodule_has_changes else None
496 }
498 # If in detached HEAD, get the commit hash
499 if submodule_branch == "HEAD":
500 try:
501 git_command = ["--git-dir", f"{submodule}/.git", "rev-parse", "HEAD"]
502 commit_hash = self._git.execute(git_command).decode("utf-8").strip()
503 submodule_states[submodule]["commit_hash"] = commit_hash
504 except GitCommandError as e:
505 logger.error(f"Failed to get commit hash for submodule {submodule}: {e}")
507 except GitCommandError as e:
508 logger.error(f"Failed to save state for submodule {submodule}: {e}")
509 submodule_states[submodule] = {"error": str(e)}
511 # Save state
512 self._initial_state = {
513 "branch": current_branch,
514 "has_changes": has_changes,
515 "stash_created": stash_created,
516 "stash_id": stash_id if has_changes else None,
517 "submodules": submodule_states
518 }
519 self._state_saved = True
521 logger.info(f"Saved repository state: {self._initial_state}")
522 return self._initial_state
524 def get_saved_state(self) -> dict:
525 """
526 Get the saved repository state.
528 Returns:
529 dict: A dictionary containing the saved state information
530 """
531 return self._initial_state
533 def has_saved_state(self) -> bool:
534 """
535 Check if the repository state has been saved.
537 Returns:
538 bool: True if state has been saved, False otherwise
539 """
540 return self._state_saved
542 def has_uncommitted_changes(self) -> bool:
543 """
544 Check if the repository has uncommitted changes.
546 Returns:
547 bool: True if there are uncommitted changes, False otherwise
548 """
549 return not self.__is_clean_git_repo()
551 def restore_repository_state(self) -> bool:
552 """
553 Restore the repository to its saved state.
555 This method:
556 1. Restores the original branch (or commit if in detached HEAD)
557 2. Pops stashed changes if they were stashed during save
558 3. Recursively restores state for all submodules
560 Returns:
561 bool: True if restoration was successful, False otherwise
562 """
563 # Check if the repository directory still exists
564 if not os.path.exists(self.repository_path):
565 logger.warning(f"Repository directory {self.repository_path} no longer exists")
566 return False
568 if not self._state_saved:
569 logger.warning("No saved state to restore")
570 return False
572 original_branch = self._initial_state.get("branch")
573 if not original_branch:
574 logger.warning("No branch information in saved state")
575 return False
577 logger.info(f"Restoring repository to original state: {original_branch}")
579 # Restore submodules first (in reverse order)
580 submodule_states = self._initial_state.get("submodules", {})
581 if submodule_states:
582 logger.info(f"Restoring state for {len(submodule_states)} submodules")
583 for submodule, state in reversed(list(submodule_states.items())):
584 try:
585 # Skip if there was an error during save
586 if "error" in state:
587 logger.warning(f"Skipping submodule {submodule} due to previous error: {state['error']}")
588 continue
590 # Checkout original branch or commit
591 if state.get("branch"):
592 git_command = ["-C", submodule, "checkout", state["branch"]]
593 self._git.execute(git_command)
594 logger.info(f"Restored submodule {submodule} to branch {state['branch']}")
595 elif state.get("commit_hash"):
596 git_command = ["-C", submodule, "checkout", state["commit_hash"]]
597 self._git.execute(git_command)
598 logger.info(f"Restored submodule {submodule} to commit {state['commit_hash']}")
600 # Pop stashed changes if they were stashed
601 if state.get("stash_created"):
602 stash_id = state.get("stash_id")
603 if stash_id:
604 try:
605 # Find the stash by its message
606 git_command = ["-C", submodule, "stash", "list"]
607 stash_output = self._git.execute(git_command).decode("utf-8").strip()
609 if not stash_output:
610 logger.warning(f"No stashes found for submodule {submodule}")
611 continue
613 stash_list = stash_output.split("\n")
614 stash_index = None
616 for i, stash in enumerate(stash_list):
617 if stash_id in stash:
618 stash_index = i
619 break
621 if stash_index is not None:
622 # Use apply instead of pop to avoid conflicts
623 git_command = ["-C", submodule, "stash", "apply", f"stash@{ {stash_index}} "]
624 self._git.execute(git_command)
625 logger.info(f"Applied stashed changes in submodule {submodule}")
627 # Now drop the stash
628 git_command = ["-C", submodule, "stash", "drop", f"stash@{ {stash_index}} "]
629 self._git.execute(git_command)
630 logger.info(f"Dropped stash for submodule {submodule}")
631 else:
632 logger.warning(f"Could not find stash with ID {stash_id} for submodule {submodule}")
633 except GitCommandError as e:
634 logger.error(f"Failed to restore stashed changes for submodule {submodule}: {e}")
636 except GitCommandError as e:
637 logger.error(f"Failed to restore state for submodule {submodule}: {e}")
639 # Restore main repository
640 try:
641 # Check if we're restoring to a detached HEAD state
642 if original_branch.startswith("HEAD:"):
643 commit_hash = original_branch.split(":", 1)[1]
644 self._git.execute(GIT_CMD_CHECKOUT + [commit_hash])
645 logger.info(f"Restored repository to detached HEAD at commit {commit_hash}")
646 else:
647 # Checkout original branch
648 self._git.execute(GIT_CMD_CHECKOUT + [original_branch])
649 logger.info(f"Restored repository to branch {original_branch}")
651 # Pop stashed changes if they were stashed
652 if self._initial_state.get("stash_created"):
653 stash_id = self._initial_state.get("stash_id")
654 if stash_id:
655 try:
656 # Find the stash by its message
657 stash_output = self._git.execute(["stash", "list"]).decode("utf-8").strip()
659 if not stash_output:
660 logger.warning("No stashes found in repository")
661 return True # Still consider restoration successful
663 stash_list = stash_output.split("\n")
664 stash_index = None
666 for i, stash in enumerate(stash_list):
667 if stash_id in stash:
668 stash_index = i
669 break
671 if stash_index is not None:
672 # Use apply instead of pop to avoid conflicts
673 self._git.execute(["stash", "apply", f"stash@{ {stash_index}} "])
674 logger.info("Applied stashed changes")
676 # Now drop the stash
677 self._git.execute(["stash", "drop", f"stash@{ {stash_index}} "])
678 logger.info("Dropped stash after successful apply")
679 else:
680 logger.warning(f"Could not find stash with ID {stash_id}")
681 except GitCommandError as e:
682 logger.error(f"Failed to restore stashed changes: {e}")
683 # Continue anyway, as we've at least restored the branch
685 # Set a flag to indicate that state has been restored
686 self._state_restored = True
688 return True
689 except GitCommandError as e:
690 logger.error(f"Failed to restore repository state: {e}")
691 return False
693 def update_repository(self, branch: str, save_state: bool = True) -> None:
694 """
695 Update the repository to the specified branch.
697 Args:
698 branch: Branch name to checkout
699 save_state: Whether to save the current state before updating
701 Raises:
702 InvalidBranchError: If the branch is invalid
703 GitRepositoryNotClean: If the repository has uncommitted changes
704 """
705 logger.info(f"Updating repository to branch: {branch}")
707 # Save current state if requested
708 if save_state and not self._state_saved:
709 self.save_repository_state()
711 # Fetch latest changes
712 try:
713 self._git.execute(GIT_CMD_FETCH)
714 except GitCommandError as e:
715 logger.error(f"Failed to fetch: {e}")
716 raise
718 # Check if branch exists
719 branches = self.list_branches()
720 if branch not in branches:
721 raise InvalidBranchError(f"Branch '{branch}' not found in repository")
723 # Checkout branch
724 try:
725 self._git.execute(GIT_CMD_CHECKOUT + [branch])
726 except GitCommandError as e:
727 logger.error(f"Failed to checkout branch {branch}: {e}")
728 raise
730 # Update submodules
731 try:
732 self._git.execute(GIT_CMD_SUBMODULE_UPDATE)
733 except GitCommandError as e:
734 logger.warning(f"Failed to update submodules: {e}")
735 # Continue anyway, as this might not be critical
737 self.is_task_ready = True
738 logger.info(f"Repository updated to branch: {branch}")
740 def find_commits_by_text(self, text: str, submodule: str = '') -> List[Commit]:
741 """
742 Find commits in the specified branch that contain the given text in either title or description.
744 Args:
745 text: Text to search for in commit messages (title and description).
746 submodule: Optional submodule path to search in.
748 Returns:
749 List of commit hashes.
751 Raises:
752 GitCommandError: If the git command fails.
753 """
754 if not self.is_task_ready:
755 raise RepositoryNotTaskReady()
757 try:
758 command = [
759 "log",
760 "--format=%H%x1F%s%x1F%b"
761 ]
763 if submodule:
764 # Verify submodule exists
765 if submodule not in self.submodules:
766 raise InvalidSubmoduleError(f"Invalid submodule path: {submodule}")
767 # Execute command in submodule directory
768 command.insert(0, "-C")
769 command.insert(1, submodule)
771 output = self._git.execute(command)
772 commits = output.decode("utf-8").strip().split("\n")
773 matching_commits = []
775 for commit in commits:
776 if not commit: # Skip empty lines
777 continue
778 # Split the commit info using the ASCII delimiter
779 commit_parts = commit.split("\x1F")
780 if len(commit_parts) >= 2: # Only require hash and subject
781 commit_hash = commit_parts[0]
782 subject = commit_parts[1]
783 body = commit_parts[2] if len(commit_parts) >= 3 else ""
784 # Search in both subject and body
785 if (text.lower() in subject.lower() or
786 text.lower() in body.lower()):
787 matching_commits.append(commit_hash)
789 return [self.get_commit_info(commit_sha, submodule=submodule) for commit_sha in matching_commits]
790 except GitCommandError as e:
791 logger.error(f"Failed to find commits by text: {e}")
792 raise
794 def get_commit_surrounding_versions(self, commit_sha: str) -> List[Optional[str]]:
795 """
796 Find the nearest version commits before and after the given commit.
798 Args:
799 commit_sha: The commit SHA to get the surrounding version commits for.
801 Returns:
802 List containing the previous and next version commit SHAs. Elements can be None.
803 """
804 try:
805 if not self.has_commit(commit_sha):
806 raise GitCommandError(f"Commit {commit_sha} does not exist")
807 # Find nearest version commits using grep
808 prev_version = self._git.execute([
809 "log",
810 f"--grep={self.git_regex_pattern_for_version}",
811 "--extended-regexp",
812 "--format=%H",
813 "-n", "1",
814 f"{commit_sha}~1"
815 ]).decode("utf-8").strip() or None
817 # Add validation for empty output
818 if not prev_version:
819 logger.debug("No previous version found")
821 next_version_output = self._git.execute([
822 "log",
823 f"--grep={self.git_regex_pattern_for_version}",
824 "--extended-regexp",
825 "--format=%H",
826 f"{commit_sha}^1..HEAD"
827 ]).decode("utf-8").strip()
829 # Add validation for empty output
830 next_version = next_version_output.split()[-1] if next_version_output else None
831 if not next_version:
832 logger.debug("No next version found")
834 return [prev_version, next_version]
835 except GitCommandError as e:
836 raise GitCommandError(f"Failed to get version commits: {e}") from e
838 def get_version_from_commit(self, commit_sha: str) -> str:
839 """
840 Get the version from the commit message.
842 Args:
843 commit_sha: The commit SHA to get the version for.
845 Returns:
846 str: The version from the commit message.
848 Raises:
849 GitCommandError: If the commit does not exist or version cannot be extracted.
850 """
851 try:
852 # Get the commit message using the pretty format
853 output = self._git.execute([
854 "show",
855 "-s", # suppress diff output
856 "--format=%s", # get subject/title only
857 commit_sha
858 ])
859 message = output.decode("utf-8").strip()
861 # Extract version from message (assuming format "Version: X.Y.Z")
862 version_string = self.__extract_version_from_message(message)
863 if version_string:
864 return version_string
865 raise GitCommandError(f"Commit {commit_sha} does not contain version information")
867 except GitCommandError as e:
868 raise GitCommandError(f"Failed to get version for commit {commit_sha}: {e}") from e
870 def has_commit(self, commit_sha: str) -> bool:
871 """
872 Check if a commit exists in the repository.
874 Args:
875 commit_sha: The commit SHA to check.
877 Returns:
878 bool: True if the commit exists, False otherwise.
879 """
880 try:
881 # -e flag just checks for existence, -t type check is also good
882 self._git.execute(["cat-file", "-e", commit_sha])
883 return True
884 except GitCommandError:
885 return False
887 def submodule_has_commit(self, submodule_path: str, commit_sha: str) -> bool:
888 """
889 Check if a commit exists in a submodule.
891 Args:
892 submodule_path: The path to the submodule.
893 commit_sha: The commit SHA to check.
895 Returns:
896 bool: True if the commit exists in the submodule, False otherwise.
897 """
898 try:
899 # Check if the commit exists in the submodule
900 self._git.execute(["-C", submodule_path, "cat-file", "-e", commit_sha])
901 return True
902 except GitCommandError:
903 logger.error(f"Commit {commit_sha} does not exist in submodule {submodule_path}")
904 return False
def get_first_commit_including_submodule_changes(
        self, submodule_path: str, submodule_target_commit: str) -> str:
    """
    Get the first repository commit whose submodule pointer includes the target commit.

    The repository commits that changed the pointer of ``submodule_path`` (at most the
    1500 most recent ones, as returned by ``git log``) are binary-searched for the oldest
    entry whose pointer equals ``submodule_target_commit`` or has it as an ancestor.

    Args:
        submodule_path: Path of the submodule; must be one of ``self.submodules``.
        submodule_target_commit: Commit SHA inside the submodule to look for.

    Returns:
        str: SHA of the repository commit that first includes the target commit.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        GitCommandError: If the submodule path is unknown, the commit does not exist in
            the submodule, no pointer changes are found, or no inspected pointer
            includes the target commit.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    # Verify submodule path exists
    if submodule_path not in self.submodules:
        raise GitCommandError(f"Invalid submodule path: {submodule_path}")

    # Verify commit exists in submodule
    if not self.submodule_has_commit(submodule_path, submodule_target_commit):
        raise GitCommandError(f"Commit {submodule_target_commit} does not exist in submodule {submodule_path}")

    def parse_git_log_output(git_log_output):
        """Pair each repository commit SHA with the submodule pointer it introduced."""
        repo_commit_sha = None
        tuples = []
        for line in git_log_output.splitlines():
            # Detect commit lines
            if line.startswith("Commit: "):
                repo_commit_sha = line.split()[1]
            # Detect submodule commit change lines
            match = re.match(r"^\+Subproject commit (\w+)", line)
            if match and repo_commit_sha:
                tuples.append((repo_commit_sha, match.group(1)))
                repo_commit_sha = None  # Reset to avoid duplication
        return tuples

    git_log_output = self.__get_commits_changing_submodule_pointers_and_the_new_pointer(submodule_path, 1500)
    pointer_changes = parse_git_log_output(git_log_output) if git_log_output else []
    # Guard the parsed list too: a non-empty log without any "+Subproject commit" line
    # would otherwise fail with IndexError below.
    if not pointer_changes:
        raise GitCommandError(f"No commits found that change submodule {submodule_path} or its ancestors")

    logger.debug(
        f"Found {len(pointer_changes)} commits that change submodule {submodule_path}")
    logger.debug(f"First commit: {pointer_changes[0][0]}")
    logger.debug(f"Last commit: {pointer_changes[-1][0]}")

    # Apply binary search to find the first commit that points to an ancestor of the target commit.
    # Index 0 is the newest commit (git log order), so "includes the target" moves the
    # search towards higher indices (older commits).
    left, right = 0, len(pointer_changes) - 1
    while left <= right:
        mid = (left + right) // 2
        submodule_ptr = pointer_changes[mid][1]
        logger.debug(f"Binary search - Left: {left}, Right: {right}, Mid: {mid}")
        logger.debug(f"Checking if {submodule_target_commit} is ancestor of {submodule_ptr}")

        # NOTE(review): relies on execute(check=False) returning b'' only when
        # merge-base exits successfully - confirm against GitCommandExecutor.
        is_ancestor = self._git.execute(
            ["-C", submodule_path, "merge-base", "--is-ancestor", submodule_target_commit, submodule_ptr],
            check=False) == b''
        logger.debug(f"Is ancestor: {is_ancestor}")
        is_equal = submodule_target_commit == submodule_ptr
        logger.debug(f"Is equal: {is_equal}")
        is_ancestor_or_equal = is_ancestor or is_equal
        logger.debug(f"Is ancestor or equal result: {is_ancestor_or_equal}")

        if is_ancestor_or_equal:
            logger.debug(f"Moving left pointer from {left} to {mid + 1}")
            left = mid + 1
        else:
            logger.debug(f"Moving right pointer from {right} to {mid - 1}")
            right = mid - 1

    logger.debug(f"Binary search completed - Final left: {left}, Final right: {right}")

    # right == -1 means no inspected pointer includes the target; indexing with -1 would
    # silently return the oldest commit, which is a wrong answer rather than an error.
    if right < 0:
        raise GitCommandError(
            f"Commit {submodule_target_commit} is not included in any inspected pointer "
            f"of submodule {submodule_path}")

    first_commit_to_include_submodule_change = pointer_changes[right][0]
    logger.debug(f"First commit that includes submodule change: {first_commit_to_include_submodule_change}")
    return first_commit_to_include_submodule_change
def __get_commits_changing_submodule_pointers_and_the_new_pointer(self, submodule_path, commit_num_limit):
    """
    Return the patch log of the commits that touched the submodule path.

    Each commit header is formatted as ``Commit: <sha>`` and followed by its patch.

    Args:
        submodule_path: Path the log is restricted to.
        commit_num_limit: Maximum number of commits to list; a falsy value means no limit.

    Returns:
        str: The decoded, stripped output of the git log command.
    """
    log_args = ["log", "--format=Commit: %H"]
    if commit_num_limit:
        # The limit sits right after the format option, as a single argument.
        log_args.append(f"-n {commit_num_limit}")
    log_args += ["-p", "--", submodule_path]
    raw_output = self._git.execute(log_args)
    return raw_output.decode("utf-8").strip()
def find_commit_by_version(self, version: str) -> List[str]:
    """
    Find the commits whose message announces the given version.

    The commit messages are searched with git's extended regex support for:
    - "Version: X_Y_Z"
    - "VERSION: X_Y_Z"
    - "Updated version X_Y_Z"
    each with an optional "XX_" prefix before the version.

    Args:
        version: The version string to search for. It is interpolated into the
            pattern as-is (not escaped).

    Returns:
        List[str]: Commit hashes whose message matches; empty when nothing matches.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    version_pattern = f"(Version|VERSION|Updated version)(:)? (XX_)?{version}"
    logger.debug(f"Using version pattern: {version_pattern}")

    raw_log = self._git.execute(
        ["log", "--grep", version_pattern, "--extended-regexp", "--format=%H"])
    # Splitting empty output yields [""], so drop the empty entries.
    commits = list(filter(None, raw_log.decode("utf-8").strip().split("\n")))

    logger.debug(f"Found {len(commits)} commits for version {version}")
    logger.debug(f"The type of commits: {type(commits)}")
    return commits
def get_submodule_commit_hash(self, commit: str, submodule: str) -> Optional[str]:
    """
    Get the submodule pointer from a commit.
    That is, get the hash of the submodule at the time of the commit.

    Args:
        commit: Repository commit to inspect.
        submodule: Path of the submodule inside the repository.

    Returns:
        Optional[str]: The object hash recorded for the submodule path (third field of
        the first ``git ls-tree`` line), or None when the commit has no entry for it.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        GitCommandError: If the commit does not exist.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    if not self.has_commit(commit):
        logger.error(f"Commit {commit} does not exist")
        raise GitCommandError(f"Commit {commit} does not exist")

    # Get the submodule pointer from the commit
    ls_tree_output = self._git.execute(
        ["ls-tree", "-r", "--full-tree", commit, submodule]).decode("utf-8").strip()
    # Test the text itself: "".split("\n") is [""], which is truthy and would make the
    # "no entry" case crash on indexing instead of returning None.
    if not ls_tree_output:
        return None

    # An ls-tree line looks like "<mode> <type> <hash>\t<path>".
    fields = ls_tree_output.split("\n")[0].split()
    if len(fields) < 3:
        return None
    return fields[2]
def find_commits_between_versions(self, start_version: str,
                                  end_version: str, submodule: Optional[str] = None) -> List[Commit]:
    """
    Get the list of commits between two versions.

    Each version is resolved to the first commit returned by ``find_commit_by_version``.
    When ``submodule`` is given, both commits are replaced by the submodule pointers they
    record and the log is taken inside the submodule. The range starts at the parent of
    the start commit, so the start commit itself is part of the result.

    Args:
        start_version: Version marking the beginning of the range.
        end_version: Version marking the end of the range.
        submodule: Optional submodule path to list the commits of.

    Returns:
        List[Commit]: Commit info for every commit in the range; empty when the range
        contains no commits.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        VersionNotFoundError: If either version has no matching commit.
        GitError: If a version's commit has no pointer for the submodule.
        GitCommandError: If the git log command fails.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    start_commits = self.find_commit_by_version(start_version)
    if not start_commits:
        raise VersionNotFoundError(f"Version: {start_version} was not found in the repository.")
    start_commit = start_commits[0]
    logger.debug(f"The commit SHA of version: {start_version} is {start_commit}")

    end_commits = self.find_commit_by_version(end_version)
    if not end_commits:
        raise VersionNotFoundError(f"Version: {end_version} was not found in the repository.")
    end_commit = end_commits[0]
    logger.debug(f"The commit SHA of version: {end_version} is {end_commit}")

    if submodule:
        start_commit = self.get_submodule_commit_hash(start_commit, submodule)
        logger.debug(f"Version {start_version} point to submodule {submodule} commit: {start_commit}")
        if not start_commit:
            raise GitError(f"startversion:start_commit: Couldn't find the pointer to submodule: {submodule}")
        end_commit = self.get_submodule_commit_hash(end_commit, submodule)
        logger.debug(f"Version {end_version} point to submodule {submodule} commit: {end_commit}")
        if not end_commit:
            raise GitError(f"startversion:end_commit: Couldn't find the pointer to submodule: {submodule}")

    lower_bound_commit = self.get_parent_commit(start_commit, submodule)
    git_command = ["log", "--format=%H", f"{lower_bound_commit}..{end_commit}"]
    if submodule:
        # Run the log inside the submodule.
        git_command = ["-C", submodule] + git_command

    try:
        raw_log = self._git.execute(git_command)
    except GitCommandError as e:
        logger.error(f"Failed to get commits between versions: {e}")
        raise

    # An empty range produces empty output; splitting it yields [""], which must not be
    # passed on as a commit hash.
    commit_sha_list = [sha for sha in raw_log.decode("utf-8").strip().split("\n") if sha]
    return [self.get_commit_info(commit, submodule=submodule) for commit in commit_sha_list]
def get_parent_commit(self, commit: str, submodule=None) -> str:
    """
    Resolve the parent reference of a commit.

    Args:
        commit: The commit hash to find the parent for
        submodule: Optional submodule path; when given, the parent is looked up there

    Returns:
        str: ``<commit>^`` when that reference exists, otherwise the commit itself

    Raises:
        RepositoryNotTaskReady: If repository is not ready
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    parent_ref = f"{commit}^"
    if submodule:
        parent_exists = self.submodule_has_commit(submodule, parent_ref)
    else:
        parent_exists = self.has_commit(parent_ref)
    return parent_ref if parent_exists else commit
def find_first_version_containing_commit(self, commit_sha: str, submodule=None) -> Optional[str]:
    """
    Return the first version that includes the given commit.

    With a submodule, the commit is first mapped to the repository commit that first
    includes it, and the version lookup continues from that repository commit.

    Returns:
        Optional[str]: The version, or None when no following version commit is known.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        InvalidCommitError: If the commit to look up does not exist in the repository.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    target_sha = commit_sha
    if submodule:
        # Get the first commit that includes changes in the submodule
        target_sha = self.get_first_commit_including_submodule_changes(submodule, commit_sha)

    if not self.has_commit(target_sha):
        logger.error(f"Commit {target_sha} does not exist")
        raise InvalidCommitError(f"Commit {target_sha} does not exist in the repository: {self.repository_path}")

    surrounding = self.get_commit_surrounding_versions(target_sha)
    if surrounding is None:
        return None

    # The second entry is the commit handed to get_version_from_commit.
    following_version_commit = surrounding[1]
    if following_version_commit is None:
        return None
    return self.get_version_from_commit(following_version_commit)
def get_commit_sha_from_relative_string(self, relative_string: str, submodule: str = '') -> Optional[str]:
    """
    Get the commit SHA from a relative string.
    For example, "HEAD~1" will return the SHA of the commit that is one commit before HEAD.

    Args:
        relative_string: Revision expression to resolve.
        submodule: Optional submodule path; when given, the expression is resolved
            inside that submodule instead of the main repository.

    Returns:
        Optional[str]: The resolved commit SHA.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        InvalidCommitError: If git cannot resolve the expression.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    rev_parse_command = ["rev-parse", relative_string]
    if submodule:
        # Previously the submodule argument was accepted but ignored, so the expression
        # was always resolved against the main repository.
        rev_parse_command = ["-C", submodule] + rev_parse_command

    # Get the commit SHA from the relative string
    try:
        commit_sha = self._git.execute(rev_parse_command).decode("utf-8").strip()
    except GitCommandError as e:
        logger.error(f"Error while getting commit SHA from relative string: {e}")
        raise InvalidCommitError(f"Invalid commit SHA: {e}") from e
    return commit_sha
def get_task_api_functions(self) -> Dict[int, Callable]:
    """
    Map each task index to the method that implements it.

    Returns:
        Dict[int, Callable]: Task index (starting at 0) to the bound API method.
    """
    task_functions = (
        self.find_commits_by_text,
        self.find_first_version_containing_commit,
        self.find_commits_between_versions,
    )
    return dict(enumerate(task_functions))
def get_task_api_functions_params(self) -> Dict[int, List[str]]:
    """
    Map each task index to the parameter names of its API function.

    Returns:
        Dict[int, List[str]]: Task index (starting at 0) to its list of parameter names.
    """
    params_by_task = [
        ["text"],
        ["commit_sha"],
        ["start_version", "end_version"],
    ]
    return {task_id: params for task_id, params in enumerate(params_by_task)}
def is_valid_submodule(self, submodule: str = ''):
    """
    Validate a submodule argument.

    Args:
        submodule: Submodule path, or an empty string meaning "no submodule".

    Returns:
        True when the value is empty or one of ``self.submodules``.

    Raises:
        TypeError: If ``submodule`` is not a string.
        InvalidSubmoduleError: If the path is not a known submodule.
    """
    if not isinstance(submodule, str):
        raise TypeError("submodule only accepts string or empty string")

    is_known = submodule in self.submodules or submodule == ''
    if not is_known:
        raise InvalidSubmoduleError()
    return True
def get_commit_diff_files(self, commit_hash: str, submodule: str = '',):
    """
    Get the list of files changed in a commit.

    Args:
        commit_hash: Commit to inspect.
        submodule: Optional submodule path; when given, the commit is looked up and
            diffed inside that submodule.

    Returns:
        list: Paths changed by the commit relative to its parent, or relative to the
        empty tree when the commit has no parent.

    Raises:
        TypeError: If an argument has the wrong type.
        InvalidSubmoduleError: If the submodule is unknown.
        InvalidCommitError: If the commit does not exist.
        RepositoryNotTaskReady: If the repository is not ready for tasks.
    """
    # Validate input and task readiness
    self.is_valid_submodule(submodule=submodule)
    self.is_valid_commit(commit_hash=commit_hash, submodule=submodule)
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    # The commit was validated against the submodule, so git must run there as well;
    # previously the commands always ran in the main repository.
    git_prefix = ["-C", submodule] if submodule else []

    # Check if this is the initial commit by trying to get its parent
    parent_ref = f"{commit_hash}^"
    try:
        self._git.execute(git_prefix + ["rev-parse", parent_ref])
        base_ref = parent_ref
    except GitCommandError:
        # Only a failed git command means "no parent"; catching BaseException here
        # would also swallow KeyboardInterrupt and SystemExit.
        # For initial commit, compare with empty tree
        base_ref = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"

    diff_output = self._git.execute(git_prefix + ["diff", "--name-only", base_ref, commit_hash])
    return diff_output.decode().splitlines()
def is_valid_commit(self, commit_hash: str, submodule: str = ''):
    """
    Validate that a commit exists in the repository or in a submodule.

    Args:
        commit_hash: Commit to check; must be a non-empty string.
        submodule: Submodule path to check in, or an empty string for the main repository.

    Returns:
        The truthy result of the existence check.

    Raises:
        TypeError: If ``commit_hash`` is empty or not a string, or ``submodule`` is not a string.
        InvalidCommitError: If the commit does not exist.
    """
    if not (commit_hash and isinstance(commit_hash, str)):
        raise TypeError("commit_hash only accepts string")
    if not isinstance(submodule, str):
        raise TypeError("submodule only accepts string or empty string")

    if submodule:
        commit_found = self.submodule_has_commit(submodule_path=submodule, commit_sha=commit_hash)
    else:
        commit_found = self.has_commit(commit_sha=commit_hash)

    if commit_found:
        return commit_found
    raise InvalidCommitError()
def get_file_content_at_commit(self, commit_hash: str, file_path: str, submodule: str = '',):
    """
    Get the content of a file at a specific commit.

    Args:
        commit_hash: Commit to read the file from.
        file_path: Path of the file inside the repository (or inside the submodule).
        submodule: Optional submodule path; when given, the file is read from the
            commit inside that submodule.

    Returns:
        The raw output of ``git show <commit>:<path>`` as returned by the git executor,
        or an empty string when the git command fails (e.g. the file does not exist
        in that commit).

    Raises:
        TypeError: If an argument has the wrong type.
        InvalidSubmoduleError: If the submodule is unknown.
        InvalidCommitError: If the commit does not exist.
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        InvalidFilepathError: If ``file_path`` is empty.
    """
    try:
        # Validate input and task readiness
        self.is_valid_submodule(submodule=submodule)
        self.is_valid_commit(commit_hash=commit_hash, submodule=submodule)
        if not self.is_task_ready:
            raise RepositoryNotTaskReady()
        if not file_path:
            raise InvalidFilepathError()

        show_command = ["show", f"{commit_hash}:{file_path}"]
        if submodule:
            # The commit was validated against the submodule, so it has to be read
            # there; previously the command always ran in the main repository.
            show_command = ["-C", submodule] + show_command
        return self._git.execute(show_command)
    except GitCommandError:
        # If the file does not exist in the commit (e.g., new file), return an empty string
        return ""
def generate_commit_diff_html(self, commit_hash: str, submodule: str = '', output_html="commit_diff.html"):
    """
    Generate an HTML file showing diffs for all files changed in a specific commit.

    Args:
        commit_hash: Commit whose changes are rendered.
        submodule: Optional submodule path the commit belongs to.
        output_html: Path of the HTML file to write.

    Returns:
        str: The output path on success, or a string starting with "Error: " when
        anything other than a TypeError goes wrong.

    Raises:
        TypeError: Propagated unchanged (e.g. from argument validation).
    """

    def _as_text(content) -> str:
        """Decode git output to text; difflib.HtmlDiff cannot process bytes lines."""
        if isinstance(content, bytes):
            return content.decode("utf-8", errors="replace")
        return content

    try:
        # Validate input and task readiness
        self.is_valid_submodule(submodule=submodule)
        self.is_valid_commit(commit_hash=commit_hash, submodule=submodule)
        if not self.is_task_ready:
            raise RepositoryNotTaskReady()

        # Get changed files (in the submodule when one is given)
        changed_files = self.get_commit_diff_files(commit_hash, submodule=submodule)

        parent_ref = commit_hash + "^"
        html_content = []
        for file_path in changed_files:
            # Get file content before and after the commit
            try:
                old_raw = self.get_file_content_at_commit(parent_ref, file_path, submodule=submodule)
            except InvalidCommitError:
                # The commit has no parent (initial commit): every file is new.
                old_raw = ""
            new_raw = self.get_file_content_at_commit(commit_hash, file_path, submodule=submodule)

            # Generate the HTML diff for the current file
            diff_html = difflib.HtmlDiff(wrapcolumn=80).make_file(
                _as_text(old_raw).splitlines(),
                _as_text(new_raw).splitlines(),
                fromdesc=f"{file_path} (before)",
                todesc=f"{file_path} (after)",
                context=True,
                numlines=3
            )
            html_content.append(diff_html)

        # Save the generated HTML
        html_body = "<br><br>".join(html_content)
        html_template = f"""
        <html>
        <head>
            <title>Commit Diff: {commit_hash}</title>
        </head>
        <body>
            <h1>Commit Diff: {commit_hash}</h1>
            {html_body}
        </body>
        </html>
        """
        output_path = Path(output_html)
        output_path.write_text(html_template, encoding="utf-8")
        return str(output_path)
    except TypeError:
        raise
    except Exception as e:
        return f"Error: {str(e)}"
def find_version(self, commit_sha: str, submodule: Optional[str] = None) -> Optional[str]:
    """
    Find the version that contains a specific commit.

    Args:
        commit_sha: The commit SHA to find the version for
        submodule: Optional submodule path; None or an empty string means the main repository

    Returns:
        The version string if found, None otherwise

    Raises:
        TypeError: If ``commit_sha`` is empty or not a string.
        InvalidCommitError: If the commit does not exist.
        GitCommandError: If a git operation fails during the lookup.
    """
    logger.info(
        f"Finding version for commit {commit_sha} in "
        f"{'submodule ' + submodule if submodule else 'main repository'}")

    try:
        # Validate commit. is_valid_commit rejects a non-string submodule with
        # TypeError, so the default None must be normalized to '' first; otherwise
        # every call without a submodule fails.
        if not self.is_valid_commit(commit_sha, submodule or ''):
            logger.error(f"Invalid commit: {commit_sha}")
            raise InvalidCommitError(f"Invalid commit: {commit_sha}")

        # Find the version
        version = self.find_first_version_containing_commit(commit_sha, submodule)

        if version:
            logger.info(f"Found version {version} for commit {commit_sha}")
        else:
            logger.info(f"No version found for commit {commit_sha}")

        return version

    except GitCommandError as e:
        logger.error(f"Git error while finding version: {e}")
        raise
    except Exception as e:
        logger.error(f"Error finding version: {e}")
        raise
def check_repository_state(self) -> dict:
    """
    Check the current state of the repository without raising exceptions.

    Returns:
        dict: A dictionary containing information about the repository state:
            - "branch": the value of ``self.updated_branch``
            - "has_changes": True when the repository is not clean (False when the
              check could not be performed)
            - "is_valid": False when a git command failed
            - "error": description of the failure, or None
    """
    state = {
        "branch": self.updated_branch,
        "has_changes": False,
        "is_valid": True,
        "error": None
    }

    try:
        # The cleanliness check runs inside the try block so that a failing git
        # command is reported in the result instead of escaping to the caller.
        state["has_changes"] = not self.__is_clean_git_repo()
        # Check if directory is a git repository
        self._git.execute(["status"])
    except GitCommandError as e:
        state["is_valid"] = False
        state["error"] = f"Not a valid git repository: {str(e)}"

    return state
def __del__(self):
    """
    Destructor acting as a safety net for restoring the repository state.

    When a state was saved and has not been restored explicitly, the state is restored
    here, provided the repository directory still exists. Errors are logged, never raised.
    """
    try:
        # Nothing to do unless a state was saved ...
        if not getattr(self, '_state_saved', False):
            return
        # ... and it was not already restored explicitly.
        if getattr(self, '_state_restored', False):
            return

        # Check if the repository directory still exists
        repo_dir_present = hasattr(self, 'repository_path') and os.path.exists(self.repository_path)
        if repo_dir_present:
            logger.info("VersionFinder being destroyed, attempting to restore repository state")
            self.restore_repository_state()
        else:
            logger.debug("Repository directory no longer exists, skipping state restoration")
    except Exception as e:
        # We can't raise exceptions in __del__, so just log them
        logger.error(f"Error in VersionFinder destructor: {str(e)}")