specHandler.py 12.3 KB
Newer Older
Josef Brandt's avatar
Josef Brandt committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
"""
GEPARD - Gepard-Enabled PARticle Detection
Copyright (C) 2018  Lars Bittrich and Josef Brandt, Leibniz-Institut für 
Polymerforschung Dresden e. V. <bittrich-lars@ipfdd.de>    

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program, see COPYING.  
If not, see <https://www.gnu.org/licenses/>.
"""

import os
import numpy as np

Josef Brandt's avatar
Josef Brandt committed
24 25

class SpectraHandler(object):
Josef Brandt's avatar
Josef Brandt committed
26
    """
Josef Brandt's avatar
Josef Brandt committed
27
    Handles Spectra formatting and storage.
Josef Brandt's avatar
Josef Brandt committed
28 29
    :return:
    """
Josef Brandt's avatar
Josef Brandt committed
30 31 32
    def __init__(self, ramanctrl):
        super(SpectraHandler, self).__init__()
        self.ramanctrlname = ramanctrl.name
Josef Brandt's avatar
Josef Brandt committed
33 34 35
        self.dsetpath = None
        self.tmpspecpath = None
        self.curSpecIndex = None
36
        self.spectraBatchSize = None
Josef Brandt's avatar
Josef Brandt committed
37 38 39 40 41 42
        self.wavenumbers: np.ndarray = None
        if ramanctrl.name == 'WITecCOM':
            self.specConverter = WITecSpecConverter()
        else:
            self.specConverter = None

Josef Brandt's avatar
Josef Brandt committed
43 44
    def setDatasetPath(self, path):
        self.dsetpath = path
Josef Brandt's avatar
Josef Brandt committed
45 46
        self.createTmpSpecFolder()

47 48
    def setSpectraBatchSize(self, batchSize):
        self.spectraBatchSize = batchSize
Josef Brandt's avatar
Josef Brandt committed
49

Josef Brandt's avatar
Josef Brandt committed
50 51 52 53 54
    def createTmpSpecFolder(self):
        assert self.dsetpath is not None
        self.tmpspecpath = os.path.join(self.dsetpath, 'spectra')
        if not os.path.exists(self.tmpspecpath):
            os.mkdir(self.tmpspecpath)
Josef Brandt's avatar
Josef Brandt committed
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76

    def registerBackground(self, backgroundid: int, spec: np.ndarray) -> None:
        fname: str = self.getPathForBackroundOfID(backgroundid)
        spec = self.mapSpecToWavenumbers(spec)
        np.save(fname, spec)

    def getBackGroundOfID(self, id: int) -> np.ndarray:
        path: str = self.getPathForBackroundOfID(id)
        assert os.path.exists(path), f'requested backgound file {path} does not exist!'
        return np.load(path)

    def registerNewSpectrum(self, spectrum, specIndex: int, deassamble: bool = False):
        assert self.tmpspecpath is not None, 'Attempting to save spectrum but no specPath is set...'
        if deassamble:
            wavenumbers, intensities = self.specConverter.deassembleSpecString(spectrum)
        else:
            assert type(spectrum) == np.ndarray
            wavenumbers, intensities = spectrum[:, 0], spectrum[:, 1]

        if not np.all(np.isfinite(spectrum)):
            spectrum = self.makeSpecFinite(spectrum)

Josef Brandt's avatar
Josef Brandt committed
77
        if specIndex == 0:
Josef Brandt's avatar
Josef Brandt committed
78 79
            np.save(self.getPathForWavenumbers(), wavenumbers)

Josef Brandt's avatar
Josef Brandt committed
80
        fname = os.path.join(self.tmpspecpath, f'Spectrum ({specIndex}).npy')
Josef Brandt's avatar
Josef Brandt committed
81
        np.save(fname, intensities)
Josef Brandt's avatar
Josef Brandt committed
82
        self.curSpecIndex = specIndex
Josef Brandt's avatar
Josef Brandt committed
83

