""" GEPARD - Gepard-Enabled PARticle Detection Copyright (C) 2018 Lars Bittrich and Josef Brandt, Leibniz-Institut für Polymerforschung Dresden e. V. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program, see COPYING. If not, see . """ import os import numpy as np class SpectraHandler(object): """ Handles Spectra formatting and storage. :return: """ def __init__(self, ramanctrl): super(SpectraHandler, self).__init__() self.ramanctrlname = ramanctrl.name self.dsetpath = None self.tmpspecpath = None self.curSpecIndex = None self.spectraBatchSize = None self.wavenumbers: np.ndarray = None if ramanctrl.name == 'WITecCOM': self.specConverter = WITecSpecConverter() else: self.specConverter = None def setDatasetPath(self, path): self.dsetpath = path self.createTmpSpecFolder() def setSpectraBatchSize(self, batchSize): self.spectraBatchSize = batchSize def createTmpSpecFolder(self): assert self.dsetpath is not None self.tmpspecpath = os.path.join(self.dsetpath, 'spectra') if not os.path.exists(self.tmpspecpath): os.mkdir(self.tmpspecpath) def registerBackground(self, backgroundid: int, spec: np.ndarray) -> None: fname: str = self.getPathForBackroundOfID(backgroundid) spec = self.mapSpecToWavenumbers(spec) np.save(fname, spec) def getBackGroundOfID(self, id: int) -> np.ndarray: path: str = self.getPathForBackroundOfID(id) assert os.path.exists(path), f'requested backgound file {path} does not exist!' return np.load(path) def registerNewSpectrum(self, spectrum, specIndex: int, deassamble: bool = False): assert self.tmpspecpath is not None, 'Attempting to save spectrum but no specPath is set...' if deassamble: wavenumbers, intensities = self.specConverter.deassembleSpecString(spectrum) else: assert type(spectrum) == np.ndarray wavenumbers, intensities = spectrum[:, 0], spectrum[:, 1] if not np.all(np.isfinite(spectrum)): spectrum = self.makeSpecFinite(spectrum) if specIndex == 0: np.save(self.getPathForWavenumbers(), wavenumbers) fname = os.path.join(self.tmpspecpath, f'Spectrum ({specIndex}).npy') np.save(fname, intensities) self.curSpecIndex = specIndex def createSummarizedSpecFiles(self): allSpectra = self.getAllSpectra() allspecfname = os.path.join(self.dsetpath, 'spectra.npy') np.save(allspecfname, allSpectra) wavelength: float = (self.specConverter.excitWavel if self.specConverter is not None else 0.0) self.createTrueMatchTxt(allSpectra, wavelength) def getAllSpectra(self): numSpectra = self.curSpecIndex + 1 wavenumbers = np.load(os.path.join(self.tmpspecpath, 'Wavenumbers.npy')) allSpectra = np.zeros((wavenumbers.shape[0], numSpectra+1)) allSpectra[:, 0] = wavenumbers for i in range(numSpectra): curSpecPath = os.path.join(self.tmpspecpath, f'Spectrum ({i}).npy') allSpectra[:, i+1] = np.load(curSpecPath) os.remove(curSpecPath) return allSpectra def createTrueMatchTxt(self, allSpectra, wavelength): def writeHeader(fp): fp.write('[WITEC_TRUEMATCH_ASCII_HEADER]\n\r') fp.write('Version = 2.0\n\r\n\r') def writeWavenumbers(fp, wavenumbers): fp.write('[XData]\n\r') for line in wavenumbers: fp.write(str(line) + '\n\r') def writeSpectrum(fp, intensities): fp.write('\n\r') fp.write('[SpectrumHeader]\n\r') fp.write(f'Title = Spectrum {specIndex} \n\r') fp.write(f'ExcitationWavelength = {wavelength}\n\r') fp.write(f'SpectrumSize = {specSize}\n\r') fp.write('XDataKind = 1/cm\n\r\n\r') fp.write('[SampleMetaData]\n\r') fp.write(f'int Spectrum_Number = {specIndex}\n\r\n\r') fp.write('[SpectrumData]\n\r') for line in intensities: fp.write(str(line) + '\n\r') wavenumbers = allSpectra[:, 0] spectra = allSpectra[:, 1:] specSize = allSpectra.shape[0] del allSpectra numSpectra = spectra.shape[1] if self.spectraBatchSize is None: # if it was not explicitely set, it will be set to produce only one batch self.spectraBatchSize = numSpectra numBatches = int(np.ceil(numSpectra/self.spectraBatchSize)) for batchIndex in range(numBatches): outName = os.path.join(self.dsetpath, f'SpectraForTrueMatch {batchIndex}.txt') if os.path.exists(outName): os.remove(outName) if batchIndex < numBatches-1: specIndicesInBatch = np.arange(batchIndex*self.spectraBatchSize, (batchIndex+1)*self.spectraBatchSize) else: specIndicesInBatch = np.arange(batchIndex*self.spectraBatchSize, numSpectra) with open(outName, 'w') as fp: writeHeader(fp) writeWavenumbers(fp, wavenumbers) for specIndex in specIndicesInBatch: spec = spectra[:, specIndex] writeSpectrum(fp, spec) def mapSpecToWavenumbers(self, spec: np.ndarray) -> np.ndarray: """ In case of Thermo FTIR, not all recoreded spectra are guaranteed to have the same wavenumber range. To account for that, this function here checks wavenumbers and corrects, if necessary.. :param spec: np.array, shape [numWavenumbers, 2] :return correctedSpec: """ def getIntensityClosestToWavenumber(wavenumber: float) -> float: closestIndex = np.argmin(abs(spec[:, 0] - wavenumber)) return spec[closestIndex, 1] if self.ramanctrlname == "ThermoFTIRCom": if self.wavenumbers is None: self.wavenumbers = spec[:, 0] np.save(self.getPathForWavenumbers(), self.wavenumbers) else: if not np.array_equal(spec[:, 0], self.wavenumbers): corrSpec: np.ndarray = np.zeros((self.wavenumbers.shape[0], 2)) corrSpec[:, 0] = self.wavenumbers for i in range(corrSpec.shape[0]): corrSpec[i, 1] = getIntensityClosestToWavenumber(corrSpec[i, 0]) spec = corrSpec return spec def makeSpecFinite(self, spectrum: np.ndarray) -> np.ndarray: """ Removes all np.inf and np.nan from the spec. The missing values are obtained by linear interpolation between the closest valid values. """ def _get_lower_finite_index(invIndex: int, valInd: np.ndarray) -> int: possIndices: np.ndarray = valInd[valInd < invIndex] diff: np.ndarray = np.abs(possIndices-invIndex) return possIndices[np.argmin(diff)] def _get_higher_finite_index(invIndex: int, valInd: np.ndarray) -> int: possIndices: np.ndarray = valInd[valInd > invIndex] diff: np.ndarray = np.abs(possIndices-invIndex) return possIndices[np.argmin(diff)] _intensities: np.ndarray = spectrum[:, 1] finite: np.ndarray = np.isfinite(_intensities) validIndices: np.ndarray = np.where(finite == True)[0] failindices: np.ndarray = np.where(finite == False)[0] for ind in failindices: lowerInd: int = _get_lower_finite_index(ind, validIndices) lowerVal: float = _intensities[lowerInd] higherInd: int = _get_higher_finite_index(ind, validIndices) higherVal: float = _intensities[higherInd] diffInt: int = higherInd-lowerInd spectrum[ind, 1] = _intensities[lowerInd] + (ind-lowerInd) * (higherVal-lowerVal)/diffInt return spectrum def getPathForBackroundOfID(self, id: int) -> str: assert self.tmpspecpath is not None, 'temp spec path not set for SpecHandler!' return os.path.join(self.tmpspecpath, f'background {id}.npy') def getPathForWavenumbers(self) -> str: assert self.tmpspecpath is not None, 'temp spec path not set for SpecHandler!' return os.path.join(self.tmpspecpath, 'Wavenumbers.npy') class WITecSpecConverter: def __init__(self): self.excitWavel: int = -1 def deassembleSpecString(self, specString: str): keywordLines = self.getKeyWordLines(specString) try: specSize = self.getSpecSize(specString, keywordLines['SpectrumSize'][0]) except: print(keywordLines) raise wavenumbers = self.getWavenumbers(specString, keywordLines['[XData]'][0], specSize) xDataKind = self.getXDataKind(specString, keywordLines['XDataKind'][0]) self.excitWavel = self.getExcitationWavelength(specString, keywordLines['ExcitationWavelength'][0]) if xDataKind == 'nm': wavenumbers = self.convertWavenumbersFrom_nm_to_Percm(wavenumbers, self.excitWavel) else: print('warning, unexpected xDataKind:', xDataKind) print('please check how to deal with it!!!') assert False averaged_counts = self.getAveragedSpectra(specString, keywordLines['SpectrumData'], specSize) return wavenumbers, averaged_counts def getKeyWordLines(self, specString): keywordLines = {'[WITEC_TRUEMATCH_ASCII_HEADER]': [], '[XData]': [], 'ExcitationWavelength': [], 'SpectrumSize': [], 'XDataKind': [], 'SpectrumHeader': [], 'SampleMetaData': [], 'SpectrumData': []} for index, line in enumerate(specString): for key in keywordLines.keys(): if line.find(key) != -1: keywordLines[key].append(index) return keywordLines def getSpecSize(self, specString, specSizeIndex): line = specString[specSizeIndex] specSize = [int(s) for s in line.split() if self.isNumber(s)] assert len(specSize) == 1 return specSize[0] def getExcitationWavelength(self, specString, excitWavenumIndex): line = specString[excitWavenumIndex] excitWavel = [float(s) for s in line.split() if self.isNumber(s)] assert len(excitWavel) == 1 return excitWavel[0] def getXDataKind(self, specString, xDataKindIndex): line = specString[xDataKindIndex] return line.split()[-1] def getWavenumbers(self, specString, startXDataIndex, specSize): wavenumbers = [] curIndex = startXDataIndex + 1 curLine = specString[curIndex] while self.isNumber(curLine): wavenumbers.append(float(curLine)) curIndex += 1 curLine = specString[curIndex] assert len(wavenumbers) == specSize return wavenumbers def convertWavenumbersFrom_nm_to_Percm(self, wavenumbers, excit_nm): newWavenumbers = [] for abs_nm in wavenumbers: raman_shift = 1E7 / excit_nm - 1E7 / abs_nm newWavenumbers.append(raman_shift) return newWavenumbers def getAveragedSpectra(self, specString, startIndices, specSize): startIndices = [i + 1 for i in startIndices] # the spectrum starts one line AFTER the SpectrumData-Tag spectrum = [] for index in range(specSize): curSlice = [float(specString[index + startIndex]) for startIndex in startIndices] spectrum.append(np.mean(curSlice)) return spectrum def isNumber(self, string): isNumber = False try: float(string) isNumber = True except ValueError: pass return isNumber