Module BML.data.updates

Expand source code
#!/usr/bin/env python3

import json
import sys
import time

from pybgpstream import BGPStream, BGPRecord, BGPElem

from BML import utils

def serialize_sets(obj):
    """``default=`` hook for ``json.dumps`` that turns sets into lists.

    Args:
        obj: an object the JSON encoder could not serialize on its own.

    Returns:
        ``list(obj)`` when ``obj`` is a ``set`` or ``frozenset``.

    Raises:
        TypeError: for any other type, as the ``json`` module expects from a
            ``default`` hook. Returning the object unchanged would make the
            encoder report a misleading "Circular reference detected" error.
    """
    if isinstance(obj, (set, frozenset)):
        return list(obj)
    raise TypeError("Object of type " + type(obj).__name__ + " is not JSON serializable")

class UpdatesDump(utils.BmlProcess):
    """Dump BGP elements read from BGPStream into ``<outFolder>/updates.csv``.

    Each element of type 'A', 'W' or 'R' from a valid record is written as
    one line, provided its prefix IP version (as reported by
    ``utils.ipVersion``) is listed in ``params["IpVersion"]``. The columns are
    ``collector,dump_time,type,time,peer_address,peer_asn,fields``.
    """

    # Element types copied to the dump; anything else is skipped.
    _DUMPED_TYPES = frozenset(('A', 'W', 'R'))

    # Header written by emptyFile(); must list the columns in _formatRow() order.
    _HEADER = "collector,dump_time,type,time,peer_address,peer_asn,fields"

    def __init__(self, start, end, params, outFolder, logFiles):
        """Store the interval, output path and parameters.

        Args:
            start: start of the interval, handed to BGPStream as ``from_time``.
                -1 means "not set" and makes startStream() abort.
            end: end of the interval, handed to BGPStream as ``until_time``.
                -1 means "not set".
            params: overrides for the default parameters below, applied with
                ``self.setParams`` (inherited from ``utils.BmlProcess``).
            outFolder: folder for ``updates.csv`` (passed through
                ``utils.mkdirPath``).
            logFiles: forwarded to ``utils.BmlProcess.__init__``.
        """
        utils.BmlProcess.__init__(self, logFiles)

        self.startTime = start
        self.endTime = end

        # Placeholder; startStream() replaces it with a configured stream.
        self.stream = BGPStream()

        self.filePath = self.getFilePath(outFolder)

        self.params = {
            "Projects": ['ris', 'routeviews'],
            "Collectors": [],
            "IpVersion": [4, 6],
            "UseRibs": False,
        }

        self.setParams(params)

    def startStream(self):
        """Create ``self.stream`` for the interval [startTime, endTime].

        Unless ``params["UseRibs"]`` is true, the stream is restricted to
        ``record_type="updates"``. With UseRibs, no record_type filter is
        passed, presumably so BGPStream also yields RIB records.

        Raises:
            SystemExit: if either bound of the interval is still -1.
        """
        if self.startTime == -1 or self.endTime == -1:
            # sys.exit instead of the site-provided quit(), which is meant for
            # the interactive shell and is missing when Python runs with -S.
            sys.exit("Error: can't start stream, interval not set")

        streamArgs = {
            "from_time": self.startTime,
            "until_time": self.endTime,
            "collectors": self.params["Collectors"],
            "projects": self.params["Projects"],
        }
        if not self.params["UseRibs"]:
            streamArgs["record_type"] = "updates"
        self.stream = BGPStream(**streamArgs)

    def buildUpdatesDump(self):
        """Write every selected element of ``self.stream`` to ``self.filePath``.

        emptyFile() first truncates the file and writes the header. The file
        is then opened once for the whole dump rather than once per row.
        """
        self.startProgress = self.startTime
        self.endProgress = self.endTime

        self.emptyFile()

        ipVersions = self.params["IpVersion"]

        with open(self.filePath, "a") as out:
            for record in self.stream.records():
                if record.status != "valid":
                    continue

                for elem in record:
                    self.printProgress(elem.time)

                    if elem.type not in self._DUMPED_TYPES:
                        continue
                    if utils.ipVersion(elem.fields['prefix']) not in ipVersions:
                        continue

                    u = {
                        'collector': str(record.collector),
                        'dump_time': str(record.dump_time),
                        'type': str(elem.type),
                        'time': str(int(elem.time)),
                        'peer_address': str(elem.peer_address),
                        'peer_asn': str(elem.peer_asn),
                        'fields': json.dumps(elem.fields, default=serialize_sets),
                    }
                    out.write(self._formatRow(u))

    @staticmethod
    def _formatRow(u):
        """Return the newline-terminated output line for the row dict ``u``."""
        # NOTE(review): 'fields' is raw JSON containing commas and double
        # quotes, and nothing is CSV-quoted, so a strict CSV reader sees more
        # than 7 columns. Readers presumably split on the first six commas;
        # confirm before switching to the csv module.
        return ",".join((
            u['collector'], u['dump_time'], u['type'], u['time'],
            u['peer_address'], u['peer_asn'], u['fields'],
        )) + '\n'

    def getFilePath(self, path):
        """Return ``utils.mkdirPath(path)`` followed by ``updates.csv``."""
        return utils.mkdirPath(path) + "updates.csv"

    def emptyFile(self):
        """Truncate the output file, write the header line and return its path."""
        with open(self.filePath, "w") as out:
            out.write(self._HEADER + '\n')
        return self.filePath

    def appendToFile(self, u):
        """Append the row dict ``u`` to the output file and return its path.

        This opens and closes the file on every call. buildUpdatesDump() keeps
        a single handle open instead.
        """
        with open(self.filePath, "a") as out:
            out.write(self._formatRow(u))
        return self.filePath

    def execute(self):
        """Log the configuration, open the stream and write the dump.

        The computation time is measured with ``time.monotonic()`` so that
        wall-clock adjustments during a long dump cannot distort it.
        """
        timeAtStart = time.monotonic()

        self.log("###############")
        self.log("# Updates dump")
        self.log("###############")
        self.log("Start time: " + str(self.startTime))
        self.log("End time: " + str(self.endTime))
        self.log("Duration: " + utils.timeFormat(self.endTime - self.startTime))
        self.printParams()

        self.startStream()
        self.buildUpdatesDump()

        self.log("Computation time: " + utils.timeFormat(time.monotonic() - timeAtStart))
        self.log("Updates dump saved to: " + self.filePath)
        

