Coverage for core/src/version_finder/version_finder.py: 72%

691 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-18 10:30 +0000

1""" 

2core.py 

3==================================== 

4Core module for version_finder. 

5This module contains the core functionality for finding versions in a git repository. 

6It includes classes and functions for handling git operations and version finding. 

7The module is designed to work with git repositories and provides a user-friendly interface for 

8finding and comparing versions. 

9""" 

10from dataclasses import dataclass 

11from pathlib import Path 

12import difflib 

13import os 

14import re 

15import time 

16from typing import List, Optional, Dict, Callable 

17from version_finder.git_executer import GitCommandExecutor, GitConfig, GitCommandError 

18from version_finder.logger import get_logger 

19from version_finder.common import GIT_CMD_FETCH, GIT_CMD_CHECKOUT, GIT_CMD_SUBMODULE_UPDATE, GIT_CMD_LIST_BRANCHES, GIT_CMD_LIST_SUBMODULES, BRANCH_PATTERN 

20 

21# Initialize module logger 

22logger = get_logger() 

23 

24 

class GitError(Exception):
    """Common ancestor of the git-related exceptions defined in this module."""

27 

28 

class InvalidGitRepository(GitError):
    """Signals that the supplied path is not a usable git repository."""

31 

32 

class GitRepositoryNotClean(GitError):
    """Signals that the working tree contains uncommitted changes."""

35 

36 

class InvalidCommitError(GitError):
    """Signals that a commit reference could not be resolved or read."""

39 

40 

class InvalidSubmoduleError(GitError):
    """Signals that a submodule path is not known to the repository."""

43 

44 

class InvalidBranchError(GitError):
    """Signals that a requested branch does not exist in the repository."""

47 

48 

class VersionNotFoundError(GitError):
    """Signals that no version could be found in the commit messages."""

51 

52 

class InvalidFilepathError(GitError):
    """Signals that a file path given as input is not valid."""

55 

56 

class GitNotInstalledError(GitError):
    """Signals that git is not available on this system.

    The final exception text is the caller's message followed by a short,
    per-platform installation guide.
    """

    def __init__(self, message: str):
        # NOTE(review): the leading whitespace inside this literal is part of
        # the emitted text; confirm it against the original source file.
        installation_guide = """
        To use version_finder, you need git installed on your system.

        Installation instructions:
        - macOS: Install via Xcode Command Line Tools with 'xcode-select --install'
        - Linux: Use your package manager e.g. 'apt install git' or 'yum install git'
        - Windows: Download from https://git-scm.com/download/win

        After installation, ensure 'git' is available in your system PATH.
        """
        full_text = f"{message}\n{installation_guide}"
        super().__init__(full_text)

72 

73 

class RepositoryNotTaskReady(GitError):
    """Signals that a task was requested before the repository was prepared.

    The message is fixed and tells the caller to run update_repository first.
    """

    def __init__(self):
        hint = "Please run update_repository(<selected_branch>) first."
        super().__init__(hint)

79 

80 

@dataclass
class Commit:
    """Plain record describing one git commit."""
    # Full commit hash.
    sha: str
    # First line of the commit message.
    subject: str
    # Complete commit message.
    message: str
    # Author name.
    author: str
    # Author timestamp as an integer.
    timestamp: int
    # Version string extracted from the message, when one was found.
    version: Optional[str] = None

    def __repr__(self):
        # Compact debug form: only the hash and the subject are shown.
        return f"Commit(sha={self.sha} subject={self.subject})"

    def __str__(self):
        # Human-readable form: "<sha> <subject>".
        return f"{self.sha} {self.subject}"

96 

97 

@dataclass
class VersionFinderTask:
    """Describes one task offered by VersionFinder.

    Attributes are documented inline; `args` and `run` start out unset and are
    filled in later by the task registry.
    """
    # Position of the task; the registry uses it as a lookup key.
    index: int
    # Display name; the registry uses it as a lookup key as well.
    name: str
    # Free-text explanation of what the task does.
    description: str
    # Parameters associated with the task's action, if any were registered.
    args: Optional[Dict] = None
    # Callable implementing the task; None until an action is registered.
    run: Optional[Callable] = None

106 

107 

class VersionFinderTaskRegistry:
    """Holds the built-in VersionFinder tasks, addressable by name or by index."""

    def __init__(self):
        self._tasks_by_name: Dict[str, VersionFinderTask] = {}
        self._tasks_by_index: Dict[int, VersionFinderTask] = {}
        self._initialize_tasks()

    def _initialize_tasks(self):
        """Create the built-in tasks and register each under its name and index."""
        # NOTE(review): whitespace at the start of the continuation lines is
        # part of the description text; confirm it against the original file.
        tasks = [
            VersionFinderTask(
                name="Find first version containing commit",
                index=0,
                description="""The most common task is to find the first version that includes a change (=commit).
                Given a commit SHA identifier in a repository, it can be done easily using: `git log --grep=version: <commit_ha>^1..<HEAD>` you now what to scroll down all the way to find the first commit.
                But, when the change is part of a submodule, things can can a little more tricky. Given a submodule with the reposity and the commit SHA identifier, Version Finder
                will iterate over all the commits that change the submodule pointer. It will than apply binary search to find the first ancestor of the change.""",
            ),
            VersionFinderTask(
                name="Find all commits between two versions",
                index=1,
                description="""Trying to identify a commit that may cause an issue, a user would like to seek all the changes between two versions.
                Once again an easy solution is `git log <old_version_tag>..<new_version_tag>`. If a submodule is given than Version Finder will get the submodule pointers at each commit, and log all the commits between them.""",
            ),
            VersionFinderTask(
                name="Find commit by text",
                index=2,
                description="An helper task in-order to identify the correct commit SHA identifier for later",
            )
        ]

        for task in tasks:
            self._tasks_by_name[task.name] = task
            self._tasks_by_index[task.index] = task

    def get_by_name(self, name: str) -> Optional[VersionFinderTask]:
        """Return the task registered under `name`, or None when there is none."""
        return self._tasks_by_name.get(name)

    def get_by_index(self, index: int) -> Optional[VersionFinderTask]:
        """Return the task registered under `index`, or None when there is none."""
        return self._tasks_by_index.get(index)

    def get_tasks_by_index(self) -> List[VersionFinderTask]:
        """Returns tasks sorted by index"""
        return [self._tasks_by_index[i] for i in sorted(self._tasks_by_index)]

    def has_index(self, index: int) -> bool:
        """Tell whether a task is registered under `index`."""
        return index in self._tasks_by_index

    def has_name(self, name: str) -> bool:
        """Tell whether a task is registered under `name`."""
        return name in self._tasks_by_name

    def _require_task(self, index: int) -> VersionFinderTask:
        """Return the task at `index` or raise ValueError when it is unknown."""
        task = self.get_by_index(index)
        if task is None:
            raise ValueError(f"Task with index {index} not found")
        return task

    def _set_task_action(self, index: int, action: Callable):
        """Attach `action` as the `run` callable of the task at `index`.

        Raises:
            ValueError: If no task is registered under `index`.
        """
        self._require_task(index).run = action

    def _set_task_action_params(self, index: int, params: Dict):
        """Attach `params` as the `args` of the task at `index`.

        Raises:
            ValueError: If no task is registered under `index`.
        """
        self._require_task(index).args = params

    def initialize_actions_and_args(self, actions: Dict[int, Callable], params: Dict[int, List[str]]):
        """Attach an action and its parameters to every task index in `actions`.

        All indices are validated before anything is modified, so a failure
        leaves the registry exactly as it was.

        Args:
            actions: Maps a task index to the callable implementing the task.
            params: Maps a task index to that task's parameters; it must have
                an entry for every index present in `actions`.

        Raises:
            ValueError: If an index in `actions` has no registered task.
            KeyError: If an index in `actions` has no entry in `params`.
        """
        # Validate first: same exceptions as assigning directly, but nothing
        # is left half-configured when one of the indices is bad.
        for index in actions:
            self._require_task(index)
            if index not in params:
                raise KeyError(index)

        for index, action in actions.items():
            self._set_task_action(index, action)
            self._set_task_action_params(index, params[index])

