build-support/docker: customisable layering strategy

Allow customisation of the algorithm used to convert the nix references
graph (created from docker image contents) into docker layers.

A collection of building blocks (python functions) is provided, which
the user can assemble into a processing pipeline by specifying a list of
operations (and their initial arguments) via a nix list.

The nix references graph is first converted into a python igraph.Graph
object (with each vertex representing a nix path), which is then fed
into the user-defined pipeline. Each stage in the pipeline represents a
function call, with initial arguments specified by the user in nix, and
the last argument being the result of the previous stage in the pipeline
(or the initial Graph object). Each step of the pipeline is expected to
produce a data structure consisting of arbitrarily nested lists/dicts
with Graph objects (representing docker layers) at its leaves. The
result of the last stage in the pipeline is recursively flattened (with
each dict converted into a list of values), until a flat list of Graphs
remains. This is then output as a json array of arrays (each Graph
converted into an array of paths).
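
For example, a pipeline which splits the closure of one store path into
its own set of layers and caps the total layer count could be specified
as follows (an illustrative sketch; the store path is a placeholder):

  [
    [ "split_paths" [ "/nix/store/...-app" ] ]
    [ "flatten" ]
    [ "limit_layers" 100 ]
  ]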

This functionality is made available via the new `layeringPipeline`
argument of the `streamLayeredImage`/`buildLayeredImage` functions. The
default value of the argument has been chosen to preserve the current
layering behaviour.
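
For instance, the default behaviour can be approximated explicitly like
this (a sketch; name and contents are placeholders, and the generated
default pipeline additionally accounts for layers taken up by fromImage
and the customisation layer):

  pkgs.dockerTools.streamLayeredImage {
    name = "hello";
    contents = [ pkgs.hello ];
    layeringPipeline = [
      [ "popularity_contest" ]
      [ "limit_layers" 100 ]
    ];
  }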

Co-authored-by: Sandro <sandro.jaeckel@gmail.com>
Adrian Gierakowski 2021-05-10 14:30:05 +01:00
parent bba140c5a3
commit 5b4a8db4d9
30 changed files with 7243 additions and 63 deletions

@@ -919,10 +919,19 @@ rec {
, includeStorePaths ? true
, includeNixDB ? false
, passthru ? {}
, # Pipeline used to produce docker layers. If not set, the popularity contest
# algorithm is used. If set, maxLayers is ignored as the author of the
# pipeline can use one of the available functions (like "limit_layers")
# to control the number of layers.
# See: pkgs/build-support/flatten-references-graph/src/flatten_references_graph/pipe.py
# for available functions, and its tests for how to use them.
# WARNING!! This interface is highly experimental and subject to change.
layeringPipeline ? null
, # Enables debug logging for the layering pipeline.
debug ? false
}:
assert
(lib.assertMsg (maxLayers > 1)
(lib.assertMsg (layeringPipeline == null -> maxLayers > 1)
"the maxLayers argument of dockerTools.buildLayeredImage function must be greater than 1 (current value: ${toString maxLayers})");
assert
(lib.assertMsg (enableFakechroot -> !stdenv.hostPlatform.isDarwin) ''
@@ -999,18 +1008,23 @@
'';
};
closureRoots = lib.optionals includeStorePaths /* normally true */ (
[ baseJson customisationLayer ]
);
overallClosure = writeText "closure" (lib.concatStringsSep " " closureRoots);
# These derivations are only created as implementation details of docker-tools,
# so they'll be excluded from the created images.
unnecessaryDrvs = [ baseJson overallClosure customisationLayer ];
layersJsonFile = buildPackages.dockerMakeLayers {
inherit debug;
closureRoots = lib.optionals includeStorePaths [ baseJson customisationLayer ];
excludePaths = [ baseJson customisationLayer ];
pipeline =
if layeringPipeline != null
then layeringPipeline
else import
./popularity-contest-layering-pipeline.nix
{ inherit lib jq runCommand; }
{ inherit fromImage maxLayers; }
;
};
conf = runCommand "${baseName}-conf.json"
{
inherit fromImage maxLayers created mtime uid gid uname gname;
inherit fromImage created mtime uid gid uname gname layersJsonFile;
imageName = lib.toLower name;
preferLocalBuild = true;
passthru.imageTag =
@@ -1018,7 +1032,6 @@
then tag
else
lib.head (lib.strings.splitString "-" (baseNameOf (builtins.unsafeDiscardStringContext conf.outPath)));
paths = buildPackages.referencesByPopularity overallClosure;
nativeBuildInputs = [ jq ];
} ''
${if (tag == null) then ''
@@ -1038,54 +1051,7 @@
mtime="$(date -Iseconds -d "$mtime")"
fi
paths() {
cat $paths ${lib.concatMapStringsSep " "
(path: "| (grep -v ${path} || true)")
unnecessaryDrvs}
}
# Compute the number of layers that are already used by a potential
# 'fromImage' as well as the customization layer. Ensure that there is
# still at least one layer available to store the image contents.
usedLayers=0
# subtract number of base image layers
if [[ -n "$fromImage" ]]; then
(( usedLayers += $(tar -xOf "$fromImage" manifest.json | jq '.[0].Layers | length') ))
fi
# one layer will be taken up by the customisation layer
(( usedLayers += 1 ))
if ! (( $usedLayers < $maxLayers )); then
echo >&2 "Error: usedLayers $usedLayers layers to store 'fromImage' and" \
"'extraCommands', but only maxLayers=$maxLayers were" \
"allowed. At least 1 layer is required to store contents."
exit 1
fi
availableLayers=$(( maxLayers - usedLayers ))
# Create $maxLayers worth of Docker Layers, one layer per store path
# unless there are more paths than $maxLayers. In that case, create
# $maxLayers-1 for the most popular layers, and smush the remaining
# store paths into one final layer.
#
# The following code is fiddly w.r.t. ensuring every layer is
# created, and that no paths are missed. If you change the
# following lines, double-check that your code behaves properly
# when the number of layers equals:
# maxLayers-1, maxLayers, maxLayers+1, and 0
paths |
jq -sR '
rtrimstr("\n") | split("\n")
| (.[:$maxLayers-1] | map([.])) + [ .[$maxLayers-1:] ]
| map(select(length > 0))
' \
--argjson maxLayers "$availableLayers" > store_layers.json
# The index on $store_layers is necessary because the --slurpfile
# automatically reads the file as an array.
cat ${baseJson} | jq '
jq '
. + {
"store_dir": $store_dir,
"from_image": $from_image,
@@ -1101,7 +1067,7 @@
}
' --arg store_dir "${storeDir}" \
--argjson from_image ${if fromImage == null then "null" else "'\"${fromImage}\"'"} \
--slurpfile store_layers store_layers.json \
--slurpfile store_layers "$layersJsonFile" \
--arg customisation_layer ${customisationLayer} \
--arg repo_tag "$imageName:$imageTag" \
--arg created "$created" \
@@ -1109,8 +1075,9 @@
--arg uid "$uid" \
--arg gid "$gid" \
--arg uname "$uname" \
--arg gname "$gname" |
tee $out
--arg gname "$gname" \
${baseJson} \
| tee $out
'';
result = runCommand "stream-${baseName}"

@@ -0,0 +1,47 @@
{
coreutils,
flattenReferencesGraph,
lib,
jq,
stdenvNoCC,
}:
{
closureRoots,
excludePaths ? [ ],
# This could be a path to (or a derivation producing a path to)
# a json file containing the pipeline
pipeline ? [ ],
debug ? false,
}:
if closureRoots == [ ] then
builtins.toFile "docker-layers-empty" "[]"
else
stdenvNoCC.mkDerivation {
name = "docker-layers";
__structuredAttrs = true;
# graph, exclude_paths and pipeline are expected by the
# flatten_references_graph executable.
exportReferencesGraph.graph = closureRoots;
exclude_paths = excludePaths;
inherit pipeline;
# builder cannot refer to derivation outputs
PATH = "${coreutils}/bin:${flattenReferencesGraph}/bin:${jq}/bin";
builder = builtins.toFile "docker-make-layers-builder" ''
. .attrs.sh
flatten_references_graph_arg=.attrs.json
echo "pipeline: $pipeline"
if jq -e '.pipeline | type == "string"' .attrs.json; then
jq '. + { "pipeline": $pipeline[0] }' \
--slurpfile pipeline "$pipeline" \
.attrs.json > flatten_references_graph_arg.json
flatten_references_graph_arg=flatten_references_graph_arg.json
fi
${lib.optionalString debug "export DEBUG=True"}
flatten_references_graph "$flatten_references_graph_arg" > ''${outputs[out]}
'';
}
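# For illustration, a minimal invocation of this helper could look
# roughly like this (a sketch; `drv` stands for any derivation whose
# closure should be layered):
#
#   dockerMakeLayers {
#     closureRoots = [ drv ];
#     pipeline = [ [ "popularity_contest" ] [ "limit_layers" 100 ] ];
#   }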

@@ -0,0 +1,34 @@
{
lib,
runCommand,
jq,
}:
{
maxLayers,
fromImage ? null,
}:
runCommand "popularity-contest-layering-pipeline.json" { inherit maxLayers; } ''
# Compute the number of layers that are already used by a potential
# 'fromImage' as well as the customization layer. Ensure that there is
# still at least one layer available to store the image contents.
# one layer will be taken up by the customisation layer
usedLayers=1
${lib.optionalString (fromImage != null) ''
# account for the layers taken up by the base image
baseImageLayersCount=$(tar -xOf "${fromImage}" manifest.json | ${lib.getExe jq} '.[0].Layers | length')
(( usedLayers += baseImageLayersCount ))
''}
if ! (( $usedLayers < $maxLayers )); then
echo >&2 "Error: usedLayers $usedLayers layers to store 'fromImage' and" \
"'extraCommands', but only maxLayers=$maxLayers were" \
"allowed. At least 1 layer is required to store contents."
exit 1
fi
availableLayers=$(( maxLayers - usedLayers ))
# Produce a pipeline which uses the popularity_contest algorithm.
echo '[["popularity_contest"],["limit_layers",'$availableLayers']]' > $out
''

@@ -0,0 +1,54 @@
# Start this shell with:
# nix-shell path/to/root/of/nixpkgs -A flattenReferencesGraph.dev-shell
{
mkShell,
callPackage,
python3Packages,
}:
let
helpers = callPackage (import ./helpers.nix) { };
in
mkShell {
inputsFrom = [ (callPackage (import ./package.nix) { }) ];
buildInputs = [
helpers.format
helpers.lint
helpers.unittest
# This is needed to plot graphs when DEBUG_PLOT is set to True.
python3Packages.pycairo
# This can be used on linux to display the graphs.
# On other platforms the image viewer needs to be set with
# DEBUG_PLOT_IMAGE_VIEWER env var.
# pkgs.gwenview
];
shellHook = ''
echo '
**********************************************************************
**********************************************************************
Commands useful for development (should be executed from the src dir):
format
* formats all files in place using autopep8
lint
* lints all files using flake8
unittest
* runs all unit tests
The following env vars can be set to enable extra output in tests:
- DEBUG=True - enable debug logging
- DEBUG_PLOT=True - plot graphs processed by split_paths.py and
subcomponent.py
- DEBUG_PLOT_IMAGE_VIEWER=$PATH_OF_IMAGE_VIEWER_APP - app used to
display plots (default: gwenview)
- DEBUG_PLOT_SAVE_BASE_NAME=$SOME_NAME - if set, plots will be saved
to files instead of displayed with image viewer
**********************************************************************
**********************************************************************
'
'';
}

@@ -0,0 +1,36 @@
{
bash,
writers,
python3Packages,
}:
let
writeCheckedBashBin =
name:
let
interpreter = "${bash}/bin/bash";
in
writers.makeScriptWriter {
inherit interpreter;
check = "${interpreter} -n $1";
} "/bin/${name}";
# Helpers used during build/development.
lint = writeCheckedBashBin "lint" ''
${python3Packages.flake8}/bin/flake8 --show-source ''${@}
'';
unittest = writeCheckedBashBin "unittest" ''
if [ "$#" -eq 0 ]; then
set -- discover -p '*_test.py'
fi
${python3Packages.python}/bin/python -m unittest "''${@}"
'';
format = writeCheckedBashBin "format" ''
${python3Packages.autopep8}/bin/autopep8 -r -i . "''${@}"
'';
in
{
inherit format lint unittest;
}

@@ -0,0 +1,32 @@
{
callPackage,
nix-gitignore,
python3Packages,
}:
let
helpers = callPackage ./helpers.nix { };
pythonPackages = python3Packages;
in
pythonPackages.buildPythonApplication {
version = "0.1.0";
pname = "flatten-references-graph";
# Note: this uses only ./src/.gitignore
src = nix-gitignore.gitignoreSource [ ] ./src;
propagatedBuildInputs = with pythonPackages; [
igraph
toolz
];
doCheck = true;
checkPhase = ''
${helpers.unittest}/bin/unittest
'';
passthru = {
dev-shell = callPackage ./dev-shell.nix { };
};
}

@@ -0,0 +1,4 @@
[flake8]
max-line-length = 80
[pep8]
aggressive = 1

@@ -0,0 +1 @@
__pycache__

@@ -0,0 +1,48 @@
import json as json
import sys as sys
from .lib import debug, load_json
from .flatten_references_graph import flatten_references_graph
def main_impl(file_path):
debug(f"loading json from {file_path}")
data = load_json(file_path)
# These are required
references_graph = data["graph"]
pipeline = data["pipeline"]
# This is optional
exclude_paths = data.get("exclude_paths")
debug("references_graph", references_graph)
debug("pipeline", pipeline)
debug("exclude_paths", exclude_paths)
result = flatten_references_graph(
references_graph,
pipeline,
exclude_paths=exclude_paths
)
debug("result", result)
return json.dumps(
result,
# For reproducibility.
sort_keys=True,
indent=2,
# Avoid trailing whitespace.
separators=(",", ": ")
)
def main():
file_path = sys.argv[1]
print(main_impl(file_path))
if __name__ == "__main__":
main()

@@ -0,0 +1,52 @@
import unittest
import inspect as inspect
from .__main__ import main_impl
from .lib import path_relative_to_file
if __name__ == "__main__":
unittest.main()
class TestMain(unittest.TestCase):
def test_main_impl(self):
file_path = path_relative_to_file(
__file__,
"__test_fixtures/flatten-references-graph-main-input.json"
)
result = main_impl(file_path)
self.assertEqual(
result,
inspect.cleandoc(
"""
[
[
"B"
],
[
"C"
],
[
"A"
]
]
"""
)
)
def test_main_impl2(self):
file_path = path_relative_to_file(
__file__,
"__test_fixtures/flatten-references-graph-main-input-no-paths.json"
)
result = main_impl(file_path)
self.assertEqual(
result,
inspect.cleandoc("[]")
)

@@ -0,0 +1,31 @@
[
{
"closureSize": 1,
"narHash": "sha256:a",
"narSize": 2,
"path": "A",
"references": [
"A",
"B",
"C"
]
},
{
"closureSize": 3,
"narHash": "sha256:b",
"narSize": 4,
"path": "B",
"references": [
"C"
]
},
{
"closureSize": 5,
"narHash": "sha256:c",
"narSize": 6,
"path": "C",
"references": [
"C"
]
}
]

@@ -0,0 +1,23 @@
{
"graph": [
{
"closureSize": 168,
"narHash": "sha256:0dl4kfhb493yz8a5wgh0d2z3kr61z65gp85vx33rqwa1m1lnymy8",
"narSize": 168,
"path": "/nix/store/1kaw7p40kknss1qq8gark3azvfp26q8x-no-store-paths-base.json",
"references": []
}
],
"pipeline": [
[
"popularity_contest"
],
[
"limit_layers",
99
]
],
"exclude_paths": [
"/nix/store/1kaw7p40kknss1qq8gark3azvfp26q8x-no-store-paths-base.json"
]
}

@@ -0,0 +1,36 @@
{
"graph": [
{
"closureSize": 1,
"narHash": "sha256:a",
"narSize": 2,
"path": "A",
"references": [
"A",
"B",
"C"
]
},
{
"closureSize": 3,
"narHash": "sha256:b",
"narSize": 4,
"path": "B",
"references": [
"C"
]
},
{
"closureSize": 5,
"narHash": "sha256:c",
"narSize": 6,
"path": "C",
"references": [
"C"
]
}
],
"pipeline": [
["split_paths", ["B"]]
]
}

@@ -0,0 +1,45 @@
from toolz import curried as tlz
from .lib import (
flatten,
over,
references_graph_to_igraph
)
from .pipe import pipe
MAX_LAYERS = 127
def create_list_of_lists_of_strings(deeply_nested_lists_or_dicts_of_graphs):
list_of_graphs = flatten(deeply_nested_lists_or_dicts_of_graphs)
return list(
filter(
# remove empty layers
lambda xs: len(xs) > 0,
tlz.map(
lambda g: g.vs["name"],
list_of_graphs
)
)
)
def flatten_references_graph(references_graph, pipeline, exclude_paths=None):
if exclude_paths is not None:
exclude_paths = frozenset(exclude_paths)
references_graph = tlz.compose(
tlz.map(over(
"references",
lambda xs: frozenset(xs).difference(exclude_paths)
)),
tlz.remove(lambda node: node["path"] in exclude_paths)
)(references_graph)
igraph_graph = references_graph_to_igraph(references_graph)
return create_list_of_lists_of_strings(pipe(
pipeline,
igraph_graph
))

@@ -0,0 +1,121 @@
import unittest
from .flatten_references_graph import flatten_references_graph
# from .lib import path_relative_to_file, load_json
if __name__ == "__main__":
unittest.main()
references_graph = [
{
"closureSize": 1,
"narHash": "sha256:a",
"narSize": 2,
"path": "A",
"references": [
"A",
"C",
]
},
{
"closureSize": 3,
"narHash": "sha256:b",
"narSize": 4,
"path": "B",
"references": [
"C",
"D"
]
},
{
"closureSize": 5,
"narHash": "sha256:c",
"narSize": 6,
"path": "C",
"references": [
"C"
]
},
{
"closureSize": 7,
"narHash": "sha256:d",
"narSize": 8,
"path": "D",
"references": [
"D"
]
}
]
class Test(unittest.TestCase):
def test_flatten_references_graph(self):
pipeline = [
["split_paths", ["B"]],
]
result = flatten_references_graph(references_graph, pipeline)
self.assertEqual(
result,
[
# B and its exclusive deps
["B", "D"],
# Common deps
["C"],
# Rest (without common deps)
["A"]
]
)
pipeline = [
["split_paths", ["B"]],
["over", "main", ["subcomponent_in", ["B"]]],
]
result = flatten_references_graph(references_graph, pipeline)
self.assertEqual(
result,
[
["B"],
["D"],
["C"],
["A"]
]
)
def test_flatten_references_graph_exclude_paths(self):
pipeline = [
["split_paths", ["B"]],
]
result = flatten_references_graph(
references_graph,
pipeline,
exclude_paths=["A"]
)
self.assertEqual(
result,
[
# A was excluded so there is no "rest" or "common" layer
["B", "C", "D"]
]
)
result = flatten_references_graph(
references_graph,
pipeline,
exclude_paths=["D"]
)
self.assertEqual(
result,
[
# D removed from this layer
["B"],
["C"],
["A"]
]
)

@@ -0,0 +1,329 @@
from collections.abc import Iterable
from pathlib import Path
from toolz import curried as tlz
from toolz import curry
import igraph as igraph
import itertools as itertools
import json as json
import os as os
import re as re
import sys
DEBUG = os.environ.get("DEBUG", False) == "True"
DEBUG_PLOT = os.environ.get("DEBUG_PLOT", False) == "True"
# If this is set, the plots will be saved to files instead of being displayed
# with default image viewer.
DEBUG_PLOT_SAVE_BASE_NAME = os.environ.get("DEBUG_PLOT_SAVE_BASE_NAME")
c = igraph.configuration.init()
# App used to open the plots when DEBUG_PLOT_SAVE_BASE_NAME is not set.
c["apps.image_viewer"] = os.environ.get("DEBUG_PLOT_IMAGE_VIEWER", "gwenview")
def debug(*args, **kwargs):
if DEBUG:
print(*args, file=sys.stderr, **kwargs)
def debug_plot(graph, name, **kwargs):
if not DEBUG_PLOT:
return
vertex_label = [
# remove /nix/store/HASH- prefix from labels
re.split("^/nix/store/[a-z0-9]{32}-", name)[-1]
for name in graph.vs["name"]
]
save_as = (
None if DEBUG_PLOT_SAVE_BASE_NAME is None
else DEBUG_PLOT_SAVE_BASE_NAME + name + ".png"
)
igraph.plot(
graph,
save_as,
vertex_label=vertex_label,
**(tlz.merge(
{
# "bbox": (3840, 2160),
"bbox": (800, 600),
"margin": 100,
"vertex_label_dist": -5,
"edge_color": "orange",
"vertex_size": 20,
"vertex_label_size": 30,
"edge_arrow_size": 2
},
kwargs
)),
)
def debug_plot_with_highlight(g, vs, layout):
debug_plot(
g,
layout=layout,
# layout=Layout(new_coords),
vertex_color=[
"green" if v.index in vs else "red"
for v in g.vs
]
)
@curry
def pick_keys(keys, d):
return {
key: d[key] for key in keys if key in d
}
def unnest_iterable(xs):
return itertools.chain.from_iterable(xs)
def load_json(file_path):
with open(file_path) as f:
return json.load(f)
@curry
def sorted_by(key, xs):
return sorted(xs, key=lambda x: x[key])
@curry
def find_vertex_by_name_or_none(graph, name):
try:
# NOTE: find by name is constant time.
return graph.vs.find(name)
# This will be thrown if vertex with given name is not found.
except ValueError:
return None
def subcomponent_multi(graph, vertices, mode="out"):
"""Return concatenated subcomponents generated by the given list of
vertices.
"""
return tlz.mapcat(
lambda vertex: graph.subcomponent(vertex, mode=mode),
vertices
)
@curry
def edges_for_reference_graph_node(path_to_size_dict, reference_graph_node):
source = reference_graph_node["path"]
return map(
lambda x: {"source": source, "target": x},
sorted(
filter(
# references might contain source
lambda x: x != source,
reference_graph_node["references"]
),
key=lambda x: 1 * path_to_size_dict[x]
)
)
reference_graph_node_keys_to_keep = [
"closureSize",
"narSize"
]
pick_reference_graph_node_keys = pick_keys(reference_graph_node_keys_to_keep)
def vertex_from_reference_graph_node(reference_graph_node):
return tlz.merge(
{"name": reference_graph_node["path"]},
pick_reference_graph_node_keys(reference_graph_node)
)
def references_graph_to_igraph(references_graph):
"""
Converts result of exportReferencesGraph into an igraph directed graph.
Uses paths as igraph node names, and sets closureSize and narSize as
properties of igraph nodes.
"""
debug('references_graph', references_graph)
references_graph = sorted(references_graph, key=lambda x: 1 * x["narSize"])
# Short circuit since DictList throws an error if first argument (vertices)
# contains no elements.
# The error is: KeyError: 'name'
# here: https://github.com/igraph/python-igraph/blob/da7484807f5152a2c18c55dd4154653de2c7f5f7/src/igraph/__init__.py#L3091 # noqa: E501
# This looks like a bug.
if len(references_graph) == 0:
return empty_directed_graph()
path_to_size_dict = {
node["path"]: node["narSize"] for node in references_graph
}
debug('path_to_size_dict', path_to_size_dict)
return igraph.Graph.DictList(
map(vertex_from_reference_graph_node, references_graph),
unnest_iterable(map(
edges_for_reference_graph_node(path_to_size_dict),
references_graph
)),
directed=True
)
@curry
def graph_vertex_index_to_name(graph, index):
return graph.vs[index]["name"]
def igraph_to_reference_graph(igraph_instance):
return [
tlz.merge(
{
"path": v["name"],
"references": list(map(
graph_vertex_index_to_name(igraph_instance),
igraph_instance.successors(v.index)
))
},
pick_reference_graph_node_keys(v.attributes())
)
for v in igraph_instance.vs
]
def load_closure_graph(file_path):
return references_graph_to_igraph(load_json(file_path))
def path_relative_to_file(file_path_from, file_path):
dir_path = Path(file_path_from).parent
return dir_path / file_path
def is_None(x):
return x is None
def not_None(x):
return x is not None
def print_layers(layers):
debug("\n::::LAYERS:::::")
for index, layer in enumerate(layers):
debug("")
debug("layer index:", index)
debug("[")
for v in layer.vs["name"]:
debug(" ", v)
debug("]")
def print_vs(graph):
for v in graph.vs:
debug(v)
def directed_graph(edges, vertices=None, vertex_attrs=[]):
graph = igraph.Graph.TupleList(edges, directed=True)
# Add detached vertices (without edges) if any.
if vertices is not None:
graph = graph + vertices
# Add vertex attributes if any.
for (name, attrs_dict) in vertex_attrs:
vertex = graph.vs.find(name)
for (k, v) in attrs_dict.items():
vertex[k] = v
return graph
def empty_directed_graph():
return directed_graph([])
def graph_is_empty(graph):
return len(graph.vs) == 0
def pick_attrs(attrs, x):
return {attr: getattr(x, attr) for attr in attrs}
def merge_graphs(graphs):
return tlz.reduce(lambda acc, g: acc + g, graphs, empty_directed_graph())
# Functions below can be used in user defined pipeline (see pipe.py).
# All functions need to be curried, and the user needs to be able to
# provide values for all arguments apart from the last one from nix code.
@curry
def over(prop_name, func, dictionary):
value = dictionary[prop_name]
return tlz.assoc(dictionary, prop_name, func(value))
# One argument functions also need to be curried to simplify processing of the
# pipeline.
@curry
def flatten(xs):
xs = xs.values() if isinstance(xs, dict) else xs
for x in xs:
if isinstance(x, Iterable) and not isinstance(x, (str, bytes)):
yield from flatten(x)
else:
yield x
@curry
def split_every(count, graph):
vs = graph.vs
return [
graph.induced_subgraph(vs[x:x + count])
for x in range(0, len(vs), count)
]
@curry
def limit_layers(max_count, graphs):
assert max_count > 0, "max_count needs to be > 0"
graphs_iterator = iter(graphs)
return tlz.concat([
tlz.take(max_count - 1, graphs_iterator),
# Merges all graphs remaining in the iterator, after initial
# max_count - 1 have been taken.
(lambda: (yield merge_graphs(graphs_iterator)))()
])
@curry
def remove_paths(paths, graph):
# Allow passing a single path.
if isinstance(paths, str):
paths = [paths]
indices_to_remove = tlz.compose(
list,
tlz.map(lambda v: v.index),
tlz.remove(is_None),
tlz.map(find_vertex_by_name_or_none(graph))
)(paths)
return graph - indices_to_remove if len(indices_to_remove) > 0 else graph
@curry
def reverse(iterator):
return reversed(list(iterator))

@@ -0,0 +1,199 @@
import unittest
from toolz import curried as tlz
from . import test_helpers as th
from .lib import (
directed_graph,
igraph_to_reference_graph,
limit_layers,
pick_keys,
references_graph_to_igraph,
reference_graph_node_keys_to_keep
)
if __name__ == "__main__":
unittest.main()
references_graph = [
{
"closureSize": 3,
"narHash": "sha256:d",
"narSize": 0,
"path": "D",
"references": [
"D"
]
},
{
"closureSize": 3,
"narHash": "sha256:b",
"narSize": 4,
"path": "B",
"references": [
"B"
]
},
{
"closureSize": 3,
"narHash": "sha256:e",
"narSize": 5,
"path": "E",
"references": [
"E"
]
},
{
"closureSize": 1,
"narHash": "sha256:a",
"narSize": 10,
"path": "A",
"references": [
# most of the time references contain the node's own path, but not always.
"C",
"B",
]
},
{
"closureSize": 5,
"narHash": "sha256:c",
"narSize": 6,
"path": "C",
"references": [
"C",
"E",
"D"
]
},
{
"closureSize": 5,
"narHash": "sha256:f",
"narSize": 2,
"path": "F",
"references": [
"F"
]
}
]
class TestLib(unittest.TestCase, th.CustomAssertions):
def test_references_graph_to_igraph(self):
graph = references_graph_to_igraph(references_graph)
pick_preserved_keys = pick_keys(reference_graph_node_keys_to_keep)
self.assertGraphEqual(
graph,
directed_graph(
[
("A", "B"),
("A", "C"),
("C", "E"),
("C", "D"),
],
["F"],
# Add "narSize" and "closureSize" attributes to each node.
map(
lambda node: (node["path"], pick_preserved_keys(node)),
references_graph
)
)
)
def test_references_graph_to_igraph_one_node(self):
references_graph = [
{
'closureSize': 168,
'narHash': 'sha256:0dl4',
'narSize': 168,
'path': 'A',
'references': []
}
]
graph = references_graph_to_igraph(references_graph)
pick_preserved_keys = pick_keys(reference_graph_node_keys_to_keep)
self.assertGraphEqual(
graph,
directed_graph(
[],
["A"],
# Add "narSize" and "closureSize" attributes to each node.
map(
lambda node: (node["path"], pick_preserved_keys(node)),
references_graph
)
)
)
def test_references_graph_to_igraph_zero_nodes(self):
references_graph = []
graph = references_graph_to_igraph(references_graph)
self.assertGraphEqual(
graph,
directed_graph(
[],
[],
[]
)
)
def test_igraph_to_reference_graph(self):
graph = references_graph_to_igraph(references_graph)
nodes_by_path = {
node["path"]: node for node in references_graph
}
result = igraph_to_reference_graph(graph)
self.assertEqual(
len(result),
len(references_graph)
)
pick_preserved_keys = pick_keys([
"path",
*reference_graph_node_keys_to_keep
])
for node in result:
original_node = nodes_by_path[node["path"]]
self.assertDictEqual(
pick_preserved_keys(original_node),
pick_preserved_keys(node)
)
remove_self_ref = tlz.remove(lambda a: a == node["path"])
self.assertListEqual(
sorted(node["references"]),
sorted(remove_self_ref(original_node["references"]))
)
def test_limit_layers_nothing_to_do(self):
graph = references_graph_to_igraph(references_graph)
layers = [graph]
result = limit_layers(1, layers)
result_list = list(result)
self.assertEqual(
len(result_list),
1
)
self.assertGraphEqual(graph, result_list[0])

@@ -0,0 +1,80 @@
from toolz import curried as tlz
from toolz import curry
from . import lib as lib
from . import subcomponent as subcomponent
from .popularity_contest import popularity_contest
from .split_paths import split_paths
from .lib import (
# references_graph_to_igraph
debug,
pick_attrs
)
funcs = tlz.merge(
pick_attrs(
[
"flatten",
"over",
"split_every",
"limit_layers",
"remove_paths",
"reverse"
],
lib
),
pick_attrs(
[
"subcomponent_in",
"subcomponent_out",
],
subcomponent
),
{
"split_paths": split_paths,
"popularity_contest": popularity_contest,
"map": tlz.map
}
)
@curry
def nth_or_none(index, xs):
try:
return xs[index]
except IndexError:
return None
def preapply_func(func_call_data):
[func_name, *args] = func_call_data
debug("func_name", func_name)
debug("args", args)
debug('func_name in ["over"]', func_name in ["over"])
# TODO: these could be handled in more generic way by defining, for each
# function, which of the args are expected to be functions which need
# pre-applying.
if func_name == "over":
[first_arg, second_arg] = args
args = [first_arg, preapply_func(second_arg)]
elif func_name == "map":
args = [preapply_func(args[0])]
return funcs[func_name](*args)
@curry
def pipe(pipeline, data):
debug("pipeline", pipeline)
partial_funcs = list(tlz.map(preapply_func, pipeline))
debug('partial_funcs', partial_funcs)
return tlz.pipe(
data,
*partial_funcs
)
funcs["pipe"] = pipe

@@ -0,0 +1,153 @@
import unittest
from .pipe import pipe
from . import test_helpers as th
from .lib import (
directed_graph,
)
if __name__ == "__main__":
unittest.main()
def make_test_graph():
edges = [
("Root1", "A"),
("A", "B"),
("A", "C"),
("B", "D"),
("B", "E"),
("E", "F"),
("B", "G"),
("Root2", "B"),
("Root3", "C"),
]
return directed_graph(edges)
class CustomAssertions:
def runAndAssertResult(self, graph, pipeline, expected_graph_args):
result = list(pipe(pipeline, graph))
for (index, expected_graph_arg) in enumerate(expected_graph_args):
self.assertGraphEqual(
directed_graph(*expected_graph_arg),
result[index]
)
class Test(
unittest.TestCase,
CustomAssertions,
th.CustomAssertions
):
def test_1(self):
pipeline = [
["split_paths", ["B"]],
[
"over",
"main",
[
"pipe",
[
["subcomponent_in", ["B"]],
[
"over",
"rest",
["popularity_contest"]
]
]
]
],
["flatten"],
["map", ["remove_paths", "Root3"]],
["limit_layers", 5],
]
expected_graph_args = [
# "B"" separated from the rest by "split_paths" and
# "subcomponent_in' stages.
([], ["B"]),
# Deps of "B", split into individual layers by "popularity_contest",
# with "F" being most popular
([], ["F"]),
([], ["D"]),
([], ["E"]),
# "rest" output of "split_paths" stage with "G" merged into it by
# "limit_layers" stage.
(
[
("Root1", "A"),
("A", "C")
],
["Root2", "G"]
)
]
self.runAndAssertResult(
make_test_graph(),
pipeline,
expected_graph_args
)
def test_2(self):
graph = directed_graph(
[
("Root1", "A"),
("A", "B"),
],
["Root2"]
)
self.runAndAssertResult(
graph,
[
["popularity_contest"],
],
[
# Ordered from most to least popular
([], ["B"]),
([], ["A"]),
([], ["Root1"]),
([], ["Root2"])
]
)
self.runAndAssertResult(
graph,
[
["popularity_contest"],
["limit_layers", 3],
],
[
# Most popular first
([], ["B"]),
([], ["A"]),
# Least popular combined
([], ["Root1", "Root2"]),
]
)
self.runAndAssertResult(
graph,
[
["popularity_contest"],
["reverse"],
["limit_layers", 3],
],
[
# Least popular first
([], ["Root2"]),
([], ["Root1"]),
# Most popular combined
([], ["A", "B"])
]
)

@@ -0,0 +1,398 @@
# Using a simple algorithm, convert the references to a path in to a
# sorted list of dependent paths based on how often they're referenced
# and how deep in the tree they live. Equally-"popular" paths are then
# sorted by name.
#
# The existing writeReferencesToFile prints the paths in a simple
# ascii-based sorting of the paths.
#
# Sorting the paths by graph improves the chances that the difference
# between two builds appear near the end of the list, instead of near
# the beginning. This makes a difference for Nix builds which export a
# closure for another program to consume, if that program implements its
# own level of binary diffing.
#
# For an example, Docker Images. If each store path is a separate layer
# then Docker Images can be very efficiently transferred between systems,
# and we get very good cache reuse between images built with the same
# version of Nixpkgs. However, since Docker only reliably supports a
# small number of layers (42) it is important to pick the individual
# layers carefully. By storing very popular store paths in the first 40
# layers, we improve the chances that the next Docker image will share
# many of those layers.*
#
# Given the dependency tree:
#
# A - B - C - D -\
# \ \ \ \
# \ \ \ \
# \ \ - E ---- F
# \- G
#
# Nodes which have multiple references are duplicated:
#
# A - B - C - D - F
# \ \ \
# \ \ \- E - F
# \ \
# \ \- E - F
# \
# \- G
#
# Each leaf node is now replaced by a counter defaulted to 1:
#
# A - B - C - D - (F:1)
# \ \ \
# \ \ \- E - (F:1)
# \ \
# \ \- E - (F:1)
# \
# \- (G:1)
#
# Then each leaf counter is merged with its parent node, replacing the
# parent node with a counter of 1, and each existing counter being
# incremented by 1. That is to say `- D - (F:1)` becomes `- (D:1, F:2)`:
#
# A - B - C - (D:1, F:2)
# \ \ \
# \ \ \- (E:1, F:2)
# \ \
# \ \- (E:1, F:2)
# \
# \- (G:1)
#
# Then each leaf counter is merged with its parent node again, merging
# any counters, then incrementing each:
#
# A - B - (C:1, D:2, E:2, F:5)
# \ \
# \ \- (E:1, F:2)
# \
# \- (G:1)
#
# And again:
#
# A - (B:1, C:2, D:3, E:4, F:8)
# \
# \- (G:1)
#
# And again:
#
# (A:1, B:2, C:3, D:4, E:5, F:9, G:2)
#
# and then paths have the following "popularity":
#
# A 1
# B 2
# C 3
# D 4
# E 5
# F 9
# G 2
#
# and the popularity contest would result in the paths being printed as:
#
# F
# E
# D
# C
# B
# G
# A
#
# * Note: People who have used a Dockerfile before assume Docker's
# Layers are inherently ordered. However, this is not true -- Docker
# layers are content-addressable and are not explicitly layered until
# they are composed into an Image.
import igraph as igraph
from collections import defaultdict
from operator import eq
from toolz import curried as tlz
from toolz import curry
from .lib import (
debug,
directed_graph,
igraph_to_reference_graph,
over,
pick_keys,
reference_graph_node_keys_to_keep
)
eq = curry(eq)
pick_keys_to_keep = pick_keys(reference_graph_node_keys_to_keep)
# Find paths in the original dataset which are never referenced by
# any other paths
def find_roots(closures):
debug('closures', closures)
roots = []
for closure in closures:
path = closure['path']
if not any_refer_to(path, closures):
roots.append(path)
return roots
def any_refer_to(path, closures):
for closure in closures:
if path != closure['path']:
if path in closure['references']:
return True
return False
def all_paths(closures):
paths = []
for closure in closures:
paths.append(closure['path'])
paths.extend(closure['references'])
paths.sort()
return list(set(paths))
# Convert:
#
# [
# { path: /nix/store/foo, references: [ /nix/store/foo, /nix/store/bar, /nix/store/baz ] }, # noqa: E501
# { path: /nix/store/bar, references: [ /nix/store/bar, /nix/store/baz ] },
# { path: /nix/store/baz, references: [ /nix/store/baz, /nix/store/tux ] },
# { path: /nix/store/tux, references: [ /nix/store/tux ] }
# ]
#
# To:
# {
# /nix/store/foo: [ /nix/store/bar, /nix/store/baz ],
# /nix/store/bar: [ /nix/store/baz ],
# /nix/store/baz: [ /nix/store/tux ] },
# /nix/store/tux: [ ]
# }
#
# Note that it drops self-references to avoid loops.
def make_lookup(closures):
return {
# remove self reference
node["path"]: over("references", tlz.remove(eq(node["path"])), node)
for node in closures
}
# Convert:
#
# /nix/store/foo with
# {
# /nix/store/foo: [ /nix/store/bar, /nix/store/baz ],
# /nix/store/bar: [ /nix/store/baz ],
# /nix/store/baz: [ /nix/store/tux ] },
# /nix/store/tux: [ ]
# }
#
# To:
#
# {
# /nix/store/bar: {
# /nix/store/baz: {
# /nix/store/tux: {}
# }
# },
# /nix/store/baz: {
# /nix/store/tux: {}
# }
# }
def make_graph_segment_from_root(subgraphs_cache, root, lookup):
children = {}
for ref in lookup[root]:
# make_graph_segment_from_root is a pure function, and will
# always return the same result based on a given input. Thus,
# cache computation.
#
# Python's assignment will use a pointer, preventing memory
# bloat for large graphs.
if ref not in subgraphs_cache:
debug("Subgraph Cache miss on {}".format(ref))
subgraphs_cache[ref] = make_graph_segment_from_root(
subgraphs_cache, ref, lookup
)
else:
debug("Subgraph Cache hit on {}".format(ref))
children[ref] = subgraphs_cache[ref]
return children
# Convert a graph segment in to a popularity-counted dictionary:
#
# From:
# {
# /nix/store/foo: {
# /nix/store/bar: {
# /nix/store/baz: {
# /nix/store/tux: {}
# }
# }
# /nix/store/baz: {
# /nix/store/tux: {}
# }
# }
# }
#
# to:
# [
# /nix/store/foo: 1
# /nix/store/bar: 2
# /nix/store/baz: 4
# /nix/store/tux: 6
# ]
def graph_popularity_contest(popularity_cache, full_graph):
popularity = defaultdict(int)
for path, subgraph in full_graph.items():
popularity[path] += 1
# graph_popularity_contest is a pure function, and will
# always return the same result based on a given input. Thus,
# cache computation.
#
# Python's assignment will use a pointer, preventing memory
# bloat for large graphs.
if path not in popularity_cache:
debug("Popularity Cache miss on", path)
popularity_cache[path] = graph_popularity_contest(
popularity_cache, subgraph
)
else:
debug("Popularity Cache hit on", path)
subcontest = popularity_cache[path]
for subpath, subpopularity in subcontest.items():
debug("Calculating popularity for", subpath)
popularity[subpath] += subpopularity + 1
return popularity
# Emit a list of packages by popularity, most first:
#
# From:
# [
# /nix/store/foo: 1
# /nix/store/bar: 1
# /nix/store/baz: 2
# /nix/store/tux: 2
# ]
#
# To:
# [ /nix/store/baz /nix/store/tux /nix/store/bar /nix/store/foo ]
def order_by_popularity(paths):
paths_by_popularity = defaultdict(list)
popularities = []
for path, popularity in paths.items():
popularities.append(popularity)
paths_by_popularity[popularity].append(path)
popularities = sorted(set(popularities))
flat_ordered = []
for popularity in popularities:
paths = paths_by_popularity[popularity]
paths.sort(key=package_name)
flat_ordered.extend(reversed(paths))
return list(reversed(flat_ordered))
def package_name(path):
parts = path.split('-')
start = parts.pop(0)
# don't throw away any data, so the order is always the same.
# even in cases where only the hash at the start has changed.
parts.append(start)
return '-'.join(parts)
@curry
def popularity_contest(graph):
# Data comes in as an igraph directed graph or in the format produced
# by nix's exportReferencesGraph:
# [
# { path: /nix/store/foo, references: [ /nix/store/foo, /nix/store/bar, /nix/store/baz ] }, # noqa: E501
# { path: /nix/store/bar, references: [ /nix/store/bar, /nix/store/baz ] }, # noqa: E501
# { path: /nix/store/baz, references: [ /nix/store/baz, /nix/store/tux ] }, # noqa: E501
# { path: /nix/store/tux, references: [ /nix/store/tux ] }
# ]
#
# We want to get out a list of paths ordered by how universally
# important they are, i.e. tux is referenced by every path, transitively,
# so it should be #1
#
# [
# /nix/store/tux,
# /nix/store/baz,
# /nix/store/bar,
# /nix/store/foo,
# ]
#
# NOTE: the output is actually a list of igraph graphs with a single vertex
# with v["name"] == path, and some properties (defined in
# reference_graph_node_keys_to_keep) from the nodes of the input graph
# copied as vertex attributes.
debug('graph', graph)
if isinstance(graph, igraph.Graph):
graph = igraph_to_reference_graph(graph)
debug("Finding roots")
roots = find_roots(graph)
debug("Making lookup")
lookup = make_lookup(graph)
full_graph = {}
subgraphs_cache = {}
for root in roots:
debug("Making full graph for", root)
full_graph[root] = make_graph_segment_from_root(
subgraphs_cache,
root,
tlz.valmap(
tlz.get("references"),
lookup
)
)
debug("Running contest")
contest = graph_popularity_contest({}, full_graph)
debug("Ordering by popularity")
ordered = order_by_popularity(contest)
debug("Checking for missing paths")
missing = []
for path in all_paths(graph):
if path not in ordered:
missing.append(path)
ordered.extend(missing)
return map(
# Turn each path into a graph with 1 vertex.
lambda path: directed_graph(
# No edges
[],
# One vertex, with name=path
[path],
# Setting desired attributes on the vertex.
[(path, pick_keys_to_keep(lookup[path]))]
),
ordered
)

@@ -0,0 +1,335 @@
import unittest
from toolz import curry
from toolz import curried as tlz
from . import test_helpers as th
from .popularity_contest import (
all_paths,
any_refer_to,
find_roots,
graph_popularity_contest,
make_graph_segment_from_root,
make_lookup,
popularity_contest,
order_by_popularity
)
from .lib import (
directed_graph,
igraph_to_reference_graph,
over
)
if __name__ == "__main__":
unittest.main()
class CustomAssertions:
@curry
def assertResultKeys(self, keys, result):
self.assertListEqual(
list(result.keys()),
keys
)
return result
class Test(
unittest.TestCase,
CustomAssertions,
th.CustomAssertions
):
def test_empty_graph(self):
def test_empty(graph):
self.assertListEqual(
list(popularity_contest(graph)),
[]
)
# popularity_contest works with an igraph graph or a reference_graph in
# the form of a list of dicts (as returned by nix's exportReferencesGraph)
test_empty(directed_graph([]))
test_empty([])
def test_popularity_contest(self):
# Making sure vertex attrs are preserved.
vertex_props_dict = {
"Root1": {"narSize": 1, "closureSize": 2},
"B": {"narSize": 3, "closureSize": 4},
"X": {"narSize": 5, "closureSize": 6},
}
edges = [
("Root1", "A"),
("A", "B"),
("A", "D"),
("D", "E"),
("B", "D"),
("B", "F"),
("Root2", "B"),
("Root3", "C")
]
detached_vertices = ["X"]
vertex_props = vertex_props_dict.items()
def test(graph):
result = list(popularity_contest(graph))
expected_paths = [
'E',
'D',
'F',
'B',
'A',
'C',
'Root1',
'Root2',
'Root3',
'X'
]
self.assertEqual(
len(result),
len(expected_paths)
)
for (index, path) in enumerate(expected_paths):
path_props = vertex_props_dict.get(path) or {}
self.assertGraphEqual(
result[index],
directed_graph([], [path], [(path, path_props)])
)
graph = directed_graph(edges, detached_vertices, vertex_props)
test(graph)
test(igraph_to_reference_graph(graph))
class TestFindRoots(unittest.TestCase):
def test_find_roots(self):
self.assertCountEqual(
find_roots([
{
"path": "/nix/store/foo",
"references": [
"/nix/store/foo",
"/nix/store/bar"
]
},
{
"path": "/nix/store/bar",
"references": [
"/nix/store/bar",
"/nix/store/tux"
]
},
{
"path": "/nix/store/hello",
"references": [
]
}
]),
["/nix/store/foo", "/nix/store/hello"]
)
class TestAnyReferTo(unittest.TestCase):
def test_has_references(self):
self.assertTrue(
any_refer_to(
"/nix/store/bar",
[
{
"path": "/nix/store/foo",
"references": [
"/nix/store/bar"
]
},
]
),
)
def test_no_references(self):
self.assertFalse(
any_refer_to(
"/nix/store/foo",
[
{
"path": "/nix/store/foo",
"references": [
"/nix/store/foo",
"/nix/store/bar"
]
},
]
),
)
class TestAllPaths(unittest.TestCase):
def test_returns_all_paths(self):
self.assertCountEqual(
all_paths([
{
"path": "/nix/store/foo",
"references": [
"/nix/store/foo",
"/nix/store/bar"
]
},
{
"path": "/nix/store/bar",
"references": [
"/nix/store/bar",
"/nix/store/tux"
]
},
{
"path": "/nix/store/hello",
"references": [
]
}
]),
["/nix/store/foo", "/nix/store/bar",
"/nix/store/hello", "/nix/store/tux", ]
)
def test_no_references(self):
self.assertFalse(
any_refer_to(
"/nix/store/foo",
[
{
"path": "/nix/store/foo",
"references": [
"/nix/store/foo",
"/nix/store/bar"
]
},
]
),
)
class TestMakeLookup(unittest.TestCase):
def test_returns_lookup(self):
self.assertDictEqual(
# "references" in the result are iterators so we need
# to convert them to a list before asserting.
tlz.valmap(over("references", list), make_lookup([
{
"path": "/nix/store/foo",
"references": [
"/nix/store/foo",
"/nix/store/bar",
"/nix/store/hello"
]
},
{
"path": "/nix/store/bar",
"references": [
"/nix/store/bar",
"/nix/store/tux"
]
},
{
"path": "/nix/store/hello",
"references": [
]
}
])),
{
"/nix/store/foo": {
"path": "/nix/store/foo",
"references": [
"/nix/store/bar",
"/nix/store/hello"
]
},
"/nix/store/bar": {
"path": "/nix/store/bar",
"references": [
"/nix/store/tux"
]
},
"/nix/store/hello": {
"path": "/nix/store/hello",
"references": [
]
}
}
)
class TestMakeGraphSegmentFromRoot(unittest.TestCase):
def test_returns_graph(self):
self.assertDictEqual(
make_graph_segment_from_root({}, "/nix/store/foo", {
"/nix/store/foo": ["/nix/store/bar"],
"/nix/store/bar": ["/nix/store/tux"],
"/nix/store/tux": [],
"/nix/store/hello": [],
}),
{
"/nix/store/bar": {
"/nix/store/tux": {}
}
}
)
def test_returns_graph_tiny(self):
self.assertDictEqual(
make_graph_segment_from_root({}, "/nix/store/tux", {
"/nix/store/foo": ["/nix/store/bar"],
"/nix/store/bar": ["/nix/store/tux"],
"/nix/store/tux": [],
}),
{}
)
class TestGraphPopularityContest(unittest.TestCase):
def test_counts_popularity(self):
self.assertDictEqual(
graph_popularity_contest({}, {
"/nix/store/foo": {
"/nix/store/bar": {
"/nix/store/baz": {
"/nix/store/tux": {}
}
},
"/nix/store/baz": {
"/nix/store/tux": {}
}
}
}),
{
"/nix/store/foo": 1,
"/nix/store/bar": 2,
"/nix/store/baz": 4,
"/nix/store/tux": 6,
}
)
class TestOrderByPopularity(unittest.TestCase):
def test_returns_in_order(self):
self.assertEqual(
order_by_popularity({
"/nix/store/foo": 1,
"/nix/store/bar": 1,
"/nix/store/baz": 2,
"/nix/store/tux": 2,
}),
[
"/nix/store/baz",
"/nix/store/tux",
"/nix/store/bar",
"/nix/store/foo"
]
)

@@ -0,0 +1,227 @@
from toolz import curried as tlz
from toolz import curry
from .lib import (
debug,
debug_plot,
DEBUG_PLOT,
find_vertex_by_name_or_none,
graph_is_empty,
is_None,
subcomponent_multi,
unnest_iterable
)
@curry
def coerce_to_singly_rooted_graph(fake_root_name, graph):
"""Add single root to the graph connected to all existing roots.
If graph has only one root, return the graph unchanged and the name
of the root vertex.
Otherwise return a modified graph (copy) and a name of the added root
vertex.
"""
roots = graph.vs.select(lambda v: len(graph.predecessors(v)) == 0)
root_names = roots["name"]
if len(root_names) == 1:
return graph, root_names[0]
else:
edges = [(fake_root_name, v) for v in root_names]
graph_with_root = graph + fake_root_name + edges
return graph_with_root, fake_root_name
@curry
def remove_vertex(vertex_name, graph):
"""Remove vertex with given name, returning copy of input graph if vertex
with given name is found in the graph
"""
vertex = find_vertex_by_name_or_none(graph)(vertex_name)
return graph - vertex_name if vertex else graph
def get_children_of(graph, vertex_names):
return unnest_iterable(map(
graph.successors,
tlz.remove(
is_None,
map(
find_vertex_by_name_or_none(graph),
vertex_names
)
)
))
def as_list(x):
return x if isinstance(x, list) else [x]
@curry
def split_path_spec_to_indices(graph, split_path_spec):
debug("split_path_spec", split_path_spec)
if isinstance(split_path_spec, dict):
if "children_of" in split_path_spec:
children_of = split_path_spec["children_of"]
return get_children_of(graph, as_list(children_of))
else:
raise Exception(
"Unexpected split path spec: dict with invalid keys."
"Valid: [\"children_of\"]"
)
else:
vertex = find_vertex_by_name_or_none(graph)(split_path_spec)
return [] if is_None(vertex) else [vertex.index]
call_count = 0
@curry
def split_paths(split_paths, graph_in):
debug("____")
debug("split_paths:", split_paths)
debug("graph_in:", graph_in)
if DEBUG_PLOT:
global call_count
graph_name_prefix = f"split_paths_{call_count}_"
call_count += 1
# Convert list of split_paths into a list of vertex indices. Ignores
# split_paths which don't match any vertices in the graph.
# All edges pointing at the indices will be deleted from the graph.
split_path_indices = list(unnest_iterable(map(
split_path_spec_to_indices(graph_in),
split_paths
)))
debug("split_path_indices:", split_path_indices)
# Short circuit if there is nothing to do (split_paths didn't match any
# vertices in the graph).
if len(split_path_indices) == 0:
if DEBUG_PLOT:
layout = graph_in.layout('tree')
debug_plot(graph_in, f"{graph_name_prefix}input", layout=layout)
debug_plot(graph_in, f"{graph_name_prefix}result", layout=layout)
return {"rest": graph_in}
# If graph has multiple roots, add a single one connecting all existing
# roots to make it easy to split the graph into 2 sets of vertices after
# deleting edges pointing at split_path_indices.
fake_root_name = "__root__"
graph, root_name = coerce_to_singly_rooted_graph(fake_root_name, graph_in)
debug("root_name", root_name)
if (
find_vertex_by_name_or_none(graph)(root_name).index
in split_path_indices
):
if DEBUG_PLOT:
layout = graph_in.layout('tree')
debug_plot(graph_in, f"{graph_name_prefix}input", layout=layout)
debug_plot(
graph_in,
f"{graph_name_prefix}result",
layout=layout,
vertex_color="green"
)
return {"main": graph_in}
# Copy graph if coerce_to_singly_rooted_graph has not already created
# a copy, since we are going to mutate the graph and don't want to
# mutate a function argument.
graph = graph if graph is not graph_in else graph.copy()
if DEBUG_PLOT:
layout = graph.layout('tree')
debug_plot(graph, f"{graph_name_prefix}input", layout=layout)
# Get indices of all vertices which can be reached from split_path_indices
# (including split_path_indices). This is a set of all split_paths and their
# dependencies.
split_off_vertex_indices = frozenset(
subcomponent_multi(graph, split_path_indices))
debug("split_off_vertex_indices", split_off_vertex_indices)
# Delete edges which point at any of the vertices in split_path_indices.
graph.delete_edges(_target_in=split_path_indices)
if DEBUG_PLOT:
debug_plot(graph, f"{graph_name_prefix}deleted_edges", layout=layout)
# Get indices of all vertices which can be reached from the root. Since
# edges pointing at split_path_indices have been deleted, none of the
# split_path_indices will be included. Dependencies of the split paths will
# only be included if they can be reached from some vertex which is itself
# not in split_off_vertex_indices.
rest_with_common = frozenset(graph.subcomponent(root_name, mode="out"))
debug("rest_with_common", rest_with_common)
# Get a set of all dependencies common to split_path_indices and the rest
# of the graph.
common = split_off_vertex_indices.intersection(rest_with_common)
debug("common", common)
# Get a set of vertices which cannot be reached from split_path_indices.
rest_without_common = rest_with_common.difference(common)
debug("rest_without_common", rest_without_common)
# Get a set of split_path_indices and their dependencies which cannot be
# reached from the rest of the graph.
split_off_without_common = split_off_vertex_indices.difference(common)
debug("split_off_without_common", split_off_without_common)
if DEBUG_PLOT:
def choose_color(index):
if (index in split_off_without_common):
return "green"
elif (index in rest_without_common):
return "red"
else:
return "purple"
vertex_color = [choose_color(v.index) for v in graph.vs]
debug_plot(
graph,
f"{graph_name_prefix}result",
layout=layout,
vertex_color=vertex_color
)
# Return subgraphs based on calculated sets of vertices.
result_keys = ["main", "common", "rest"]
result_values = [
# Split paths and their deps (unreachable from rest of the graph).
graph.induced_subgraph(split_off_without_common),
# Dependencies of split paths which can be reached from the rest of the
# graph.
graph.induced_subgraph(common),
# Rest of the graph (without dependencies common with split paths).
graph.induced_subgraph(rest_without_common),
]
debug('result_values', result_values[0].vs["name"])
return tlz.valfilter(
tlz.complement(graph_is_empty),
dict(zip(
result_keys,
(
result_values if root_name != fake_root_name
# If root was added, remove it
else tlz.map(remove_vertex(fake_root_name), result_values)
)
))
)

@@ -0,0 +1,184 @@
import unittest
from toolz import curry
from . import test_helpers as th
from .split_paths import (
split_paths
)
from .lib import (
directed_graph,
pick_keys
)
if __name__ == "__main__":
unittest.main()
# Making sure vertex attrs are preserved.
vertex_props_dict = {
"Root1": {"a": 1, "b": 1},
"B": {"b": 2},
"X": {"x": 3}
}
def make_test_graph():
edges = [
("Root1", "A"),
("A", "B"),
("A", "D"),
("D", "E"),
("B", "D"),
("B", "F"),
("Root2", "B"),
("Root3", "C")
]
detached_vertices = ["X"]
vertex_props = vertex_props_dict.items()
return directed_graph(edges, detached_vertices, vertex_props)
class CustomAssertions:
@curry
def assertResultKeys(self, keys, result):
self.assertListEqual(
list(result.keys()),
keys
)
return result
class Test(
unittest.TestCase,
CustomAssertions,
th.CustomAssertions
):
def test_empty_paths(self):
input_graph = make_test_graph()
result = self.assertResultKeys(
["rest"],
split_paths([], input_graph)
)
self.assertGraphEqual(
result["rest"],
input_graph
)
def test_empty_graph(self):
empty_graph = directed_graph([])
def test_empty(paths):
result = self.assertResultKeys(
["rest"],
split_paths(paths, empty_graph)
)
self.assertGraphEqual(
result["rest"],
empty_graph
)
test_empty([])
test_empty(["B"])
def test_split_paths_single(self):
result = self.assertResultKeys(
["main", "common", "rest"],
split_paths(["B"], make_test_graph())
)
self.assertGraphEqual(
result["main"],
directed_graph(
[
("B", "F")
],
None,
pick_keys(["B"], vertex_props_dict).items()
)
)
self.assertGraphEqual(
result["rest"],
directed_graph(
[
("Root1", "A"),
("Root3", "C")
],
["Root2", "X"],
pick_keys(["Root1", "X"], vertex_props_dict).items()
)
)
self.assertGraphEqual(
result["common"],
directed_graph([("D", "E")])
)
def test_split_paths_multi(self):
result = self.assertResultKeys(
["main", "common", "rest"],
split_paths(["B", "Root3"], make_test_graph())
)
self.assertGraphEqual(
result["main"],
directed_graph(
[
("B", "F"),
("Root3", "C")
],
None,
pick_keys(["B"], vertex_props_dict).items()
)
)
self.assertGraphEqual(
result["rest"],
directed_graph(
[("Root1", "A")],
["Root2", "X"],
pick_keys(["Root1", "X"], vertex_props_dict).items()
)
)
self.assertGraphEqual(
result["common"],
directed_graph([("D", "E")])
)
def test_split_no_common(self):
result = self.assertResultKeys(
["main", "rest"],
split_paths(["D"], make_test_graph())
)
self.assertGraphEqual(
result["main"],
directed_graph([("D", "E")])
)
self.assertGraphEqual(
result["rest"],
directed_graph(
[
("Root1", "A"),
("A", "B"),
("B", "F"),
("Root2", "B"),
("Root3", "C"),
],
["X"],
pick_keys(["Root1", "B", "X"], vertex_props_dict).items()
)
)

@@ -0,0 +1,67 @@
from toolz import curry
from toolz import curried as tlz
from operator import attrgetter
from .lib import (
debug,
debug_plot,
DEBUG_PLOT,
find_vertex_by_name_or_none,
is_None,
subcomponent_multi
)
call_counts = {
"in": 0,
"out": 0
}
@curry
def subcomponent(mode, paths, graph):
if DEBUG_PLOT:
global call_counts
graph_name_prefix = f"subcomponent_{mode}_{call_counts[mode]}_"
call_counts[mode] += 1
layout = graph.layout('tree')
debug_plot(graph, f"{graph_name_prefix}input", layout=layout)
path_indices = tlz.compose(
tlz.map(attrgetter('index')),
tlz.remove(is_None),
tlz.map(find_vertex_by_name_or_none(graph))
)(paths)
debug("path_indices", path_indices)
main_indices = list(subcomponent_multi(graph, path_indices, mode))
debug('main_indices', main_indices)
if DEBUG_PLOT:
def choose_color(index):
if (index in main_indices):
return "green"
else:
return "red"
vertex_color = [choose_color(v.index) for v in graph.vs]
debug_plot(
graph,
f"{graph_name_prefix}result",
layout=layout,
vertex_color=vertex_color
)
return {
"main": graph.induced_subgraph(main_indices),
"rest": graph - main_indices
}
subcomponent_in = subcomponent("in")
subcomponent_out = subcomponent("out")

@@ -0,0 +1,219 @@
import unittest
from . import test_helpers as th
from .subcomponent import (
subcomponent_out,
subcomponent_in
)
from .lib import (
pick_keys,
directed_graph,
empty_directed_graph
)
if __name__ == "__main__":
unittest.main()
# Making sure vertex attrs are preserved.
vertex_props_dict = {
"Root1": {"a": 1, "b": 1},
"B": {"b": 2},
"X": {"x": 3}
}
def make_test_graph():
edges = [
("Root1", "A"),
("A", "B"),
("A", "C"),
("B", "D"),
("B", "E"),
("Root2", "B"),
("Root3", "C"),
]
detached_vertices = ["X"]
vertex_props = vertex_props_dict.items()
return directed_graph(edges, detached_vertices, vertex_props)
class CustomAssertions:
def assertResultKeys(self, result):
self.assertListEqual(
list(result.keys()),
["main", "rest"]
)
return result
class Test(
unittest.TestCase,
CustomAssertions,
th.CustomAssertions
):
def test_empty_paths(self):
def test(func):
input_graph = make_test_graph()
result = self.assertResultKeys(
func([], input_graph)
)
self.assertGraphEqual(
result["main"],
empty_directed_graph()
)
self.assertGraphEqual(
result["rest"],
input_graph
)
test(subcomponent_out)
test(subcomponent_in)
def test_empty_graph(self):
def test(func):
empty_graph = empty_directed_graph()
def test_empty(paths):
result = self.assertResultKeys(
func(paths, empty_graph)
)
self.assertGraphEqual(
result["main"],
empty_graph
)
self.assertGraphEqual(
result["rest"],
empty_graph
)
test_empty([])
test_empty(["B"])
test(subcomponent_out)
test(subcomponent_in)
def test_subcomponent_out(self):
result = self.assertResultKeys(
subcomponent_out(["B"], make_test_graph())
)
self.assertGraphEqual(
result["main"],
directed_graph(
[
("B", "D"),
("B", "E")
],
None,
pick_keys(["B"], vertex_props_dict).items()
)
)
self.assertGraphEqual(
result["rest"],
directed_graph(
[
("Root1", "A"),
("A", "C"),
("Root3", "C")
],
["Root2", "X"],
pick_keys(["Root1", "X"], vertex_props_dict).items()
)
)
def test_subcomponent_out_multi(self):
result = self.assertResultKeys(
subcomponent_out(["B", "Root3"], make_test_graph())
)
self.assertGraphEqual(
result["main"],
directed_graph(
[
("B", "D"),
("B", "E"),
("Root3", "C")
],
None,
pick_keys(["B"], vertex_props_dict).items()
)
)
self.assertGraphEqual(
result["rest"],
directed_graph(
[("Root1", "A")],
["Root2", "X"],
pick_keys(["Root1", "X"], vertex_props_dict).items()
)
)
def test_subcomponent_in(self):
result = self.assertResultKeys(
subcomponent_in(["B"], make_test_graph())
)
self.assertGraphEqual(
result["main"],
directed_graph(
[
("Root1", "A"),
("A", "B"),
("Root2", "B")
],
None,
pick_keys(["Root1", "B"], vertex_props_dict).items()
)
)
self.assertGraphEqual(
result["rest"],
directed_graph(
[("Root3", "C")],
["D", "E", "X"],
pick_keys(["X"], vertex_props_dict).items()
)
)
def test_subcomponent_in_multi(self):
result = self.assertResultKeys(
subcomponent_in(["B", "Root3"], make_test_graph())
)
self.assertGraphEqual(
result["main"],
directed_graph(
[
("Root1", "A"),
("A", "B"),
("Root2", "B"),
],
["Root3"],
pick_keys(["Root1", "B"], vertex_props_dict).items()
)
)
self.assertGraphEqual(
result["rest"],
directed_graph(
[],
["C", "D", "E", "X"],
pick_keys(["X"], vertex_props_dict).items()
)
)

@@ -0,0 +1,37 @@
from toolz import curried as tlz
from .lib import (
not_None,
graph_vertex_index_to_name
)
def edges_as_set(graph):
return frozenset(
(
graph_vertex_index_to_name(graph, e.source),
graph_vertex_index_to_name(graph, e.target)
) for e in graph.es
)
class CustomAssertions:
def assertGraphEqual(self, g1, g2):
self.assertSetEqual(
frozenset(g1.vs["name"]),
frozenset(g2.vs["name"])
)
self.assertSetEqual(
edges_as_set(g1),
edges_as_set(g2)
)
for name in g1.vs["name"]:
def get_vertex_attrs(g):
return tlz.valfilter(not_None, g.vs.find(name).attributes())
self.assertDictEqual(
get_vertex_attrs(g1),
get_vertex_attrs(g2),
)

@@ -0,0 +1,17 @@
from setuptools import setup
setup(
name="flatten_references_graph",
version="0.1.0",
author="Adrian Gierakowski",
packages=["flatten_references_graph"],
install_requires=[
"igraph",
"toolz"
],
entry_points={
"console_scripts": [
"flatten_references_graph=flatten_references_graph.__main__:main"
]
}
)

@@ -816,6 +816,8 @@ with pkgs;
referencesByPopularity = callPackage ../build-support/references-by-popularity { };
dockerMakeLayers = callPackage ../build-support/docker/make-layers.nix { };
removeReferencesTo = callPackage ../build-support/remove-references-to {
inherit (darwin) signingUtils;
};