def dumpUpdates(start, end, outfolder, params=None, logFiles=None):
    """Run an UpdatesDump over [start, end] and return the path of the dump.

    A log file ``updates_dump.log`` is opened in ``outfolder`` and used,
    together with ``logFiles``, for the duration of the dump. It is closed
    afterwards even if the dump raises.

    Args:
        start: start of the interval, forwarded to UpdatesDump.
        end: end of the interval, forwarded to UpdatesDump.
        outfolder: output folder (passed through ``utils.mkdirPath``).
        params: optional parameter overrides. Defaults to none.
        logFiles: optional sequence of extra log files. It is copied, not
            modified, so the caller never ends up holding the closed
            ``updates_dump.log`` handle.

    Returns:
        The ``filePath`` of the UpdatesDump that was executed.
    """
    if params is None:
        params = {}

    # Copy so the caller's list is not left holding our (soon closed) log file.
    logFiles = [] if logFiles is None else list(logFiles)

    with open(utils.mkdirPath(outfolder) + "updates_dump.log", 'w') as logFile:
        logFiles.append(logFile)
        updatesDump = UpdatesDump(start, end, params, outfolder, logFiles)
        updatesDump.execute()

    return updatesDump.filePath

Functions

def dumpUpdates(start, end, outfolder, params=None, logFiles=None)
Expand source code
def dumpUpdates(start, end, outfolder, params=None, logFiles=None):
    """Run an UpdatesDump over [start, end] and return the path of the dump.

    A log file ``updates_dump.log`` is opened in ``outfolder`` and used,
    together with ``logFiles``, for the duration of the dump. It is closed
    afterwards even if the dump raises.

    Args:
        start: start of the interval, forwarded to UpdatesDump.
        end: end of the interval, forwarded to UpdatesDump.
        outfolder: output folder (passed through ``utils.mkdirPath``).
        params: optional parameter overrides. Defaults to none.
        logFiles: optional sequence of extra log files. It is copied, not
            modified, so the caller never ends up holding the closed
            ``updates_dump.log`` handle.

    Returns:
        The ``filePath`` of the UpdatesDump that was executed.
    """
    if params is None:
        params = {}

    # Copy so the caller's list is not left holding our (soon closed) log file.
    logFiles = [] if logFiles is None else list(logFiles)

    with open(utils.mkdirPath(outfolder) + "updates_dump.log", 'w') as logFile:
        logFiles.append(logFile)
        updatesDump = UpdatesDump(start, end, params, outfolder, logFiles)
        updatesDump.execute()

    return updatesDump.filePath
