view src/git_serve/__init__.py @ 3:189f4a0bc653

don't try to pass bytes to pathlib.Path
author Paul Fisher <paul@pfish.zone>
date Sat, 14 Feb 2026 18:53:30 -0500
parents 871dcb2a2aeb
children 5ad58438318a
line wrap: on
line source

from __future__ import annotations

import binascii
import email.parser
import email.policy
import os.path
import re
import shutil
import subprocess
import threading
import typing as t

import dulwich.refs
import mercurial.error as hgerr
from mercurial import extensions
from mercurial import registrar
from mercurial import wireprotoserver

if t.TYPE_CHECKING:
    # Everything in this branch exists only for static type checkers; none of
    # it is imported or defined at runtime. The annotations that use these
    # names stay lazy thanks to `from __future__ import annotations`.
    import hggit.git_handler
    import mercurial.hgweb.hgweb_mod_inner as web_inner
    import mercurial.hgweb.request as hgreq
    import mercurial.interfaces.repository as hgrepo
    import mercurial.ui as hgui

    class GittyRepo(hgrepo.IRepo, t.Protocol):
        """A Mercurial repository that also exposes a ``githandler`` attribute.

        `is_gitty` narrows a plain repository to this type by checking for
        that attribute.
        """

        # Handler object giving access to the Git side of the repository
        # (presumably attached by the hggit extension -- confirm).
        githandler: hggit.git_handler.GitHandler

    # Signature of the permission-check callback handed to
    # `handle_git_protocol`: (request context, parsed request, operation
    # name); it returns nothing.
    PermissionCheck = t.Callable[
        [web_inner.requestcontext, hgreq.parsedrequest, bytes],
        None,
    ]
    # NOTE(review): not referenced in the part of the file visible here;
    # looks like an argv-prefix type for invoking Git -- confirm it is used.
    GitPrelude = t.Sequence[bytes | str | os.PathLike]


# Matches keys that start with a run of upper-case ASCII letters, digits and
# underscores which extends to the end of the key (`match` anchors the start,
# `$` the end).
_CGI_VAR = re.compile(rb'[A-Z0-9_]+$')
"""Environment variables that we need to pass to git-as-cgi."""


def _build_git_environ(
    req_ctx: web_inner.requestcontext,
    request: hgreq.parsedrequest,
) -> dict[bytes, bytes]:
    """Assemble the process environment for the Git CGI backend.

    Entries of ``request.rawenv`` are kept only when the value is ``bytes``
    and the key matches ``_CGI_VAR``. Three variables are then set,
    overriding any same-named entry that was copied:

    * ``GIT_HTTP_EXPORT_ALL`` is ``yes``;
    * ``GIT_PROJECT_ROOT`` is ``req_ctx.repo.path``;
    * ``PATH_INFO`` is ``/git/`` followed by ``request.dispatchpath``.

    Returns a new dict; ``request.rawenv`` is not modified.
    """
    git_env: dict[bytes, bytes] = {}
    for name, value in request.rawenv.items():
        # Non-bytes values cannot be handed to a subprocess environment.
        if not isinstance(value, bytes):
            continue
        if not _CGI_VAR.match(name):
            continue
        git_env[name] = value
    git_env.update(
        {
            b'GIT_HTTP_EXPORT_ALL': b'yes',
            b'GIT_PROJECT_ROOT': req_ctx.repo.path,
            b'PATH_INFO': b'/git/' + request.dispatchpath,
        }
    )
    return git_env


def _parse_cgi_response(
    output: t.IO[bytes],
) -> tuple[bytes, dict[bytes, bytes], t.IO[bytes]]:
    """Read the CGI header section from ``output`` and split it up.

    Lines are consumed up to and including the first empty line (or until
    end of file); whatever follows is left unread in ``output``.

    Returns a ``(status, headers, output)`` tuple: the value of the
    ``Status`` header (or a default when absent) as bytes, the remaining
    headers as a bytes-to-bytes dict, and the same stream positioned at the
    start of the body.
    """
    header_parser = email.parser.BytesFeedParser(policy=email.policy.HTTP)
    while True:
        raw_line = output.readline()
        # Stop at end of file or at the blank line that ends the headers;
        # the body stays in the stream for the caller.
        if not raw_line or not raw_line.rstrip(b'\r\n'):
            break
        header_parser.feed(raw_line)
    message = header_parser.close()

    status_text = message.get('Status', '200 OK I guess')
    # Deleting a header that is not present is a no-op, so no guard needed.
    del message['Status']

    headers: dict[bytes, bytes] = {}
    for name, value in message.items():
        headers[name.encode('utf-8')] = value.encode('utf-8')
    return status_text.encode('utf-8'), headers, output


