Source code for aphylogeo.alignement

import glob
import os
import sys
import json
import statistics as st
import subprocess
from collections import defaultdict
from io import StringIO
from itertools import combinations
from pathlib import Path


import Bio.SeqIO
from Bio import AlignIO
from Bio.Align import PairwiseAligner
from Bio.Align.Applications import ClustalwCommandline, MafftCommandline
from Bio.Seq import Seq
import pandas as pd

from .params import Params
from .multiProcessor import Multi

import itertools
import textdistance as td


[docs] class Alignment: """ Class that contains the data of a multiple sequence alignment. """ def __init__(self, alignment_method: str, msa): """ init method args: alignment_method (str): The method used to align the sequences msa (dict): The multiple sequence alignment data """ self.type = "Alignment" self.alignment_method = alignment_method self.msa = msa
[docs] def to_dict(self): """ Method that converts the alignment data to a dictionary args: None """ # Convert the msa data to string format msa_str_dict = {key: self.msa_to_string(self.msa[key]) for key in self.msa} return {"type": self.type, "alignment_method": self.alignment_method, "msa": msa_str_dict}
[docs] @staticmethod def msa_to_string(msa_obj): """ Method that converts the alignment data to a string Args: MSA (AlignIO): The multiple sequence alignment data """ return "\n".join([f">{record.id}\n{str(record.seq)}" for record in msa_obj])
[docs] @staticmethod def msa_from_string(msa_str): """ Method that converts the alignment string to MSA Args: MSA string """ return AlignIO.read(StringIO(msa_str), "fasta")
[docs] @classmethod def from_dict(cls, d): # Convert msa data back to its original format msa_dict = {key: cls.msa_from_string(d["msa"][key]) for key in d["msa"]} return cls(d["alignment_method"], msa_dict)
[docs] def save_to_json(self, filename): """ Method that saves a json sequence file Args: filename (str): The name of the json file to save """ with open(filename, "w") as f: json.dump(self.to_dict(), f)
[docs] @classmethod def load_from_json(cls, filename): """ Method that loads a json sequence file Args: filename (str): The name of the json file to load Returns: alignment class """ with open(filename, "r") as f: data = json.load(f) return cls.from_dict(data)
[docs] @classmethod def from_json_string(cls, json_string): """ Method that loads an Alignment object from a JSON string. Args: json_string (str): The JSON string to load. Returns: An instance of the Alignment class. """ data = json.loads(json_string) return cls.from_dict(data)
[docs] @classmethod def from_fasta_file(cls, filename, alignment_method): """ Method that loads a sequence from a fasta file Args: filename (str): The name of the fasta file to load alignment_method: The method used to align the sequences Returns: _type_: MSA """ with open(filename, "r") as f: msa = AlignIO.read(f, "fasta") msa_dict = {msa.id: msa} return cls(alignment_method, msa_dict)
[docs] class AlignSequences: """ Class that perform a heuristic Multiple Sequence Alignement and windi from a single fasta file. """ def __init__(self, sequences, makeDebugFiles=False): """ Constructor if the alignment object. All parts of the process are available as variables. Inputs: sequences (dict) key = Sequence ID, value = Seq() makeDebugFiles (Boolean) if True, will create a folder with all the intermediate files args: self.sequences (Dictionary) Read data from the fasta file key = Sequence ID value = Seq() self.centroidKey (String) Specimen ID of the centroid sequence self.centroidSeq (Seq()) The centroid sequence self.aligned (Dictionary) Sequences after pairwise alignement key = Sequence ID (String) the specimen's ID value = Sequence (Seq()) the specimen's DNA sequence self.heuristicMSA (Dictionary) Sequences after star style alignement key = Sequence ID (String) the specimen's ID value = Sequence (Seq()) the specimen's DNA sequence self.windowed (Dictionary of Dictionary) Sequences after being windowed key = Window ID (String) as "startPosition_endPosition" value = (Dictionary) key = Sequence ID (String) the specimen's ID value = Sequence (Seq()) the specimen's DNA sequence ##todo self.msa (AlignIO()) self.rate_similarity self.method_similarity """ self.sequences = sequences self.makeDebugFiles = makeDebugFiles
[docs] def align(self) -> Alignment: """ Method that align sequences Returns: Alignment: The alignment object """ if Params.alignment_method == "0": heuristicMSA = self.sequences elif Params.alignment_method == "1": centroidKey = self.getSequenceCentroid()[0] centroidSeq = self.sequences.pop(centroidKey) aligned = self.alignSequencesWithPairwise(centroidKey, centroidSeq) if Params.fit_method == "1": heuristicMSA = self.starAlignement(centroidKey, aligned) elif Params.fit_method == "2": heuristicMSA = self.narrowFitPairwise(aligned) elif Params.alignment_method == "2": heuristicMSA = self.muscleAlign() elif Params.alignment_method == "3": heuristicMSA = self.clustalAlign() elif Params.alignment_method == "4": heuristicMSA = self.mafftAlign() else: raise ValueError("Invalid alignment method") [os.remove(file) for file in glob.glob("aphylogeo/bin/tmp/*.fasta")] # Remove temp fasta files windowed = self.slidingWindow(heuristicMSA) self.msa = self.makeMSA(windowed) self.alignment = Alignment(Params.alignment_method, self.msa) return self.alignment
[docs] def getSequenceCentroid(self): """ Method that picks the centroid sequence in a dictionary of sequences variables: list (list) Needed variable format for using Multiprocessor return: a list of: resultKey (String) resultSum (int) """ seqs = list((self.sequences).items()) print("\nSearching for the centroid") # formats input as a list for the Multiprocess seq_pairs_swapped = [((i[0][1], i[0][0]), (i[1][1], i[1][0])) for i in combinations(seqs, r=2)] seq_pairs = [list(sum(i, ())) for i in seq_pairs_swapped] # starts all the processes align_scores = Multi(seq_pairs, self.ScoreSingle).processingLargeData() # formats the multiprocess output back in a dictionnary scores = defaultdict(list) for pair in align_scores: for i in range(0, 2): scores[pair[i]].append(pair[2]) centroid_accession = max({key: st.median(val) for key, val in scores.items()}) centroid_score = st.median(scores[centroid_accession]) print(f"The centroid is '{centroid_accession}' with a median score of {centroid_score}\n") return [centroid_accession, centroid_score]
[docs] def ScoreSingle(self, args): """ Method the gets only the score of a couple of sequence regarding the pairwise alignement Args: a list: seqA (Seq()) Sequence A; considered the refenrence seqAID (String) Specimen A's ID seqB (Seq()) Sequence B seqBID (String) Speciment B's ID return: seqAID see above seqBID see above score (float) the resulting score of this couple of alignement """ seqA, seqB = args[0], args[2] seqAID, seqBID = args[1], args[3] aligner = PairwiseAligner() score = aligner.score(seqA, seqB) return (seqAID, seqBID, score)
[docs] def alignSequencesWithPairwise(self, centroidKey, centroidSeq): """ Method that aligns multiple DNA sequences. The first speciment of the dataset is used as the main pivot. This method uses parrallel computing. Variables: seqs (Dictionary) see self.sequences list (list) Needed variable format for using Multiprocessor result (list) output of all the processes Return: resultList (Dictionary) see self.aligned """ print("\nStarting sequence alignement") seqs = self.sequences seq_pairs = [] for seqXID in seqs.keys(): seq_pairs.append([centroidKey, centroidSeq, seqXID, seqs[seqXID]]) align_scores = Multi(seq_pairs, self.alignSingle).processingLargeData() aligned = {} # reformats the output in a dictionnary for i in align_scores: temp = {} temp[i[2]] = Seq((i[1][0])) temp[i[0]] = Seq((i[1][1])) aligned[str(i[0] + " vs " + i[2])] = temp # JUST TO MAKE THE DEBUG FILES if self.makeDebugFiles: directory = os.path.abspath("./debug/1_alignSequences") os.makedirs(directory, exist_ok=True) # os.mkdir("./debug/1_alignSequences") for w in aligned.keys(): self.dictToFile(aligned[w], str("1_alignSequences/" + w), ".fasta") # JUST TO MAKE THE DEBUG FILES return aligned
[docs] def muscleAlign(self): """Method to perform a multiple DNA sequence alignment using Muscle Algorithm Returns (Dict): heuristicMSA - Keys: accession ID - Values: Aligned sequences """ try: if sys.platform == "win32": muscle_exe = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "\\aphylogeo\\bin\\muscle5.1.win64.exe" out_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "\\aphylogeo\\bin\\tmp" elif (sys.platform == "linux1") | (sys.platform == "linux2") | (sys.platform == "linux") | (sys.platform == "darwin"): muscle_exe = r"aphylogeo/bin/muscle5.1.linux_intel64" out_dir = r"aphylogeo/bin/tmp/" in_file = Params.reference_gene_filepath out_file = os.path.splitext(os.path.basename(Params.reference_gene_filepath))[0] out_fullname = str(out_dir + out_file + "_muscle_aligned.fasta") process = subprocess.Popen([muscle_exe, "-align", in_file, "-output", out_fullname]) process.wait() records = Bio.SeqIO.parse(out_fullname, "fasta") return {rec.id: str(rec.seq) for rec in records} # If the muscle executable is not found except FileNotFoundError: print("Muscle executable not found inside the bin folder. Please check the documentation for installation instructions.") sys.exit(1)
[docs] def clustalAlign(self): """Method to perform a multiple DNA sequence alignment using ClustalW2 Algorithm Returns (Dict): heuristicMSA - Keys: accession ID - Values: Aligned sequences """ try: if sys.platform == "win32": clustal_exe = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "\\aphylogeo\\bin\\clustalw2.exe" fasta_out = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "\\aphylogeo\\bin\\tmp\\clustal_alignment.fasta" elif (sys.platform == "linux1") | (sys.platform == "linux2") | (sys.platform == "linux") | (sys.platform == "darwin"): clustal_exe = r"aphylogeo/bin/clustalw2" fasta_out = r"aphylogeo/bin/tmp/clustal_alignment.fasta" in_file = Params.reference_gene_filepath clustalw_cline = ClustalwCommandline(clustal_exe, infile=in_file, outfile=fasta_out, output="FASTA") out, err = clustalw_cline() records = Bio.SeqIO.parse(fasta_out, "fasta") return {rec.id: str(rec.seq) for rec in records} # If the clustalw executable is not found except FileNotFoundError: print("ClustalW2 executable not found inside the bin folder. Please check the documentation for installation instructions.") sys.exit(1)
[docs] def mafftAlign(self): """Method to perform a multiple DNA sequence alignment using MAFFT Algorithm Returns (Dict): heuristicMSA - Keys: accession ID - Values: Aligned sequences """ try: if sys.platform == "win32": mafft_exe = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "\\aphylogeo\\bin\\mafft-win\\mafft.bat" elif (sys.platform == "linux1") | (sys.platform == "linux2") | (sys.platform == "linux") | (sys.platform == "darwin"): mafft_exe = r"aphylogeo/bin/mafft-linux64/mafft.bat" in_file = Params.reference_gene_filepath mafft_cline = MafftCommandline(mafft_exe, input=in_file) out, err = mafft_cline() fasta_io = StringIO(out) records = Bio.SeqIO.parse(fasta_io, "fasta") return {rec.id: str(rec.seq).upper() for rec in records} # If the mafft executable is not found except FileNotFoundError: print("MAFFT executable not found inside the bin folder. Please check the documentation for installation instructions.") sys.exit(1)
[docs] def alignSingle(self, args): """ Method that aligns two DNA sequences using the pairwise2 algorithm. Args: args (list) scID (String) The centroid sequence ID to compare to sc (Seq()) The DNA sequence of scID seqBID (String) The sequence ID to compare with seqB (Seq()) The DNA sequence of seqBID Return: (list) seqBID see above aligned (List) The list containing the results. scID see above """ scID, seqBID = args[0], args[2] sc, seqB = args[1], args[3] # Must return Alignment at index [0] of BioPairwiseAlignment aligner = PairwiseAligner() aligned = aligner.align(sc, seqB)[0] return [seqBID, aligned, scID]
[docs] def narrowFitPairwise(self, aligned): """Fit length of a centroid sequence and its pairwise aligned sequences The length of each sequence from the pairwise alignment are set equal by inserting dash (-) in most appropriate location of a given sequence. args: alignment: dict of nested dict {accession couple #1 : {Centroid Acc:Centroid Aligned Seq, Non-centroid Acc #1: non-centroid Aligned Seq #1}, ... , {accession couple #n : {Centroid Acc:Centroid Aligned Seq, Non-centroid Acc #n: non-centroid Aligned Seq #n}} Returns: A dictionary of all accessions and their fitted aligned sequences. """ seqs = self.getAlignSeqs(aligned) max_len = max(self.getAlignSeqLens(aligned)) for nucleo_i in range(0, max_len): for seq_i in range(0, len(seqs)): if self.isCurrentCharDash(seqs, seq_i, nucleo_i): seqs = self.insertDashToShorterSeq(seqs, nucleo_i, aligned) seqs = self.appendDashToShorterSeqs(seqs, max_len) return self.mergeFitPairwise(aligned, seqs)
[docs] def getAlignSeqs(self, aligned): """Extract all sequences aligned using a pairwise alignment args: alignment: see fitPairwise(alignment) docstring Returns: List of sequences aligned through pairwise alignment """ seqs = [] for alignment in aligned: seqs.append([str(seq) for seq in aligned[alignment].values()]) return list(sum(seqs, []))
[docs] def getAlignSeqLens(self, aligned): """Get length of all sequences aligned using a pairwise alignment args: alignment: see fitPairwise(alignment) docstring Returns: List of the length of each aligned sequences """ return [len(seq) for seq in self.getAlignSeqs(aligned)]
[docs] def getAlignCouple(self, aligned): """Get nested couple accessions and their respective sequences args: alignment: see fitPairwise(alignment) docstring Returns: List of paired accessions and their aligned sequences """ return [val for val in list(aligned.values())]
[docs] def extractOneAlignAcc(self, aligned, nest_ord=0): """Extract the accession from a nested alignment couple args: alignment: see fitPairwise(alignment) docstring nest_ord (int) optional: The position of the nested accessions (Default = 0 (centroid), 1 (aligned sequence)) Returns: The list of either centroid (nest_ord = 0 (Default)) or non-centroid (nest_ord = 1) accessions of a group of sequences aligned throug pairwise alignment. """ try: return [list(i)[nest_ord] for i in self.getAlignCouple(aligned)] # Return the centroid sequence if an invalid position is queried except IndexError: return [list(i)[0] for i in self.getAlignCouple(aligned)]
[docs] def isCurrentCharDash(self, seqs, seq_i, ch_i): """Assess whether the character at current cursor position is a dash args: seqs (list): aligned sequences to fit seq_i (int): index of the current sequence ch_i (int): index of the currenct character Returns: True if the current character assessed is a dash, False otherwise """ try: return seqs[seq_i][ch_i] == "-" except IndexError: return False
[docs] def insertDashToShorterSeq(self, seqs, ch_i, aligned): """Insert a dash at the current position of a sequence Insert a dash (-) character in a sequence if its length is shorter than the longest one in the group of aligned sequence. args: seqs (list): aligned sequences to fit seq_i (int): index of the current sequence Returns (List): - The fitted sequences of a pairwise alignment """ for seq_j in range(0, len(seqs)): try: if (len(seqs[seq_j]) < max(self.getAlignSeqLens(aligned))) & (seqs[seq_j][ch_i] != "-"): seqs[seq_j] = seqs[seq_j][:ch_i] + "-" + seqs[seq_j][ch_i:] except IndexError: seqs[seq_j] = seqs[seq_j][:ch_i] + "-" return seqs
[docs] def mergeFitPairwise(self, aligned, seqs): """Generate a dictionary of all accessions and their fitted sequences args: alignment: see fitPairwise(alignment) docstring seqs (list): aligned sequences to fit Returns (Dict): Group of accessions and their fitted sequences from a pairwise alignment """ centroid = {list(set(self.extractOneAlignAcc(aligned)))[0]: seqs[0]} non_centroid = dict(zip(self.extractOneAlignAcc(aligned, 1), seqs[1::2])) return centroid | non_centroid
[docs] def appendDashToShorterSeqs(self, seqs, max_len): """Append dash to all sequences shorter than the longest one from a list of sequences args: seqs, list: List of fitted sequences post pairwise alignment max_len int: Length of the longest aligned sequence, including the blank/dash Returns: List of sequences with dash appended where applicable """ return [f"{str(seq):-<{max_len}}" for seq in seqs]
[docs] def starAlignement(self, centroidKey, aligned): """ Method that combs through all the pairwise alignments couples and makes it so that every sequenced is aligned with every other sequences. If a "-" is found in the seqA of a pair, but not another, it is inserted into every other ones. Example: pair1: pair2: seqA1: TACTAC seqA2: TAC-TAC seqB1: TACTAC seqB2: TACTTAC becomes: seqA1: TAC-TAC seqA2: TAC-TAC seqB1: TAC-TAC seqB2: TACTTAC and outputs: seqA : TAC-TAC #now combines SeqA1 and SeqA2 seqB1: TAC-TAC seqB2: TACTTAC then, we compare the aligned set with the next pair: SeqA : TAC-TAC seqA3: TACTA-C seqB1: TAC-TAC seqB3: TACTAAC seqB2: TACTTAC wich makes: SeqA : TAC-TA-C seqA3: TAC-TA-C seqB1: TAC-TA-C seqB3: TAC-TAAC seqB2: TACTTA-C and outputs: SeqA : TAC-TA-C #now combines SeqA1, SeqA2 and SeqA3 seqB1: TAC-TA-C seqB2: TACTTA-C seqB3: TAC-TAAC over and over again Return: starAlign (dict) see self.heuristicMSA """ scKey = centroidKey starAlign = {} for k in aligned.keys(): # couple is SeqA and SeqB of a pairwise alignement couple = aligned[k] a = list(couple.keys()) if scKey in a: a.remove(scKey) if not a: # No second sequence, skip this pair continue sNewKey = a[0] # SeqB ID, *not* the reference starAlign[scKey] = str(couple[scKey]) # SeqA, the reference starAlign[sNewKey] = str(couple[sNewKey]) # SeqB, *not* the reference if len(starAlign) > 2: starAlign = self.merge(starAlign, scKey, sNewKey, centroidKey) starAlign["temp"] = starAlign[scKey] # SeqA, the *old* reference starAlign.pop("temp") # JUST TO MAKE THE DEBUG FILES if self.makeDebugFiles: directory_path = "./debug/2_starAlignement" os.makedirs(directory_path, exist_ok=True) self.dictToFile(starAlign, "2_starAlignement/starAligned", ".fasta") # JUST TO MAKE THE DEBUG FILES return starAlign
[docs] def merge(self, result, k1, k2, centroidKey): """ Method that loops through each position of two strings ans compares the Chars. Arguments: result (dict) the dictionnary of objects to compare; contains only object that have already been aligned + a new pair to align can be refered to as "aligned set" k1 (String) The Key of the object we want compared k2 (String) The Key of the object we want compared Variables: minLen (int) The number of char in the smallest of the two strings pos (int) The char position at wich we are now; it loops nChar (char) The char from k1 tChar (char) The char from K2 keylist (list) Ultimatly, contains all the keys of the object that need to change Return: result (dict) The same object we started with, but with one more aligned pair inside. """ newRef = result[k1] tempRef = result["temp"] minLen = 1 pos = 0 while pos < minLen: # The sequence length could change at each iteration # these assignemnts needs to be done each loop to get # to the true end and not to skip the '-' we just added newRef = result[k1] tempRef = result["temp"] minLen = min(len(newRef), len(tempRef)) nChar = newRef[pos] tChar = tempRef[pos] if nChar != tChar: # - found in the new ref, change all but the 2 new elements if nChar == "-": keyList = list(result.keys()) keyList.remove(k1) keyList.remove(k2) # - found in the old ref; change the 2 new elements elif tChar == "-": keyList = [k1, k2] else: errStr = str( 'Alignement error. Merge() found "' + str(nChar) + '" and "' + str(tChar) + '" ' + "at position " + str(pos) + " of two versions of the centroid " + "sequences\n" + "Please check the previous methods" + " and ensure the pairwise alignemnt is correct" + "\nCentroid ID: " + str(centroidKey) + "\nPairwise seq ID" + " last inserted: " + str(k2) ) raise Exception(errStr) result = self.insertDash(result, pos, keyList) pos += 1 return result
[docs] def insertDash(self, dict, pos, keyList): """ Method that inserts a "-" at [pos] in a string at every Key in a dict Arguments: dict (dict) contains many objects as: - key = (string) - values = (string) pos (int) the char position at wich to insert keyList (list) list of keys of objects to modify Variables: char (char) The char to insert Return: dict (dict) The same we started with, with the modifications """ for k in keyList: char = "-" s = dict[k] s = s[:pos] + char + s[pos:] dict[k] = s return dict
[docs] def slidingWindow(self, heuristicMSA, optimized=True): """ Method that slices all the sequences in a dictionary to a specific window (substring) Example: step_size=3 window_size=5 123 : CGGCTCAGCT --> 123_3_7 : GCTCA 456 : TAGCTTCAGT --> 456_3_7 : GCTTC Args: alignedSequences (Dictionary) - Key (String) is the ID of the specimen - Data (Seq(String)) is the specimen's DNS sequence others* (var) see param.yaml Return: resultDict (Dictionary) Key is originalKey_i_j originalKey = the name of the key before the window i = The starting position of the window, relative to the original sequence j = The ending position of the window, relative to the original sequence """ step = Params.window_size windowed_alignment = dict() seq_len = max([len(h) for h in heuristicMSA.values()]) paddedMSA = {key: str(val).ljust(seq_len, "-") for key, val in heuristicMSA.items()} if optimized: for i in range(0, seq_len, step): if i + step < seq_len: windowed_alignment[f"{i}_{i + step - 1}"] = {key: val[i : i + step - 1] for key, val in paddedMSA.items()} combinations = itertools.combinations(windowed_alignment[f"{i}_{i + step - 1}"].values(), 2) df = pd.DataFrame(list(combinations)) similarity_score = float(self.similarity(df)) if Params.rate_similarity > similarity_score: windowed_alignment.pop(f"{i}_{i + step - 1}") else: windowed_alignment[f"{i}_{seq_len-1}"] = {key: val[i : i + seq_len - 1] for key, val in paddedMSA.items()} combinations = itertools.combinations(windowed_alignment[f"{i}_{seq_len-1}"].values(), 2) df = pd.DataFrame(list(combinations)) similarity_score2 = float(self.similarity(df)) if Params.rate_similarity > similarity_score2: windowed_alignment.pop(f"{i}_{seq_len-1}") else: for i in range(0, seq_len, step): if i + step < seq_len: windowed_alignment[f"{i}_{i + step - 1}"] = {key: val[i : i + step - 1] for key, val in paddedMSA.items()} else: windowed_alignment[f"{i}_{seq_len-1}"] = {key: val[i : i + seq_len - 1] for key, val in paddedMSA.items()} # if self.rate_similarity[0] < self.similarity(df): # windowed_alignment.pop(f"{i}_{seq_len-1}") # print(self.rate_similarity) # JUST TO MAKE THE DEBUG FILES if self.makeDebugFiles: directory_path = "./debug/3_slidingWindow" os.makedirs(directory_path, exist_ok=True) for w in windowed_alignment.keys(): self.dictToFile(windowed_alignment[w], "3_slidingWindow/" + w, ".fasta") # JUST TO MAKE THE DEBUG FILES return windowed_alignment
[docs] def dictToFile(self, dict, filename, ext): """ Debuging method that creates files from a dictonnary of sequences. File is put in the debug file of the cwd args: dict (dict) the objects to write in the file - key = (string) - values = (string) filename (String) the name of the future file ext (String) the file extension return: dict (dict) see dict from arguments """ dir = "./debug" if not os.path.exists(dir): os.mkdir(dir) file_path = os.path.join(dir, str(filename) + ext) os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "w") as f: for key in dict.keys(): f.write(">" + str(key) + "\n") f.write(str(dict[key] + "\n")) return dict
[docs] def makeMSA(self, windowed): """ Method that create a dictionnary of Multiple Sequence Alignment(MSA) objects from bioPython. Each entry in the dictionnary is a MSA object of a single sliding window. returns: msaSet (dict) - key (String) - the window name - value (AlignIO) - the MSA object """ msaSet = {} for windowSet in windowed.keys(): data = "" window = windowed[windowSet] for seq in window.keys(): data += str(">" + seq + "\n" + window[seq] + "\n") msaSet[windowSet] = AlignIO.read(StringIO(data), "fasta") return msaSet
[docs] def fileToDict(filename, ext): """ Method that reads a fasta file and returns a dictionnary of Seq objects arguments: filename (String) the name of the file ext (String) the file extension return: dict (dict) - key = (string) - values = (string) """ f = open(Path(filename + ext), "r") dict = {} for line in f: if line[0] == ">": key = line[1:-1] dict[key] = "" else: dict[key] += line[:-1] f.close() return dict
[docs] def fileToAlignIO(filename, ext): """ Method that reads a fasta file and returns a AlignIO object arguments: filename (String) the name of the file ext (String) the file extension return: alignIO (AlignIO) the AlignIO object """ f = open(Path(filename + ext), "r") data = "" for line in f: data += line f.close() alignIO = AlignIO.read(StringIO(data), "fasta") return alignIO
[docs] def similarity(self, df): """ Method that compute similarity in all string in dataframe (df) Method source: https://yassineelkhal.medium.com/the-complete-guide-to-string-similarity-algorithms-1290ad07c6b7 arguments: method (int) similarity method 1: Hamming distance 2: Levenshtein distance 3: Damerau-Levenshtein distance 4: Jaro similarity 5: Jaro-Winkler similarity 6: Smith–Waterman similarity 7: Jaccard similarity 8: Sørensen-Dice similarity --NOT IMPLEMENTED-- 9: Tversky similarity 10: Overlap similarity 11: Cosine similarity 12: N-gram similarity 13: Ratcliff-Obershelp similarity 14: Longest common substring/subsequence similarity df (dataframe) dataframe of all string sequances return: percentage similarity (float) the similarity between the dataframe of all strings elements """ rateSimilarity = 0.0 if Params.method_similarity[0] == "1": rateSimilarity = df.apply(lambda x: td.hamming.normalized_similarity(x[0], x[1]), axis=1).mean() * 100 elif Params.method_similarity[0] == "2": rateSimilarity = df.apply(lambda x: td.levenshtein.normalized_similarity(x[0], x[1]), axis=1).mean() * 100 elif Params.method_similarity[0] == "3": rateSimilarity = df.apply(lambda x: td.damerau_levenshtein.normalized_similarity(x[0], x[1]), axis=1).mean() * 100 elif Params.method_similarity[0] == "4": rateSimilarity = df.apply(lambda x: td.jaro(x[0], x[1]), axis=1).mean() * 100 elif Params.method_similarity[0] == "5": rateSimilarity = df.apply(lambda x: td.jaro_winkler(x[0], x[1]), axis=1).mean() * 100 elif Params.method_similarity[0] == "6": rateSimilarity = df.apply(lambda x: td.smith_waterman(x[0], x[1]), axis=1).mean() * 100 elif Params.method_similarity[0] == "7": rateSimilarity = df.apply(lambda x: td.jaccard(x[0], x[1]), axis=1).mean() * 100 elif Params.method_similarity[0] == "8": rateSimilarity = df.apply(lambda x: td.sorencen(x[0], x[1]), axis=1).mean() * 100 return rateSimilarity