# Source code for mh_utils.csv_parser.classes

#!/usr/bin/env python3
"""
Classes to model parts of MassHunter CSV files.

.. versionadded:: 0.2.0
"""
#  Copyright © 2020-2021 Dominic Davis-Foster <>
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.

# stdlib
from collections import OrderedDict
from decimal import Decimal
from typing import Dict, Iterable, List, Optional, Tuple, Type, TypeVar

# 3rd party
import numpy
import pandas  # type: ignore
import sdjson
from cawdrey import AlphaDict
from domdf_python_tools import doctools
from domdf_python_tools.bases import Dictable
from domdf_python_tools.doctools import prettify_docstrings
from domdf_python_tools.paths import PathPlus
from domdf_python_tools.typing import PathLike

# Public API of this module: the model classes, the property dictionaries
# and the sdjson encoders defined below.
__all__ = [
    "Sample",
    "Result",
    "SampleList",
    "BaseSamplePropertyDict",
    "SamplesAreaDict",
    "SamplesScoresDict",
    "encode_result_or_sample",
    "encode_set",
    "encode_decimal",
]

# Report ``pandas.Series`` as living in the top-level ``pandas`` package
# (presumably so documentation cross-references resolve to ``pandas.Series``
# -- TODO confirm). The attribute must be ``__module__``, with two trailing
# underscores; any other spelling just creates an unused attribute.
pandas.Series.__module__ = "pandas"

# Type variables for the alternative constructors, so that subclasses get
# their own type back from ``from_series`` / ``from_json_file`` / ``filter``.
_S = TypeVar("_S", bound="Sample")
_SL = TypeVar("_SL", bound="SampleList")
_R = TypeVar("_R", bound="Result")

@prettify_docstrings
class Sample(Dictable):
    """
    Represents a sample in a MassHunter CSV file.

    The column names below are the ones read by :meth:`from_series`.

    :param sample_name: The ``Sample Name`` column.
    :param sample_type: The ``Sample Type`` column.
    :param instrument_name: The ``Instrument Name`` column.
    :param position: The ``Position`` column.
    :param user: The ``User Name`` column.
    :param acq_method: The ``Acq Method`` column.
    :param da_method: The ``DA Method`` column.
    :param irm_cal_status: The ``IRM Calibration status`` column.
    :param filename: The ``File`` column.
    :param results: The results for the sample, if any.
        Either a dictionary mapping compound numbers to results,
        or a list of results (which are keyed by their ``index``).
        Each result may be a :class:`Result`,
        or a dictionary of keyword arguments from which one is constructed.

    :raises TypeError: If ``results`` is not :py:obj:`None`, a ``dict`` or a ``list``.
    """

    def __init__(
        self,
        sample_name,
        sample_type,
        instrument_name,
        position,
        user,
        acq_method,
        da_method,
        irm_cal_status,
        filename,
        results=None,
    ):
        super().__init__()

        self.sample_name = sample_name
        self.sample_type = sample_type
        self.instrument_name = instrument_name
        self.position = position
        self.user = user
        self.acq_method = acq_method
        self.da_method = da_method
        self.irm_cal_status = irm_cal_status
        self.filename = filename

        # Results keyed by compound number.
        self._results: Dict[float, Result] = {}

        if results is None:
            pass
        elif isinstance(results, dict):
            for cpd_no, compound in results.items():
                # Plain dictionaries hold the keyword arguments for a Result.
                self._results[cpd_no] = Result(**compound) if isinstance(compound, dict) else compound
        elif isinstance(results, list):
            for compound in results:
                result = Result(**compound) if isinstance(compound, dict) else compound
                # Lists carry no compound number, so use the result's own index.
                self._results[result.index] = result
        else:
            raise TypeError(f"Unknown type for `results`: {type(results)}")

    def add_result(self, result):
        """
        Add a result to the sample.

        A result already stored under the same ``index`` is replaced.

        :param result: The result to add; it is stored under ``result.index``.
        """

        self._results[result.index] = result

    @property
    def results_list(self) -> List["Result"]:
        """
        Returns a list of results in the order in which they were identified.

        I.e. sorted by the ``Cpd`` value from the csv export.

        :rtype:

        .. clearpage::
        """

        return [self._results[key] for key in sorted(self._results)]

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            # Only these four attributes identify a sample.
            return (
                self.sample_name == other.sample_name
                and self.sample_type == other.sample_type
                and self.filename == other.filename
                and self.acq_method == other.acq_method
            )

        # Let Python try the reflected comparison rather than returning None.
        return NotImplemented

    @classmethod
    def from_series(cls: Type[_S], series) -> _S:
        """
        Construct a :class:`~.Sample` from a :class:`pandas.Series`.

        The new sample has no results.

        :param series: A row providing the ``Sample Name``, ``Sample Type``, ``Instrument Name``,
            ``Position``, ``User Name``, ``Acq Method``, ``DA Method``,
            ``IRM Calibration status`` and ``File`` entries.

        :return: The newly constructed sample.
        """

        return cls(
            sample_name=series["Sample Name"],
            sample_type=series["Sample Type"],
            instrument_name=series["Instrument Name"],
            position=series["Position"],
            user=series["User Name"],
            acq_method=series["Acq Method"],
            da_method=series["DA Method"],
            irm_cal_status=series["IRM Calibration status"],
            filename=series["File"],
        )

    def __repr__(self):
        return f"Sample({self.sample_name})"

    @property
    def __dict__(self):
        # The results are exported as a list (see ``results_list``),
        # which the constructor accepts back via its ``results`` argument.
        return AlphaDict(
            sample_name=self.sample_name,
            sample_type=self.sample_type,
            instrument_name=self.instrument_name,
            position=self.position,
            user=self.user,
            acq_method=self.acq_method,
            da_method=self.da_method,
            irm_cal_status=self.irm_cal_status,
            filename=self.filename,
            results=self.results_list,
        )