# Number of values handed out by `feed_count` so far.
_feeds = 0
# Serializes updates to `_feeds`.
_feeder_lock = threading.Lock()


def feed_count() -> int:
    """Return the next value of the module-wide counter, starting at 1.

    The increment happens under ``_feeder_lock``. `handle_git_protocol` uses
    the result to give each feeder thread a distinct name.
    """
    global _feeds
    with _feeder_lock:
        _feeds = current = _feeds + 1
    return current


def git_binary(ui: hgui.ui) -> bytes:
    """Return the ``git-serve.git`` config value, defaulting to ``b'git'``."""
    configured = ui.config(b'git-serve', b'git', default=b'git')
    return configured


def handle_git_protocol(
    original: t.Callable[..., bool],
    req_ctx: web_inner.requestcontext,
    request: hgreq.parsedrequest,
    response: hgreq.wsgiresponse,
    check_permission: PermissionCheck,
) -> bool:
    """Answer Git HTTP requests by delegating to a Git CGI backend.

    This wraps ``wireprotoserver.handlewsgirequest`` (see `uisetup`). A
    request is treated as a Git request only when the repository has a
    ``githandler`` and the request carries a ``Git-Protocol`` header;
    everything else is forwarded unchanged to ``original``.

    For Git requests, the command configured as ``git-serve.http-backend``
    (default ``git http-backend``) is spawned with the environment from
    `_build_git_environ`. A POST body is streamed to its stdin from a helper
    thread, its CGI headers are copied onto ``response``, and the rest of
    its output is streamed back as the response body.

    Args:
        original: The wrapped handler, called for non-Git requests.
        req_ctx: hgweb request context; ``req_ctx.repo`` is the repository.
        request: The parsed incoming request.
        response: The response object to fill in and send.
        check_permission: Callback invoked with the ``pull`` operation
            before any Git request is served.

    Returns:
        ``True`` once a Git response has been sent; otherwise whatever
        ``original`` returns.
    """
    repo = req_ctx.repo
    if not is_gitty(repo) or b'git-protocol' not in request.headers:
        # We only handle Git requests; everything else is normal.
        return original(req_ctx, request, response, check_permission)
    # NOTE(review): only the `pull` permission is checked, even for POSTs.
    # Confirm the backend cannot accept pushes (git-receive-pack) with the
    # environment that is forwarded to it.
    check_permission(req_ctx, request, b'pull')
    # If a request is git, we assume we should be the one handling it.
    cgi_env = _build_git_environ(req_ctx, request)
    http_backend = repo.ui.configlist(
        b'git-serve', b'http-backend', default=(b'git', b'http-backend')
    )
    is_post = request.method == b'POST'
    call = subprocess.Popen(
        http_backend,
        close_fds=True,
        # Without a body to forward, give the child an empty stdin instead
        # of letting it inherit (and possibly read from) the server's own.
        stdin=subprocess.PIPE if is_post else subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        env=cgi_env,
        text=False,
    )
    assert call.stdout

    def feed():
        """Copy the request body to the child's stdin, then close it."""
        try:
            with call.stdin as stdin:
                shutil.copyfileobj(request.bodyfh, stdin)
        except OSError:
            # Expected (includes BrokenPipeError); this just means the
            # child closed its end.
            pass

    if is_post:
        # Feed from a separate thread so that reading the child's output
        # below does not wait for the whole body to be written first.
        threading.Thread(target=feed, name=f'git-feeder-{feed_count()}').start()
    status, headers, rest = _parse_cgi_response(call.stdout)
    response.status = status
    for k, v in headers.items():
        response.headers[k] = v

    def write_the_rest():
        """Yield the child's remaining output in chunks of up to 64 KiB."""
        # Leaving the `with` closes the stream and waits for the child.
        with call, rest:
            while more := rest.read(64 * 1024):
                yield more

    response.setbodygen(write_the_rest())
    response.sendresponse()
    return True


def clean_all_refs(refs: dulwich.refs.RefsContainer) -> None:
    """Remove every ref currently listed by ``refs.allkeys()``.

    Each ref is removed with an expected old value of ``None``, i.e. without
    checking what it currently points at.
    """
    remove = refs.remove_if_equals
    for name in refs.allkeys():
        remove(name, None)


