Commit b2e6bcbc authored by Josef Brandt's avatar Josef Brandt

Tmp removal of FTIRInterface

Should not be online, until approved by Thermo Fischer
parent 2ff8dd71
# -*- coding: utf-8 -*-
"""
GEPARD - Gepard-Enabled PARticle Detection
Copyright (C) 2018 Lars Bittrich and Josef Brandt, Leibniz-Institut für
Polymerforschung Dresden e. V. <bittrich-lars@ipfdd.de>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program, see COPYING.
If not, see <https://www.gnu.org/licenses/>.
"""
import os
import win32ui
import cv2
import dde
import time
import numpy as np
import tempfile
import jcamp
try:
from .ramanbase import RamanBase
from .thermoFTIR.ddeclient import DDEClient
from .thermoFTIR.processScreenshot import ImageInterface
from .specHandler import SpectraHandler
from .comErrorWrapper import comErrorRepeater
from ..analysis.particleContainer import ParticleContainer
from ..analysis.particleAndMeasurement import Measurement
# from .configRaman import RamanSettingParam
except (ValueError, ImportError):
from ramancom.ramanbase import RamanBase
from ramancom.thermoFTIR.ddeclient import DDEClient
from ramancom.thermoFTIR.processScreenshot import ImageInterface
from ramancom. specHandler import SpectraHandler
from ramancom.comErrorWrapper import comErrorRepeater
# from analysis.particleContainer import ParticleContainer
# from analysis.particleAndMeasurement import Measurement
# from configRaman import RamanSettingParam
class OmnicDDE:
Application = "Omnic"
Thread = "Spectra"
def __init__(self, logger):
self.server = None
self.conversation = None
self.ddeClient = None
self.logger = logger
def connect(self):
self.server = dde.CreateServer()
self.server.Create("MyOmnic")
self.conversation = dde.CreateConversation(self.server)
self.conversation.ConnectTo(self.Application, self.Thread)
self.ddeClient: DDEClient = DDEClient('OMNIC', 'Spectra')
self.logger.info("Connection With Omnic Established")
def request(self, item: str) -> str:
return self.ddeClient.request(item).decode()
def execute(self, cmd: str) -> None:
self.conversation.Exec(cmd)
def poke(self, parameter: str, value) -> None:
# TODO: IMPLEMENT!
pass
# def new_window(self, windowName: str = "") -> None:
# if windowName == "":
# cmd = '[NewWindow]'
# else:
# cmd = f'[NewWindow """{windowName}"""]'
# self.execute(cmd)
# currentWindow = self.request('Window Title')
# self.logger.info(f"Window Created with Name: {currentWindow}")
#
# def clear_window(self):
# curWindow = self.request('Window Title')
# self.execute("[Select All]")
# self.execute("[DeleteSelectedSpectra]")
# self.logger.debug(f"Window {curWindow} Cleared")
@comErrorRepeater
def collect_backround(self, title: str = ""):
"""
Triggers OMNIC's background acquisition function. The current single beam spectrum is directly obtained
without any data processing.
"""
cmd = '[Invoke CollectBackground """' + title + '""" Auto Polling]'
self.logger.info(cmd)
self.execute(cmd)
self.logger.info(f"Collected Background {title} ...")
# curWindow = self.request('Window Title')
# self.execute('[MaximizeWindow """' + curWindow + '"""]')
menustatus: str = self.request('MenuStatus CollectBackground')
while menustatus == "Disabled":
time.sleep(1)
menustatus = self.request('MenuStatus CollectBackground')
self.logger.debug("Background Collection Finished")
# If the user does not specify the title, he probably doesn't want to save it
if title == "":
time.sleep(0.5)
self.execute("[DeleteSelectedSpectra]")
@comErrorRepeater
def collect_sample(self, title: str = ""):
"""
Triggers OMNIC's collect sample spectrum function. Requires set backgrounds and directly processes spectrum.
"""
cmd = f'[Invoke CollectSample """{title}""" Auto Polling]'
self.execute(cmd)
self.logger.info(f"Collecting Sample Spectrum {title} ...")
# currentWindow = self.request('Window Title')
# self.execute(f'[MaximizeWindow """{currentWindow}"""]')
menustate: str = self.request('MenuStatus CollectSample')
while menustate == "Disabled":
time.sleep(1)
menustate = self.request('MenuStatus CollectSample')
@comErrorRepeater
def collect_raw_spectrum(self, specTitle: str) -> np.ndarray:
"""
Exploits OMNIC's background spec function to avoid data processing. A spectrum is recorded, saved and
converted to np.ndarray. Finally the spectrum is deleted from OMNIC to avoid slow down and eventually crashing
of dde communication with many hundred spectra in OMNIC.
"""
self.collect_backround(specTitle)
spectrum: np.ndarray = self.get_spec_as_array(specTitle)
self.execute("[DeleteSelectedSpectra]")
return spectrum
@comErrorRepeater
def get_spec_as_array(self, specName: str) -> np.ndarray:
with tempfile.TemporaryDirectory() as tmpdir:
fpath: str = self.save_spec_to_file(specName, tmpdir)
jdx: dict = jcamp.JCAMP_reader(fpath)
spec: np.ndarray = np.transpose(np.vstack((jdx['x'], jdx['y'])))
return spec
@comErrorRepeater
def save_spec_to_file(self, specName: str, dirPath: str) -> str:
"""
Selects and saves the spectra of name to the desired directory. Returns the final specPath
"""
fpath: str = os.path.join(dirPath, specName + '.jdx')
self.logger.debug(f'saving {specName} to {dirPath}')
self.execute(f'[Select """{specName}"""]')
self.execute(rf'[ExportAs """{fpath}"""]')
return fpath
@comErrorRepeater
def import_spectrum(self, csvFname: str) -> None:
self.execute(rf'[Import """{csvFname}"""]')
self.logger.debug(f'imported spectrum {csvFname}')
@comErrorRepeater
def search_library(self, specName: str) -> dict:
"""
Searchs for the selected spectrum in the databases being configured in OMNICPicta.
It returns the top 10 found results, with hit quality as key and spec name as value.
"""
self.execute(f'[Select """{specName}"""]')
_results: dict = {}
self.execute("[Search]")
for i in range(10):
try:
self.execute(f'[SelectHit {i+1}]')
except dde.error:
self.logger.error(f'requested search hit index {i} was no accessible for spec {specName}.')
break
resultMatch: str = self.request('SearchHit MatchValue')
resultIDX: str = self.request('SearchHit Index')
resultLib: str = self.request('SearchHit FileName')
cmd = '[GetLibSpectrumTitle ' + resultIDX + ' """' + resultLib + '"""]'
self.execute(cmd)
resultTitle = self.request("Result Current")
hqi: float = float(resultMatch.replace(',', '.'))
_results[hqi] = resultTitle
self.execute("[DeleteSelectedSpectra]")
time.sleep(0.1)
return _results
@comErrorRepeater
def set_num_scans(self, numScans: int):
self.conversation.Poke('Collect NumScans', str(numScans))
@comErrorRepeater
def get_stage_coords(self) -> tuple:
self.execute('[StageGetXY]')
strXY: list = self.request("Result Current").split()
x: float = float(strXY[0][2:].replace(',', '.'))
y: float = float(strXY[1][2:].replace(',', '.'))
self.execute('[StageGetZ]')
strZ: str = self.request("Result Current")
z: float = float(strZ[2:].replace(',', '.'))
return x, y, z
@comErrorRepeater
def get_stage_z(self) -> float:
self.execute('[StageGetZ]')
strZ: str = self.request("Result Current")
z: float = float(strZ[2:].replace(',', '.'))
return z
@comErrorRepeater
def set_stage_xy(self, x: float, y: float):
strCoords: list = [str(val).replace('.', ',') for val in [x, y]]
self.execute(f'[StageSetXY {strCoords[0]} {strCoords[1]}]')
@comErrorRepeater
def set_stage_z(self, z: float):
zstr: str = str(z).replace('.', ',')
self.execute(f'[StageSetZ {zstr}]')
@comErrorRepeater
def set_aperture(self, width: float, height: float, angle: float) -> None:
width = np.clip(width, 5, 400)
height = np.clip(height, 5, 400)
self.logger.info(f'setting aperture to {np.round(width)} {np.round(height)} {np.round(angle)}')
self.execute(f'[ApertureSetXYR {np.round(width)} {np.round(height)} {np.round(angle)}]')
def disconnect(self):
self.server.Destroy()
self.ddeClient.__del__()
self.logger.info("DDE Connection with Omnic was Terminated")
class ApertureGroup(object):
def __init__(self, index: int):
self.index: int = index
self.areas: list = []
self.specIndices: list = []
@property
def widthHeight(self) -> float:
return np.sqrt(np.mean(self.areas))
class ThermoFTIRCom(RamanBase):
magn = 20
ramanParameters = []
def __init__(self, logger):
super().__init__()
self.name = 'ThermoFTIRCom'
self.logger = logger
self.dde: OmnicDDE = OmnicDDE(logger)
self.connected: bool = False
self.pixelscale = None
self.imageInterface: ImageInterface = ImageInterface(logger)
self.initialZ: float = 0.0
self.specHandler: SpectraHandler = SpectraHandler(self)
self.aptGroups: list = []
def connect(self):
try:
self.dde.connect()
self.connected = True
except dde.error:
self.connected = False
self.logger.error('Connection to Thermo FTIR / Omnic did not work!')
self.initialZ = self.getUserZ()
return self.connected
def updateImageConfig(self, dsetPath: str, ) -> None:
self.imageInterface.update_config_from_directory(dsetPath)
def getRamanPositionShift(self):
return 0, 0
def disconnect(self):
if self.connected:
self.dde.disconnect()
self.connected = False
def getPosition(self):
assert self.connected
return self.dde.get_stage_coords()
def moveToAbsolutePosition(self, x: float, y: float, z: float = None, epsxy: float = 0.5, epsz: float = 1):
assert self.connected
self.dde.set_stage_xy(x, y)
if z is not None:
self.moveZto(z)
x_set, y_set, z_set = self.dde.get_stage_coords()
assert abs(x_set - x) <= epsxy
assert abs(y_set - y) <= epsxy
if z is not None:
diffz: float = abs(z_set - z)
assert diffz <= epsz, f'diff z is {diffz}, max allowed: {epsz}'
def moveZto(self, z, epsz=1):
assert self.connected
self.dde.set_stage_z(z)
x_set, y_set, z_set = self.dde.get_stage_coords()
diffz: float = abs(z_set - z)
assert diffz <= epsz, f'diff z is {diffz}, max allowed: {epsz}'
def setAperture(self, width, height, angle) -> None:
self.dde.set_aperture(width, height, angle)
def saveImage(self, fname):
"""
Save current camera image to file name
:return:
"""
assert self.connected
assert self.imageInterface.correctConfig
img = self.imageInterface.get_videoImage()
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
cv2.imwrite(fname, img)
def getImageDimensions(self, mode='df'):
"""
Get the image width and height in um and the orientation angle in degrees.
"""
assert self.connected
assert self.imageInterface.correctConfig
x_start, y_start, x_end, y_end = self.imageInterface.boundingBox
width: float = (x_end - x_start) * self.imageInterface.pixelscale
height: float = (y_end - y_start) * self.imageInterface.pixelscale
angle: float = 0.0
return width, height, angle
def getSoftwareZ(self):
return self.initialZ
def getUserZ(self):
return self.dde.get_stage_z()
def initiateMeasurement(self, ramanSettings):
self.logger.info('measurement start')
self.specHandler.setDatasetPath(ramanSettings['path'])
self.aptGroups = self._groupApertures(ramanSettings['Apertures'])
self._collectBackgrounds(self.aptGroups)
def triggerMeasurement(self, index) -> None:
self.logger.info(f'measuring point index {index}')
spectrum: np.ndarray = self.dde.collect_raw_spectrum(f'Spectrum {index}')
spectrum = self.specHandler.mapSpecToWavenumbers(spectrum)
backgroundID = self._getAptID_toSpecID(index)
backgroundSpec: np.ndarray = self.specHandler.getBackGroundOfID(backgroundID)
spectrum = self._backgroundcorrect(spectrum, backgroundSpec)
self.specHandler.registerNewSpectrum(spectrum, index)
def finishMeasurement(self, aborted=False):
self.logger.info(f'Measurement finished, aborted: {aborted}')
if not aborted:
self.specHandler.createSummarizedSpecFiles()
def evaluateSpectra(self, particleContainer) -> None: # Todo: Reimplement ParticleContainer typehint
"""
Passes the acquired spectra to the OMNIC library search to obtain the spectral assignments.
"""
spectra: np.ndarray = self.specHandler.getAllSpectra()
resultDicts: list = []
results: list = []
hqis: list = []
self.logger.info(f'getting spectra assignments for {spectra.shape[1]-1} spectra')
with tempfile.TemporaryDirectory() as tmpdir:
for i in range(spectra.shape[1]-1):
assert np.all(np.isfinite(spectra[:, i+1])), f'Spectrum index {i} to reimport to OMNIC for db search ' \
f'is not finite. Please check!'
curSpec: np.ndarray = np.transpose(np.vstack((spectra[:, 0], spectra[:, i+1])))
specName: str = f'spec{i}.csv'
fname: str = os.path.join(tmpdir, specName)
self._export_spec_to_csv(fname, curSpec)
self.dde.import_spectrum(fname)
curResults: dict = self.dde.search_library(specName)
if len(curResults) > 0:
highestHQI: float = max(curResults.keys())
resultDicts.append(curResults)
self.logger.debug(f'Results for spec {i}: {curResults}')
hqis.append(highestHQI)
results.append(curResults[highestHQI])
self.logger.debug(f'highestHQI: {highestHQI}')
else:
self.logger.debug(f'No results for spec {i}')
def _export_spec_to_csv(self, fname: str, spec: np.ndarray) -> None:
"""
Exports the spectra to the designated path, so it can be imported by OMNIC.
"""
np.savetxt(fname, spec, delimiter=';', fmt='%f')
def _collectBackgrounds(self, apertureGroups: list) -> None:
self.logger.info(f'start collecting {len(apertureGroups)} backgrounds')
for aptGroup in apertureGroups:
widthHeight = aptGroup.widthHeight
self.dde.set_aperture(widthHeight, widthHeight, 0)
spec = self.dde.collect_raw_spectrum(f'Background {aptGroup.index}')
self.specHandler.registerBackground(aptGroup.index, spec)
def _backgroundcorrect(self, sample: np.ndarray, bground: np.ndarray) -> np.ndarray:
"""
:param sample: The sample spec (first Col: wavenumbers, second Col: Intensities)
:param bground: The background spec (first Col: wavenumbers, second Col: Intensities)
"""
sample = self.specHandler.mapSpecToWavenumbers(sample)
self.logger.debug(f'about to subtract background: spec: {sample.shape}, background: {bground.shape}')
sample[:, 1] = -np.log((sample[:, 1] / bground[:, 1]) * 100)
return sample
def _getAptID_toSpecID(self, specID: int):
aptID: int = -1
for aptGroup in self.aptGroups:
if specID in aptGroup.specIndices:
aptID = aptGroup.index
break
assert aptID != -1, f'Aperture for Spectrum index {specID} was not found.'
return aptID
def _groupApertures(self, apertures: list, binMargin: float = 0.5) -> list:
"""
:param apertures: list of FTIRAperture objects to process
:param binMargin: relative size difference factor for grouping of apertures. 0.1 => grouped apertures can
have a 10 % difference in area
:returns list of groupedApertures with all relevant information:
"""
aptSizes: list = [apt.width*apt.height for apt in apertures]
sortedIndices: np.ndarray = np.argsort(aptSizes)
groupedApertures: list = []
curSize, maxUpperSize, curBinIndex = 0, 0, 0
for index in sortedIndices:
curSize = aptSizes[index]
if index == sortedIndices[0]:
maxUpperSize = curSize + binMargin * curSize
if curSize > maxUpperSize:
maxUpperSize = curSize + binMargin * curSize
curBinIndex += 1
if len(groupedApertures) <= curBinIndex:
groupedApertures.append(ApertureGroup(curBinIndex))
groupedApertures[curBinIndex].areas.append(curSize)
groupedApertures[curBinIndex].specIndices.append(index)
return groupedApertures
if __name__ == '__main__':
import logging
import os
testlogger: logging.Logger = logging.Logger('TestLogger')
testlogger.addHandler(logging.StreamHandler())
testlogger.setLevel(logging.DEBUG)
ftirCom: ThermoFTIRCom = ThermoFTIRCom(testlogger)
ftirCom.connect()
specs: np.ndarray = np.load(r'C:\Users\LabUser\Documents\Josef\TestMeasurements\5\spectra.npy')
try:
ftirCom.evaluateSpectra(specs)
except:
print('failed')
ftirCom.disconnect()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment