Mercurial > hg-git-serve
view src/hggit_serve.py @ 9:5000914da3ff default tip
simplify handling of stream copying
| author | Paul Fisher <paul@pfish.zone> |
|---|---|
| date | Mon, 16 Feb 2026 00:12:57 -0500 |
| parents | fe3c9fae4d4d |
| children |
line wrap: on
line source
from __future__ import annotations import binascii import email.parser import email.policy import re import shutil import subprocess import typing as t import dulwich.refs import mercurial.error as hgerr from hggit import git_handler from mercurial import extensions from mercurial import registrar from mercurial import wireprotoserver from mercurial.thirdparty import attr if t.TYPE_CHECKING: import mercurial.hgweb.hgweb_mod_inner as web_inner import mercurial.hgweb.request as hgreq import mercurial.interfaces.repository as hgrepo import mercurial.ui as hgui class GittyRepo(hgrepo.IRepo, t.Protocol): githandler: git_handler.GitHandler PermissionCheck = t.Callable[ [web_inner.requestcontext, hgreq.parsedrequest, bytes], None, ] def _is_gitty(repo: hgrepo.IRepo) -> t.TypeGuard[GittyRepo]: """Ensures that we have hg-git installed and active.""" return hasattr(repo, 'githandler') _CGI_VAR = re.compile(rb'[A-Z0-9_]+$') """Environment variables that we need to pass to git-as-cgi.""" def _build_git_environ( req_ctx: web_inner.requestcontext, request: hgreq.parsedrequest, ) -> dict[bytes, bytes]: """Builds the environment to be sent to Git to serve HTTP.""" fixed = { k: v for (k, v) in request.rawenv.items() if isinstance(v, bytes) and _CGI_VAR.match(k) } fixed.update( { b'GIT_HTTP_EXPORT_ALL': b'yes', b'GIT_PROJECT_ROOT': req_ctx.repo.path, b'PATH_INFO': b'/git/' + request.dispatchpath, # Since Mercurial is taking care of authorization checking, # we tell Git to always allow push. b'GIT_CONFIG_COUNT': b'1', b'GIT_CONFIG_KEY_0': b'http.receivepack', b'GIT_CONFIG_VALUE_0': b'true', } ) return fixed def _parse_cgi_response( output: t.IO[bytes], ) -> tuple[bytes, dict[bytes, bytes], t.IO[bytes]]: """Parses a CGI response into a status, headers, and everyhting else.""" parser = email.parser.BytesFeedParser(policy=email.policy.HTTP) while line := output.readline(): if not line.rstrip(b'\r\n'): # We've reached the end of the headers. # Leave the rest in the output for later. break parser.feed(line) msg = parser.close() status = msg.get('Status', '200 OK I guess').encode('utf-8') del msg['Status'] # this won't raise an exception byte_headers = { k.encode('utf-8'): v.encode('utf-8') for (k, v) in msg.items() } return status, byte_headers, output _PULL = b'pull' _PUSH = b'push' _SERVICE_PERMISSIONS = { b'git-upload-pack': _PULL, b'git-receive-pack': _PUSH, } """The Mercurial permission corresponding to each Git action. These seem backwards because the direction of up/download is relative to the server, so when the client pulls, the server is *uploading*, and when the client pushes, the server is *downloading*. """ def _git_service_permission(request: hgreq.parsedrequest) -> bytes | None: """Figures out what Mercurial permission corresponds to a request from Git. If the request is a supported Git action, returns the permission it needs. If the request is not a Git action, returns None. """ if perm := _SERVICE_PERMISSIONS.get(request.dispatchpath): return perm if request.dispatchpath != b'info/refs': return None qs = request.querystring service = qs.removeprefix(b'service=') if qs == service: # Nothing was stripped. return None return _SERVICE_PERMISSIONS.get(service) def _handle_git_protocol( original: t.Callable[..., bool], req_ctx: web_inner.requestcontext, request: hgreq.parsedrequest, response: hgreq.wsgiresponse, check_permission: PermissionCheck, ) -> bool: """Intercepts requests from Git, if needed.""" perm = _git_service_permission(request) repo: hgrepo.IRepo = req_ctx.repo if not perm or not _is_gitty(repo): # We only handle Git requests to Gitty repos. return original(req_ctx, request, response, check_permission) # Permission workaround: Mercurial requires POSTs for push, # but the advertisement request from Git will be a GET. # We just lie to Mercurial about what we're doing. check_permission( req_ctx, ( attr.evolve(req_ctx.req, method=b'POST') if perm == _PUSH else req_ctx.req ), perm, ) cgi_env = _build_git_environ(req_ctx, request) http_backend = repo.ui.configlist( b'hggit-serve', b'http-backend', default=(b'git', b'http-backend') ) call = subprocess.Popen( http_backend, close_fds=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, env=cgi_env, text=False, ) assert call.stdout assert call.stdin # Git will not start writing output until stdin is fully closed. with call.stdin: # This is how we know if there's anything to read from bodyfh. # If we try to read from bodyfh on a request with no content, # it hangs forever. if b'CONTENT_LENGTH' in request.rawenv: shutil.copyfileobj(request.bodyfh, call.stdin) status, headers, rest = _parse_cgi_response(call.stdout) response.status = status for k, v in headers.items(): response.headers[k] = v def write_the_rest() -> t.Iterator[bytes]: with call, rest: # if it's good enough for shutil it's good enough for me while more := rest.read(shutil.COPY_BUFSIZE): yield more if perm == _PUSH: _importing_enter(repo) try: gh = repo.githandler gh.import_git_objects( b'git-push', remote_names=(), refs=gh.git.refs.as_dict() ) finally: _importing_exit(repo) response.setbodygen(write_the_rest()) response.sendresponse() return True # # Stuff so that we don't try to export revisions while we're importing. # _ILEVEL_ATTR = '@hggit_import_level' """An attribute that tracks how many "levels deep" we are into importing. We set this on the repository object when we're importing and remove it when we're done. It's not just a bool in case somebody sets up some crazy recursive hook situation where we start importing inside another import. """ def _importing_enter(repo: hgrepo.IRepo) -> None: """Call this before you start importing from Git.""" level = getattr(repo, _ILEVEL_ATTR, 0) + 1 setattr(repo, _ILEVEL_ATTR, level) def _is_importing(repo: hgrepo.IRepo) -> None: """Call this to check if you're currently importing.""" return hasattr(repo, _ILEVEL_ATTR) def _importing_exit(repo: hgrepo.IRepo) -> None: """Call this after you finish importing from Git.""" level = getattr(repo, _ILEVEL_ATTR) - 1 if level: setattr(repo, _ILEVEL_ATTR, level) else: delattr(repo, _ILEVEL_ATTR) # # Export handling. # def _clean_all_refs(refs: dulwich.refs.RefsContainer) -> None: """Removes all refs from the Git repository.""" def _set_head(ui: hgui.ui, repo: GittyRepo, at_name: bytes) -> None: """Creates a HEAD reference in Git referring to the current HEAD.""" # By default, we use '@', since that's what will be auto checked out. current = b'@' if current not in repo._bookmarks: current = repo._bookmarks.active or current # We'll be moving this (possibly fake) bookmark into Git. git_current = current if current == b'@': # @ is a special keyword in Git, so we can't use it as a bookmark. git_current = at_name git_branch = dulwich.refs.LOCAL_BRANCH_PREFIX + git_current if not dulwich.refs.check_ref_format(git_branch): # We can't export this ref to Git. Give up. ui.warn(f'{git_branch!r} is not a valid branch name for Git.'.encode()) return try: # Maybe this is a real bookmark? hgnode = repo._bookmarks[current] except KeyError: # Not a real bookmark. Assume we want the tip of the current branch. branch = repo.dirstate.branch() try: hgnode = repo.branchtip(branch) except hgerr.RepoLookupError: # This branch somehow doesn't exist??? ui.warn(f"{branch!r} doesn't seem to exist?".encode()) return hgsha = binascii.hexlify(hgnode) gitsha = repo.githandler.map_git_get(hgsha) if not gitsha: # No Git SHA to match this Hg sha. Give up. ui.warn(f'revision {hgsha!r} was not exported to Git'.encode()) return refs = repo.githandler.git.refs refs.add_packed_refs({git_branch: gitsha}) refs.set_symbolic_ref(b'HEAD', git_branch) def fix_refs_hook(ui: hgui.ui, repo: hgrepo.IRepo, **__: object) -> None: """Exports to Git and sets up for serving. See ``_fix_refs``.""" if not _is_gitty(repo): return _fix_refs(ui, repo) def _fix_refs(ui: hgui.ui, repo: GittyRepo) -> None: """After a git export, fix up the refs. This ensures that there are no leftover refs from older, removed bookmarks and that there is a proper HEAD set so that cloning works. """ refs = repo.githandler.git.refs # dump to allkeys so we explicitly are iterating over a snapshot # and not over something while we mutate for ref in refs.allkeys(): refs.remove_if_equals(ref, None) repo.githandler.export_hg_tags() repo.githandler.update_references() default_branch_name = ui.config( b'hggit-serve', b'default-branch', b'default' ) _set_head(ui, repo, default_branch_name) def export_hook(ui: hgui.ui, repo: hgrepo.IRepo, **__: object) -> None: """Maybe exports the repository to get after we get new revs.""" if not _is_gitty(repo): return auto_export = ui.config(b'hggit-serve', b'auto-export') if auto_export == b'never': return if auto_export == b'always' or git_handler.has_gitrepo(repo): if _is_importing(repo): ui.note(b'currently importing revs from git; not exporting\n') return repo.githandler.export_commits() _fix_refs(ui, repo) # # Interfacing with Mercurial # __version__ = '0.2.0' testedwith = b'7.1 7.2' minimumhgversion = b'7.1' cmdtable: dict[bytes, object] = {} command = registrar.command(cmdtable) def uisetup(_: hgui.ui) -> None: extensions.wrapfunction( wireprotoserver, 'handlewsgirequest', _handle_git_protocol ) def uipopulate(ui: hgui.ui) -> None: # Fix up our tags after a Git export. ui.setconfig( b'hooks', b'post-git-export.__gitserve_add_tag__', fix_refs_hook ) # Whenever we get new revisions, export them to the Git repository. ui.setconfig(b'hooks', b'txnclose.__gitserve_export__', export_hook) # Don't step on ourselves when importing data from Git. ui.setconfig( b'hooks', b'pre-git-import.__gitserve_suppress_export__', lambda _, repo, **__: _importing_enter(repo), ) ui.setconfig( b'hooks', b'post-git-import.__gitserve_suppress_export__', lambda _, repo, **__: _importing_exit(repo), ) __all__ = ( '__version__', 'cmdtable', 'command', 'minimumhgversion', 'testedwith', 'uipopulate', 'uisetup', )
