view src/hggit_serve.py @ 8:fe3c9fae4d4d

Add support for pushes, and improve authentication. Now you can `git push` to a Mercurial repository! Also we check permissions much more precisely.
author Paul Fisher <paul@pfish.zone>
date Sun, 15 Feb 2026 22:26:15 -0500
parents 4f42fdbb25f2
children 5000914da3ff
line wrap: on
line source

from __future__ import annotations

import binascii
import email.parser
import email.policy
import re
import shutil
import subprocess
import typing as t

import dulwich.refs
import mercurial.error as hgerr
from hggit import git_handler
from mercurial import extensions
from mercurial import registrar
from mercurial import wireprotoserver
from mercurial.thirdparty import attr

if t.TYPE_CHECKING:
    # Everything in this branch exists purely for annotations: the branch
    # is only taken by static type checkers, never at runtime.
    import mercurial.hgweb.hgweb_mod_inner as web_inner
    import mercurial.hgweb.request as hgreq
    import mercurial.interfaces.repository as hgrepo
    import mercurial.ui as hgui

    class GittyRepo(hgrepo.IRepo, t.Protocol):
        """A Mercurial repository that also carries a ``githandler``."""
        githandler: git_handler.GitHandler

    # Shape of the permission-check callback passed to
    # ``_handle_git_protocol``: it receives the request context, the request,
    # and the permission name (``b'pull'`` or ``b'push'``) and returns nothing.
    PermissionCheck = t.Callable[
        [web_inner.requestcontext, hgreq.parsedrequest, bytes],
        None,
    ]


def _is_gitty(repo: hgrepo.IRepo) -> t.TypeGuard[GittyRepo]:
    """Reports whether ``repo`` carries a ``githandler`` attribute.

    The attribute's presence is what this module treats as "hg-git is
    installed and active" for the repository; its value is not inspected.
    """
    missing = object()
    return getattr(repo, 'githandler', missing) is not missing


# Used with ``.match`` (anchored at the start) together with the trailing
# ``$``, so it accepts names built from upper-case ASCII letters, digits,
# and underscores -- the form CGI variables such as ``REQUEST_METHOD`` take.
_CGI_VAR = re.compile(rb'[A-Z0-9_]+$')
"""Environment variables that we need to pass to git-as-cgi."""


def _build_git_environ(
    req_ctx: web_inner.requestcontext,
    request: hgreq.parsedrequest,
) -> dict[bytes, bytes]:
    """Assembles the environment for running Git's HTTP backend as a CGI.

    The result starts from the entries of ``request.rawenv`` whose value is
    ``bytes`` and whose name looks like a CGI variable, then adds (or
    overrides) the settings Git needs to locate and serve the repository.
    """
    git_env = {}
    for name, value in request.rawenv.items():
        # Non-bytes values (WSGI objects and the like) can't go into a
        # process environment, so they are left out.
        if isinstance(value, bytes) and _CGI_VAR.match(name):
            git_env[name] = value

    git_env[b'GIT_HTTP_EXPORT_ALL'] = b'yes'
    git_env[b'GIT_PROJECT_ROOT'] = req_ctx.repo.path
    # NOTE(review): presumably the Git repository lives in a ``git``
    # directory directly under ``repo.path`` -- confirm against hg-git.
    git_env[b'PATH_INFO'] = b'/git/' + request.dispatchpath
    # Since Mercurial is taking care of authorization checking,
    # we tell Git to always allow push.
    git_env[b'GIT_CONFIG_COUNT'] = b'1'
    git_env[b'GIT_CONFIG_KEY_0'] = b'http.receivepack'
    git_env[b'GIT_CONFIG_VALUE_0'] = b'true'
    return git_env