def set_head(repo: GittyRepo) -> None:
    """Point Git's ``HEAD`` at the branch matching Mercurial's current head.

    The bookmark ``@`` is preferred; if it does not exist, the active
    bookmark is used, falling back to ``@`` again when none is active.
    Because ``@`` cannot be used as a Git branch name, it is exported as
    ``__default__``.

    If the chosen branch is not already among the Git refs, it is created
    from the bookmark's changeset or, when the bookmark does not exist, from
    the tip of the working directory's named branch.

    The function returns without touching ``HEAD`` when the branch name is
    not a valid Git ref, the named branch cannot be resolved, or the
    Mercurial changeset has no Git counterpart.
    """
    # By default, we use '@', since that's what will be auto checked out.
    current = b'@'
    if current not in repo._bookmarks:
        current = repo._bookmarks.active or current

    # We'll be moving this (possibly fake) bookmark into Git.
    git_current = current
    if current == b'@':
        # @ is a special keyword in Git, so we can't use it as a bookmark.
        git_current = b'__default__'
    git_branch = dulwich.refs.LOCAL_BRANCH_PREFIX + git_current
    if not dulwich.refs.check_ref_format(git_branch):
        # We can't export this ref to Git. Give up.
        return
    refs = repo.githandler.git.refs
    if git_branch not in refs:
        # This means our bookmark isn't actually in Git (usually because
        # there's no real bookmark called '@'). We need to fake it.
        try:
            # Maybe this is a real bookmark?
            node = repo._bookmarks[current]
        except KeyError:
            # Not a real bookmark. Assume we want the tip of the current branch.
            branch = repo.dirstate.branch()
            try:
                node = repo.branchtip(branch)
            except hgerr.RepoLookupError:
                # This branch somehow doesn't exist???
                return
        # Both lookups above yield a binary node, while the Hg-to-Git map is
        # queried with hex hashes, so convert on both paths.
        gitsha = repo.githandler.map_git_get(binascii.hexlify(node))
        if not gitsha:
            # No Git SHA to match this Hg sha. Give up.
            return
        refs.add_packed_refs({git_branch: gitsha})
    refs.set_symbolic_ref(b'HEAD', git_branch)


def export_hook(ui: hgui.ui, repo: GittyRepo, **__: object) -> None:
    """Transaction-close hook that refreshes the Git export when enabled.

    Nothing is exported when ``git-serve.never-export`` is set, or when the
    repository has no ``githandler``. Otherwise the repository is exported
    if ``git-serve.always-export`` is set or its Git directory already
    exists.

    Extra keyword arguments supplied by the hook machinery are ignored.
    """
    if ui.configbool(b'git-serve', b'never-export'):
        return
    if not is_gitty(repo):
        # `reposetup` installs this hook for every repository, including
        # ones without a `githandler`; there is nothing to export for those.
        return
    always_export = ui.configbool(b'git-serve', b'always-export', False)
    if always_export or os.path.isdir(repo.githandler.gitdir):
        export_repo(repo)


def export_repo(repo: GittyRepo) -> None:
    """Rebuild the Git view of ``repo`` from scratch.

    Removes all existing Git refs, exports the commits through the
    repository's ``githandler``, then sets ``HEAD`` via `set_head`.
    """
    git_refs = repo.githandler.git.refs
    clean_all_refs(git_refs)
    repo.githandler.export_commits()
    set_head(repo)


def is_gitty(repo: hgrepo.IRepo) -> t.TypeGuard[GittyRepo]:
    """Report whether ``repo`` has a ``githandler`` attribute."""
    try:
        repo.githandler
    except AttributeError:
        return False
    return True


# Interfacing with Mercurial

# Version string of this extension.
__version__ = '0.1.1'

# Command table for this extension; it starts empty and is handed to
# `registrar.command` below.
cmdtable: dict[bytes, object] = {}

# Decorator used in this module to declare commands (see `git_serve_export`).
command = registrar.command(cmdtable)


@command(b'git-serve-export')
def git_serve_export(_: hgui.ui, repo: hgrepo.IRepo, **__: object) -> None:
    # Command entry point: export the repository to Git on demand.
    # Aborts when the repository has no `githandler` to export with.
    if is_gitty(repo):
        export_repo(repo)
        return
    raise hgerr.Abort(b'this extension depends on the `hggit` extension')


def uisetup(_: hgui.ui) -> None:
    """Wrap ``wireprotoserver.handlewsgirequest`` with `handle_git_protocol`."""
    wrapped_name = 'handlewsgirequest'
    extensions.wrapfunction(wireprotoserver, wrapped_name, handle_git_protocol)


def reposetup(ui: hgui.ui, _: hgrepo.IRepo) -> None:
    """Register `export_hook` as a ``txnclose`` hook on ``ui``."""
    hook_name = b'txnclose.__gitserve_internal__'
    ui.setconfig(b'hooks', hook_name, export_hook)


# Names exported by `from ... import *`: the version plus the command table,
# command decorator and setup functions defined above.
__all__ = ('__version__', 'cmdtable', 'command', 'uisetup', 'reposetup')