def serialize_sets(obj)
Expand source code
def serialize_sets(obj):
    """``default=`` hook for ``json.dumps`` that turns sets into lists.

    Args:
        obj: an object the JSON encoder could not serialize on its own.

    Returns:
        ``list(obj)`` when ``obj`` is a ``set`` or ``frozenset``.

    Raises:
        TypeError: for any other type, as the ``json`` module expects from a
            ``default`` hook. Returning the object unchanged would make the
            encoder report a misleading "Circular reference detected" error.
    """
    if isinstance(obj, (set, frozenset)):
        return list(obj)
    raise TypeError("Object of type " + type(obj).__name__ + " is not JSON serializable")

Classes

class UpdatesDump (start, end, params, outFolder, logFiles)
Expand source code
class UpdatesDump(utils.BmlProcess):
    """Dump BGP elements read from BGPStream into ``<outFolder>/updates.csv``.

    Each element of type 'A', 'W' or 'R' from a valid record is written as
    one line, provided its prefix IP version (as reported by
    ``utils.ipVersion``) is listed in ``params["IpVersion"]``. The columns are
    ``collector,dump_time,type,time,peer_address,peer_asn,fields``.
    """

    # Element types copied to the dump; anything else is skipped.
    _DUMPED_TYPES = frozenset(('A', 'W', 'R'))

    # Header written by emptyFile(); must list the columns in _formatRow() order.
    _HEADER = "collector,dump_time,type,time,peer_address,peer_asn,fields"

    def __init__(self, start, end, params, outFolder, logFiles):
        """Store the interval, output path and parameters.

        Args:
            start: start of the interval, handed to BGPStream as ``from_time``.
                -1 means "not set" and makes startStream() abort.
            end: end of the interval, handed to BGPStream as ``until_time``.
                -1 means "not set".
            params: overrides for the default parameters below, applied with
                ``self.setParams`` (inherited from ``utils.BmlProcess``).
            outFolder: folder for ``updates.csv`` (passed through
                ``utils.mkdirPath``).
            logFiles: forwarded to ``utils.BmlProcess.__init__``.
        """
        utils.BmlProcess.__init__(self, logFiles)

        self.startTime = start
        self.endTime = end

        # Placeholder; startStream() replaces it with a configured stream.
        self.stream = BGPStream()

        self.filePath = self.getFilePath(outFolder)

        self.params = {
            "Projects": ['ris', 'routeviews'],
            "Collectors": [],
            "IpVersion": [4, 6],
            "UseRibs": False,
        }

        self.setParams(params)

    def startStream(self):
        """Create ``self.stream`` for the interval [startTime, endTime].

        Unless ``params["UseRibs"]`` is true, the stream is restricted to
        ``record_type="updates"``. With UseRibs, no record_type filter is
        passed, presumably so BGPStream also yields RIB records.

        Raises:
            SystemExit: if either bound of the interval is still -1.
        """
        if self.startTime == -1 or self.endTime == -1:
            # sys.exit instead of the site-provided quit(), which is meant for
            # the interactive shell and is missing when Python runs with -S.
            sys.exit("Error: can't start stream, interval not set")

        streamArgs = {
            "from_time": self.startTime,
            "until_time": self.endTime,
            "collectors": self.params["Collectors"],
            "projects": self.params["Projects"],
        }
        if not self.params["UseRibs"]:
            streamArgs["record_type"] = "updates"
        self.stream = BGPStream(**streamArgs)

    def buildUpdatesDump(self):
        """Write every selected element of ``self.stream`` to ``self.filePath``.

        emptyFile() first truncates the file and writes the header. The file
        is then opened once for the whole dump rather than once per row.
        """
        self.startProgress = self.startTime
        self.endProgress = self.endTime

        self.emptyFile()

        ipVersions = self.params["IpVersion"]

        with open(self.filePath, "a") as out:
            for record in self.stream.records():
                if record.status != "valid":
                    continue

                for elem in record:
                    self.printProgress(elem.time)

                    if elem.type not in self._DUMPED_TYPES:
                        continue
                    if utils.ipVersion(elem.fields['prefix']) not in ipVersions:
                        continue

                    u = {
                        'collector': str(record.collector),
                        'dump_time': str(record.dump_time),
                        'type': str(elem.type),
                        'time': str(int(elem.time)),
                        'peer_address': str(elem.peer_address),
                        'peer_asn': str(elem.peer_asn),
                        'fields': json.dumps(elem.fields, default=serialize_sets),
                    }
                    out.write(self._formatRow(u))

    @staticmethod
    def _formatRow(u):
        """Return the newline-terminated output line for the row dict ``u``."""
        # NOTE(review): 'fields' is raw JSON containing commas and double
        # quotes, and nothing is CSV-quoted, so a strict CSV reader sees more
        # than 7 columns. Readers presumably split on the first six commas;
        # confirm before switching to the csv module.
        return ",".join((
            u['collector'], u['dump_time'], u['type'], u['time'],
            u['peer_address'], u['peer_asn'], u['fields'],
        )) + '\n'

    def getFilePath(self, path):
        """Return ``utils.mkdirPath(path)`` followed by ``updates.csv``."""
        return utils.mkdirPath(path) + "updates.csv"

    def emptyFile(self):
        """Truncate the output file, write the header line and return its path."""
        with open(self.filePath, "w") as out:
            out.write(self._HEADER + '\n')
        return self.filePath

    def appendToFile(self, u):
        """Append the row dict ``u`` to the output file and return its path.

        This opens and closes the file on every call. buildUpdatesDump() keeps
        a single handle open instead.
        """
        with open(self.filePath, "a") as out:
            out.write(self._formatRow(u))
        return self.filePath

    def execute(self):
        """Log the configuration, open the stream and write the dump.

        The computation time is measured with ``time.monotonic()`` so that
        wall-clock adjustments during a long dump cannot distort it.
        """
        timeAtStart = time.monotonic()

        self.log("###############")
        self.log("# Updates dump")
        self.log("###############")
        self.log("Start time: " + str(self.startTime))
        self.log("End time: " + str(self.endTime))
        self.log("Duration: " + utils.timeFormat(self.endTime - self.startTime))
        self.printParams()

        self.startStream()
        self.buildUpdatesDump()

        self.log("Computation time: " + utils.timeFormat(time.monotonic() - timeAtStart))
        self.log("Updates dump saved to: " + self.filePath)