178 

179 

class VersionFinder:
    """A class to handle git repository operations and version finding."""
    repository_path: Path
    submodules: List[str]
    branches: List[str]
    _has_remote: bool

    # Single regular expression used to pull a version out of a commit message.
    # It tolerates:
    #   * an optional leading label ("Version:", "VERSION:", "Updated version")
    #   * an optional "XX_" prefix directly before the number
    #   * a first component of one to four digits (so years such as 2023 fit)
    #   * ".", "_" or "-" as separators between components
    #   * any number of further numeric components
    #
    # Messages it is meant to match, for example:
    #   Version: 1.2.3
    #   VERSION: XX_2023_01_15
    #   Updated version 4.5-2
    #   2023.01.15
    #
    # Only the numeric part is captured (group 1).
    version_pattern = r"(?:(?:Version|VERSION|Updated version)\s*:?\s*|[^a-zA-Z0-9][^0-9\s]*)?(?:XX_)?(\d{1,4}(?:[._-]\d+)+)"

    # Extended-regexp form of the version pattern, passed to `git log --grep`.
    git_regex_pattern_for_version = "(Version|VERSION|Updated version)(:)? (XX_)?[0-9]+([._-][0-9]+)+"

203 

204 def __init__(self, 

205 path: str = '', 

206 config: Optional[GitConfig] = None, 

207 force: bool = False) -> None: 

208 """ 

209 Initialize the VersionFinder with a repository path and configuration. 

210 

211 Args: 

212 path: Path to the git repository. Uses current directory if None. 

213 config: Configuration settings for git operations. 

214 force: If True, allow initialization even if the repository has uncommitted changes. 

215 """ 

216 self.config = config or GitConfig() 

217 self.repository_path = Path(path or os.getcwd()).resolve() 

218 self.force = force 

219 

220 # State tracking 

221 self._initial_state = { 

222 "branch": None, 

223 "has_changes": False 

224 } 

225 self._state_saved = False 

226 

227 try: 

228 self._git = GitCommandExecutor(self.repository_path, self.config) 

229 except GitCommandError as e: 

230 logger.error(f"Error initializing git executor: {e}") 

231 raise GitNotInstalledError(e) 

232 

233 self.is_task_ready = False 

234 self.submodules: List[str] = [] 

235 self.branches: List[str] = [] 

236 

237 self.__validate_repository() 

238 self.__load_repository_info() 

239 

240 def __has_remote(self) -> bool: 

241 """Check if the repository has any remotes configured.""" 

242 try: 

243 output: bytes = self._git.execute(["remote"]) 

244 return bool(output.strip()) 

245 except GitCommandError: 

246 return False 

247 

248 def __validate_repository(self) -> None: 

249 """Validate the git repository and its state.""" 

250 try: 

251 # Check if directory is a git repository by running git status 

252 self._git.execute(["status"]) 

253 except GitCommandError as e: 

254 # Convert GitCommandError to InvalidGitRepository 

255 raise InvalidGitRepository(f"Path {self.repository_path} is not a valid git repository: {str(e)}") from e 

256 

257 # Store remote status 

258 self._has_remote = self.__has_remote() 

259 logger.debug(f"Repository has remote: {self._has_remote}") 

260 

261 # Check for uncommitted changes 

262 has_changes = not self.__is_clean_git_repo() 

263 

264 # Only raise an error if force is False 

265 if has_changes and not self.force: 

266 logger.warning("Repository has uncommitted changes. Use force=True to proceed anyway.") 

267 raise GitRepositoryNotClean("Repository has uncommitted changes") 

268 

269 def __load_repository_info(self) -> None: 

270 """Load repository information including submodules and branches.""" 

271 if self._has_remote: 

272 logger.info(f"Fetching latest changes from remote repository: {self.repository_path}") 

273 self.__fetch_repository() 

274 self.__load_branches() 

275 self.__load_submodules() 

276 # If the checked out branch is valid, set it as updated 

277 self.updated_branch = self.get_current_branch() 

278 

279 # Save initial repository state 

280 self.save_repository_state() 

281 

282 def __load_submodules(self) -> None: 

283 """Load git submodules information.""" 

284 try: 

285 output = self._git.execute(["submodule", "status"]) 

286 self.submodules = [line.split()[1] for line in output.decode("utf-8").splitlines()] 

287 logger.debug(f"Loaded submodules: {self.submodules}") 

288 except GitCommandError as e: 

289 logger.error(f"Failed to load submodules: {e}") 

290 self.submodules = [] 

291 

292 def __fetch_repository(self) -> None: 

293 """Fetch latest changes from remote repository.""" 

294 try: 

295 output = self._git.execute(["fetch", "--all"]) 

296 logger.debug(f"Fetch output: {output}") 

297 except GitCommandError as e: 

298 logger.error(f"Failed to fetch repository: {e}") 

299 

300 def __load_branches(self) -> None: 

301 """Load git branches information.""" 

302 try: 

303 output = self._git.execute(["branch", "-a"]) 

304 logger.debug(f"Loaded branches output: {output}") 

305 

306 start_time = time.time() 

307 branch_pattern = re.compile(r'(?:remotes/origin/|\* |HEAD-> )') 

308 self.branches = sorted(set( 

309 branch_pattern.sub('', branch.strip()) 

310 for branch in output.decode("utf-8").splitlines() 

311 )) 

312 filtering_time = time.time() 

313 self.branches = list(set(self.branches)) 

314 remove_duplicates_time = time.time() 

315 self.branches.sort() 

316 sort_time = time.time() 

317 logger.debug(f"Branch filtering took {filtering_time - start_time} seconds") 

318 logger.debug(f"Removing duplicates took {remove_duplicates_time - filtering_time} seconds") 

319 logger.debug(f"Sorting took {sort_time - remove_duplicates_time} seconds") 

320 logger.debug(f"Loaded branches: {self.branches}") 

321 except GitCommandError as e: 

322 logger.error(f"Failed to load branches: {e}") 

323 self.branches = [] 

324 

325 def __extract_version_from_message(self, commit_message: str) -> Optional[str]: 

326 """ 

327 Extract version from commit message using various patterns. 

328 

329 Args: 

330 commit_message: The commit message to parse 

331 

332 Returns: 

333 Optional[str]: Extracted version or None if no version found 

334 """ 

335 

336 match = re.search(self.version_pattern, commit_message) 

337 if match: 

338 logger.debug(f"match.group(0) = {match.group(0)}") 

339 return match.group(1) 

340 return None 

341 

342 def __is_clean_git_repo(self) -> bool: 

343 """Check if the git repository is clean.""" 

344 try: 

345 self._git.execute(["diff", "--quiet", "HEAD"]) 

346 return True 

347 except GitCommandError: 

348 return False 

349 

