mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-30 17:56:58 +00:00
c545d5c0a1
This builds and parses wheels for a given target, then renders a DOT-format graph into a PNG file.
316 lines
11 KiB
Python
Executable File
316 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# Run this as "./graph-deps.py ." from your source tree, then open out.png .
|
|
# You can also use a PyPI package name, e.g. "./graph-deps.py tahoe-lafs".
|
|
#
|
|
# This builds all necessary wheels for your project (in a tempdir), scans
|
|
# them to learn their inter-dependencies, generates a DOT-format graph
|
|
# specification, then runs the "dot" program (from the "graphviz" package) to
|
|
# turn this into a PNG image.
|
|
|
|
# To hack on this script (e.g. change the way it generates DOT) without
|
|
# re-building the wheels each time, set --wheeldir= to some not-existent
|
|
# path. It will write the wheels to that directory instead of a tempdir. The
|
|
# next time you run it, if --wheeldir= points to a directory, it will read
|
|
# the wheels from there.
|
|
|
|
# To hack on the DOT output without re-running this script, add --write-dot,
|
|
# which will cause it to write "out.dot". Edit that file, then run "dot -Tpng
|
|
# out.dot >out.png" to re-render the graph.
|
|
|
|
# Install 'click' first. I run this with py2, but py3 might work too, if the
|
|
# wheels can be built with py3.
|
|
|
|
from __future__ import print_function, unicode_literals
|
|
import os, sys, subprocess, json, tempfile, zipfile, io, re, itertools
|
|
import email.parser
|
|
from pprint import pprint
|
|
import click
|
|
|
|
all_packages = {} # name -> version
|
|
all_reqs = {} # name -> specs
|
|
all_pure = set()
|
|
|
|
# 1: build a local directory of wheels for the given target
|
|
# pip wheel --wheel-dir=tempdir sys.argv[1]
|
|
def build_wheels(target, wheeldir):
|
|
print("-- building wheels for '%s' in %s" % (target, wheeldir))
|
|
pip = subprocess.Popen(["pip", "wheel", "--wheel-dir", wheeldir, target],
|
|
stdout=subprocess.PIPE)
|
|
stdout = pip.communicate()[0]
|
|
if pip.returncode != 0:
|
|
sys.exit(pip.returncode)
|
|
# 'pip wheel .' starts with "Processing /path/to/." but ends with
|
|
# "Successfully built PKGNAME". 'pip wheel PKGNAME' start with
|
|
# "Collecting PKGNAME" but ends with e.g. "Skipping foo, due to already
|
|
# being wheel."
|
|
lines = stdout.decode("utf-8").splitlines()
|
|
if lines[0].startswith("Collecting "):
|
|
root_pkgname = lines[0].split()[-1]
|
|
elif lines[-1].startswith("Successfully built "):
|
|
root_pkgname = lines[-1].split()[-1]
|
|
else:
|
|
print("Unable to figure out root package name")
|
|
print("'pip wheel %s' output is:" % target)
|
|
print(stdout)
|
|
sys.exit(1)
|
|
with open(os.path.join(wheeldir, "root_pkgname"), "w") as f:
|
|
f.write(root_pkgname+"\n")
|
|
|
|
def get_root_pkgname(wheeldir):
|
|
with open(os.path.join(wheeldir, "root_pkgname"), "r") as f:
|
|
return f.read().strip()
|
|
|
|
# 2: for each wheel, find the *.dist-info file, find metadata.json inside
|
|
# that, extract metadata.run_requires[0].requires
|
|
|
|
def add(name, version, extras, reqs, raw):
|
|
if set(reqs) - set([None]) - set(extras):
|
|
print("um, %s metadata has mismatching extras/reqs" % name)
|
|
pprint(extras)
|
|
pprint(reqs)
|
|
print("raw data:")
|
|
pprint(raw)
|
|
raise ValueError
|
|
if None not in reqs:
|
|
print("um, %s has no reqs" % name)
|
|
print("raw data:")
|
|
pprint(raw)
|
|
raise ValueError
|
|
all_packages[name] = version
|
|
all_reqs[name] = reqs
|
|
|
|
def parse_metadata_json(f):
|
|
md = json.loads(f.read().decode("utf-8"))
|
|
name = md["name"].lower()
|
|
version = md["version"]
|
|
try:
|
|
reqs = {None: []} # extra_name/None -> [specs]
|
|
if "run_requires" in md:
|
|
for r in md["run_requires"]:
|
|
reqs[r.get("extra", None)] = r["requires"]
|
|
# this package provides the following extras
|
|
extras = md.get("extras", [])
|
|
#for e in extras:
|
|
# if e not in reqs:
|
|
# reqs[e] = []
|
|
except KeyError:
|
|
print("error in '%s'" % name)
|
|
pprint(md)
|
|
raise
|
|
add(name, version, extras, reqs, md)
|
|
return name
|
|
|
|
def parse_METADATA(f):
|
|
data = f.read().decode("utf-8")
|
|
md = email.parser.Parser().parsestr(data)
|
|
|
|
name = md.get_all("Name")[0].lower()
|
|
version = md.get_all("Version")[0]
|
|
reqs = {None: []}
|
|
for req in md.get_all("Requires-Dist") or []: # untested
|
|
pieces = [p.strip() for p in req.split(";")]
|
|
spec = pieces[0]
|
|
extra = None
|
|
if len(pieces) > 1:
|
|
mo = re.search(r"extra == '(\w+)'", pieces[1])
|
|
if mo:
|
|
extra = mo.group(1)
|
|
if extra not in reqs:
|
|
reqs[extra] = []
|
|
reqs[extra].append(spec)
|
|
extras = md.get_all("Provides-Extra") or [] # untested
|
|
add(name, version, extras, reqs, data)
|
|
return name
|
|
|
|
def parse_wheels(wheeldir):
|
|
for fn in os.listdir(wheeldir):
|
|
if not fn.endswith(".whl"):
|
|
continue
|
|
zf = zipfile.ZipFile(os.path.join(wheeldir, fn))
|
|
zfnames = zf.namelist()
|
|
mdfns = [n for n in zfnames if n.endswith(".dist-info/metadata.json")]
|
|
if mdfns:
|
|
name = parse_metadata_json(zf.open(mdfns[0]))
|
|
else:
|
|
mdfns = [n for n in zfnames if n.endswith(".dist-info/METADATA")]
|
|
if mdfns:
|
|
name = parse_METADATA(zf.open(mdfns[0]))
|
|
else:
|
|
print("no metadata for", fn)
|
|
continue
|
|
is_pure = False
|
|
wheel_fns = [n for n in zfnames if n.endswith(".dist-info/WHEEL")]
|
|
if wheel_fns:
|
|
with zf.open(wheel_fns[0]) as wheel:
|
|
for line in wheel:
|
|
if line.lower().rstrip() == b"root-is-purelib: true":
|
|
is_pure = True
|
|
if is_pure:
|
|
all_pure.add(name)
|
|
return get_root_pkgname(wheeldir)
|
|
|
|
# 3: emit a .dot file with a graph of all the dependencies
|
|
|
|
def dot_name(name, extra):
|
|
# the 'dot' format enforces C identifier syntax on node names
|
|
assert name.lower() == name, name
|
|
name = "%s__%s" % (name, extra)
|
|
return name.replace("-", "_").replace(".", "_")
|
|
|
|
def parse_spec(spec):
|
|
# turn "twisted[tls] (>=16.0.0)" into "twisted"
|
|
pieces = spec.split()
|
|
name_and_extras = pieces[0]
|
|
paren_constraint = pieces[1] if len(pieces) > 1 else ""
|
|
if "[" in name_and_extras:
|
|
name = name_and_extras[:name_and_extras.find("[")]
|
|
extras_bracketed = name_and_extras[name_and_extras.find("["):]
|
|
extras = extras_bracketed.strip("[]").split(",")
|
|
else:
|
|
name = name_and_extras
|
|
extras = []
|
|
return name.lower(), extras, paren_constraint
|
|
|
|
def format_attrs(**kwargs):
|
|
# return "", or "[attr=value attr=value]"
|
|
if not kwargs or all([not(v) for v in kwargs.values()]):
|
|
return ""
|
|
def escape(s):
|
|
return s.replace('\n', r'\n').replace('"', r'\"')
|
|
pieces = ['%s="%s"' % (k, escape(kwargs[k]))
|
|
for k in sorted(kwargs)
|
|
if kwargs[k]]
|
|
body = " ".join(pieces)
|
|
return "[%s]" % body
|
|
|
|
# We draw a node for each wheel. When one of the inbound dependencies asks
|
|
# for an extra, we assign that (target, extra) pair a color. We draw outbound
|
|
# links for all non-extra dependencies in black. If something asked the
|
|
# target for an extra, we also draw links for the extra deps using the
|
|
# assigned color.
|
|
|
|
COLORS = itertools.cycle(["green", "blue", "red", "purple"])
|
|
extras_to_show = {} # maps (target, extraname) -> colorname
|
|
|
|
def add_extra_to_show(targetname, extraname):
|
|
key = (targetname, extraname)
|
|
if key not in extras_to_show:
|
|
extras_to_show[key] = next(COLORS)
|
|
|
|
_scanned = set()
|
|
def scan(name, extra=None, path=""):
|
|
dupkey = (name, extra)
|
|
if dupkey in _scanned:
|
|
#print("SCAN-SKIP %s %s[%s]" % (path, name, extra))
|
|
return
|
|
_scanned.add(dupkey)
|
|
#print("SCAN %s %s[%s]" % (path, name, extra))
|
|
add_extra_to_show(name, extra)
|
|
for spec in all_reqs[name][extra]:
|
|
#print("-", spec)
|
|
dep_name, dep_extras, dep_constraint = parse_spec(spec)
|
|
#print("--", dep_name, dep_extras)
|
|
children = set(dep_extras)
|
|
children.add(None)
|
|
for dep_extra in children:
|
|
scan(dep_name, dep_extra,
|
|
path=path+"->%s[%s]" % (dep_name, dep_extra))
|
|
|
|
def generate_dot():
|
|
f = io.StringIO()
|
|
f.write("digraph {\n")
|
|
for name, extra in extras_to_show.keys():
|
|
version = all_packages[name]
|
|
if extra:
|
|
label = "%s[%s]\n%s" % (name, extra, version)
|
|
else:
|
|
label = "%s\n%s" % (name, version)
|
|
color = None
|
|
if name not in all_pure:
|
|
color = "red"
|
|
f.write('%s %s\n' % (dot_name(name, extra),
|
|
format_attrs(label=label, color=color)))
|
|
|
|
for (source, extra), color in extras_to_show.items():
|
|
if extra:
|
|
f.write('%s -> %s [weight="50" style="dashed"]\n' %
|
|
(dot_name(source, extra),
|
|
dot_name(source, None)))
|
|
specs = all_reqs[source][extra]
|
|
for spec in specs:
|
|
reqname, reqextras, paren_constraint = parse_spec(spec)
|
|
#extras_bracketed = "[%s]" % ",".join(extras) if extras else ""
|
|
#edge_label = " ".join([p for p in [extras_bracketed,
|
|
# paren_constraint] if p])
|
|
assert None not in reqextras
|
|
if not reqextras:
|
|
reqextras = [None]
|
|
for reqextra in reqextras:
|
|
edge_label = ""
|
|
if extra:
|
|
edge_label += "(%s[%s] wants)\n" % (source, extra)
|
|
edge_label += spec
|
|
style = "bold" if reqextra else "solid"
|
|
f.write('%s -> %s %s\n' % (dot_name(source, extra),
|
|
dot_name(reqname, reqextra),
|
|
format_attrs(label=edge_label,
|
|
fontcolor=color,
|
|
style=style,
|
|
color=color)))
|
|
f.write("}\n")
|
|
return f
|
|
|
|
# 4: convert to .png
|
|
def dot_to_png(f, png_fn):
|
|
png = open(png_fn, "wb")
|
|
dot = subprocess.Popen(["dot", "-Tpng"], stdin=subprocess.PIPE, stdout=png)
|
|
dot.communicate(f.getvalue().encode("utf-8"))
|
|
if dot.returncode != 0:
|
|
sys.exit(dot.returncode)
|
|
png.close()
|
|
print("wrote graph to %s" % png_fn)
|
|
|
|
@click.command()
|
|
@click.argument("target")
|
|
@click.option("--wheeldir", default=None, type=str)
|
|
@click.option("--write-dot/--no-write-dot", default=False)
|
|
def go(target, wheeldir, write_dot):
|
|
if wheeldir:
|
|
if os.path.isdir(wheeldir):
|
|
print("loading wheels from", wheeldir)
|
|
root_pkgname = parse_wheels(wheeldir)
|
|
else:
|
|
assert not os.path.exists(wheeldir)
|
|
print("loading wheels from", wheeldir)
|
|
build_wheels(target, wheeldir)
|
|
root_pkgname = parse_wheels(wheeldir)
|
|
else:
|
|
wheeldir = tempfile.mkdtemp()
|
|
build_wheels(target, wheeldir)
|
|
root_pkgname = parse_wheels(wheeldir)
|
|
print("root package:", root_pkgname)
|
|
|
|
# parse the requirement specs (which look like "Twisted[tls] (>=13.0.0)")
|
|
# enough to identify the package name
|
|
pprint(all_packages)
|
|
pprint(all_reqs)
|
|
print("pure:", " ".join(sorted(all_pure)))
|
|
|
|
for name in all_packages.keys():
|
|
extras_to_show[(name, None)] = "black"
|
|
|
|
scan(root_pkgname)
|
|
f = generate_dot()
|
|
|
|
if write_dot:
|
|
with open("out.dot", "w") as dotf:
|
|
dotf.write(f.getvalue())
|
|
print("wrote DOT to out.dot")
|
|
dot_to_png(f, "out.png")
|
|
|
|
return 0
|
|
|
|
if __name__ == "__main__":
|
|
go()
|