# -*- coding: utf-8 -*-
import bz2
import os
import pickle

import networkx as nx
import pytest


class TestMinCostFlow:
    """Tests for the minimum-cost flow routines.

    Most tests solve the same instance with both ``nx.network_simplex``
    and ``nx.capacity_scaling`` and check that the two solvers agree on
    the optimal cost and on the flow dictionary.
    """

    def test_simple_digraph(self):
        """Small capacitated digraph with one source and one sink."""
        G = nx.DiGraph()
        G.add_node('a', demand=-5)
        G.add_node('d', demand=5)
        G.add_edge('a', 'b', weight=3, capacity=4)
        G.add_edge('a', 'c', weight=6, capacity=10)
        G.add_edge('b', 'd', weight=1, capacity=9)
        G.add_edge('c', 'd', weight=2, capacity=5)
        flowCost, H = nx.network_simplex(G)
        soln = {'a': {'b': 4, 'c': 1},
                'b': {'d': 4},
                'c': {'d': 1},
                'd': {}}
        assert flowCost == 24
        assert nx.min_cost_flow_cost(G) == 24
        assert H == soln
        assert nx.min_cost_flow(G) == soln
        assert nx.cost_of_flow(G, H) == 24

        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == 24
        assert nx.cost_of_flow(G, H) == 24
        assert H == soln

    def test_negcycle_infcap(self):
        """Uncapacitated negative cycle a -> b -> d -> c -> a (total -4)."""
        G = nx.DiGraph()
        G.add_node('s', demand=-5)
        G.add_node('t', demand=5)
        G.add_edge('s', 'a', weight=1, capacity=3)
        G.add_edge('a', 'b', weight=3)
        G.add_edge('c', 'a', weight=-6)
        G.add_edge('b', 'd', weight=1)
        G.add_edge('d', 'c', weight=-2)
        G.add_edge('d', 't', weight=1, capacity=3)
        # The two solvers are expected to report different exception types.
        pytest.raises(nx.NetworkXUnfeasible, nx.network_simplex, G)
        pytest.raises(nx.NetworkXUnbounded, nx.capacity_scaling, G)

    def test_sum_demands_not_zero(self):
        """Demands of -5 and 4 do not balance, so no flow can exist."""
        G = nx.DiGraph()
        G.add_node('s', demand=-5)
        G.add_node('t', demand=4)
        G.add_edge('s', 'a', weight=1, capacity=3)
        G.add_edge('a', 'b', weight=3)
        G.add_edge('a', 'c', weight=-6)
        G.add_edge('b', 'd', weight=1)
        G.add_edge('c', 'd', weight=-2)
        G.add_edge('d', 't', weight=1, capacity=3)
        pytest.raises(nx.NetworkXUnfeasible, nx.network_simplex, G)
        pytest.raises(nx.NetworkXUnfeasible, nx.capacity_scaling, G)

    def test_no_flow_satisfying_demands(self):
        """The only edge leaving 's' has capacity 3 but 5 units must leave."""
        G = nx.DiGraph()
        G.add_node('s', demand=-5)
        G.add_node('t', demand=5)
        G.add_edge('s', 'a', weight=1, capacity=3)
        G.add_edge('a', 'b', weight=3)
        G.add_edge('a', 'c', weight=-6)
        G.add_edge('b', 'd', weight=1)
        G.add_edge('c', 'd', weight=-2)
        G.add_edge('d', 't', weight=1, capacity=3)
        pytest.raises(nx.NetworkXUnfeasible, nx.network_simplex, G)
        pytest.raises(nx.NetworkXUnfeasible, nx.capacity_scaling, G)

    def test_transshipment(self):
        """Uncapacitated instance with several supply and demand nodes."""
        G = nx.DiGraph()
        G.add_node('a', demand=1)
        G.add_node('b', demand=-2)
        G.add_node('c', demand=-2)
        G.add_node('d', demand=3)
        G.add_node('e', demand=-4)
        G.add_node('f', demand=-4)
        G.add_node('g', demand=3)
        G.add_node('h', demand=2)
        G.add_node('r', demand=3)
        G.add_edge('a', 'c', weight=3)
        G.add_edge('r', 'a', weight=2)
        G.add_edge('b', 'a', weight=9)
        G.add_edge('r', 'c', weight=0)
        G.add_edge('b', 'r', weight=-6)
        G.add_edge('c', 'd', weight=5)
        G.add_edge('e', 'r', weight=4)
        G.add_edge('e', 'f', weight=3)
        G.add_edge('h', 'b', weight=4)
        G.add_edge('f', 'd', weight=7)
        G.add_edge('f', 'h', weight=12)
        G.add_edge('g', 'd', weight=12)
        G.add_edge('f', 'g', weight=-1)
        G.add_edge('h', 'g', weight=-10)
        flowCost, H = nx.network_simplex(G)
        soln = {'a': {'c': 0},
                'b': {'a': 0, 'r': 2},
                'c': {'d': 3},
                'd': {},
                'e': {'r': 3, 'f': 1},
                'f': {'d': 0, 'g': 3, 'h': 2},
                'g': {'d': 0},
                'h': {'b': 0, 'g': 0},
                'r': {'a': 1, 'c': 1}}
        assert flowCost == 41
        assert nx.min_cost_flow_cost(G) == 41
        assert H == soln
        assert nx.min_cost_flow(G) == soln
        assert nx.cost_of_flow(G, H) == 41

        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == 41
        assert nx.cost_of_flow(G, H) == 41
        assert H == soln

    def test_max_flow_min_cost(self):
        """max_flow_min_cost with custom capacity/weight attribute names."""
        G = nx.DiGraph()
        G.add_edge('s', 'a', bandwidth=6)
        G.add_edge('s', 'c', bandwidth=10, cost=10)
        G.add_edge('a', 'b', cost=6)
        G.add_edge('b', 'd', bandwidth=8, cost=7)
        G.add_edge('c', 'd', cost=10)
        G.add_edge('d', 't', bandwidth=5, cost=5)
        soln = {'s': {'a': 5, 'c': 0},
                'a': {'b': 5},
                'b': {'d': 5},
                'c': {'d': 0},
                'd': {'t': 5},
                't': {}}
        flow = nx.max_flow_min_cost(G, 's', 't', capacity='bandwidth',
                                    weight='cost')
        assert flow == soln
        assert nx.cost_of_flow(G, flow, weight='cost') == 90

        # Temporarily add a strongly negative return edge t -> s, solve with
        # capacity_scaling, then strip that edge from the graph and the
        # result before comparing against the same expected flow.
        G.add_edge('t', 's', cost=-100)
        flowCost, flow = nx.capacity_scaling(G, capacity='bandwidth',
                                             weight='cost')
        G.remove_edge('t', 's')
        assert flowCost == -410
        assert flow['t']['s'] == 5
        del flow['t']['s']
        assert flow == soln
        assert nx.cost_of_flow(G, flow, weight='cost') == 90

    def test_digraph1(self):
        # From Bradley, S. P., Hax, A. C. and Magnanti, T. L. Applied
        # Mathematical Programming. Addison-Wesley, 1977.
        G = nx.DiGraph()
        G.add_node(1, demand=-20)
        G.add_node(4, demand=5)
        G.add_node(5, demand=15)
        G.add_edges_from([(1, 2, {'capacity': 15, 'weight': 4}),
                          (1, 3, {'capacity': 8, 'weight': 4}),
                          (2, 3, {'weight': 2}),
                          (2, 4, {'capacity': 4, 'weight': 2}),
                          (2, 5, {'capacity': 10, 'weight': 6}),
                          (3, 4, {'capacity': 15, 'weight': 1}),
                          (3, 5, {'capacity': 5, 'weight': 3}),
                          (4, 5, {'weight': 2}),
                          (5, 3, {'capacity': 4, 'weight': 1})])
        flowCost, H = nx.network_simplex(G)
        soln = {1: {2: 12, 3: 8},
                2: {3: 8, 4: 4, 5: 0},
                3: {4: 11, 5: 5},
                4: {5: 10},
                5: {3: 0}}
        assert flowCost == 150
        assert nx.min_cost_flow_cost(G) == 150
        assert H == soln
        assert nx.min_cost_flow(G) == soln
        assert nx.cost_of_flow(G, H) == 150

        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == 150
        assert H == soln
        assert nx.cost_of_flow(G, H) == 150

    def test_digraph2(self):
        # Example from ticket #430 from mfrasca. Original source:
        # http://www.cs.princeton.edu/courses/archive/spr03/cs226/lectures/mincost.4up.pdf, slide 11.
        G = nx.DiGraph()
        G.add_edge('s', 1, capacity=12)
        G.add_edge('s', 2, capacity=6)
        G.add_edge('s', 3, capacity=14)
        G.add_edge(1, 2, capacity=11, weight=4)
        G.add_edge(2, 3, capacity=9, weight=6)
        G.add_edge(1, 4, capacity=5, weight=5)
        G.add_edge(1, 5, capacity=2, weight=12)
        G.add_edge(2, 5, capacity=4, weight=4)
        G.add_edge(2, 6, capacity=2, weight=6)
        G.add_edge(3, 6, capacity=31, weight=3)
        G.add_edge(4, 5, capacity=18, weight=4)
        G.add_edge(5, 6, capacity=9, weight=5)
        G.add_edge(4, 't', capacity=3)
        G.add_edge(5, 't', capacity=7)
        G.add_edge(6, 't', capacity=22)
        flow = nx.max_flow_min_cost(G, 's', 't')
        soln = {1: {2: 6, 4: 5, 5: 1},
                2: {3: 6, 5: 4, 6: 2},
                3: {6: 20},
                4: {5: 2, 't': 3},
                5: {6: 0, 't': 7},
                6: {'t': 22},
                's': {1: 12, 2: 6, 3: 14},
                't': {}}
        assert flow == soln

        # Same instance via capacity_scaling, using a temporary negative
        # return edge t -> s that is removed again before comparing.
        G.add_edge('t', 's', weight=-100)
        flowCost, flow = nx.capacity_scaling(G)
        G.remove_edge('t', 's')
        assert flow['t']['s'] == 32
        assert flowCost == -3007
        del flow['t']['s']
        assert flow == soln
        assert nx.cost_of_flow(G, flow) == 193

    def test_digraph3(self):
        """Combinatorial Optimization: Algorithms and Complexity,
        Papadimitriou Steiglitz at page 140 has an example, 7.1, but that
        admits multiple solutions, so I alter it a bit.

        From ticket #430 by mfrasca."""
        # Edge attributes are keyed by the integers 0 and 1; below, key 0 is
        # passed as the capacity attribute and key 1 as the weight attribute.
        G = nx.DiGraph()
        G.add_edge('s', 'a')
        G['s']['a'].update({0: 2, 1: 4})
        G.add_edge('s', 'b')
        G['s']['b'].update({0: 2, 1: 1})
        G.add_edge('a', 'b')
        G['a']['b'].update({0: 5, 1: 2})
        G.add_edge('a', 't')
        G['a']['t'].update({0: 1, 1: 5})
        G.add_edge('b', 'a')
        G['b']['a'].update({0: 1, 1: 3})
        G.add_edge('b', 't')
        G['b']['t'].update({0: 3, 1: 2})

        # PS.ex.7.1: testing main function
        sol = nx.max_flow_min_cost(G, 's', 't', capacity=0, weight=1)
        flow = sum(sol['s'].values())
        assert 4 == flow
        assert 23 == nx.cost_of_flow(G, sol, weight=1)
        assert sol['s'] == {'a': 2, 'b': 2}
        assert sol['a'] == {'b': 1, 't': 1}
        assert sol['b'] == {'a': 0, 't': 3}
        assert sol['t'] == {}

        G.add_edge('t', 's')
        G['t']['s'].update({1: -100})
        flowCost, sol = nx.capacity_scaling(G, capacity=0, weight=1)
        G.remove_edge('t', 's')
        flow = sum(sol['s'].values())
        assert 4 == flow
        assert sol['t']['s'] == 4
        assert flowCost == -377
        del sol['t']['s']
        assert sol['s'] == {'a': 2, 'b': 2}
        assert sol['a'] == {'b': 1, 't': 1}
        assert sol['b'] == {'a': 0, 't': 3}
        assert sol['t'] == {}
        assert nx.cost_of_flow(G, sol, weight=1) == 23

    def test_zero_capacity_edges(self):
        """Address issue raised in ticket #617 by arv."""
        G = nx.DiGraph()
        G.add_edges_from([(1, 2, {'capacity': 1, 'weight': 1}),
                          (1, 5, {'capacity': 1, 'weight': 1}),
                          (2, 3, {'capacity': 0, 'weight': 1}),
                          (2, 5, {'capacity': 1, 'weight': 1}),
                          (5, 3, {'capacity': 2, 'weight': 1}),
                          (5, 4, {'capacity': 0, 'weight': 1}),
                          (3, 4, {'capacity': 2, 'weight': 1})])
        G.nodes[1]['demand'] = -1
        G.nodes[2]['demand'] = -1
        G.nodes[4]['demand'] = 2

        flowCost, H = nx.network_simplex(G)
        soln = {1: {2: 0, 5: 1},
                2: {3: 0, 5: 1},
                3: {4: 2},
                4: {},
                5: {3: 2, 4: 0}}
        assert flowCost == 6
        assert nx.min_cost_flow_cost(G) == 6
        assert H == soln
        assert nx.min_cost_flow(G) == soln
        assert nx.cost_of_flow(G, H) == 6

        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == 6
        assert H == soln
        assert nx.cost_of_flow(G, H) == 6

    def test_digon(self):
        """Check if digons are handled properly.

        Taken from ticket #618 by arv."""
        nodes = [(1, {}),
                 (2, {'demand': -4}),
                 (3, {'demand': 4}),
                 ]
        edges = [(1, 2, {'capacity': 3, 'weight': 600000}),
                 (2, 1, {'capacity': 2, 'weight': 0}),
                 (2, 3, {'capacity': 5, 'weight': 714285}),
                 (3, 2, {'capacity': 2, 'weight': 0}),
                 ]
        G = nx.DiGraph(edges)
        G.add_nodes_from(nodes)
        flowCost, H = nx.network_simplex(G)
        soln = {1: {2: 0},
                2: {1: 0, 3: 4},
                3: {2: 0}}
        assert flowCost == 2857140
        assert nx.min_cost_flow_cost(G) == 2857140
        assert H == soln
        assert nx.min_cost_flow(G) == soln
        assert nx.cost_of_flow(G, H) == 2857140

        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == 2857140
        assert H == soln
        assert nx.cost_of_flow(G, H) == 2857140

    def test_deadend(self):
        """Check if one-node cycles are handled properly.

        Taken from ticket #2906 from @sshraven."""
        G = nx.DiGraph()

        G.add_nodes_from(range(5), demand=0)
        G.nodes[4]['demand'] = -13
        G.nodes[3]['demand'] = 13

        # Node 4 carries the supply but has no incident edges.
        G.add_edges_from([(0, 2), (0, 3), (2, 1)], capacity=20, weight=0.1)
        pytest.raises(nx.NetworkXUnfeasible, nx.min_cost_flow, G)

    def test_infinite_capacity_neg_digon(self):
        """An infinite capacity negative cost digon results in an unbounded
        instance."""
        nodes = [(1, {}),
                 (2, {'demand': -4}),
                 (3, {'demand': 4}),
                 ]
        edges = [(1, 2, {'weight': -600}),
                 (2, 1, {'weight': 0}),
                 (2, 3, {'capacity': 5, 'weight': 714285}),
                 (3, 2, {'capacity': 2, 'weight': 0}),
                 ]
        G = nx.DiGraph(edges)
        G.add_nodes_from(nodes)
        pytest.raises(nx.NetworkXUnbounded, nx.network_simplex, G)
        pytest.raises(nx.NetworkXUnbounded, nx.capacity_scaling, G)

    def test_finite_capacity_neg_digon(self):
        """The digon should receive the maximum amount of flow it can handle.

        Taken from ticket #749 by @chuongdo."""
        G = nx.DiGraph()
        G.add_edge('a', 'b', capacity=1, weight=-1)
        G.add_edge('b', 'a', capacity=1, weight=-1)
        min_cost = -2
        assert nx.min_cost_flow_cost(G) == min_cost

        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == -2
        assert H == {'a': {'b': 1}, 'b': {'a': 1}}
        assert nx.cost_of_flow(G, H) == -2

    def test_multidigraph(self):
        """Multidigraphs are acceptable."""
        G = nx.MultiDiGraph()
        G.add_weighted_edges_from([(1, 2, 1), (2, 3, 2)], weight='capacity')
        flowCost, H = nx.network_simplex(G)
        assert flowCost == 0
        assert H == {1: {2: {0: 0}}, 2: {3: {0: 0}}, 3: {}}

        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == 0
        assert H == {1: {2: {0: 0}}, 2: {3: {0: 0}}, 3: {}}

    def test_negative_selfloops(self):
        """Negative selfloops should cause an exception if uncapacitated and
        always be saturated otherwise.
        """
        G = nx.DiGraph()
        G.add_edge(1, 1, weight=-1)
        pytest.raises(nx.NetworkXUnbounded, nx.network_simplex, G)
        pytest.raises(nx.NetworkXUnbounded, nx.capacity_scaling, G)
        G[1][1]['capacity'] = 2
        flowCost, H = nx.network_simplex(G)
        assert flowCost == -2
        assert H == {1: {1: 2}}
        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == -2
        assert H == {1: {1: 2}}

        G = nx.MultiDiGraph()
        G.add_edge(1, 1, 'x', weight=-1)
        G.add_edge(1, 1, 'y', weight=1)
        pytest.raises(nx.NetworkXUnbounded, nx.network_simplex, G)
        pytest.raises(nx.NetworkXUnbounded, nx.capacity_scaling, G)
        G[1][1]['x']['capacity'] = 2
        flowCost, H = nx.network_simplex(G)
        assert flowCost == -2
        assert H == {1: {1: {'x': 2, 'y': 0}}}
        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == -2
        assert H == {1: {1: {'x': 2, 'y': 0}}}

    def test_bone_shaped(self):
        # From #1283
        G = nx.DiGraph()
        G.add_node(0, demand=-4)
        G.add_node(1, demand=2)
        G.add_node(2, demand=2)
        G.add_node(3, demand=4)
        G.add_node(4, demand=-2)
        G.add_node(5, demand=-2)
        G.add_edge(0, 1, capacity=4)
        G.add_edge(0, 2, capacity=4)
        G.add_edge(4, 3, capacity=4)
        G.add_edge(5, 3, capacity=4)
        G.add_edge(0, 3, capacity=0)
        flowCost, H = nx.network_simplex(G)
        assert flowCost == 0
        assert (
            H == {0: {1: 2, 2: 2, 3: 0}, 1: {}, 2: {}, 3: {},
                  4: {3: 2}, 5: {3: 2}})
        flowCost, H = nx.capacity_scaling(G)
        assert flowCost == 0
        assert (
            H == {0: {1: 2, 2: 2, 3: 0}, 1: {}, 2: {}, 3: {},
                  4: {3: 2}, 5: {3: 2}})

    def test_exceptions(self):
        """Invalid graph types and invalid attribute values are rejected."""
        # Undirected graphs are not supported by either solver.
        G = nx.Graph()
        pytest.raises(nx.NetworkXNotImplemented, nx.network_simplex, G)
        pytest.raises(nx.NetworkXNotImplemented, nx.capacity_scaling, G)
        G = nx.MultiGraph()
        pytest.raises(nx.NetworkXNotImplemented, nx.network_simplex, G)
        pytest.raises(nx.NetworkXNotImplemented, nx.capacity_scaling, G)

        # Empty directed graph.
        G = nx.DiGraph()
        pytest.raises(nx.NetworkXError, nx.network_simplex, G)
        pytest.raises(nx.NetworkXError, nx.capacity_scaling, G)

        # Infinite demand.
        G.add_node(0, demand=float('inf'))
        pytest.raises(nx.NetworkXError, nx.network_simplex, G)
        pytest.raises(nx.NetworkXUnfeasible, nx.capacity_scaling, G)

        # Infinite (negative) edge weight.
        G.nodes[0]['demand'] = 0
        G.add_node(1, demand=0)
        G.add_edge(0, 1, weight=-float('inf'))
        pytest.raises(nx.NetworkXError, nx.network_simplex, G)
        pytest.raises(nx.NetworkXUnfeasible, nx.capacity_scaling, G)

        # NOTE(review): the capacity_scaling checks below were already
        # commented out in the original; the reason is not visible here --
        # confirm before re-enabling them.

        # Infinite weight on a selfloop.
        G[0][1]['weight'] = 0
        G.add_edge(0, 0, weight=float('inf'))
        pytest.raises(nx.NetworkXError, nx.network_simplex, G)
        # pytest.raises(nx.NetworkXError, nx.capacity_scaling, G)

        # Negative capacity on an ordinary edge.
        G[0][0]['weight'] = 0
        G[0][1]['capacity'] = -1
        pytest.raises(nx.NetworkXUnfeasible, nx.network_simplex, G)
        # pytest.raises(nx.NetworkXUnfeasible, nx.capacity_scaling, G)

        # Negative capacity on a selfloop.
        G[0][1]['capacity'] = 0
        G[0][0]['capacity'] = -1
        pytest.raises(nx.NetworkXUnfeasible, nx.network_simplex, G)
        # pytest.raises(nx.NetworkXUnfeasible, nx.capacity_scaling, G)

    def test_large(self):
        """Solve the pickled instance stored next to this test file."""
        fname = os.path.join(os.path.dirname(__file__),
                             'netgen-2.gpickle.bz2')
        # nx.read_gpickle was removed in networkx 3.0; unpickling the
        # bz2-compressed fixture directly works on old and new versions.
        # The fixture is a trusted file that ships with the test suite.
        with bz2.BZ2File(fname, 'rb') as f:
            G = pickle.load(f)
        flowCost, flowDict = nx.network_simplex(G)
        assert 6749969302 == flowCost
        assert 6749969302 == nx.cost_of_flow(G, flowDict)
        flowCost, flowDict = nx.capacity_scaling(G)
        assert 6749969302 == flowCost
        assert 6749969302 == nx.cost_of_flow(G, flowDict)