@prettify_docstrings
class Result(Dictable):
    r"""
    Represents a Result in a MassHunter CSV file.

    The column names below are the ones read by :meth:`from_series`.

    .. raw:: latex

        \begin{multicols}{2}

    :param cas: The ``CAS`` column.
    :param name: The ``Name`` column. Converted to :class:`str`.
    :param hits: The ``Hits`` column.
    :param index: The ``Cpd`` column; the number of the result in the sample.
    :param formula: The ``Formula`` column. Converted to :class:`str`.
    :param score: The ``Score`` column. Converted to :class:`~decimal.Decimal`.
    :param abundance: The ``Abund`` column. Converted to :class:`float`.
    :param height: The ``Height`` column. Converted to :class:`float`.
    :param area: The ``Area`` column. Converted to :class:`float`.
    :param diff_mDa: The ``Diff (Tgt, mDa)`` column. Converted to :class:`~decimal.Decimal`.
    :param diff_ppm: The ``Diff (Tgt, ppm)`` column. Converted to :class:`~decimal.Decimal`.
    :param rt: The ``RT`` column. Converted to :class:`~decimal.Decimal`.
    :param start: The ``Start`` column. Converted to :class:`~decimal.Decimal`.
    :param end: The ``End`` column. Converted to :class:`~decimal.Decimal`.
    :param width: The ``Width`` column. Converted to :class:`~decimal.Decimal`.
    :param tgt_rt: The ``RT (Tgt)`` column. Converted to :class:`~decimal.Decimal`.
    :param rt_diff: The ``RT Diff (Tgt)`` column. Converted to :class:`~decimal.Decimal`.
    :param mz: The ``m/z`` column. Converted to :class:`~decimal.Decimal`.
    :param product_mz: The ``m/z (prod.)`` column. Converted to :class:`~decimal.Decimal`.
    :param base_peak: The ``Base Peak`` column. Converted to :class:`~decimal.Decimal`.
    :param mass: The ``Mass`` column. Converted to :class:`~decimal.Decimal`.
    :param average_mass: The ``Avg Mass`` column. Converted to :class:`~decimal.Decimal`.
    :param tgt_mass: The ``Mass (Tgt)`` column. Converted to :class:`~decimal.Decimal`.
    :param mining_algorithm: The ``Mining Algorithm`` column. Converted to :class:`str`.
    :param z_count: The ``Z Count`` column. Converted to :class:`int`.
    :param max_z: The ``Max Z`` column. Converted to :class:`int`.
    :param min_z: The ``Min Z`` column. Converted to :class:`int`.
    :param n_ions: The ``Ions`` column. Converted to :class:`int`.
    :param polarity: The ``Polarity`` column. Converted to :class:`str`.
    :param label: The ``Label`` column. Converted to :class:`str`.
    :param flags: The ``Flags (Tgt)`` column. Converted to :class:`str`.
    :param flag_severity: The ``Flag Severity (Tgt)`` column. Converted to :class:`str`.
    :param flag_severity_code: The ``Flag Severity Code (Tgt)`` column. Converted to :class:`int`.

    .. raw:: latex

        \end{multicols}
    """

    def __init__(
        self,
        cas,
        name: str,
        hits,
        index: int = -1,
        formula: str = '',
        score: float = 0.0,
        abundance: float = 0,
        height: float = 0,
        area: float = 0,
        diff_mDa: float = 0.0,
        diff_ppm: float = 0.0,
        rt: float = 0.0,
        start: float = 0.0,
        end: float = 0.0,
        width: float = 0.0,
        tgt_rt: float = 0.0,
        rt_diff: float = 0.0,
        mz: float = 0.0,
        product_mz: float = 0.0,
        base_peak: float = 0.0,
        mass: float = 0.0,
        average_mass: float = 0.0,
        tgt_mass: float = 0.0,
        mining_algorithm: str = '',
        z_count: int = 0,
        max_z: int = 0,
        min_z: int = 0,
        n_ions: int = 0,
        polarity: str = '',
        label: str = '',
        flags: str = '',
        flag_severity: str = '',
        flag_severity_code: int = 0,
    ):
        super().__init__()

        # Possible also AL (ID Source) and AM (ID Techniques Applied)
        self._cas = cas
        self.name: str = str(name)
        self.hits = hits
        self.formula: str = str(formula)
        self.score: Decimal = Decimal(score)

        # Kept as floats, matching the annotations; ``int()`` would silently
        # discard any fractional part.
        self.abundance: float = float(abundance)
        self.height: float = float(height)
        self.area: float = float(area)

        self.diff_mDa: Decimal = Decimal(diff_mDa)
        self.diff_ppm: Decimal = Decimal(diff_ppm)
        self.rt: Decimal = Decimal(rt)
        self.start: Decimal = Decimal(start)
        self.end: Decimal = Decimal(end)
        self.width: Decimal = Decimal(width)
        self.tgt_rt: Decimal = Decimal(tgt_rt)
        self.rt_diff: Decimal = Decimal(rt_diff)
        self.mz: Decimal = Decimal(mz)
        self.product_mz: Decimal = Decimal(product_mz)
        self.base_peak: Decimal = Decimal(base_peak)
        self.mass: Decimal = Decimal(mass)
        self.average_mass: Decimal = Decimal(average_mass)
        self.tgt_mass: Decimal = Decimal(tgt_mass)
        self.mining_algorithm: str = str(mining_algorithm)
        self.z_count: int = int(z_count)
        self.max_z: int = int(max_z)
        self.min_z: int = int(min_z)
        self.n_ions: int = int(n_ions)
        self.polarity: str = str(polarity)
        self.label: str = str(label)
        self.flags: str = str(flags)
        self.flag_severity: str = str(flag_severity)
        self.flag_severity_code: int = int(flag_severity_code)
        self.index: int = index  # Tracks the number of the result in the sample

        # "Score (Tgt)",

    @classmethod
    def from_series(cls: Type[_R], series: pandas.Series) -> _R:
        """
        Construct a :class:`~.classes.Result` from a :class:`pandas.Series`.

        :param series: A row providing an entry for every column named
            in the class documentation.

        :rtype:

        .. clearpage::
        """

        return cls(
            cas=series["CAS"],
            name=series["Name"],
            hits=series["Hits"],
            index=series["Cpd"],
            formula=series["Formula"],
            score=series["Score"],
            abundance=series["Abund"],
            height=series["Height"],
            area=series["Area"],
            diff_mDa=series["Diff (Tgt, mDa)"],
            diff_ppm=series["Diff (Tgt, ppm)"],
            rt=series["RT"],
            start=series["Start"],
            end=series["End"],
            width=series["Width"],
            tgt_rt=series["RT (Tgt)"],
            rt_diff=series["RT Diff (Tgt)"],
            mz=series["m/z"],
            product_mz=series["m/z (prod.)"],
            base_peak=series["Base Peak"],
            mass=series["Mass"],
            average_mass=series["Avg Mass"],
            tgt_mass=series["Mass (Tgt)"],
            mining_algorithm=series["Mining Algorithm"],
            z_count=series["Z Count"],
            max_z=series["Max Z"],
            min_z=series["Min Z"],
            n_ions=series["Ions"],
            polarity=series["Polarity"],
            label=series["Label"],
            flags=series["Flags (Tgt)"],
            flag_severity=series["Flag Severity (Tgt)"],
            flag_severity_code=series["Flag Severity Code (Tgt)"],
        )

    def __repr__(self):
        return f"Result({self.name}; {self.formula}; {self.rt}; {self.score})"

    @property
    def __dict__(self):
        return AlphaDict(
            cas=self._cas,
            name=self.name,
            hits=self.hits,
            formula=self.formula,
            score=self.score,
            abundance=self.abundance,
            height=self.height,
            area=self.area,
            diff_mDa=self.diff_mDa,
            diff_ppm=self.diff_ppm,
            rt=self.rt,
            start=self.start,
            end=self.end,
            width=self.width,
            tgt_rt=self.tgt_rt,
            rt_diff=self.rt_diff,
            mz=self.mz,
            product_mz=self.product_mz,
            base_peak=self.base_peak,
            mass=self.mass,
            average_mass=self.average_mass,
            tgt_mass=self.tgt_mass,
            mining_algorithm=self.mining_algorithm,
            z_count=self.z_count,
            max_z=self.max_z,
            min_z=self.min_z,
            n_ions=self.n_ions,
            polarity=self.polarity,
            label=self.label,
            flags=self.flags,
            flag_severity=self.flag_severity,
            flag_severity_code=self.flag_severity_code,
            index=self.index,
        )

    def __eq__(self, other):
        # A result compares equal to a string holding its name, ignoring case.
        if isinstance(other, str):
            return other.casefold() == self.name.casefold()

        return NotImplemented