350 def list_submodules(self) -> List[str]: 

351 """Get list of submodules.""" 

352 return self.submodules 

353 

354 def list_branches(self) -> List[str]: 

355 """Get list of branches.""" 

356 return self.branches 

357 

358 def get_commit_info(self, commit_sha: str, submodule: str = '') -> Commit: 

359 """Get detailed commit information.""" 

360 

361 # Verify ready for Tasks 

362 if not self.is_task_ready: 

363 raise RepositoryNotTaskReady() 

364 

365 git_command = ["show", "-s", "--format=%H%x1F%s%x1F%B%x1F%an%x1F%at", commit_sha] 

366 if submodule: 

367 git_command.insert(0, "--git-dir") 

368 git_command.insert(1, f"{submodule}/.git") 

369 try: 

370 output = self._git.execute(git_command).decode("utf-8").strip() 

371 except GitCommandError as e: 

372 raise InvalidCommitError(f"Failed to get commit info: {e}") 

373 logger.debug(f"Commit info output: {output}") 

374 output = output.split('\x1F') 

375 for elemetn in output: 

376 logger.debug(f"Element: {elemetn}") 

377 logger.debug(f"The length of output is: {len(output)}") 

378 sha, subject, message, author, timestamp = output 

379 version = self.__extract_version_from_message(message) 

380 

381 return Commit( 

382 sha=sha, 

383 subject=subject, 

384 message=message, 

385 author=author, 

386 timestamp=int(timestamp), 

387 version=version 

388 ) 

389 

390 def get_current_branch(self) -> str: 

391 """Get the current Git branch name. 

392 

393 Returns: 

394 str: The name of the current branch if successfully determined. 

395 Returns None if: 

396 - The repository is in a detached HEAD state 

397 - There was an error executing the git command 

398 - The branch name could not be determined 

399 

400 Raises: 

401 GitCommandError: May be raised during git command execution, but is caught internally 

402 """ 

403 current_branch = None 

404 try: 

405 output = self._git.execute(["rev-parse", "--abbrev-ref", "HEAD"]) 

406 output = output.decode("utf-8").strip() 

407 logger.debug(f"Current branch output: {output}") 

408 if output not in ["HEAD"]: 

409 current_branch = output 

410 except GitCommandError as e: 

411 logger.error(f"Failed to get current branch: {e}") 

412 return current_branch 

413 

414 def has_branch(self, branch: str) -> bool: 

415 """Check if a branch exists.""" 

416 return branch in self.branches 

417 

418 def save_repository_state(self) -> dict: 

419 """ 

420 Save the current state of the repository. 

421 

422 This method: 

423 1. Saves the current branch (or commit hash if in detached HEAD state) 

424 2. Stashes uncommitted changes if present with a unique identifier 

425 3. Recursively saves state for all submodules 

426 

427 Returns: 

428 dict: A dictionary containing the saved state information 

429 """ 

430 logger.info("Saving repository state") 

431 

432 # Generate a unique stash identifier 

433 stash_id = f"version_finder_stash_{int(time.time())}" 

434 

435 # Get current branch or commit hash if in detached HEAD 

436 current_branch = self.updated_branch 

437 if not current_branch: # Detached HEAD state 

438 try: 

439 # Get the current commit hash 

440 output = self._git.execute(["rev-parse", "HEAD"]).decode("utf-8").strip() 

441 current_branch = f"HEAD:{output}" 

442 logger.info(f"Repository is in detached HEAD state at commit {output}") 

443 except GitCommandError as e: 

444 logger.error(f"Failed to get current commit hash: {e}") 

445 

446 # Check for uncommitted changes 

447 has_changes = not self.__is_clean_git_repo() 

448 stash_created = False 

449 

450 # Stash changes if needed 

451 if has_changes: 

452 logger.info(f"Repository has uncommitted changes, stashing with ID: {stash_id}") 

453 try: 

454 self._git.execute(["stash", "push", "-m", stash_id]) 

455 stash_created = True 

456 logger.info("Changes stashed successfully") 

457 except GitCommandError as e: 

458 logger.error(f"Failed to stash changes: {e}") 

459 

460 # Save submodule states 

461 submodule_states = {} 

462 if self.submodules: 

463 logger.info(f"Saving state for {len(self.submodules)} submodules") 

464 for submodule in self.submodules: 

465 try: 

466 # Get submodule branch 

467 git_command = ["--git-dir", f"{submodule}/.git", "rev-parse", "--abbrev-ref", "HEAD"] 

468 submodule_branch = self._git.execute(git_command).decode("utf-8").strip() 

469 

470 # Check if submodule has changes 

471 git_command = ["--git-dir", f"{submodule}/.git", "diff", "--quiet", "HEAD"] 

472 submodule_has_changes = False 

473 try: 

474 self._git.execute(git_command) 

475 except GitCommandError: 

476 submodule_has_changes = True 

477 

478 # Stash submodule changes if needed 

479 submodule_stash_created = False 

480 if submodule_has_changes: 

481 submodule_stash_id = f"{stash_id}_{submodule}" 

482 try: 

483 git_command = ["-C", submodule, "stash", "push", "-m", submodule_stash_id] 

484 self._git.execute(git_command) 

485 submodule_stash_created = True 

486 logger.info(f"Stashed changes in submodule {submodule}") 

487 except GitCommandError as e: 

488 logger.error(f"Failed to stash changes in submodule {submodule}: {e}") 

489 

490 # Save submodule state 

491 submodule_states[submodule] = { 

492 "branch": submodule_branch if submodule_branch != "HEAD" else None, 

493 "has_changes": submodule_has_changes, 

494 "stash_created": submodule_stash_created, 

495 "stash_id": f"{stash_id}_{submodule}" if submodule_has_changes else None 

496 } 

497 

498 # If in detached HEAD, get the commit hash 

499 if submodule_branch == "HEAD": 

500 try: 

501 git_command = ["--git-dir", f"{submodule}/.git", "rev-parse", "HEAD"] 

502 commit_hash = self._git.execute(git_command).decode("utf-8").strip() 

503 submodule_states[submodule]["commit_hash"] = commit_hash 

504 except GitCommandError as e: 

505 logger.error(f"Failed to get commit hash for submodule {submodule}: {e}") 

506 

507 except GitCommandError as e: 

508 logger.error(f"Failed to save state for submodule {submodule}: {e}") 

509 submodule_states[submodule] = {"error": str(e)} 

510 

511 # Save state 

512 self._initial_state = { 

513 "branch": current_branch, 

514 "has_changes": has_changes, 

515 "stash_created": stash_created, 

516 "stash_id": stash_id if has_changes else None, 

517 "submodules": submodule_states 

518 } 

519 self._state_saved = True 

520 

521 logger.info(f"Saved repository state: {self._initial_state}") 

522 return self._initial_state 

523 

524 def get_saved_state(self) -> dict: 

525 """ 

526 Get the saved repository state. 

527 

528 Returns: 

529 dict: A dictionary containing the saved state information 

530 """ 

531 return self._initial_state 

532 

533 def has_saved_state(self) -> bool: 

534 """ 

535 Check if the repository state has been saved. 

536 

537 Returns: 

538 bool: True if state has been saved, False otherwise 

539 """ 

540 return self._state_saved 

541 

542 def has_uncommitted_changes(self) -> bool: 

543 """ 

544 Check if the repository has uncommitted changes. 

545 

546 Returns: 

547 bool: True if there are uncommitted changes, False otherwise 

548 """ 

549 return not self.__is_clean_git_repo() 

550 

551 def restore_repository_state(self) -> bool: 

552 """ 

553 Restore the repository to its saved state. 

554 

555 This method: 

556 1. Restores the original branch (or commit if in detached HEAD) 

557 2. Pops stashed changes if they were stashed during save 

558 3. Recursively restores state for all submodules 

559 

560 Returns: 

561 bool: True if restoration was successful, False otherwise 

562 """ 

563 # Check if the repository directory still exists 

564 if not os.path.exists(self.repository_path): 

565 logger.warning(f"Repository directory {self.repository_path} no longer exists") 

566 return False 

567 

568 if not self._state_saved: 

569 logger.warning("No saved state to restore") 

570 return False 

571 

572 original_branch = self._initial_state.get("branch") 

573 if not original_branch: 

574 logger.warning("No branch information in saved state") 

575 return False 

576 

577 logger.info(f"Restoring repository to original state: {original_branch}") 

578 

579 # Restore submodules first (in reverse order) 

580 submodule_states = self._initial_state.get("submodules", {}) 

581 if submodule_states: 

582 logger.info(f"Restoring state for {len(submodule_states)} submodules") 

583 for submodule, state in reversed(list(submodule_states.items())): 

584 try: 

585 # Skip if there was an error during save 

586 if "error" in state: 

587 logger.warning(f"Skipping submodule {submodule} due to previous error: {state['error']}") 

588 continue 

589 

590 # Checkout original branch or commit 

591 if state.get("branch"): 

592 git_command = ["-C", submodule, "checkout", state["branch"]] 

593 self._git.execute(git_command) 

594 logger.info(f"Restored submodule {submodule} to branch {state['branch']}") 

595 elif state.get("commit_hash"): 

596 git_command = ["-C", submodule, "checkout", state["commit_hash"]] 

597 self._git.execute(git_command) 

598 logger.info(f"Restored submodule {submodule} to commit {state['commit_hash']}") 

599 

600 # Pop stashed changes if they were stashed 

601 if state.get("stash_created"): 

602 stash_id = state.get("stash_id") 

603 if stash_id: 

604 try: 

605 # Find the stash by its message 

606 git_command = ["-C", submodule, "stash", "list"] 

607 stash_output = self._git.execute(git_command).decode("utf-8").strip() 

608 

609 if not stash_output: 

610 logger.warning(f"No stashes found for submodule {submodule}") 

611 continue 

612 

613 stash_list = stash_output.split("\n") 

614 stash_index = None 

615 

616 for i, stash in enumerate(stash_list): 

617 if stash_id in stash: 

618 stash_index = i 

619 break 

620 

621 if stash_index is not None: 

622 # Use apply instead of pop to avoid conflicts 

623 git_command = ["-C", submodule, "stash", "apply", f"stash@{ {stash_index}} "] 

624 self._git.execute(git_command) 

625 logger.info(f"Applied stashed changes in submodule {submodule}") 

626 

627 # Now drop the stash 

628 git_command = ["-C", submodule, "stash", "drop", f"stash@{ {stash_index}} "] 

629 self._git.execute(git_command) 

630 logger.info(f"Dropped stash for submodule {submodule}") 

631 else: 

632 logger.warning(f"Could not find stash with ID {stash_id} for submodule {submodule}") 

633 except GitCommandError as e: 

634 logger.error(f"Failed to restore stashed changes for submodule {submodule}: {e}") 

635 

636 except GitCommandError as e: 

637 logger.error(f"Failed to restore state for submodule {submodule}: {e}") 

638 

639 # Restore main repository 

640 try: 

641 # Check if we're restoring to a detached HEAD state 

642 if original_branch.startswith("HEAD:"): 

643 commit_hash = original_branch.split(":", 1)[1] 

644 self._git.execute(GIT_CMD_CHECKOUT + [commit_hash]) 

645 logger.info(f"Restored repository to detached HEAD at commit {commit_hash}") 

646 else: 

647 # Checkout original branch 

648 self._git.execute(GIT_CMD_CHECKOUT + [original_branch]) 

649 logger.info(f"Restored repository to branch {original_branch}") 

650 

651 # Pop stashed changes if they were stashed 

652 if self._initial_state.get("stash_created"): 

653 stash_id = self._initial_state.get("stash_id") 

654 if stash_id: 

655 try: 

656 # Find the stash by its message 

657 stash_output = self._git.execute(["stash", "list"]).decode("utf-8").strip() 

658 

659 if not stash_output: 

660 logger.warning("No stashes found in repository") 

661 return True # Still consider restoration successful 

662 

663 stash_list = stash_output.split("\n") 

664 stash_index = None 

665 

666 for i, stash in enumerate(stash_list): 

667 if stash_id in stash: 

668 stash_index = i 

669 break 

670 

671 if stash_index is not None: 

672 # Use apply instead of pop to avoid conflicts 

673 self._git.execute(["stash", "apply", f"stash@{ {stash_index}} "]) 

674 logger.info("Applied stashed changes") 

675 

676 # Now drop the stash 

677 self._git.execute(["stash", "drop", f"stash@{ {stash_index}} "]) 

678 logger.info("Dropped stash after successful apply") 

679 else: 

680 logger.warning(f"Could not find stash with ID {stash_id}") 

681 except GitCommandError as e: 

682 logger.error(f"Failed to restore stashed changes: {e}") 

683 # Continue anyway, as we've at least restored the branch 

684 

685 # Set a flag to indicate that state has been restored 

686 self._state_restored = True 

687 

688 return True 

689 except GitCommandError as e: 

690 logger.error(f"Failed to restore repository state: {e}") 

691 return False 

692 

693 def update_repository(self, branch: str, save_state: bool = True) -> None: 

694 """ 

695 Update the repository to the specified branch. 

696 

697 Args: 

698 branch: Branch name to checkout 

699 save_state: Whether to save the current state before updating 

700 

701 Raises: 

702 InvalidBranchError: If the branch is invalid 

703 GitRepositoryNotClean: If the repository has uncommitted changes 

704 """ 

705 logger.info(f"Updating repository to branch: {branch}") 

706 

707 # Save current state if requested 

708 if save_state and not self._state_saved: 

709 self.save_repository_state() 

710 

711 # Fetch latest changes 

712 try: 

713 self._git.execute(GIT_CMD_FETCH) 

714 except GitCommandError as e: 

715 logger.error(f"Failed to fetch: {e}") 

716 raise 

717 

718 # Check if branch exists 

719 branches = self.list_branches() 

720 if branch not in branches: 

721 raise InvalidBranchError(f"Branch '{branch}' not found in repository") 

722 

723 # Checkout branch 

724 try: 

725 self._git.execute(GIT_CMD_CHECKOUT + [branch]) 

726 except GitCommandError as e: 

727 logger.error(f"Failed to checkout branch {branch}: {e}") 

728 raise 

729 

730 # Update submodules 

731 try: 

732 self._git.execute(GIT_CMD_SUBMODULE_UPDATE) 

733 except GitCommandError as e: 

734 logger.warning(f"Failed to update submodules: {e}") 

735 # Continue anyway, as this might not be critical 

736 

737 self.is_task_ready = True 

738 logger.info(f"Repository updated to branch: {branch}") 

739 

740 def find_commits_by_text(self, text: str, submodule: str = '') -> List[Commit]: 

741 """ 

742 Find commits in the specified branch that contain the given text in either title or description. 

743 

744 Args: 

745 text: Text to search for in commit messages (title and description). 

746 submodule: Optional submodule path to search in. 

747 

748 Returns: 

749 List of commit hashes. 

750 

751 Raises: 

752 GitCommandError: If the git command fails. 

753 """ 

754 if not self.is_task_ready: 

755 raise RepositoryNotTaskReady() 

756 

757 try: 

758 command = [ 

759 "log", 

760 "--format=%H%x1F%s%x1F%b" 

761 ] 

762 

763 if submodule: 

764 # Verify submodule exists 

765 if submodule not in self.submodules: 

766 raise InvalidSubmoduleError(f"Invalid submodule path: {submodule}") 

767 # Execute command in submodule directory 

768 command.insert(0, "-C") 

769 command.insert(1, submodule) 

770 

771 output = self._git.execute(command) 

772 commits = output.decode("utf-8").strip().split("\n") 

773 matching_commits = [] 

774 

775 for commit in commits: 

776 if not commit: # Skip empty lines 

777 continue 

778 # Split the commit info using the ASCII delimiter 

779 commit_parts = commit.split("\x1F") 

780 if len(commit_parts) >= 2: # Only require hash and subject 

781 commit_hash = commit_parts[0] 

782 subject = commit_parts[1] 

783 body = commit_parts[2] if len(commit_parts) >= 3 else "" 

784 # Search in both subject and body 

785 if (text.lower() in subject.lower() or 

786 text.lower() in body.lower()): 

787 matching_commits.append(commit_hash) 

788 

789 return [self.get_commit_info(commit_sha, submodule=submodule) for commit_sha in matching_commits] 

790 except GitCommandError as e: 

791 logger.error(f"Failed to find commits by text: {e}") 

792 raise 

793 

794 def get_commit_surrounding_versions(self, commit_sha: str) -> List[Optional[str]]: 

795 """ 

796 Find the nearest version commits before and after the given commit. 

797 

798 Args: 

799 commit_sha: The commit SHA to get the surrounding version commits for. 

800 

801 Returns: 

802 List containing the previous and next version commit SHAs. Elements can be None. 

803 """ 

804 try: 

805 if not self.has_commit(commit_sha): 

806 raise GitCommandError(f"Commit {commit_sha} does not exist") 

807 # Find nearest version commits using grep 

808 prev_version = self._git.execute([ 

809 "log", 

810 f"--grep={self.git_regex_pattern_for_version}", 

811 "--extended-regexp", 

812 "--format=%H", 

813 "-n", "1", 

814 f"{commit_sha}~1" 

815 ]).decode("utf-8").strip() or None 

816 

817 # Add validation for empty output 

818 if not prev_version: 

819 logger.debug("No previous version found") 

820 

821 next_version_output = self._git.execute([ 

822 "log", 

823 f"--grep={self.git_regex_pattern_for_version}", 

824 "--extended-regexp", 

825 "--format=%H", 

826 f"{commit_sha}^1..HEAD" 

827 ]).decode("utf-8").strip() 

828 

829 # Add validation for empty output 

830 next_version = next_version_output.split()[-1] if next_version_output else None 

831 if not next_version: 

832 logger.debug("No next version found") 

833 

834 return [prev_version, next_version] 

835 except GitCommandError as e: 

836 raise GitCommandError(f"Failed to get version commits: {e}") from e 

837 

838 def get_version_from_commit(self, commit_sha: str) -> str: 

839 """ 

840 Get the version from the commit message. 

841 

842 Args: 

843 commit_sha: The commit SHA to get the version for. 

844 

845 Returns: 

846 str: The version from the commit message. 

847 

848 Raises: 

849 GitCommandError: If the commit does not exist or version cannot be extracted. 

850 """ 

851 try: 

852 # Get the commit message using the pretty format 

853 output = self._git.execute([ 

854 "show", 

855 "-s", # suppress diff output 

856 "--format=%s", # get subject/title only 

857 commit_sha 

858 ]) 

859 message = output.decode("utf-8").strip() 

860 

861 # Extract version from message (assuming format "Version: X.Y.Z") 

862 version_string = self.__extract_version_from_message(message) 

863 if version_string: 

864 return version_string 

865 raise GitCommandError(f"Commit {commit_sha} does not contain version information") 

866 

867 except GitCommandError as e: 

868 raise GitCommandError(f"Failed to get version for commit {commit_sha}: {e}") from e 

869 

def has_commit(self, commit_sha: str) -> bool:
    """
    Tell whether the repository knows the given commit.

    Args:
        commit_sha: The commit SHA (or revision expression) to look up.

    Returns:
        bool: True if the git lookup succeeds, False if it raises GitCommandError.
    """
    # "cat-file -e" only checks that the object exists
    existence_check = ["cat-file", "-e", commit_sha]
    try:
        self._git.execute(existence_check)
    except GitCommandError:
        return False
    else:
        return True

886 

def submodule_has_commit(self, submodule_path: str, commit_sha: str) -> bool:
    """
    Tell whether a submodule knows the given commit.

    Args:
        submodule_path: Path of the submodule, passed to git via "-C".
        commit_sha: The commit SHA (or revision expression) to look up.

    Returns:
        bool: True if the git lookup succeeds; False (after logging an error)
        if it raises GitCommandError.
    """
    # Same existence probe as has_commit, but executed inside the submodule
    existence_check = ["-C", submodule_path, "cat-file", "-e", commit_sha]
    try:
        self._git.execute(existence_check)
    except GitCommandError:
        logger.error(f"Commit {commit_sha} does not exist in submodule {submodule_path}")
        return False
    else:
        return True

905 

def get_first_commit_including_submodule_changes(
        self, submodule_path: str, submodule_target_commit: str) -> str:
    """
    Get the first repository commit whose submodule pointer includes the target commit.

    The repository commits that changed the submodule pointer are listed in
    `git log` order and binary-searched for the last entry in that list whose
    new pointer equals the target commit or has it as an ancestor.

    Args:
        submodule_path: Path of the submodule; must be one of self.submodules.
        submodule_target_commit: Commit inside the submodule to look for.

    Returns:
        str: SHA of the repository commit selected by the search.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        GitCommandError: If the submodule path is unknown, the commit does not
            exist in the submodule, no pointer-changing commits are found, or
            none of the recorded pointers includes the target commit.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    # Verify submodule path exists
    if submodule_path not in self.submodules:
        raise GitCommandError(f"Invalid submodule path: {submodule_path}")

    # Verify commit exists in submodule
    if not self.submodule_has_commit(submodule_path, submodule_target_commit):
        raise GitCommandError(f"Commit {submodule_target_commit} does not exist in submodule {submodule_path}")

    def parse_git_log_output(git_log_output):
        """Pair every repository commit with the submodule pointer it introduces."""
        repo_commit_sha = None
        tuples = []
        for line in git_log_output.splitlines():
            # Detect commit lines
            if line.startswith("Commit: "):
                repo_commit_sha = line.split()[1]
            # Detect submodule commit change lines
            match = re.match(r"^\+Subproject commit (\w+)", line)
            if match and repo_commit_sha:
                tuples.append((repo_commit_sha, match.group(1)))
                repo_commit_sha = None  # Reset to avoid duplication
        return tuples

    git_log_output = self.__get_commits_changing_submodule_pointers_and_the_new_pointer(submodule_path, 1500)
    if not git_log_output:
        raise GitCommandError(f"No commits found that change submodule {submodule_path} or its ancestors")

    repo_commit_submodule_ptr_tuples = parse_git_log_output(git_log_output)
    if not repo_commit_submodule_ptr_tuples:
        # Non-empty log output without any "+Subproject commit" line: nothing to
        # search, and indexing the empty list below would raise IndexError.
        raise GitCommandError(f"No commits found that change submodule {submodule_path} or its ancestors")

    logger.debug(
        f"Found {len(repo_commit_submodule_ptr_tuples)} commits that change submodule {submodule_path}")
    logger.debug(f"First commit: {repo_commit_submodule_ptr_tuples[0][0]}")
    logger.debug(f"Last commit: {repo_commit_submodule_ptr_tuples[-1][0]}")

    # Binary search for the last list entry whose pointer includes the target.
    # NOTE(review): this presumes the "includes target" property is monotonic
    # along the list (true for a linear pointer history) - confirm for repos
    # where the pointer was moved backwards.
    left, right = 0, len(repo_commit_submodule_ptr_tuples) - 1
    while left <= right:
        mid = (left + right) // 2
        submodule_ptr = repo_commit_submodule_ptr_tuples[mid][1]
        logger.debug(f"Binary search - Left: {left}, Right: {right}, Mid: {mid}")
        logger.debug(f"Checking if {submodule_target_commit} is ancestor of {submodule_ptr}")

        is_equal = submodule_target_commit == submodule_ptr
        logger.debug(f"Is equal: {is_equal}")
        # NOTE(review): relies on execute(check=False) returning b'' only when
        # merge-base exits successfully - confirm against GitCommandExecutor.
        # The git call is skipped when the pointers are already equal.
        is_ancestor_or_equal = is_equal or self._git.execute(
            ["-C", submodule_path, "merge-base", "--is-ancestor", submodule_target_commit, submodule_ptr],
            check=False) == b''
        logger.debug(f"Is ancestor or equal result: {is_ancestor_or_equal}")

        if is_ancestor_or_equal:
            logger.debug(f"Moving left pointer from {left} to {mid + 1}")
            left = mid + 1
        else:
            logger.debug(f"Moving right pointer from {right} to {mid - 1}")
            right = mid - 1

    logger.debug(f"Binary search completed - Final left: {left}, Final right: {right}")

    if right < 0:
        # No examined pointer includes the target. Without this guard the
        # negative index would silently return the last commit of the list.
        raise GitCommandError(
            f"Commit {submodule_target_commit} is not included in any recorded pointer "
            f"of submodule {submodule_path}")

    first_commit_to_include_submodule_change = repo_commit_submodule_ptr_tuples[right][0]
    logger.debug(f"First commit that includes submodule change: {first_commit_to_include_submodule_change}")
    return first_commit_to_include_submodule_change

976 

def __get_commits_changing_submodule_pointers_and_the_new_pointer(self, submodule_path, commit_num_limit):
    """
    Run `git log -p` restricted to the submodule path and return its text output.

    Each commit is introduced by a "Commit: <sha>" line (set via --format),
    followed by the patch for the submodule path.

    Args:
        submodule_path: Path the log is limited to.
        commit_num_limit: Maximum number of commits; a falsy value means no limit.

    Returns:
        str: The decoded, stripped output of the git command.
    """
    # The limit is passed as one "-n <count>" argument, right after the format option
    limit_args = [f"-n {commit_num_limit}"] if commit_num_limit else []
    git_log_command = ["log", "--format=Commit: %H", *limit_args, "-p", "--", submodule_path]
    return self._git.execute(git_log_command).decode("utf-8").strip()

985 

def find_commit_by_version(self, version: str) -> List[str]:
    """
    Look up the commits whose message announces the given version.

    The search uses git's extended regular expressions and accepts the forms
    "Version: X_Y_Z", "VERSION: X_Y_Z" and "Updated version X_Y_Z", each with
    an optional "XX_" prefix before the version. The version string is placed
    into the pattern as-is.

    Args:
        version: The version string to search for.

    Returns:
        List[str]: Hashes of the matching commits, in `git log` order; empty
        if nothing matches.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    version_pattern = f"(Version|VERSION|Updated version)(:)? (XX_)?{version}"
    logger.debug(f"Using version pattern: {version_pattern}")

    log_command = ["log", "--grep", version_pattern, "--extended-regexp", "--format=%H"]
    raw_output = self._git.execute(log_command).decode("utf-8").strip()

    # Splitting empty output yields [""], so blank entries are dropped
    commits = [sha for sha in raw_output.split("\n") if sha]

    logger.debug(f"Found {len(commits)} commits for version {version}")
    logger.debug(f"The type of commits: {type(commits)}")
    return commits

1017 

def get_submodule_commit_hash(self, commit: str, submodule: str) -> Optional[str]:
    """
    Get the submodule pointer from a commit.
    That is, get the hash of the submodule at the time of the commit.

    Args:
        commit: Repository commit to inspect.
        submodule: Path of the submodule inside the repository tree.

    Returns:
        Optional[str]: The object hash listed by `git ls-tree` for the path,
        or None when the path has no entry in that commit.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        GitCommandError: If the commit does not exist.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    if not self.has_commit(commit):
        logger.error(f"Commit {commit} does not exist")
        raise GitCommandError(f"Commit {commit} does not exist")

    # Get the submodule pointer from the commit
    ls_tree_output = self._git.execute(
        ["ls-tree", "-r", "--full-tree", commit, submodule]).decode("utf-8").strip()

    # Test the raw text: "".split("\n") is [""], which is truthy, so checking
    # the split list would never detect a missing entry.
    if not ls_tree_output:
        return None

    # An ls-tree line reads "<mode> <type> <hash>\t<path>"
    fields = ls_tree_output.split("\n")[0].split()
    if len(fields) < 3:
        return None
    return fields[2]

1036 

def find_commits_between_versions(self, start_version: str,
                                  end_version: str, submodule: Optional[str] = None) -> List[Commit]:
    """
    Get the list of commits between two versions.

    Both versions are resolved to commits through find_commit_by_version (the
    first match is used). When a submodule is given, the range is taken inside
    the submodule, between the submodule pointers recorded by those commits.

    Args:
        start_version: Version marking the beginning of the range.
        end_version: Version marking the end of the range.
        submodule: Optional submodule path to list commits for.

    Returns:
        List[Commit]: Commit info for every commit in the range, in `git log`
        order; empty when the range contains no commits.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        VersionNotFoundError: If either version has no matching commit.
        GitError: If a submodule pointer cannot be determined.
        GitCommandError: If listing the commits fails.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    start_commits = self.find_commit_by_version(start_version)
    if not start_commits:
        raise VersionNotFoundError(f"Version: {start_version} was not found in the repository.")
    start_commit = start_commits[0]
    logger.debug(f"The commit SHA of version: {start_version} is {start_commit}")

    end_commits = self.find_commit_by_version(end_version)
    if not end_commits:
        raise VersionNotFoundError(f"Version: {end_version} was not found in the repository.")
    end_commit = end_commits[0]
    logger.debug(f"The commit SHA of version: {end_version} is {end_commit}")

    if submodule:
        # Switch from repository commits to the submodule commits they point at
        start_commit = self.get_submodule_commit_hash(start_commit, submodule)
        logger.debug(f"Version {start_version} point to submodule {submodule} commit: {start_commit}")
        if not start_commit:
            raise GitError(f"startversion:start_commit: Couldn't find the pointer to submodule: {submodule}")
        end_commit = self.get_submodule_commit_hash(end_commit, submodule)
        logger.debug(f"Version {end_version} point to submodule {submodule} commit: {end_commit}")
        if not end_commit:
            raise GitError(f"startversion:end_commit: Couldn't find the pointer to submodule: {submodule}")

    # "A..B" excludes A, so the range starts at the parent of the start commit
    # (or at the start commit itself when it has no parent).
    lower_bound_commit = self.get_parent_commit(start_commit, submodule)
    git_command = ["log", "--format=%H", f"{lower_bound_commit}..{end_commit}"]
    if submodule:
        git_command = ["-C", submodule] + git_command

    try:
        log_output = self._git.execute(git_command).decode("utf-8").strip()
    except GitCommandError as e:
        logger.error(f"Failed to get commits between versions: {e}")
        raise

    # An empty range splits into [""]; drop blanks so get_commit_info is never
    # called with an empty SHA.
    commit_sha_list = [sha for sha in log_output.split("\n") if sha]
    return [self.get_commit_info(commit, submodule=submodule) for commit in commit_sha_list]

