Module BML.data.dataset
Expand source code
#!/usr/bin/env python3
import sys, os, random
import json
from BML import utils
from BML.data.updates import dumpUpdates
from BML.data.routes import dumpRoutes
def collectData(label, start_time, end_time, name, folder, params, logFiles=None):
    """Collect the priming routes and the updates of one sample.

    Output is written under ``folder + label + os.sep + str(name)``, together
    with a ``log_collect_sample.log`` file that receives every message logged
    by this function.

    Args:
        label: Sample label; used as a sub-folder name and logged.
        start_time: Start of the collection window. Presumably a Unix
            timestamp in seconds -- TODO confirm against dumpUpdates.
        end_time: End of the collection window, same unit as start_time.
        name: Sample name; converted with str() to build the folder name.
        folder: Base output folder. It is concatenated directly with label,
            so it is expected to already end with a path separator.
        params: Dict providing "Projects", "Collectors", "IpVersion",
            "UseRibsPriming", "UseRibsData", "PrimingPeriod" and
            "SkipIfExist".
        logFiles: Optional list of log targets handed to utils.printAndLog.
            The list is copied and never modified.
    """
    # Work on a private copy. The previous mutable default (logFiles=[]) was
    # appended to on every call, so file handles leaked from one call into the
    # next and the caller's own list was modified behind its back.
    logFiles = [] if logFiles is None else list(logFiles)

    # NOTE(review): file names are concatenated straight onto e_folder below,
    # so utils.mkdirPath presumably returns a path ending with a separator.
    e_folder = utils.mkdirPath(folder + label + os.sep + str(name))

    # The context manager guarantees the sample log is flushed and closed,
    # including when one of the collection steps raises.
    with open(e_folder + "log_collect_sample.log", "w") as sampleLog:
        logFiles.append(sampleLog)

        utils.printAndLog("##################", logFiles)
        utils.printAndLog("# Collect sample", logFiles)
        utils.printAndLog("#################", logFiles)

        utils.printAndLog("Name: {}".format(name), logFiles)
        utils.printAndLog("Label: {}".format(label), logFiles)
        utils.printAndLog("Start time: {}".format(start_time), logFiles)
        utils.printAndLog("End time: {}".format(end_time), logFiles)

        utils.printAndLog("**************************", logFiles)
        utils.printAndLog("* Priming data collection", logFiles)
        utils.printAndLog("**************************", logFiles)

        if(os.path.exists(e_folder + "priming_data" + os.sep +"routes.json.gz") and params["SkipIfExist"]):
            utils.printAndLog("Data exists, skipped", logFiles)
        else:
            # Priming data collection
            paramsUpdate = {
                "Projects": params["Projects"],
                "Collectors": params["Collectors"],
                "IpVersion": params["IpVersion"],
                "UseRibs": params["UseRibsPriming"],
            }

            # The priming window ends at start_time and starts
            # PrimingPeriod*60 earlier (PrimingPeriod is presumably expressed
            # in minutes -- TODO confirm).
            primingUpdatesFile = dumpUpdates(start_time-params["PrimingPeriod"]*60, start_time, e_folder + "priming_data", params=paramsUpdate, logFiles=logFiles[:])
            _ , primingDumpFile = dumpRoutes(primingUpdatesFile, routes={}, outFolder=e_folder + "priming_data", logFiles=logFiles[:])

            utils.gzipFile(primingDumpFile, remove=True)
            # Only the compressed routes dump is kept; the intermediate
            # updates file is deleted.
            os.remove(primingUpdatesFile)

        utils.printAndLog("********************", logFiles)
        utils.printAndLog("* Data collection", logFiles)
        utils.printAndLog("********************", logFiles)

        if(os.path.exists(e_folder + "data" + os.sep +"updates.csv.gz") and params["SkipIfExist"]):
            utils.printAndLog("Data exists, skipped", logFiles)
        else:
            # Data collection
            paramsUpdate = {
                "Projects": params["Projects"],
                "Collectors": params["Collectors"],
                "IpVersion": params["IpVersion"],
                "UseRibs": params["UseRibsData"],
            }

            updatesFilePath = dumpUpdates(start_time, end_time, e_folder + "data", params=paramsUpdate, logFiles=logFiles[:])
            utils.gzipFile(updatesFilePath, remove=True)