class SampleList(List[Sample]):
    """
    A list of :class:`mh_utils.csv_parser.classes.Sample` objects.
    """

    @doctools.append_docstring_from(Sample.__init__)
    def add_new_sample(self, *args, **kwargs):
        """
        Add a new sample to the list and return the :class:`~classes.Sample` object representing it.
        """  # noqa: D400

        tmp_sample = Sample(*args, **kwargs)
        return self.add_sample(tmp_sample)

    def add_sample(self, sample: Sample) -> Sample:
        """
        Add a :class:`~.Sample` object to the list.

        If an equal sample is already in the list, nothing is added
        and the existing sample is returned instead.

        :param sample: The sample to add.

        :rtype:

        .. clearpage::
        """

        try:
            # One scan of the list instead of ``in`` followed by ``index``.
            return self[self.index(sample)]
        except ValueError:
            self.append(sample)
            return sample

    # def find_sample(self, sample_name: str) -> Optional[Sample]:
    #     if sample_name in self:
    #         return self[self.index(sample_name)]
    #     else:
    #         return None

    def add_sample_from_series(self, series: pandas.Series) -> Sample:
        """
        Create a new sample object from a :class:`pandas.Series` and add it to the list.

        :param series: The row to construct the sample from; see :meth:`Sample.from_series`.

        :returns: The newly created :class:`~classes.Sample` object,
            or the equal sample already in the list (see :meth:`add_sample`).
        """

        tmp_sample = Sample.from_series(series)
        return self.add_sample(tmp_sample)

    def sort_samples(self, key: str, reverse: bool = False):
        """
        Sort the list of :class:`~.Samples` in place.

        :param key: The name of the property in the sample to sort by.
        :param reverse: Whether the list should be sorted in reverse order.

        :rtype:

        .. clearpage::
        """

        self.sort(key=lambda samp: getattr(samp, key), reverse=reverse)

    def reorder_samples(self, order_mapping: Dict, key: str = "sample_name"):
        """
        Reorder the list of :class:`~.Samples` in place.

        :param order_mapping: A mapping between sample names and their new position in the list.
            For example:

            .. code-block:: python

                order_mapping = {
                    "Propellant 1ug +ve": 0,
                    "Propellant 1mg +ve": 1,
                    "Propellant 1ug -ve": 2,
                    "Propellant 1mg -ve": 3,
                }

        :param key: The name of the property in the sample which is looked up in ``order_mapping``.

        :raises KeyError: If the value of ``key`` for any sample is missing from ``order_mapping``.
        """

        # Ascending order, so that position 0 comes first as documented.
        self.sort(key=lambda s: order_mapping[getattr(s, key)])

    def rename_samples(self, rename_mapping: Dict, key: str = "sample_name"):
        """
        Rename the samples in the list.

        :param rename_mapping: A mapping between current sample names and their new names.
            It is not modified.
        :param key: The name of the property in the sample which is looked up in ``rename_mapping``.
            The new name is always assigned to ``sample_name``.

        Map a sample to :py:obj:`None`, or omit it from ``rename_mapping`` entirely,
        to leave its name unchanged. For example:

        .. code-block:: python

            rename_mapping = {
                "Propellant 1ug +ve": "Alliant Unique 1µg/L +ESI",
                "Propellant 1mg +ve": "Alliant Unique 1mg/L +ESI",
                "Propellant 1mg -ve": None,
            }
        """

        for sample in self:
            # Look the name up without removing it: the caller's mapping is
            # left intact, and every sample sharing the same key is renamed.
            new_name = rename_mapping.get(getattr(sample, key))
            if new_name:
                sample.sample_name = new_name

    def get_areas_and_scores(
        self,
        compound_name: str,
        include_none: bool = False,
    ) -> Tuple[OrderedDict, OrderedDict]:
        """
        Returns two dictionaries: one containing sample names and peak areas
        for the compound with the given name, the other containing sample names and scores.

        Only the first matching result in each sample is used.

        :param compound_name: The name of the compound to look for.
        :param include_none: Whether samples where the compound was not found
            should be included in the results; their value is :py:obj:`None`.
        """  # noqa: D400

        peak_areas: "OrderedDict[str, Optional[float]]" = OrderedDict()
        scores: "OrderedDict[str, Optional[Decimal]]" = OrderedDict()

        for sample in self:
            for result in sample.results_list:
                if result.name == compound_name:
                    peak_areas[sample.sample_name] = result.area
                    scores[sample.sample_name] = result.score
                    break
            else:
                # No result in this sample matched the compound.
                if include_none:
                    peak_areas[sample.sample_name] = None
                    scores[sample.sample_name] = None

        return peak_areas, scores

    def get_retention_times(self, compound_name: str, include_none: bool = False) -> OrderedDict:
        """
        Returns a dictionary containing sample names and retention times for the compound with the given name.

        Only the first matching result in each sample is used.

        :param compound_name: The name of the compound to look for.
        :param include_none: Whether samples where the compound was not found
            should be included in the results; their value is ``nan``.
        """  # noqa: D400

        times = OrderedDict()

        for sample in self:
            for result in sample.results_list:
                if result.name == compound_name:
                    times[sample.sample_name] = float(result.rt)
                    break
            else:
                # No result in this sample matched the compound.
                if include_none:
                    times[sample.sample_name] = numpy.nan

        return times

    def get_peak_areas(self, compound_name: str, include_none: bool = False) -> OrderedDict:
        """
        Returns a dictionary containing sample names and peak areas for the compound with the given name.

        :param compound_name: The name of the compound to look for.
        :param include_none: Whether samples where the compound was not found
            should be included in the results.
        """  # noqa: D400

        return self.get_areas_and_scores(compound_name, include_none)[0]

    def get_areas_for_compounds(
        self,
        compound_names: Iterable[str],
        include_none: bool = False,
    ) -> "SamplesAreaDict":
        """
        Returns a dictionary containing sample names and peak areas for the compounds with the given names.

        :param compound_names: The names of the compounds to look for.
        :param include_none: Whether samples where none of the specified compounds
            were found should be included in the results.
        """  # noqa: D400

        all_areas, _ = self.get_areas_and_scores_for_compounds(compound_names, include_none)
        return all_areas

    def get_areas_and_scores_for_compounds(
        self,
        compound_names: Iterable[str],
        include_none: bool = False,
    ) -> Tuple["SamplesAreaDict", "SamplesScoresDict"]:
        """
        Returns two dictionaries: one containing sample names and peak areas
        for the compounds with the given names, the other containing sample names and scores.

        :param compound_names: The names of the compounds to look for.
        :param include_none: Whether samples where none of the specified compounds
            were found should be included in the results.

        :rtype:

        .. clearpage::
        """  # noqa: D400

        tmp_all_areas = SamplesAreaDict()
        tmp_all_scores = SamplesScoresDict()

        for name in compound_names:
            # A single pass over the samples yields both the areas and the scores.
            areas, scores = self.get_areas_and_scores(name, True)

            for sample_name, area in areas.items():
                tmp_all_areas.setdefault(sample_name, {})[name] = area
                tmp_all_scores.setdefault(sample_name, {})[name] = scores[sample_name]

        if include_none:
            return tmp_all_areas, tmp_all_scores

        all_areas = SamplesAreaDict()
        all_scores = SamplesScoresDict()

        for sample_name, compound_areas in tmp_all_areas.items():
            # Truthiness test: a sample is dropped when every area is None or zero.
            if any(compound_areas.values()):
                all_areas[sample_name] = compound_areas
                all_scores[sample_name] = tmp_all_scores[sample_name]

        return all_areas, all_scores

    def get_compounds(self) -> List[str]:
        """
        Returns a list containing the names of the compounds present in the samples in alphabetical order.
        """

        compounds = {result.name for sample in self for result in sample.results_list}
        return sorted(compounds)

    def get_scores(self, compound_name: str, include_none: bool = False) -> OrderedDict:
        """
        Returns a dictionary containing sample names and scores for the compound with the given name.

        :param compound_name: The name of the compound to look for.
        :param include_none: Whether samples where the compound was not found
            should be included in the results.

        :rtype:

        .. clearpage::
        """  # noqa: D400

        return self.get_areas_and_scores(compound_name, include_none)[1]

    def filter(  # noqa: A003  # pylint: disable=redefined-builtin
        self: _SL,
        sample_names: Iterable[str],
        key: str = "sample_name",
        exclude: bool = False,
    ) -> _SL:
        """
        Filter the list to only contain samples whose name is in ``sample_names``.

        The list itself is left unchanged; a new list of the same class is returned.

        :param sample_names: A list of sample names to include.
        :param key: The name of the property in the sample which is looked up in ``sample_names``.
        :param exclude: If :py:obj:`True`, any sample whose name is in ``sample_names``
            will be excluded from the output, rather than included.
        """

        # A one-shot iterator would be used up by the first membership test.
        if iter(sample_names) is sample_names:
            sample_names = list(sample_names)

        exclude = bool(exclude)
        new_sample_list = self.__class__()

        for sample in self:
            listed = getattr(sample, key) in sample_names
            # Keep listed samples when including, unlisted samples when excluding.
            if listed != exclude:
                new_sample_list.append(sample)

        return new_sample_list

    @property
    def sample_names(self) -> List[str]:
        """
        Returns a list of sample names in the :class:`~.classes.SampleList`.
        """

        return [sample.sample_name for sample in self]

    @classmethod
    def from_json_file(cls: Type[_SL], filename: PathLike, **kwargs) -> _SL:
        r"""
        Construct a :class:`~.classes.SampleList` from JSON file.

        Each entry in the file supplies the keyword arguments for one :class:`~.Sample`.

        :param filename: The filename of the JSON file.
        :param \*\*kwargs: Keyword arguments passed to :meth:`domdf_python_tools.paths.PathPlus.load_json`.
        """

        all_samples = cls()

        for sample in PathPlus(filename).load_json(
            json_library=sdjson,  # type: ignore
            **kwargs,
        ):
            all_samples.append(Sample(**sample))

        return all_samples
