Module BML.transform.graph

Expand source code
from BML.transform import BaseTransform, BaseTransformParallelized
from BML.utils import ProcessingQueue

import networkit as nk
import networkx as nx

import numpy as np
import pandas as pd

import multiprocessing, pickle, time

def buildWeightedGraph(routes):

    graph =  nx.Graph()

    for prefix in routes.keys():

        #nbIp = 2 ** (32-int(prefix.split('/')[1]))
        nbIp = 1

        origins = []
        vertices = []
        edges = []

        for collector in routes[prefix].keys():
            
            for peer in routes[prefix][collector].keys():
                path = routes[prefix][collector][peer]
                path_vertices = []
                path_edges = []
                path_origin = ""

                if(path != None):

                    if('{' in path or '}' in path):
                        pass
                    else:
                        path_vertices = path.split(' ')
                        path_origin = path_vertices[-1]
                        for i in range(len(path_vertices)-1):
                            path_edges.append([path_vertices[i], path_vertices[i+1]])

                        if(path_origin not in origins):
                            origins.append(path_origin)

                        for vertex in path_vertices:
                            if(vertex not in vertices):
                                vertices.append(vertex)

                        for edge in path_edges:
                            if(edge not in edges):
                                edges.append(edge)

        for vertex in vertices:
            if(not graph.has_node(vertex)):
                graph.add_node(vertex, nbIp=0)

            
        for (a,b) in edges:
            if(not graph.has_edge(a,b)):
                graph.add_edge(a,b, nbIp=0)

            graph[a][b]['nbIp'] += nbIp

        for origin in origins:
            graph.nodes[origin]['nbIp'] += nbIp

    return(graph)

def buildGraph(routes):

    G =  nx.Graph()

    edges = set()

    for prefix in routes.keys():

        for collector in routes[prefix].keys():
            
            for peer in routes[prefix][collector].keys():
                path = routes[prefix][collector][peer]

                if(path != None):

                    if('{' in path or '}' in path):
                        pass
                    else:

                        path_vertices = path.split(' ')

                        for i in range(len(path_vertices)-1):
                            a,b = (path_vertices[i], path_vertices[i+1])
                            if(a!=b):
                                edges.add((a,b))
    
    G.add_edges_from(edges)

    return(G)

class Graph(BaseTransformParallelized):
    
    computeUpdates = False
    
    fileExtension = ".pickle"

    def __init__(self, primingFile, dataFile, params, outFolder, logFiles):
        
        self.params["relabel_nodes"] = False
        self.params["weighted"] = False

        BaseTransformParallelized.__init__(self, primingFile, dataFile, params, outFolder, logFiles)
        self.prevT = 0

    def computeSnapshot(self, t, routes, updatesParsed):
        
        self.pq.waitUntilFree()
        self.pq.addProcess(target=self.runBuildGraph, args=(self.data, t, routes))
        self.pq.runOnce()
        
        if(t!=0 and t%self.params["nbProcess"]==0 or t+1==self.T):
            
            self.pq.join()
            
            for i in range(self.prevT,t+1):
                self.pq.waitUntilFree()
                self.pq.addProcess(target=self.runTransforms, args=(self.data, i, self.data[i]))
                self.pq.runOnce()
            
            self.prevT = t+1
            
    def runBuildGraph(self, data, index, routes):
        if(self.params["weighted"]):
            G = buildWeightedGraph(routes)
        else:
            G = buildGraph(routes)
            
        if(self.params["relabel_nodes"]):
            G = nx.convert_node_labels_to_integers(G, label_attribute="ASN")
        data[index] = G
        del routes
        return(None)
    
    def runTransforms(self, data, index, G):
        data[index] = self.transforms(index, G)
        del G
        return(None)
    
    def save(self):
        with open(self.filePath, 'wb') as outfile:
            pickle.dump(self.transformedData, outfile)
        self.log("saved")

    def transforms(self, index, G):
        return(G)

Functions

def buildGraph(routes)
Expand source code
def buildGraph(routes):

    G =  nx.Graph()

    edges = set()

    for prefix in routes.keys():

        for collector in routes[prefix].keys():
            
            for peer in routes[prefix][collector].keys():
                path = routes[prefix][collector][peer]

                if(path != None):

                    if('{' in path or '}' in path):
                        pass
                    else:

                        path_vertices = path.split(' ')

                        for i in range(len(path_vertices)-1):
                            a,b = (path_vertices[i], path_vertices[i+1])
                            if(a!=b):
                                edges.add((a,b))
    
    G.add_edges_from(edges)

    return(G)