def _parse_cgi_response(
    output: t.IO[bytes],
) -> tuple[bytes, dict[bytes, bytes], t.IO[bytes]]:
    """Splits a CGI response into a status, headers, and everything else.

    Header lines are consumed from ``output`` up to and including the first
    blank line.  The CGI ``Status`` header becomes the returned status
    (falling back to a made-up 200 when absent) and is left out of the
    returned headers.  ``output`` itself is handed back, positioned at the
    start of the body.
    """
    header_parser = email.parser.BytesFeedParser(policy=email.policy.HTTP)
    for raw_line in iter(output.readline, b''):
        if raw_line.rstrip(b'\r\n') == b'':
            # Blank line: the headers are over.
            # Whatever follows stays unread in ``output``.
            break
        header_parser.feed(raw_line)
    message = header_parser.close()

    status_text = message.get('Status', '200 OK I guess')
    del message['Status']  # harmless when the header is absent

    headers = {}
    for name, value in message.items():
        headers[name.encode('utf-8')] = value.encode('utf-8')
    return status_text.encode('utf-8'), headers, output


# Permission names handed to the permission-check callback.
_PULL = b'pull'
_PUSH = b'push'

# Keyed by Git service name, which shows up either as the request path
# itself or as the ``service=`` query parameter of an ``info/refs`` request.
_SERVICE_PERMISSIONS = {
    b'git-upload-pack': _PULL,
    b'git-receive-pack': _PUSH,
}
"""The Mercurial permission corresponding to each Git action.

These seem backwards because the direction of up/download is relative to
the server, so when the client pulls, the server is *uploading*,
and when the client pushes, the server is *downloading*.
"""


def _git_service_permission(request: hgreq.parsedrequest) -> bytes | None:
    """Maps a request onto the Mercurial permission Git would need for it.

    A request counts as a Git request when its path is one of the Git
    service names, or when it is ``info/refs`` with a query string that is
    exactly ``service=<name>``.  Returns the permission for a supported Git
    service and None for everything else.
    """
    path = request.dispatchpath
    direct = _SERVICE_PERMISSIONS.get(path)
    if direct:
        return direct
    if path != b'info/refs':
        return None

    prefix = b'service='
    query = request.querystring
    if not query.startswith(prefix):
        # No service was requested, so this isn't a smart-HTTP Git request.
        return None
    # The remainder must be a bare service name; anything extra (such as
    # further query parameters) fails the lookup and yields None.
    return _SERVICE_PERMISSIONS.get(query[len(prefix):])


def _copy_request_body(
    source: t.IO[bytes],
    dest: t.IO[bytes],
    total: int,
    chunk_size: int = 1024 * 1024,
) -> int:
    """Streams at most ``total`` bytes from ``source`` into ``dest``.

    Data moves in pieces of no more than ``chunk_size`` bytes, so memory use
    stays bounded however large the body is, and ``source`` is never read
    past ``total`` bytes.  Copying stops early if ``source`` runs dry.
    Returns the number of bytes actually copied.
    """
    remaining = total
    while remaining > 0:
        chunk = source.read(min(chunk_size, remaining))
        if not chunk:
            # The source ended before delivering everything it promised.
            break
        dest.write(chunk)
        remaining -= len(chunk)
    return total - remaining


