fraggler
Description
Fraggler is for fragment analysis in Python! It is a Python package for analyzing and generating reports for .fsa files, offering both a Python API and a command-line tool.
Features
Peak Area Report Generation
: Fraggler allows you to generate peak area reports for all input files. The package calculates peak areas based on specified parameters and generates a report summarizing the results.
Combined Peak Table Generation
: Fraggler provides a command-line tool to generate a combined dataframe of peaks for all input files. This allows you to easily analyze and compare peaks across multiple files.
Customization Options
: Fraggler offers various customization options to tailor the analysis to your specific needs. You can specify parameters such as ladder type, peak model, minimum ratio, minimum height, cutoff value, trace channel, peak height, and even provide a custom peaks file for specific assays and intervals.
Install
pip install fraggler
Dependencies
Fraggler depends on:
- pandas
- numpy
- scikit-learn
- networkx
- lmfit
- scipy
- biopython
- panel
- fire
- colorama
- altair
Python API
To get an overview of how the library can be used in a Python environment, please look at tutorial.ipynb.
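A minimal sketch of the core pipeline is shown below. The file name sample.fsa is a placeholder; the classes and attributes are the ones documented further down on this page.

import fraggler

# Parse the .fsa file and state which ladder was used in the run
fsa = fraggler.FsaFile("sample.fsa", "LIZ", trace_channel="DATA9")

# Match ladder peaks to the reference sizes and fit a size-calling model
assigner = fraggler.PeakLadderAssigner(fsa)
model = fraggler.FitLadderModel(assigner)

# Inspect the quality of the ladder fit
print(model.mse, model.r2)
fig = fraggler.PlotLadder(model).plot_model_fit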
CLI
fraggler area and fraggler peak
Usage
To generate peak area reports and a peak table for all input files, use the fraggler area or fraggler peak command followed by the required positional arguments and any optional flags.
- If not specified, fraggler finds peaks agnostically in the .fsa file. To specify custom assays with certain peaks and intervals, the user can add a .csv file with the --custom_peaks argument. The csv file MUST have the following shape:
| name | start | stop | amount | min_ratio | which | peak_distance |
| --- | --- | --- | --- | --- | --- | --- |
| prt1 | 140 | 150 | 2 | 0.2 | FIRST | 5 |
Example of how a file could look:
name,start,stop,amount,min_ratio,which,peak_distance
prt1,135,155,2,0.2,FIRST,
prt3,190,205,,0.2,FIRST,
prt2,222,236,2,0.2,FIRST,5
prt4,262,290,5,,,
name
: Name of the assay
start
: Start of the assay in basepairs
stop
: Stop of the assay in basepairs
amount
: Optional. Amount of peaks in the assay. If left empty, every peak in the interval is included.
min_ratio
: Optional. Only peaks with a height of at least min_ratio times the highest peak are included, e.g. if min_ratio == 0.2 and the highest peak is 100 units, only peaks with a height of at least 20 are included.
which
: LARGEST | FIRST. Can be left empty. Decides which peaks are included if there are more peaks than amount. If FIRST is set, the first amount peaks (e.g. the two first peaks if amount is 2) are chosen. If LARGEST is set, the largest peaks in the interval are chosen. Defaults to LARGEST.
peak_distance
: Optional. Distance between peaks must be under this value.
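For example, assuming the csv above is saved as my_assays.csv (a placeholder name), it is passed like this:

fraggler area IN_FOLDER OUT_FOLDER LIZ --custom_peaks=my_assays.csv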
Positional Arguments
The following positional arguments are required:
IN_PATH
: Type str. Specifies the input path.
OUT_FOLDER
: Type str. Specifies the output folder.
LADDER
: Type str. Specifies the ladder used in the experiment.
Flags
The following flags can be used with the fraggler peak or fraggler area command:
-l, --ladder=LADDER
: Type str. Specifies the ladder. Default value: 'LIZ'.
--peak_model=PEAK_MODEL
: Type str. Specifies the peak model. Default value: 'gauss'.
--min_ratio=MIN_RATIO
: Type float. Specifies the minimum ratio. Default value: 0.3.
--min_height=MIN_HEIGHT
: Type int. Specifies the minimum height. Default value: 100.
--cutoff=CUTOFF
: Type int. Specifies the cutoff value. Default value: 175.
-t, --trace_channel=TRACE_CHANNEL
: Type str. Specifies the trace channel. Default value: 'DATA9'.
--peak_height=PEAK_HEIGHT
: Type int. Specifies the peak height. Default value: 200.
--custom_peaks=CUSTOM_PEAKS
: Type Optional[str]. Specifies custom peaks. Default value: None.
-e, --excel=EXCEL
: Type bool. Default value: True.
Typical usage
fraggler area IN_FOLDER OUT_FOLDER LIZ --min_ratio=0.2
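fraggler peak takes the same arguments; for example, to use a Voigt peak model and a stricter height filter (values here are illustrative):

fraggler peak IN_FOLDER OUT_FOLDER LIZ --peak_model=voigt --min_height=150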
Documentation
Click here for the full documentation of the API.
Output
One example of the report generated by fraggler area can be seen here:
Contributions
Please check out How to contribute
Contributing to fraggler
We want you to contribute and help make fraggler better! See the Table of Contents for how to do this.
If you can't help with coding but want to help otherwise, please consider:
- Give fraggler a star here on GitHub
- Cite fraggler
- Spread the word!
Table of Contents
FAQ
First, read the README or the Documentation.
Otherwise, open a new issue and ask away!
Contributions
Make sure everything works as expected when making a pull request.
Reporting Bugs
First, check if...:
- You are using the latest version.
- Check the issues
Enhancements
How to suggest new cool features to fraggler!
First, check if...:
- You are using the latest version.
- Check the issues
Attribution
- Please cite fraggler and spread the word! This is how you can do it:
- Cite the paper at [HERE]
- Star the repo!
- Use fraggler!
- Tell a friend
1r""" 2.. include:: ../README.md 3.. include:: ../CONTRIBUTION.md 4""" 5 6__author__ = "William Rosenbaum and Pär Larsson" 7 8from .ladder_fitting.fit_ladder_model import FitLadderModel 9from .ladder_fitting.peak_ladder_assigner import PeakLadderAssigner 10from .ladders.ladders import LADDERS 11from .plotting.plot_ladder import PlotLadder 12from .utils.baseline_removal import baseline_arPLS 13from .utils.fsa_file import FsaFile 14from .utils.utils import get_files, setup_logging 15from .utils.fraggler_object import ( 16 FragglerPeak, 17 FragglerArea, 18 make_fraggler_peak, 19 make_fraggler_area, 20) 21from .applications.peak_area_multiplex import PeakAreaDeMultiplex 22from .plotting.plot_peak_area import PlotPeakArea 23from .plotting.plot_raw_data import PlotRawData 24from .plotting.plot_peaks import PlotPeaks 25from .plotting.plot_channels import make_fsa_data_df, plot_fsa_data 26from .reports.reports import ( 27 generate_peak_report, 28 generate_area_report, 29 generate_no_peaks_report, 30) 31from .functions.generate_peak_table import generate_peak_table 32from .utils.peak_finder import PeakFinder 33from .cli import peak_report, area_report 34 35 36__all__ = [ 37 "FitLadderModel", 38 "PeakLadderAssigner", 39 "LADDERS", 40 "baseline_arPLS", 41 "FsaFile", 42 "PeakArea", 43 "PeakAreaDeMultiplex", 44 "PlotPeakArea", 45 "PlotLadder", 46 "PlotRawData", 47 "PlotPeaks", 48 "generate_peak_table", 49 "get_files", 50 "setup_logging", 51 "make_fsa_data_df", 52 "plot_fsa_data", 53 "PeakFinder", 54 "FragglerPeak", 55 "FragglerArea", 56 "make_fraggler_peak", 57 "make_fraggler_area", 58 "generate_peak_report", 59 "generate_area_report", 60 "generate_no_peaks_report", 61 "peak_report", 62 "area_report", 63]
class FitLadderModel:
    def __init__(self, ladder_assigner: PeakLadderAssigner) -> None:
        """
        Initialize FitLadderModel object with ladder_assigner object and its attributes.

        Args:
            ladder_assigner (PeakLadderAssigner): Object of PeakLadderAssigner class.

        Returns:
            None.
        """
        self.n_knots = 2
        self.ladder_assigner = ladder_assigner
        self.fsa_file = self.ladder_assigner.fsa_file
        self.best_combination = ladder_assigner.assign_ladder_peak_sizes().reshape(
            -1, 1
        )

        self.fit_model()
        self.mse, self.r2 = self.model_score()
        self.adjusted_baisepair_df = self.generate_adjusted_trace_df()

    def fit_model(self) -> None:
        """
        Fit model based on ladder type.

        Returns:
            None.
        """
        ladder = self.fsa_file.ladder
        if ladder == "ROX":
            self._fit_ROX_ladder()
        elif ladder == "LIZ" or ladder == "ORANGE":
            self._fit_LIZ_ladder()
        else:
            raise LadderNotFoundError(f"'{ladder}' is not a valid ladder")

    def model_score(self) -> tuple[float, float]:
        """
        Calculate mean squared error and R-squared score of the fitted model.

        Returns:
            Tuple (mse, r2).
        """
        true_Y = self.fsa_file.ref_sizes
        predicted_Y = self.model.predict(self.best_combination)
        mse = mean_squared_error(true_Y, predicted_Y)
        r2 = r2_score(true_Y, predicted_Y)

        return mse, r2

    def generate_adjusted_trace_df(self) -> pd.DataFrame:
        """
        Generate a dataframe with adjusted basepairs and peaks.

        Returns:
            Dataframe with columns time, peaks and basepairs.
        """
        # Continue until all basepairs are unique, increasing n_knots
        # in the SplineTransformer on each retry
        for _ in range(10):
            df = (
                pd.DataFrame({"peaks": self.fsa_file.trace})
                .reset_index()
                .rename(columns={"index": "time"})
                .assign(
                    basepairs=lambda x: self.model.predict(
                        x.time.to_numpy().reshape(-1, 1)
                    )
                )
                .loc[lambda x: x.basepairs >= 0]
            )

            if df.shape[0] == df.basepairs.nunique():
                logging.info(f"Ladder fitting model: {self.model}")
                return df
            # Not all basepairs are unique: refit with one more knot
            self.n_knots += 1
            self.fit_model()

        # Reached only if no unique fit was found within 10 attempts
        raise ModelFittingError(
            "There is a problem with the fitting of the model to the ladder"
        )

    def _fit_ROX_ladder(self) -> None:
        """
        Fit model with ROX ladder.

        Returns:
            None.
        """
        self.model = make_pipeline(
            SplineTransformer(
                degree=4, n_knots=self.n_knots + 4, extrapolation="continue"
            ),
            LinearRegression(fit_intercept=True),
        )

        X = self.best_combination
        y = self.fsa_file.ref_sizes

        self.model.fit(X, y)

    def _fit_LIZ_ladder(self) -> None:
        """
        Fit model with LIZ ladder.

        Returns:
            None.
        """
        # NB: n_knots in the SplineTransformer was changed from 3 to 2
        self.model = make_pipeline(
            SplineTransformer(degree=3, n_knots=self.n_knots, extrapolation="continue"),
            LinearRegression(fit_intercept=True),
        )

        X = self.best_combination
        y = self.fsa_file.ref_sizes

        self.model.fit(X, y)
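A short usage sketch; it assumes assigner is a PeakLadderAssigner built from a parsed FsaFile, as in the example near the top of this page:

# Fitting happens in the constructor; the scores are stored on the instance
model = FitLadderModel(assigner)
print(f"MSE: {model.mse:.3f}, R2: {model.r2:.3f}")

# Trace dataframe with a fitted basepair value for every time point
trace_df = model.adjusted_baisepair_df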
class PeakLadderAssigner:
    """
    A class that assigns peak sizes to a ladder for use in DNA fragment size analysis.
    The peak sizes are generated based on a given size standard and the input parameters.

    Attributes:
        fsa_file (FsaFile): an object of the FsaFile class that contains parameters for analysis

    Methods:
        assign_ladder_peak_sizes(): Assigns peak sizes to a ladder based on the given parameters and returns the best combination.
        get_peaks(size_standard): Finds peaks in the size standard based on given parameters and returns the peaks.
        generate_graph(peaks): Generates a graph based on the peaks and returns the graph.
        generate_combinations(graph): Generates combinations of peaks based on the graph and returns the combinations.
        get_best_fit(combinations, method): Finds the best fit for the given combinations and returns the best combination.
        _polynomial_model_inv_r2_score(ladder, comb): Returns a score based on the polynomial model inverse r2 score.
        _max_fractional_deviation_score(ladder, comb): Returns a score based on the maximum fractional deviation.
        _max_first_derivative_score(combination): Returns a score based on the maximum first derivative.
        _max_second_derivative_score(combination): Returns a score based on the maximum second derivative.
        _max_spline_second_derivative_score(combination): Returns a score based on the maximum spline second derivative.
    """

    def __init__(self, fsa_file: FsaFile) -> None:
        """
        Initializes the class with an object of the FsaFile class.

        Args:
            fsa_file (FsaFile): an object of the FsaFile class that contains parameters for analysis
        """

        self.fsa_file = fsa_file

    def assign_ladder_peak_sizes(self):
        """
        Assigns peak sizes to a ladder based on the given parameters and returns the best combination.

        Returns:
            np.array: the best combination of peak sizes
        """
        peaks = self.get_peaks(self.fsa_file.size_standard)
        graph = self.generate_graph(peaks)
        combinations = self.generate_combinations(graph)
        best_combination = self.get_best_fit(combinations)

        return best_combination

    def get_peaks(self, size_standard) -> np.array:
        """
        Finds peaks in the size standard based on given parameters and returns the peaks.

        Args:
            size_standard (np.array): the size standard array

        Returns:
            np.array: the peaks array
        """

        peaks_obj = signal.find_peaks(
            size_standard,
            distance=self.fsa_file.min_interpeak_distance,
            height=self.fsa_file.min_height,
        )

        peaks = peaks_obj[0]
        heights = peaks_obj[1]["peak_heights"]

        df = pd.DataFrame({"peaks": peaks, "heights": heights})

        # TODO
        # Dropping the highest peak (below) was removed; the new test files
        # then worked as expected
        # idxmax = df["heights"].idxmax()
        # df = df.drop(idxmax)

        peaks_adj = df.nlargest(self.fsa_file.max_peak_count, ["heights"])

        return peaks_adj["peaks"].sort_values().to_numpy()

    def generate_graph(self, peaks) -> nx.DiGraph:
        """
        Generates a graph based on the peaks and returns the graph.

        Args:
            peaks (np.array): the peaks array

        Returns:
            nx.DiGraph: the graph object
        """
        G = nx.DiGraph()

        for p in peaks:
            G.add_node(p)

        i = 0
        while i < peaks.size:
            j = i + 1
            while j < peaks.size:
                diff = peaks[j] - peaks[i]
                if diff <= self.fsa_file.max_ladder_trace_distance:
                    G.add_edge(peaks[i], peaks[j], length=diff)
                j += 1
            i += 1

        return G

    def generate_combinations(self, graph):
        """
        Generates combinations of nodes from a given directed acyclic graph (DAG)
        that satisfy certain conditions.

        Parameters:
            graph (networkx.DiGraph): a DAG of candidate ladder peaks

        Yields:
            np.array: a sequence of nodes that satisfies the conditions of the DAG.
        """

        # Get start nodes that have zero in-degree
        start_nodes = [node for node in graph.nodes if graph.in_degree(node) == 0]

        # Get end nodes that have zero out-degree
        end_nodes = [node for node in graph.nodes if graph.out_degree(node) == 0]
        if len(start_nodes) == 0 or len(end_nodes) == 0:
            raise ValueError("Graph does not have start or end nodes")

        # Get all simple paths from start node to end node
        all_paths = []
        for start_node in start_nodes:
            for end_node in end_nodes:
                paths = nx.all_simple_paths(graph, start_node, end_node)
                all_paths.extend(paths)

        if len(all_paths) == 0:
            raise ValueError("No paths found from start to end nodes")

        # Yield every window of ref_count consecutive nodes along each path
        for p_arr in all_paths:
            for i in range(0, len(p_arr) - self.fsa_file.ref_count + 1):
                yield np.array(p_arr[i : i + self.fsa_file.ref_count])

    def get_best_fit(self, combinations, method="2nd_derivative"):
        """
        Calculates the best fit for a given set of combinations using a specified method.

        Parameters:
            combinations (iterable): an iterable containing combinations of nodes
            method (str): a string indicating the method to use. The default method is
            "2nd_derivative".

        Returns:
            A numpy array representing the combination of nodes with the best fit.
        """

        # Create an empty dataframe to store combinations and scores
        df = pd.DataFrame()

        # Store the combinations in the dataframe
        df["combination"] = list(combinations)

        # Calculate the score for each combination using the specified method
        if method == "2nd_derivative":
            df["score"] = np.vectorize(self._max_spline_second_derivative_score)(
                df["combination"]
            )

        if method == "first_derivative":
            df["score"] = np.vectorize(self._max_first_derivative_score)(
                df["combination"]
            )

        # Sort the dataframe by score in ascending order
        df = df.sort_values(by="score", ascending=True)

        # Get the best combination (i.e., the one with the lowest score)
        best = df.head(1)

        # Return the best combination as a numpy array
        return best.combination.squeeze()

    @staticmethod
    def _polynomial_model_inv_r2_score(ladder: np.array, comb: np.array) -> float:
        """
        Calculates the inverse of the R^2 (coefficient of determination) score for a polynomial regression model.

        Parameters:
        - ladder (np.array): array containing the ladder positions of a dataset
        - comb (np.array): array containing the corresponding comb heights of a dataset

        Returns:
        - float: the inverse of the R^2 score for the polynomial model

        Notes:
        - This method fits a polynomial regression model of degree 3 to the dataset using NumPy's polyfit function.
        - It then calculates the predicted values of the comb heights using NumPy's poly1d function.
        - Finally, it calculates the R^2 score between the predicted comb heights and the actual comb heights using the r2_score function
          from the scikit-learn library, and returns the inverse of that score.
        """
        # fit polynomial regression model
        fit = np.polyfit(ladder, comb, 3)

        # create predicted values using fitted model
        predict = np.poly1d(fit)

        # calculate R^2 score between predicted and actual values, and return its inverse
        return 1 - r2_score(comb, predict(ladder))

    @staticmethod
    def _max_fractional_deviation_score(ladder: np.ndarray, comb: np.ndarray):
        """
        Calculates the maximum fractional deviation score for a dataset.

        Parameters:
        - ladder (np.ndarray): array containing the ladder positions of a dataset
        - comb (np.ndarray): array containing the corresponding comb heights of a dataset

        Returns:
        - float: the maximum fractional deviation score

        Notes:
        - This method calculates the interval lengths between adjacent ladder positions and corresponding comb heights using NumPy's
          diff function and min-max scaling with a feature range equal to the range of ladder positions.
        - It then calculates the fractional deviation between the interval lengths and the scaled comb heights, and returns the
          maximum deviation.
        """
        # calculate interval lengths and scaled comb height intervals
        l_intervals = np.diff(ladder)
        c_intervals = np.diff(minmax_scale(comb, feature_range=(ladder[0], ladder[-1])))

        # calculate fractional deviation between interval lengths and scaled comb height intervals
        frac_deviation = np.abs(l_intervals - c_intervals) / l_intervals

        # find index of maximum deviation and return the corresponding value
        max_frac_deviation_idx = np.argmax(frac_deviation)
        return frac_deviation[max_frac_deviation_idx]

    def _max_first_derivative_score(self, combination: np.ndarray) -> float:
        """
        Calculates the maximum first derivative score for the given combination of features.

        Args:
            combination (np.ndarray): A 1-dimensional numpy array containing a combination of features.

        Returns:
            float: The maximum first derivative score for the given combination of features.
        """
        # Scale the combination to the range of reference sizes
        comb_scaled = minmax_scale(
            combination,
            feature_range=(self.fsa_file.ref_sizes[0], self.fsa_file.ref_sizes[-1]),
        )

        # Calculate the differences between consecutive values of the scaled combination and the reference sizes
        diff_intervals = np.diff(comb_scaled) - np.diff(self.fsa_file.ref_sizes)

        # Calculate the absolute gradient of the differences between consecutive values
        abs_diff_intervals_gradient = np.abs(np.gradient(diff_intervals))

        # Find the index of the maximum absolute gradient
        max_abs_diff_intervals_gradient_idx = np.argmax(abs_diff_intervals_gradient)

        # Return the maximum absolute gradient
        return abs_diff_intervals_gradient[max_abs_diff_intervals_gradient_idx]

    def _max_second_derivative_score(self, combination: np.ndarray) -> float:
        """
        Calculates the maximum absolute second derivative score of the given combination.

        Args:
            combination (np.ndarray): An array of feature values for a given combination.

        Returns:
            float: The maximum absolute second derivative score of the given combination.
        """
        # Scale the combination to match the range of reference sizes.
        comb_scaled = minmax_scale(
            combination,
            feature_range=(self.fsa_file.ref_sizes[0], self.fsa_file.ref_sizes[-1]),
        )

        # Calculate the difference between the scaled combination and reference sizes.
        diff_intervals = np.diff(comb_scaled) - np.diff(self.fsa_file.ref_sizes)

        # Calculate the absolute second derivative of the difference intervals.
        abs_second_derivative = np.abs(np.gradient(np.gradient(diff_intervals)))

        # Find the index of the maximum absolute second derivative score.
        max_second_derivative_idx = np.argmax(abs_second_derivative)

        # Return the maximum absolute second derivative score.
        return abs_second_derivative[max_second_derivative_idx]

    def _max_spline_second_derivative_score(self, combination: np.ndarray) -> float:
        """
        Calculates the maximum absolute value of the second derivative of a given combination of values using a UnivariateSpline.

        Args:
        - combination (np.ndarray): an array of values representing a combination

        Returns:
        - max_value (float): the maximum absolute value of the second derivative of the given combination
        """
        # create a UnivariateSpline object using the reference sizes and the given combination
        spl = UnivariateSpline(self.fsa_file.ref_sizes, combination, s=0)

        # calculate the second derivative of the spline
        der2 = spl.derivative(n=2)

        # evaluate the second derivative at each reference size and return the maximum absolute value
        max_value = max(abs(der2(self.fsa_file.ref_sizes)))

        return max_value
def baseline_arPLS(y, ratio=0.99, lam=100, niter=1000, full_output=False):
    L = len(y)

    diag = np.ones(L - 2)
    D = sparse.spdiags([diag, -2 * diag, diag], [0, -1, -2], L, L - 2)

    H = lam * D.dot(D.T)  # The transposes are flipped w.r.t. the algorithm on pg. 252

    w = np.ones(L)
    W = sparse.spdiags(w, 0, L, L)

    crit = 1
    count = 0

    while crit > ratio:
        z = linalg.spsolve(W + H, W * y)
        d = y - z
        dn = d[d < 0]

        m = np.mean(dn)
        s = np.std(dn)

        w_new = 1 / (1 + np.exp(2 * (d - (2 * s - m)) / s))

        crit = norm(w_new - w) / norm(w)

        w = w_new
        W.setdiag(w)  # Do not create a new matrix, just update diagonal values

        count += 1

        if count > niter:
            print("Maximum number of iterations exceeded")
            break

    if full_output:
        info = {"num_iter": count, "stop_criterion": crit}
        return z, d, info
    else:
        return z
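A small usage sketch, assuming trace is any 1-D intensity array (e.g. one channel of an .fsa file):

import numpy as np

raw = np.asarray(trace, dtype=float)
baseline = baseline_arPLS(raw)  # the estimated baseline z
corrected = raw - baseline      # baseline-subtracted signal

# full_output also returns the residual d = y - z and convergence info
z, d, info = baseline_arPLS(raw, full_output=True)
print(info["num_iter"], info["stop_criterion"])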
class FsaFile:
    def __init__(
        self,
        file: str,
        ladder: str,
        normalize: bool = False,
        trace_channel: str = "DATA1",
        size_standard_channel: str = None,
        min_interpeak_distance: int = None,
        min_height: int = None,
        max_ladder_trace_distance: int = None,
    ) -> None:
        """
        Constructs an FsaFile object with the given parameters.

        Args:
            file (str): The path to the sequencing file.
            ladder (str): The name of the ladder used for sequencing.
            normalize (bool): Whether to normalize the data using the arPLS algorithm.
            trace_channel (str): The channel to extract the trace data from.
            size_standard_channel (str): The channel to extract the size standard data from.
            min_interpeak_distance (int): The minimum distance between peaks in the ladder.
            min_height (int): The minimum height of peaks in the ladder.
            max_ladder_trace_distance (int): The maximum distance between the ladder and the trace.

        Returns:
            None
        """
        peak_count_padding = 3
        self.file = Path(file)
        self.file_name = self.file.parts[-1]

        # Extract data from the sequencing file
        if ladder not in LADDERS.keys():
            raise LadderNotFoundError(f"'{ladder}' is not a valid ladder")
        self.ladder = ladder
        self.fsa = SeqIO.read(file, "abi").annotations["abif_raw"]
        self.trace_channel = trace_channel
        self.normalize = normalize

        # Extract data from the ladder reference
        self.ref_sizes = LADDERS[ladder]["sizes"]
        self.ref_count = self.ref_sizes.size

        # Use default values if nothing is given
        self.size_standard_channel = size_standard_channel or LADDERS[ladder]["channel"]
        self.min_interpeak_distance = (
            min_interpeak_distance or LADDERS[ladder]["distance"]
        )
        self.min_height = min_height or LADDERS[ladder]["height"]
        self.max_ladder_trace_distance = (
            max_ladder_trace_distance or LADDERS[ladder]["max_ladder_trace_distance"]
        )
        self.max_peak_count = self.ref_count + peak_count_padding

        # Normalize data if requested
        if normalize:
            self.size_standard = np.array(
                baseline_arPLS(self.fsa[self.size_standard_channel])
            )
            self.trace = np.array(baseline_arPLS(self.fsa[trace_channel]))
        else:
            self.size_standard = np.array(self.fsa[self.size_standard_channel])
            self.trace = np.array(self.fsa[trace_channel])

    def __str__(self):
        """
        Returns a string representation of the FsaFile object.

        Returns:
            str: A string representation of the object.
        """
        return f"""
        FsaFile-object with following parameters:

        File: {self.file}
        Filename: {self.file_name}
        Size standard channel: {self.size_standard_channel}
        Ladder name: {self.ladder}
        Number of ladder steps: {self.ref_count}
        Minimum interpeak distance: {self.min_interpeak_distance}
        Minimum height: {self.min_height}
        Minimum Ladder trace distance: {self.max_ladder_trace_distance}
        Maximum peak count: {self.max_peak_count}
        Normalized data: {self.normalize}
        Trace Channel: {self.trace_channel}
        Ladder Sizes: {self.ref_sizes}
        """
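Constructing an FsaFile needs only a path and a ladder name; the remaining parameters fall back to the LADDERS defaults. The file name below is a placeholder:

fsa = FsaFile(
    "sample.fsa",
    "LIZ",
    normalize=True,         # baseline-correct both channels with baseline_arPLS
    trace_channel="DATA9",
)
print(fsa)  # summary of channels, ladder and thresholds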
class PeakAreaDeMultiplex:
    def __init__(
        self,
        peaks: PeakFinder,
        cutoff: float = None,
    ) -> None:
        self.peaks = peaks
        self.cutoff = cutoff or None
        self.peaks_dataframe = self.peaks.peaks_dataframe
        self.peak_information = self.peaks.peak_information
        self.file_name = self.peaks.file_name
        self.peaks_index = self.peaks.peaks_index
        self.found_peaks = self.peaks.found_peaks
        self.area_plots = []

        self.find_peak_widths()
        # divide the peaks based on the assay they belong to
        self.divided_assays = self.divide_peak_assays()
        # how many assays does this sample contain?
        self.number_of_assays = len(self.divided_assays)
        # divide all peaks in each assay into separate dataframes
        self.divided_peaks = [self.divide_peaks(x) for x in self.divided_assays]
        # logging
        logging.info(
            f"""
            Number of assays found: {self.number_of_assays}
            Number of peaks found: {self.peak_information.shape[0]}
            """
        )

    def __iter__(self):
        return PeakAreaDeMultiplexIterator(self.number_of_assays)

    def find_peak_widths(self, rel_height: float = 0.95):
        widths = peak_widths(
            self.peaks_dataframe.peaks,
            self.peaks_index,
            rel_height=rel_height,
        )

        df = pd.DataFrame(widths).T
        df.columns = ["x", "peak_height", "peak_start", "peak_end"]

        self.peak_widths = (
            df.assign(peak_start=lambda x: np.floor(x.peak_start).astype(int))
            .assign(peak_end=lambda x: np.ceil(x.peak_end).astype(int))
            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
            .merge(self.peak_information, on="peak_name")
        )

    def divide_peak_assays(self) -> list[pd.DataFrame]:
        """
        Divide the peaks belonging to different assays based on their assay number
        """
        df = self.peak_widths
        return [df.loc[df.assay == x] for x in df.assay.unique()]

    def divide_peaks(self, assay: pd.DataFrame, padding: int = 4) -> list[pd.DataFrame]:
        # add some padding to the left and right to be sure to include everything in the peak
        return [
            self.peaks_dataframe.iloc[x.peak_start - padding : x.peak_end + padding]
            for x in assay.itertuples()
        ]

    def fit_lmfit_model(self, peak_finding_model: str, assay_number: int):
        if assay_number >= self.number_of_assays:
            raise IndexError(
                f"""
                The sample only contains {self.number_of_assays} assays.
                Use a number inside of the range.
                Indexing starts at 0.
                """
            )

        if peak_finding_model == "gauss":
            model = GaussianModel()
        elif peak_finding_model == "voigt":
            model = VoigtModel()
        elif peak_finding_model == "lorentzian":
            model = LorentzianModel()
        else:
            raise NotImplementedError(
                f"""
                {peak_finding_model} is not implemented!
                Options: [gauss, voigt, lorentzian]
                """
            )

        fit_df = []
        fit_params = []
        fit_report = []
        for df in self.divided_peaks[assay_number]:
            df = df.copy()
            y = df.peaks.to_numpy()
            # fit against time instead of basepairs
            x = df.time.to_numpy()

            params = model.guess(y, x)
            out = model.fit(y, params, x=x)

            fit_df.append(df.assign(fitted=out.best_fit, model=peak_finding_model))
            fit_params.append(out.values)
            fit_report.append(out.fit_report())

        # Update the instances of the model fit
        self.fit_df = fit_df
        self.fit_params = fit_params
        self.fit_report = fit_report

    # TODO
    # Fix so that the cutoff is a range or something else.
    def calculate_quotient(
        self,
    ) -> None:
        """
        Calculate the quotient between the fitted peak areas and store it in self.quotient.
        """
        areas = np.array([x["amplitude"] for x in self.fit_params])

        right_by_left = (
            self.cutoff is None
            or pd.concat(self.fit_df).basepairs.mean() >= self.cutoff
        )
        # if there is only 1 peak, set the quotient to 0
        if len(areas) == 1:
            self.quotient = 0
            return

        # if there are only 2 peaks, use their ratio
        if len(areas) == 2:
            # left peak divided by right peak
            if not right_by_left:
                self.quotient = areas[0] / areas[1]
                return
            # right peak divided by left peak
            self.quotient = areas[1] / areas[0]
            return

        # otherwise: the last peak divided by the mean of the peaks to the left of it
        self.quotient = areas[-1] / areas[:-1].mean()

    def peak_position_area_dataframe(
        self, assay_number: int, name: str
    ) -> pd.DataFrame:
        """
        Returns a DataFrame of each peak and its properties
        """
        dataframes = []
        for i, _ in enumerate(self.fit_df):
            report = self.fit_report[i]
            r_value = float(re.findall(r"R-squared *= (0\.\d{3})", report)[0])
            df = (
                self.fit_df[i]
                .loc[lambda x: x.peaks == x.peaks.max()]
                .assign(area=self.fit_params[i]["amplitude"])
                .assign(r_value=r_value)
                .assign(peak_name=f"Peak {i + 1}")
                .drop(columns="time")
                .reset_index(drop=True)
                .rename(
                    columns={
                        "peaks": "peak_height",
                        "fitted": "fitted_peak_height",
                    }
                )
                .drop_duplicates("peak_name")
                .assign(file_name=self.file_name)
                .assign(assay_name=name)
            )
            dataframes.append(df)

        self.assay_peak_area_df = pd.concat(dataframes).assign(
            quotient=self.quotient,
            peak_number=lambda x: x.shape[0],
            assay_number=assay_number + 1,
        )

    def plot_areas(self, peak_finding_model: str):
        fig_areas, axs = plt.subplots(
            1, len(self.fit_df), sharey=True, figsize=(20, 10)
        )
        # if there is only one peak
        if len(self.fit_df) == 1:
            axs.plot(self.fit_df[0].basepairs, self.fit_df[0].peaks, "o")
            axs.plot(self.fit_df[0].basepairs, self.fit_df[0].fitted)
            axs.set_title(f"Peak 1 area: {self.fit_params[0]['amplitude']: .1f}")
            axs.grid()
        # if more than one peak
        else:
            for i, ax in enumerate(axs):
                ax.plot(
                    self.fit_df[i].basepairs,
                    self.fit_df[i].peaks,
                    "o",
                )
                ax.plot(self.fit_df[i].basepairs, self.fit_df[i].fitted)
                ax.set_title(
                    f"Peak {i + 1} area: {self.fit_params[i]['amplitude']: .1f}"
                )
                ax.grid()

        fig_areas.suptitle(f"Quotient: {self.quotient: .2f}")
        fig_areas.legend(["Raw data", "Model"])
        fig_areas.supxlabel("basepairs")
        fig_areas.supylabel("intensity")
        plt.close()

        return fig_areas

    def fit_assay_peaks(
        self,
        peak_finding_model: str,
        assay_number: int,
        name: str,
    ) -> pd.DataFrame:
        """
        Runs fit_lmfit_model, calculate_quotient and peak_position_area_dataframe
        """
        self.fit_lmfit_model(peak_finding_model, assay_number)
        self.calculate_quotient()
        self.peak_position_area_dataframe(assay_number, name)
        # area plots
        area_plot = self.plot_areas(peak_finding_model)
        self.area_plots.append(area_plot)
        return self.assay_peak_area_df

    def assays_dataframe(self, peak_finding_model: str = "gauss"):
        dfs = []
        for i in self:
            if self.peaks.custom_peaks is not None:
                name = self.peaks.custom_peaks.name.unique()[i]
            else:
                name = ""
            dfs.append(self.fit_assay_peaks(peak_finding_model, i, name))
        df = pd.concat(dfs, ignore_index=True)
        # initialize attribute self.final_df
        self.final_df = df
        return df
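A hedged sketch of the demultiplexing workflow; it assumes peaks is a fraggler.PeakFinder built for the sample (its construction is documented in the peak_finder module):

area = PeakAreaDeMultiplex(peaks, cutoff=175)

# Fit an lmfit model per assay and collect one row per peak
df = area.assays_dataframe(peak_finding_model="gauss")
print(df[["peak_name", "area", "quotient"]])

figs = area.area_plots  # one matplotlib figure per fitted assay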
class PlotPeakArea:
    def __init__(self, peak_area: PeakAreaDeMultiplex):
        self.peak_area = peak_area

    def plot_areas(self, peak_finding_model: str, assay_number: int):
        # TODO: fit_assay_peaks gets called again here...
        self.peak_area.fit_assay_peaks(peak_finding_model, assay_number, name="")

        fig_areas, axs = plt.subplots(
            1, len(self.peak_area.fit_df), sharey=True, figsize=(20, 10)
        )

        # if there is only one peak
        if len(self.peak_area.fit_df) == 1:
            axs.plot(
                self.peak_area.fit_df[0].basepairs, self.peak_area.fit_df[0].peaks, "o"
            )
            axs.plot(
                self.peak_area.fit_df[0].basepairs, self.peak_area.fit_df[0].fitted
            )
            axs.set_title(
                f"Peak 1 area: {self.peak_area.fit_params[0]['amplitude']: .1f}"
            )
            axs.grid()
        # if more than one peak
        else:
            for i, ax in enumerate(axs):
                ax.plot(
                    self.peak_area.fit_df[i].basepairs,
                    self.peak_area.fit_df[i].peaks,
                    "o",
                )
                ax.plot(
                    self.peak_area.fit_df[i].basepairs, self.peak_area.fit_df[i].fitted
                )
                ax.set_title(
                    f"Peak {i + 1} area: {self.peak_area.fit_params[i]['amplitude']: .1f}"
                )
                ax.grid()

        fig_areas.suptitle(f"Quotient: {self.peak_area.quotient: .2f}")
        fig_areas.legend(["Raw data", "Model"])
        fig_areas.supxlabel("basepairs")
        fig_areas.supylabel("intensity")
        plt.close()

        return fig_areas
class PlotLadder:
    def __init__(self, model: FitLadderModel) -> None:
        self.model = model

    @property
    def plot_ladder_peaks(self) -> matplotlib.figure.Figure:
        trace = self.model.fsa_file.size_standard
        best_combination = self.model.best_combination
        ladder_name = self.model.fsa_file.ladder
        ladder_size = self.model.fsa_file.ref_sizes

        fig_ladder_peaks = plt.figure(figsize=(20, 10))
        plt.plot(trace)
        plt.plot(best_combination, trace[best_combination], "o")
        plt.xlabel("Time")
        plt.ylabel("Intensity")
        plt.legend(["Trace", "Peak (bp)"])
        plt.title(ladder_name)
        plt.grid()

        for peak, ladder in zip(best_combination, ladder_size):
            plt.text(peak, trace[peak], ladder)

        plt.close()
        return fig_ladder_peaks

    @property
    def plot_model_fit(self):
        ladder_size = self.model.fsa_file.ref_sizes
        best_combination = self.model.best_combination

        predicted = self.model.model.predict(best_combination)
        ladder_name = self.model.fsa_file.ladder

        mse = self.model.mse
        r2 = self.model.r2

        fig_model_fit = plt.figure(figsize=(20, 10))
        plt.plot(ladder_size, best_combination, "o")
        plt.plot(predicted, best_combination, "x")
        plt.xticks(np.arange(0, np.max(ladder_size), 50))
        plt.xlabel("bp")
        plt.yticks(np.arange(0, np.max(best_combination), 500))
        plt.suptitle(ladder_name)
        plt.title(f"{mse=}, {r2=}")
        # plt.title(f"{self.model.model[0]=}")
        plt.legend(["True value", "Predicted value"])
        plt.grid()

        plt.close()
        return fig_model_fit
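Both plots are exposed as properties that return matplotlib figures, so a minimal sketch (assuming `model` is a fitted `FitLadderModel`) is:

```python
# Hypothetical usage: `model` is a FitLadderModel.
ladder_plots = PlotLadder(model)
ladder_plots.plot_ladder_peaks.savefig("ladder_peaks.png")   # peaks found on the size standard
ladder_plots.plot_model_fit.savefig("ladder_model_fit.png")  # bp-vs-time calibration fit
```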
```python
import matplotlib.pyplot as plt
import numpy as np


class PlotPeaks:
    def __init__(self, peaks: PeakFinder):
        self.peaks = peaks

    @property
    def plot_peaks(self):
        fig_peaks = plt.figure(figsize=(20, 10))

        # restrict the x-range to 10 bp around the outermost called peaks
        df = self.peaks.peaks_dataframe.loc[
            lambda x: x.basepairs > self.peaks.peak_information.basepairs.min() - 10
        ].loc[lambda x: x.basepairs < self.peaks.peak_information.basepairs.max() + 10]

        plt.plot(df.basepairs, df.peaks)
        plt.plot(
            self.peaks.peak_information.basepairs,
            self.peaks.peak_information.peaks,
            "o",
        )
        for x, y in zip(
            self.peaks.peak_information.basepairs,
            self.peaks.peak_information.peaks,
        ):
            plt.text(x, y, f"{round(x, 1)} bp")

        channel = self.peaks.model.fsa_file.trace_channel
        plt.title(f"Channel: {channel}")
        plt.xticks(np.arange(df.basepairs.min(), df.basepairs.max(), 10), rotation=90)
        plt.ylabel("intensity")
        plt.xlabel("basepairs")
        plt.grid()
        plt.close()

        return fig_peaks
```
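As with `PlotLadder`, the figure is a property; a one-line sketch (assuming `finder` is a `PeakFinder`, defined further down):

```python
# Hypothetical usage: `finder` is a PeakFinder instance.
PlotPeaks(finder).plot_peaks.savefig("peaks.png")
```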
```python
from pathlib import Path

import pandas as pd

import fraggler


def generate_peak_table(
    in_files: str | list,
    ladder: str,
    peak_model: str,
    # keyword defaults below mirror area_report's defaults further down
    min_height: int = 30,
    min_ratio: float = 0.2,
    trace_channel: str = "DATA1",
    cutoff: int = 175,
    custom_peaks: str | None = None,
) -> pd.DataFrame:
    if isinstance(in_files, str):
        in_files = [x for x in Path(in_files).iterdir() if x.suffix == ".fsa"]

    peak_dfs = []
    for x in in_files:
        try:
            fsa = fraggler.FsaFile(
                x, ladder, min_height=min_height, trace_channel=trace_channel
            )
            pla = fraggler.PeakLadderAssigner(fsa)
            model = fraggler.FitLadderModel(pla)
            pam = fraggler.PeakAreaDeMultiplex(
                model,
                cutoff=cutoff,
                min_ratio=min_ratio,
                custom_peaks=custom_peaks,
            )
            peak_dfs.append(pam.assays_dataframe(peak_model))
        except Exception:
            # report the input path: `fsa` may never have been bound if
            # FsaFile() itself raised
            print(f"FAILED: {x}")

    return pd.concat(peak_dfs).reset_index(drop=True)
```
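A short sketch of calling it (the folder path is a placeholder):

```python
# Hypothetical call: "fsa_data/" is a folder containing .fsa files.
df = generate_peak_table("fsa_data/", ladder="LIZ", peak_model="gauss")
df.to_csv("peak_table.csv", index=False)
```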
```python
import logging
from datetime import datetime
from pathlib import Path


def setup_logging(outdir: str) -> None:
    """
    Set up logging and save the log file to the same directory as the result files.
    """
    NOW = datetime.now().strftime("%Y-%m-%d_%H:%M")

    if not (outdir := Path(outdir)).exists():
        outdir.mkdir(parents=True)

    LOG_FILE = f"{outdir}/fraggler_{NOW}.log"

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] \n%(message)s",
        datefmt="%Y-%m-%d %I:%M:%S",
        handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
    )
```
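For example:

```python
# Messages go both to the console and to <outdir>/fraggler_<timestamp>.log.
setup_logging("results")
logging.info("fraggler run started")
```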
```python
import pandas as pd


def make_fsa_data_df(fsa, ladder: bool = False) -> pd.DataFrame:
    # `fsa` is an FsaFile instance; one long-format row set per raw channel
    data = ["DATA1", "DATA2", "DATA3", "DATA4"]
    if ladder:
        data.append("DATA205")
    dfs = []
    for d in data:
        df = (
            pd.DataFrame()
            .assign(data=fsa.fsa[d])
            .assign(channel=d)
            .assign(time=lambda x: range(x.shape[0]))
        )
        dfs.append(df)
    return pd.concat(dfs)
```
```python
import altair as alt


def plot_fsa_data(fsa: "FsaFile", ladder: bool = False) -> list:
    alt.data_transformers.disable_max_rows()
    df = make_fsa_data_df(fsa, ladder)

    # one chart per channel, plus a combined chart over all channels
    plots = []
    for channel in df.channel.unique():
        plot = (
            alt.Chart(df.loc[lambda x: x.channel == channel])
            .mark_line()
            .encode(
                alt.X("time:Q", title="Time"),
                alt.Y("data:Q", title="Intensity"),
                alt.Color("channel:N"),
            )
            .properties(
                width=800,
                height=500,
            )
        )
        plots.append((plot, channel))

    all_data = (
        alt.Chart(df)
        .mark_line()
        .encode(
            alt.X("time:Q", title="Time"),
            alt.Y("data:Q", title="Intensity"),
            alt.Color("channel:N"),
        )
        .properties(
            width=800,
            height=500,
        )
    )
    plots.append((all_data, "All channels"))
    return plots
```
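A sketch of rendering the returned Altair charts (file names are placeholders):

```python
# Hypothetical usage: `fsa` is an FsaFile; each chart is saved as standalone HTML.
for chart, channel in plot_fsa_data(fsa):
    chart.save(f"channel_{channel}.html")
```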
```python
import logging

import numpy as np
import pandas as pd
from scipy.signal import find_peaks


class PeakFinder:
    def __init__(
        self,
        model: FitLadderModel,
        min_ratio: float = 0.15,
        search_peaks_start: int = 110,
        peak_height: int = 350,
        distance_between_assays: int = 15,
        custom_peaks: str | pd.DataFrame | None = None,
    ) -> None:
        self.model = model
        # "adjusted_baisepair_df" is the attribute name defined on FitLadderModel
        self.raw_data = self.model.adjusted_baisepair_df
        self.file_name = self.model.fsa_file.file_name
        self.search_peaks_start = search_peaks_start
        self.peak_height = peak_height
        self.min_ratio = min_ratio
        self.distance_between_assays = distance_between_assays
        self.custom_peaks = (
            custom_peaks.fillna(0)
            if isinstance(custom_peaks, pd.DataFrame)
            else pd.read_csv(custom_peaks).fillna(0)
            if isinstance(custom_peaks, str)
            else None
        )

        # Validation of custom_peaks
        self.run_validation()
        # find peaks, custom or agnostic
        self.find_peaks()
        # continue whether peaks are found or not
        self.found_peaks()

    def run_validation(self):
        # is_overlapping, has_columns and the error classes are
        # fraggler-internal helpers defined elsewhere in the package
        if self.custom_peaks is None:
            return

        if is_overlapping(self.custom_peaks):
            raise OverlappingIntervalError("Overlapping intervals!")

        if not has_columns(self.custom_peaks):
            raise WrongColumnsError("Wrong columns!")

    def find_peaks(self):
        if self.custom_peaks is not None:
            self.find_peaks_customized(
                peak_height=self.peak_height,
            )
        else:
            self.find_peaks_agnostic(
                peak_height=self.peak_height,
                min_ratio=self.min_ratio,
                distance_between_assays=self.distance_between_assays,
            )

    def found_peaks(self):
        # NB: this rebinds `found_peaks` on the instance, shadowing the
        # method with a boolean after the first call
        # if no peaks could be found
        if self.peak_information.shape[0] == 0:
            self.found_peaks = False
            logging.warning("No peaks could be found. Please look at raw data.")
        # if peaks are found
        else:
            self.found_peaks = True

    def find_peaks_agnostic(
        self,
        peak_height: int,
        min_ratio: float,
        distance_between_assays: int,
    ) -> None:
        peaks_dataframe = self.raw_data.loc[
            lambda x: x.basepairs > self.search_peaks_start
        ]
        peaks_index, _ = find_peaks(peaks_dataframe.peaks, height=peak_height)

        peak_information = (
            peaks_dataframe.iloc[peaks_index]
            .assign(peaks_index=peaks_index)
            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
            # separate the peaks into different assay groups depending on
            # the distance between the peaks
            .assign(difference=lambda x: x.basepairs.diff())
            .fillna(100)
            .assign(
                assay=lambda x: np.select(
                    [x.difference > distance_between_assays],
                    [x.peak_name * 10],
                    default=pd.NA,
                )
            )
            .ffill()
            .assign(max_peak=lambda x: x.groupby("assay")["peaks"].transform("max"))
            .assign(ratio=lambda x: x.peaks / x.max_peak)
            .loc[lambda x: x.ratio > min_ratio]
            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
        )

        # update peaks_index based on the above filtering
        peaks_index = peak_information.peaks_index.to_numpy()

        # update class attributes
        self.peaks_index = peaks_index
        self.peaks_dataframe = peaks_dataframe
        self.peak_information = peak_information

    def find_peaks_customized(
        self,
        peak_height: int,
    ) -> None:
        # Filter where to start searching for the peaks
        peaks_dataframe = self.raw_data.loc[
            lambda x: x.basepairs > self.search_peaks_start
        ]
        # Find the peaks
        peaks_index, _ = find_peaks(peaks_dataframe.peaks, height=peak_height)

        # Filter the df to get the right peaks
        peak_information = peaks_dataframe.iloc[peaks_index].assign(
            peaks_index=peaks_index
        )
        # Filter the above df based on the custom peaks from the user
        customized_peaks = []
        for assay in self.custom_peaks.itertuples():
            df = (
                peak_information.loc[lambda x: x.basepairs > assay.start]
                .loc[lambda x: x.basepairs < assay.stop]
                .assign(assay=assay.name)
            )

            # Rank the peaks by height and filter out the smallest ones
            if assay.amount != 0:
                # fillna(0) above turns a blank `which` into 0, so both the
                # empty string and 0 fall back to LARGEST
                if assay.which in ("LARGEST", "", 0):
                    df = (
                        df.assign(max_peak=lambda x: x.peaks.max())
                        .assign(ratio=lambda x: x.peaks / x.max_peak)
                        .loc[lambda x: x.ratio > assay.min_ratio]
                        .assign(rank_peak=lambda x: x.peaks.rank(ascending=False))
                        .loc[lambda x: x.rank_peak <= assay.amount]
                        .drop(columns=["rank_peak"])
                    )
                    if assay.peak_distance != 0:
                        df = (
                            df.assign(distance=lambda x: x.basepairs.diff())
                            .assign(distance=lambda x: x.distance.fillna(0))
                            .loc[lambda x: x.distance <= assay.peak_distance]
                            .drop(columns=["distance"])
                        )

                elif assay.which == "FIRST":
                    df = (
                        df.assign(max_peak=lambda x: x.peaks.max())
                        .assign(ratio=lambda x: x.peaks / x.max_peak)
                        .loc[lambda x: x.ratio > assay.min_ratio]
                        .sort_values("basepairs", ascending=True)
                        .head(assay.amount)
                    )
                    if assay.peak_distance != 0:
                        df = (
                            df.assign(distance=lambda x: x.basepairs.diff())
                            .assign(distance=lambda x: x.distance.fillna(0))
                            .loc[lambda x: x.distance <= assay.peak_distance]
                            .drop(columns=["distance"])
                        )
                else:
                    raise ValueError("column `which` must be `FIRST` or `LARGEST`")

            customized_peaks.append(df)

        peak_information = (
            pd.concat(customized_peaks)
            .reset_index()
            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
        )

        # update peaks_index based on the above filtering
        peaks_index = peak_information.peaks_index.to_numpy()

        # update class attributes
        self.peaks_index = peaks_index
        self.peaks_dataframe = peaks_dataframe
        self.peak_information = peak_information
```
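A minimal sketch of constructing a `PeakFinder` with custom peaks (hypothetical: `model` is assumed to be a fitted `FitLadderModel`, and the assay values are placeholders):

```python
import pandas as pd

# Hypothetical custom-peaks definition; blank cells are fillna(0)-ed internally.
custom = pd.DataFrame(
    {
        "name": ["assay1"],
        "start": [140],
        "stop": [160],
        "amount": [2],
        "min_ratio": [0.2],
        "which": ["FIRST"],
        "peak_distance": [None],
    }
)
finder = PeakFinder(model, custom_peaks=custom)
print(finder.peak_information)  # one row per retained peak
```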
```python
@dataclass
class FragglerPeak:
    fsa: FsaFile
    ladder_assigner: PeakLadderAssigner
    model: FitLadderModel
    peaks: PeakFinder
```
```python
@dataclass
class FragglerArea:
    fsa: FsaFile
    ladder_assigner: PeakLadderAssigner
    model: FitLadderModel
    peaks: PeakFinder
    areas: PeakAreaDeMultiplex
```
```python
import logging
from pathlib import Path


def make_fraggler_peak(
    file: Path,
    ladder: str,
    min_height: int,
    min_ratio: float,
    trace_channel: str,
    peak_height: int,
    custom_peaks: str | None,
    size_standard_channel: str | None,
    distance_between_assays: int,
) -> FragglerPeak | str:
    file = Path(file)
    try:
        fsa = FsaFile(
            file,
            ladder,
            min_height=min_height,
            trace_channel=trace_channel,
            size_standard_channel=size_standard_channel,
        )
        ladder_assigner = PeakLadderAssigner(fsa)
        model = FitLadderModel(ladder_assigner)
        peaks = PeakFinder(
            model,
            min_ratio=min_ratio,
            peak_height=peak_height,
            custom_peaks=custom_peaks,
            distance_between_assays=distance_between_assays,
        )

        return FragglerPeak(
            fsa=fsa,
            ladder_assigner=ladder_assigner,
            model=model,
            peaks=peaks,
        )

    except Exception as e:
        logging.error(
            f"""The following file did not work: {file.stem}
            Reason: {e}
            """
        )
        return file.stem
```
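A sketch of calling it on a single file (all values are illustrative; on failure the file stem comes back as a plain string):

```python
# Hypothetical call; "sample.fsa" is a placeholder path.
obj = make_fraggler_peak(
    file=Path("sample.fsa"),
    ladder="LIZ",
    min_height=30,
    min_ratio=0.2,
    trace_channel="DATA1",
    peak_height=500,
    custom_peaks=None,
    size_standard_channel=None,
    distance_between_assays=15,
)
if isinstance(obj, str):
    print(f"failed: {obj}")
```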
```python
def make_fraggler_area(
    file: Path,
    ladder: str,
    min_height: int,
    min_ratio: float,
    trace_channel: str,
    peak_height: int,
    custom_peaks: str | None,
    size_standard_channel: str | None,
    distance_between_assays: int,
    cutoff: int,
) -> FragglerArea | str:
    file = Path(file)
    try:
        fsa = FsaFile(
            file,
            ladder,
            min_height=min_height,
            trace_channel=trace_channel,
            size_standard_channel=size_standard_channel,
        )
        ladder_assigner = PeakLadderAssigner(fsa)
        model = FitLadderModel(ladder_assigner)
        peaks = PeakFinder(
            model,
            min_ratio=min_ratio,
            peak_height=peak_height,
            custom_peaks=custom_peaks,
            distance_between_assays=distance_between_assays,
        )
        areas = PeakAreaDeMultiplex(
            peaks,
            cutoff,
        )

        return FragglerArea(
            fsa=fsa,
            ladder_assigner=ladder_assigner,
            model=model,
            peaks=peaks,
            areas=areas,
        )

    except Exception as e:
        logging.error(
            f"""The following file did not work: {file.stem}
            Reason: {e}
            """
        )
        return file.stem
```
```python
def generate_peak_report(fraggler: FragglerPeak) -> pn.layout.base.Column:
    ### ----- Raw Data ----- ###
    channel_header = header(
        text="## Plot of channels",
        bg_color="#04c273",
        height=80,
        textalign="left",
    )
    # PLOT
    channel_tab = pn.Tabs()
    for plot, name in plot_fsa_data(fraggler.fsa):
        pane = pn.pane.Vega(plot.interactive(), sizing_mode="stretch_both", name=name)
        channel_tab.append(pane)

    channels_section = pn.Column(channel_header, channel_tab)

    ### ----- Peaks ----- ###
    peaks_header = header(
        text="## Plot of Peaks",
        bg_color="#04c273",
        height=80,
        textalign="left",
    )

    # PLOT
    peaks_plot = PlotPeaks(fraggler.peaks).plot_peaks
    peaks_pane = pn.pane.Matplotlib(peaks_plot, name="Peaks")

    # Section
    peaks_tab = pn.Tabs(
        peaks_pane,
    )
    peaks_section = pn.Column(peaks_header, peaks_tab)

    ### ----- Ladder Information ----- ###
    ladder_header = header(
        text="## Information about the ladder",
        bg_color="#04c273",
        height=80,
        textalign="left",
    )
    # Ladder peak plot
    ladder_plot = PlotLadder(fraggler.model)
    ladder_peak_plot = pn.pane.Matplotlib(
        ladder_plot.plot_ladder_peaks,
        name="Ladder Peak Plot",
    )
    # Ladder Correlation
    ladder_correlation_plot = pn.pane.Matplotlib(
        ladder_plot.plot_model_fit,
        name="Ladder Correlation Plot",
    )

    # Section
    ladder_tab = pn.Tabs(
        ladder_peak_plot,
        ladder_correlation_plot,
    )
    ladder_section = pn.Column(ladder_header, ladder_tab)

    ### ----- Peaks dataframe ----- ###
    dataframe_header = header(
        text="## Peaks Table", bg_color="#04c273", height=80, textalign="left"
    )
    # Create dataframe
    df = fraggler.peaks.peak_information.assign(file_name=fraggler.fsa.file_name)[
        ["file_name", "basepairs", "peaks", "assay_name"]
    ].rename(columns={"peaks": "peak_height"})
    # DataFrame Tabulator
    peaks_df_tab = pn.widgets.Tabulator(
        df,
        layout="fit_columns",
        pagination="local",
        page_size=15,
        show_index=False,
        name="Peaks Table",
    )

    # Section
    dataframe_tab = pn.Tabs(peaks_df_tab)
    dataframe_section = pn.Column(dataframe_header, dataframe_tab)

    ### CREATE REPORT ###

    file_name = fraggler.fsa.file_name
    date = fraggler.fsa.fsa["RUND1"]
    head = make_header(file_name, date)

    all_tabs = pn.Tabs(
        ("Channels", channels_section),
        ("Peaks", peaks_section),
        ("Ladder", ladder_section),
        ("Peaks Table", dataframe_section),
        tabs_location="left",
    )
    report = pn.Column(
        head,
        pn.layout.Divider(),
        all_tabs,
    )

    return report
```
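Continuing the sketch above, a successful `FragglerPeak` can be rendered and written to disk:

```python
# Hypothetical: `obj` is the FragglerPeak from the sketch above.
report = generate_peak_report(obj)
report.save("sample_fraggler_peak.html")  # Panel writes a standalone HTML file
```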
```python
def generate_area_report(
    fraggler: FragglerArea,
    peak_model: str = "gauss",
) -> pn.layout.base.Column:
    ### ----- Raw Data ----- ###
    channel_header = header(
        text="## Plot of channels",
        bg_color="#04c273",
        height=80,
        textalign="left",
    )
    # PLOT
    channel_tab = pn.Tabs()
    for plot, name in plot_fsa_data(fraggler.fsa):
        pane = pn.pane.Vega(plot.interactive(), sizing_mode="stretch_both", name=name)
        channel_tab.append(pane)

    channels_section = pn.Column(channel_header, channel_tab)

    ### ----- Peaks ----- ###
    peaks_header = header(
        text="## Plot of Peaks",
        bg_color="#04c273",
        height=80,
        textalign="left",
    )

    # PLOT
    peaks_plot = PlotPeaks(fraggler.peaks).plot_peaks
    peaks_pane = pn.pane.Matplotlib(peaks_plot, name="Peaks")

    # Section
    peaks_tab = pn.Tabs(
        peaks_pane,
    )
    peaks_section = pn.Column(peaks_header, peaks_tab)

    ### ----- Ladder Information ----- ###
    ladder_header = header(
        text="## Information about the ladder",
        bg_color="#04c273",
        height=80,
        textalign="left",
    )
    # Ladder peak plot
    ladder_plot = PlotLadder(fraggler.model)
    ladder_peak_plot = pn.pane.Matplotlib(
        ladder_plot.plot_ladder_peaks,
        name="Ladder Peak Plot",
    )
    # Ladder Correlation
    ladder_correlation_plot = pn.pane.Matplotlib(
        ladder_plot.plot_model_fit,
        name="Ladder Correlation Plot",
    )

    # Section
    ladder_tab = pn.Tabs(
        ladder_peak_plot,
        ladder_correlation_plot,
    )
    ladder_section = pn.Column(ladder_header, ladder_tab)

    ### ----- Areas Information ----- ###
    areas_header = header(
        text="## Peak Areas", bg_color="#04c273", height=80, textalign="left"
    )
    areas_tab = pn.Tabs()
    for i, plot in enumerate(fraggler.areas.area_plots):
        name = f"Assay {i + 1}"
        plot_pane = pn.pane.Matplotlib(plot, name=name)
        areas_tab.append(plot_pane)

    # Section
    areas_section = pn.Column(areas_header, areas_tab)

    ### ----- Peaks DataFrame ----- ###
    dataframe_header = header(
        text="## Peaks Table", bg_color="#04c273", height=80, textalign="left"
    )

    if not hasattr(fraggler.areas, "final_df"):
        df = fraggler.areas.assays_dataframe(peak_model)
    else:
        df = fraggler.areas.final_df

    # DataFrame Tabulator
    peaks_df_tab = pn.widgets.Tabulator(
        df,
        layout="fit_columns",
        pagination="local",
        page_size=15,
        show_index=False,
        name="Peaks Table",
    )

    # Section
    dataframe_tab = pn.Tabs(peaks_df_tab)
    dataframe_section = pn.Column(dataframe_header, dataframe_tab)

    ### CREATE REPORT ###

    file_name = fraggler.fsa.file_name
    date = fraggler.fsa.fsa["RUND1"]
    head = make_header(file_name, date)

    all_tabs = pn.Tabs(
        ("Channels", channels_section),
        ("Peaks", peaks_section),
        ("Ladder", ladder_section),
        ("Areas", areas_section),
        ("Peak Table", dataframe_section),
        tabs_location="left",
    )
    report = pn.Column(
        head,
        pn.layout.Divider(),
        all_tabs,
    )

    return report
```
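An end-to-end sketch for one file (values are illustrative):

```python
# Hypothetical: build a FragglerArea and render its report.
obj = make_fraggler_area(
    file=Path("sample.fsa"),
    ladder="LIZ",
    min_height=30,
    min_ratio=0.2,
    trace_channel="DATA1",
    peak_height=500,
    custom_peaks=None,
    size_standard_channel=None,
    distance_between_assays=15,
    cutoff=175,
)
if not isinstance(obj, str):
    generate_area_report(obj, peak_model="gauss").save("sample_fraggler_area.html")
```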
```python
def generate_no_peaks_report(fsa: FsaFile):
    channel_header = header(
        text="## Plot of channels",
        bg_color="#04c273",
        height=80,
        textalign="left",
    )
    # PLOT
    channel_tab = pn.Tabs()
    for plot, name in plot_fsa_data(fsa):
        pane = pn.pane.Vega(plot.interactive(), sizing_mode="stretch_both", name=name)
        channel_tab.append(pane)
    channels_section = pn.Column(channel_header, channel_tab)

    ### CREATE REPORT ###
    file_name = fsa.file_name
    date = fsa.fsa["RUND1"]
    head = header(
        "# No peaks could be generated. Please look at the raw data.", height=100
    )

    all_tabs = pn.Tabs(
        ("Channels", channels_section),
        tabs_location="left",
    )
    report = pn.Column(
        head,
        pn.layout.Divider(),
        all_tabs,
    )

    return report
```
```python
def peak_report(
    in_path: str,
    out_folder: str,
    ladder: str,
    min_ratio: float = 0.2,
    min_height: int = 30,
    channel: str = "DATA1",
    peak_height: int = 500,
    size_standard_channel: str | None = None,
    distance_between_assays: int = 15,
    custom_peaks: str | None = None,
    out_format: str = "excel",
) -> None:
    if custom_peaks:
        peak_height = 0

    print(ASCII_ART)

    # Logging
    fraggler.setup_logging(out_folder)
    INFO = f"""
    Command:
    {' '.join(sys.argv)}

    Running fraggler with the following parameters:
        In path: {in_path}
        Out folder: {out_folder}
        Ladder: {ladder}
        Min ratio: {min_ratio}
        Min height: {min_height}
        Trace channel: {channel}
        Peak height: {peak_height}
        Custom peaks: {custom_peaks}
        Out format: {out_format}
        Size standard channel: {size_standard_channel}
        Distance between assays: {distance_between_assays}
    """
    logging.info(INFO)

    # Files
    files = fraggler.get_files(in_path)
    out_folder = Path(out_folder)

    # Generate a peak report for each file
    failed_files = []
    no_peaks = []
    peak_dfs = []
    for file in files:
        logging.info(f"Processing file: {file}")
        fraggler_object = fraggler.make_fraggler_peak(
            file=file,
            ladder=ladder,
            min_height=min_height,
            min_ratio=min_ratio,
            trace_channel=channel,
            distance_between_assays=distance_between_assays,
            size_standard_channel=size_standard_channel,
            peak_height=peak_height,
            custom_peaks=custom_peaks,
        )

        # If make_fraggler_peak failed
        if isinstance(fraggler_object, str):
            failed_files.append(fraggler_object)
            fsa = fraggler.FsaFile(file, ladder)
            report = fraggler.generate_no_peaks_report(fsa)
            out_name = out_folder / f"{file.stem}_fraggler_failed.html"
            report.save(out_name)
            continue

        if not fraggler_object.peaks.found_peaks:
            no_peaks.append(file.stem)
            fsa = fraggler.FsaFile(file, ladder)
            report = fraggler.generate_no_peaks_report(fsa)
            out_name = out_folder / f"{file.stem}_failed.html"
            report.save(out_name)
            continue

        # add peaks to dataframe
        df = fraggler_object.peaks.peak_information.assign(
            file_name=fraggler_object.fsa.file_name
        )[["file_name", "basepairs", "peaks"]].rename(columns={"peaks": "peak_height"})
        peak_dfs.append(df)

        # generate report
        report = fraggler.generate_peak_report(fraggler_object)
        out_name = out_folder / f"{file.stem}_fraggler_peak.html"
        report.save(out_name)

    # Save dataframe
    if peak_dfs:
        save_df_format(peak_dfs, out_folder, in_path, out_format)

    # Log failed files
    logging.info(f"Fraggler done for files in {in_path}!")

    if failed_files:
        failed_files = "\n".join(failed_files)
        logging.warning(f"{f.YELLOW}The following files failed: {failed_files}")
    if no_peaks:
        no_peaks = "\n".join(no_peaks)
        logging.warning(f"{f.YELLOW}The following files had no peaks: {no_peaks}")
```
```python
def area_report(
    in_path: str,
    out_folder: str,
    ladder: str,
    peak_model: str = "gauss",
    min_ratio: float = 0.2,
    min_height: int = 30,
    cutoff: int = 175,
    channel: str = "DATA1",
    peak_height: int = 500,
    size_standard_channel: str | None = None,
    distance_between_assays: int = 15,
    custom_peaks: str | None = None,
    out_format: str = "excel",
) -> None:
    print(ASCII_ART)
    if custom_peaks:
        peak_height = 0

    # Logging
    fraggler.setup_logging(out_folder)
    INFO = f"""
    Command:
    {' '.join(sys.argv)}

    Running fraggler with the following parameters:
        In path: {in_path}
        Out folder: {out_folder}
        Ladder: {ladder}
        Peak model: {peak_model}
        Min ratio: {min_ratio}
        Min height: {min_height}
        Cutoff: {cutoff}
        Trace channel: {channel}
        Peak height: {peak_height}
        Custom peaks: {custom_peaks}
        Out format: {out_format}
        Size standard channel: {size_standard_channel}
        Distance between assays: {distance_between_assays}
    """
    logging.info(INFO)

    # Files
    files = fraggler.get_files(in_path)
    out_folder = Path(out_folder)

    # Generate a peak area report for each file
    failed_files = []
    no_peaks = []
    peak_dfs = []
    for file in files:
        logging.info(f"Processing file: {file}")
        fraggler_object = fraggler.make_fraggler_area(
            file=file,
            ladder=ladder,
            min_height=min_height,
            min_ratio=min_ratio,
            trace_channel=channel,
            distance_between_assays=distance_between_assays,
            size_standard_channel=size_standard_channel,
            cutoff=cutoff,
            peak_height=peak_height,
            custom_peaks=custom_peaks,
        )

        # If make_fraggler_area failed
        if isinstance(fraggler_object, str):
            failed_files.append(fraggler_object)
            fsa = fraggler.FsaFile(file, ladder)
            report = fraggler.generate_no_peaks_report(fsa)
            out_name = out_folder / f"{file.stem}_fraggler_failed.html"
            report.save(out_name)
            continue

        if not fraggler_object.peaks.found_peaks:
            no_peaks.append(file.stem)
            fsa = fraggler.FsaFile(file, ladder)
            report = fraggler.generate_no_peaks_report(fsa)
            out_name = out_folder / f"{file.stem}_failed.html"
            report.save(out_name)
            continue

        # generate report and peak table
        peak_dfs.append(fraggler_object.areas.assays_dataframe(peak_model))
        report = fraggler.generate_area_report(fraggler_object, peak_model)
        out_name = out_folder / f"{file.stem}_fraggler_area.html"
        report.save(out_name)

    # Save dataframe
    if peak_dfs:
        save_df_format(peak_dfs, out_folder, in_path, out_format)

    # Log failed files
    logging.info(f"Fraggler done for files in {in_path}!")

    if failed_files:
        failed_files = "\n".join(failed_files)
        logging.warning(f"{f.YELLOW}The following files failed: {failed_files}")
    if no_peaks:
        no_peaks = "\n".join(no_peaks)
        logging.warning(f"{f.YELLOW}The following files had no peaks: {no_peaks}")
```
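These two functions are what the CLI commands wrap, so the same runs can be scripted directly (paths and ladder name are placeholders):

```python
# Hypothetical: run both report types over a folder of .fsa files.
peak_report("fsa_data/", "results_peak/", "LIZ")
area_report("fsa_data/", "results_area/", "LIZ", peak_model="gauss")
```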