Module BML.utils.processing_queue
Expand source code
import multiprocessing, time
class ProcessingQueue(object):
"""Summary
Attributes:
finish (list): Description
nbProcess (TYPE): Description
processes (list): Description
queue (list): Description
running (list): Description
"""
def __init__(self, nbProcess=16):
"""Summary
Args:
nbProcess (int, optional): Description
"""
self.queue = []
self.running = []
self.finish = []
self.processes = []
self.nbProcess = nbProcess
for i in range(self.nbProcess):
self.processes.append(None)
self.running.append(None)
def stop(self):
for i in range(self.nbProcess):
self.processes[i].terminate()
def runOnce(self):
processesAlive = False
for i in range(self.nbProcess):
if(not self.processes[i] is None and self.processes[i].is_alive()):
processesAlive = True
else:
if(self.running[i]!=None):
self.finish.append(self.running[i])
self.running[i] = None
#if(self.processes[i].exitcode!=0):
# sys.exit("Subprocess terminated with exit code %i, execution stoped" % (self.processes[i].exitcode))
if(len(self.queue)>0):
(target, args, kwargs) = self.queue[0]
self.processes[i] = multiprocessing.Process(target=target, args=args, kwargs=kwargs)
self.processes[i].start()
processesAlive = True
self.running[i] = (target, args, kwargs)
self.queue.pop(0)
return(processesAlive)
def waitUntilFree(self):
while True:
for i in range(self.nbProcess):
if(self.processes[i] is None or not self.processes[i].is_alive()):
return
time.sleep(1)
def join(self):
for i in range(self.nbProcess):
if(not self.processes[i] is None):
self.processes[i].join()
def run(self, logFilePath=""):
"""Summary
Args:
logFilePath (str, optional): Description
"""
processesAlive = False
try:
while len(self.queue)>0 or processesAlive:
processesAlive = self.runOnce()
self.runLog(logFilePath)
time.sleep(1)
for i in range(self.nbProcess):
if(not self.processes[i] is None):
self.processes[i].join()
self.processes[i].close()
except Exception as e:
for i in range(self.nbProcess):
if(not self.processes[i] is None):
self.processes[i].terminate()
raise(e)
def addProcess(self, target=None, args=(), kwargs={}):
"""Summary
Args:
target (None, optional): Description
args (tuple, optional): Description
kwargs (dict, optional): Description
"""
self.queue.append((target, args, kwargs))
def formatLog(self, listP):
"""Summary
Args:
listP (TYPE): Description
Returns:
TYPE: Description
"""
i_space = len(str(len(listP)))
t_space = len("Function")
a_space = len("Args")
kw_space = len("Kwargs")
log = ""
for i in range(len(listP)):
if(listP[i]!=None):
(target, args, kwargs) = listP[i]
t_space = len(str(target.__name__)) if len(str(target.__name__))>t_space else t_space
a_space = len(str(args)) if len(str(args))>a_space else a_space
kw_space = len(str(kwargs)) if len(str(kwargs))>kw_space else kw_space
vline = ("="*(t_space+a_space+kw_space+13)) + "\n"
log+= vline
log += ("|{:<"+str(i_space)+"s}| {:"+str(t_space)+"s} | {:"+str(a_space)+"s} | {:"+str(kw_space)+"s} | \n").format("#","Function","Args","Kwargs")
descr = "|{:<"+str(i_space)+"d}| {:"+str(t_space)+"s} | {:"+str(a_space)+"s} | {:"+str(kw_space)+"s} | \n"
log+= vline
for i in range(len(listP)):
if(listP[i]!=None):
(target, args, kwargs) = listP[i]
log += descr.format(i, target.__name__, str(args), str(kwargs))
else:
log += descr.format(i, "Empty", "", "")
log += vline
return(log)
def runLog(self, logFilePath):
"""Summary
Args:
logFilePath (TYPE): Description
"""
if(not logFilePath==""):
log = "#######################\n"
log += "# Queue : Running \n"
log += "#######################\n"
log += self.formatLog(self.running) + "\n"
log += "#######################\n"
log += "# Queue : Waiting \n"
log += "#######################\n"
log += self.formatLog(self.queue) + "\n"
log += "#######################\n"
log += "# Queue : Finish \n"
log += "#######################\n"
log += self.formatLog(self.finish) + "\n"
with open(logFilePath, "w") as file:
file.write(log)
file.close()
Classes
class ProcessingQueue (nbProcess=16)
-
Summary
Attributes
finish
:list
- Description
nbProcess
:TYPE
- Description
processes
:list
- Description
queue
:list
- Description
running
:list
- Description
Summary
Args
nbProcess
:int
, optional- Description
Expand source code
class ProcessingQueue(object): """Summary Attributes: finish (list): Description nbProcess (TYPE): Description processes (list): Description queue (list): Description running (list): Description """ def __init__(self, nbProcess=16): """Summary Args: nbProcess (int, optional): Description """ self.queue = [] self.running = [] self.finish = [] self.processes = [] self.nbProcess = nbProcess for i in range(self.nbProcess): self.processes.append(None) self.running.append(None) def stop(self): for i in range(self.nbProcess): self.processes[i].terminate() def runOnce(self): processesAlive = False for i in range(self.nbProcess): if(not self.processes[i] is None and self.processes[i].is_alive()): processesAlive = True else: if(self.running[i]!=None): self.finish.append(self.running[i]) self.running[i] = None #if(self.processes[i].exitcode!=0): # sys.exit("Subprocess terminated with exit code %i, execution stoped" % (self.processes[i].exitcode)) if(len(self.queue)>0): (target, args, kwargs) = self.queue[0] self.processes[i] = multiprocessing.Process(target=target, args=args, kwargs=kwargs) self.processes[i].start() processesAlive = True self.running[i] = (target, args, kwargs) self.queue.pop(0) return(processesAlive) def waitUntilFree(self): while True: for i in range(self.nbProcess): if(self.processes[i] is None or not self.processes[i].is_alive()): return time.sleep(1) def join(self): for i in range(self.nbProcess): if(not self.processes[i] is None): self.processes[i].join() def run(self, logFilePath=""): """Summary Args: logFilePath (str, optional): Description """ processesAlive = False try: while len(self.queue)>0 or processesAlive: processesAlive = self.runOnce() self.runLog(logFilePath) time.sleep(1) for i in range(self.nbProcess): if(not self.processes[i] is None): self.processes[i].join() self.processes[i].close() except Exception as e: for i in range(self.nbProcess): if(not self.processes[i] is None): self.processes[i].terminate() raise(e) def addProcess(self, target=None, args=(), kwargs={}): """Summary Args: target (None, optional): Description args (tuple, optional): Description kwargs (dict, optional): Description """ self.queue.append((target, args, kwargs)) def formatLog(self, listP): """Summary Args: listP (TYPE): Description Returns: TYPE: Description """ i_space = len(str(len(listP))) t_space = len("Function") a_space = len("Args") kw_space = len("Kwargs") log = "" for i in range(len(listP)): if(listP[i]!=None): (target, args, kwargs) = listP[i] t_space = len(str(target.__name__)) if len(str(target.__name__))>t_space else t_space a_space = len(str(args)) if len(str(args))>a_space else a_space kw_space = len(str(kwargs)) if len(str(kwargs))>kw_space else kw_space vline = ("="*(t_space+a_space+kw_space+13)) + "\n" log+= vline log += ("|{:<"+str(i_space)+"s}| {:"+str(t_space)+"s} | {:"+str(a_space)+"s} | {:"+str(kw_space)+"s} | \n").format("#","Function","Args","Kwargs") descr = "|{:<"+str(i_space)+"d}| {:"+str(t_space)+"s} | {:"+str(a_space)+"s} | {:"+str(kw_space)+"s} | \n" log+= vline for i in range(len(listP)): if(listP[i]!=None): (target, args, kwargs) = listP[i] log += descr.format(i, target.__name__, str(args), str(kwargs)) else: log += descr.format(i, "Empty", "", "") log += vline return(log) def runLog(self, logFilePath): """Summary Args: logFilePath (TYPE): Description """ if(not logFilePath==""): log = "#######################\n" log += "# Queue : Running \n" log += "#######################\n" log += self.formatLog(self.running) + "\n" log += "#######################\n" log += "# Queue : Waiting \n" log += "#######################\n" log += self.formatLog(self.queue) + "\n" log += "#######################\n" log += "# Queue : Finish \n" log += "#######################\n" log += self.formatLog(self.finish) + "\n" with open(logFilePath, "w") as file: file.write(log) file.close()
Methods
def addProcess(self, target=None, args=(), kwargs={})
-
Summary
Args
target
:None
, optional- Description
args
:tuple
, optional- Description
kwargs
:dict
, optional- Description
Expand source code
def addProcess(self, target=None, args=(), kwargs={}): """Summary Args: target (None, optional): Description args (tuple, optional): Description kwargs (dict, optional): Description """ self.queue.append((target, args, kwargs))
def formatLog(self, listP)
-
Summary
Args
listP
:TYPE
- Description
Returns
TYPE
- Description
Expand source code
def formatLog(self, listP): """Summary Args: listP (TYPE): Description Returns: TYPE: Description """ i_space = len(str(len(listP))) t_space = len("Function") a_space = len("Args") kw_space = len("Kwargs") log = "" for i in range(len(listP)): if(listP[i]!=None): (target, args, kwargs) = listP[i] t_space = len(str(target.__name__)) if len(str(target.__name__))>t_space else t_space a_space = len(str(args)) if len(str(args))>a_space else a_space kw_space = len(str(kwargs)) if len(str(kwargs))>kw_space else kw_space vline = ("="*(t_space+a_space+kw_space+13)) + "\n" log+= vline log += ("|{:<"+str(i_space)+"s}| {:"+str(t_space)+"s} | {:"+str(a_space)+"s} | {:"+str(kw_space)+"s} | \n").format("#","Function","Args","Kwargs") descr = "|{:<"+str(i_space)+"d}| {:"+str(t_space)+"s} | {:"+str(a_space)+"s} | {:"+str(kw_space)+"s} | \n" log+= vline for i in range(len(listP)): if(listP[i]!=None): (target, args, kwargs) = listP[i] log += descr.format(i, target.__name__, str(args), str(kwargs)) else: log += descr.format(i, "Empty", "", "") log += vline return(log)
def join(self)
-
Expand source code
def join(self): for i in range(self.nbProcess): if(not self.processes[i] is None): self.processes[i].join()
def run(self, logFilePath='')
-
Summary
Args
logFilePath
:str
, optional- Description
Expand source code
def run(self, logFilePath=""): """Summary Args: logFilePath (str, optional): Description """ processesAlive = False try: while len(self.queue)>0 or processesAlive: processesAlive = self.runOnce() self.runLog(logFilePath) time.sleep(1) for i in range(self.nbProcess): if(not self.processes[i] is None): self.processes[i].join() self.processes[i].close() except Exception as e: for i in range(self.nbProcess): if(not self.processes[i] is None): self.processes[i].terminate() raise(e)
def runLog(self, logFilePath)
-
Summary
Args
logFilePath
:TYPE
- Description
Expand source code
def runLog(self, logFilePath): """Summary Args: logFilePath (TYPE): Description """ if(not logFilePath==""): log = "#######################\n" log += "# Queue : Running \n" log += "#######################\n" log += self.formatLog(self.running) + "\n" log += "#######################\n" log += "# Queue : Waiting \n" log += "#######################\n" log += self.formatLog(self.queue) + "\n" log += "#######################\n" log += "# Queue : Finish \n" log += "#######################\n" log += self.formatLog(self.finish) + "\n" with open(logFilePath, "w") as file: file.write(log) file.close()
def runOnce(self)
-
Expand source code
def runOnce(self): processesAlive = False for i in range(self.nbProcess): if(not self.processes[i] is None and self.processes[i].is_alive()): processesAlive = True else: if(self.running[i]!=None): self.finish.append(self.running[i]) self.running[i] = None #if(self.processes[i].exitcode!=0): # sys.exit("Subprocess terminated with exit code %i, execution stoped" % (self.processes[i].exitcode)) if(len(self.queue)>0): (target, args, kwargs) = self.queue[0] self.processes[i] = multiprocessing.Process(target=target, args=args, kwargs=kwargs) self.processes[i].start() processesAlive = True self.running[i] = (target, args, kwargs) self.queue.pop(0) return(processesAlive)
def stop(self)
-
Expand source code
def stop(self): for i in range(self.nbProcess): self.processes[i].terminate()
def waitUntilFree(self)
-
Expand source code
def waitUntilFree(self): while True: for i in range(self.nbProcess): if(self.processes[i] is None or not self.processes[i].is_alive()): return time.sleep(1)