def _handle_git_protocol(
    original: t.Callable[..., bool],
    req_ctx: web_inner.requestcontext,
    request: hgreq.parsedrequest,
    response: hgreq.wsgiresponse,
    check_permission: PermissionCheck,
) -> bool:
    """Intercepts requests from Git, if needed.

    Requests that are not Git service requests, or that target a repository
    without hg-git's ``githandler``, are passed straight to ``original`` and
    its result is returned.  Otherwise the request is permission-checked,
    piped through Git's HTTP backend run as a CGI subprocess, and the
    backend's output is streamed back through ``response``; after a push has
    been fully streamed, the new Git objects are imported into Mercurial.

    Returns True when the request was handled here.  Raises
    ``hgerr.InputError`` if the ``content-length`` header is not a
    non-negative integer; anything ``check_permission`` raises propagates.
    """
    perm = _git_service_permission(request)
    repo: hgrepo.IRepo = req_ctx.repo
    if not perm or not _is_gitty(repo):
        # We only handle Git requests to Gitty repos.
        return original(req_ctx, request, response, check_permission)

    # Permission workaround: Mercurial requires POSTs for push,
    # but the advertisement request from Git will be a GET.
    # We just lie to Mercurial about what we're doing.
    check_permission(
        req_ctx,
        (
            attr.evolve(req_ctx.req, method=b'POST')
            if perm == _PUSH
            else req_ctx.req
        ),
        perm,
    )
    cgi_env = _build_git_environ(req_ctx, request)
    content_length_hdr = request.headers.get(b'content-length', b'0')
    try:
        content_length = int(content_length_hdr)
    except ValueError as ve:
        raise hgerr.InputError(
            f'Invalid content-length {content_length_hdr!r}'.encode()
        ) from ve
    if content_length < 0:
        raise hgerr.InputError(
            f'Invalid content-length {content_length_hdr!r}'.encode()
        )
    http_backend = repo.ui.configlist(
        b'hggit-serve', b'http-backend', default=(b'git', b'http-backend')
    )
    call = subprocess.Popen(
        http_backend,
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        env=cgi_env,
        text=False,
    )
    assert call.stdout
    assert call.stdin
    try:
        # Git will not start writing output until stdin is fully closed.
        with call.stdin:
            # Copy in bounded chunks: the body of a push can be very large,
            # and we must not read past what the client said it would send.
            _copy_request_body(request.bodyfh, call.stdin, content_length)
        status, headers, rest = _parse_cgi_response(call.stdout)
    except BaseException:
        # Don't leave an orphaned backend process behind when the request
        # body or the CGI headers could not be processed.
        call.kill()
        call.stdout.close()
        call.wait()
        raise
    response.status = status
    for k, v in headers.items():
        response.headers[k] = v

    def write_the_rest() -> t.Iterator[bytes]:
        # Leaving the ``with`` closes the pipe and reaps the subprocess,
        # including when the consumer stops iterating early.
        with call, rest:
            while more := rest.read(1024 * 1024):
                yield more
        if perm == _PUSH:
            # Git has received the push; now bring the new objects into
            # Mercurial, flagging the import so the export hook stays quiet.
            _importing_enter(repo)
            try:
                gh = repo.githandler
                gh.import_git_objects(
                    b'git-push', remote_names=(), refs=gh.git.refs.as_dict()
                )
            finally:
                _importing_exit(repo)

    response.setbodygen(write_the_rest())
    response.sendresponse()
    return True


#
# Stuff so that we don't try to export revisions while we're importing.
#

# The leading '@' means this is not a valid Python identifier, so the
# attribute is only reachable via getattr()/setattr()/hasattr()/delattr().
_ILEVEL_ATTR = '@hggit_import_level'
"""An attribute that tracks how many "levels deep" we are into importing.

We set this on the repository object when we're importing and remove it
when we're done. It's not just a bool in case somebody sets up some crazy
recursive hook situation where we start importing inside another import.
"""


def _importing_enter(repo: hgrepo.IRepo) -> None:
    """Marks the start of an import from Git on ``repo``.

    Bumps the nesting counter stored on the repository object, creating it
    at 1 when no import was in progress.
    """
    depth = getattr(repo, _ILEVEL_ATTR, 0)
    setattr(repo, _ILEVEL_ATTR, depth + 1)


def _is_importing(repo: hgrepo.IRepo) -> bool:
    """Call this to check if you're currently importing.

    Returns True while the import-level attribute is present on ``repo``,
    i.e. between a call to ``_importing_enter`` and the matching final
    ``_importing_exit``.
    """
    return hasattr(repo, _ILEVEL_ATTR)


