update.py: support remotes other than github

right now we can only refer to repositories hosted on github.
This allows to give random git repo uris like for instance:
https://git.sr.ht/~whynothugo/lsp_lines.nvim
This commit is contained in:
Matthieu Coudron 2022-01-11 00:20:52 +01:00
parent 2cbde6d1a1
commit 46c68ad418
2 changed files with 142 additions and 71 deletions

View file

@ -81,29 +81,71 @@ def make_request(url: str) -> urllib.request.Request:
headers["Authorization"] = f"token {token}" headers["Authorization"] = f"token {token}"
return urllib.request.Request(url, headers=headers) return urllib.request.Request(url, headers=headers)
@dataclass
class PluginDesc:
owner: str
repo: str
branch: str
alias: Optional[str]
class Repo: class Repo:
def __init__( def __init__(
self, owner: str, name: str, branch: str, alias: Optional[str] self, uri: str, branch: str, alias: Optional[str]
) -> None: ) -> None:
self.owner = owner self.uri = uri
self.name = name '''Url to the repo'''
self.branch = branch self.branch = branch
self.alias = alias self.alias = alias
self.redirect: Dict[str, str] = {} self.redirect: Dict[str, str] = {}
def url(self, path: str) -> str: @property
return urljoin(f"https://github.com/{self.owner}/{self.name}/", path) def name(self):
return self.uri.split('/')[-1]
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Repo({self.owner}, {self.name})" return f"Repo({self.name}, {self.uri})"
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
def has_submodules(self) -> bool:
return True
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
def latest_commit(self) -> Tuple[str, datetime]:
loaded = self._prefetch(None)
updated = datetime.strptime(loaded['date'], "%Y-%m-%dT%H:%M:%S%z")
return loaded['rev'], updated
def _prefetch(self, ref: Optional[str]):
cmd = ["nix-prefetch-git", "--quiet", "--fetch-submodules", self.uri]
if ref is not None:
cmd.append(ref)
log.debug(cmd)
data = subprocess.check_output(cmd)
loaded = json.loads(data)
return loaded
def prefetch(self, ref: Optional[str]) -> str:
loaded = self._prefetch(ref)
return loaded["sha256"]
def as_nix(self, plugin: "Plugin") -> str:
return f'''fetchgit {{
url = "{self.uri}";
rev = "{plugin.commit}";
sha256 = "{plugin.sha256}";
}}'''
class RepoGitHub(Repo):
def __init__(
self, owner: str, repo: str, branch: str, alias: Optional[str]
) -> None:
self.owner = owner
self.repo = repo
'''Url to the repo'''
super().__init__(self.url(""), branch, alias)
log.debug("Instantiating github repo %s/%s", self.owner, self.repo)
@property
def name(self):
return self.repo
def url(self, path: str) -> str:
return urljoin(f"https://github.com/{self.owner}/{self.name}/", path)
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2) @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
def has_submodules(self) -> bool: def has_submodules(self) -> bool:
@ -122,7 +164,7 @@ class Repo:
commit_url = self.url(f"commits/{self.branch}.atom") commit_url = self.url(f"commits/{self.branch}.atom")
commit_req = make_request(commit_url) commit_req = make_request(commit_url)
with urllib.request.urlopen(commit_req, timeout=10) as req: with urllib.request.urlopen(commit_req, timeout=10) as req:
self.check_for_redirect(commit_url, req) self._check_for_redirect(commit_url, req)
xml = req.read() xml = req.read()
root = ET.fromstring(xml) root = ET.fromstring(xml)
latest_entry = root.find(ATOM_ENTRY) latest_entry = root.find(ATOM_ENTRY)
@ -137,7 +179,7 @@ class Repo:
updated = datetime.strptime(updated_tag.text, "%Y-%m-%dT%H:%M:%SZ") updated = datetime.strptime(updated_tag.text, "%Y-%m-%dT%H:%M:%SZ")
return Path(str(url.path)).name, updated return Path(str(url.path)).name, updated
def check_for_redirect(self, url: str, req: http.client.HTTPResponse): def _check_for_redirect(self, url: str, req: http.client.HTTPResponse):
response_url = req.geturl() response_url = req.geturl()
if url != response_url: if url != response_url:
new_owner, new_name = ( new_owner, new_name = (
@ -150,11 +192,13 @@ class Repo:
new_plugin = plugin_line.format(owner=new_owner, name=new_name) new_plugin = plugin_line.format(owner=new_owner, name=new_name)
self.redirect[old_plugin] = new_plugin self.redirect[old_plugin] = new_plugin
def prefetch_git(self, ref: str) -> str:
data = subprocess.check_output( def prefetch(self, commit: str) -> str:
["nix-prefetch-git", "--fetch-submodules", self.url(""), ref] if self.has_submodules():
) sha256 = super().prefetch(commit)
return json.loads(data)["sha256"] else:
sha256 = self.prefetch_github(commit)
return sha256
def prefetch_github(self, ref: str) -> str: def prefetch_github(self, ref: str) -> str:
data = subprocess.check_output( data = subprocess.check_output(
@ -162,6 +206,33 @@ class Repo:
) )
return data.strip().decode("utf-8") return data.strip().decode("utf-8")
def as_nix(self, plugin: "Plugin") -> str:
if plugin.has_submodules:
submodule_attr = "\n fetchSubmodules = true;"
else:
submodule_attr = ""
return f'''fetchFromGitHub {{
owner = "{self.owner}";
repo = "{self.repo}";
rev = "{plugin.commit}";
sha256 = "{plugin.sha256}";{submodule_attr}
}}'''
@dataclass
class PluginDesc:
repo: Repo
branch: str
alias: Optional[str]
@property
def name(self):
if self.alias is None:
return self.repo.name
else:
return self.alias
class Plugin: class Plugin:
def __init__( def __init__(
@ -193,6 +264,7 @@ class Plugin:
return copy return copy
class Editor: class Editor:
"""The configuration of the update script.""" """The configuration of the update script."""
@ -241,9 +313,9 @@ class Editor:
def create_parser(self): def create_parser(self):
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description=( description=(f"""
f"Updates nix derivations for {self.name} plugins" Updates nix derivations for {self.name} plugins.\n
f"By default from {self.default_in} to {self.default_out}" By default from {self.default_in} to {self.default_out}"""
) )
) )
parser.add_argument( parser.add_argument(
@ -320,26 +392,24 @@ def prefetch_plugin(
p: PluginDesc, p: PluginDesc,
cache: "Optional[Cache]" = None, cache: "Optional[Cache]" = None,
) -> Tuple[Plugin, Dict[str, str]]: ) -> Tuple[Plugin, Dict[str, str]]:
user, repo_name, branch, alias = p.owner, p.repo, p.branch, p.alias repo, branch, alias = p.repo, p.branch, p.alias
log.info(f"Fetching last commit for plugin {user}/{repo_name}@{branch}") name = alias or p.repo.name
repo = Repo(user, repo_name, branch, alias) commit = None
log.info(f"Fetching last commit for plugin {name} from {repo.uri}@{branch}")
commit, date = repo.latest_commit() commit, date = repo.latest_commit()
has_submodules = repo.has_submodules()
cached_plugin = cache[commit] if cache else None cached_plugin = cache[commit] if cache else None
if cached_plugin is not None: if cached_plugin is not None:
log.debug("Cache hit !") log.debug("Cache hit !")
cached_plugin.name = alias or repo_name cached_plugin.name = name
cached_plugin.date = date cached_plugin.date = date
return cached_plugin, repo.redirect return cached_plugin, repo.redirect
print(f"prefetch {user}/{repo_name}") has_submodules = repo.has_submodules()
if has_submodules: print(f"prefetch {name}")
sha256 = repo.prefetch_git(commit) sha256 = repo.prefetch(commit)
else:
sha256 = repo.prefetch_github(commit)
return ( return (
Plugin(alias or repo_name, commit, has_submodules, sha256, date=date), Plugin(name, commit, has_submodules, sha256, date=date),
repo.redirect, repo.redirect,
) )
@ -360,16 +430,17 @@ def print_download_error(plugin: str, ex: Exception):
def check_results( def check_results(
results: List[Tuple[str, str, Union[Exception, Plugin], Dict[str, str]]] results: List[Tuple[PluginDesc, Union[Exception, Plugin], Dict[str, str]]]
) -> Tuple[List[Tuple[str, str, Plugin]], Dict[str, str]]: ) -> Tuple[List[Tuple[PluginDesc, Plugin]], Dict[str, str]]:
''' '''
failures: List[Tuple[str, Exception]] = [] failures: List[Tuple[str, Exception]] = []
plugins = [] plugins = []
redirects: Dict[str, str] = {} redirects: Dict[str, str] = {}
for (owner, name, result, redirect) in results: for (pdesc, result, redirect) in results:
if isinstance(result, Exception): if isinstance(result, Exception):
failures.append((name, result)) failures.append((pdesc.name, result))
else: else:
plugins.append((owner, name, result)) plugins.append((pdesc, result))
redirects.update(redirect) redirects.update(redirect)
print(f"{len(results) - len(failures)} plugins were checked", end="") print(f"{len(results) - len(failures)} plugins were checked", end="")
@ -384,17 +455,29 @@ def check_results(
sys.exit(1) sys.exit(1)
def make_repo(uri, branch, alias) -> Repo:
'''Instantiate a Repo with the correct specialization depending on server (gitub spec)'''
# dumb check to see if it's of the form owner/repo (=> github) or https://...
res = uri.split('/')
if len(res) <= 2:
repo = RepoGitHub(res[0], res[1], branch, alias)
else:
repo = Repo(uri.strip(), branch, alias)
return repo
def parse_plugin_line(line: str) -> PluginDesc: def parse_plugin_line(line: str) -> PluginDesc:
branch = "HEAD" branch = "HEAD"
alias = None alias = None
name, repo = line.split("/") uri = line
if " as " in repo: if " as " in uri:
repo, alias = repo.split(" as ") uri, alias = line.split(" as ")
alias = alias.strip() alias = alias.strip()
if "@" in repo: if "@" in line:
repo, branch = repo.split("@") uri, branch = line.split("@")
return PluginDesc(name.strip(), repo.strip(), branch.strip(), alias) repo = make_repo(uri.strip(), branch.strip(), alias)
return PluginDesc(repo, branch.strip(), alias)
def load_plugin_spec(plugin_file: str) -> List[PluginDesc]: def load_plugin_spec(plugin_file: str) -> List[PluginDesc]:
@ -404,10 +487,6 @@ def load_plugin_spec(plugin_file: str) -> List[PluginDesc]:
if line.startswith("#"): if line.startswith("#"):
continue continue
plugin = parse_plugin_line(line) plugin = parse_plugin_line(line)
if not plugin.owner:
msg = f"Invalid repository {line}, must be in the format owner/repo[ as alias]"
print(msg, file=sys.stderr)
sys.exit(1)
plugins.append(plugin) plugins.append(plugin)
return plugins return plugins
@ -467,14 +546,13 @@ class Cache:
def prefetch( def prefetch(
pluginDesc: PluginDesc, cache: Cache pluginDesc: PluginDesc, cache: Cache
) -> Tuple[str, str, Union[Exception, Plugin], dict]: ) -> Tuple[PluginDesc, Union[Exception, Plugin], dict]:
owner, repo = pluginDesc.owner, pluginDesc.repo
try: try:
plugin, redirect = prefetch_plugin(pluginDesc, cache) plugin, redirect = prefetch_plugin(pluginDesc, cache)
cache[plugin.commit] = plugin cache[plugin.commit] = plugin
return (owner, repo, plugin, redirect) return (pluginDesc, plugin, redirect)
except Exception as e: except Exception as e:
return (owner, repo, e, {}) return (pluginDesc, e, {})
def rewrite_input( def rewrite_input(

View file

@ -52,38 +52,31 @@ HEADER = (
class VimEditor(pluginupdate.Editor): class VimEditor(pluginupdate.Editor):
def generate_nix(self, plugins: List[Tuple[str, str, pluginupdate.Plugin]], outfile: str): def generate_nix(self, plugins: List[Tuple[pluginupdate.PluginDesc, pluginupdate.Plugin]], outfile: str):
sorted_plugins = sorted(plugins, key=lambda v: v[2].name.lower()) sorted_plugins = sorted(plugins, key=lambda v: v[0].name.lower())
with open(outfile, "w+") as f: with open(outfile, "w+") as f:
f.write(HEADER) f.write(HEADER)
f.write(textwrap.dedent(""" f.write(textwrap.dedent("""
{ lib, buildVimPluginFrom2Nix, fetchFromGitHub }: { lib, buildVimPluginFrom2Nix, fetchFromGitHub, fetchgit }:
final: prev: final: prev:
{""" {"""
)) ))
for owner, repo, plugin in sorted_plugins: for pdesc, plugin in sorted_plugins:
if plugin.has_submodules:
submodule_attr = "\n fetchSubmodules = true;"
else:
submodule_attr = ""
f.write(textwrap.indent(textwrap.dedent( repo = pdesc.repo
src_nix = repo.as_nix(plugin)
f.write(
f""" f"""
{plugin.normalized_name} = buildVimPluginFrom2Nix {{ {plugin.normalized_name} = buildVimPluginFrom2Nix {{
pname = "{plugin.name}"; pname = "{plugin.name}";
version = "{plugin.version}"; version = "{plugin.version}";
src = fetchFromGitHub {{ src = {src_nix};
owner = "{owner}"; meta.homepage = "{repo.uri}";
repo = "{repo}";
rev = "{plugin.commit}";
sha256 = "{plugin.sha256}";{submodule_attr}
}};
meta.homepage = "https://github.com/{owner}/{repo}/";
}}; }};
""" """
), ' ')) )
f.write("\n}\n") f.write("\n}\n")
print(f"updated {outfile}") print(f"updated {outfile}")