Josef Brandt's avatar
Josef Brandt committed
84 85 86 87
    def createSummarizedSpecFiles(self):
        allSpectra = self.getAllSpectra()
        allspecfname = os.path.join(self.dsetpath, 'spectra.npy')
        np.save(allspecfname, allSpectra)
Josef Brandt's avatar
Josef Brandt committed
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
        wavelength: float = (self.specConverter.excitWavel if self.specConverter is not None else 0.0)
        self.createTrueMatchTxt(allSpectra, wavelength)

    def getAllSpectra(self):
        numSpectra = self.curSpecIndex + 1
        wavenumbers = np.load(os.path.join(self.tmpspecpath, 'Wavenumbers.npy'))
        allSpectra = np.zeros((wavenumbers.shape[0], numSpectra+1))
        allSpectra[:, 0] = wavenumbers
        for i in range(numSpectra):
            curSpecPath = os.path.join(self.tmpspecpath, f'Spectrum ({i}).npy')
            allSpectra[:, i+1] = np.load(curSpecPath)
            os.remove(curSpecPath)
        return allSpectra

    def createTrueMatchTxt(self, allSpectra, wavelength):
        def writeHeader(fp):
            fp.write('[WITEC_TRUEMATCH_ASCII_HEADER]\n\r')
            fp.write('Version = 2.0\n\r\n\r')

        def writeWavenumbers(fp, wavenumbers):
            fp.write('[XData]\n\r')
            for line in wavenumbers:
                fp.write(str(line) + '\n\r')

        def writeSpectrum(fp, intensities):
            fp.write('\n\r')
            fp.write('[SpectrumHeader]\n\r')
            fp.write(f'Title = Spectrum {specIndex} \n\r')
            fp.write(f'ExcitationWavelength = {wavelength}\n\r')
            fp.write(f'SpectrumSize = {specSize}\n\r')
            fp.write('XDataKind = 1/cm\n\r\n\r')
            fp.write('[SampleMetaData]\n\r')
            fp.write(f'int Spectrum_Number = {specIndex}\n\r\n\r')
            fp.write('[SpectrumData]\n\r')
            for line in intensities:
                    fp.write(str(line) + '\n\r')

        wavenumbers = allSpectra[:, 0]
        spectra = allSpectra[:, 1:]
        specSize = allSpectra.shape[0]
        del allSpectra
        numSpectra = spectra.shape[1]
        if self.spectraBatchSize is None:  # if it was not explicitely set, it will be set to produce only one batch
            self.spectraBatchSize = numSpectra
        numBatches = int(np.ceil(numSpectra/self.spectraBatchSize))

        for batchIndex in range(numBatches):
            outName = os.path.join(self.dsetpath, f'SpectraForTrueMatch {batchIndex}.txt')
            if os.path.exists(outName):
                os.remove(outName)

            if batchIndex < numBatches-1:
                specIndicesInBatch = np.arange(batchIndex*self.spectraBatchSize, (batchIndex+1)*self.spectraBatchSize)
            else:
                specIndicesInBatch = np.arange(batchIndex*self.spectraBatchSize, numSpectra)

            with open(outName, 'w') as fp:
                writeHeader(fp)
                writeWavenumbers(fp, wavenumbers)

                for specIndex in specIndicesInBatch:
                    spec = spectra[:, specIndex]
                    writeSpectrum(fp, spec)

    def mapSpecToWavenumbers(self, spec: np.ndarray) -> np.ndarray:
        """
        In case of Thermo FTIR, not all recoreded spectra are guaranteed to have the same wavenumber range.
        To account for that, this function here checks wavenumbers and corrects, if necessary..
        :param spec: np.array, shape [numWavenumbers, 2]
        :return correctedSpec:
        """
        def getIntensityClosestToWavenumber(wavenumber: float) -> float:
            closestIndex = np.argmin(abs(spec[:, 0] - wavenumber))
            return spec[closestIndex, 1]

        if self.ramanctrlname == "ThermoFTIRCom":
            if self.wavenumbers is None:
                self.wavenumbers = spec[:, 0]
                np.save(self.getPathForWavenumbers(), self.wavenumbers)
            else:
                if not np.array_equal(spec[:, 0], self.wavenumbers):
                    corrSpec: np.ndarray = np.zeros((self.wavenumbers.shape[0], 2))
                    corrSpec[:, 0] = self.wavenumbers

                    for i in range(corrSpec.shape[0]):
                        corrSpec[i, 1] = getIntensityClosestToWavenumber(corrSpec[i, 0])

                    spec = corrSpec

        return spec

    def makeSpecFinite(self, spectrum: np.ndarray) -> np.ndarray:
        """
        Removes all np.inf and np.nan from the spec. The missing values are obtained by linear interpolation
        between the closest valid values.
        """
        def _get_lower_finite_index(invIndex: int, valInd: np.ndarray) -> int:
            possIndices: np.ndarray = valInd[valInd < invIndex]
            diff: np.ndarray = np.abs(possIndices-invIndex)
            return possIndices[np.argmin(diff)]

        def _get_higher_finite_index(invIndex: int, valInd: np.ndarray) -> int:
            possIndices: np.ndarray = valInd[valInd > invIndex]
            diff: np.ndarray = np.abs(possIndices-invIndex)
            return possIndices[np.argmin(diff)]

        _intensities: np.ndarray = spectrum[:, 1]
        finite: np.ndarray = np.isfinite(_intensities)
        validIndices: np.ndarray = np.where(finite == True)[0]
        failindices: np.ndarray = np.where(finite == False)[0]
        for ind in failindices:
            lowerInd: int = _get_lower_finite_index(ind, validIndices)
            lowerVal: float = _intensities[lowerInd]
            higherInd: int = _get_higher_finite_index(ind, validIndices)
            higherVal: float = _intensities[higherInd]
            diffInt: int = higherInd-lowerInd
            spectrum[ind, 1] = _intensities[lowerInd] + (ind-lowerInd) * (higherVal-lowerVal)/diffInt

        return spectrum


    def getPathForBackroundOfID(self, id: int) -> str:
        assert self.tmpspecpath is not None, 'temp spec path not set for SpecHandler!'
        return os.path.join(self.tmpspecpath, f'background {id}.npy')

    def getPathForWavenumbers(self) -> str:
        assert self.tmpspecpath is not None, 'temp spec path not set for SpecHandler!'
        return os.path.join(self.tmpspecpath, 'Wavenumbers.npy')


