import math
import os
import time
import psutil
from multiprocess import Manager, Process, Value
class Multi:
    """
    Wrap the "multiprocess" package so a function can be fanned out over a
    list of argument lists with a single call.

    It is mainly aimed at many large tasks that could be run on
    supercomputers with a very large amount of RAM.
    """

    # Upper bound for the counters stored as Value("i") (a C int, assumed to
    # be 32 bits wide); used to clamp computed capacities before storing them.
    _C_INT_MAX = 2**31 - 1

    def __init__(self, args, function):
        """
        Constructor of the multiprocessing tool.

        Args:
            args (list): The list of values to run. One process is started
                per first-level member.
            function (callable): The function to run as multiple instances.

        The constructor needs two things: a function and a list of lists.
        Each sub-list is the argument handed to the child process running
        the given function.

        ***The function must take a single list as input and unpack it
        itself.***

        ex.: Run 3 processes of g()

            def g(args):
                n1 = args[0]
                n2 = args[1]
                return n1 + n2

            jobs = [[0, 1], [1, 1], [2, 1]]
            # note that "g" is not written "g()": we pass it, we do not run it
            mps = Multi(jobs, g).processingSmallData()
            print(mps)
            >> [1, 2, 3]

        Results are appended as each child finishes, so their order is the
        completion order and may differ from the input order.

        Variables:
            args        Manager list  Shared list of pending arguments
            processes   Manager list  Shared list of every process ID
                                      registered by this object
            resultList  Manager list  Shared list of return values
            function    callable      The function run by each child
            processlist list          The child Process objects
            mem1        Value(f)      Memory needed to run a single child
            memA        Value(f)      Currently available memory
            memT        Value(f)      Available memory when the object was
                                      created
            nbAllowed   Value(i)      Number of children that fit in FREE
                                      memory (never below 1)
            maxAllowed  Value(i)      Number of children that fit in TOTAL
                                      memory (never below 1)
            tasks       Value(i)      Number of children currently running
            started     Value(i)      Number of started children
            finished    Value(i)      Number of finished children
            amount      int           Number of processes the object starts
            startTime   float         Time at which processing started
            timeForOne  Value(f)      Time taken by the first (calibration)
                                      child
            rewrite     dict          Number of terminal lines to rewind,
                                      keyed by "is the memory block shown"
        """
        # One manager process serves all three shared lists.
        manager = Manager()
        self.args = manager.list(args)
        self.processes = manager.list()
        self.resultList = manager.list()
        self.function = function
        self.processlist = []
        self.mem1 = Value("f", 1)
        self.memA = Value("f", 1)
        self.memT = Value("f", psutil.virtual_memory().available)
        self.nbAllowed = Value("i", 1)
        self.maxAllowed = Value("i", 1)
        self.tasks = Value("i", 0)
        self.started = Value("i", 0)
        self.finished = Value("i", 0)
        self.amount = len(args)
        self.startTime = 0
        self.timeForOne = Value("f", 0)
        self.rewrite = {True: 11, False: 6}

    @staticmethod
    def _bump(counter, step=1):
        """Atomically add ``step`` to a shared ``Value`` counter."""
        # "+=" on Value.value is a separate read and write; holding the
        # Value's own lock keeps concurrent children from losing updates.
        with counter.get_lock():
            counter.value += step

    def executeOnce(self, arg):
        """
        Execute the wrapped function as a single child process.

        Runs ``self.function(arg)`` and stores the result in
        ``self.resultList`` while keeping the shared counters up to date.

        Args:
            arg (list): The list of arguments given to the function.

        Returns:
            None: The return value of the executed function is appended to
            ``self.resultList``.

        Raises:
            Whatever ``self.function`` raises. ``tasks`` is still decremented
            in that case; ``finished`` is not incremented.
        """
        self._bump(self.tasks)
        self._bump(self.started)
        self.processes.append(os.getpid())
        try:
            # execution of the passed function
            self.resultList.append(self.function(arg))
        finally:
            # always release the slot so the parent never waits on a task
            # that has already failed
            self._bump(self.tasks, -1)
        self._bump(self.finished)

    def processingLargeData(self):
        """
        Run the tasks when each one needs a large amount of memory.

        A first child is run alone (calibration). The remaining children are
        then started while the number of running tasks stays below
        ``maxAllowed``.

        Returns:
            list: The shared list that each child appends its result to.
            Returned unchanged when there is nothing to run.

        Errors:
            If another application reduces the available RAM mid-execution,
            a child may be killed; such children are counted as errors in
            the final message.
        """
        if len(self.args) == 0:
            # nothing to calibrate on and nothing to run
            return self.resultList

        print(
            " Starting multiprocessing, this might take some time\n",
            " The first process is ran alone for calibration purposes",
        )
        self.startTime = time.time()

        # The first task runs alone
        calibration = Process(target=self.executeOnce, args=(self.args.pop(0),))
        calibration.start()

        # "\033[B" moves the cursor one line DOWN
        print("\033[B" * self.rewrite[False], flush=True)

        # adds the main process to the list of processes to keep an eye on
        self.processes.append(os.getpid())

        # ask the buttler to handle the monitoring; daemon so that it can
        # never keep the interpreter alive on its own
        alfred = Process(target=self.buttler, args=(True,), daemon=True)
        alfred.start()

        try:
            # wait for the calibration process to finish
            calibration.join()
            self.timeForOne.value = time.time() - self.startTime

            while len(self.args) != 0:
                # NOTE(review): memUpdate() clamps nbAllowed to at least 1,
                # so only maxAllowed limits the scheduling here - confirm
                # whether free memory was meant to throttle as well.
                if self.tasks.value < self.maxAllowed.value and self.nbAllowed.value >= 1:
                    p = Process(target=self.executeOnce, args=(self.args.pop(0),))
                    self.processlist.append(p)
                    p.start()
                time.sleep(0.1)

            for p in self.processlist:
                p.join()

            time.sleep(1)  # give it a second to close the processes, do not remove
        finally:
            alfred.terminate()  # sorry Alfred, we have to let you go
            alfred.join()

        # These prints are needed because there is no telling where the
        # cursor is at termination time; better make sure the output is clean
        print("\r", end="")
        # "\033[B" moves the cursor one line DOWN, past the status block
        print("\033[B" * self.rewrite[True], flush=True)

        # if a process was killed or did not finish, it shows up here
        print("Completed with ", str(self.amount - self.finished.value), " errors\n")
        return self.resultList

    def buttler(self, memBloc):
        """
        Monitoring loop, run as a child process; it never returns.

        It:
            - refreshes the terminal status block every 0.1 second
            - refreshes the memory figures every second, only when
              ``memBloc`` is true

        It exists so the main process is not slowed down by this work.

        Args:
            memBloc (bool): True when started by processingLargeData();
                enables the memory updates and the memory status lines.
        """
        terminal = time.time()
        mem = time.time()
        while True:
            now = time.time()
            if now - terminal > 0.1:  # has 0.1 second passed?
                self.terminalUpdate(memBloc)
                terminal = now
            # "and", not "&": "&" binds tighter than ">" and changed the
            # threshold instead of gating the update
            if memBloc and now - mem > 1:  # has 1 second passed?
                self.memUpdate()
                mem = now
            # short pause so the loop does not spin a CPU core at 100%
            time.sleep(0.01)

    def memUpdate(self):
        """
        Refresh the memory baseline used for scheduling.

        All memory values are in bytes. Called from buttler() roughly once
        per second.

        Variables:
            memBuffer : float
                Fraction of the available RAM that is considered usable
                (the rest is kept as a safety margin).
            mem : int
                Memory used by one registered process, in bytes.
        """
        memBuffer = 0.9  # 90%
        self.memA.value = psutil.virtual_memory().available * memBuffer

        for child in self.processes:
            # process IDs are never removed from the list, so lookups on
            # finished processes are expected to fail
            try:
                # USS: memory unique to this process, as of now
                mem = psutil.Process(child).memory_full_info().uss
            except psutil.Error:
                continue
            if self.mem1.value < mem:
                # kept for the whole run in case a later child needs more
                self.mem1.value = mem

        perProcess = self.mem1.value
        if perProcess > 0:
            fitFree = math.floor(self.memA.value / perProcess)
            fitTotal = math.floor(self.memT.value / perProcess)
        else:
            fitFree = fitTotal = 0

        # At least one process must always be allowed to start, and the
        # result must fit in the C int backing the shared values.
        self.nbAllowed.value = min(max(fitFree, 1), self._C_INT_MAX)
        self.maxAllowed.value = min(max(fitTotal, 1), self._C_INT_MAX)

    def terminalUpdate(self, memBlock):
        """
        Print the status block, then move the cursor back to its top.

        Called from buttler() roughly every 0.1 second.

        Args:
            memBlock (bool): When true, the memory lines used by
                processingLargeData() are printed as well.
        """
        s = self.started.value
        a = self.amount
        f = self.finished.value
        # current execution time, to one decimal
        eTime = round((time.time() - self.startTime) * 10) / 10

        # With no task the percentages are reported as 0 so the block always
        # has the number of lines that self.rewrite expects.
        startedPct = round(s / a * 100) if a != 0 else 0
        finishedPct = round(f / a * 100) if a != 0 else 0

        print("---")
        if memBlock:  # block of prints used only by processingLargeData()
            print(
                "Available memory: ",
                round(self.memA.value / 10000000) / 100,
                "/",
                round(self.memT.value / 10000000) / 100,
                "Gb ",
                end="\n",
                flush=True,
            )
            print(
                "Active processes: ",
                str(self.tasks.value),
                " / ",
                str(self.maxAllowed.value),
                " ",
                end="\n",
                flush=True,
            )
            print("Min memory per: ", round(self.mem1.value / 10000000) / 100, "Gb ", end="\n", flush=True)
            print("Time for one: ", round(self.timeForOne.value * 10) / 10, " seconds ", flush=True)
            print("---")
        print("Started: ", s, "/", a, " ", startedPct, "% ", flush=True)
        # NOTE(review): shows finished/started, but the percentage is
        # finished/amount - confirm this is intended.
        print("Finished: ", f, "/", s, " ", finishedPct, "% ", flush=True)
        print("Time elapsed: ", eTime, " seconds ", flush=True)
        print("---")
        print("\r", end="", flush=True)
        # "\033[A" moves the cursor one line UP: rewind to the top of the
        # block so the next update overwrites it
        print("\033[A" * self.rewrite[memBlock], flush=True)

    def executeSmall(self, arg):
        """
        The function executed in each child by processingSmallData().

        Args:
            arg (list): The list of arguments given to the function.

        Returns:
            None: The return value of the executed function is appended to
            the shared ``self.resultList``.
        """
        self._bump(self.started)
        result = self.function(arg)
        self.resultList.append(result)
        self._bump(self.finished)

    def processingSmallData(self):
        """
        Run the tasks when each one needs little to no memory.

        All the child processes are started immediately. Starting processes
        has a cost: only use this for functions that take at least about a
        second to run; below that, a plain for loop is likely much faster.

        Returns:
            list: The shared list that each child appends its result to.

        Errors:
            If another application reduces the available RAM mid-execution,
            a child may be killed; its result is then missing from the list.
        """
        self.startTime = time.time()
        # daemon so that the monitor can never keep the interpreter alive
        alfred = Process(target=self.buttler, args=(False,), daemon=True)
        alfred.start()

        try:
            for _ in range(len(self.args)):
                p = Process(target=self.executeSmall, args=(self.args.pop(0),))
                self.processlist.append(p)
                p.start()

            for p in self.processlist:
                p.join()

            time.sleep(1)  # wait for the processes to close; do not remove
            # elapsed time, to one decimal
            finishedTime = round((time.time() - self.startTime) * 10) / 10
        finally:
            alfred.terminate()
            alfred.join()

        print("\r", end="")
        # "\033[B" moves the cursor one line DOWN, past the status block
        print("\033[B" * self.rewrite[False], flush=True)
        print("Completed ", len(self.resultList), "tasks in ", finishedTime, "seconds")
        return self.resultList