class Dataset():
    """Describe a set of collection periods and turn them into jobs.

    Each job is a plain dict naming the ``collectData`` function of
    ``BML.data.dataset`` together with the arguments to call it with.
    """

    def __init__(self, folder):
        """Create a dataset rooted at *folder* with the default parameters.

        Args:
            folder: Base output folder, passed unchanged to every job.
        """
        self.params = {
            "Projects" : ['ris','routeviews'],
            "Collectors" : [],
            "IpVersion" : [4,6],
            "PrimingPeriod" : 1*60,
            "UseRibsPriming" : False,
            "UseRibsData" : False,
            "SkipIfExist" : True
        }

        self.folder= folder

        # Start with no periods so that getJobs() returns an empty list,
        # instead of raising AttributeError, when it is called before
        # setPeriodsOfInterests().
        self.periodsOfInterests = []

    @staticmethod
    def _applyOverrides(target, overrides):
        """Copy *overrides* into *target*, exiting on the first unknown key.

        Keys preceding the unknown one have already been applied when the
        process exits.
        """
        for k,v in overrides.items():
            if(k in target):
                target[k] = v
            else:
                # str() keeps the message intact for string keys and avoids a
                # TypeError when the offending key is not a string.
                sys.exit("Unrecognized parameter:"+str(k))

    def setParams(self, params):
        """Override dataset-wide parameters.

        Args:
            params: Dict of overrides; every key must already exist in
                ``self.params``.

        Raises:
            SystemExit: If a key is not a known parameter.
        """
        self._applyOverrides(self.params, params)

    def setPeriodsOfInterests(self, periodsOfInterests):
        """Store the periods that getJobs() turns into jobs.

        Args:
            periodsOfInterests: Iterable of dicts with the keys "label",
                "start_time", "end_time", "name" and optionally "params"
                (per-period parameter overrides).
        """
        self.periodsOfInterests = periodsOfInterests

    def getJobs(self):
        """Build one collection job per period, in random order.

        Returns:
            A shuffled list of job dicts with the keys 'includes', 'target',
            'args' and 'kwargs'.

        Raises:
            SystemExit: If a period overrides an unknown parameter.
        """
        jobs = []

        for period in self.periodsOfInterests:

            # Shallow copy: per-period overrides must not leak into the
            # dataset-wide parameters or into other periods.
            params = self.params.copy()

            if("params" in period):
                self._applyOverrides(params, period["params"])

            j = {
                'includes' : "from BML.data.dataset import collectData",
                'target': "collectData",
                'args': (period["label"], period["start_time"], period["end_time"], period["name"], self.folder, params),
                'kwargs': {'logFiles':["LOG_ONLY"]}
            }

            jobs.append(j)

        random.shuffle(jobs)

        return(jobs)