1081 

def get_parent_commit(self, commit: str, submodule=None) -> str:
    """
    Build the revision expression for the parent of a commit.

    Args:
        commit: The commit hash to find the parent for.
        submodule: Optional submodule path to look in.

    Returns:
        str: "<commit>^" when that revision exists (in the submodule if one is
        given, otherwise in the repository); the original commit otherwise.

    Raises:
        RepositoryNotTaskReady: If repository is not ready.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    parent_revision = f"{commit}^"
    if submodule:
        parent_exists = self.submodule_has_commit(submodule, parent_revision)
    else:
        parent_exists = self.has_commit(parent_revision)
    return parent_revision if parent_exists else commit

1105 

def find_first_version_containing_commit(self, commit_sha: str, submodule=None) -> Optional[str]:
    """
    Find the first version that includes the given commit.

    With a submodule, the commit is taken to be a submodule commit and is first
    mapped to the repository commit returned by
    get_first_commit_including_submodule_changes.

    Args:
        commit_sha: The commit to locate.
        submodule: Optional submodule path the commit belongs to.

    Returns:
        Optional[str]: The version read from the following version commit, or
        None if no such commit is found.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        InvalidCommitError: If the (mapped) commit does not exist in the repository.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    if submodule:
        # Get the first commit that includes changes in the submodule
        commit_sha = self.get_first_commit_including_submodule_changes(submodule, commit_sha)

    if not self.has_commit(commit_sha):
        logger.error(f"Commit {commit_sha} does not exist")
        raise InvalidCommitError(f"Commit {commit_sha} does not exist in the repository: {self.repository_path}")

    # Index 1 of the result holds the version commit that follows commit_sha
    surrounding = self.get_commit_surrounding_versions(commit_sha)
    next_version_commit = None if surrounding is None else surrounding[1]
    if next_version_commit is None:
        return None
    return self.get_version_from_commit(next_version_commit)

1129 

def get_commit_sha_from_relative_string(self, relative_string: str, submodule: str = '') -> Optional[str]:
    """
    Get the commit SHA from a relative string.
    For example, "HEAD~1" will return the SHA of the commit that is one commit before HEAD.

    Args:
        relative_string: Revision expression to resolve (e.g. "HEAD~1").
        submodule: Optional submodule path; when given, the expression is
            resolved inside that submodule instead of the main repository.

    Returns:
        Optional[str]: The resolved commit SHA.

    Raises:
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        InvalidCommitError: If git cannot resolve the expression.
    """
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    rev_parse_command = ["rev-parse", relative_string]
    if submodule:
        # Resolve inside the submodule, using the same "-C <path>" convention
        # as the other submodule-aware commands in this class.
        rev_parse_command = ["-C", submodule] + rev_parse_command

    try:
        commit_sha = self._git.execute(rev_parse_command).decode("utf-8").strip()
    except GitCommandError as e:
        logger.error(f"Error while getting commit SHA from relative string: {e}")
        # Keep the git failure attached as the cause
        raise InvalidCommitError(f"Invalid commit SHA: {e}") from e
    return commit_sha

1146 

def get_task_api_functions(self) -> Dict[int, Callable]:
    """
    Map each task index to the method that implements it.

    Returns:
        Dict[int, Callable]: Index 0 is the text search, 1 the version lookup
        for a commit, 2 the commit listing between two versions.
    """
    # Position in this tuple is the task index
    task_functions = (
        self.find_commits_by_text,
        self.find_first_version_containing_commit,
        self.find_commits_between_versions,
    )
    return dict(enumerate(task_functions))

1156 

def get_task_api_functions_params(self) -> Dict[int, List[str]]:
    """
    Map each task index to the names of the parameters its function takes.

    Returns:
        Dict[int, List[str]]: Parameter names keyed by the same indices used
        by get_task_api_functions.
    """
    # Position in this tuple is the task index
    params_per_task = (
        ["text"],
        ["commit_sha"],
        ["start_version", "end_version"],
    )
    return {task_id: params for task_id, params in enumerate(params_per_task)}

1166 

def is_valid_submodule(self, submodule: str = ''):
    """
    Validate a submodule argument.

    Args:
        submodule: Submodule path, or an empty string for "no submodule".

    Returns:
        bool: True when the value is empty or listed in self.submodules.

    Raises:
        TypeError: If submodule is not a string.
        InvalidSubmoduleError: If submodule is a non-empty unknown path.
    """
    if not isinstance(submodule, str):
        raise TypeError("submodule only accepts string or empty string")

    is_known = submodule in self.submodules
    if not (is_known or submodule == ''):
        raise InvalidSubmoduleError()
    return True

1173 

def get_commit_diff_files(self, commit_hash: str, submodule: str = '',):
    """
    Get the list of files changed in a commit.

    Args:
        commit_hash: The commit to inspect.
        submodule: Optional submodule path; when given, the commit is looked
            up and diffed inside that submodule.

    Returns:
        list: Paths of the files changed by the commit, one entry per line of
        `git diff --name-only` output.

    Raises:
        TypeError: If an argument has the wrong type.
        InvalidSubmoduleError: If the submodule is unknown.
        InvalidCommitError: If the commit does not exist.
        RepositoryNotTaskReady: If the repository is not ready for tasks.
    """
    # Validate input and task readiness
    self.is_valid_submodule(submodule=submodule)
    self.is_valid_commit(commit_hash=commit_hash, submodule=submodule)
    if not self.is_task_ready:
        raise RepositoryNotTaskReady()

    # The commit was validated inside the submodule (when one is given), so
    # the git commands must run there as well.
    repo_args = ["-C", submodule] if submodule else []

    # Check if this is the initial commit by trying to get its parent
    parent_revision = f"{commit_hash}^"
    try:
        self._git.execute(repo_args + ["rev-parse", parent_revision])
        diff_base = parent_revision
    except Exception:
        # Best-effort probe: any failure is treated as "no parent", but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        # For initial commit, compare with git's well-known empty tree object.
        diff_base = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"

    diff_output = self._git.execute(repo_args + ["diff", "--name-only", diff_base, commit_hash])
    return diff_output.decode().splitlines()

1198 

