Commit d6102fea authored by Josef Brandt's avatar Josef Brandt

AdvancedWITec

parent ab5fab1e
......@@ -21,7 +21,7 @@ If not, see <https://www.gnu.org/licenses/>.
import os
import sys
import cv2
try:
import pythoncom
import win32com.client
......@@ -32,8 +32,7 @@ from time import sleep, time
try: #when running the witectesting, the paths have to be differently, as it is in the same directory as the other raman com modules
from .ramanbase import RamanBase
from .configRaman import RamanSettingParam
from ..helperfunctions import cv2imread_fix, cv2imwrite_fix
except:
except ModuleNotFoundError:
from ramanbase import RamanBase
from configRaman import RamanSettingParam
from socket import gethostname
......@@ -134,13 +133,11 @@ class WITecCOM(RamanBase):
self.TimeSeriesSlowNumMeasurementsInterface = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerTimeSeriesSlow|AmountOfMeasurements")
self.TimeSeriesSlowNumMeasurementsMan = win32com.client.CastTo(self.TimeSeriesSlowNumMeasurementsInterface, 'IBUCSInt')
# self.TimeSeriesSlowNameInterface = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerTimeSeriesSlow|Naming|DataName")
self.TimeSeriesSlowNameMan = win32com.client.CastTo(self.TimeSeriesSlowNameInterface, 'IBUCSString')
self.TimeSeriesSlowNumAccumulationsInterface = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerTimeSeriesSlow|SpectrumAcquisition|Accumulations")
self.TimeSeriesSlowNumAccumulationsMan = win32com.client.CastTo(self.TimeSeriesSlowNumAccumulationsInterface, 'IBUCSInt')
self.TimeSeriesSlowIntTimeInterface = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerTimeSeriesSlow|SpectrumAcquisition|IntegrationTime")
self.TimeSeriesSlowIntTimeMan = win32com.client.CastTo(self.TimeSeriesSlowIntTimeInterface, 'IBUCSFloat')
# self.TimeSeriesSlowNumInterface = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerTimeSeriesSlow|Naming|DataNumber")
self.TimeSeriesSlowNumMan = win32com.client.CastTo(self.TimeSeriesSlowNumInterface, 'IBUCSInt')
self.TimeSeriesSlowModeInterface = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerTimeSeriesSlow|MeasurementMode")
self.TimeSeriesSlowModeMan = win32com.client.CastTo(self.TimeSeriesSlowModeInterface, 'IBUCSEnum')
......@@ -167,9 +164,47 @@ class WITecCOM(RamanBase):
self.BrightnessInterface = IBUCSCore.GetSubSystemDefaultInterface("MultiComm|MicroscopeControl|WhiteLight|Top|BrightnessPercentage")
self.BrightnessMan = win32com.client.CastTo(self.BrightnessInterface, 'IBUCSFloat')
try:
self.getAdvancedCOMFeatures(IBUCSCore)
self.advancedInterface = True
print('using advanced WITec features!')
except pythoncom.com_error:
print('not using advanced WITec features!')
self.advancedInterface = False
self.connected = True
return True
def getAdvancedCOMFeatures(self, IBUCSCore):
self.AutoFocusInterface = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerAutoFocus|Start")
self.AutoFocusMan = win32com.client.CastTo(self.AutoFocusInterface, 'IBUCSTrigger')
self.SilentSpecTextInterface = IBUCSCore.GetSubSystemDefaultInterface("MultiComm|SequencerSingleSpectrum|SilentSpectrumAsText")
self.SilentSpecText = win32com.client.CastTo(self.SilentSpecTextInterface, 'IBUCSString')
self.SilentSpecDoInterface = IBUCSCore.GetSubSystemDefaultInterface("MultiComm|SequencerSingleSpectrum|DoSilentSpectrum")
self.DoSilentSpectrum = win32com.client.CastTo(self.SilentSpecDoInterface, 'IBUCSBool')
self.SingleSpecIntegrationTime = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerSingleSpectrum|IntegrationTime")
self.SingleSpecIntegrationTimeMan = win32com.client.CastTo(self.SingleSpecIntegrationTime, 'IBUCSFloat')
self.SingleSpecNumAccs = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|SequencerSingleSpectrum|NrOfAccumulations")
self.SingleSpecNumAccsMan = win32com.client.CastTo(self.SingleSpecNumAccs, 'IBUCSInt')
self.BeamPathState = IBUCSCore.GetSubSystemDefaultInterface("MultiComm|MicroscopeControl|BeamPath|State")
self.BeamPathSetRamanState = IBUCSCore.GetSubSystemDefaultInterface("MultiComm|MicroscopeControl|BeamPath|SetStateRaman")
self.MicroscopeIdle = IBUCSCore.GetSubSystemDefaultInterface("UserParameters|MicroscopeStateOnIdle")
self.BeamPathSetVideoStateInterface =IBUCSCore.GetSubSystemDefaultInterface("MultiComm|MicroscopeControl|BeamPath|SetStateVideo")
self.BeamPathSetVideoState = win32com.client.CastTo(self.BeamPathSetVideoStateInterface, 'IBUCSTrigger')
try:
from .advancedWITec import AdvancedWITecSpectra
except ModuleNotFoundError:
from advancedWITec import AdvancedWITecSpectra
self.advSpec = AdvancedWITecSpectra()
self.ramanParameters.append(RamanSettingParam('Autofocus', 'checkBox', default=True))
def getBrightness(self):
assert self.connected
return self.BrightnessMan.GetValue()
......@@ -276,6 +311,7 @@ class WITecCOM(RamanBase):
self.ImageNameMan.SetValue(fname)
self.ImageSaveMan.OperateTrigger()
sleep(.1)
def getImageDimensions(self, mode = 'df'):
""" Get the image width and height in um and the orientation angle in degrees.
......@@ -297,6 +333,27 @@ class WITecCOM(RamanBase):
def initiateMeasurement(self, ramanSettings):
assert self.connected
if not self.advancedInterface:
self.initiateTimeSeriesMeasurement(ramanSettings)
else:
self.initateSilentSpectrumAcquisition(ramanSettings)
def triggerMeasurement(self, num):
if not self.advancedInterface:
self.triggerTimeSeriesMeasurement(num)
else:
self.doSpectralAutoFocus()
self.acquireSilentSpectrum(num)
def finishMeasurement(self):
if self.advancedInterface:
self.advSpec.createSummarizedSpecFiles()
state = self.BeamPathState.GetValue()
if state == 'Raman':
self.MicroscopeIdle.SetValue('BeamPath|SetStateVideo')
self.BeamPathSetVideoState.OperateTrigger()
def initiateTimeSeriesMeasurement(self, ramanSettings):
self.timeseries = ramanSettings['numPoints']
self.TimeSeriesSlowNameMan.SetValue(ramanSettings['filename'])
self.TimeSeriesSlowNumMeasurementsMan.SetValue(ramanSettings['numPoints'])
......@@ -305,7 +362,7 @@ class WITecCOM(RamanBase):
self.TimeSeriesSlowModeMan.SetValueNumeric(0)
self.TimeSeriesSlowNumMan.SetValue(0)
self.TimeSeriesSlowStartMan.OperateTrigger()
sleep(0.1)
t1 = time()
while True:
......@@ -318,8 +375,18 @@ class WITecCOM(RamanBase):
if time()-t1>3.:
print("Waiting for measurement ready...")
t1 = time()
def triggerMeasurement(self, num):
def initateSilentSpectrumAcquisition(self, ramanSettings):
self.advSpec.setDatasetPath(ramanSettings['path'])
self.advSpec.createTmpSpecFolder()
state = self.BeamPathState.GetValue()
if state == 'Video':
self.MicroscopeIdle.SetValue('BeamPath|SetStateRaman')
self.BeamPathSetRamanState.OperateTrigger()
self.SingleSpecIntegrationTimeMan.SetValue(ramanSettings['IntegrationTime (s)'])
self.SingleSpecNumAccsMan.SetValue(ramanSettings['Accumulations'])
def triggerTimeSeriesMeasurement(self, num):
assert self.timeseries
self.TimeSeriesSlowNextMan.OperateTrigger()
# Wait until sequencer has finished
......@@ -345,6 +412,26 @@ class WITecCOM(RamanBase):
t1 = time()
if num==self.timeseries-1:
self.timeseries = False
def finishMeasurement(self):
pass
\ No newline at end of file
def doSpectralAutoFocus(self):
assert self.advancedInterface
self.AutoFocusMan.OperateTrigger()
self.waitForSequencerToFinish()
def acquireSilentSpectrum(self, num):
assert self.advancedInterface
self.DoSilentSpectrum.SetValue(True)
self.SequencerStartTrigger.OperateTrigger()
self.waitForSequencerToFinish()
out_spec = self.SilentSpecText.GetValue()
out_spec = out_spec.split('\n')
self.advSpec.registerNewSpectrum(out_spec, num)
return out_spec
def waitForSequencerToFinish(self):
while True:
self.SequencerBusyStatus.Update()
Busy = self.SequencerBusyStatus.GetSingleValueAsInt()[1]
if not Busy:
break
\ No newline at end of file
"""
GEPARD - Gepard-Enabled PARticle Detection
Copyright (C) 2018 Lars Bittrich and Josef Brandt, Leibniz-Institut für
Polymerforschung Dresden e. V. <bittrich-lars@ipfdd.de>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program, see COPYING.
If not, see <https://www.gnu.org/licenses/>.
"""
import os
import numpy as np
class AdvancedWITecSpectra(object):
"""
Handles Spectra formatting and storage when using the advanced "silent spectrum" option in the WITec COM interface
:return:
"""
def __init__(self):
super(AdvancedWITecSpectra, self).__init__()
self.dsetpath = None
self.tmpspecpath = None
self.curSpecIndex = None
self.excitWavel = None
def setDatasetPath(self, path):
self.dsetpath = path
def createTmpSpecFolder(self):
assert self.dsetpath is not None
self.tmpspecpath = os.path.join(self.dsetpath, 'spectra')
if not os.path.exists(self.tmpspecpath):
os.mkdir(self.tmpspecpath)
def registerNewSpectrum(self, specString, specIndex):
wavenumbers, averaged_counts = self.deassembleSpecString(specString)
if specIndex == 0:
fname = os.path.join(self.tmpspecpath, 'Wavenumbers.npy')
np.save(fname, wavenumbers)
fname = os.path.join(self.tmpspecpath, f'Spectrum ({specIndex}).npy')
np.save(fname, averaged_counts)
self.curSpecIndex = specIndex
def createSummarizedSpecFiles(self):
allSpectra = self.getAllSpectra()
allspecfname = os.path.join(self.dsetpath, 'spectra.npy')
np.save(allspecfname, allSpectra)
self.createTrueMatchTxt(allSpectra, self.excitWavel)
def deassembleSpecString(self, specString):
keywordLines = self.getKeyWordLines(specString)
try:
specSize = self.getSpecSize(specString, keywordLines['SpectrumSize'][0])
except:
print(keywordLines)
raise
wavenumbers = self.getWavenumbers(specString, keywordLines['[XData]'][0], specSize)
xDataKind = self.getXDataKind(specString, keywordLines['XDataKind'][0])
self.excitWavel = self.getExcitationWavelength(specString, keywordLines['ExcitationWavelength'][0])
if xDataKind == 'nm':
wavenumbers = self.convertWavenumbersFrom_nm_to_Percm(wavenumbers, self.excitWavel)
else:
print('warning, unexpected xDataKind:', xDataKind)
print('please check how to deal with it!!!')
assert False
averaged_counts = self.getAveragedSpectra(specString, keywordLines['SpectrumData'], specSize)
return wavenumbers, averaged_counts
def getKeyWordLines(self, specString):
keywordLines = {'[WITEC_TRUEMATCH_ASCII_HEADER]': [],
'[XData]': [],
'ExcitationWavelength': [],
'SpectrumSize': [],
'XDataKind': [],
'SpectrumHeader': [],
'SampleMetaData': [],
'SpectrumData': []}
for index, line in enumerate(specString):
for key in keywordLines.keys():
if line.find(key) != -1:
keywordLines[key].append(index)
return keywordLines
def getSpecSize(self, specString, specSizeIndex):
line = specString[specSizeIndex]
specSize = [int(s) for s in line.split() if self.isNumber(s)]
assert len(specSize) == 1
return specSize[0]
def getExcitationWavelength(self, specString, excitWavenumIndex):
line = specString[excitWavenumIndex]
excitWavel = [float(s) for s in line.split() if self.isNumber(s)]
assert len(excitWavel) == 1
return excitWavel[0]
def getXDataKind(self, specString, xDataKindIndex):
line = specString[xDataKindIndex]
return line.split()[-1]
def getWavenumbers(self, specString, startXDataIndex, specSize):
wavenumbers = []
curIndex = startXDataIndex+1
curLine = specString[curIndex]
while self.isNumber(curLine):
wavenumbers.append(float(curLine))
curIndex += 1
curLine = specString[curIndex]
assert len(wavenumbers) == specSize
return wavenumbers
def convertWavenumbersFrom_nm_to_Percm(self, wavenumbers, excit_nm):
newWavenumbers = []
for abs_nm in wavenumbers:
raman_shift = 1E7/excit_nm - 1E7/abs_nm
newWavenumbers.append(raman_shift)
return newWavenumbers
def getAveragedSpectra(self, specString, startIndices, specSize):
startIndices = [i+1 for i in startIndices] #the spectrum starts one line AFTER the SpectrumData-Tag
spectrum = []
for index in range(specSize):
curSlice = [float(specString[index + startIndex]) for startIndex in startIndices]
spectrum.append(np.mean(curSlice))
return spectrum
def getAllSpectra(self):
numSpectra = self.curSpecIndex + 1
wavenumbers = np.load(os.path.join(self.tmpspecpath, 'Wavenumbers.npy'))
allSpectra = np.zeros((wavenumbers.shape[0], numSpectra+1))
allSpectra[:, 0] = wavenumbers
for i in range(numSpectra):
curSpecPath = os.path.join(self.tmpspecpath, f'Spectrum ({i}).npy')
allSpectra[:, i+1] = np.load(curSpecPath )
os.remove(curSpecPath)
return allSpectra
def createTrueMatchTxt(self, allSpectra, wavelength):
wavenumbers = allSpectra[:, 0]
spectra = allSpectra[:, 1:]
specSize = allSpectra.shape[0]
del allSpectra
outName = os.path.join(self.dsetpath, 'SpectraForTrueMatch.txt')
if os.path.exists(outName):
os.remove(outName)
with open(outName, 'w') as fp:
fp.write('[WITEC_TRUEMATCH_ASCII_HEADER]\n\r')
fp.write('Version = 2.0\n\r\n\r')
fp.write('[XData]\n\r')
for line in wavenumbers:
fp.write(str(line) + '\n\r')
for specIndex in range(spectra.shape[1]):
fp.write('\n\r')
fp.write('[SpectrumHeader]\n\r')
fp.write(f'Title = Spectrum {specIndex} \n\r')
fp.write(f'ExcitationWavelength = {wavelength}\n\r')
fp.write(f'SpectrumSize = {specSize}\n\r')
fp.write('XDataKind = 1/cm\n\r\n\r')
fp.write('[SampleMetaData]\n\r')
# fp.write(f'double Integration_Time = )
fp.write(f'int Spectrum_Number = {specIndex}\n\r\n\r')
fp.write('[SpectrumData]\n\r')
spec = spectra[:, specIndex]
for line in spec:
fp.write(str(line) + '\n\r')
def isNumber(self, string):
isNumber = False
try:
float(string)
isNumber = True
except ValueError:
pass
return isNumber
......@@ -77,7 +77,8 @@ def scan(ramanSettings, positions, controlclass, dataqueue, stopevent,
return
dataqueue.put(i)
ramanctrl.finishMeasurement()
ramanctrl.disconnect()
if fp is not None:
fp.close()
......@@ -171,7 +172,6 @@ class RamanScanUI(QtWidgets.QWidget):
QtWidgets.QMessageBox.Yes |
QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No)
if reply == QtWidgets.QMessageBox.Yes:
self.ramanctrl.finishMeasurement()
self.timer.stop()
self.processstopevent.set()
self.process.join()
......
......@@ -253,6 +253,7 @@ class Segmentation(object):
for label in range(1, n):
area = stats[label, cv2.CC_STAT_AREA]
if self.minparticlearea < area < maxArea:
print(f'processing {label} of {n} compontents')
up = stats[label, cv2.CC_STAT_TOP]
left = stats[label, cv2.CC_STAT_LEFT]
width = stats[label, cv2.CC_STAT_WIDTH]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment