Mercurial > hg-git-serve
diff src/hggit_serve.py @ 7:4f42fdbb25f2
rename to hggit_serve
| author | Paul Fisher <paul@pfish.zone> |
|---|---|
| date | Sun, 15 Feb 2026 01:49:42 -0500 |
| parents | src/git_serve/__init__.py@7113e0ac3662 |
| children | fe3c9fae4d4d |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/hggit_serve.py Sun Feb 15 01:49:42 2026 -0500 @@ -0,0 +1,243 @@ +from __future__ import annotations + +import binascii +import email.parser +import email.policy +import os.path +import re +import shutil +import subprocess +import typing as t + +import dulwich.refs +import mercurial.error as hgerr +from mercurial import extensions +from mercurial import registrar +from mercurial import wireprotoserver + +if t.TYPE_CHECKING: + import hggit.git_handler + import mercurial.hgweb.hgweb_mod_inner as web_inner + import mercurial.hgweb.request as hgreq + import mercurial.interfaces.repository as hgrepo + import mercurial.ui as hgui + + class GittyRepo(hgrepo.IRepo, t.Protocol): + githandler: hggit.git_handler.GitHandler + + PermissionCheck = t.Callable[ + [web_inner.requestcontext, hgreq.parsedrequest, bytes], + None, + ] + GitPrelude = t.Sequence[bytes | str | os.PathLike] + + +def _is_gitty(repo: hgrepo.IRepo) -> t.TypeGuard[GittyRepo]: + """Ensures that we have hg-git installed and active.""" + return hasattr(repo, 'githandler') + + +_CGI_VAR = re.compile(rb'[A-Z0-9_]+$') +"""Environment variables that we need to pass to git-as-cgi.""" + + +def _build_git_environ( + req_ctx: web_inner.requestcontext, + request: hgreq.parsedrequest, +) -> dict[bytes, bytes]: + """Builds the environment to be sent to Git to serve HTTP.""" + fixed = { + k: v + for (k, v) in request.rawenv.items() + if isinstance(v, bytes) and _CGI_VAR.match(k) + } + fixed[b'GIT_HTTP_EXPORT_ALL'] = b'yes' + fixed[b'GIT_PROJECT_ROOT'] = req_ctx.repo.path + fixed[b'PATH_INFO'] = b'/git/' + request.dispatchpath + return fixed + + +def _parse_cgi_response( + output: t.IO[bytes], +) -> tuple[bytes, dict[bytes, bytes], t.IO[bytes]]: + """Parses a CGI response into a status, headers, and everyhting else.""" + parser = email.parser.BytesFeedParser(policy=email.policy.HTTP) + while line := output.readline(): + if not line.rstrip(b'\r\n'): + # We've reached the end of the headers. + # Leave the rest in the output for later. + break + parser.feed(line) + msg = parser.close() + status = msg.get('Status', '200 OK I guess').encode('utf-8') + del msg['Status'] # this won't raise an exception + byte_headers = { + k.encode('utf-8'): v.encode('utf-8') for (k, v) in msg.items() + } + return status, byte_headers, output + + +def _handle_git_protocol( + original: t.Callable[..., bool], + req_ctx: web_inner.requestcontext, + request: hgreq.parsedrequest, + response: hgreq.wsgiresponse, + check_permission: PermissionCheck, +) -> bool: + """Intercepts requests from Git, if needed.""" + repo = req_ctx.repo + if not _is_gitty(repo) or b'git-protocol' not in request.headers: + # We only handle Git requests; everything else is normal. + return original(req_ctx, request, response, check_permission) + check_permission(req_ctx, request, b'pull') + # If a request is git, we assume we should be the one handling it. + cgi_env = _build_git_environ(req_ctx, request) + content_length_hdr = request.headers.get(b'content-length', b'0') + try: + content_length = int(content_length_hdr) + except ValueError as ve: + raise hgerr.InputError( + f'Invalid content-length {content_length!r}'.encode() + ) from ve + http_backend = req_ctx.repo.ui.configlist( + b'git-serve', b'http-backend', default=(b'git', b'http-backend') + ) + call = subprocess.Popen( + http_backend, + close_fds=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + env=cgi_env, + text=False, + ) + assert call.stdout + assert call.stdin + # Git will not start writing output until stdin is fully closed. + with call.stdin: + if content_length: + shutil.copyfileobj( + request.bodyfh, call.stdin, length=content_length + ) + + status, headers, rest = _parse_cgi_response(call.stdout) + response.status = status + for k, v in headers.items(): + response.headers[k] = v + + def write_the_rest(): + with call, rest: + while more := rest.read(1024 * 1024): + yield more + + response.setbodygen(write_the_rest()) + response.sendresponse() + return True + + +def _clean_all_refs(refs: dulwich.refs.RefsContainer) -> None: + """Removes all refs from the Git repository.""" + for ref in refs.allkeys(): + refs.remove_if_equals(ref, None) + + +def _set_head(ui: hgui.ui, repo: GittyRepo, at_name: bytes) -> None: + """Creates a HEAD reference in Git referring to the current HEAD.""" + # By default, we use '@', since that's what will be auto checked out. + current = b'@' + if current not in repo._bookmarks: + current = repo._bookmarks.active or current + + # We'll be moving this (possibly fake) bookmark into Git. + git_current = current + if current == b'@': + # @ is a special keyword in Git, so we can't use it as a bookmark. + git_current = at_name + git_branch = dulwich.refs.LOCAL_BRANCH_PREFIX + git_current + if not dulwich.refs.check_ref_format(git_branch): + # We can't export this ref to Git. Give up. + ui.warn(f'{git_branch!r} is not a valid branch name for Git.'.encode()) + return + try: + # Maybe this is a real bookmark? + hgsha = repo._bookmarks[current] + except KeyError: + # Not a real bookmark. Assume we want the tip of the current branch. + branch = repo.dirstate.branch() + try: + tip = repo.branchtip(branch) + except hgerr.RepoLookupError: + # This branch somehow doesn't exist??? + ui.warn(f"{branch} doesn't seem to exist?".encode()) + return + hgsha = binascii.hexlify(tip) + gitsha = repo.githandler.map_git_get(hgsha) + if not gitsha: + # No Git SHA to match this Hg sha. Give up. + ui.warn(f'revision {hgsha} was not exported to Git'.encode()) + return + refs = repo.githandler.git.refs + refs.add_packed_refs({git_branch: gitsha}) + refs.set_symbolic_ref(b'HEAD', git_branch) + + +def fix_refs_hook(ui: hgui.ui, repo: hgrepo.IRepo, **__: object) -> None: + """Exports to Git and sets up for serving.""" + if not _is_gitty(repo): + return + _fix_refs(ui, repo) + + +def _fix_refs(ui: hgui.ui, repo: GittyRepo) -> None: + """After a git export, fix up the refs.""" + _clean_all_refs(repo.githandler.git.refs) + repo.githandler.export_hg_tags() + repo.githandler.update_references() + default_branch_name = ui.config( + b'hggit-serve', b'default-branch', b'default' + ) + _set_head(ui, repo, default_branch_name) + + +def export_hook(ui: hgui.ui, repo: hgrepo.IRepo, **__: object) -> None: + if not _is_gitty(repo): + return + auto_export = ui.config(b'hggit-serve', b'auto-export') + if auto_export == b'never': + return + if auto_export == b'always' or os.path.isdir(repo.githandler.gitdir): + repo.githandler.export_commits() + _fix_refs(ui, repo) + + +# Interfacing with Mercurial + +__version__ = '0.1.5' +testedwith = b'7.1 7.2' + +cmdtable: dict[bytes, object] = {} + +command = registrar.command(cmdtable) + + +def uisetup(_: hgui.ui) -> None: + extensions.wrapfunction( + wireprotoserver, 'handlewsgirequest', _handle_git_protocol + ) + + +def uipopulate(ui: hgui.ui) -> None: + ui.setconfig( + b'hooks', b'post-git-export.__gitserve_add_tag__', fix_refs_hook + ) + ui.setconfig(b'hooks', b'txnclose.__gitserve_export__', export_hook) + + +__all__ = ( + '__version__', + 'cmdtable', + 'command', + 'testedwith', + 'uipopulate', + 'uisetup', +)
