fraggler


Description

Fraggler is for fragment analysis in Python! It is a Python package for analyzing fsa files and generating reports from them, offering both a Python API and a command-line tool.


Features

Peak Area Report Generation: Fraggler allows you to generate peak area reports for all input files. The package calculates peak areas based on specified parameters and generates a report summarizing the results.

Combined Peak Table Generation: Fraggler provides a command-line tool to generate a combined dataframe of peaks for all input files. This allows you to easily analyze and compare peaks across multiple files.

Customization Options: Fraggler offers various customization options to tailor the analysis to your specific needs. You can specify parameters such as ladder type, peak model, minimum ratio, minimum height, cutoff value, trace channel, peak height, and even provide a custom peaks file for specific assays and intervals.

Install

pip install fraggler

Dependencies

Fraggler depends on:

  • pandas
  • numpy
  • scikit-learn
  • networkx
  • lmfit
  • scipy
  • biopython
  • panel
  • fire
  • colorama
  • altair

Python API

To get an overview of how the library can be used in a Python environment, please look at tutorial.ipynb.
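
As a minimal sketch of the ladder-fitting part of the API (assuming a local file sample.fsa recorded with the LIZ ladder; all class names below are exported by the package):

import fraggler

# Parse the fsa file; the size-standard channel and peak-finding defaults
# are taken from the LIZ ladder definition in fraggler.LADDERS
fsa = fraggler.FsaFile("sample.fsa", ladder="LIZ", trace_channel="DATA1")

# Assign the ladder peaks and fit a size-calling model to them
assigner = fraggler.PeakLadderAssigner(fsa)
model = fraggler.FitLadderModel(assigner)

print(model.mse, model.r2)  # goodness of fit of the ladder model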

CLI

fraggler area and fraggler peak

Usage

To generate peak area reports and a peak table for all input files, use the fraggler area or fraggler peak command followed by the required positional arguments and any optional flags.

  • If not specified, fraggler finds peaks agnostically in the fsa file. To specify custom assays with certain peaks and intervals, the user can add a .csv file to the --custom_peaks argument. The csv file MUST have the following shape:

    name  start  stop  amount  min_ratio  which  peak_distance
    prt1  140    150   2       0.2        FIRST  5

Example of how a file could look (a script for generating such a file is sketched after this list):

name,start,stop,amount,min_ratio,which,peak_distance
prt1,135,155,2,0.2,FIRST,
prt3,190,205,,0.2,FIRST,
prt2,222,236,2,0.2,FIRST,5
prt4,262,290,5,,,
  • name: Name of the assay
  • start: Start of the assay in basepairs
  • stop: Stop of the assay in basepairs
  • amount: Optional. Number of peaks in the assay. If left empty, every peak in the interval is included.
  • min_ratio: Optional. Only peaks whose height is at least min_ratio times the height of the highest peak are included, e.g. if min_ratio == 0.2 and the highest peak is 100 units, only peaks of height 20 or more are included.
  • which: LARGEST | FIRST. Can be left empty. Decides which peaks are included if there are more peaks than the amount. If FIRST is set, the first peaks in the interval are chosen; if LARGEST is set, the largest peaks in the interval are chosen. Defaults to LARGEST.
  • peak_distance: Optional. The distance between peaks must be under this value.
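
A small sketch of generating such a file with pandas (my_peaks.csv and the assay values are illustrative):

import pandas as pd

custom_peaks = pd.DataFrame(
    {
        "name": ["prt1", "prt2"],
        "start": [135, 222],
        "stop": [155, 236],
        "amount": [2, 2],
        "min_ratio": [0.2, 0.2],
        "which": ["FIRST", "FIRST"],
        "peak_distance": [None, 5],
    }
)
custom_peaks.to_csv("my_peaks.csv", index=False)  # optional fields are left empty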

Positional Arguments

The following positional arguments are required:

  • IN_PATH: Type str. Specifies the input path.
  • OUT_FOLDER: Type str. Specifies the output folder.
  • LADDER: Type str. Specifies the ladder used in the experiment.

Flags

The following flags can be used with the fraggler peak or fraggler area command:

  • -l, --ladder=LADDER: Type str. Specifies the ladder. Default value: 'LIZ'.
  • --peak_model=PEAK_MODEL: Type str. Specifies the peak model. Default value: 'gauss'.
  • --min_ratio=MIN_RATIO: Type float. Specifies the minimum ratio. Default value: 0.3.
  • --min_height=MIN_HEIGHT: Type int. Specifies the minimum height. Default value: 100.
  • --cutoff=CUTOFF: Type int. Specifies the cutoff value. Default value: 175.
  • -t, --trace_channel=TRACE_CHANNEL: Type str. Specifies the trace channel. Default value: 'DATA9'.
  • --peak_height=PEAK_HEIGHT: Type int. Specifies the peak height. Default value: 200.
  • --custom_peaks=CUSTOM_PEAKS: Type Optional[str]. Specifies custom peaks. Default value: None.
  • -e, --excel=EXCEL: Type bool. Default value: True.

Typical usage

fraggler area IN_FOLDER OUT_FOLDER LIZ --min_ratio=0.2 
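
For example, to restrict the analysis to the assays defined in a custom peaks file (here the hypothetical my_peaks.csv sketched earlier):

fraggler peak IN_FOLDER OUT_FOLDER LIZ --custom_peaks=my_peaks.csv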

Documentation

Click here to get the full documentation of the API.

Output

One example of the report generated from fraggler area can be seen here:

Contributions

Please check out How to contribute

Contributing to fraggler

We want you to contribute and help make fraggler better! See the Table of Contents for how to do this.

If you can't help with coding but want to help otherwise, please consider:

  • Giving fraggler a star here on GitHub
  • Citing fraggler
  • Spreading the word!

Table of Contents

FAQ

First, read the README or the Documentation.

Otherwise, open a new issue and ask away!

Contributions

Make sure everything works as expected when making a pull request.

Reporting Bugs

First, check if...:

  • You are using the latest version.
  • The bug has already been reported in the issues.

Enhancements

How to suggest new cool features to fraggler!

First, check if...:

  • You are using the latest version.
  • The feature has already been suggested in the issues.

Attribution

  • Please, cite fraggler and spread the word! This is how you can do it:
  • Cite the paper at [HERE]
  • Star the repo!
  • Use fraggler!
  • Tell a friend
r"""
.. include:: ../README.md
.. include:: ../CONTRIBUTION.md
"""

__author__ = "William Rosenbaum and Pär Larsson"

from .ladder_fitting.fit_ladder_model import FitLadderModel
from .ladder_fitting.peak_ladder_assigner import PeakLadderAssigner
from .ladders.ladders import LADDERS
from .plotting.plot_ladder import PlotLadder
from .utils.baseline_removal import baseline_arPLS
from .utils.fsa_file import FsaFile
from .utils.utils import get_files, setup_logging
from .utils.fraggler_object import (
    FragglerPeak,
    FragglerArea,
    make_fraggler_peak,
    make_fraggler_area,
)
from .applications.peak_area_multiplex import PeakAreaDeMultiplex
from .plotting.plot_peak_area import PlotPeakArea
from .plotting.plot_raw_data import PlotRawData
from .plotting.plot_peaks import PlotPeaks
from .plotting.plot_channels import make_fsa_data_df, plot_fsa_data
from .reports.reports import (
    generate_peak_report,
    generate_area_report,
    generate_no_peaks_report,
)
from .functions.generate_peak_table import generate_peak_table
from .utils.peak_finder import PeakFinder
from .cli import peak_report, area_report


__all__ = [
    "FitLadderModel",
    "PeakLadderAssigner",
    "LADDERS",
    "baseline_arPLS",
    "FsaFile",
    "PeakArea",
    "PeakAreaDeMultiplex",
    "PlotPeakArea",
    "PlotLadder",
    "PlotRawData",
    "PlotPeaks",
    "generate_peak_table",
    "get_files",
    "setup_logging",
    "make_fsa_data_df",
    "plot_fsa_data",
    "PeakFinder",
    "FragglerPeak",
    "FragglerArea",
    "make_fraggler_peak",
    "make_fraggler_area",
    "generate_peak_report",
    "generate_area_report",
    "generate_no_peaks_report",
    "peak_report",
    "area_report",
]
class FitLadderModel:
class FitLadderModel:
    def __init__(self, ladder_assigner: PeakLadderAssigner) -> None:
        """
        Initialize FitLadderModel object with ladder_assigner object and its attributes.

        Args:
            ladder_assigner (PeakLadderAssigner): Object of PeakLadderAssigner class.

        Returns:
            None.
        """
        self.n_knots = 2
        self.ladder_assigner = ladder_assigner
        self.fsa_file = self.ladder_assigner.fsa_file
        self.best_combination = ladder_assigner.assign_ladder_peak_sizes().reshape(
            -1, 1
        )

        self.fit_model()
        self.mse, self.r2 = self.model_score()
        self.adjusted_baisepair_df = self.generate_adjusted_trace_df()

    def fit_model(self) -> None:
        """
        Fit model based on ladder type.

        Returns:
            None.
        """
        ladder = self.fsa_file.ladder
        if ladder == "ROX":
            self._fit_ROX_ladder()
        elif ladder == "LIZ" or ladder == "ORANGE":
            self._fit_LIZ_ladder()
        else:
            raise LadderNotFoundError(f"'{ladder}' is not a valid ladder")

    def model_score(self) -> tuple[float, float]:
        """
        Calculate mean squared error and R-squared score of the fitted model.

        Returns:
            Tuple (mse, r2).
        """
        true_Y = self.fsa_file.ref_sizes
        predicted_Y = self.model.predict(self.best_combination)
        mse = mean_squared_error(true_Y, predicted_Y)
        r2 = r2_score(true_Y, predicted_Y)

        return mse, r2

    def generate_adjusted_trace_df(self) -> pd.DataFrame:
        """
        Generate a dataframe with adjusted basepairs and peaks.

        Returns:
            Dataframe with columns time, peaks and basepairs.
        """
        # Continue loop until all basepairs are unique by changing n_knots in SplineTransformer
        for _ in range(10):
            df = (
                pd.DataFrame({"peaks": self.fsa_file.trace})
                .reset_index()
                .rename(columns={"index": "time"})
                .assign(
                    basepairs=lambda x: self.model.predict(
                        x.time.to_numpy().reshape(-1, 1)
                    )
                )
                .loc[lambda x: x.basepairs >= 0]
            )

            if df.shape[0] == df.basepairs.nunique():
                logging.info(f"Ladder fitting model: {self.model}")
                return df
            # If not all bp are unique
            self.n_knots += 1
            self.fit_model()

        # reached only if there is a problem
        raise ModelFittingError(
            "There is a problem with the fitting of the model to the ladder"
        )

    def _fit_ROX_ladder(self) -> None:
        """
        Fit model with ROX ladder.

        Returns:
            None.
        """
        self.model = make_pipeline(
            SplineTransformer(
                degree=4, n_knots=self.n_knots + 4, extrapolation="continue"
            ),
            LinearRegression(fit_intercept=True),
        )

        X = self.best_combination
        y = self.fsa_file.ref_sizes

        self.model.fit(X, y)

    def _fit_LIZ_ladder(self) -> None:
        """
        Fit model with LIZ ladder.

        Returns:
            None.
        """
        # NB!
        # Changed the SplineTransformer(n_knots=2) instead of 3!!
        self.model = make_pipeline(
            SplineTransformer(degree=3, n_knots=self.n_knots, extrapolation="continue"),
            LinearRegression(fit_intercept=True),
        )

        X = self.best_combination
        y = self.fsa_file.ref_sizes

        self.model.fit(X, y)
FitLadderModel( ladder_assigner: PeakLadderAssigner)

Initialize FitLadderModel object with ladder_assigner object and its attributes.

Args: ladder_assigner (PeakLadderAssigner): Object of PeakLadderAssigner class.

Returns: None.

n_knots
ladder_assigner
fsa_file
best_combination
adjusted_baisepair_df
def fit_model(self) -> None:

Fit model based on ladder type.

Returns: None.

def model_score(self) -> tuple[float, float]:

Calculate mean squared error and R-squared score of the fitted model.

Returns: Tuple (mse, r2).

def generate_adjusted_trace_df(self) -> pandas.core.frame.DataFrame:

Generate a dataframe with adjusted basepairs and peaks.

Returns: Dataframe with columns time, peaks and basepairs.
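
Continuing the sketch from the Python API section, the adjusted trace can be inspected directly (model is the FitLadderModel instance from that example):

df = model.generate_adjusted_trace_df()
print(df.columns.tolist())  # ['time', 'peaks', 'basepairs']

# The same dataframe is already stored at construction time
# (note the spelling of the attribute):
df = model.adjusted_baisepair_df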

class PeakLadderAssigner:
class PeakLadderAssigner:
    """
    A class that assigns peak sizes to a ladder for use in DNA fragment size analysis.
    The peak sizes are generated based on a given size standard and the input parameters.

    Attributes:
    fsa_file (FsaFile): an object of the FsaFile class that contains parameters for analysis

    Methods:
    assign_ladder_peak_sizes(): Assigns peak sizes to a ladder based on the given parameters and returns the best combination.
    get_peaks(size_standard): Finds peaks in the size standard based on given parameters and returns the peaks.
    generate_graph(peaks): Generates a graph based on the peaks and returns the graph.
    generate_combinations(graph): Generates combinations of peaks based on the graph and returns the combinations.
    get_best_fit(combinations, method): Finds the best fit for the given combinations and returns the best combination.
    _polynomial_model_inv_r2_score(ladder, comb): Returns a score based on the polynomial model inverse r2 score.
    _max_fractional_deviation_score(ladder, comb): Returns a score based on the maximum fractional deviation.
    _max_first_derivative_score(combination): Returns a score based on the maximum first derivative.
    _max_second_derivative_score(combination): Returns a score based on the maximum second derivative.
    _max_spline_second_derivative_score(combination): Returns a score based on the maximum spline second derivative.
    """

    def __init__(self, fsa_file: FsaFile) -> None:
        """
        Initializes the class with an object of the FsaFile class.

        Args:
        fsa_file (FsaFile): an object of the FsaFile class that contains parameters for analysis
        """

        self.fsa_file = fsa_file

    def assign_ladder_peak_sizes(self):
        """
        Assigns peak sizes to a ladder based on the given parameters and returns the best combination.

        Returns:
        np.array: the best combination of peak sizes
        """
        peaks = self.get_peaks(self.fsa_file.size_standard)
        graph = self.generate_graph(peaks)
        combinations = self.generate_combinations(graph)
        best_combination = self.get_best_fit(combinations)

        return best_combination

    def get_peaks(self, size_standard) -> np.array:
        """
        Finds peaks in the size standard based on given parameters and returns the peaks.

        Args:
        size_standard (np.array): the size standard array

        Returns:
        np.array: the peaks array
        """

        peaks_obj = signal.find_peaks(
            size_standard,
            distance=self.fsa_file.min_interpeak_distance,
            height=self.fsa_file.min_height,
        )

        peaks = peaks_obj[0]
        heights = peaks_obj[1]["peak_heights"]

        df = pd.DataFrame({"peaks": peaks, "heights": heights})

        # TODO
        # Dropped the below, then it worked with the new test files
        # idxmax = df["heights"].idxmax()
        # df = df.drop(idxmax)

        peaks_adj = df.nlargest(self.fsa_file.max_peak_count, ["heights"])

        return peaks_adj["peaks"].sort_values().to_numpy()

    def generate_graph(self, peaks) -> nx.DiGraph:
        """
        Generates a graph based on the peaks and returns the graph.

        Args:
        peaks (np.array): the peaks array

        Returns:
        nx.DiGraph: the graph object
        """
        G = nx.DiGraph()

        for p in peaks:
            G.add_node(p)

        i = 0
        while i < peaks.size:
            j = i + 1
            while j < peaks.size:
                diff = peaks[j] - peaks[i]
                if diff <= self.fsa_file.max_ladder_trace_distance:
                    G.add_edge(peaks[i], peaks[j], length=diff)
                j += 1
            i += 1

        return G

    def generate_combinations(self, graph):
        """
        Generates combinations of nodes from a given directed acyclic graph (DAG)
        that satisfy certain conditions.

        Parameters:
        graph (networkx.DiGraph): a DAG representing a state machine

        Yields:
        np.array: windows of ref_count consecutive nodes along a path
        through the DAG.
        """

        # Get start nodes that have zero in-degree
        start_nodes = [node for node in graph.nodes if graph.in_degree(node) == 0]

        # Get end nodes that have zero out-degree
        end_nodes = [node for node in graph.nodes if graph.out_degree(node) == 0]
        if len(start_nodes) == 0 or len(end_nodes) == 0:
            raise ValueError("Graph does not have start or end nodes")

        # Get all simple paths from start node to end node
        all_paths = []
        for start_node in start_nodes:
            for end_node in end_nodes:
                paths = nx.all_simple_paths(graph, start_node, end_node)
                all_paths.extend(paths)

        if len(all_paths) == 0:
            raise ValueError("No paths found from start to end nodes")

        # Generate combinations of nodes that satisfy certain conditions
        for p_arr in all_paths:
            for i in range(0, len(p_arr) - self.fsa_file.ref_count + 1):
                yield np.array(p_arr[i : i + self.fsa_file.ref_count])

    def get_best_fit(self, combinations, method="2nd_derivative"):
        """
        Calculates the best fit for a given set of combinations using a specified method.

        Parameters:
        combinations (iterable): an iterable containing combinations of nodes
        method (str): a string indicating the method to use. The default method is
            "2nd_derivative".

        Returns:
        A numpy array representing the combination of nodes with the best fit.
        """

        # Create an empty dataframe to store combinations and scores
        df = pd.DataFrame()

        # Store the combinations in the dataframe
        df["combination"] = list(combinations)

        # Calculate the score for each combination using the specified method
        if method == "2nd_derivative":
            df["score"] = np.vectorize(self._max_spline_second_derivative_score)(
                df["combination"]
            )

        if method == "first_derivative":
            df["score"] = np.vectorize(self._max_first_derivative_score)(
                df["combination"]
            )

        # Sort the dataframe by score in ascending order
        df = df.sort_values(by="score", ascending=True)

        # Get the best combination (i.e., the one with the lowest score)
        best = df.head(1)

        # Return the best combination as a numpy array
        return best.combination.squeeze()

    @staticmethod
    def _polynomial_model_inv_r2_score(ladder: np.array, comb: np.array) -> float:
        """
        Calculates the inverse of the R^2 (coefficient of determination) score for a polynomial regression model.

        Parameters:
        - ladder (np.array): array containing the ladder positions of a dataset
        - comb (np.array): array containing the corresponding comb heights of a dataset

        Returns:
        - float: the inverse of the R^2 score for the polynomial model

        Notes:
        - This method fits a polynomial regression model of degree 3 to the dataset using NumPy's polyfit function.
        - It then calculates the predicted values of the comb heights using NumPy's poly1d function.
        - Finally, it calculates the R^2 score between the predicted comb heights and the actual comb heights using the r2_score function
          from the scikit-learn library, and returns the inverse of that score.

        """
        # fit polynomial regression model
        fit = np.polyfit(ladder, comb, 3)

        # create predicted values using fitted model
        predict = np.poly1d(fit)

        # calculate R^2 score between predicted and actual values, and return its inverse
        return 1 - r2_score(comb, predict(ladder))

    @staticmethod
    def _max_fractional_deviation_score(ladder: np.ndarray, comb: np.ndarray):
        """
        Calculates the maximum fractional deviation score for a dataset.

        Parameters:
        - ladder (np.ndarray): array containing the ladder positions of a dataset
        - comb (np.ndarray): array containing the corresponding comb heights of a dataset

        Returns:
        - float: the maximum fractional deviation score

        Notes:
        - This method calculates the interval lengths between adjacent ladder positions and corresponding comb heights using NumPy's
          diff function and min-max scaling with a feature range equal to the range of ladder positions.
        - It then calculates the fractional deviation between the interval lengths and the scaled comb heights, and returns the
          maximum deviation.

        """
        # calculate interval lengths and scaled comb height intervals
        l_intervals = np.diff(ladder)
        c_intervals = np.diff(minmax_scale(comb, feature_range=(ladder[0], ladder[-1])))

        # calculate fractional deviation between interval lengths and scaled comb height intervals
        frac_deviation = np.abs(l_intervals - c_intervals) / l_intervals

        # find index of maximum deviation and return the corresponding value
        max_frac_deviation_idx = np.argmax(frac_deviation)
        return frac_deviation[max_frac_deviation_idx]

    def _max_first_derivative_score(self, combination: np.ndarray) -> float:
        """
        Calculates the maximum first derivative score for the given combination of features.

        Args:
            combination (np.ndarray): A 1-dimensional numpy array containing a combination of features.

        Returns:
            float: The maximum first derivative score for the given combination of features.
        """
        # Scale the combination to the range of reference sizes
        comb_scaled = minmax_scale(
            combination,
            feature_range=(self.fsa_file.ref_sizes[0], self.fsa_file.ref_sizes[-1]),
        )

        # Calculate the differences between consecutive values of the scaled combination and the reference sizes
        diff_intervals = np.diff(comb_scaled) - np.diff(self.fsa_file.ref_sizes)

        # Calculate the absolute gradient of the differences between consecutive values
        abs_diff_intervals_gradient = np.abs(np.gradient(diff_intervals))

        # Find the index of the maximum absolute gradient
        max_abs_diff_intervals_gradient_idx = np.argmax(abs_diff_intervals_gradient)

        # Return the maximum absolute gradient
        return abs_diff_intervals_gradient[max_abs_diff_intervals_gradient_idx]

    def _max_second_derivative_score(self, combination: np.ndarray) -> float:
        """
        Calculates the maximum absolute second derivative score of the given combination.

        Args:
            combination (np.ndarray): An array of feature values for a given combination.

        Returns:
            float: The maximum absolute second derivative score of the given combination.

        Raises:
            None.

        """
        # Scale the combination to match the range of reference sizes.
        comb_scaled = minmax_scale(
            combination,
            feature_range=(self.fsa_file.ref_sizes[0], self.fsa_file.ref_sizes[-1]),
        )

        # Calculate the difference between the scaled combination and reference sizes.
        diff_intervals = np.diff(comb_scaled) - np.diff(self.fsa_file.ref_sizes)

        # Calculate the absolute second derivative of the difference intervals.
        abs_second_derivative = np.abs(np.gradient(np.gradient(diff_intervals)))

        # Find the index of the maximum absolute second derivative score.
        max_second_derivative_idx = np.argmax(abs_second_derivative)

        # Return the maximum absolute second derivative score.
        return abs_second_derivative[max_second_derivative_idx]

    def _max_spline_second_derivative_score(self, combination: np.ndarray) -> float:
        """
        Calculates the maximum absolute value of the second derivative of a given combination of values using a UnivariateSpline.

        Args:
        - self (object): an instance of the class containing the function
        - combination (np.ndarray): an array of values representing a combination

        Returns:
        - max_value (float): the maximum absolute value of the second derivative of the given combination

        """
        # create a UnivariateSpline object using the reference sizes and the given combination
        spl = UnivariateSpline(self.fsa_file.ref_sizes, combination, s=0)

        # calculate the second derivative of the spline
        der2 = spl.derivative(n=2)

        # evaluate the second derivative at each reference size and return the maximum absolute value
        max_value = max(abs(der2(self.fsa_file.ref_sizes)))

        return max_value

A class that assigns peak sizes to a ladder for use in DNA fragment size analysis. The peak sizes are generated based on a given size standard and the input parameters.

Attributes: fsa_file (FsaFile): an object of the FsaFile class that contains parameters for analysis

Methods:

  • assign_ladder_peak_sizes(): Assigns peak sizes to a ladder based on the given parameters and returns the best combination.
  • get_peaks(size_standard): Finds peaks in the size standard based on given parameters and returns the peaks.
  • generate_graph(peaks): Generates a graph based on the peaks and returns the graph.
  • generate_combinations(graph): Generates combinations of peaks based on the graph and returns the combinations.
  • get_best_fit(combinations, method): Finds the best fit for the given combinations and returns the best combination.
  • _polynomial_model_inv_r2_score(ladder, comb): Returns a score based on the polynomial model inverse r2 score.
  • _max_fractional_deviation_score(ladder, comb): Returns a score based on the maximum fractional deviation.
  • _max_first_derivative_score(combination): Returns a score based on the maximum first derivative.
  • _max_second_derivative_score(combination): Returns a score based on the maximum second derivative.
  • _max_spline_second_derivative_score(combination): Returns a score based on the maximum spline second derivative.

PeakLadderAssigner(fsa_file: FsaFile)

Initializes the class with an object of the FsaFile class.

Args: fsa_file (FsaFile): an object of the FsaFile class that contains parameters for analysis

fsa_file
def assign_ladder_peak_sizes(self):

Assigns peak sizes to a ladder based on the given parameters and returns the best combination.

Returns: np.array: the best combination of peak sizes
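
A minimal sketch, assuming fsa is the FsaFile instance from the Python API example:

assigner = PeakLadderAssigner(fsa)
best = assigner.assign_ladder_peak_sizes()
# best holds one trace position per reference size in the ladder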

def get_peaks(self, size_standard) -> np.array:

Finds peaks in the size standard based on given parameters and returns the peaks.

Args: size_standard (np.array): the size standard array

Returns: np.array: the peaks array

def generate_graph(self, peaks) -> networkx.classes.digraph.DiGraph:

Generates a graph based on the peaks and returns the graph.

Args: peaks (np.array): the peaks array

Returns: nx.DiGraph: the graph object

def generate_combinations(self, graph):

Generates combinations of nodes from a given directed acyclic graph (DAG) that satisfy certain conditions.

Parameters: graph (networkx.DiGraph): a DAG representing a state machine

Yields: numpy arrays, each a window of ref_count consecutive nodes along a path through the DAG.
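
The yielded combinations are sliding windows of length ref_count over each start-to-end path. A standalone illustration with hypothetical peak positions:

import numpy as np

path = [10, 35, 62, 90, 118]  # hypothetical peak positions along one path
ref_count = 3                 # pretend the ladder has three reference sizes

windows = [
    np.array(path[i : i + ref_count])
    for i in range(len(path) - ref_count + 1)
]
# [array([10, 35, 62]), array([35, 62, 90]), array([ 62,  90, 118])]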

def get_best_fit(self, combinations, method='2nd_derivative'):

Calculates the best fit for a given set of combinations using a specified method.

Parameters: combinations (iterable): an iterable containing combinations of nodes. method (str): a string indicating the method to use; the default method is "2nd_derivative".

Returns: A numpy array representing the combination of nodes with the best fit.

LADDERS = {
    "LIZ": {
        "sizes": array([ 20,  40,  60,  80, 100, 114, 120, 140, 160, 180, 200, 214, 220,
                        240, 260, 280, 300, 314, 320, 340, 360, 380, 400, 414, 420, 440,
                        460, 480, 500, 514, 520, 540, 560, 580, 600]),
        "distance": 30,
        "height": 100,
        "max_ladder_trace_distance": 300,
        "channel": "DATA205",
    },
    "ROX": {
        "sizes": array([  79,   90,  105,  131,  151,  182,  201,  254,  306,  337,  362,
                         425,  486,  509,  560,  598,  674,  739,  799,  902, 1007]),
        "distance": 30,
        "height": 100,
        "max_ladder_trace_distance": 600,
        "channel": "DATA4",
    },
    "ORANGE": {
        "sizes": array([ 73,  88, 123, 148, 173, 198, 223, 248, 273, 298, 324, 349, 373,
                        398, 423, 448, 470, 495, 520, 545, 555]),
        "distance": 30,
        "height": 100,
        "max_ladder_trace_distance": 300,
        "channel": "DATA205",
    },
}
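
FsaFile falls back to these entries when no explicit values are given; for example:

from fraggler import LADDERS

LADDERS["LIZ"]["channel"]     # 'DATA205', the default size-standard channel
LADDERS["LIZ"]["sizes"].size  # 35 reference peaks
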
def baseline_arPLS(y, ratio=0.99, lam=100, niter=1000, full_output=False):
# Imports needed to run this excerpt stand-alone (the module-level
# imports are not shown in the original listing):
import numpy as np
from numpy.linalg import norm
from scipy import sparse
from scipy.sparse import linalg


def baseline_arPLS(y, ratio=0.99, lam=100, niter=1000, full_output=False):
    L = len(y)

    diag = np.ones(L - 2)
    D = sparse.spdiags([diag, -2 * diag, diag], [0, -1, -2], L, L - 2)

    H = lam * D.dot(D.T)  # The transposes are flipped w.r.t the Algorithm on pg. 252

    w = np.ones(L)
    W = sparse.spdiags(w, 0, L, L)

    crit = 1
    count = 0

    while crit > ratio:
        z = linalg.spsolve(W + H, W * y)
        d = y - z
        dn = d[d < 0]

        m = np.mean(dn)
        s = np.std(dn)

        w_new = 1 / (1 + np.exp(2 * (d - (2 * s - m)) / s))

        crit = norm(w_new - w) / norm(w)

        w = w_new
        W.setdiag(w)  # Do not create a new matrix, just update diagonal values

        count += 1

        if count > niter:
            print("Maximum number of iterations exceeded")
            break

    if full_output:
        info = {"num_iter": count, "stop_criterion": crit}
        return z, d, info
    else:
        return z
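
baseline_arPLS returns the estimated baseline z (plus the residual and iteration info when full_output=True), so baseline correction is a subtraction. A usage sketch on a synthetic signal (all values illustrative):

import numpy as np

rng = np.random.default_rng(0)
x = np.linspace(0, 10, 500)
# one peak sitting on a drifting baseline, plus a little noise
y = np.exp(-((x - 5) ** 2)) + 0.3 * x + rng.normal(0, 0.01, x.size)

baseline = baseline_arPLS(y, lam=1e5)  # lam chosen for illustration
corrected = y - baseline
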
class FsaFile:
class FsaFile:
    def __init__(
        self,
        file: str,
        ladder: str,
        normalize: bool = False,
        trace_channel: str = "DATA1",
        size_standard_channel: str = None,
        min_interpeak_distance: int = None,
        min_height: int = None,
        max_ladder_trace_distance: int = None,
    ) -> None:
        """
        Constructs an FsaFile object with the given parameters.

        Args:
            file (str): The path to the sequencing file.
            ladder (str): The name of the ladder used for sequencing.
            normalize (bool): Whether to normalize the data using the arPLS algorithm.
            trace_channel (str): The channel to extract the trace data from.
            size_standard_channel (str): The channel to extract the size standard data from.
            min_interpeak_distance (int): The minimum distance between peaks in the ladder.
            min_height (int): The minimum height of peaks in the ladder.
            max_ladder_trace_distance (int): The maximum distance between the ladder and the trace.

        Returns:
            None
        """
        peak_count_padding = 3
        self.file = Path(file)
        self.file_name = self.file.parts[-1]

        # Extract data from the sequencing file
        if ladder not in LADDERS.keys():
            raise LadderNotFoundError(f"'{ladder}' is not a valid ladder")
        self.ladder = ladder
        self.fsa = SeqIO.read(file, "abi").annotations["abif_raw"]
        self.trace_channel = trace_channel
        self.normalize = normalize

        # Extract data from the ladder reference
        self.ref_sizes = LADDERS[ladder]["sizes"]
        self.ref_count = self.ref_sizes.size

        # Use default values if nothing is given
        self.size_standard_channel = size_standard_channel or LADDERS[ladder]["channel"]
        self.min_interpeak_distance = (
            min_interpeak_distance or LADDERS[ladder]["distance"]
        )
        self.min_height = min_height or LADDERS[ladder]["height"]
        self.max_ladder_trace_distance = (
            max_ladder_trace_distance or LADDERS[ladder]["max_ladder_trace_distance"]
        )
        self.max_peak_count = self.ref_count + peak_count_padding

        # Normalize data if requested
        if normalize:
            self.size_standard = np.array(
                baseline_arPLS(self.fsa[self.size_standard_channel])
            )
            self.trace = np.array(baseline_arPLS(self.fsa[trace_channel]))
        else:
            self.size_standard = np.array(self.fsa[self.size_standard_channel])
            self.trace = np.array(self.fsa[trace_channel])

    def __str__(self):
        """
        Returns a string representation of the FsaFile object.

        Returns:
            str: A string representation of the object.
        """
        return f"""
            FsaFile-object with following parameters:

            File: {self.file}
            Filename: {self.file_name}
            Size standard channel: {self.size_standard_channel}
            Ladder name: {self.ladder}
            Number of ladder steps: {self.ref_count}
            Minimum interpeak distance: {self.min_interpeak_distance}
            Minimum height: {self.min_height}
            Maximum ladder trace distance: {self.max_ladder_trace_distance}
            Maximum peak count: {self.max_peak_count}
            Normalized data: {self.normalize}
            Trace Channel: {self.trace_channel}
            Ladder Sizes: {self.ref_sizes}
            """
FsaFile( file: str, ladder: str, normalize: bool = False, trace_channel: str = 'DATA1', size_standard_channel: str = None, min_interpeak_distance: int = None, min_height: int = None, max_ladder_trace_distance: int = None)

Constructs an FsaFile object with the given parameters.

Args:

  • file (str): The path to the sequencing file.
  • ladder (str): The name of the ladder used for sequencing.
  • normalize (bool): Whether to normalize the data using the arPLS algorithm.
  • trace_channel (str): The channel to extract the trace data from.
  • size_standard_channel (str): The channel to extract the size standard data from.
  • min_interpeak_distance (int): The minimum distance between peaks in the ladder.
  • min_height (int): The minimum height of peaks in the ladder.
  • max_ladder_trace_distance (int): The maximum distance between the ladder and the trace.

Returns: None

file
file_name
ladder
fsa
trace_channel
normalize
ref_sizes
ref_count
size_standard_channel
min_interpeak_distance
min_height
max_ladder_trace_distance
max_peak_count
class PeakAreaDeMultiplex:
class PeakAreaDeMultiplex:
    def __init__(
        self,
        peaks: PeakFinder,
        cutoff: float = None,
    ) -> None:
        self.peaks = peaks
        self.cutoff = cutoff or None
        self.peaks_dataframe = self.peaks.peaks_dataframe
        self.peak_information = self.peaks.peak_information
        self.file_name = self.peaks.file_name
        self.peaks_index = self.peaks.peaks_index
        self.found_peaks = self.peaks.found_peaks
        self.area_plots = []

        self.find_peak_widths()
        # divide peaks based on the assay they belong to
        self.divided_assays = self.divide_peak_assays()
        # how many assays does this sample contain?
        self.number_of_assays = len(self.divided_assays)
        # divide all peaks in each assay into separate dataframes
        self.divided_peaks = [self.divide_peaks(x) for x in self.divided_assays]
        # logging
        logging.info(
            f"""
        Number of assays found: {self.number_of_assays}
        Number of peaks found: {self.peak_information.shape[0]}
        """
        )

    def __iter__(self):
        return PeakAreaDeMultiplexIterator(self.number_of_assays)

    def find_peak_widths(self, rel_height: float = 0.95):
        widths = peak_widths(
            self.peaks_dataframe.peaks,
            self.peaks_index,
            rel_height=rel_height,
        )

        df = pd.DataFrame(widths).T
        df.columns = ["x", "peak_height", "peak_start", "peak_end"]

        self.peak_widths = (
            df.assign(peak_start=lambda x: np.floor(x.peak_start).astype(int))
            .assign(peak_end=lambda x: np.ceil(x.peak_end).astype(int))
            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
            .merge(self.peak_information, on="peak_name")
        )

    def divide_peak_assays(self) -> list[pd.DataFrame]:
        """
        Divide the peaks belonging to different assays based on their assay number
        """
        df = self.peak_widths
        return [df.loc[df.assay == x] for x in df.assay.unique()]

    def divide_peaks(self, assay: pd.DataFrame, padding: int = 4) -> list[pd.DataFrame]:
        # add some padding to the left and right to be sure to include everything in the peak
        return [
            self.peaks_dataframe.iloc[x.peak_start - padding : x.peak_end + padding]
            for x in assay.itertuples()
        ]

    def fit_lmfit_model(self, peak_finding_model: str, assay_number: int):
        if assay_number >= self.number_of_assays:
            raise IndexError(
                f"""
                The sample only contains {self.number_of_assays} assays.
                Use a number inside of the range.
                Indexing starts at 0.
                """
            )

        if peak_finding_model == "gauss":
            model = GaussianModel()
        elif peak_finding_model == "voigt":
            model = VoigtModel()
        elif peak_finding_model == "lorentzian":
            model = LorentzianModel()
        else:
            raise NotImplementedError(
                f"""
                {peak_finding_model} is not implemented!
                Options: [gauss, voigt, lorentzian]
                """
            )

        fit_df = []
        fit_params = []
        fit_report = []
        for df in self.divided_peaks[assay_number]:
            df = df.copy()
            y = df.peaks.to_numpy()
            # CHANGED to time instead of basepair
            x = df.time.to_numpy()

            params = model.guess(y, x)
            out = model.fit(y, params, x=x)

            fit_df.append(df.assign(fitted=out.best_fit, model=peak_finding_model))
            fit_params.append(out.values)
            fit_report.append(out.fit_report())

        # Update the instances of the model fit
        self.fit_df = fit_df
        self.fit_params = fit_params
        self.fit_report = fit_report

    # TODO
    # Fix so that the cutoff is a range or something else.
    def calculate_quotient(self) -> None:
        """
        Calculate the quotient between the fitted peak areas.
        """
        areas = np.array([x["amplitude"] for x in self.fit_params])

        right_by_left = (
            self.cutoff is None
            or pd.concat(self.fit_df).basepairs.mean() >= self.cutoff
        )
        # if there only is 1 peak, return 0
        if len(areas) == 1:
            self.quotient = 0
            return

        # if there only are 2 peaks, return the quotient
        if len(areas) == 2:
            # left peak divided by right peak
            if not right_by_left:
                self.quotient = areas[0] / areas[1]
                return
            # right peak divided by left peak
            self.quotient = areas[1] / areas[0]
            return

        # return the last peak divided by the mean of the peaks to the left of it
        self.quotient = areas[-1] / areas[:-1].mean()

    def peak_position_area_dataframe(
        self, assay_number: int, name: str
    ) -> pd.DataFrame:
        """
        Returns a DataFrame of each peak and its properties
        """
        dataframes = []
        for i, _ in enumerate(self.fit_df):
            report = self.fit_report[i]
            r_value = float(re.findall(r"R-squared *= (0\.\d{3})", report)[0])
            df = (
                self.fit_df[i]
                .loc[lambda x: x.peaks == x.peaks.max()]
                .assign(area=self.fit_params[i]["amplitude"])
                .assign(r_value=r_value)
                .assign(peak_name=f"Peak {i + 1}")
                .drop(columns="time")
                .reset_index(drop=True)
                .rename(
                    columns={
                        "peaks": "peak_height",
                        "fitted": "fitted_peak_height",
                    }
                )
                .drop_duplicates("peak_name")
                .assign(file_name=self.file_name)
                .assign(assay_name=name)
            )
            dataframes.append(df)

        self.assay_peak_area_df = pd.concat(dataframes).assign(
            quotient=self.quotient,
            peak_number=lambda x: x.shape[0],
            assay_number=assay_number + 1,
        )

    def plot_areas(self, peak_finding_model: str):
        fig_areas, axs = plt.subplots(
            1, len(self.fit_df), sharey=True, figsize=(20, 10)
        )
        # if there is only one peak
        if len(self.fit_df) == 1:
            axs.plot(self.fit_df[0].basepairs, self.fit_df[0].peaks, "o")
            axs.plot(self.fit_df[0].basepairs, self.fit_df[0].fitted)
            axs.set_title(f"Peak 1 area: {self.fit_params[0]['amplitude']: .1f}")
            axs.grid()
        # if more than one peak
        else:
            for i, ax in enumerate(axs):
                ax.plot(
                    self.fit_df[i].basepairs,
                    self.fit_df[i].peaks,
                    "o",
                )
                ax.plot(self.fit_df[i].basepairs, self.fit_df[i].fitted)
                ax.set_title(
                    f"Peak {i + 1} area: {self.fit_params[i]['amplitude']: .1f}"
                )
                ax.grid()

        fig_areas.suptitle(f"Quotient: {self.quotient: .2f}")
        fig_areas.legend(["Raw data", "Model"])
        fig_areas.supxlabel("basepairs")
        fig_areas.supylabel("intensity")
        plt.close()

        return fig_areas

    def fit_assay_peaks(
        self,
        peak_finding_model: str,
        assay_number: int,
        name: str,
    ) -> None:
        """
        Runs fit_lmfit_model, calculate_quotient and peak_position_area_dataframe
        """
        self.fit_lmfit_model(peak_finding_model, assay_number)
        self.calculate_quotient()
        self.peak_position_area_dataframe(assay_number, name)
        # area plots
        area_plot = self.plot_areas(peak_finding_model)
        self.area_plots.append(area_plot)
        return self.assay_peak_area_df

    def assays_dataframe(self, peak_finding_model: str = "gauss"):
        dfs = []
        for i in self:
            if self.peaks.custom_peaks is not None:
                name = self.peaks.custom_peaks.name.unique()[i]
            else:
                name = ""
            dfs.append(self.fit_assay_peaks(peak_finding_model, i, name))
        df = pd.concat(dfs, ignore_index=True)
        # initialize attribute self.final_df
        self.final_df = df
        return df
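
A sketch of the demultiplexing step, assuming peaks is a PeakFinder built for the sample elsewhere in the API:

area = PeakAreaDeMultiplex(peaks, cutoff=175)
df = area.assays_dataframe(peak_finding_model="gauss")  # one row per fitted peak
df[["file_name", "assay_number", "peak_name", "area", "quotient"]]
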
PeakAreaDeMultiplex(peaks: PeakFinder, cutoff: float = None)
peaks
cutoff
peaks_dataframe
peak_information
file_name
peaks_index
found_peaks
area_plots
divided_assays
number_of_assays
divided_peaks
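
A minimal usage sketch: PeakAreaDeMultiplex sits at the end of the analysis chain, so the upstream objects are built first (the input path is a placeholder):

    import fraggler

    fsa = fraggler.FsaFile("sample.fsa", "LIZ")
    ladder_assigner = fraggler.PeakLadderAssigner(fsa)
    model = fraggler.FitLadderModel(ladder_assigner)
    peaks = fraggler.PeakFinder(model)
    areas = fraggler.PeakAreaDeMultiplex(peaks, cutoff=175)
    print(areas.number_of_assays)
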
def find_peak_widths(self, rel_height: float = 0.95):
58    def find_peak_widths(self, rel_height: float = 0.95):
59        widths = peak_widths(
60            self.peaks_dataframe.peaks,
61            self.peaks_index,
62            rel_height=rel_height,
63        )
64
65        df = pd.DataFrame(widths).T
66        df.columns = ["x", "peak_height", "peak_start", "peak_end"]  # widths, width_heights, left_ips, right_ips
67
68        self.peak_widths = (
69            df.assign(peak_start=lambda x: np.floor(x.peak_start).astype(int))
70            .assign(peak_end=lambda x: np.ceil(x.peak_end).astype(int))
71            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
72            .merge(self.peak_information, on="peak_name")
73        )
def divide_peak_assays(self) -> list[pandas.core.frame.DataFrame]:
75    def divide_peak_assays(self) -> list[pd.DataFrame]:
76        """
77        Divide the peaks belonging to different assays based on their assay number
78        """
79        df = self.peak_widths
80        return [df.loc[df.assay == x] for x in df.assay.unique()]

Divide the peaks belonging to different assays based on their assay number

def divide_peaks( self, assay: pandas.core.frame.DataFrame, padding: int = 4) -> list[pandas.core.frame.DataFrame]:
82    def divide_peaks(self, assay: pd.DataFrame, padding: int = 4) -> list[pd.DataFrame]:
83        # add some padding to the left and right to be sure to include everything in the peak
84        return [
85            self.peaks_dataframe.iloc[x.peak_start - padding : x.peak_end + padding]
86            for x in assay.itertuples()
87        ]
def fit_lmfit_model(self, peak_finding_model: str, assay_number: int):
 89    def fit_lmfit_model(self, peak_finding_model: str, assay_number: int):
 90        if assay_number >= self.number_of_assays:
 91            raise IndexError(
 92                f"""
 93                The sample only contains {self.number_of_assays} assays. 
 94                Use a number inside of the range.
 95                Indexing starts at 0.
 96                """
 97            )
 98
 99        if peak_finding_model == "gauss":
100            model = GaussianModel()
101        elif peak_finding_model == "voigt":
102            model = VoigtModel()
103        elif peak_finding_model == "lorentzian":
104            model = LorentzianModel()
105        else:
106            raise NotImplementedError(
107                f"""
108                {peak_finding_model} is not implemented! 
109                Options: [gauss, voigt, lorentzian]
110                """
111            )
112
113        fit_df = []
114        fit_params = []
115        fit_report = []
116        for df in self.divided_peaks[assay_number]:
117            df = df.copy()
118            y = df.peaks.to_numpy()
119            # NOTE: the fit uses time, not basepairs, as the x axis
120            x = df.time.to_numpy()
121
122            params = model.guess(y, x)
123            out = model.fit(y, params, x=x)
124
125            fit_df.append(df.assign(fitted=out.best_fit, model=peak_finding_model))
126            fit_params.append(out.values)
127            fit_report.append(out.fit_report())
128
129        # Update the instances of the model fit
130        self.fit_df = fit_df
131        self.fit_params = fit_params
132        self.fit_report = fit_report
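
For reference, a standalone sketch of the lmfit pattern used above, applied to a synthetic peak (the arrays are made up for illustration):

    import numpy as np
    from lmfit.models import GaussianModel

    x = np.linspace(0, 10, 200)
    y = 5 * np.exp(-((x - 5) ** 2) / 0.5)  # synthetic gaussian-shaped peak

    model = GaussianModel()
    params = model.guess(y, x=x)     # initial parameter estimates from the data
    out = model.fit(y, params, x=x)  # least-squares fit

    print(out.values["amplitude"])   # integrated area under the fitted peak
    print(out.fit_report())          # includes the R-squared parsed downstream
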
def calculate_quotient(self) -> None:
136    def calculate_quotient(
137        self,
138    ) -> None:
139
140        """
141        Calculate the quotient between the areas of the fitted peaks.
142        """
143        areas = np.array([x["amplitude"] for x in self.fit_params])
144
145        right_by_left = (
146            self.cutoff is None
147            or pd.concat(self.fit_df).basepairs.mean() >= self.cutoff
148        )
149        # if there is only one peak, the quotient is 0
150        if len(areas) == 1:
151            self.quotient = 0
152            return
153
154        # if there are exactly two peaks, return their ratio
155        if len(areas) == 2:
156            # left peak divided by right peak
157            if not right_by_left:
158                self.quotient = areas[0] / areas[1]
159                return
160            # right peak divided by left peak
161            self.quotient = areas[1] / areas[0]
162            return
163
164        # return the last peak divided by the mean of the peaks to the left of it
165        self.quotient = areas[-1] / areas[:-1].mean()

Calculate the quotient between the areas of the fitted peaks.

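The three branches in plain numbers (an illustration, not library code):

    areas = [1000.0]               # one peak: quotient = 0
    areas = [400.0, 800.0]         # two peaks: 800 / 400 = 2.0, or 400 / 800 = 0.5
                                   # if the mean basepair position is below the cutoff
    areas = [500.0, 300.0, 200.0]  # three or more: 200 / mean(500, 300) = 0.5
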
def peak_position_area_dataframe(self, assay_number: int, name: str) -> pandas.core.frame.DataFrame:
167    def peak_position_area_dataframe(
168        self, assay_number: int, name: str
169    ) -> pd.DataFrame:
170        """
171        Returns a DataFrame of each peak and its properties
172        """
173        dataframes = []
174        for i, _ in enumerate(self.fit_df):
175            report = self.fit_report[i]
176            r_value = float(re.findall(r"R-squared *= (0\.\d{3})", report)[0])
177            df = (
178                self.fit_df[i]
179                .loc[lambda x: x.peaks == x.peaks.max()]
180                .assign(area=self.fit_params[i]["amplitude"])
181                .assign(r_value=r_value)
182                .assign(peak_name=f"Peak {i + 1}")
183                .drop(columns="time")
184                .reset_index(drop=True)
185                .rename(
186                    columns={
187                        "peaks": "peak_height",
188                        "fitted": "fitted_peak_height",
189                    }
190                )
191                .drop_duplicates("peak_name")
192                .assign(file_name=self.file_name)
193                .assign(assay_name=name)
194            )
195            dataframes.append(df)
196
197        self.assay_peak_area_df = pd.concat(dataframes).assign(
198            quotient=self.quotient,
199            peak_number=lambda x: x.shape[0],
200            assay_number=assay_number + 1,
201        )

Returns a DataFrame of each peak and its properties

def plot_areas(self, peak_finding_model: str):
203    def plot_areas(self, peak_finding_model: str):
204        fig_areas, axs = plt.subplots(
205            1, len(self.fit_df), sharey=True, figsize=(20, 10)
206        )
207        # if there is only one peak
208        if len(self.fit_df) == 1:
209            axs.plot(self.fit_df[0].basepairs, self.fit_df[0].peaks, "o")
210            axs.plot(self.fit_df[0].basepairs, self.fit_df[0].fitted)
211            axs.set_title(f"Peak 1 area: {self.fit_params[0]['amplitude']: .1f}")
212            axs.grid()
213        # if more than one peak
214        else:
215            for i, ax in enumerate(axs):
216                ax.plot(
217                    self.fit_df[i].basepairs,
218                    self.fit_df[i].peaks,
219                    "o",
220                )
221                ax.plot(self.fit_df[i].basepairs, self.fit_df[i].fitted)
222                ax.set_title(
223                    f"Peak {i + 1} area: {self.fit_params[i]['amplitude']: .1f}"
224                )
225                ax.grid()
226
227        fig_areas.suptitle(f"Quotient: {self.quotient: .2f}")
228        fig_areas.legend(["Raw data", "Model"])
229        fig_areas.supxlabel("basepairs")
230        fig_areas.supylabel("intensity")
231        plt.close()
232
233        return fig_areas
def fit_assay_peaks(self, peak_finding_model: str, assay_number: int, name: str) -> pandas.core.frame.DataFrame:
235    def fit_assay_peaks(
236        self,
237        peak_finding_model: str,
238        assay_number: int,
239        name: str,
240    ) -> pd.DataFrame:
241        """
242        Runs fit_lmfit_model, calculate_quotient and peak_position_area_dataframe, and returns the assay peak area table
243        """
244        self.fit_lmfit_model(peak_finding_model, assay_number)
245        self.calculate_quotient()
246        self.peak_position_area_dataframe(assay_number, name)
247        # area plots
248        area_plot = self.plot_areas(peak_finding_model)
249        self.area_plots.append(area_plot)
250        return self.assay_peak_area_df

Runs fit_lmfit_model, calculate_quotient and peak_position_area_dataframe, and returns the assay peak area table

def assays_dataframe(self, peak_finding_model: str = 'gauss'):
252    def assays_dataframe(self, peak_finding_model: str = "gauss"):
253        dfs = []
254        for i in self:
255            if self.peaks.custom_peaks is not None:
256                name = self.peaks.custom_peaks.name.unique()[i]
257            else:
258                name = ""
259            dfs.append(self.fit_assay_peaks(peak_finding_model, i, name))
260        df = pd.concat(dfs, ignore_index=True)
261        # initialize attribute self.final_df
262        self.final_df = df
263        return df
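
Continuing the construction sketch above, this is the call that produces the combined peak table for a sample:

    df = areas.assays_dataframe(peak_finding_model="gauss")
    print(df[["assay_name", "peak_name", "area", "quotient"]])
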
class PlotPeakArea:
 8class PlotPeakArea:
 9    def __init__(self, peak_area: PeakAreaDeMultiplex):
10        self.peak_area = peak_area
11
12    def plot_areas(self, peak_finding_model: str, assay_number: int):
13
14        # TODO: this function gets called again here...
15        self.peak_area.fit_assay_peaks(peak_finding_model, assay_number, name="")
16
17        fig_areas, axs = plt.subplots(
18            1, len(self.peak_area.fit_df), sharey=True, figsize=(20, 10)
19        )
20
21        # if there is only one peak
22        if len(self.peak_area.fit_df) == 1:
23            axs.plot(
24                self.peak_area.fit_df[0].basepairs, self.peak_area.fit_df[0].peaks, "o"
25            )
26            axs.plot(
27                self.peak_area.fit_df[0].basepairs, self.peak_area.fit_df[0].fitted
28            )
29            axs.set_title(
30                f"Peak 1 area: {self.peak_area.fit_params[0]['amplitude']: .1f}"
31            )
32            axs.grid()
33        # if more than one peak
34        else:
35            for i, ax in enumerate(axs):
36                ax.plot(
37                    self.peak_area.fit_df[i].basepairs,
38                    self.peak_area.fit_df[i].peaks,
39                    "o",
40                )
41                ax.plot(
42                    self.peak_area.fit_df[i].basepairs, self.peak_area.fit_df[i].fitted
43                )
44                ax.set_title(
45                    f"Peak {i + 1} area: {self.peak_area.fit_params[i]['amplitude']: .1f}"
46                )
47                ax.grid()
48
49        fig_areas.suptitle(f"Quotient: {self.peak_area.quotient: .2f}")
50        fig_areas.legend(["Raw data", "Model"])
51        fig_areas.supxlabel("basepairs")
52        fig_areas.supylabel("intensity")
53        plt.close()
54
55        return fig_areas
PlotPeakArea( peak_area: PeakAreaDeMultiplex)
 9    def __init__(self, peak_area: PeakAreaDeMultiplex):
10        self.peak_area = peak_area
peak_area
def plot_areas(self, peak_finding_model: str, assay_number: int):
12    def plot_areas(self, peak_finding_model: str, assay_number: int):
13
14        # TODO: this function gets called again here...
15        self.peak_area.fit_assay_peaks(peak_finding_model, assay_number, name="")
16
17        fig_areas, axs = plt.subplots(
18            1, len(self.peak_area.fit_df), sharey=True, figsize=(20, 10)
19        )
20
21        # if there is only one peak
22        if len(self.peak_area.fit_df) == 1:
23            axs.plot(
24                self.peak_area.fit_df[0].basepairs, self.peak_area.fit_df[0].peaks, "o"
25            )
26            axs.plot(
27                self.peak_area.fit_df[0].basepairs, self.peak_area.fit_df[0].fitted
28            )
29            axs.set_title(
30                f"Peak 1 area: {self.peak_area.fit_params[0]['amplitude']: .1f}"
31            )
32            axs.grid()
33        # if more than one peak
34        else:
35            for i, ax in enumerate(axs):
36                ax.plot(
37                    self.peak_area.fit_df[i].basepairs,
38                    self.peak_area.fit_df[i].peaks,
39                    "o",
40                )
41                ax.plot(
42                    self.peak_area.fit_df[i].basepairs, self.peak_area.fit_df[i].fitted
43                )
44                ax.set_title(
45                    f"Peak {i + 1} area: {self.peak_area.fit_params[i]['amplitude']: .1f}"
46                )
47                ax.grid()
48
49        fig_areas.suptitle(f"Quotient: {self.peak_area.quotient: .2f}")
50        fig_areas.legend(["Raw data", "Model"])
51        fig_areas.supxlabel("basepairs")
52        fig_areas.supylabel("intensity")
53        plt.close()
54
55        return fig_areas
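
Usage sketch, reusing the areas object from the PeakAreaDeMultiplex example (the output name is a placeholder; assay indexing starts at 0):

    fig = PlotPeakArea(areas).plot_areas("gauss", assay_number=0)
    fig.savefig("assay_1_areas.png")
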
class PlotLadder:
 9class PlotLadder:
10    def __init__(self, model: FitLadderModel) -> None:
11        self.model = model
12
13    @property
14    def plot_ladder_peaks(self) -> matplotlib.figure.Figure:
15        trace = self.model.fsa_file.size_standard
16        best_combination = self.model.best_combination
17        ladder_name = self.model.fsa_file.ladder
18        ladder_size = self.model.fsa_file.ref_sizes
19
20        fig_ladder_peaks = plt.figure(figsize=(20, 10))
21        plt.plot(trace)
22        plt.plot(best_combination, trace[best_combination], "o")
23        plt.xlabel("Time")
24        plt.ylabel("Intensity")
25        plt.legend(["Trace", "Peak (bp)"])
26        plt.title(ladder_name)
27        plt.grid()
28
29        for peak, ladder in zip(best_combination, ladder_size):
30            plt.text(peak, trace[peak], ladder)
31
32        plt.close()
33        return fig_ladder_peaks
34
35    @property
36    def plot_model_fit(self):
37
38        ladder_size = self.model.fsa_file.ref_sizes
39        best_combination = self.model.best_combination
40
41        predicted = self.model.model.predict(best_combination)
42        ladder_name = self.model.fsa_file.ladder
43
44        mse = self.model.mse
45        r2 = self.model.r2
46
47        fig_model_fit = plt.figure(figsize=(20, 10))
48        plt.plot(ladder_size, best_combination, "o")
49        plt.plot(predicted, best_combination, "x")
50        plt.xticks(np.arange(0, np.max(ladder_size), 50))
51        plt.xlabel("bp")
52        plt.yticks(np.arange(0, np.max(best_combination), 500))
53        plt.suptitle(ladder_name)
54        plt.title(f"{mse=}, {r2=}")
55        # plt.title(f"{self.model.model[0]=}")
56        plt.legend(["True value", "Predicted value"])
57        plt.grid()
58
59        plt.close()
60        return fig_model_fit
PlotLadder(model: FitLadderModel)
10    def __init__(self, model: FitLadderModel) -> None:
11        self.model = model
model
plot_ladder_peaks: matplotlib.figure.Figure
13    @property
14    def plot_ladder_peaks(self) -> matplotlib.figure.Figure:
15        trace = self.model.fsa_file.size_standard
16        best_combination = self.model.best_combination
17        ladder_name = self.model.fsa_file.ladder
18        ladder_size = self.model.fsa_file.ref_sizes
19
20        fig_ladder_peaks = plt.figure(figsize=(20, 10))
21        plt.plot(trace)
22        plt.plot(best_combination, trace[best_combination], "o")
23        plt.xlabel("Time")
24        plt.ylabel("Intensity")
25        plt.legend(["Trace", "Peak (bp)"])
26        plt.title(ladder_name)
27        plt.grid()
28
29        for peak, ladder in zip(best_combination, ladder_size):
30            plt.text(peak, trace[peak], ladder)
31
32        plt.close()
33        return fig_ladder_peaks
plot_model_fit
35    @property
36    def plot_model_fit(self):
37
38        ladder_size = self.model.fsa_file.ref_sizes
39        best_combination = self.model.best_combination
40
41        predicted = self.model.model.predict(best_combination)
42        ladder_name = self.model.fsa_file.ladder
43
44        mse = self.model.mse
45        r2 = self.model.r2
46
47        fig_model_fit = plt.figure(figsize=(20, 10))
48        plt.plot(ladder_size, best_combination, "o")
49        plt.plot(predicted, best_combination, "x")
50        plt.xticks(np.arange(0, np.max(ladder_size), 50))
51        plt.xlabel("bp")
52        plt.yticks(np.arange(0, np.max(best_combination), 500))
53        plt.suptitle(ladder_name)
54        plt.title(f"{mse=}, {r2=}")
55        # plt.title(f"{self.model.model[0]=}")
56        plt.legend(["True value", "Predicted value"])
57        plt.grid()
58
59        plt.close()
60        return fig_model_fit
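
Both plots are exposed as properties of a fitted ladder model, so no arguments are needed (file names are placeholders):

    ladder_plots = PlotLadder(model)
    ladder_plots.plot_ladder_peaks.savefig("ladder_peaks.png")
    ladder_plots.plot_model_fit.savefig("ladder_fit.png")
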
class PlotRawData:
 6class PlotRawData:
 7    def __init__(self, fsa: fraggler.FsaFile):
 8        self.fsa = fsa
 9
10    @property
11    def plot_raw_data(self):
12        fig = plt.figure(figsize=(20, 10))
13
14        plt.plot(self.fsa.trace)
15        plt.xlabel("Time")
16        plt.ylabel("Intensity")
17
18        plt.close()
19        return fig
PlotRawData(fsa: FsaFile)
7    def __init__(self, fsa: fraggler.FsaFile):
8        self.fsa = fsa
fsa
plot_raw_data
10    @property
11    def plot_raw_data(self):
12        fig = plt.figure(figsize=(20, 10))
13
14        plt.plot(self.fsa.trace)
15        plt.xlabel("Time")
16        plt.ylabel("Intensity")
17
18        plt.close()
19        return fig
class PlotPeaks:
 7class PlotPeaks:
 8    def __init__(self, peaks: PeakFinder):
 9        self.peaks = peaks
10
11    @property
12    def plot_peaks(self):
13        fig_peaks = plt.figure(figsize=(20, 10))
14
15        df = self.peaks.peaks_dataframe.loc[
16            lambda x: x.basepairs > self.peaks.peak_information.basepairs.min() - 10
17        ].loc[lambda x: x.basepairs < self.peaks.peak_information.basepairs.max() + 10]
18
19        plt.plot(df.basepairs, df.peaks)
20        plt.plot(
21            self.peaks.peak_information.basepairs,
22            self.peaks.peak_information.peaks,
23            "o",
24        )
25        for x, y in zip(
26            self.peaks.peak_information.basepairs,
27            self.peaks.peak_information.peaks,
28        ):
29            plt.text(x, y, f"{round(x, 1)} bp")
30
31        channel = self.peaks.model.fsa_file.trace_channel
32        plt.title(f"Channel: {channel}")
33        plt.xticks(np.arange(df.basepairs.min(), df.basepairs.max(), 10), rotation=90)
34        plt.ylabel("intensity")
35        plt.xlabel("basepairs")
36        plt.grid()
37        plt.close()
38
39        return fig_peaks
PlotPeaks(peaks: PeakFinder)
8    def __init__(self, peaks: PeakFinder):
9        self.peaks = peaks
peaks
plot_peaks
11    @property
12    def plot_peaks(self):
13        fig_peaks = plt.figure(figsize=(20, 10))
14
15        df = self.peaks.peaks_dataframe.loc[
16            lambda x: x.basepairs > self.peaks.peak_information.basepairs.min() - 10
17        ].loc[lambda x: x.basepairs < self.peaks.peak_information.basepairs.max() + 10]
18
19        plt.plot(df.basepairs, df.peaks)
20        plt.plot(
21            self.peaks.peak_information.basepairs,
22            self.peaks.peak_information.peaks,
23            "o",
24        )
25        for x, y in zip(
26            self.peaks.peak_information.basepairs,
27            self.peaks.peak_information.peaks,
28        ):
29            plt.text(x, y, f"{round(x, 1)} bp")
30
31        channel = self.peaks.model.fsa_file.trace_channel
32        plt.title(f"Channel: {channel}")
33        plt.xticks(np.arange(df.basepairs.min(), df.basepairs.max(), 10), rotation=90)
34        plt.ylabel("intensity")
35        plt.xlabel("basepairs")
36        plt.grid()
37        plt.close()
38
39        return fig_peaks
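
PlotRawData and PlotPeaks follow the same property pattern (output names are placeholders):

    PlotRawData(fsa).plot_raw_data.savefig("raw_trace.png")
    PlotPeaks(peaks).plot_peaks.savefig("called_peaks.png")
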
def generate_peak_table( in_files: str | list, ladder: str = 'LIZ', peak_model: str = 'gauss', min_height: int = 100, cutoff: int = 175, min_ratio: float = 0.3, trace_channel: str = 'DATA1', custom_peaks: str | None = None) -> pandas.core.frame.DataFrame:
 8def generate_peak_table(
 9    in_files: str | list,
10    ladder: str = "LIZ",
11    peak_model: str = "gauss",
12    min_height: int = 100,
13    cutoff: int = 175,
14    min_ratio: float = 0.3,
15    trace_channel: str = "DATA1",
16    custom_peaks: str | None = None,
17) -> pd.DataFrame:
18    if isinstance(in_files, str):
19        in_files = [x for x in Path(in_files).iterdir() if x.suffix == ".fsa"]
20
21    peak_dfs = []
22    for x in in_files:
23        try:
24            fsa = fraggler.FsaFile(
25                x, ladder, min_height=min_height, trace_channel=trace_channel
26            )
27            pla = fraggler.PeakLadderAssigner(fsa)
28            model = fraggler.FitLadderModel(pla)
29            peaks = fraggler.PeakFinder(
30                model, min_ratio=min_ratio, custom_peaks=custom_peaks
31            )
32            pam = fraggler.PeakAreaDeMultiplex(peaks, cutoff=cutoff)
33            peak_dfs.append(pam.assays_dataframe(peak_model))
34        except Exception as e:
35            # report the failing file rather than a possibly unbound fsa object
36            print(f"FAILED: {x} ({e})")
37
38    return pd.concat(peak_dfs).reset_index(drop=True)
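Example call (the folder path and output name are placeholders):

    df = generate_peak_table("fsa_folder/", ladder="LIZ", peak_model="gauss")
    df.to_csv("peak_table.csv", index=False)
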
def get_files(in_path: str) -> list[pathlib.Path]:
 7def get_files(in_path: str) -> list[Path]:
 8    # If in_path is a directory, get a list of all .fsa files in it
 9    if Path(in_path).is_dir():
10        files = [x for x in Path(in_path).iterdir() if x.suffix == ".fsa"]
11    else:
12        files = [Path(in_path)]
13    return files
def setup_logging(outdir: str) -> None:
16def setup_logging(outdir: str) -> None:
17    """
18    Set up the logging object and save the log file to the same directory as the result files.
19    """
20    NOW = datetime.now().strftime("%Y-%m-%d_%H:%M")
21
22    if not (outdir := Path(outdir)).exists():
23        outdir.mkdir(parents=True)
24
25    LOG_FILE = f"{outdir}/fraggler_{NOW}.log"
26
27    logging.basicConfig(
28        level=logging.INFO,
29        format="%(asctime)s [%(levelname)s] \n%(message)s",
30        datefmt="%Y-%m-%d %I:%M:%S",
31        handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
32    )

Set up the logging object and save the log file to the same directory as the result files.

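Together, the two helpers set up a typical batch run (paths are placeholders):

    import logging

    setup_logging("results/")              # writes results/fraggler_<timestamp>.log
    for file in get_files("fsa_folder/"):  # every .fsa file in the folder
        logging.info(f"Processing file: {file}")
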
def make_fsa_data_df(fsa, ladder: bool = False) -> pandas.core.frame.DataFrame:
 6def make_fsa_data_df(fsa, ladder: bool = False) -> pd.DataFrame:
 7    data = ["DATA1", "DATA2", "DATA3", "DATA4"]
 8    if ladder:
 9        data.append("DATA205")
10    dfs = []
11    for d in data:
12        df = (
13            pd.DataFrame()
14            .assign(data=fsa.fsa[d])
15            .assign(channel=d)
16            .assign(time=lambda x: range(x.shape[0]))
17        )
18        dfs.append(df)
19    return pd.concat(dfs)
def plot_fsa_data(fsa, ladder: bool = False) -> list:
22def plot_fsa_data(fsa, ladder: bool = False) -> list:
23    alt.data_transformers.disable_max_rows()
24    df = make_fsa_data_df(fsa, ladder)
25
26    plots = []
27    for channel in df.channel.unique():
28        plot = (
29            alt.Chart(df.loc[lambda x: x.channel == channel])
30            .mark_line()
31            .encode(
32                alt.X("time:Q", title="Time"),
33                alt.Y("data:Q", title="Intensity"),
34                alt.Color("channel:N"),
35            )
36            .properties(
37                width=800,
38                height=500,
39            )  # .interactive()
40        )
41        plots.append((plot, channel))
42
43    all_data = (
44        alt.Chart(df)
45        .mark_line()
46        .encode(
47            alt.X("time:Q", title="Time"),
48            alt.Y("data:Q", title="Intensity"),
49            alt.Color("channel:N"),
50        )
51        .properties(
52            width=800,
53            height=500,
54        )  # .interactive()
55    )
56    plots.append((all_data, "All channels"))
57    return plots
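
The returned (chart, channel) pairs are Altair charts, so they can be saved straight to HTML (file names are placeholders):

    for chart, channel in plot_fsa_data(fsa, ladder=True):
        chart.save(f"{channel}.html")
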
class PeakFinder:
 77class PeakFinder:
 78    def __init__(
 79        self,
 80        model: FitLadderModel,
 81        min_ratio: float = 0.15,
 82        search_peaks_start: int = 110,
 83        peak_height: int = 350,
 84        distance_between_assays: int = 15,
 85        custom_peaks: str | pd.DataFrame = None,
 86    ) -> None:
 87        self.model = model
 88        self.raw_data = self.model.adjusted_baisepair_df
 89        self.file_name = self.model.fsa_file.file_name
 90        self.search_peaks_start = search_peaks_start
 91        self.peak_height = peak_height
 92        self.min_ratio = min_ratio
 93        self.distance_between_assays = distance_between_assays
 94        self.custom_peaks = (
 95            custom_peaks.fillna(0)
 96            if isinstance(custom_peaks, pd.DataFrame)
 97            else pd.read_csv(custom_peaks).fillna(0)
 98            if isinstance(custom_peaks, str)
 99            else None
100        )
101
102        # Validation of custom_peaks
103        self.run_validation()
104        # find peaks, custom or agnostic
105        self.find_peaks()
106        # continue whether peaks are found or not.
107        self.found_peaks()
108
109    def run_validation(self):
110        if self.custom_peaks is None:
111            return
112
113        if is_overlapping(self.custom_peaks):
114            raise OverlappingIntervalError("Overlapping intervals!")
116
117        if not has_columns(self.custom_peaks):
118            raise WrongColumnsError("Wrong columns!")
120
121    def find_peaks(self):
122        if self.custom_peaks is not None:
123            self.find_peaks_customized(
124                peak_height=self.peak_height,
125            )
126        else:
127            self.find_peaks_agnostic(
128                peak_height=self.peak_height,
129                min_ratio=self.min_ratio,
130                distance_between_assays=self.distance_between_assays,
131            )
132
133    def found_peaks(self):
134        # if no peaks could be found
135        if self.peak_information.shape[0] == 0:
136            self.found_peaks = False
137            logging.warning("No peaks could be found. Please look at the raw data.")
138        # if peaks are found
139        else:
140            self.found_peaks = True
141
142    def find_peaks_agnostic(
143        self,
144        peak_height: int,
145        min_ratio: float,
146        distance_between_assays: int,
147    ) -> None:
148        peaks_dataframe = self.raw_data.loc[
149            lambda x: x.basepairs > self.search_peaks_start
150        ]
151        peaks_index, _ = find_peaks(peaks_dataframe.peaks, height=peak_height)
152
153        peak_information = (
154            peaks_dataframe.iloc[peaks_index]
155            .assign(peaks_index=peaks_index)
156            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
157            # separate the peaks into different assay groups depending on the distance
158            # between the peaks
159            .assign(difference=lambda x: x.basepairs.diff())
160            .fillna(100)
161            .assign(
162                assay=lambda x: np.select(
163                    [x.difference > distance_between_assays],
164                    [x.peak_name * 10],
165                    default=pd.NA,
166                )
167            )
168            .fillna(method="ffill")
169            .assign(max_peak=lambda x: x.groupby("assay")["peaks"].transform(np.max))
170            .assign(ratio=lambda x: x.peaks / x.max_peak)
171            .loc[lambda x: x.ratio > min_ratio]
172            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
173        )
174
175        # update peaks_index based on the above filtering
176        peaks_index = peak_information.peaks_index.to_numpy()
177
178        # update class attributes
179        self.peaks_index = peaks_index
180        self.peaks_dataframe = peaks_dataframe
181        self.peak_information = peak_information
182
183    def find_peaks_customized(
184        self,
185        peak_height: int,
186    ) -> None:
187
188        # Filter out everything before the peak search start position
189        peaks_dataframe = self.raw_data.loc[
190            lambda x: x.basepairs > self.search_peaks_start
191        ]
192        # Find the peaks
193        peaks_index, _ = find_peaks(peaks_dataframe.peaks, height=peak_height)
194
195        # Keep only the rows at the detected peak positions
196        peak_information = peaks_dataframe.iloc[peaks_index].assign(
197            peaks_index=peaks_index
198        )
199        # Filter the above df based on the custom peaks from the user
200        customized_peaks = []
201        for assay in self.custom_peaks.itertuples():
202            df = (
203                peak_information.loc[lambda x: x.basepairs > assay.start]
204                .loc[lambda x: x.basepairs < assay.stop]
205                .assign(assay=assay.name)
206            )
207
208            # Rank the peaks by height and filter out the smallest ones
209            if assay.amount != 0:
210                if assay.which == "LARGEST" or assay.which == "":
211                    df = (
212                        df.assign(max_peak=lambda x: x.peaks.max())
213                        .assign(ratio=lambda x: x.peaks / x.max_peak)
214                        .loc[lambda x: x.ratio > assay.min_ratio]
215                        .assign(rank_peak=lambda x: x.peaks.rank(ascending=False))
216                        .loc[lambda x: x.rank_peak <= assay.amount]
217                        .drop(columns=["rank_peak"])
218                    )
219                    if assay.peak_distance != 0:
220                        df = (
221                            df.assign(distance=lambda x: x.basepairs.diff())
222                            .assign(distance=lambda x: x.distance.fillna(0))
223                            .loc[lambda x: x.distance <= assay.peak_distance]
224                            .drop(columns=["distance"])
225                        )
226
227                elif assay.which == "FIRST":
228                    df = (
229                        df.assign(max_peak=lambda x: x.peaks.max())
230                        .assign(ratio=lambda x: x.peaks / x.max_peak)
231                        .loc[lambda x: x.ratio > assay.min_ratio]
232                        .sort_values("basepairs", ascending=True)
233                        .head(assay.amount)
234                    )
235                    if assay.peak_distance != 0:
236                        df = (
237                            df.assign(distance=lambda x: x.basepairs.diff())
238                            .assign(distance=lambda x: x.distance.fillna(0))
239                            .loc[lambda x: x.distance <= assay.peak_distance]
240                            .drop(columns=["distance"])
241                        )
242                else:
243                    print("[ERROR]: column `which` must be `FIRST` or `LARGEST`")
244                    exit(1)
245
246            customized_peaks.append(df)
247
248        peak_information = (
249            pd.concat(customized_peaks)
250            .reset_index()
251            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
252        )
253
254        # update peaks_index based on the above filtering
255        peaks_index = peak_information.peaks_index.to_numpy()
256
257        # update class attributes
258        self.peaks_index = peaks_index
259        self.peaks_dataframe = peaks_dataframe
260        self.peak_information = peak_information
PeakFinder( model: FitLadderModel, min_ratio: float = 0.15, search_peaks_start: int = 110, peak_height: int = 350, distance_between_assays: int = 15, custom_peaks: str | pandas.core.frame.DataFrame = None)
 78    def __init__(
 79        self,
 80        model: FitLadderModel,
 81        min_ratio: float = 0.15,
 82        search_peaks_start: int = 110,
 83        peak_height: int = 350,
 84        distance_between_assays: int = 15,
 85        custom_peaks: str | pd.DataFrame = None,
 86    ) -> None:
 87        self.model = model
 88        self.raw_data = self.model.adjusted_baisepair_df
 89        self.file_name = self.model.fsa_file.file_name
 90        self.search_peaks_start = search_peaks_start
 91        self.peak_height = peak_height
 92        self.min_ratio = min_ratio
 93        self.distance_between_assays = distance_between_assays
 94        self.custom_peaks = (
 95            custom_peaks.fillna(0)
 96            if isinstance(custom_peaks, pd.DataFrame)
 97            else pd.read_csv(custom_peaks).fillna(0)
 98            if isinstance(custom_peaks, str)
 99            else None
100        )
101
102        # Validation of custom_peaks
103        self.run_validation()
104        # find peaks, custom or agnostic
105        self.find_peaks()
106        # continue whether peaks are found or not.
107        self.found_peaks()
model
raw_data
file_name
search_peaks_start
peak_height
min_ratio
distance_between_assays
custom_peaks
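
A sketch of the two peak-finding modes, reusing the fitted ladder model from earlier (the CSV path is a placeholder; its required columns are described in the usage section at the top of this document):

    # agnostic: keep every peak above peak_height, then filter by min_ratio
    peaks = PeakFinder(model, min_ratio=0.2, peak_height=500)

    # customized: keep only peaks inside the intervals given in the CSV
    peaks = PeakFinder(model, custom_peaks="custom_peaks.csv")
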
def run_validation(self):
109    def run_validation(self):
110        if self.custom_peaks is None:
111            return
112
113        if is_overlapping(self.custom_peaks):
114            raise OverlappingIntervalError("Overlapping intervals!")
116
117        if not has_columns(self.custom_peaks):
118            raise WrongColumnsError("Wrong columns!")
def find_peaks(self):
121    def find_peaks(self):
122        if self.custom_peaks is not None:
123            self.find_peaks_customized(
124                peak_height=self.peak_height,
125            )
126        else:
127            self.find_peaks_agnostic(
128                peak_height=self.peak_height,
129                min_ratio=self.min_ratio,
130                distance_between_assays=self.distance_between_assays,
131            )
def found_peaks(self):
133    def found_peaks(self):
134        # if no peaks could be found
135        if self.peak_information.shape[0] == 0:
136            self.found_peaks = False
137            logging.warning("No peaks could be found. Please look at the raw data.")
138        # if peaks are found
139        else:
140            self.found_peaks = True
def find_peaks_agnostic( self, peak_height: int, min_ratio: float, distance_between_assays: int) -> None:
142    def find_peaks_agnostic(
143        self,
144        peak_height: int,
145        min_ratio: float,
146        distance_between_assays: int,
147    ) -> None:
148        peaks_dataframe = self.raw_data.loc[
149            lambda x: x.basepairs > self.search_peaks_start
150        ]
151        peaks_index, _ = find_peaks(peaks_dataframe.peaks, height=peak_height)
152
153        peak_information = (
154            peaks_dataframe.iloc[peaks_index]
155            .assign(peaks_index=peaks_index)
156            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
157            # separate the peaks into different assay groups depending on the distance
158            # between the peaks
159            .assign(difference=lambda x: x.basepairs.diff())
160            .fillna(100)
161            .assign(
162                assay=lambda x: np.select(
163                    [x.difference > distance_between_assays],
164                    [x.peak_name * 10],
165                    default=pd.NA,
166                )
167            )
168            .fillna(method="ffill")
169            .assign(max_peak=lambda x: x.groupby("assay")["peaks"].transform(np.max))
170            .assign(ratio=lambda x: x.peaks / x.max_peak)
171            .loc[lambda x: x.ratio > min_ratio]
172            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
173        )
174
175        # update peaks_index based on the above filtering
176        peaks_index = peak_information.peaks_index.to_numpy()
177
178        # update class attributes
179        self.peaks_index = peaks_index
180        self.peaks_dataframe = peaks_dataframe
181        self.peak_information = peak_information
def find_peaks_customized(self, peak_height: int) -> None:
183    def find_peaks_customized(
184        self,
185        peak_height: int,
186    ) -> None:
187
188        # Filter out everything before the peak search start position
189        peaks_dataframe = self.raw_data.loc[
190            lambda x: x.basepairs > self.search_peaks_start
191        ]
192        # Find the peaks
193        peaks_index, _ = find_peaks(peaks_dataframe.peaks, height=peak_height)
194
195        # Keep only the rows at the detected peak positions
196        peak_information = peaks_dataframe.iloc[peaks_index].assign(
197            peaks_index=peaks_index
198        )
199        # Filter the above df based on the custom peaks from the user
200        customized_peaks = []
201        for assay in self.custom_peaks.itertuples():
202            df = (
203                peak_information.loc[lambda x: x.basepairs > assay.start]
204                .loc[lambda x: x.basepairs < assay.stop]
205                .assign(assay=assay.name)
206            )
207
208            # Rank the peaks by height and filter out the smallest ones
209            if assay.amount != 0:
210                if assay.which == "LARGEST" or assay.which == "":
211                    df = (
212                        df.assign(max_peak=lambda x: x.peaks.max())
213                        .assign(ratio=lambda x: x.peaks / x.max_peak)
214                        .loc[lambda x: x.ratio > assay.min_ratio]
215                        .assign(rank_peak=lambda x: x.peaks.rank(ascending=False))
216                        .loc[lambda x: x.rank_peak <= assay.amount]
217                        .drop(columns=["rank_peak"])
218                    )
219                    if assay.peak_distance != 0:
220                        df = (
221                            df.assign(distance=lambda x: x.basepairs.diff())
222                            .assign(distance=lambda x: x.distance.fillna(0))
223                            .loc[lambda x: x.distance <= assay.peak_distance]
224                            .drop(columns=["distance"])
225                        )
226
227                elif assay.which == "FIRST":
228                    df = (
229                        df.assign(max_peak=lambda x: x.peaks.max())
230                        .assign(ratio=lambda x: x.peaks / x.max_peak)
231                        .loc[lambda x: x.ratio > assay.min_ratio]
232                        .sort_values("basepairs", ascending=True)
233                        .head(assay.amount)
234                    )
235                    if assay.peak_distance != 0:
236                        df = (
237                            df.assign(distance=lambda x: x.basepairs.diff())
238                            .assign(distance=lambda x: x.distance.fillna(0))
239                            .loc[lambda x: x.distance <= assay.peak_distance]
240                            .drop(columns=["distance"])
241                        )
242                else:
243                    print("[ERROR]: column `which` must be `FIRST` or `LARGEST`")
244                    exit(1)
245
246            customized_peaks.append(df)
247
248        peak_information = (
249            pd.concat(customized_peaks)
250            .reset_index()
251            .assign(peak_name=lambda x: range(1, x.shape[0] + 1))
252        )
253
254        # update peaks_index based on the above filtering
255        peaks_index = peak_information.peaks_index.to_numpy()
256
257        # update class attributes
258        self.peaks_index = peaks_index
259        self.peaks_dataframe = peaks_dataframe
260        self.peak_information = peak_information
@dataclass
class FragglerPeak:
13@dataclass
14class FragglerPeak:
15    fsa: FsaFile
16    ladder_assigner: PeakLadderAssigner
17    model: FitLadderModel
18    peaks: PeakFinder
FragglerPeak( fsa: FsaFile, ladder_assigner: PeakLadderAssigner, model: FitLadderModel, peaks: PeakFinder)
fsa: FsaFile
ladder_assigner: PeakLadderAssigner
peaks: PeakFinder
@dataclass
class FragglerArea:
21@dataclass
22class FragglerArea:
23    fsa: FsaFile
24    ladder_assigner: PeakLadderAssigner
25    model: FitLadderModel
26    peaks: PeakFinder
27    areas: PeakAreaDeMultiplex
FragglerArea( fsa: FsaFile, ladder_assigner: PeakLadderAssigner, model: FitLadderModel, peaks: PeakFinder, areas: PeakAreaDeMultiplex)
fsa: FsaFile
ladder_assigner: PeakLadderAssigner
peaks: PeakFinder
def make_fraggler_peak( file: pathlib.Path, ladder: str, min_height: int, min_ratio: float, trace_channel: str, peak_height: int, custom_peaks: str | None, size_standard_channel: str | None, distance_between_assays: int) -> FragglerPeak | str:
30def make_fraggler_peak(
31    file: Path,
32    ladder: str,
33    min_height: int,
34    min_ratio: float,
35    trace_channel: str,
36    peak_height: int,
37    custom_peaks: str | None,
38    size_standard_channel: str | None,
39    distance_between_assays: int,
40) -> FragglerPeak | str:
41    file = Path(file)
42    try:
43        fsa = FsaFile(
44            file,
45            ladder,
46            min_height=min_height,
47            trace_channel=trace_channel,
48            size_standard_channel=size_standard_channel,
49        )
50        ladder_assigner = PeakLadderAssigner(fsa)
51        model = FitLadderModel(ladder_assigner)
52        peaks = PeakFinder(
53            model,
54            min_ratio=min_ratio,
55            peak_height=peak_height,
56            custom_peaks=custom_peaks,
57            distance_between_assays=distance_between_assays,
58        )
59
60        return FragglerPeak(
61            fsa=fsa,
62            ladder_assigner=ladder_assigner,
63            model=model,
64            peaks=peaks,
65        )
66
67    except Exception as e:
68        logging.error(
69        f"""The following file failed: {file.stem}
70        Reason: {e}
71        """
72        )
73
74        return file.stem
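
End-to-end sketch for a single file (the paths are placeholders; generate_peak_report is documented further down):

    result = make_fraggler_peak(
        file="sample.fsa",
        ladder="LIZ",
        min_height=30,
        min_ratio=0.2,
        trace_channel="DATA1",
        peak_height=500,
        custom_peaks=None,
        size_standard_channel=None,
        distance_between_assays=15,
    )
    if isinstance(result, str):  # on failure, only the file stem is returned
        print(f"failed: {result}")
    else:
        generate_peak_report(result).save("sample_fraggler_peak.html")
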
def make_fraggler_area( file: pathlib.Path, ladder: str, min_height: int, min_ratio: float, trace_channel: str, peak_height: int, custom_peaks: str | None, size_standard_channel: str | None, distance_between_assays: int, cutoff: int) -> FragglerArea | str:
 77def make_fraggler_area(
 78    file: Path,
 79    ladder: str,
 80    min_height: int,
 81    min_ratio: float,
 82    trace_channel: str,
 83    peak_height: int,
 84    custom_peaks: str | None,
 85    size_standard_channel: str | None,
 86    distance_between_assays: int,
 87    cutoff: int,
 88) -> FragglerArea | str:
 89    file = Path(file)
 90    try:
 91        fsa = FsaFile(
 92            file,
 93            ladder,
 94            min_height=min_height,
 95            trace_channel=trace_channel,
 96            size_standard_channel=size_standard_channel,
 97        )
 98        ladder_assigner = PeakLadderAssigner(fsa)
 99        model = FitLadderModel(ladder_assigner)
100        peaks = PeakFinder(
101            model,
102            min_ratio=min_ratio,
103            peak_height=peak_height,
104            custom_peaks=custom_peaks,
105            distance_between_assays=distance_between_assays,
106        )
107        areas = PeakAreaDeMultiplex(
108            peaks,
109            cutoff,
110        )
111
112        return FragglerArea(
113            fsa=fsa,
114            ladder_assigner=ladder_assigner,
115            model=model,
116            peaks=peaks,
117            areas=areas,
118        )
119
120    except Exception as e:
121        logging.error(
122        f"""The following file failed: {file.stem}
123        Reason: {e}
124        """
125        )
126
127        return file.stem
def generate_peak_report( fraggler: FragglerPeak) -> panel.layout.base.Column:
 64def generate_peak_report(fraggler: FragglerPeak) -> pn.layout.base.Column:
 65    ### ----- Raw Data ----- ###
 66    channel_header = header(
 67        text="## Plot of channels",
 68        bg_color="#04c273",
 69        height=80,
 70        textalign="left",
 71    )
 72    # PLOT
 73    channel_tab = pn.Tabs()
 74    for plot, name in plot_fsa_data(fraggler.fsa):
 75        pane = pn.pane.Vega(plot.interactive(), sizing_mode="stretch_both", name=name)
 76        channel_tab.append(pane)
 77
 78    channels_section = pn.Column(channel_header, channel_tab)
 79
 80    ### ----- Peaks ----- ###
 81    peaks_header = header(
 82        text="## Plot of Peaks",
 83        bg_color="#04c273",
 84        height=80,
 85        textalign="left",
 86    )
 87
 88    # PLOT
 89    peaks_plot = PlotPeaks(fraggler.peaks).plot_peaks
 90    peaks_pane = pn.pane.Matplotlib(peaks_plot, name="Peaks")
 91
 92    # Section
 93    peaks_tab = pn.Tabs(
 94        peaks_pane,
 95    )
 96    peaks_section = pn.Column(peaks_header, peaks_tab)
 97
 98    ### ----- Ladder Information ----- ###
 99    ladder_header = header(
100        text="## Information about the ladder",
101        bg_color="#04c273",
102        height=80,
103        textalign="left",
104    )
105    # Ladder peak plot
106    ladder_plot = PlotLadder(fraggler.model)
107    ladder_peak_plot = pn.pane.Matplotlib(
108        ladder_plot.plot_ladder_peaks,
109        name="Ladder Peak Plot",
110    )
111    # Ladder Correlation
112    ladder_correlation_plot = pn.pane.Matplotlib(
113        ladder_plot.plot_model_fit,
114        name="Ladder Correlation Plot",
115    )
116
117    # Section
118    ladder_tab = pn.Tabs(
119        ladder_peak_plot,
120        ladder_correlation_plot,
121    )
122    ladder_section = pn.Column(ladder_header, ladder_tab)
123
124    ### ----- Peaks dataframe ----- ###
125    dataframe_header = header(
126        text="## Peaks Table", bg_color="#04c273", height=80, textalign="left"
127    )
128    # Create dataframe
129    df = fraggler.peaks.peak_information.assign(file_name=fraggler.fsa.file_name)[
130        ["file_name", "basepairs", "peaks", "assay_name"]
131    ].rename(columns={"peaks": "peak_height"})
132    # DataFrame Tabulator
133    peaks_df_tab = pn.widgets.Tabulator(
134        df,
135        layout="fit_columns",
136        pagination="local",
137        page_size=15,
138        show_index=False,
139        name="Peaks Table",
140    )
141
142    # Section
143    dataframe_tab = pn.Tabs(peaks_df_tab)
144    dataframe_section = pn.Column(dataframe_header, dataframe_tab)
145
146    ### CREATE REPORT ###
147
148    file_name = fraggler.fsa.file_name
149    date = fraggler.fsa.fsa["RUND1"]
150    head = make_header(file_name, date)
151
152    all_tabs = pn.Tabs(
153        ("Channels", channels_section),
154        ("Peaks", peaks_section),
155        ("Ladder", ladder_section),
156        ("Peaks Table", dataframe_section),
157        tabs_location="left",
158    )
159    report = pn.Column(
160        head,
161        pn.layout.Divider(),
162        all_tabs,
163    )
164
165    return report
def generate_area_report( fraggler: FragglerArea, peak_model: str = 'gauss') -> panel.layout.base.Column:
168def generate_area_report(
169    fraggler: FragglerArea,
170    peak_model: str = "gauss",
171) -> pn.layout.base.Column:
172
173    ### ----- Raw Data ----- ###
174    channel_header = header(
175        text="## Plot of channels",
176        bg_color="#04c273",
177        height=80,
178        textalign="left",
179    )
180    # PLOT
181    channel_tab = pn.Tabs()
182    for plot, name in plot_fsa_data(fraggler.fsa):
183        pane = pn.pane.Vega(plot.interactive(), sizing_mode="stretch_both", name=name)
184        channel_tab.append(pane)
185
186    channels_section = pn.Column(channel_header, channel_tab)
187
188    ### ----- Peaks ----- ###
189    peaks_header = header(
190        text="## Plot of Peaks",
191        bg_color="#04c273",
192        height=80,
193        textalign="left",
194    )
195
196    # PLOT
197    peaks_plot = PlotPeaks(fraggler.peaks).plot_peaks
198    peaks_pane = pn.pane.Matplotlib(peaks_plot, name="Peaks")
199
200    # Section
201    peaks_tab = pn.Tabs(
202        peaks_pane,
203    )
204    peaks_section = pn.Column(peaks_header, peaks_tab)
205
206    ### ----- Ladder Information ----- ###
207    ladder_header = header(
208        text="## Information about the ladder",
209        bg_color="#04c273",
210        height=80,
211        textalign="left",
212    )
213    # Ladder peak plot
214    ladder_plot = PlotLadder(fraggler.model)
215    ladder_peak_plot = pn.pane.Matplotlib(
216        ladder_plot.plot_ladder_peaks,
217        name="Ladder Peak Plot",
218    )
219    # Ladder Correlation
220    ladder_correlation_plot = pn.pane.Matplotlib(
221        ladder_plot.plot_model_fit,
222        name="Ladder Correlation Plot",
223    )
224
225    # Section
226    ladder_tab = pn.Tabs(
227        ladder_peak_plot,
228        ladder_correlation_plot,
229    )
230    ladder_section = pn.Column(ladder_header, ladder_tab)
231
232    ### ----- Areas Information ----- ###
233    areas_header = header(
234        text="## Peak Areas", bg_color="#04c273", height=80, textalign="left"
235    )
236    areas_tab = pn.Tabs()
237    for i, plot in enumerate(fraggler.areas.area_plots):
238        name = f"Assay {i + 1}"
239        plot_pane = pn.pane.Matplotlib(plot, name=name)
240        areas_tab.append(plot_pane)
241
242    # Section
243    areas_section = pn.Column(areas_header, areas_tab)
244
245    ### ----- Peaks DataFrame ----- ###
246    dataframe_header = header(
247        text="## Peaks Table", bg_color="#04c273", height=80, textalign="left"
248    )
249
250    if not hasattr(fraggler.areas, "final_df"):
251        df = fraggler.areas.assays_dataframe(peak_model)
252    else:
253        df = fraggler.areas.final_df
254
255    # DataFrame Tabulator
256    peaks_df_tab = pn.widgets.Tabulator(
257        df,
258        layout="fit_columns",
259        pagination="local",
260        page_size=15,
261        show_index=False,
262        name="Peaks Table",
263    )
264
265    # Section
266    dataframe_tab = pn.Tabs(peaks_df_tab)
267    dataframe_section = pn.Column(dataframe_header, dataframe_tab)
268
269    ### CREATE REPORT ###
270
271    file_name = fraggler.fsa.file_name
272    date = fraggler.fsa.fsa["RUND1"]
273    head = make_header(file_name, date)
274
275    all_tabs = pn.Tabs(
276        ("Channels", channels_section),
277        ("Peaks", peaks_section),
278        ("Ladder", ladder_section),
279        ("Areas", areas_section),
280        ("Peak Table", dataframe_section),
281        tabs_location="left",
282    )
283    report = pn.Column(
284        head,
285        pn.layout.Divider(),
286        all_tabs,
287    )
288
289    return report
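
The area variant mirrors the sketch for make_fraggler_peak, adding a cutoff and producing an area report (paths are placeholders):

    result = make_fraggler_area(
        file="sample.fsa",
        ladder="LIZ",
        min_height=30,
        min_ratio=0.2,
        trace_channel="DATA1",
        peak_height=500,
        custom_peaks=None,
        size_standard_channel=None,
        distance_between_assays=15,
        cutoff=175,
    )
    if not isinstance(result, str):
        report = generate_area_report(result, peak_model="gauss")
        report.save("sample_fraggler_area.html")
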
def generate_no_peaks_report(fsa: FsaFile):
292def generate_no_peaks_report(fsa: FsaFile):
293    channel_header = header(
294        text="## Plot of channels",
295        bg_color="#04c273",
296        height=80,
297        textalign="left",
298    )
299    # PLOT
300    channel_tab = pn.Tabs()
301    for plot, name in plot_fsa_data(fsa):
302        pane = pn.pane.Vega(plot.interactive(), sizing_mode="stretch_both", name=name)
303        channel_tab.append(pane)
304    channels_section = pn.Column(channel_header, channel_tab)
305
306    ### CREATE REPORT ###
307    file_name = fsa.file_name
308    date = fsa.fsa["RUND1"]
309    head = header(
310        "# No peaks could be generated. Please look at the raw data.", height=100
311    )
312
313    all_tabs = pn.Tabs(
314        ("Channels", channels_section),
315        tabs_location="left",
316    )
317    report = pn.Column(
318        head,
319        pn.layout.Divider(),
320        all_tabs,
321    )
322
323    return report
def peak_report( in_path: str, out_folder: str, ladder: str, min_ratio: float = 0.2, min_height: int = 30, channel: str = 'DATA1', peak_height: int = 500, size_standard_channel: str | None = None, distance_between_assays: int = 15, custom_peaks: str = None, out_format: str = 'excel') -> None:
160def peak_report(
161    in_path: str,
162    out_folder: str,
163    ladder: str,
164    min_ratio: float = 0.2,
165    min_height: int = 30,
166    channel: str = "DATA1",
167    peak_height: int = 500,
168    size_standard_channel: str | None = None,
169    distance_between_assays: int = 15,
170    custom_peaks: str = None,
171    out_format: str = "excel",
172) -> None:
173
174    if custom_peaks:
175        peak_height = 0  # let the custom peaks file control which peaks are kept
176
177    print(ASCII_ART)
178
179    # Logging
180    fraggler.setup_logging(out_folder)
181    INFO = f"""
182    Command run:
183    {' '.join(sys.argv)}
184
185    Running fraggler with the following parameters:
186        In path: {in_path}
187        Out folder: {out_folder}
188        Ladder: {ladder}
189        Min ratio: {min_ratio}
190        Min height: {min_height}
191        Trace channel: {channel}
192        Peak Height: {peak_height}
193        Custom Peaks: {custom_peaks}
194        Out format: {out_format}
195        Size standard channel: {size_standard_channel}
196        Distance between assays: {distance_between_assays}
197    """
198    logging.info(INFO)
199
200    # Files
201    files = fraggler.get_files(in_path)
202    out_folder = Path(out_folder)
203
204    # Generate a peak area report for each file
205    failed_files = []
206    no_peaks = []
207    peak_dfs = []
208    for file in files:
209        logging.info(f"Processing file: {file}")
210        fraggler_object = fraggler.make_fraggler_peak(
211            file=file,
212            ladder=ladder,
213            min_height=min_height,
214            min_ratio=min_ratio,
215            trace_channel=channel,
216            distance_between_assays=distance_between_assays,
217            size_standard_channel=size_standard_channel,
218            peak_height=peak_height,
219            custom_peaks=custom_peaks,
220        )
221
222        # If make_fraggler_object failed
223        if isinstance(fraggler_object, str):
224            failed_files.append(fraggler_object)
225            fsa = fraggler.FsaFile(file, ladder)
226            report = fraggler.generate_no_peaks_report(fsa)
227            out_name = out_folder / f"{file.stem}_fraggler_failed.html"
228            report.save(out_name)
229            continue
230
231        if not fraggler_object.peaks.found_peaks:
232            no_peaks.append(file.stem)
233            fsa = fraggler.FsaFile(file, ladder)
234            report = fraggler.generate_no_peaks_report(fsa)
235            out_name = out_folder / f"{file.stem}_failed.html"
236            report.save(out_name)
237            continue
238
239        # add peaks to dataframe
240        df = fraggler_object.peaks.peak_information.assign(
241            file_name=fraggler_object.fsa.file_name
242        )[["file_name", "basepairs", "peaks"]].rename(columns={"peaks": "peak_height"})
243        peak_dfs.append(df)
244
245        # generate report
246        report = fraggler.generate_peak_report(fraggler_object)
247        out_name = out_folder / f"{file.stem}_fraggler_peak.html"
248        report.save(out_name)
249
250    # Save dataframe
251    if peak_dfs:
252        save_df_format(peak_dfs, out_folder, in_path, out_format)
253
254    # log failed files
255    logging.info(f"Fraggler done for files in {in_path}!")
256
257    if failed_files:
258        failed_files = "\n".join(failed_files)
259        logging.warning(f"{f.YELLOW}The following files failed: {failed_files}")
260    if no_peaks:
261        no_peaks = "\n".join(no_peaks)
262        logging.warning(f"{f.YELLOW}The following files had no peaks: {no_peaks}")
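
This function backs the fraggler peak command. Assuming the Fire-generated flags mirror the parameters above, an equivalent CLI call looks like this (folder names are placeholders):

    fraggler peak fsa_folder/ results/ LIZ --min_height=30 --peak_height=500
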
def area_report( in_path: str, out_folder: str, ladder: str, peak_model: str = 'gauss', min_ratio: float = 0.2, min_height: int = 30, cutoff: int = 175, channel: str = 'DATA1', peak_height: int = 500, size_standard_channel: str | None = None, distance_between_assays: int = 15, custom_peaks: str = None, out_format: str = 'excel') -> None:
 55def area_report(
 56    in_path: str,
 57    out_folder: str,
 58    ladder: str,
 59    peak_model: str = "gauss",
 60    min_ratio: float = 0.2,
 61    min_height: int = 30,
 62    cutoff: int = 175,
 63    channel: str = "DATA1",
 64    peak_height: int = 500,
 65    size_standard_channel: str | None = None,
 66    distance_between_assays: int = 15,
 67    custom_peaks: str = None,
 68    out_format: str = "excel",
 69) -> None:
 70
 71    print(ASCII_ART)
 72    if custom_peaks:
73        peak_height = 0  # let the custom peaks file control which peaks are kept
 74
 75    # Logging
 76    fraggler.setup_logging(out_folder)
 77    INFO = f"""
 78    Runned command:
 79    {' '.join(sys.argv)}
 80
81    Running fraggler with the following parameters:
 82        In path: {in_path}
 83        Out folder: {out_folder}
 84        Ladder: {ladder}
 85        Peak model: {peak_model}
 86        Min ratio: {min_ratio}
 87        Min height: {min_height}
 88        Cutoff: {cutoff}
 89        Trace channel: {channel}
 90        Peak Height: {peak_height}
 91        Custom Peaks: {custom_peaks}
 92        Out format: {out_format}
 93        Size standard channel: {size_standard_channel}
 94        Distance between assays: {distance_between_assays}
 95    """
 96    logging.info(INFO)
 97
 98    # Files
 99    files = fraggler.get_files(in_path)
100    out_folder = Path(out_folder)
101
102    # Generate a peak area report for each file
103    failed_files = []
104    no_peaks = []
105    peak_dfs = []
106    for file in files:
107        logging.info(f"Processing file: {file}")
108        fraggler_object = fraggler.make_fraggler_area(
109            file=file,
110            ladder=ladder,
111            min_height=min_height,
112            min_ratio=min_ratio,
113            trace_channel=channel,
114            distance_between_assays=distance_between_assays,
115            size_standard_channel=size_standard_channel,
116            cutoff=cutoff,
117            peak_height=peak_height,
118            custom_peaks=custom_peaks,
119        )
120
121        # If make_fraggler_object failed
122        if isinstance(fraggler_object, str):
123            failed_files.append(fraggler_object)
124            fsa = fraggler.FsaFile(file, ladder)
125            report = fraggler.generate_no_peaks_report(fsa)
126            out_name = out_folder / f"{file.stem}_fraggler_failed.html"
127            report.save(out_name)
128            continue
129
130        if not fraggler_object.peaks.found_peaks:
131            no_peaks.append(file.stem)
132            fsa = fraggler.FsaFile(file, ladder)
133            report = fraggler.generate_no_peaks_report(fsa)
134            out_name = out_folder / f"{file.stem}_failed.html"
135            report.save(out_name)
136            continue
137
138        # generate report and peak table
139
140        peak_dfs.append(fraggler_object.areas.assays_dataframe(peak_model))
141        report = fraggler.generate_area_report(fraggler_object, peak_model)
142        out_name = out_folder / f"{file.stem}_fraggler_area.html"
143        report.save(out_name)
144
145    # Save dataframe
146    if peak_dfs:
147        save_df_format(peak_dfs, out_folder, in_path, out_format)
148
149    # log failed files
150    logging.info(f"Fraggler done for files in {in_path}!")
151
152    if failed_files:
153        failed_files = "\n".join(failed_files)
154        logging.warning(f"{f.YELLOW}The following files failed: {failed_files}")
155    if no_peaks:
156        no_peaks = "\n".join(no_peaks)
157        logging.warning(f"{f.YELLOW}The following files had no peaks: {no_peaks}")
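
Likewise, area_report backs the fraggler area command; an equivalent CLI call, again assuming the flags mirror the parameters (folder and file names are placeholders):

    fraggler area fsa_folder/ results/ LIZ --peak_model=gauss --cutoff=175 --custom_peaks=custom_peaks.csv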