def is_valid_commit(self, commit_hash: str, submodule: str = ''):
    """
    Validate that a commit exists, either in the repository or in a submodule.

    Args:
        commit_hash: Non-empty commit identifier to check.
        submodule: Submodule path to look in; empty string for the main repository.

    Returns:
        bool: True when the commit exists.

    Raises:
        TypeError: If commit_hash is empty or not a string, or submodule is not a string.
        InvalidCommitError: If the commit does not exist.
    """
    if not (commit_hash and isinstance(commit_hash, str)):
        raise TypeError("commit_hash only accepts string")
    if not isinstance(submodule, str):
        raise TypeError("submodule only accepts string or empty string")

    if submodule:
        commit_exists = self.submodule_has_commit(submodule_path=submodule, commit_sha=commit_hash)
    else:
        commit_exists = self.has_commit(commit_sha=commit_hash)

    if not commit_exists:
        raise InvalidCommitError()
    return commit_exists

1213 

def get_file_content_at_commit(self, commit_hash: str, file_path: str, submodule: str = '',):
    """
    Get the content of a file at a specific commit.

    Args:
        commit_hash: The commit to read the file from.
        file_path: Path of the file inside the repository (or submodule) tree.
        submodule: Optional submodule path; when given, the file is read from
            that submodule.

    Returns:
        The undecoded output of `git show <commit>:<path>`, or an empty string
        if the git command fails (e.g. the file does not exist in the commit).

    Raises:
        TypeError: If an argument has the wrong type.
        InvalidSubmoduleError: If the submodule is unknown.
        InvalidCommitError: If the commit does not exist.
        RepositoryNotTaskReady: If the repository is not ready for tasks.
        InvalidFilepathError: If file_path is empty.
    """
    try:
        # Validate input and task readiness
        self.is_valid_submodule(submodule=submodule)
        self.is_valid_commit(commit_hash=commit_hash, submodule=submodule)
        if not self.is_task_ready:
            raise RepositoryNotTaskReady()
        if not file_path:
            raise InvalidFilepathError()

        show_command = ["show", f"{commit_hash}:{file_path}"]
        if submodule:
            # The commit was validated inside the submodule, so read it there too
            show_command = ["-C", submodule] + show_command
        return self._git.execute(show_command)
    except GitCommandError:
        # If the file does not exist in the commit (e.g., new file), return an empty string
        return ""

1232 

def generate_commit_diff_html(self, commit_hash: str, submodule: str = '', output_html="commit_diff.html"):
    """
    Generate an HTML file showing diffs for all files changed in a specific commit.

    Args:
        commit_hash: The commit to render.
        submodule: Optional submodule path the commit belongs to.
        output_html: Path of the HTML file to write.

    Returns:
        str: The path of the written file, or a string starting with "Error: "
        when anything other than a TypeError goes wrong.

    Raises:
        TypeError: Propagated unchanged (e.g. from argument validation).
    """
    def to_lines(content):
        """Return content as a list of text lines, decoding bytes when needed."""
        # The file contents may arrive undecoded, while difflib.HtmlDiff
        # works on str lines.
        if isinstance(content, bytes):
            content = content.decode("utf-8", errors="replace")
        return content.splitlines()

    try:
        # Validate input and task readiness
        self.is_valid_submodule(submodule=submodule)
        self.is_valid_commit(commit_hash=commit_hash, submodule=submodule)
        if not self.is_task_ready:
            raise RepositoryNotTaskReady()

        # Get changed files (in the submodule when one is given)
        changed_files = self.get_commit_diff_files(commit_hash, submodule=submodule)

        html_content = []
        for file_path in changed_files:
            # Get file content before and after the commit
            old_content = self.get_file_content_at_commit(commit_hash + "^", file_path, submodule=submodule)
            new_content = self.get_file_content_at_commit(commit_hash, file_path, submodule=submodule)

            # Generate the HTML diff for the current file
            diff_html = difflib.HtmlDiff(wrapcolumn=80).make_file(
                to_lines(old_content),
                to_lines(new_content),
                fromdesc=f"{file_path} (before)",
                todesc=f"{file_path} (after)",
                context=True,
                numlines=3
            )
            html_content.append(diff_html)

        # Save the generated HTML
        html_body = "<br><br>".join(html_content)
        html_template = f"""
        <html>
        <head>
            <title>Commit Diff: {commit_hash}</title>
        </head>
        <body>
            <h1>Commit Diff: {commit_hash}</h1>
            {html_body}
        </body>
        </html>
        """
        output_path = Path(output_html)
        output_path.write_text(html_template, encoding="utf-8")
        return str(output_path)
    except TypeError:
        raise
    except Exception as e:
        return f"Error: {str(e)}"

1284 

def find_version(self, commit_sha: str, submodule: str = None) -> Optional[str]:
    """
    Find the version that contains a specific commit.

    Args:
        commit_sha: The commit SHA to find the version for
        submodule: Optional submodule path; None and '' both mean the main repository

    Returns:
        The version string if found, None otherwise

    Raises:
        TypeError: If commit_sha is empty or not a string, or submodule is
            neither a string nor None.
        InvalidCommitError: If the commit does not exist.
        GitCommandError: If a git operation fails while searching.
    """
    # is_valid_commit only accepts strings for the submodule, so the default
    # None is normalized to the empty string it uses for "main repository".
    if submodule is None:
        submodule = ''

    logger.info(
        f"Finding version for commit {commit_sha} in "
        f"{'submodule ' + submodule if submodule else 'main repository'}")

    try:
        # Validate commit
        if not self.is_valid_commit(commit_sha, submodule):
            logger.error(f"Invalid commit: {commit_sha}")
            raise InvalidCommitError(f"Invalid commit: {commit_sha}")

        # Find the version
        version = self.find_first_version_containing_commit(commit_sha, submodule)

        if version:
            logger.info(f"Found version {version} for commit {commit_sha}")
        else:
            logger.info(f"No version found for commit {commit_sha}")

        return version

    except GitCommandError as e:
        logger.error(f"Git error while finding version: {e}")
        raise
    except Exception as e:
        logger.error(f"Error finding version: {e}")
        raise

1322 

def check_repository_state(self) -> dict:
    """
    Summarize the current state of the repository.

    Returns:
        dict: "branch" (self.updated_branch), "has_changes" (True when the
        repository is not clean), "is_valid" (False when `git status` fails)
        and "error" (failure description, or None).
    """
    branch = self.updated_branch
    has_changes = not self.__is_clean_git_repo()

    # `git status` doubles as the check that the directory is a git repository
    error = None
    try:
        self._git.execute(["status"])
    except GitCommandError as e:
        error = f"Not a valid git repository: {str(e)}"

    return {
        "branch": branch,
        "has_changes": has_changes,
        "is_valid": error is None,
        "error": error,
    }

1345 

def __del__(self):
    """
    Restore the repository state on garbage collection if that has not happened yet.

    Acts only when a state was saved and not already restored explicitly, and
    only while the repository directory still exists. Errors are logged
    rather than raised, since exceptions cannot propagate out of __del__.
    """
    try:
        # Nothing to do unless a state was saved ...
        if not getattr(self, '_state_saved', False):
            return
        # ... and it has not been restored explicitly already
        if getattr(self, '_state_restored', False):
            return

        # Check if the repository directory still exists
        if hasattr(self, 'repository_path') and os.path.exists(self.repository_path):
            logger.info("VersionFinder being destroyed, attempting to restore repository state")
            self.restore_repository_state()
        else:
            logger.debug("Repository directory no longer exists, skipping state restoration")
    except Exception as e:
        # We can't raise exceptions in __del__, so just log them
        logger.error(f"Error in VersionFinder destructor: {str(e)}")