def buildWeightedGraph(routes)
Expand source code
def buildWeightedGraph(routes):

    graph =  nx.Graph()

    for prefix in routes.keys():

        #nbIp = 2 ** (32-int(prefix.split('/')[1]))
        nbIp = 1

        origins = []
        vertices = []
        edges = []

        for collector in routes[prefix].keys():
            
            for peer in routes[prefix][collector].keys():
                path = routes[prefix][collector][peer]
                path_vertices = []
                path_edges = []
                path_origin = ""

                if(path != None):

                    if('{' in path or '}' in path):
                        pass
                    else:
                        path_vertices = path.split(' ')
                        path_origin = path_vertices[-1]
                        for i in range(len(path_vertices)-1):
                            path_edges.append([path_vertices[i], path_vertices[i+1]])

                        if(path_origin not in origins):
                            origins.append(path_origin)

                        for vertex in path_vertices:
                            if(vertex not in vertices):
                                vertices.append(vertex)

                        for edge in path_edges:
                            if(edge not in edges):
                                edges.append(edge)

        for vertex in vertices:
            if(not graph.has_node(vertex)):
                graph.add_node(vertex, nbIp=0)

            
        for (a,b) in edges:
            if(not graph.has_edge(a,b)):
                graph.add_edge(a,b, nbIp=0)

            graph[a][b]['nbIp'] += nbIp

        for origin in origins:
            graph.nodes[origin]['nbIp'] += nbIp

    return(graph)

Classes

class Graph (primingFile, dataFile, params, outFolder, logFiles)
Expand source code
class Graph(BaseTransformParallelized):
    
    computeUpdates = False
    
    fileExtension = ".pickle"

    def __init__(self, primingFile, dataFile, params, outFolder, logFiles):
        
        self.params["relabel_nodes"] = False
        self.params["weighted"] = False

        BaseTransformParallelized.__init__(self, primingFile, dataFile, params, outFolder, logFiles)
        self.prevT = 0

    def computeSnapshot(self, t, routes, updatesParsed):
        
        self.pq.waitUntilFree()
        self.pq.addProcess(target=self.runBuildGraph, args=(self.data, t, routes))
        self.pq.runOnce()
        
        if(t!=0 and t%self.params["nbProcess"]==0 or t+1==self.T):
            
            self.pq.join()
            
            for i in range(self.prevT,t+1):
                self.pq.waitUntilFree()
                self.pq.addProcess(target=self.runTransforms, args=(self.data, i, self.data[i]))
                self.pq.runOnce()
            
            self.prevT = t+1
            
    def runBuildGraph(self, data, index, routes):
        if(self.params["weighted"]):
            G = buildWeightedGraph(routes)
        else:
            G = buildGraph(routes)
            
        if(self.params["relabel_nodes"]):
            G = nx.convert_node_labels_to_integers(G, label_attribute="ASN")
        data[index] = G
        del routes
        return(None)
    
    def runTransforms(self, data, index, G):
        data[index] = self.transforms(index, G)
        del G
        return(None)
    
    def save(self):
        with open(self.filePath, 'wb') as outfile:
            pickle.dump(self.transformedData, outfile)
        self.log("saved")

    def transforms(self, index, G):
        return(G)

Ancestors

Subclasses

Class variables

var computeUpdates
var fileExtension

Methods

def computeSnapshot(self, t, routes, updatesParsed)
Expand source code
def computeSnapshot(self, t, routes, updatesParsed):
    
    self.pq.waitUntilFree()
    self.pq.addProcess(target=self.runBuildGraph, args=(self.data, t, routes))
    self.pq.runOnce()
    
    if(t!=0 and t%self.params["nbProcess"]==0 or t+1==self.T):
        
        self.pq.join()
        
        for i in range(self.prevT,t+1):
            self.pq.waitUntilFree()
            self.pq.addProcess(target=self.runTransforms, args=(self.data, i, self.data[i]))
            self.pq.runOnce()
        
        self.prevT = t+1
def runBuildGraph(self, data, index, routes)
Expand source code
def runBuildGraph(self, data, index, routes):
    if(self.params["weighted"]):
        G = buildWeightedGraph(routes)
    else:
        G = buildGraph(routes)
        
    if(self.params["relabel_nodes"]):
        G = nx.convert_node_labels_to_integers(G, label_attribute="ASN")
    data[index] = G
    del routes
    return(None)
def runTransforms(self, data, index, G)
Expand source code
def runTransforms(self, data, index, G):
    data[index] = self.transforms(index, G)
    del G
    return(None)
def save(self)
Expand source code
def save(self):
    with open(self.filePath, 'wb') as outfile:
        pickle.dump(self.transformedData, outfile)
    self.log("saved")
def transforms(self, index, G)
Expand source code
def transforms(self, index, G):
    return(G)