Ancestors

BML.utils.BmlProcess

Methods

def appendToFile(self, u)
Expand source code
def appendToFile(self, u):
    """Append the row dict ``u`` as one line to ``self.filePath``.

    Args:
        u: dict of strings with the keys collector, dump_time, type, time,
            peer_address, peer_asn and fields.

    Returns:
        ``self.filePath``.
    """
    # NOTE(review): 'fields' holds raw JSON (commas, double quotes) and is not
    # CSV-quoted; readers presumably split on the first six commas - confirm.
    line = ",".join((
        u['collector'], u['dump_time'], u['type'], u['time'],
        u['peer_address'], u['peer_asn'], u['fields'],
    )) + '\n'
    # `with` closes the handle even if the write fails.
    with open(self.filePath, "a") as out:
        out.write(line)
    return self.filePath
def buildUpdatesDump(self)
Expand source code
def buildUpdatesDump(self):
    """Write every selected element of ``self.stream`` to ``self.filePath``.

    emptyFile() truncates the file and writes the header. Then, for each
    valid record, every element of type 'A', 'W' or 'R' is written as one
    line, provided its prefix IP version is in ``params["IpVersion"]``. The
    file is opened once for the whole dump instead of once per row.
    """
    self.startProgress = self.startTime
    self.endProgress = self.endTime

    self.emptyFile()

    dumpedTypes = ('A', 'W', 'R')
    ipVersions = self.params["IpVersion"]

    with open(self.filePath, "a") as out:
        for record in self.stream.records():
            if record.status != "valid":
                continue

            for elem in record:
                self.printProgress(elem.time)

                if elem.type not in dumpedTypes:
                    continue
                if utils.ipVersion(elem.fields['prefix']) not in ipVersions:
                    continue

                row = (
                    str(record.collector),
                    str(record.dump_time),
                    str(elem.type),
                    str(int(elem.time)),
                    str(elem.peer_address),
                    str(elem.peer_asn),
                    json.dumps(elem.fields, default=serialize_sets),
                )
                # NOTE(review): same unquoted layout as appendToFile; the last
                # (JSON) column contains commas, so this is not strict CSV.
                out.write(",".join(row) + '\n')