class WITecSpecConverter:
    def __init__(self):
        self.excitWavel: int = -1

    def deassembleSpecString(self, specString: str):
Josef Brandt's avatar
Josef Brandt committed
223
        keywordLines = self.getKeyWordLines(specString)
Josef Brandt's avatar
Josef Brandt committed
224

Josef Brandt's avatar
Josef Brandt committed
225 226 227 228 229 230 231 232
        try:
            specSize = self.getSpecSize(specString, keywordLines['SpectrumSize'][0])
        except:
            print(keywordLines)
            raise
        wavenumbers = self.getWavenumbers(specString, keywordLines['[XData]'][0], specSize)
        xDataKind = self.getXDataKind(specString, keywordLines['XDataKind'][0])
        self.excitWavel = self.getExcitationWavelength(specString, keywordLines['ExcitationWavelength'][0])
Josef Brandt's avatar
Josef Brandt committed
233

Josef Brandt's avatar
Josef Brandt committed
234 235 236 237 238 239
        if xDataKind == 'nm':
            wavenumbers = self.convertWavenumbersFrom_nm_to_Percm(wavenumbers, self.excitWavel)
        else:
            print('warning, unexpected xDataKind:', xDataKind)
            print('please check how to deal with it!!!')
            assert False
Josef Brandt's avatar
Josef Brandt committed
240

Josef Brandt's avatar
Josef Brandt committed
241 242
        averaged_counts = self.getAveragedSpectra(specString, keywordLines['SpectrumData'], specSize)
        return wavenumbers, averaged_counts
Josef Brandt's avatar
Josef Brandt committed
243