Functions
def collectData(label, start_time, end_time, name, folder, params, logFiles=[])
-
Expand source code
def collectData(label, start_time, end_time, name, folder, params, logFiles=None):
    """Collect the priming routes and the updates of one sample.

    Output is written under ``folder + label + os.sep + str(name)``, together
    with a ``log_collect_sample.log`` file that receives every message logged
    by this function.

    Args:
        label: Sample label; used as a sub-folder name and logged.
        start_time: Start of the collection window. Presumably a Unix
            timestamp in seconds -- TODO confirm against dumpUpdates.
        end_time: End of the collection window, same unit as start_time.
        name: Sample name; converted with str() to build the folder name.
        folder: Base output folder. It is concatenated directly with label,
            so it is expected to already end with a path separator.
        params: Dict providing "Projects", "Collectors", "IpVersion",
            "UseRibsPriming", "UseRibsData", "PrimingPeriod" and
            "SkipIfExist".
        logFiles: Optional list of log targets handed to utils.printAndLog.
            The list is copied and never modified.
    """
    # Work on a private copy. The previous mutable default (logFiles=[]) was
    # appended to on every call, so file handles leaked from one call into the
    # next and the caller's own list was modified behind its back.
    logFiles = [] if logFiles is None else list(logFiles)

    # NOTE(review): file names are concatenated straight onto e_folder below,
    # so utils.mkdirPath presumably returns a path ending with a separator.
    e_folder = utils.mkdirPath(folder + label + os.sep + str(name))

    # The context manager guarantees the sample log is flushed and closed,
    # including when one of the collection steps raises.
    with open(e_folder + "log_collect_sample.log", "w") as sampleLog:
        logFiles.append(sampleLog)

        utils.printAndLog("##################", logFiles)
        utils.printAndLog("# Collect sample", logFiles)
        utils.printAndLog("#################", logFiles)

        utils.printAndLog("Name: {}".format(name), logFiles)
        utils.printAndLog("Label: {}".format(label), logFiles)
        utils.printAndLog("Start time: {}".format(start_time), logFiles)
        utils.printAndLog("End time: {}".format(end_time), logFiles)

        utils.printAndLog("**************************", logFiles)
        utils.printAndLog("* Priming data collection", logFiles)
        utils.printAndLog("**************************", logFiles)

        if(os.path.exists(e_folder + "priming_data" + os.sep +"routes.json.gz") and params["SkipIfExist"]):
            utils.printAndLog("Data exists, skipped", logFiles)
        else:
            # Priming data collection
            paramsUpdate = {
                "Projects": params["Projects"],
                "Collectors": params["Collectors"],
                "IpVersion": params["IpVersion"],
                "UseRibs": params["UseRibsPriming"],
            }

            # The priming window ends at start_time and starts
            # PrimingPeriod*60 earlier (PrimingPeriod is presumably expressed
            # in minutes -- TODO confirm).
            primingUpdatesFile = dumpUpdates(start_time-params["PrimingPeriod"]*60, start_time, e_folder + "priming_data", params=paramsUpdate, logFiles=logFiles[:])
            _ , primingDumpFile = dumpRoutes(primingUpdatesFile, routes={}, outFolder=e_folder + "priming_data", logFiles=logFiles[:])

            utils.gzipFile(primingDumpFile, remove=True)
            # Only the compressed routes dump is kept; the intermediate
            # updates file is deleted.
            os.remove(primingUpdatesFile)

        utils.printAndLog("********************", logFiles)
        utils.printAndLog("* Data collection", logFiles)
        utils.printAndLog("********************", logFiles)

        if(os.path.exists(e_folder + "data" + os.sep +"updates.csv.gz") and params["SkipIfExist"]):
            utils.printAndLog("Data exists, skipped", logFiles)
        else:
            # Data collection
            paramsUpdate = {
                "Projects": params["Projects"],
                "Collectors": params["Collectors"],
                "IpVersion": params["IpVersion"],
                "UseRibs": params["UseRibsData"],
            }

            updatesFilePath = dumpUpdates(start_time, end_time, e_folder + "data", params=paramsUpdate, logFiles=logFiles[:])
            utils.gzipFile(updatesFilePath, remove=True)