def _importing_exit(repo: hgrepo.IRepo) -> None:
    """Marks the end of an import from Git on ``repo``.

    Lowers the nesting counter by one and removes the attribute entirely
    once the outermost import has finished.  Raises ``AttributeError`` if
    no import was in progress.
    """
    remaining = getattr(repo, _ILEVEL_ATTR) - 1
    if remaining == 0:
        delattr(repo, _ILEVEL_ATTR)
        return
    setattr(repo, _ILEVEL_ATTR, remaining)


#
# Export handling.
#


def _clean_all_refs(refs: dulwich.refs.RefsContainer) -> None:
    """Removes all refs from the Git repository.

    Every name reported by ``refs.allkeys()`` is deleted unconditionally
    (``remove_if_equals`` with an expected value of None).
    """
    # Take a snapshot of the names first: a container may hand back a live
    # view of its storage, which must not be iterated while we delete.
    for ref in tuple(refs.allkeys()):
        refs.remove_if_equals(ref, None)


def _set_head(ui: hgui.ui, repo: GittyRepo, at_name: bytes) -> None:
    """Creates a HEAD reference in Git referring to the current HEAD.

    The "current" position is the ``@`` bookmark if it exists, otherwise the
    active bookmark, otherwise the tip of the working directory's branch
    (exported under the name ``@``).  ``at_name`` is the Git branch name to
    use in place of ``@``.

    If the branch name is not valid for Git, the branch cannot be found, or
    the revision has no Git counterpart, a warning is printed and nothing
    is changed.
    """
    # By default, we use '@', since that's what will be auto checked out.
    current = b'@'
    if current not in repo._bookmarks:
        current = repo._bookmarks.active or current

    # We'll be moving this (possibly fake) bookmark into Git.
    git_current = current
    if current == b'@':
        # @ is a special keyword in Git, so we can't use it as a bookmark.
        git_current = at_name
    git_branch = dulwich.refs.LOCAL_BRANCH_PREFIX + git_current
    if not dulwich.refs.check_ref_format(git_branch):
        # We can't export this ref to Git. Give up.
        # (Mercurial's ui does not append a newline to messages for us.)
        ui.warn(
            f'{git_branch!r} is not a valid branch name for Git.\n'.encode()
        )
        return
    try:
        # Maybe this is a real bookmark?
        hgnode = repo._bookmarks[current]
    except KeyError:
        # Not a real bookmark. Assume we want the tip of the current branch.
        branch = repo.dirstate.branch()
        try:
            hgnode = repo.branchtip(branch)
        except hgerr.RepoLookupError:
            # This branch somehow doesn't exist???
            ui.warn(f"{branch!r} doesn't seem to exist?\n".encode())
            return
    hgsha = binascii.hexlify(hgnode)
    gitsha = repo.githandler.map_git_get(hgsha)
    if not gitsha:
        # No Git SHA to match this Hg sha. Give up.
        ui.warn(f'revision {hgsha!r} was not exported to Git\n'.encode())
        return
    refs = repo.githandler.git.refs
    # Create the branch first so the symbolic HEAD has something to point at.
    refs.add_packed_refs({git_branch: gitsha})
    refs.set_symbolic_ref(b'HEAD', git_branch)


def fix_refs_hook(ui: hgui.ui, repo: hgrepo.IRepo, **__: object) -> None:
    """Hook entry point that tidies up the Git refs.  See ``_fix_refs``.

    Repositories without hg-git's ``githandler`` are left alone.  Any extra
    keyword arguments are accepted and ignored.
    """
    if _is_gitty(repo):
        _fix_refs(ui, repo)


def _fix_refs(ui: hgui.ui, repo: GittyRepo) -> None:
    """Rebuilds the Git refs from scratch after a Git export.

    All existing refs are dropped so that nothing lingers from bookmarks
    that have since been removed; tags and references are then exported
    again, and finally HEAD is set so that cloning works.  The Git name for
    the ``@`` bookmark comes from the ``hggit-serve.default-branch`` setting
    and defaults to ``default``.
    """
    handler = repo.githandler
    git_refs = handler.git.refs
    # allkeys() gives us a snapshot of the names, so we are not iterating
    # over something that changes underneath us while we delete.
    for name in git_refs.allkeys():
        git_refs.remove_if_equals(name, None)
    handler.export_hg_tags()
    handler.update_references()
    at_name = ui.config(b'hggit-serve', b'default-branch', b'default')
    _set_head(ui, repo, at_name)


def export_hook(ui: hgui.ui, repo: hgrepo.IRepo, **__: object) -> None:
    """Maybe exports the repository to Git after we get new revs.

    Controlled by the ``hggit-serve.auto-export`` setting: ``never``
    disables exporting, ``always`` forces it, and any other value exports
    only when ``git_handler.has_gitrepo`` says so.  Nothing is exported
    while an import from Git is in progress, or for repositories without
    hg-git's ``githandler``.
    """
    if not _is_gitty(repo):
        return
    mode = ui.config(b'hggit-serve', b'auto-export')
    if mode == b'never':
        return
    if mode != b'always' and not git_handler.has_gitrepo(repo):
        # Not forced, and hg-git reports nothing to export into.
        return
    if _is_importing(repo):
        ui.note(b'currently importing revs from git; not exporting\n')
        return
    repo.githandler.export_commits()
    _fix_refs(ui, repo)


#
# Interfacing with Mercurial
#

# Extension metadata.
# NOTE(review): ``testedwith`` / ``minimumhgversion`` are presumably read by
# Mercurial's extension machinery for compatibility checks -- confirm.
__version__ = '0.2.0'
testedwith = b'7.1 7.2'
minimumhgversion = b'7.1'

# Command table for this extension; no commands are registered in this file.
cmdtable: dict[bytes, object] = {}

command = registrar.command(cmdtable)


def uisetup(_: hgui.ui) -> None:
    """Routes Mercurial's WSGI request handling through our Git interceptor.

    Wraps ``wireprotoserver.handlewsgirequest`` with
    ``_handle_git_protocol``.  The ui argument is unused.
    """
    wrapped_name = 'handlewsgirequest'
    extensions.wrapfunction(
        wireprotoserver, wrapped_name, _handle_git_protocol
    )


def _suppress_export_hook(
    ui: hgui.ui, repo: hgrepo.IRepo, **__: object
) -> None:
    """``pre-git-import`` hook: records that an import from Git has begun."""
    _importing_enter(repo)


def _resume_export_hook(
    ui: hgui.ui, repo: hgrepo.IRepo, **__: object
) -> None:
    """``post-git-import`` hook: records that an import from Git has ended."""
    _importing_exit(repo)


def uipopulate(ui: hgui.ui) -> None:
    """Registers this extension's hooks in the ``hooks`` config section.

    The hook callables take ``ui`` and ``repo`` by those exact names because
    Mercurial passes hook arguments by keyword.
    """
    # Fix up our tags after a Git export.
    ui.setconfig(
        b'hooks', b'post-git-export.__gitserve_add_tag__', fix_refs_hook
    )
    # Whenever we get new revisions, export them to the Git repository.
    ui.setconfig(b'hooks', b'txnclose.__gitserve_export__', export_hook)
    # Don't step on ourselves when importing data from Git.
    ui.setconfig(
        b'hooks',
        b'pre-git-import.__gitserve_suppress_export__',
        _suppress_export_hook,
    )
    ui.setconfig(
        b'hooks',
        b'post-git-import.__gitserve_suppress_export__',
        _resume_export_hook,
    )


# Public names of this module: the extension metadata and the setup
# functions defined above.
__all__ = (
    '__version__',
    'cmdtable',
    'command',
    'minimumhgversion',
    'testedwith',
    'uipopulate',
    'uisetup',
)