""" Module for executing git commands, sending results back to the handlers """ import base64 import datetime import os import pathlib import re import shlex import shutil import subprocess import traceback from enum import Enum, IntEnum from pathlib import Path from typing import Dict, List, Optional, Tuple from urllib.parse import unquote import nbformat import pexpect import tornado import tornado.locks from jupyter_server.utils import ensure_async from nbdime import diff_notebooks, merge_notebooks from .log import get_logger # Regex pattern to capture (key, value) of Git configuration options. # See https://git-scm.com/docs/git-config#_syntax for git var syntax CONFIG_PATTERN = re.compile(r"(?:^|\n)([\w\-\.]+)\=") DEFAULT_REMOTE_NAME = "origin" # Maximum number of character of command output to print in debug log MAX_LOG_OUTPUT = 500 # type: int # Ensure on NFS or similar, that we give the .git/index.lock time to be removed MAX_WAIT_FOR_LOCK_S = 5 # How often should we check for the lock above to be free? This comes up more on things like NFS CHECK_LOCK_INTERVAL_S = 0.1 # Parse Git version output GIT_VERSION_REGEX = re.compile(r"^git\sversion\s(?P\d+(.\d+)*)") # Parse Git branch status GIT_BRANCH_STATUS = re.compile( r"^## (?P([\w\-/]+|HEAD \(no branch\)|No commits yet on \w+))(\.\.\.(?P[\w\-/]+)( \[(ahead (?P\d+))?(, )?(behind (?P\d+))?\])?)?$" ) # Parse Git detached head GIT_DETACHED_HEAD = re.compile(r"^\(HEAD detached at (?P.+?)\)$") # Parse Git branch rebase name GIT_REBASING_BRANCH = re.compile(r"^\(no branch, rebasing (?P.+?)\)$") # Git cache as a credential helper GIT_CREDENTIAL_HELPER_CACHE = re.compile(r"cache\b") # Parse git stash list GIT_STASH_LIST = re.compile( r"^stash@{(?P\d+)}: (WIP on|On) (?P.+?): (?P.+?)$" ) execution_lock = tornado.locks.Lock() class State(IntEnum): """Git repository state.""" # Default state DEFAULT = (0,) # Detached head state DETACHED = (1,) # Merge in progress MERGING = (2,) # Rebase in progress REBASING = (3,) # Cherry-pick in progress CHERRY_PICKING = 4 class RebaseAction(Enum): """Git available action when rebasing.""" CONTINUE = 1 SKIP = 2 ABORT = 3 async def execute( cmdline: "List[str]", cwd: "str", timeout: "float" = 20, env: "Optional[Dict[str, str]]" = None, username: "Optional[str]" = None, password: "Optional[str]" = None, is_binary=False, ) -> "Tuple[int, str, str]": """Asynchronously execute a command. Args: cmdline (List[str]): Command line to be executed cwd (Optional[str]): Current working directory env (Optional[Dict[str, str]]): Defines the environment variables for the new process username (Optional[str]): User name password (Optional[str]): User password Returns: (int, str, str): (return code, stdout, stderr) """ async def call_subprocess_with_authentication( cmdline: "List[str]", username: "str", password: "str", cwd: "Optional[str]" = None, env: "Optional[Dict[str, str]]" = None, ) -> "Tuple[int, str, str]": try: p = pexpect.spawn( cmdline[0], cmdline[1:], cwd=cwd, env=env, encoding="utf-8", timeout=None, ) # We expect a prompt from git # In most of cases git will prompt for username and # then for password # In some cases (Bitbucket) username is included in # remote URL, so git will not ask for username i = await p.expect(["Username for .*: ", "Password for .*:"], async_=True) if i == 0: # ask for username then password p.sendline(username) await p.expect("Password for .*:", async_=True) p.sendline(password) elif i == 1: # only ask for password p.sendline(password) await p.expect(pexpect.EOF, async_=True) response = p.before returncode = p.wait() p.close() return returncode, "", response except pexpect.exceptions.EOF: # In case of pexpect failure response = p.before returncode = p.exitstatus p.close() # close process return returncode, "", response def call_subprocess( cmdline: "List[str]", cwd: "Optional[str]" = None, env: "Optional[Dict[str, str]]" = None, is_binary=is_binary, ) -> "Tuple[int, str, str]": process = subprocess.Popen( cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd, env=env ) output, error = process.communicate() if is_binary: return ( process.returncode, base64.encodebytes(output).decode("ascii"), error.decode("utf-8"), ) else: return (process.returncode, output.decode("utf-8"), error.decode("utf-8")) try: await execution_lock.acquire(timeout=datetime.timedelta(seconds=timeout)) except tornado.util.TimeoutError: return (1, "", "Unable to get the lock on the directory") try: # Ensure our execution operation will succeed by first checking and waiting for the lock to be removed time_slept = 0 lockfile = os.path.join(cwd, ".git", "index.lock") while os.path.exists(lockfile) and time_slept < MAX_WAIT_FOR_LOCK_S: await tornado.gen.sleep(CHECK_LOCK_INTERVAL_S) time_slept += CHECK_LOCK_INTERVAL_S # If the lock still exists at this point, we will likely fail anyway, but let's try anyway get_logger().debug("Execute {!s} in {!s}.".format(cmdline, cwd)) if username is not None and password is not None: code, output, error = await call_subprocess_with_authentication( cmdline, username, password, cwd, env, ) else: current_loop = tornado.ioloop.IOLoop.current() code, output, error = await current_loop.run_in_executor( None, call_subprocess, cmdline, cwd, env ) log_output = ( output[:MAX_LOG_OUTPUT] + "..." if len(output) > MAX_LOG_OUTPUT else output ) log_error = ( error[:MAX_LOG_OUTPUT] + "..." if len(error) > MAX_LOG_OUTPUT else error ) get_logger().debug( "Code: {}\nOutput: {}\nError: {}".format(code, log_output, log_error) ) except BaseException as e: code, output, error = -1, "", traceback.format_exc() get_logger().warning("Fail to execute {!s}".format(cmdline), exc_info=True) finally: execution_lock.release() return code, output, error def strip_and_split(s): """strip trailing \x00 and split on \x00 Useful for parsing output of git commands with -z flag. """ return s.strip("\x00").strip("\n").split("\x00") class Git: """ A single parent class containing all of the individual git methods in it. """ _GIT_CREDENTIAL_CACHE_DAEMON_PROCESS: subprocess.Popen = None def __init__(self, config=None): self._config = config self._execute_timeout = ( 20.0 if self._config is None else self._config.git_command_timeout ) def __del__(self): if self._GIT_CREDENTIAL_CACHE_DAEMON_PROCESS: self._GIT_CREDENTIAL_CACHE_DAEMON_PROCESS.terminate() async def __execute( self, cmdline: "List[str]", cwd: "str", env: "Optional[Dict[str, str]]" = None, username: "Optional[str]" = None, password: "Optional[str]" = None, is_binary=False, ) -> "Tuple[int, str, str]": return await execute( cmdline, cwd=cwd, timeout=self._execute_timeout, env=env, username=username, password=password, is_binary=is_binary, ) async def config(self, path, **kwargs): """Get or set Git options. If no kwargs, all options are returned. Otherwise kwargs are set. """ response = {"code": 1} if len(kwargs): output = [] for k, v in kwargs.items(): cmd = ["git", "config", "--add", k, v] code, out, err = await self.__execute(cmd, cwd=path) output.append(out.strip()) response["code"] = code if code != 0: response["command"] = " ".join(cmd) response["message"] = err.strip() return response response["message"] = "\n".join(output).strip() else: cmd = ["git", "config", "--list"] code, output, error = await self.__execute(cmd, cwd=path) response = {"code": code} if code != 0: response["command"] = " ".join(cmd) response["message"] = error.strip() else: raw = output.strip() s = CONFIG_PATTERN.split(raw) response["options"] = {k: v for k, v in zip(s[1::2], s[2::2])} return response async def changed_files(self, path, base=None, remote=None, single_commit=None): """Gets the list of changed files between two Git refs, or the files changed in a single commit There are two reserved "refs" for the base 1. WORKING : Represents the Git working tree 2. INDEX: Represents the Git staging area / index Keyword Arguments: single_commit {string} -- The single commit ref base {string} -- the base Git ref remote {string} -- the remote Git ref Returns: dict -- the response of format { "code": int, # Command status code "files": [string, string], # List of files changed. "message": [string] # Error response } """ if single_commit: cmd = ["git", "diff", single_commit, "--name-only", "-z"] elif base and remote: if base == "WORKING": cmd = ["git", "diff", remote, "--name-only", "-z"] elif base == "INDEX": cmd = ["git", "diff", "--staged", remote, "--name-only", "-z"] else: cmd = ["git", "diff", base, remote, "--name-only", "-z"] else: raise tornado.web.HTTPError( 400, "Either single_commit or (base and remote) must be provided" ) response = {} try: code, output, error = await self.__execute(cmd, cwd=path) except subprocess.CalledProcessError as e: response["code"] = e.returncode response["message"] = e.output.decode("utf-8") else: response["code"] = code if code != 0: response["command"] = " ".join(cmd) response["message"] = error else: response["files"] = strip_and_split(output) return response async def clone(self, path, repo_url, auth=None, versioning=True, submodules=False): """ Execute `git clone`. When no auth is provided, disables prompts for the password to avoid the terminal hanging. When auth is provided, await prompts for username/passwords and sends them :param path: the directory where the clone will be performed. :param repo_url: the URL of the repository to be cloned. :param auth: OPTIONAL dictionary with 'username' and 'password' fields :param versioning: OPTIONAL whether to clone or download a snapshot of the remote repository; default clone :param submodules: OPTIONAL whether to clone submodules content; default False :return: response with status code and error message. """ env = os.environ.copy() cmd = ["git", "clone"] if not versioning: cmd.append("--depth=1") current_content = set(os.listdir(path)) if submodules: cmd.append("--recurse-submodules") cmd.append(unquote(repo_url)) if auth: if auth.get("cache_credentials"): await self.ensure_credential_helper(path) env["GIT_TERMINAL_PROMPT"] = "1" cmd.append("-q") code, output, error = await self.__execute( cmd, username=auth["username"], password=auth["password"], cwd=path, env=env, ) else: env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute( cmd, cwd=path, env=env, ) if not versioning: new_content = set(os.listdir(path)) directory = (new_content - current_content).pop() shutil.rmtree(f"{path}/{directory}/.git") response = {"code": code, "message": output.strip()} if code != 0: response["message"] = error.strip() return response async def fetch(self, path, auth=None): """ Execute git fetch command """ cwd = path # Start by fetching to get accurate ahead/behind status cmd = [ "git", "fetch", "--all", "--prune", ] # Run prune by default to help beginners env = os.environ.copy() if auth: if auth.get("cache_credentials"): await self.ensure_credential_helper(path) env["GIT_TERMINAL_PROMPT"] = "1" code, _, fetch_error = await self.__execute( cmd, cwd=cwd, username=auth["username"], password=auth["password"], env=env, ) else: env["GIT_TERMINAL_PROMPT"] = "0" code, _, fetch_error = await self.__execute(cmd, cwd=cwd, env=env) result = { "code": code, } if code != 0: result["command"] = " ".join(cmd) result["error"] = fetch_error result["message"] = fetch_error return result async def get_nbdiff( self, prev_content: str, curr_content: str, base_content=None ) -> dict: """Compute the diff between two notebooks. Args: prev_content: Notebook previous content curr_content: Notebook current content base_content: Notebook base content - only passed during a merge conflict Returns: if not base_content: {"base": Dict, "diff": Dict} else: {"base": Dict, "merge_decisions": Dict} """ def read_notebook(content): if not content: return nbformat.versions[nbformat.current_nbformat].new_notebook() if isinstance(content, dict): # Content may come from model as a dict directly return ( nbformat.versions[ content.get("nbformat", nbformat.current_nbformat) ] .nbjson.JSONReader() .to_notebook(content) ) else: return nbformat.reads(content, as_version=4) # TODO Fix this in nbdime def remove_cell_ids(nb): for cell in nb.cells: cell.pop("id", None) return nb current_loop = tornado.ioloop.IOLoop.current() prev_nb = await current_loop.run_in_executor(None, read_notebook, prev_content) curr_nb = await current_loop.run_in_executor(None, read_notebook, curr_content) if base_content: base_nb = await current_loop.run_in_executor( None, read_notebook, base_content ) # Only remove ids from merge_notebooks as a workaround _, merge_decisions = await current_loop.run_in_executor( None, merge_notebooks, remove_cell_ids(base_nb), remove_cell_ids(prev_nb), remove_cell_ids(curr_nb), ) return {"base": base_nb, "merge_decisions": merge_decisions} else: thediff = await current_loop.run_in_executor( None, diff_notebooks, prev_nb, curr_nb ) return {"base": prev_nb, "diff": thediff} async def status(self, path: str) -> dict: """ Execute git status command & return the result. """ cmd = ["git", "status", "--porcelain", "-b", "-u", "-z"] code, status, my_error = await self.__execute(cmd, cwd=path) if code != 0: return { "code": code, "command": " ".join(cmd), "message": my_error, } # Add attribute `is_binary` command = [ # Compare stage to an empty tree see `_is_binary` "git", "diff", "--numstat", "-z", "--cached", "4b825dc642cb6eb9a060e54bf8d69288fbee4904", ] text_code, text_output, _ = await self.__execute(command, cwd=path) are_binary = dict() if text_code == 0: for line in filter(lambda l: len(l) > 0, strip_and_split(text_output)): diff, name = line.rsplit("\t", maxsplit=1) are_binary[name] = diff.startswith("-\t-") data = { "code": code, "branch": None, "remote": None, "ahead": 0, "behind": 0, "files": [], } result = [] line_iterable = (line for line in strip_and_split(status) if line) try: first_line = next(line_iterable) # Interpret branch line match = GIT_BRANCH_STATUS.match(first_line) if match is not None: d = match.groupdict() branch = d.get("branch") if branch == "HEAD (no branch)": branch = "(detached)" elif branch.startswith("No commits yet on "): branch = "(initial)" data["branch"] = branch data["remote"] = d.get("remote") data["ahead"] = int(d.get("ahead") or 0) data["behind"] = int(d.get("behind") or 0) # Interpret file lines for line in line_iterable: name = line[3:] result.append( { "x": line[0], "y": line[1], "to": name, # if file was renamed, next line contains original path "from": next(line_iterable) if line[0] == "R" else name, "is_binary": are_binary.get(name, None), } ) data["files"] = result except StopIteration: # Raised if line_iterable is empty pass # Test for repository state states = { State.CHERRY_PICKING: "CHERRY_PICK_HEAD", State.MERGING: "MERGE_HEAD", # Looking at REBASE_HEAD is not reliable as it may not be clean in the .git folder # e.g. when skipping the last commit of a ongoing rebase # So looking for folder `rebase-apply` and `rebase-merge`; see https://stackoverflow.com/questions/3921409/how-to-know-if-there-is-a-git-rebase-in-progress State.REBASING: ["rebase-merge", "rebase-apply"], } state = State.DEFAULT for state_, head in states.items(): if isinstance(head, str): code, _, _ = await self.__execute( ["git", "show", "--quiet", head], cwd=path ) if code == 0: state = state_ break else: found = False for directory in head: code, output, _ = await self.__execute( ["git", "rev-parse", "--git-path", directory], cwd=path ) filepath = output.strip("\n\t ") if code == 0 and (Path(path) / filepath).exists(): found = True state = state_ break if found: break if state == State.DEFAULT and data["branch"] == "(detached)": state = State.DETACHED data["state"] = state return data async def log(self, path, history_count=10, follow_path=None): """ Execute git log command & return the result. """ is_single_file = follow_path != None cmd = [ "git", "log", "--pretty=format:%H%n%an%n%ar%n%s%n%P", ("-%d" % history_count), ] if is_single_file: cmd += [ "-z", "--numstat", "--follow", "--", follow_path, ] code, my_output, my_error = await self.__execute( cmd, cwd=path, ) if code != 0: return {"code": code, "command": " ".join(cmd), "message": my_error} result = [] line_array = my_output.splitlines() if is_single_file: parsed_lines = [] for line in line_array: parsed_lines.extend( re.sub(r"\t\0|\0", "\t", l) for l in line.strip("\0\t").split("\0\0", maxsplit=1) ) line_array = parsed_lines PREVIOUS_COMMIT_OFFSET = 6 if is_single_file else 5 for i in range(0, len(line_array), PREVIOUS_COMMIT_OFFSET): commit = { "commit": line_array[i], "author": line_array[i + 1], "date": line_array[i + 2], "commit_msg": line_array[i + 3], "pre_commits": line_array[i + 4].split(" ") if i + 4 < len(line_array) and line_array[i + 4] else [], } if is_single_file: commit["is_binary"] = line_array[i + 5].startswith("-\t-\t") # [insertions, deletions, previous_file_path?, current_file_path] file_info = line_array[i + 5].split() if len(file_info) == 4: commit["previous_file_path"] = file_info[2] commit["file_path"] = file_info[-1] result.append(commit) return {"code": code, "commits": result} async def detailed_log(self, selected_hash, path): """ Execute git log -m --cc -1 --numstat --oneline -z command (used to get insertions & deletions per file) & return the result. """ cmd = [ "git", "log", "--cc", "-m", "-1", "--oneline", "--numstat", "--pretty=format:%b%x00", "-z", selected_hash, ] code, my_output, my_error = await self.__execute( cmd, cwd=path, ) if code != 0: return {"code": code, "command": " ".join(cmd), "message": my_error} total_insertions = 0 total_deletions = 0 result = [] first_split = my_output.split("\x00", 1) commit_body = first_split[0].strip() line_iterable = iter(strip_and_split(first_split[1].strip())) for line in line_iterable: is_binary = line.startswith("-\t-\t") previous_file_path = "" tokens = line.split("\t") if len(tokens) == 3: insertions, deletions, file = line.split("\t") insertions = insertions.replace("-", "0") deletions = deletions.replace("-", "0") if file == "": # file was renamed or moved, we need next two lines of output from_path = next(line_iterable) to_path = next(line_iterable) previous_file_path = from_path modified_file_name = from_path + " => " + to_path modified_file_path = to_path else: modified_file_name = file.split("/")[-1] modified_file_path = file file_info = { "modified_file_path": modified_file_path, "modified_file_name": modified_file_name, "insertion": insertions, "deletion": deletions, "is_binary": is_binary, } if previous_file_path: file_info["previous_file_path"] = previous_file_path result.append(file_info) total_insertions += int(insertions) total_deletions += int(deletions) modified_file_note = "{num_files} files changed, {insertions} insertions(+), {deletions} deletions(-)".format( num_files=len(result), insertions=total_insertions, deletions=total_deletions, ) return { "code": code, "commit_body": commit_body, "modified_file_note": modified_file_note, "modified_files_count": str(len(result)), "number_of_insertions": str(total_insertions), "number_of_deletions": str(total_deletions), "modified_files": result, } async def diff(self, path, previous=None, current=None): """ Execute git diff command & return the result. """ cmd = ["git", "diff", "--numstat", "-z"] if previous: cmd.append(previous) if current: cmd.append(current) code, my_output, my_error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": my_error} result = [] line_array = strip_and_split(my_output) for line in line_array: linesplit = line.split() result.append( { "insertions": linesplit[0], "deletions": linesplit[1], "filename": linesplit[2], } ) return {"code": code, "result": result} async def branch(self, path): """ Execute 'git for-each-ref' command & return the result. """ heads = await self.branch_heads(path) if heads["code"] != 0: # error; bail return heads remotes = await self.branch_remotes(path) if remotes["code"] != 0: # error; bail return remotes # Extract commit hash in case of detached head is_detached = GIT_DETACHED_HEAD.match(heads["current_branch"]["name"]) if is_detached is not None: try: heads["current_branch"]["name"] = is_detached.groupdict()["commit"] except KeyError: pass else: # Extract branch name in case of rebasing rebasing = GIT_REBASING_BRANCH.match(heads["current_branch"]["name"]) if rebasing is not None: try: heads["current_branch"]["name"] = rebasing.groupdict()["branch"] except KeyError: pass # all's good; concatenate results and return return { "code": 0, "branches": heads["branches"] + remotes["branches"], "current_branch": heads["current_branch"], } async def branch_delete(self, path, branch): """Execute 'git branch -D '""" cmd = ["git", "branch", "-D", branch] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} else: return {"code": code} async def branch_heads(self, path): """ Execute 'git for-each-ref' command on refs/heads & return the result. """ # Format reference: https://git-scm.com/docs/git-for-each-ref#_field_names formats = ["refname:short", "objectname", "upstream:short", "HEAD"] cmd = [ "git", "for-each-ref", "--format=" + "%09".join("%({})".format(f) for f in formats), "refs/heads/", ] code, output, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} current_branch = None results = [] try: for name, commit_sha, upstream_name, is_current_branch in ( line.split("\t") for line in output.splitlines() ): is_current_branch = bool(is_current_branch.strip()) branch = { "is_current_branch": is_current_branch, "is_remote_branch": False, "name": name, "upstream": upstream_name if upstream_name else None, "top_commit": commit_sha, "tag": None, } results.append(branch) if is_current_branch: current_branch = branch # Above can fail in certain cases, such as an empty repo with # no commits. In that case, just fall back to determining # current branch if not current_branch: current_name = await self.get_current_branch(path) branch = { "is_current_branch": True, "is_remote_branch": False, "name": current_name, "upstream": None, "top_commit": None, "tag": None, } results.append(branch) current_branch = branch return { "code": code, "branches": results, "current_branch": current_branch, } except Exception as downstream_error: return { "code": -1, "command": " ".join(cmd), "message": str(downstream_error), } async def branch_remotes(self, path): """ Execute 'git for-each-ref' command on refs/heads & return the result. """ # Format reference: https://git-scm.com/docs/git-for-each-ref#_field_names formats = ["refname:short", "objectname"] cmd = [ "git", "for-each-ref", "--format=" + "%09".join("%({})".format(f) for f in formats), "refs/remotes/", ] code, output, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} results = [] try: for name, commit_sha in (line.split("\t") for line in output.splitlines()): results.append( { "is_current_branch": False, "is_remote_branch": True, "name": name, "upstream": None, "top_commit": commit_sha, "tag": None, } ) return {"code": code, "branches": results} except Exception as downstream_error: return { "code": -1, "command": " ".join(cmd), "message": str(downstream_error), } async def show_top_level(self, path): """ Execute git --show-toplevel command & return the result. """ cmd = ["git", "rev-parse", "--show-toplevel"] code, my_output, my_error = await self.__execute( cmd, cwd=path, ) if code == 0: return {"code": code, "path": my_output.strip("\n")} else: # Handle special case where cwd not inside a git repo lower_error = my_error.lower() if "fatal: not a git repository" in lower_error: return {"code": 0, "path": None} return { "code": code, "command": " ".join(cmd), "message": my_error, } async def show_prefix(self, path): """ Execute git --show-prefix command & return the result. """ cmd = ["git", "rev-parse", "--show-prefix"] code, my_output, my_error = await self.__execute( cmd, cwd=path, ) if code == 0: result = {"code": code, "path": my_output.strip("\n")} return result else: # Handle special case where cwd not inside a git repo lower_error = my_error.lower() if "fatal: not a git repository" in lower_error: return {"code": 0, "path": None} return { "code": code, "command": " ".join(cmd), "message": my_error, } async def add(self, filename, path): """ Execute git add command & return the result. """ if not isinstance(filename, str): # assume filename is a sequence of str cmd = ["git", "add"] + list(filename) else: cmd = ["git", "add", filename] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def add_all(self, path): """ Execute git add all command & return the result. """ cmd = ["git", "add", "-A"] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def add_all_unstaged(self, path): """ Execute git add all unstaged command & return the result. """ cmd = ["git", "add", "-u"] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def add_all_untracked(self, path): """ Find all untracked files, execute git add & return the result. """ status = await self.status(path) if status["code"] != 0: return status untracked = [] for f in status["files"]: if f["x"] == "?" and f["y"] == "?": untracked.append(f["from"].strip('"')) return await self.add(untracked, path) async def reset(self, filename, path): """ Execute git reset command & return the result. """ cmd = ["git", "reset", "--", filename] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def reset_all(self, path): """ Execute git reset command & return the result. """ cmd = ["git", "reset"] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def delete_commit(self, commit_id, path): """ Delete a specified commit from the repository. """ cmd = ["git", "revert", "-m", "1", "--no-commit", commit_id] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def reset_to_commit(self, commit_id, path): """ Reset the current branch to a specific past commit. """ cmd = ["git", "reset", "--hard"] if commit_id: cmd.append(commit_id) code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def checkout_new_branch(self, branchname, startpoint, path): """ Execute git checkout command & return the result. """ cmd = ["git", "checkout", "-b", branchname, startpoint] code, my_output, my_error = await self.__execute( cmd, cwd=path, ) if code == 0: return {"code": code, "message": my_output} else: return { "code": code, "command": " ".join(cmd), "message": my_error, } async def _get_branch_reference(self, branchname, path): """ Execute git rev-parse --symbolic-full-name and return the result (or None). """ code, my_output, _ = await self.__execute( ["git", "rev-parse", "--symbolic-full-name", branchname], cwd=path, ) if code == 0: return my_output.strip("\n") else: return None async def checkout_branch(self, branchname, path): """ Execute git checkout command & return the result. Use the --track parameter for a remote branch. """ reference_name = await self._get_branch_reference(branchname, path) if reference_name is None: is_remote_branch = False else: is_remote_branch = self._is_remote_branch(reference_name) if is_remote_branch: local_branchname = branchname.split("/")[-1] cmd = ["git", "checkout", "-B", local_branchname, branchname] else: cmd = ["git", "checkout", branchname] code, my_output, my_error = await self.__execute(cmd, cwd=path) if code == 0: return {"code": 0, "message": my_output} else: return {"code": code, "message": my_error, "command": " ".join(cmd)} async def checkout(self, filename, path): """ Execute git checkout command for the filename & return the result. """ cmd = ["git", "checkout", "--", filename] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def checkout_all(self, path): """ Execute git checkout command & return the result. """ cmd = ["git", "checkout", "--", "."] code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def merge(self, branch: str, path: str) -> dict: """ Execute git merge command & return the result. """ cmd = ["git", "merge", branch] code, output, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code, "message": output.strip()} async def commit(self, commit_msg, amend, path, author=None): """ Execute git commit command & return the result. If the amend argument is true, amend the commit instead of creating a new one. """ cmd = ["git", "commit"] if author: cmd.extend(["--author", author]) if amend: cmd.extend(["--amend", "--no-edit"]) else: cmd.extend(["--m", commit_msg]) code, _, error = await self.__execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code} async def pull(self, path, auth=None, cancel_on_conflict=False): """ Execute git pull --no-commit. Disables prompts for the password to avoid the terminal hanging while waiting for auth. """ env = os.environ.copy() if auth: if auth.get("cache_credentials"): await self.ensure_credential_helper(path) env["GIT_TERMINAL_PROMPT"] = "1" code, output, error = await self.__execute( ["git", "pull", "--no-commit"], username=auth["username"], password=auth["password"], cwd=path, env=env, ) else: env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute( ["git", "pull", "--no-commit"], env=env, cwd=path, ) response = {"code": code, "message": output.strip()} if code != 0: output = output.strip() has_conflict = ( "automatic merge failed; fix conflicts and then commit the result." in output.lower() ) if cancel_on_conflict and has_conflict: code, _, error = await self.__execute( ["git", "merge", "--abort"], cwd=path, ) if code == 0: response[ "message" ] = "Unable to pull latest changes as doing so would result in a merge conflict. In order to push your local changes, you may want to consider creating a new branch based on your current work and pushing the new branch. Provided your repository is hosted (e.g., on GitHub), once pushed, you can create a pull request against the original branch on the remote repository and manually resolve the conflicts during pull request review." else: response["message"] = error.strip() elif has_conflict: response["message"] = output else: response["message"] = error.strip() return response async def push( self, remote, branch, path, auth=None, set_upstream=False, force=False, tags=True, ): """ Execute `git push $UPSTREAM $BRANCH`. The choice of upstream and branch is up to the caller. """ command = ["git", "push"] if tags: command.append("--tags") if force: command.append("--force-with-lease") if set_upstream: command.append("--set-upstream") command.extend([remote, branch]) env = os.environ.copy() if auth: if auth.get("cache_credentials"): await self.ensure_credential_helper(path) env["GIT_TERMINAL_PROMPT"] = "1" code, output, error = await self.__execute( command, username=auth["username"], password=auth["password"], cwd=path, env=env, ) else: env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute( command, env=env, cwd=path, ) response = {"code": code, "message": output.strip()} if code != 0: response["message"] = error.strip() return response async def init(self, path): """ Execute git init command & return the result. """ cmd = ["git", "init"] cwd = path code, _, error = await self.__execute(cmd, cwd=cwd) actions = None if code == 0: code, actions = await self._maybe_run_actions("post_init", cwd) if code != 0: return { "code": code, "command": " ".join(cmd), "message": error, "actions": actions, } return {"code": code, "actions": actions} async def _maybe_run_actions(self, name, cwd): code = 0 actions = None if self._config and name in self._config.actions: actions = [] actions_list = self._config.actions[name] for action in actions_list: try: # We trust the actions as they were passed via a config and not the UI code, stdout, stderr = await self.__execute( shlex.split(action), cwd=cwd ) actions.append( { "cmd": action, "code": code, "stdout": stdout, "stderr": stderr, } ) # After any failure, stop except Exception as e: code = 1 actions.append( { "cmd": action, "code": 1, "stdout": None, "stderr": "Exception: {}".format(e), } ) if code != 0: break return code, actions def _is_remote_branch(self, branch_reference): """Check if given branch is remote branch by comparing with 'remotes/', TODO : Consider a better way to check remote branch """ return branch_reference.startswith("refs/remotes/") async def get_current_branch(self, path): """Use `symbolic-ref` to get the current branch name. In case of failure, assume that the HEAD is currently detached or rebasing, and fall back to the `branch` command to get the name. See https://git-blame.blogspot.com/2013/06/checking-current-branch-programatically.html """ command = ["git", "symbolic-ref", "--short", "HEAD"] code, output, error = await self.__execute(command, cwd=path) if code == 0: return output.strip() elif "not a symbolic ref" in error.lower(): current_branch = await self._get_current_branch_detached(path) return current_branch else: raise Exception( "Error [{}] occurred while executing [{}] command to get current branch.".format( error, " ".join(command) ) ) async def _get_current_branch_detached(self, path): """Execute 'git branch -a' to get current branch details in case of dirty state (rebasing, detached head,...).""" command = ["git", "branch", "-a"] code, output, error = await self.__execute(command, cwd=path) if code == 0: for branch in output.splitlines(): branch = branch.strip() if branch.startswith("*"): return branch.lstrip("* ") else: raise Exception( "Error [{}] occurred while executing [{}] command to get current state.".format( error, " ".join(command) ) ) async def get_upstream_branch(self, path, branch_name): """Execute 'git rev-parse --abbrev-ref branch_name@{upstream}' to get upstream branch name tracked by given local branch. Reference : https://git-scm.com/docs/git-rev-parse#git-rev-parse-emltbranchnamegtupstreamemegemmasterupstreamememuem """ command = [ "git", "rev-parse", "--abbrev-ref", "{}@{{upstream}}".format(branch_name), ] code, output, error = await self.__execute(command, cwd=path) if code != 0: return {"code": code, "command": " ".join(command), "message": error} rev_parse_output = output.strip() command = ["git", "config", "--local", "branch.{}.remote".format(branch_name)] code, output, error = await self.__execute(command, cwd=path) if code != 0: return {"code": code, "command": " ".join(command), "message": error} remote_name = output.strip() remote_branch = rev_parse_output.strip().replace(remote_name + "/", "", 1) return { "code": code, "remote_short_name": remote_name, "remote_branch": remote_branch, } async def _get_tag(self, path, commit_sha): """Execute 'git describe commit_sha' to get nearest tag associated with latest commit in branch. Reference : https://git-scm.com/docs/git-describe#git-describe-ltcommit-ishgt82308203 """ command = ["git", "describe", "--tags", commit_sha] code, output, error = await self.__execute(command, cwd=path) if code == 0: return output.strip() elif "fatal: no tags can describe '{}'.".format(commit_sha) in error.lower(): return None elif "fatal: no names found" in error.lower(): return None else: raise Exception( "Error [{}] occurred while executing [{}] command to get nearest tag associated with branch.".format( error, " ".join(command) ) ) async def _get_base_ref(self, path, filename): """Get the object reference for an unmerged ``filename`` at base stage. Execute git ls-files -u -z Returns: The object reference or None """ command = ["git", "ls-files", "-u", "-z", filename] code, output, error = await self.__execute(command, cwd=path) if code != 0: raise subprocess.CalledProcessError( code, " ".join(command), output=output, stderr=error ) split_line = strip_and_split(output)[0].split() return split_line[1] if len(split_line) > 1 else None async def show(self, path, ref, filename=None, is_binary=False): """ Execute git show Or git show Return the file content """ command = ["git", "show"] if filename is None: command.append(ref) else: command.append(f"{ref}:{filename}") code, output, error = await self.__execute( command, cwd=path, is_binary=is_binary ) error_messages = map( lambda n: n.lower(), [ "fatal: Path '{}' exists on disk, but not in '{}'".format( filename, ref ), "fatal: Path '{}' does not exist (neither on disk nor in the index)".format( filename ), "fatal: Path '{}' does not exist in '{}'".format(filename, ref), "fatal: Invalid object name 'HEAD'", ], ) lower_error = error.lower() if code == 0: return output elif any([msg in lower_error for msg in error_messages]): return "" else: raise tornado.web.HTTPError( log_message="Error [{}] occurred while executing [{}] command to retrieve plaintext diff.".format( error, " ".join(command) ) ) async def get_content(self, contents_manager, filename, path): """ Get the file content of filename. """ relative_repo = os.path.relpath(path, contents_manager.root_dir) try: # Never request notebook model - see https://github.com/jupyterlab/jupyterlab-git/issues/970 model = await ensure_async( contents_manager.get( path=os.path.join(relative_repo, filename), type="file" ) ) except tornado.web.HTTPError as error: # Handle versioned file being deleted case if error.status_code == 404 and ( error.log_message.startswith("No such file or directory: ") or error.log_message.startswith("file or directory does not exist:") ): return "" raise error return model["content"] async def get_content_at_reference( self, filename, reference, path, contents_manager ): """ Collect get content of the file at the git reference. """ if "special" in reference: if reference["special"] == "WORKING": content = await self.get_content(contents_manager, filename, path) elif reference["special"] == "INDEX": is_binary = await self._is_binary(filename, "INDEX", path) if is_binary: content = await self.show( path, reference["git"], filename, is_binary=True ) else: content = await self.show(path, "", filename) elif reference["special"] == "BASE": # Special case of file in merge conflict for which we want the base (aka common ancestor) version ref = await self._get_base_ref(path, filename) content = await self.show(path, ref) else: raise tornado.web.HTTPError( log_message="Error while retrieving plaintext content, unknown special ref '{}'.".format( reference["special"] ) ) elif "git" in reference: is_binary = await self._is_binary(filename, reference["git"], path) if is_binary: content = await self.show( path, reference["git"], filename, is_binary=True ) else: content = await self.show(path, reference["git"], filename) else: content = "" return {"content": content} async def _is_binary(self, filename, ref, path): """ Determine whether Git handles a file as binary or text. ## References - - - Args: filename (str): Filename (relative to the git repository) ref (str): Commit reference or "INDEX" if file is staged path (str): Git repository filepath Returns: bool: Is file binary? Raises: HTTPError: if git command failed """ if ref == "INDEX": command = [ "git", "diff", "--numstat", "--cached", "4b825dc642cb6eb9a060e54bf8d69288fbee4904", "--", filename, ] else: command = [ "git", "diff", "--numstat", "4b825dc642cb6eb9a060e54bf8d69288fbee4904", ref, "--", filename, ] # where 4b825... is a magic SHA which represents the empty tree code, output, error = await self.__execute(command, cwd=path) if code != 0: error_messages = map( lambda n: n.lower(), [ "fatal: Path '{}' does not exist (neither on disk nor in the index)".format( filename ), "fatal: bad revision 'HEAD'", ], ) lower_error = error.lower() if any([msg in lower_error for msg in error_messages]): return False raise tornado.web.HTTPError( log_message="Error while determining if file is binary or text '{}'.".format( error ) ) # For binary files, `--numstat` outputs two `-` characters separated by TABs: return output.startswith("-\t-\t") async def remote_add(self, path, url, name=DEFAULT_REMOTE_NAME): """Handle call to `git remote add` command. path: str Top Git repository path url: str Git remote url name: str Remote name; default "origin" """ cmd = ["git", "remote", "add", name, url] code, _, error = await self.__execute(cmd, cwd=path) response = {"code": code, "command": " ".join(cmd)} if code != 0: response["message"] = error return response async def remote_show(self, path, verbose=False): """Handle call to `git remote show` command. Args: path (str): Git repository path verbose (bool): true if details are needed, otherwise, false Returns: if not verbose: List[str]: Known remotes if verbose: List[ { name: str, url: str } ]: Known remotes """ command = ["git", "remote"] if verbose: command.extend(["-v", "show"]) else: command.append("show") code, output, error = await self.__execute(command, cwd=path) response = {"code": code, "command": " ".join(command)} if code == 0: if verbose: response["remotes"] = [ {"name": r.split("\t")[0], "url": r.split("\t")[1][:-7]} for r in output.splitlines() if "(push)" in r ] else: response["remotes"] = [r.strip() for r in output.splitlines()] else: response["message"] = error return response async def remote_remove(self, path, name): """Handle call to `git remote remove ` command. Args: path (str): Git repository path name (str): Remote name """ command = ["git", "remote", "remove", name] code, _, error = await self.__execute(command, cwd=path) response = {"code": code, "command": " ".join(command)} if code != 0: response["message"] = error return response def read_file(self, path): """ Reads file content located at path and returns it as a string path: str The path of the file """ try: file = pathlib.Path(path) content = file.read_text() return {"code": 0, "content": content} except BaseException as error: return {"code": -1, "content": ""} async def ensure_gitignore(self, path): """Handle call to ensure .gitignore file exists and the next append will be on a new line (this means an empty file or a file ending with \n). path: str Top Git repository path """ try: gitignore = pathlib.Path(path) / ".gitignore" if not gitignore.exists(): gitignore.touch() elif gitignore.stat().st_size > 0: content = gitignore.read_text() if content[-1] != "\n": with gitignore.open("a") as f: f.write("\n") except BaseException as error: return {"code": -1, "message": str(error)} return {"code": 0} async def ignore(self, path, file_path): """Handle call to add an entry in .gitignore. path: str Top Git repository path file_path: str The path of the file in .gitignore """ try: res = await self.ensure_gitignore(path) if res["code"] != 0: return res gitignore = pathlib.Path(path) / ".gitignore" with gitignore.open("a") as f: f.write(file_path + "\n") except BaseException as error: return {"code": -1, "message": str(error)} return {"code": 0} async def write_gitignore(self, path, content): """ Handle call to overwrite .gitignore. Takes the .gitignore file and clears its previous contents Writes the new content onto the file path: str Top Git repository path content: str New file contents """ try: res = await self.ensure_gitignore(path) if res["code"] != 0: return res gitignore = pathlib.Path(path) / ".gitignore" if content and content[-1] != "\n": content += "\n" gitignore.write_text(content) except BaseException as error: return {"code": -1, "message": str(error)} return {"code": 0} async def version(self): """Return the Git command version. If an error occurs, return None. """ command = ["git", "--version"] code, output, _ = await self.__execute(command, cwd=os.curdir) if code == 0: version = GIT_VERSION_REGEX.match(output) if version is not None: return version.group("version") return None async def tags(self, path): """List all tags of the git repository, including the commit each tag points to. path: str Git path repository """ formats = ["refname:short", "objectname"] command = [ "git", "for-each-ref", "--format=" + "%09".join("%({})".format(f) for f in formats), "refs/tags", ] code, output, error = await self.__execute(command, cwd=path) if code != 0: return {"code": code, "command": " ".join(command), "message": error} tags = [] for tag_name, commit_id in (line.split("\t") for line in output.splitlines()): tag = {"name": tag_name, "baseCommitId": commit_id} tags.append(tag) return {"code": code, "tags": tags} async def tag_checkout(self, path, tag): """Checkout the git repository at a given tag. path: str Git path repository tag : str Tag to checkout """ command = ["git", "checkout", "tags/" + tag] code, _, error = await self.__execute(command, cwd=path) if code == 0: return {"code": code, "message": "Tag {} checked out".format(tag)} else: return { "code": code, "command": " ".join(command), "message": error, } async def set_tag(self, path, tag, commitId): """Set a git tag pointing to a specific commit. path: str Git path repository tag : str Name of new tag. commitId: Identifier of commit tag is pointing to. """ command = ["git", "tag", tag, commitId] code, _, error = await self.__execute(command, cwd=path) if code == 0: return { "code": code, "message": "Tag {} created, pointing to commit {}".format( tag, commitId ), } else: return { "code": code, "command": " ".join(command), "message": error, } async def check_credential_helper(self, path: str) -> Optional[bool]: """ Check if the credential helper exists, and whether we need to setup a Git credential cache daemon in case the credential helper is Git credential cache. path: str Git path repository Return None if the credential helper is not set. Otherwise, return True if we need to setup a Git credential cache daemon, else False. Raise an exception if `git config` errored. """ git_config_response: Dict[str, str] = await self.config(path) if git_config_response["code"] != 0: raise RuntimeError(git_config_response["message"]) git_config_kv_pairs = git_config_response["options"] has_credential_helper = "credential.helper" in git_config_kv_pairs if not has_credential_helper: return None if has_credential_helper and GIT_CREDENTIAL_HELPER_CACHE.match( git_config_kv_pairs["credential.helper"].strip() ): return True return False async def ensure_credential_helper( self, path: str, env: Dict[str, str] = None ) -> None: """ Check whether `git config --list` contains `credential.helper`. If it is not set, then it will be set to the value string for `credential.helper` defined in the server settings. path: str Git path repository env: Dict[str, str] Environment variables """ try: has_credential_helper = await self.check_credential_helper(path) if has_credential_helper == False: return except RuntimeError as e: get_logger().error("Error checking credential helper: %s", e, exc_info=True) return cache_daemon_required = has_credential_helper == True if has_credential_helper is None: credential_helper: str = self._config.credential_helper await self.config(path, **{"credential.helper": credential_helper}) if GIT_CREDENTIAL_HELPER_CACHE.match(credential_helper.strip()): cache_daemon_required = True # special case: Git credential cache if cache_daemon_required: try: self.ensure_git_credential_cache_daemon(cwd=path, env=env) except Exception as e: get_logger().error( "Error setting up Git credential cache daemon: %s", e, exc_info=True ) def ensure_git_credential_cache_daemon( self, socket: Optional[pathlib.Path] = None, debug: bool = False, force: bool = False, cwd: Optional[str] = None, env: Dict[str, str] = None, ) -> None: """ Spawn a Git credential cache daemon with the socket file being `socket` if it does not exist. If `debug` is `True`, the daemon will be spawned with `--debug` flag. If `socket` is empty, it is set to `~/.git-credential-cache-daemon`. If `force` is `True`, a daemon will be spawned, and if the daemon process is accessible, the existing daemon process will be terminated before spawning a new one. Otherwise, if `force` is `False`, the PID of the existing daemon process is returned. If the daemon process is not accessible, `-1` is returned. `cwd` and `env` are passed to the process that spawns the daemon. """ if not socket: socket = pathlib.Path.home() / ".git-credential-cache" / "socket" if socket.exists(): return if self._GIT_CREDENTIAL_CACHE_DAEMON_PROCESS is None or force: if force and self._GIT_CREDENTIAL_CACHE_DAEMON_PROCESS: self._GIT_CREDENTIAL_CACHE_DAEMON_PROCESS.terminate() if not socket.parent.exists(): socket.parent.mkdir(parents=True, exist_ok=True) socket.parent.chmod(0o700) args: List[str] = ["git", "credential-cache--daemon"] if debug: args.append("--debug") args.append(socket) self._GIT_CREDENTIAL_CACHE_DAEMON_PROCESS = subprocess.Popen( args, cwd=cwd, env=env, ) get_logger().debug( "A credential cache daemon has been spawned with PID %d", self._GIT_CREDENTIAL_CACHE_DAEMON_PROCESS.pid, ) elif self._GIT_CREDENTIAL_CACHE_DAEMON_PROCESS.poll(): self.ensure_git_credential_cache_daemon(socket, debug, True, cwd, env) async def rebase(self, branch: str, path: str) -> dict: """ Execute git rebase command & return the result. Args: branch: Branch to rebase onto path: Git repository path """ cmd = ["git", "rebase", branch] code, output, error = await execute(cmd, cwd=path) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code, "message": output.strip()} async def resolve_rebase(self, path: str, action: RebaseAction) -> dict: """ Execute git rebase -- command & return the result. Args: path: Git repository path """ option = action.name.lower() cmd = ["git", "rebase", f"--{option}"] env = None # For continue we force the editor to not show up # Ref: https://stackoverflow.com/questions/43489971/how-to-suppress-the-editor-for-git-rebase-continue if option == "continue": env = os.environ.copy() env["GIT_EDITOR"] = "true" code, output, error = await execute(cmd, cwd=path, env=env) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code, "message": output.strip()} async def stash(self, path: str, stashMsg: str = "") -> dict: """ Stash changes in a dirty working directory away path: str Git path repository stashMsg (optional): str A message that describes the stash entry """ cmd = ["git", "stash"] if len(stashMsg) > 0: cmd.extend(["save", "-m", stashMsg]) env = os.environ.copy() # if the git command is run in a non-interactive terminal, it will not prompt for user input env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute(cmd, cwd=path, env=env) # code 0: no changes to stash if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code, "message": output.strip()} async def stash_list(self, path: str) -> dict: """ Execute git stash list command """ cmd = ["git", "stash", "list"] env = os.environ.copy() env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute(cmd, cwd=path, env=env) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} stashes = [] for line in output.strip("\n").splitlines(): match = GIT_STASH_LIST.match(line) if match is not None: d = match.groupdict() d["index"] = int(d["index"]) stashes.append(d) return {"code": code, "stashes": stashes} async def stash_show(self, path: str, index: int) -> dict: """ Execute git stash show command """ # stash_index = "stash@{" + str(index) + "}" stash_index = f"stash@{{{index!s}}}" cmd = ["git", "stash", "show", "-p", stash_index, "--name-only"] env = os.environ.copy() env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute(cmd, cwd=path, env=env) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} files = output.strip("\n").splitlines() return {"code": code, "files": files} async def pop_stash(self, path: str, stash_index: Optional[int] = None) -> dict: """ Execute git stash pop for a certain index of the stash list. If no index is provided, it will path: str Git path repository stash_index: number Index of the stash list is first applied to the current branch, then removed from the stash. If the index is not provided, the most recent stash (index=0) will be removed from the stash. """ cmd = ["git", "stash", "pop"] if stash_index: cmd.append(str(stash_index)) env = os.environ.copy() env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute(cmd, cwd=path, env=env) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code, "message": output.strip()} async def drop_stash(self, path, stash_index: Optional[int] = None) -> dict: """ Execute git stash drop to delete a single stash entry. If not stash_index is provided, delete the entire stash. path: Git path repository stash_index: number or None Index of the stash list to remove from the stash. If None, the entire stash is removed. """ cmd = ["git", "stash"] if stash_index is None: cmd.append("clear") else: cmd.extend(["drop", str(stash_index)]) env = os.environ.copy() env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute(cmd, cwd=path, env=env) if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code, "message": output.strip()} async def apply_stash(self, path: str, stash_index: Optional[int] = None) -> dict: """ Execute git stash apply to apply a single stash entry to the repository. If not stash_index is provided, apply the latest stash. path: str Git path repository stash_index: number Index of the stash list is applied to the repository. """ # Clear cmd = ["git", "stash", "apply"] if stash_index is not None: cmd.append("stash@{" + str(stash_index) + "}") env = os.environ.copy() env["GIT_TERMINAL_PROMPT"] = "0" code, output, error = await self.__execute(cmd, cwd=path, env=env) # error: if code != 0: return {"code": code, "command": " ".join(cmd), "message": error} return {"code": code, "message": output.strip()} @property def excluded_paths(self) -> List[str]: """Wildcard-style path patterns that do not support git commands. You can use ``*`` to match everything or ``?`` to match any single character. """ return self._config.excluded_paths