Josef Brandt's avatar
Josef Brandt committed
244 245 246 247
    def getKeyWordLines(self, specString):
        keywordLines = {'[WITEC_TRUEMATCH_ASCII_HEADER]': [],
                        '[XData]': [],
                        'ExcitationWavelength': [],
Josef Brandt's avatar
Josef Brandt committed
248 249
                        'SpectrumSize': [],
                        'XDataKind': [],
Josef Brandt's avatar
Josef Brandt committed
250 251 252
                        'SpectrumHeader': [],
                        'SampleMetaData': [],
                        'SpectrumData': []}
Josef Brandt's avatar
Josef Brandt committed
253

Josef Brandt's avatar
Josef Brandt committed
254 255 256 257 258
        for index, line in enumerate(specString):
            for key in keywordLines.keys():
                if line.find(key) != -1:
                    keywordLines[key].append(index)
        return keywordLines
Josef Brandt's avatar
Josef Brandt committed
259

Josef Brandt's avatar
Josef Brandt committed
260 261 262 263 264
    def getSpecSize(self, specString, specSizeIndex):
        line = specString[specSizeIndex]
        specSize = [int(s) for s in line.split() if self.isNumber(s)]
        assert len(specSize) == 1
        return specSize[0]
Josef Brandt's avatar
Josef Brandt committed
265

Josef Brandt's avatar
Josef Brandt committed
266 267 268 269 270
    def getExcitationWavelength(self, specString, excitWavenumIndex):
        line = specString[excitWavenumIndex]
        excitWavel = [float(s) for s in line.split() if self.isNumber(s)]
        assert len(excitWavel) == 1
        return excitWavel[0]
Josef Brandt's avatar
Josef Brandt committed
271

Josef Brandt's avatar
Josef Brandt committed
272 273 274
    def getXDataKind(self, specString, xDataKindIndex):
        line = specString[xDataKindIndex]
        return line.split()[-1]
Josef Brandt's avatar
Josef Brandt committed
275

Josef Brandt's avatar
Josef Brandt committed
276 277
    def getWavenumbers(self, specString, startXDataIndex, specSize):
        wavenumbers = []
Josef Brandt's avatar
Josef Brandt committed
278
        curIndex = startXDataIndex + 1
Josef Brandt's avatar
Josef Brandt committed
279
        curLine = specString[curIndex]
Josef Brandt's avatar
Josef Brandt committed
280

Josef Brandt's avatar
Josef Brandt committed
281 282 283 284
        while self.isNumber(curLine):
            wavenumbers.append(float(curLine))
            curIndex += 1
            curLine = specString[curIndex]
Josef Brandt's avatar
Josef Brandt committed
285

Josef Brandt's avatar
Josef Brandt committed
286 287
        assert len(wavenumbers) == specSize
        return wavenumbers
Josef Brandt's avatar
Josef Brandt committed
288

Josef Brandt's avatar
Josef Brandt committed
289 290 291
    def convertWavenumbersFrom_nm_to_Percm(self, wavenumbers, excit_nm):
        newWavenumbers = []
        for abs_nm in wavenumbers:
Josef Brandt's avatar
Josef Brandt committed
292
            raman_shift = 1E7 / excit_nm - 1E7 / abs_nm
Josef Brandt's avatar
Josef Brandt committed
293 294
            newWavenumbers.append(raman_shift)
        return newWavenumbers
Josef Brandt's avatar
Josef Brandt committed
295

Josef Brandt's avatar
Josef Brandt committed
296
    def getAveragedSpectra(self, specString, startIndices, specSize):
Josef Brandt's avatar
Josef Brandt committed
297
        startIndices = [i + 1 for i in startIndices]  # the spectrum starts one line AFTER the SpectrumData-Tag
Josef Brandt's avatar
Josef Brandt committed
298 299 300 301 302
        spectrum = []
        for index in range(specSize):
            curSlice = [float(specString[index + startIndex]) for startIndex in startIndices]
            spectrum.append(np.mean(curSlice))
        return spectrum
Josef Brandt's avatar
Josef Brandt committed
303

Josef Brandt's avatar
Josef Brandt committed
304 305 306 307 308 309 310 311
    def isNumber(self, string):
        isNumber = False
        try:
            float(string)
            isNumber = True
        except ValueError:
            pass
        return isNumber