def emptyFile(self)
Expand source code
def emptyFile(self):
    """Truncate ``self.filePath``, write the column header and return the path."""
    # `with` closes the handle even if the write fails.
    with open(self.filePath, "w") as out:
        out.write("collector,dump_time,type,time,peer_address,peer_asn,fields" + '\n')
    return self.filePath
def execute(self)
Expand source code
def execute(self):
    """Log the configuration, open the stream and write the dump.

    The computation time is measured with ``time.monotonic()`` so that
    wall-clock adjustments during a long dump cannot distort it.
    """
    timeAtStart = time.monotonic()

    self.log("###############")
    self.log("# Updates dump")
    self.log("###############")
    self.log("Start time: " + str(self.startTime))
    self.log("End time: " + str(self.endTime))
    self.log("Duration: " + utils.timeFormat(self.endTime - self.startTime))
    self.printParams()

    self.startStream()
    self.buildUpdatesDump()

    self.log("Computation time: " + utils.timeFormat(time.monotonic() - timeAtStart))
    self.log("Updates dump saved to: " + self.filePath)
def getFilePath(self, path)
Expand source code
def getFilePath(self, path):
    """Return where the updates file lives inside ``path``.

    ``path`` is first passed through ``utils.mkdirPath``, and the file name
    ``updates.csv`` is appended to whatever that helper returns.
    """
    folder = utils.mkdirPath(path)
    return folder + "updates.csv"
def startStream(self)
Expand source code
def startStream(self):
    """Create ``self.stream`` for the interval [startTime, endTime].

    Unless ``params["UseRibs"]`` is true, the stream is restricted to
    ``record_type="updates"``. With UseRibs, no record_type filter is passed,
    presumably so BGPStream also yields RIB records.

    Raises:
        SystemExit: if either ``startTime`` or ``endTime`` is still -1.
    """
    if self.startTime == -1 or self.endTime == -1:
        # sys.exit instead of the site-provided quit(), which is meant for
        # the interactive shell and is missing when Python runs with -S.
        sys.exit("Error: can't start stream, interval not set")

    streamArgs = {
        "from_time": self.startTime,
        "until_time": self.endTime,
        "collectors": self.params["Collectors"],
        "projects": self.params["Projects"],
    }
    if not self.params["UseRibs"]:
        streamArgs["record_type"] = "updates"
    self.stream = BGPStream(**streamArgs)