Module BML.transform.base_transform

 #!/usr/bin/env python3

import os, time, multiprocessing
import ujson as json
from BML import utils
from BML.utils import ProcessingQueue
from BML.data.routes import dumpRoutes, getUpdatesInfos, parseUpdate, updateRoutes
from math import ceil

class BaseTransform(utils.BmlProcess):

    computeRoutes = True
    computeUpdates = True
    loadPrimingData = True
    fileExtension = ".json"
    params = {}

    def __init__(self, primingFile, dataFile, params, outFolder, logFiles):

        utils.BmlProcess.__init__(self, logFiles)
        
        self.params["Name"] = None
        self.params["Period"] = 5
        self.params["NbSnapshots"] = None
        self.params["Collectors"] = []
        self.params["IpVersion"] = [4,6]
        self.params["SkipIfExist"] = True
        self.params["Gzip"] = False
        
        self.setParams(params)
        
        self.primingFile = primingFile
        self.dataFile = dataFile
        self.outFolder = outFolder
        self.filePath = self.getFilePath(outFolder)
        self.transformedData = []

    def init(self):

        self.log("Init")
        
        if(self.primingFile[-3:]==".gz"):
            utils.printAndLog("Ungzip priming data", self.logFiles)
            utils.ungzipFile(self.primingFile)
            self.primingFile = self.primingFile[:-3]
        
        if(self.dataFile[-3:]==".gz"):
            utils.printAndLog("Ungzip updates", self.logFiles)
            utils.ungzipFile(self.dataFile)
            self.dataFile = self.dataFile[:-3]
        
        self.log("Load priming data")
        self.prior_routes = {}
        if(self.loadPrimingData):
            self.prior_routes = json.load(open(self.primingFile))
            for prefix in list(self.prior_routes.keys()):

                if(utils.ipVersion(prefix) in self.params["IpVersion"]):
        
                    if(len(self.params["Collectors"])>0):

                        for collector in list(self.prior_routes[prefix].keys()):
                            if(not collector in self.params["Collectors"]):
                                del self.prior_routes[prefix][collector]
                else:
                    del self.prior_routes[prefix]
        
        self.log("Get updates infos")
        self.header, self.startTime, self.endTime = getUpdatesInfos(self.dataFile)
        
        if(self.params["NbSnapshots"]==None):
            self.T = ceil((self.endTime-self.startTime)/60 /self.params["Period"]) + 1 # +1 because the first snapshot is at t=0
        else:
            self.T = self.params["NbSnapshots"]
        
        self.startProgress = self.startTime
        self.endProgress= self.endTime

    def computeSnapshot(self, t, routes, updatesParsed):
        self.transformedData.append(self.transforms(t, routes, updatesParsed))

    def compute(self):

        self.log("Compute")

        self.preProcess()

        lines = open(self.dataFile)

        routes = self.prior_routes
        updatesParsed = []

        t = 0
        i=0
        for line in lines:

            if(t == self.T):
                break
                
            if(i!=0):
                
                update = parseUpdate(line[:-1].split(','), self.header)

                if(self.startTime + t*self.params["Period"]*60 <= int(update["time"]) and t < self.T):
                    
                    self.computeSnapshot(t, routes, updatesParsed)
                    updatesParsed = []
                    t += 1

                prefix = update["fields"]["prefix"]

                if(utils.ipVersion(prefix) in self.params["IpVersion"]):
                    if(len(self.params["Collectors"])>0):
                        if(update["collector"] in self.params["Collectors"]):
                            if(self.computeRoutes):
                                routes = updateRoutes(routes, update)
                            if(self.computeUpdates):
                                updatesParsed.append(update)

                    else:
                        if(self.computeRoutes):
                            routes = updateRoutes(routes, update)
                        if(self.computeUpdates):
                            updatesParsed.append(update)

                self.printProgress(int(update["time"]))
            i+=1
                
    def getFileName(self):
        if(self.params["Name"] is None):
            return(self.__class__.__name__ + "_" + str(self.params["Period"]))
        else:
            return(self.params["Name"])
    
    def getFilePath(self, path):
        if(path is None):
            return(None)
        else:
            return(utils.mkdirPath(path) + self.getFileName() + self.fileExtension)

    def save(self):
        with open(self.filePath, 'w') as outfile:
            json.dump(self.transformedData, outfile)
        self.log("saved")

    def exists(self):
        return(os.path.exists(self.filePath) or os.path.exists(self.filePath+".gz"))

    def transforms(self, index, routes, updates):
        return(None)

    def preProcess(self):
        return(None)

    def postProcess(self, transformedData):
        return(transformedData)

    def execute(self):
        
        timeAtStart = time.time()
        
        self.log("###############")
        self.log("# Transform")
        self.log("###############")
        
        self.log("Folder: {}".format(self.outFolder))
        self.printParams()
        
        if(self.filePath!=None and self.exists() and self.params["SkipIfExist"]):
            self.log("Data exists, skipped")
        else:
            self.init()

            self.compute()

            self.transformedData = self.postProcess(self.transformedData)

            self.log("Computation time: " + utils.timeFormat(time.time()-timeAtStart))
            if(self.filePath!=None):
                self.save()
                if(self.params["Gzip"]):
                    utils.gzipFile(self.filePath, remove=True)
                    self.filePath += ".gz"
                self.log("Transformed data saved to: " + self.filePath)
                
            if(os.path.exists(self.dataFile+".gz")):
                os.remove(self.dataFile)
            if(os.path.exists(self.primingFile+".gz")):
                if(self.loadPrimingData):
                    os.remove(self.primingFile)

class BaseTransformParallelized(BaseTransform):

    def __init__(self, primingFile, dataFile, params, outFolder, logFiles):
        
        self.params["nbProcess"] = multiprocessing.cpu_count()
        
        BaseTransform.__init__(self, primingFile, dataFile, params, outFolder, logFiles)
        
    def preProcess(self):
        manager = multiprocessing.Manager()
        self.data = manager.dict()
        self.pq = ProcessingQueue(nbProcess=self.params["nbProcess"])
        return(None)

    def computeSnapshot(self, t, routes, updatesParsed):
        self.pq.waitUntilFree()
        self.data[t] = None
        self.pq.addProcess(target=self.runTransforms, args=(self.data, t, routes, updatesParsed))
        self.pq.runOnce()
        #print("computeSnapshot:",t)

    def compute(self):

        BaseTransform.compute(self)
        
        self.pq.join()

        for i in range(len(self.data)):
            if(i in self.data):
                self.transformedData.append(self.data[i])
                del self.data[i]

    def runTransforms(self, data, index, routes, updates):
        data[index] = self.transforms(index, routes, updates)
        del routes
        del updates
        return(None)
    
def transform(transformation, primingFile, dataFile, params=None, outFolder=None, logFiles=None):
        
    if(logFiles is None):
        logFiles = []
                       
    if(params is None):
        params = {}
    
    if(outFolder!=None):
        logFile = open(utils.mkdirPath(outFolder)+"transform.log",'w')
        logFiles.append(logFile)
    
    transform = transformation(primingFile, dataFile, params, outFolder, logFiles)
    transform.execute()

    if(outFolder!=None):
        logFile.close()

    return(transform.transformedData, transform.filePath)
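
The class is meant to be subclassed: init() ungzips and loads the priming routes and reads the update file's header and time span, compute() replays the updates and calls transforms() once per snapshot, and execute() ties everything together and saves the result as JSON. A minimal sketch follows; the subclass name, the file paths, and the merging behaviour of utils.BmlProcess.setParams() are assumptions for illustration, not part of this module.

from BML.transform.base_transform import BaseTransform

class UpdateCounter(BaseTransform):
    # Hypothetical transform: one small dict per snapshot.
    def transforms(self, index, routes, updates):
        # routes: prefix -> collector -> route state at this snapshot
        # updates: updates parsed since the previous snapshot
        return {"snapshot": index,
                "nbPrefixes": len(routes),
                "nbUpdates": len(updates)}

# Hypothetical input files produced by BML's collection step.
tr = UpdateCounter("priming.json.gz", "updates.csv.gz",
                   params={"Period": 5, "IpVersion": [4]},
                   outFolder="out/", logFiles=[])
tr.execute()
print(tr.filePath, len(tr.transformedData))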

Functions

def transform(transformation, primingFile, dataFile, params=None, outFolder=None, logFiles=None)
def transform(transformation, primingFile, dataFile, params=None, outFolder=None, logFiles=None):
        
    if(logFiles is None):
        logFiles = []
                       
    if(params is None):
        params = {}
    
    if(outFolder!=None):
        logFile = open(utils.mkdirPath(outFolder)+"transform.log",'w')
        logFiles.append(logFile)
    
    transform = transformation(primingFile, dataFile, params, outFolder, logFiles)
    transform.execute()

    if(outFolder!=None):
        logFile.close()

    return(transform.transformedData, transform.filePath)
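
For one-off runs, this transform() helper wraps construction and execute(): it opens a transform.log file under outFolder when one is given, instantiates the transformation class with the supplied parameters, runs it, and returns the post-processed data together with the path of the saved file (suffixed with ".gz" when the Gzip parameter is set). A sketch, reusing the hypothetical UpdateCounter subclass defined above:

from BML.transform.base_transform import transform

data, path = transform(UpdateCounter, "priming.json", "updates.csv",
                       params={"Period": 15, "Gzip": True},
                       outFolder="out/")
# data is the list returned by postProcess(); with these (hypothetical)
# parameters, path would point to out/UpdateCounter_15.json.gz.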

Classes

class BaseTransform (primingFile, dataFile, params, outFolder, logFiles)
class BaseTransform(utils.BmlProcess):

    computeRoutes = True
    computeUpdates = True
    loadPrimingData = True
    fileExtension = ".json"
    params = {}

    def __init__(self, primingFile, dataFile, params, outFolder, logFiles):

        utils.BmlProcess.__init__(self, logFiles)
        
        self.params["Name"] = None
        self.params["Period"] = 5
        self.params["NbSnapshots"] = None
        self.params["Collectors"] = []
        self.params["IpVersion"] = [4,6]
        self.params["SkipIfExist"] = True
        self.params["Gzip"] = False
        
        self.setParams(params)
        
        self.primingFile = primingFile
        self.dataFile = dataFile
        self.outFolder = outFolder
        self.filePath = self.getFilePath(outFolder)
        self.transformedData = []

    def init(self):

        self.log("Init")
        
        if(self.primingFile[-3:]==".gz"):
            utils.printAndLog("Ungzip priming data", self.logFiles)
            utils.ungzipFile(self.primingFile)
            self.primingFile = self.primingFile[:-3]
        
        if(self.dataFile[-3:]==".gz"):
            utils.printAndLog("Ungzip updates", self.logFiles)
            utils.ungzipFile(self.dataFile)
            self.dataFile = self.dataFile[:-3]
        
        self.log("Load priming data")
        self.prior_routes = {}
        if(self.loadPrimingData):
            self.prior_routes = json.load(open(self.primingFile))
            for prefix in list(self.prior_routes.keys()):

                if(utils.ipVersion(prefix) in self.params["IpVersion"]):
        
                    if(len(self.params["Collectors"])>0):

                        for collector in list(self.prior_routes[prefix].keys()):
                            if(not collector in self.params["Collectors"]):
                                del self.prior_routes[prefix][collector]
                else:
                    del self.prior_routes[prefix]
        
        self.log("Get updates infos")
        self.header, self.startTime, self.endTime = getUpdatesInfos(self.dataFile)
        
        if(self.params["NbSnapshots"]==None):
            self.T = ceil((self.endTime-self.startTime)/60 /self.params["Period"]) + 1 # +1 because the first snapshot is at t=0
        else:
            self.T = self.params["NbSnapshots"]
        
        self.startProgress = self.startTime
        self.endProgress= self.endTime

    def computeSnapshot(self, t, routes, updatesParsed):
        self.transformedData.append(self.transforms(t, routes, updatesParsed))

    def compute(self):

        self.log("Compute")

        self.preProcess()

        lines = open(self.dataFile)

        routes = self.prior_routes
        updatesParsed = []

        t = 0
        i=0
        for line in lines:

            if(t == self.T):
                break
                
            if(i!=0):
                
                update = parseUpdate(line[:-1].split(','), self.header)

                if(self.startTime + t*self.params["Period"]*60 <= int(update["time"]) and t < self.T):
                    
                    self.computeSnapshot(t, routes, updatesParsed)
                    updatesParsed = []
                    t += 1

                prefix = update["fields"]["prefix"]

                if(utils.ipVersion(prefix) in self.params["IpVersion"]):
                    if(len(self.params["Collectors"])>0):
                        if(update["collector"] in self.params["Collectors"]):
                            if(self.computeRoutes):
                                routes = updateRoutes(routes, update)
                            if(self.computeUpdates):
                                updatesParsed.append(update)

                    else:
                        if(self.computeRoutes):
                            routes = updateRoutes(routes, update)
                        if(self.computeUpdates):
                            updatesParsed.append(update)

                self.printProgress(int(update["time"]))
            i+=1
                
    def getFileName(self):
        if(self.params["Name"] is None):
            return(self.__class__.__name__ + "_" + str(self.params["Period"]))
        else:
            return(self.params["Name"])
    
    def getFilePath(self, path):
        if(path is None):
            return(None)
        else:
            return(utils.mkdirPath(path) + self.getFileName() + self.fileExtension)

    def save(self):
        with open(self.filePath, 'w') as outfile:
            json.dump(self.transformedData, outfile)
        self.log("saved")

    def exists(self):
        return(os.path.exists(self.filePath) or os.path.exists(self.filePath+".gz"))

    def transforms(self, index, routes, updates):
        return(None)

    def preProcess(self):
        return(None)

    def postProcess(self, transformedData):
        return(transformedData)

    def execute(self):
        
        timeAtStart = time.time()
        
        self.log("###############")
        self.log("# Transform")
        self.log("###############")
        
        self.log("Folder: {}".format(self.outFolder))
        self.printParams()
        
        if(self.filePath!=None and self.exists() and self.params["SkipIfExist"]):
            self.log("Data exists, skipped")
        else:
            self.init()

            self.compute()

            self.transformedData = self.postProcess(self.transformedData)

            self.log("Computation time: " + utils.timeFormat(time.time()-timeAtStart))
            if(self.filePath!=None):
                self.save()
                if(self.params["Gzip"]):
                    utils.gzipFile(self.filePath, remove=True)
                    self.filePath += ".gz"
                self.log("Transformed data saved to: " + self.filePath)
                
            if(os.path.exists(self.dataFile+".gz")):
                os.remove(self.dataFile)
            if(os.path.exists(self.primingFile+".gz")):
                if(self.loadPrimingData):
                    os.remove(self.primingFile)
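
The constructor fills self.params with the defaults below before calling setParams(params); the params argument presumably overrides matching keys (the exact behaviour of utils.BmlProcess.setParams is assumed here). For reference, the defaults and their effect as read from the source above:

defaults = {
    "Name": None,          # output file name; class name + "_" + Period when None
    "Period": 5,           # snapshot length, in minutes
    "NbSnapshots": None,   # number of snapshots; derived from the update span when None
    "Collectors": [],      # keep every collector when the list is empty
    "IpVersion": [4, 6],   # IP versions of the prefixes to keep
    "SkipIfExist": True,   # skip the whole run if the output file already exists
    "Gzip": False,         # gzip the saved JSON file
}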

Ancestors

BML.utils.BmlProcess

Subclasses

BaseTransformParallelized

Class variables

var computeRoutes
    If True (default), apply updateRoutes() to each kept update so that routes reflect the state at each snapshot.
var computeUpdates
    If True (default), accumulate the parsed updates of the current snapshot and pass them to transforms().
var fileExtension
    Extension of the saved output file; ".json" by default.
var loadPrimingData
    If True (default), load the priming routes file in init() to seed the route table.
var params
    Transformation parameters; defaults are set in __init__() and can be overridden through the params argument.

Methods

def compute(self)
def compute(self):

    self.log("Compute")

    self.preProcess()

    lines = open(self.dataFile)

    routes = self.prior_routes
    updatesParsed = []

    t = 0
    i=0
    for line in lines:

        if(t == self.T):
            break
            
        if(i!=0):
            
            update = parseUpdate(line[:-1].split(','), self.header)

            if(self.startTime + t*self.params["Period"]*60 <= int(update["time"]) and t < self.T):
                
                self.computeSnapshot(t, routes, updatesParsed)
                updatesParsed = []
                t += 1

            prefix = update["fields"]["prefix"]

            if(utils.ipVersion(prefix) in self.params["IpVersion"]):
                if(len(self.params["Collectors"])>0):
                    if(update["collector"] in self.params["Collectors"]):
                        if(self.computeRoutes):
                            routes = updateRoutes(routes, update)
                        if(self.computeUpdates):
                            updatesParsed.append(update)

                else:
                    if(self.computeRoutes):
                        routes = updateRoutes(routes, update)
                    if(self.computeUpdates):
                        updatesParsed.append(update)

            self.printProgress(int(update["time"]))
        i+=1
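
The loop above skips the first line of the update file (its CSV header) and emits snapshot t as soon as it reads an update whose timestamp has reached startTime + t*Period*60; only then is that update filtered and applied to the next batch, so snapshot 0 is always computed from the priming routes and an empty update list. Since t advances at most once per line, snapshot indices only line up with wall-clock periods when every period contains at least one update. A small sketch of the resulting batching rule under that assumption (hypothetical helper, not part of BML):

def snapshot_index(update_time, start_time, period_minutes=5):
    # Snapshot whose update batch a record with this timestamp ends up in,
    # assuming at least one update per period.
    return (update_time - start_time) // (period_minutes * 60) + 1

assert snapshot_index(0, 0) == 1     # first period's updates feed snapshot 1
assert snapshot_index(299, 0) == 1
assert snapshot_index(300, 0) == 2   # a boundary update closes snapshot 1 first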
def computeSnapshot(self, t, routes, updatesParsed)
def computeSnapshot(self, t, routes, updatesParsed):
    self.transformedData.append(self.transforms(t, routes, updatesParsed))
def execute(self)
def execute(self):
    
    timeAtStart = time.time()
    
    self.log("###############")
    self.log("# Transform")
    self.log("###############")
    
    self.log("Folder: {}".format(self.outFolder))
    self.printParams()
    
    if(self.filePath!=None and self.exists() and self.params["SkipIfExist"]):
        self.log("Data exists, skipped")
    else:
        self.init()

        self.compute()

        self.transformedData = self.postProcess(self.transformedData)

        self.log("Computation time: " + utils.timeFormat(time.time()-timeAtStart))
        if(self.filePath!=None):
            self.save()
            if(self.params["Gzip"]):
                utils.gzipFile(self.filePath, remove=True)
                self.filePath += ".gz"
            self.log("Transformed data saved to: " + self.filePath)
            
        if(os.path.exists(self.dataFile+".gz")):
            os.remove(self.dataFile)
        if(os.path.exists(self.primingFile+".gz")):
            if(self.loadPrimingData):
                os.remove(self.primingFile)
def exists(self)
def exists(self):
    return(os.path.exists(self.filePath) or os.path.exists(self.filePath+".gz"))
def getFileName(self)
def getFileName(self):
    if(self.params["Name"] is None):
        return(self.__class__.__name__ + "_" + str(self.params["Period"]))
    else:
        return(self.params["Name"])
def getFilePath(self, path)
def getFilePath(self, path):
    if(path is None):
        return(None)
    else:
        return(utils.mkdirPath(path) + self.getFileName() + self.fileExtension)
def init(self)
def init(self):

    self.log("Init")
    
    if(self.primingFile[-3:]==".gz"):
        utils.printAndLog("Ungzip priming data", self.logFiles)
        utils.ungzipFile(self.primingFile)
        self.primingFile = self.primingFile[:-3]
    
    if(self.dataFile[-3:]==".gz"):
        utils.printAndLog("Ungzip updates", self.logFiles)
        utils.ungzipFile(self.dataFile)
        self.dataFile = self.dataFile[:-3]
    
    self.log("Load priming data")
    self.prior_routes = {}
    if(self.loadPrimingData):
        self.prior_routes = json.load(open(self.primingFile))
        for prefix in list(self.prior_routes.keys()):

            if(utils.ipVersion(prefix) in self.params["IpVersion"]):
    
                if(len(self.params["Collectors"])>0):

                    for collector in list(self.prior_routes[prefix].keys()):
                        if(not collector in self.params["Collectors"]):
                            del self.prior_routes[prefix][collector]
            else:
                del self.prior_routes[prefix]
    
    self.log("Get updates infos")
    self.header, self.startTime, self.endTime = getUpdatesInfos(self.dataFile)
    
    if(self.params["NbSnapshots"]==None):
        self.T = ceil((self.endTime-self.startTime)/60 /self.params["Period"]) + 1 # +1 because the first snapshot is at t=0
    else:
        self.T = self.params["NbSnapshots"]
    
    self.startProgress = self.startTime
    self.endProgress= self.endTime
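
When NbSnapshots is not given, init() derives the number of snapshots T from the time span reported by getUpdatesInfos() and the Period parameter. A worked example with hypothetical timestamps:

from math import ceil

startTime, endTime = 1600000000, 1600003600   # a one-hour update file (seconds)
period = 5                                    # minutes per snapshot
T = ceil((endTime - startTime) / 60 / period) + 1   # +1: first snapshot is at t=0
assert T == 13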
def postProcess(self, transformedData)
def postProcess(self, transformedData):
    return(transformedData)
def preProcess(self)
def preProcess(self):
    return(None)
def save(self)
def save(self):
    with open(self.filePath, 'w') as outfile:
        json.dump(self.transformedData, outfile)
    self.log("saved")
def transforms(self, index, routes, updates)
def transforms(self, index, routes, updates):
    return(None)
class BaseTransformParallelized (primingFile, dataFile, params, outFolder, logFiles)
class BaseTransformParallelized(BaseTransform):

    def __init__(self, primingFile, dataFile, params, outFolder, logFiles):
        
        self.params["nbProcess"] = multiprocessing.cpu_count()
        
        BaseTransform.__init__(self, primingFile, dataFile, params, outFolder, logFiles)
        
    def preProcess(self):
        manager = multiprocessing.Manager()
        self.data = manager.dict()
        self.pq = ProcessingQueue(nbProcess=self.params["nbProcess"])
        return(None)

    def computeSnapshot(self, t, routes, updatesParsed):
        self.pq.waitUntilFree()
        self.data[t] = None
        self.pq.addProcess(target=self.runTransforms, args=(self.data, t, routes, updatesParsed))
        self.pq.runOnce()
        #print("computeSnapshot:",t)

    def compute(self):

        BaseTransform.compute(self)
        
        self.pq.join()

        for i in range(len(self.data)):
            if(i in self.data):
                self.transformedData.append(self.data[i])
                del self.data[i]

    def runTransforms(self, data, index, routes, updates):
        data[index] = self.transforms(index, routes, updates)
        del routes
        del updates
        return(None)
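
BaseTransformParallelized keeps the same sequential read loop but runs each snapshot's transforms() in a worker process through ProcessingQueue, collecting results in a multiprocessing.Manager dict keyed by snapshot index and reassembling them in order once the queue is joined. The return value of transforms() must therefore be picklable. A sketch, again with a hypothetical subclass; nbProcess defaults to multiprocessing.cpu_count() and can presumably be overridden through params:

from BML.transform.base_transform import BaseTransformParallelized, transform

class ParallelUpdateCounter(BaseTransformParallelized):
    def transforms(self, index, routes, updates):
        # Runs in a worker process; return something picklable.
        return len(updates)

data, path = transform(ParallelUpdateCounter, "priming.json", "updates.csv",
                       params={"nbProcess": 4}, outFolder="out/")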

Ancestors

BaseTransform
BML.utils.BmlProcess

Subclasses

Methods

def compute(self)
def compute(self):

    BaseTransform.compute(self)
    
    self.pq.join()

    for i in range(len(self.data)):
        if(i in self.data):
            self.transformedData.append(self.data[i])
            del self.data[i]
def computeSnapshot(self, t, routes, updatesParsed)
def computeSnapshot(self, t, routes, updatesParsed):
    self.pq.waitUntilFree()
    self.data[t] = None
    self.pq.addProcess(target=self.runTransforms, args=(self.data, t, routes, updatesParsed))
    self.pq.runOnce()
    #print("computeSnapshot:",t)
def preProcess(self)
def preProcess(self):
    manager = multiprocessing.Manager()
    self.data = manager.dict()
    self.pq = ProcessingQueue(nbProcess=self.params["nbProcess"])
    return(None)
def runTransforms(self, data, index, routes, updates)
def runTransforms(self, data, index, routes, updates):
    data[index] = self.transforms(index, routes, updates)
    del routes
    del updates
    return(None)