Classes
class Dataset (folder)
-
Expand source code
class Dataset():
    """Describe a set of collection periods and turn them into jobs.

    Each job is a plain dict naming the ``collectData`` function of
    ``BML.data.dataset`` together with the arguments to call it with.
    """

    def __init__(self, folder):
        """Create a dataset rooted at *folder* with the default parameters.

        Args:
            folder: Base output folder, passed unchanged to every job.
        """
        self.params = {
            "Projects" : ['ris','routeviews'],
            "Collectors" : [],
            "IpVersion" : [4,6],
            "PrimingPeriod" : 1*60,
            "UseRibsPriming" : False,
            "UseRibsData" : False,
            "SkipIfExist" : True
        }

        self.folder= folder

        # Start with no periods so that getJobs() returns an empty list,
        # instead of raising AttributeError, when it is called before
        # setPeriodsOfInterests().
        self.periodsOfInterests = []

    @staticmethod
    def _applyOverrides(target, overrides):
        """Copy *overrides* into *target*, exiting on the first unknown key.

        Keys preceding the unknown one have already been applied when the
        process exits.
        """
        for k,v in overrides.items():
            if(k in target):
                target[k] = v
            else:
                # str() keeps the message intact for string keys and avoids a
                # TypeError when the offending key is not a string.
                sys.exit("Unrecognized parameter:"+str(k))

    def setParams(self, params):
        """Override dataset-wide parameters.

        Args:
            params: Dict of overrides; every key must already exist in
                ``self.params``.

        Raises:
            SystemExit: If a key is not a known parameter.
        """
        self._applyOverrides(self.params, params)

    def setPeriodsOfInterests(self, periodsOfInterests):
        """Store the periods that getJobs() turns into jobs.

        Args:
            periodsOfInterests: Iterable of dicts with the keys "label",
                "start_time", "end_time", "name" and optionally "params"
                (per-period parameter overrides).
        """
        self.periodsOfInterests = periodsOfInterests

    def getJobs(self):
        """Build one collection job per period, in random order.

        Returns:
            A shuffled list of job dicts with the keys 'includes', 'target',
            'args' and 'kwargs'.

        Raises:
            SystemExit: If a period overrides an unknown parameter.
        """
        jobs = []

        for period in self.periodsOfInterests:

            # Shallow copy: per-period overrides must not leak into the
            # dataset-wide parameters or into other periods.
            params = self.params.copy()

            if("params" in period):
                self._applyOverrides(params, period["params"])

            j = {
                'includes' : "from BML.data.dataset import collectData",
                'target': "collectData",
                'args': (period["label"], period["start_time"], period["end_time"], period["name"], self.folder, params),
                'kwargs': {'logFiles':["LOG_ONLY"]}
            }

            jobs.append(j)

        random.shuffle(jobs)

        return(jobs)
Methods
def getJobs(self)
-
Expand source code
def getJobs(self):
    """Build one collection job per period of interest, in random order.

    Each job is a dict with the keys 'includes', 'target', 'args' and
    'kwargs' describing a call to ``collectData``. The parameters placed in
    'args' are a copy of ``self.params`` with the period's optional "params"
    overrides applied.

    Returns:
        A shuffled list of job dicts; empty when no periods have been set.

    Raises:
        SystemExit: If a period overrides a parameter that is not a key of
            ``self.params``.
    """
    jobs = []

    # Nothing in the visible code guarantees that setPeriodsOfInterests() was
    # called first; treat a missing attribute as "no periods" instead of
    # failing with AttributeError.
    for period in getattr(self, "periodsOfInterests", []):

        # Shallow copy: per-period overrides must not leak into the
        # dataset-wide parameters or into other periods.
        params = self.params.copy()

        if("params" in period):
            for k,v in period["params"].items():
                if(k in params):
                    params[k] = v
                else:
                    sys.exit("Unrecognized parameter:"+k)

        j = {
            'includes' : "from BML.data.dataset import collectData",
            'target': "collectData",
            'args': (period["label"], period["start_time"], period["end_time"], period["name"], self.folder, params),
            'kwargs': {'logFiles':["LOG_ONLY"]}
        }

        jobs.append(j)

    random.shuffle(jobs)

    return(jobs)
def setParams(self, params)
-
Expand source code
def setParams(self, params):
    """Override dataset-wide parameters with the entries of *params*.

    Entries are applied in iteration order. The first key that is not
    already present in ``self.params`` terminates the process through
    ``sys.exit``; entries applied before it remain in effect.
    """
    for key in params:
        if key not in self.params:
            sys.exit("Unrecognized parameter:"+key)
        self.params[key] = params[key]
def setPeriodsOfInterests(self, periodsOfInterests)
-
Expand source code
def setPeriodsOfInterests(self, periodsOfInterests):
    """Remember the periods of interest on this instance.

    The given object is stored by reference (no copy is made) in
    ``self.periodsOfInterests``.
    """
    self.periodsOfInterests = periodsOfInterests