Source code for aphylogeo.multiProcessor

import math
import os
import time

import psutil
from multiprocess import Manager, Process, Value


[docs]
class Multi:
    """
    Class that harnesses the power of "multiprocess" and wraps it, for ease of use,
    as a single callable line.

    It is mainly aimed at very many large tasks that could be run on supercomputers
    with a ridiculous amount of RAM.
    """

    def __init__(self, args, function):
        """
        Constructor of the multiprocessing tool.

        Args:
            args (list): The list of values to run. One process will be born per
                first-level element.
            function (method): The method that needs to run as multiple instances.

        The constructor needs two things: a method and a list of lists. Each
        sub-list of the list will be the list of arguments given to the child
        process running the given method.

        ***The method used must take only a single list as input, and then deal
        with its division into other variables.***

        ex.: Run 3 processes of g()

            def g(args):
                n1 = args[0]
                n2 = args[1]
                return n1 + n2

            list = [[0, 1], [1, 1], [2, 1]]
            # note that "g" is not written "g()"; we want to pass it, not run it
            mps = Multi(list, g).processingSmallData()
            print(mps)
            >> [1, 2, 3]

        Variables:
            args (Manager().list()): A list accessible from child processes; holds the arguments.
            processes (Manager().list()): A list accessible from child processes; the IDs of all
                processes started by this object.
            resultList (Manager().list()): A list accessible from child processes; holds the
                return values.
            function (method): The method that needs to run as multiple instances.
            processlist (list): All of the child processes.
            mem1 (Value(f)): A value accessible from child processes; the memory needed to run
                a single child.
            memA (Value(f)): A value accessible from child processes; the currently available memory.
            memT (Value(f)): A value accessible from child processes; the total amount of available
                memory at the creation of the object.
            nbAllowed (Value(i)): The number of theoretical child processes that can fit in FREE memory.
            maxAllowed (Value(i)): The number of theoretical child processes that can fit in TOTAL memory.
            tasks (Value(i)): A value accessible from child processes; the number of child processes
                currently running.
            started (Value(i)): A value accessible from child processes; the number of started processes.
            finished (Value(i)): A value accessible from child processes; the number of finished processes.
            amount (int): The number of processes that the object will start.
            startTime (float): The present time.
            timeForOne (Value(f)): A value accessible from child processes; the time it takes for the
                first iteration to complete.
            rewrite (dict): Ints used by the terminal-update rewriting process; the number of lines
                rewound, depending on whether the large or small variant is running.
        """
        self.args = Manager().list(args)
        self.processes = Manager().list()
        self.resultList = Manager().list()
        self.function = function
        self.processlist = []
        self.mem1 = Value("f", 1)
        self.memA = Value("f", 1)
        self.memT = Value("f", psutil.virtual_memory()[1])
        self.nbAllowed = Value("i", 1)
        self.maxAllowed = Value("i", 1)
        self.tasks = Value("i", 0)
        self.started = Value("i", 0)
        self.finished = Value("i", 0)
        self.amount = len(args)
        self.startTime = 0
        self.timeForOne = Value("f", 0)
        self.rewrite = {True: 11, False: 6}
[docs]
    def executeOnce(self, arg):
        """
        Executes a method as a single process.

        This method runs the specified function with the given argument and
        stores the result in `self.resultList`.

        Args:
            arg (list): The list of arguments given to the method.

        Returns:
            None: The return value of the executed method is appended to
            `self.resultList`.
        """
        self.tasks.value += 1
        self.started.value += 1
        self.processes.append(os.getpid())

        # execution of the passed method
        self.resultList.append(self.function(arg))

        self.tasks.value -= 1
        self.finished.value += 1
[docs]
    def processingLargeData(self):
        """
        Method for executing multiprocessing on tasks that require a large amount
        of individual memory. It first runs a single process for calibration and
        then starts as many child processes as the available RAM permits, launching
        new ones as RAM is freed.

        Variables:
            p (Process): Represents a single child process.

        Returns:
            list: The multiprocess-friendly list that is updated by each child process.

        Errors:
            If another application reduces the available RAM mid-execution, the
            multiprocessing framework outputs "Killed" and terminates the child process.
        """
        print(
            " Starting multiprocessing, this might take some time\n",
            " The first process is run alone for calibration purposes",
        )
        self.startTime = time.time()

        # Multiprocess runs once alone, for calibration
        p = Process(target=self.executeOnce, args=([self.args.pop(0)]))
        p.start()

        # "\033[B" moves the cursor down one line; reserve space for the status block
        print("\033[B" * self.rewrite[False], flush=True)

        # adds the main thread to the list of processes to keep an eye on
        self.processes.append(os.getpid())

        # start the buttler as a complementary monitoring process
        alfred = Process(target=self.buttler, args=([True]))
        alfred.start()

        # give it a second to open the process, so it doesn't skip the while()
        time.sleep(1)
        while self.tasks.value > 0:
            # wait for the calibration process to finish
            time.sleep(0.1)
        self.timeForOne.value = time.time() - self.startTime

        while len(self.args) != 0:
            if (self.tasks.value < self.maxAllowed.value) and (self.nbAllowed.value >= 1):
                # Multiprocess runs the rest of the processes
                p = Process(target=self.executeOnce, args=([self.args.pop(0)]))
                self.processlist.append(p)
                p.start()
                time.sleep(0.1)
            else:
                time.sleep(0.1)

        for p in self.processlist:
            p.join()
        time.sleep(1)  # give it a second to close the processes, do not remove
        alfred.terminate()  # sorry Alfred, we have to let you go

        # These prints need to be done because there is no telling where the
        # terminal cursor is at termination time; better make sure it's clean
        print("\r", end="")
        # "\033[B" moves the cursor down one line, past the status block
        print("\033[B" * self.rewrite[True], flush=True)

        # if a process was killed or didn't finish, it will be known here
        print("Completed with ", str(self.amount - self.finished.value), " errors\n")

        return self.resultList
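    # Illustrative call pattern for processingLargeData() (a sketch only; the
    # worker "build_matrix" and its inputs are hypothetical, not part of the
    # module). As described in the class docstring, the worker must take a
    # single list as its only argument:
    #
    #     def build_matrix(args):
    #         rows, cols = args
    #         return [[0] * cols for _ in range(rows)]   # memory-heavy result
    #
    #     jobs = [[20_000, 20_000], [20_000, 20_000]]
    #     results = Multi(jobs, build_matrix).processingLargeData()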
[docs]
    def buttler(self, memBloc):
        """
        Run as a child process, the buttler constantly runs other methods forever.
        In this case, it:

        - Updates the memory capacity.
        - Prints updates to the terminal.

        It exists so as not to bottleneck the main thread.

        Uses timers to decide when to execute its methods, because time.sleep()
        is processor hungry if constantly called.
        """
        terminal = time.time()
        mem = time.time()
        while True:
            now = time.time()
            if now - terminal > 0.1:  # has 0.1 second passed?
                self.terminalUpdate(memBloc)
                terminal = now
            if (now - mem > 1) and memBloc:  # has 1 second passed?
                self.memUpdate()
                mem = now
[docs]
    def memUpdate(self):
        """
        Method that sets the baseline for the memory calculations and feeds the
        values shown in the terminal. All memory values are in bytes.

        This method is run from buttler() and updates every second.

        Variables:
            memBuffer (float): Fraction of the available RAM considered usable,
                keeping a safety margin.
            mem (float): Amount of bytes.
        """
        memBuffer = 0.9  # keep a 10% safety margin
        self.memA.value = psutil.virtual_memory()[1] * memBuffer

        for child in self.processes:
            # in a try/except because process IDs are never removed from the list
            try:
                # uss memory usage: how much this process is using right NOW
                mem = psutil.Process(child).memory_full_info()[8]
                if self.mem1.value < mem:
                    # does it for the whole run in case this maximum
                    # is increased by future children
                    self.mem1.value = mem
            except Exception:
                pass

        if self.mem1.value != 0:
            self.nbAllowed.value = math.floor(self.memA.value / self.mem1.value)
            self.maxAllowed.value = math.floor(self.memT.value / self.mem1.value)
        else:
            self.nbAllowed.value = 0
            self.maxAllowed.value = 0

        if self.nbAllowed.value < 1:
            # Need to at least be able to start a single process
            self.nbAllowed.value = 1
        if self.maxAllowed.value < 1:
            # Need to at least be able to start a single process
            self.maxAllowed.value = 1
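    # Worked example of the calculation above (illustrative numbers only): with
    # memA ~ 8e9 bytes of usable free RAM and a measured per-child peak of
    # mem1 ~ 5e8 bytes, nbAllowed = floor(8e9 / 5e8) = 16 children may run at
    # once; maxAllowed does the same with memT, the available memory recorded
    # when the object was created.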
[docs]
    def terminalUpdate(self, memBlock):
        """
        Method that constantly updates the user about the currently running tasks.

        This method is run from buttler() and updates every 0.1 second.
        """
        s = self.started.value
        a = self.amount
        f = self.finished.value
        nowTime = time.time()
        # current execution time
        eTime = round((nowTime - self.startTime) * 10) / 10

        print("---")
        if memBlock:
            # block of prints used only by processingLargeData()
            print(
                "Available memory: ",
                round(self.memA.value / 10000000) / 100,
                "/",
                round(self.memT.value / 10000000) / 100,
                "Gb   ",
                end="\n",
                flush=True,
            )
            print(
                "Active processes: ",
                str(self.tasks.value),
                " / ",
                str(self.maxAllowed.value),
                "   ",
                end="\n",
                flush=True,
            )
            print("Min memory per: ", round(self.mem1.value / 10000000) / 100, "Gb   ", end="\n", flush=True)
            print("Time for one: ", round(self.timeForOne.value * 10) / 10, " seconds   ", flush=True)
            print("---")
        if a != 0:
            print("Started: ", s, "/", a, "   ", round(s / a * 100), "%   ", flush=True)
            print("Finished: ", f, "/", s, "   ", round(f / a * 100), "%   ", flush=True)
        print("Time elapsed: ", eTime, " seconds   ", flush=True)
        print("---")
        print("\r", end="", flush=True)
        # "\033[A" moves the cursor up one line, so the next update overwrites this block
        print("\033[A" * self.rewrite[memBlock], flush=True)
[docs]
    def executeSmall(self, arg):
        """
        The method executed by processingSmallData.

        Returns:
            Nothing, but the return value of the executed method is appended to
            a global multiprocessing-friendly list.
        """
        self.started.value += 1
        result = self.function(arg)
        self.resultList.append(result)
        self.finished.value += 1
[docs]
    def processingSmallData(self):
        """
        Method for executing multiprocessing on tasks that demand little to no
        memory. It immediately starts all the child processes.

        The packaging causes some marginal time loss; only use it for methods
        that take at least a second to run. Below that, a for loop is likely
        much faster.

        Variables:
            p (Process): Represents a single child process.

        Returns:
            The multiprocess-friendly list that was updated by each child.

        Errors:
            If another application reduces the available RAM mid-execution,
            Multiprocess outputs "Killed" and kills the child.
        """
        self.startTime = time.time()

        alfred = Process(target=self.buttler, args=([False]))
        alfred.start()

        for a in range(len(self.args)):
            # Multiprocess runs
            p = Process(target=self.executeSmall, args=([self.args.pop(0)]))
            self.processlist.append(p)
            p.start()

        for p in self.processlist:
            p.join()
        time.sleep(1)  # wait for the processes to close; do not remove

        finishedTime = round((time.time() - self.startTime) * 10) / 10
        alfred.terminate()

        print("\r", end="")
        # "\033[B" moves the cursor down one line, past the status block
        print("\033[B" * self.rewrite[False], flush=True)
        print("Completed ", len(self.resultList), "tasks in ", finishedTime, "seconds")

        return self.resultList
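# Example usage (an illustrative sketch, not part of the original module): the
# worker "add_pair" below is a hypothetical stand-in for a real task. It takes
# a single list, as Multi requires, and the results come back in the shared
# list returned by processingSmallData().
if __name__ == "__main__":

    def add_pair(args):
        n1, n2 = args  # the worker receives one list and unpacks it itself
        time.sleep(1)  # processingSmallData pays off only for tasks of ~1 s or more
        return n1 + n2

    pairs = [[0, 1], [1, 1], [2, 1]]
    # note that add_pair is passed, not called
    results = Multi(pairs, add_pair).processingSmallData()
    print(list(results))  # e.g. [1, 2, 3] (completion order may vary)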