This repository has been archived on 2023-03-25. You can view files and clone it, but cannot push or open issues or pull requests.
mightyscape-1.1-deprecated/extensions/networkx/algorithms/flow/tests/test_maxflow_large_graph.py
2020-07-30 01:16:18 +02:00

153 lines
4.9 KiB
Python

# -*- coding: utf-8 -*-
"""Maximum flow algorithms test suite on large graphs.
"""
__author__ = """Loïc Séguin-C. <loicseguin@gmail.com>"""
# Copyright (C) 2010 Loïc Séguin-C. <loicseguin@gmail.com>
# All rights reserved.
# BSD license.
import os
import networkx as nx
from networkx.algorithms.flow import build_flow_dict, build_residual_network
from networkx.algorithms.flow import boykov_kolmogorov
from networkx.algorithms.flow import dinitz
from networkx.algorithms.flow import edmonds_karp
from networkx.algorithms.flow import preflow_push
from networkx.algorithms.flow import shortest_augmenting_path
from networkx.testing import almost_equal
flow_funcs = [
boykov_kolmogorov,
dinitz,
edmonds_karp,
preflow_push,
shortest_augmenting_path,
]
msg = "Assertion failed in function: {0}"
def gen_pyramid(N):
# This graph admits a flow of value 1 for which every arc is at
# capacity (except the arcs incident to the sink which have
# infinite capacity).
G = nx.DiGraph()
for i in range(N - 1):
cap = 1. / (i + 2)
for j in range(i + 1):
G.add_edge((i, j), (i + 1, j),
capacity=cap)
cap = 1. / (i + 1) - cap
G.add_edge((i, j), (i + 1, j + 1),
capacity=cap)
cap = 1. / (i + 2) - cap
for j in range(N):
G.add_edge((N - 1, j), 't')
return G
def read_graph(name):
dirname = os.path.dirname(__file__)
path = os.path.join(dirname, name + '.gpickle.bz2')
return nx.read_gpickle(path)
def validate_flows(G, s, t, soln_value, R, flow_func):
flow_value = R.graph['flow_value']
flow_dict = build_flow_dict(G, R)
assert soln_value == flow_value, msg.format(flow_func.__name__)
assert set(G) == set(flow_dict), msg.format(flow_func.__name__)
for u in G:
assert set(G[u]) == set(flow_dict[u]), msg.format(flow_func.__name__)
excess = {u: 0 for u in flow_dict}
for u in flow_dict:
for v, flow in flow_dict[u].items():
assert flow <= G[u][v].get('capacity', float('inf')), msg.format(flow_func.__name__)
assert flow >= 0, msg.format(flow_func.__name__)
excess[u] -= flow
excess[v] += flow
for u, exc in excess.items():
if u == s:
assert exc == -soln_value, msg.format(flow_func.__name__)
elif u == t:
assert exc ==soln_value, msg.format(flow_func.__name__)
else:
assert exc == 0, msg.format(flow_func.__name__)
class TestMaxflowLargeGraph:
def test_complete_graph(self):
N = 50
G = nx.complete_graph(N)
nx.set_edge_attributes(G, 5, 'capacity')
R = build_residual_network(G, 'capacity')
kwargs = dict(residual=R)
for flow_func in flow_funcs:
kwargs['flow_func'] = flow_func
flow_value = nx.maximum_flow_value(G, 1, 2, **kwargs)
assert flow_value == 5 * (N - 1), msg.format(flow_func.__name__)
def test_pyramid(self):
N = 10
# N = 100 # this gives a graph with 5051 nodes
G = gen_pyramid(N)
R = build_residual_network(G, 'capacity')
kwargs = dict(residual=R)
for flow_func in flow_funcs:
kwargs['flow_func'] = flow_func
flow_value = nx.maximum_flow_value(G, (0, 0), 't', **kwargs)
assert almost_equal(flow_value, 1.), msg.format(flow_func.__name__)
def test_gl1(self):
G = read_graph('gl1')
s = 1
t = len(G)
R = build_residual_network(G, 'capacity')
kwargs = dict(residual=R)
# do one flow_func to save time
flow_func = flow_funcs[0]
validate_flows(G, s, t, 156545, flow_func(G, s, t, **kwargs),
flow_func)
# for flow_func in flow_funcs:
# validate_flows(G, s, t, 156545, flow_func(G, s, t, **kwargs),
# flow_func)
def test_gw1(self):
G = read_graph('gw1')
s = 1
t = len(G)
R = build_residual_network(G, 'capacity')
kwargs = dict(residual=R)
for flow_func in flow_funcs:
validate_flows(G, s, t, 1202018, flow_func(G, s, t, **kwargs),
flow_func)
def test_wlm3(self):
G = read_graph('wlm3')
s = 1
t = len(G)
R = build_residual_network(G, 'capacity')
kwargs = dict(residual=R)
# do one flow_func to save time
flow_func = flow_funcs[0]
validate_flows(G, s, t, 11875108, flow_func(G, s, t, **kwargs),
flow_func)
# for flow_func in flow_funcs:
# validate_flows(G, s, t, 11875108, flow_func(G, s, t, **kwargs),
# flow_func)
def test_preflow_push_global_relabel(self):
G = read_graph('gw1')
R = preflow_push(G, 1, len(G), global_relabel_freq=50)
assert R.graph['flow_value'] == 1202018