class BaseSamplePropertyDict(OrderedDict):
    """
    OrderedDict to store a single property of a set of samples.

    Keys are the sample names and the values are dictionaries mapping compound names to property values.
    """

    @property
    def sample_names(self) -> List[str]:
        """
        Returns a list of sample names in the :class:`~.BaseSamplePropertyDict`.
        """

        return list(self)

    @property
    def n_samples(self) -> int:
        """
        Returns the number of samples in the :class:`~.BaseSamplePropertyDict`.
        """

        return len(self)

    @property
    def n_compounds(self) -> int:
        """
        Returns the number of compounds in the :class:`~.BaseSamplePropertyDict`.

        This is the number of entries for the first sample, or ``0`` if there are no samples.
        """

        # Only the first sample is inspected; the loop returns on its first pass.
        for compounds in self.values():
            return len(compounds)

        return 0
class SamplesAreaDict(BaseSamplePropertyDict):
    """
    :class:`collections.OrderedDict` to store area information parsed from MassHunter results CSV files.
    """

    def get_compound_areas(self, compound_name: str) -> List[float]:
        """
        Get the peak areas for the given compound in every sample.

        Samples with no entry for the compound are skipped;
        an area of :py:obj:`None` is reported as ``0.0``.

        :param compound_name: The name of the compound to look up.
        """

        areas = []

        for compound_areas in self.values():
            if compound_name in compound_areas:
                area = compound_areas[compound_name]
                areas.append(0.0 if area is None else area)

        return areas
class SamplesScoresDict(BaseSamplePropertyDict):
    """
    :class:`collections.OrderedDict` to store score information parsed from MassHunter results CSV files.
    """

    def get_compound_scores(self, compound_name: str) -> List[float]:
        """
        Get the peak scores for the given compound in every sample.

        Samples with no entry for the compound are skipped;
        a score of :py:obj:`None` is reported as ``0.0``.

        :param compound_name: The name of the compound to look up.
        """

        scores = []

        for compound_scores in self.values():
            if compound_name in compound_scores:
                score = compound_scores[compound_name]
                scores.append(0.0 if score is None else score)

        return scores
@sdjson.encoders.register(Sample)
@sdjson.encoders.register(Result)
def encode_result_or_sample(obj):  # noqa: D103
    # Registered with sdjson for both Sample and Result: encode as a plain dict.
    return dict(obj)


@sdjson.encoders.register(set)
def encode_set(obj):  # noqa: D103
    # Registered with sdjson for sets: encode as a list.
    return list(obj)


@sdjson.encoders.register(Decimal)
def encode_decimal(obj):  # noqa: D103
    # Registered with sdjson for Decimal: encode as its string form.
    return str(obj)