Module BML.utils.utils
Summary
Expand source code
"""Summary
"""
import sys, os, time, gzip, shutil, io, json
from datetime import datetime
from .processing_queue import ProcessingQueue
def ipVersion(prefix):
if(":"in prefix):
return(6)
else:
return(4)
def getIndexList(l):
index = {}
for i in range(len(l)):
index[l[i]] = i
return(index)
def timeFormat(time):
"""Summary
Args:
time (TYPE): Description
Returns:
TYPE: Description
"""
time = int(time)
seconds = time%60
time = time//60
minutes = (time)%60
time = time//60
hours = time%60
timeStr = str(hours) +"h " + str(minutes) + "m " + str(seconds) + "s"
return(timeStr)
def mkdirPath(path):
"""
Create all folder in a path
Args:
path (str): a path
Returns:
str: the created path
"""
folderPath = ""
for folder in path.split(os.sep):
if(folder!=""):
folderPath += folder + os.sep
if(not os.path.isdir(folderPath)):
os.mkdir(folderPath)
return(folderPath)
def printAndLog(line, files, indent=" "):
"""Summary
Args:
line (TYPE): Description
files (TYPE): Description
indent (str, optional): Description
"""
nbFiles = len(files)-1
if(len(files)==0 or not files[0]=="LOG_ONLY"):
print(indent*nbFiles + line)
count = 0
for file in files:
if(not file=="LOG_ONLY" ):
file.write(indent*(nbFiles-count) + line + os.linesep)
file.flush()
count += 1
def printProgress(pObject, logFiles):
"""Summary
Args:
pObject (TYPE): Description
logFiles (TYPE): Description
"""
progressPrev = 0
while pObject.is_alive():
progress = pObject.getProgress()
if((progress<=10 and progress!=progressPrev) or progress-progressPrev>=5):
printAndLog("Progress: "+ str(progress) + "%", logFiles)
progressPrev = progress
time.sleep(1)
def getTimestamp(y,m,d,h,mn,s):
"""Summary
Args:
y (TYPE): Description
m (TYPE): Description
d (TYPE): Description
h (TYPE): Description
s (TYPE): Description
Returns:
TYPE: Description
"""
return(int((datetime(y,m,d,h,mn,s) - datetime(1970, 1, 1)).total_seconds()))
def gzipFile(filepath, remove=True):
"""Summary
Args:
filepath (TYPE): Description
remove (bool, optional): Description
Returns:
TYPE: Description
"""
filegzpath = filepath + ".gz"
with open(filepath, 'rb') as f_in:
with gzip.open(filegzpath, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
if(remove):
os.remove(filepath)
return(filegzpath)
def ungzipFile(filegzpath, remove=False):
"""Summary
Args:
filegzpath (TYPE): Description
remove (bool, optional): Description
Returns:
TYPE: Description
"""
filepath = filegzpath.split(".gz")[0]
with gzip.open(filegzpath, 'rb') as f_in:
with open(filepath, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
if(remove):
os.remove(filegzpath)
return(filepath)
def ungzipFileInMemory(filegzpath):
f_out = io.BytesIO()
with gzip.open(filegzpath, 'rb') as f_in:
shutil.copyfileobj(f_in, f_out)
f_in.close()
f_out.seek(0)
return(f_out)
class MissingArg(Exception):
"""
Exeception class for missing args
"""
pass
def printHelp(help):
"""
Print help for usage of a command
Args:
help (str): help text
"""
print("")
print(" Usage :")
print(" "+sys.argv[0]+help)
print("")
def getArg(arg, optional=False):
"""
Get an argument from the command line, (e.g. "-a" for the value after "-a" in the command line).
If the the arg is missing, return None if the is arg is optional otherwise a MissingArg exception is raised.
Args:
arg (str): arg to get
optional (bool): argument to get
Returns:
str: arg value
"""
if(arg in sys.argv):
if(len(sys.argv) > (sys.argv.index(arg)+1)):
return(sys.argv[sys.argv.index(arg)+1])
if(optional):
return(None)
raise(MissingArg("Missing argument : "+arg))
def getIntArg(arg, optional=False):
"""
Similar to "getArg" but return the integer value of the arg.
Args:
arg (str): arg to get
optional (bool): argument to get
Returns:
int: arg value
"""
return(int(getArg(arg, optional)))
def saveJobs(jobs, folder, parts=1, indent=None):
"""
Dump a jobs list to multiple parts.
Args:
jobs (list): a jobs list
folder (str): the output folder
parts (int, optional): the number of parts
"""
nbElem = (len(jobs)//parts) + 1
for i in range(parts):
start = i*nbElem
end = (i+1)*nbElem
if(end>len(jobs)):
end = len(jobs)
filepath = "{}{}.json".format(mkdirPath(folder), i)
with open(filepath, "w") as file:
json.dump(jobs[start:end], file, indent=indent)
file.close()
def runJobs(jobs, folder, nbProcess=1):
processingQueue = ProcessingQueue(nbProcess=nbProcess)
logFiles = []
folder = mkdirPath(folder)
logFile = open(folder+"jobs.log",'w')
logFiles.append(logFile)
printAndLog("################", logFiles)
printAndLog("# Run jobs list ", logFiles)
printAndLog("################", logFiles)
timeAtStart = time.time()
printAndLog("Number of processes to execute: {}".format(len(jobs)), logFiles)
printAndLog("Number of processes in parallel: {}".format(nbProcess), logFiles)
for j in jobs:
exec(j["includes"])
processingQueue.addProcess(target=locals()[j["target"]], args=j["args"], kwargs=j["kwargs"])
printAndLog("Processing queue: started", logFiles)
printAndLog("To monitor the execution run: watch -n 1 cat {}queue.log".format(folder), logFiles)
#processingQueue.run(logFilePath= "{}queue.log".format(folder))
from tqdm.auto import tqdm
bar = tqdm(total=len(jobs))
prev = 0
while(len(processingQueue.finish)<len(jobs)):
processingQueue.runOnce()
val = len(processingQueue.finish)
if(val!=prev):
bar.update(val-prev)
prev=val
processingQueue.runLog("{}queue.log".format(folder))
processingQueue.waitUntilFree()
processingQueue.run()
printAndLog("Processing queue: finish", logFiles)
printAndLog("Computation time: {}".format(timeFormat(time.time()-timeAtStart)), logFiles)
logFile.close()
def getTransform(folder, transform, name=None, period=2, gzip=False):
data = {}
excludedFolders = ['transform_jobs', 'collect_jobs']
filename = ""
if(name is None):
filename += transform + "_" + str(period)
else:
filename += name
filename += ".json"
if(gzip):
filename += ".gz"
for label in os.listdir(folder):
if(label not in excludedFolders):
data[label] = {}
for sample in os.listdir(folder+"/"+label):
if(sample not in ['.ipynb_checkpoints']):
filepath = folder+"/"+label + "/" + sample + "/transform/" + transform + "/" + filename
if(os.path.exists(filepath)):
data[label][sample] = json.load(open(filepath))
return(data)
Functions
def getArg(arg, optional=False)
-
Get an argument from the command line, (e.g. "-a" for the value after "-a" in the command line). If the the arg is missing, return None if the is arg is optional otherwise a MissingArg exception is raised.
Args
arg
:str
- arg to get
optional
:bool
- argument to get
Returns
str
- arg value
Expand source code
def getArg(arg, optional=False): """ Get an argument from the command line, (e.g. "-a" for the value after "-a" in the command line). If the the arg is missing, return None if the is arg is optional otherwise a MissingArg exception is raised. Args: arg (str): arg to get optional (bool): argument to get Returns: str: arg value """ if(arg in sys.argv): if(len(sys.argv) > (sys.argv.index(arg)+1)): return(sys.argv[sys.argv.index(arg)+1]) if(optional): return(None) raise(MissingArg("Missing argument : "+arg))
def getIndexList(l)
-
Expand source code
def getIndexList(l): index = {} for i in range(len(l)): index[l[i]] = i return(index)
def getIntArg(arg, optional=False)
-
Similar to "getArg" but return the integer value of the arg.
Args
arg
:str
- arg to get
optional
:bool
- argument to get
Returns
int
- arg value
Expand source code
def getIntArg(arg, optional=False): """ Similar to "getArg" but return the integer value of the arg. Args: arg (str): arg to get optional (bool): argument to get Returns: int: arg value """ return(int(getArg(arg, optional)))
def getTimestamp(y, m, d, h, mn, s)
-
Summary
Args
y
:TYPE
- Description
m
:TYPE
- Description
d
:TYPE
- Description
h
:TYPE
- Description
s
:TYPE
- Description
Returns
TYPE
- Description
Expand source code
def getTimestamp(y,m,d,h,mn,s): """Summary Args: y (TYPE): Description m (TYPE): Description d (TYPE): Description h (TYPE): Description s (TYPE): Description Returns: TYPE: Description """ return(int((datetime(y,m,d,h,mn,s) - datetime(1970, 1, 1)).total_seconds()))
def getTransform(folder, transform, name=None, period=2, gzip=False)
-
Expand source code
def getTransform(folder, transform, name=None, period=2, gzip=False): data = {} excludedFolders = ['transform_jobs', 'collect_jobs'] filename = "" if(name is None): filename += transform + "_" + str(period) else: filename += name filename += ".json" if(gzip): filename += ".gz" for label in os.listdir(folder): if(label not in excludedFolders): data[label] = {} for sample in os.listdir(folder+"/"+label): if(sample not in ['.ipynb_checkpoints']): filepath = folder+"/"+label + "/" + sample + "/transform/" + transform + "/" + filename if(os.path.exists(filepath)): data[label][sample] = json.load(open(filepath)) return(data)
def gzipFile(filepath, remove=True)
-
Summary
Args
filepath
:TYPE
- Description
remove
:bool
, optional- Description
Returns
TYPE
- Description
Expand source code
def gzipFile(filepath, remove=True): """Summary Args: filepath (TYPE): Description remove (bool, optional): Description Returns: TYPE: Description """ filegzpath = filepath + ".gz" with open(filepath, 'rb') as f_in: with gzip.open(filegzpath, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) if(remove): os.remove(filepath) return(filegzpath)
def ipVersion(prefix)
-
Expand source code
def ipVersion(prefix): if(":"in prefix): return(6) else: return(4)
def mkdirPath(path)
-
Create all folder in a path
Args
path
:str
- a path
Returns
str
- the created path
Expand source code
def mkdirPath(path): """ Create all folder in a path Args: path (str): a path Returns: str: the created path """ folderPath = "" for folder in path.split(os.sep): if(folder!=""): folderPath += folder + os.sep if(not os.path.isdir(folderPath)): os.mkdir(folderPath) return(folderPath)
def printAndLog(line, files, indent=' ')
-
Summary
Args
line
:TYPE
- Description
files
:TYPE
- Description
indent
:str
, optional- Description
Expand source code
def printAndLog(line, files, indent=" "): """Summary Args: line (TYPE): Description files (TYPE): Description indent (str, optional): Description """ nbFiles = len(files)-1 if(len(files)==0 or not files[0]=="LOG_ONLY"): print(indent*nbFiles + line) count = 0 for file in files: if(not file=="LOG_ONLY" ): file.write(indent*(nbFiles-count) + line + os.linesep) file.flush() count += 1
def printHelp(help)
-
Print help for usage of a command
Args
help
:str
- help text
Expand source code
def printHelp(help): """ Print help for usage of a command Args: help (str): help text """ print("") print(" Usage :") print(" "+sys.argv[0]+help) print("")
def printProgress(pObject, logFiles)
-
Summary
Args
pObject
:TYPE
- Description
logFiles
:TYPE
- Description
Expand source code
def printProgress(pObject, logFiles): """Summary Args: pObject (TYPE): Description logFiles (TYPE): Description """ progressPrev = 0 while pObject.is_alive(): progress = pObject.getProgress() if((progress<=10 and progress!=progressPrev) or progress-progressPrev>=5): printAndLog("Progress: "+ str(progress) + "%", logFiles) progressPrev = progress time.sleep(1)
def runJobs(jobs, folder, nbProcess=1)
-
Expand source code
def runJobs(jobs, folder, nbProcess=1): processingQueue = ProcessingQueue(nbProcess=nbProcess) logFiles = [] folder = mkdirPath(folder) logFile = open(folder+"jobs.log",'w') logFiles.append(logFile) printAndLog("################", logFiles) printAndLog("# Run jobs list ", logFiles) printAndLog("################", logFiles) timeAtStart = time.time() printAndLog("Number of processes to execute: {}".format(len(jobs)), logFiles) printAndLog("Number of processes in parallel: {}".format(nbProcess), logFiles) for j in jobs: exec(j["includes"]) processingQueue.addProcess(target=locals()[j["target"]], args=j["args"], kwargs=j["kwargs"]) printAndLog("Processing queue: started", logFiles) printAndLog("To monitor the execution run: watch -n 1 cat {}queue.log".format(folder), logFiles) #processingQueue.run(logFilePath= "{}queue.log".format(folder)) from tqdm.auto import tqdm bar = tqdm(total=len(jobs)) prev = 0 while(len(processingQueue.finish)<len(jobs)): processingQueue.runOnce() val = len(processingQueue.finish) if(val!=prev): bar.update(val-prev) prev=val processingQueue.runLog("{}queue.log".format(folder)) processingQueue.waitUntilFree() processingQueue.run() printAndLog("Processing queue: finish", logFiles) printAndLog("Computation time: {}".format(timeFormat(time.time()-timeAtStart)), logFiles) logFile.close()
def saveJobs(jobs, folder, parts=1, indent=None)
-
Dump a jobs list to multiple parts.
Args
jobs
:list
- a jobs list
folder
:str
- the output folder
parts
:int
, optional- the number of parts
Expand source code
def saveJobs(jobs, folder, parts=1, indent=None): """ Dump a jobs list to multiple parts. Args: jobs (list): a jobs list folder (str): the output folder parts (int, optional): the number of parts """ nbElem = (len(jobs)//parts) + 1 for i in range(parts): start = i*nbElem end = (i+1)*nbElem if(end>len(jobs)): end = len(jobs) filepath = "{}{}.json".format(mkdirPath(folder), i) with open(filepath, "w") as file: json.dump(jobs[start:end], file, indent=indent) file.close()
def timeFormat(time)
-
Summary
Args
time
:TYPE
- Description
Returns
TYPE
- Description
Expand source code
def timeFormat(time): """Summary Args: time (TYPE): Description Returns: TYPE: Description """ time = int(time) seconds = time%60 time = time//60 minutes = (time)%60 time = time//60 hours = time%60 timeStr = str(hours) +"h " + str(minutes) + "m " + str(seconds) + "s" return(timeStr)
def ungzipFile(filegzpath, remove=False)
-
Summary
Args
filegzpath
:TYPE
- Description
remove
:bool
, optional- Description
Returns
TYPE
- Description
Expand source code
def ungzipFile(filegzpath, remove=False): """Summary Args: filegzpath (TYPE): Description remove (bool, optional): Description Returns: TYPE: Description """ filepath = filegzpath.split(".gz")[0] with gzip.open(filegzpath, 'rb') as f_in: with open(filepath, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) if(remove): os.remove(filegzpath) return(filepath)
def ungzipFileInMemory(filegzpath)
-
Expand source code
def ungzipFileInMemory(filegzpath): f_out = io.BytesIO() with gzip.open(filegzpath, 'rb') as f_in: shutil.copyfileobj(f_in, f_out) f_in.close() f_out.seek(0) return(f_out)
Classes
class MissingArg (*args, **kwargs)
-
Exeception class for missing args
Expand source code
class MissingArg(Exception): """ Exeception class for missing args """ pass
Ancestors
- builtins.Exception
- builtins.BaseException