Commit b5b8af90 authored by Josef Brandt's avatar Josef Brandt

Fix Cyclic import in instrumentConfig

Plus fixes in unittests
parent 9805e837
...@@ -24,125 +24,15 @@ import numpy as np ...@@ -24,125 +24,15 @@ import numpy as np
from multiprocessing import Process, Queue, Event from multiprocessing import Process, Queue, Event
import queue import queue
import os import os
import logging from typing import TYPE_CHECKING
import logging.handlers
from typing import TYPE_CHECKING, List
from ..cythonModules import tsp
from .uielements import TimeEstimateProgressbar from .uielements import TimeEstimateProgressbar
from ..gepardlogging import setDefaultLoggingConfig from ..specScan import scan, getShortestPath
from ..analysis.particleCharacterization import FTIRAperture from ..analysis.particleCharacterization import FTIRAperture
from ..helperfunctions import getInstrumentControl
if TYPE_CHECKING: if TYPE_CHECKING:
from ..sampleview import SampleView from ..sampleview import SampleView
def reorder(_points, N=20):
"""
Finds an efficient reordering of scan points in meandering horizontal
stripes.
Parameters
----------
_points : list of scan points in 2D
N : integer, optional
The number of horizontal stripes in which the points should be aranged.
The default is 20.
Returns
-------
newind : index array
The new index array to reorder the scan points for efficient travel
between points.
"""
y0, y1 = _points[:, 1].min(), _points[:, 1].max()
y = np.linspace(y0, y1+.1, N+1)
allind = np.arange(_points.shape[0])
newind = []
for i, yi in enumerate(y[:-1]):
yi1 = y[i+1]
indy = allind[(_points[:,1] >= yi) & (_points[:,1] < yi1)]
p = _points[indy, :]
indx = p[:, 0].argsort()
if i%2 == 1:
newind.append(indy[indx])
else:
newind.append(indy[indx[::-1]])
newind = np.concatenate(newind, axis=0)
assert np.unique(newind).shape[0] == allind.shape[0]
return newind
def getShortestPath(_points: np.ndarray) -> List[int]:
"""
:param _points: (N, 2) shape array of x, y coordinates
:returns: list of indices of shortest path to visit all points
"""
numPoints: int = _points.shape[0]
shortestPath: List[int] = list(np.arange(numPoints, dtype=np.int))
if numPoints >= 4: # the tsp needs at least four points (swap of two pairs of points concurrently)
lmin = None
for i in range(20, 41):
c = reorder(_points, i)
l = np.sum(np.sqrt(np.sum(np.diff(_points[c, :], axis=0) ** 2, axis=1)))
if lmin is None or l < lmin:
lmin = l
cmin = c
if numPoints < 20000:
shortestPath, T = tsp.tspcomp(np.double(_points), np.int32(cmin))
# assert np.all(np.sort(shortestPath) == np.arange(numPoints), dtype=np.int32)
assert np.array_equal(np.sort(shortestPath), np.arange(numPoints, dtype=np.int32))
return list(shortestPath)
def scan(specScanSettings, positions, controlclass, dataqueue, stopevent,
logpath=''):
if logpath != '':
logger = logging.getLogger('RamanScanLogger')
logger.addHandler(
logging.handlers.RotatingFileHandler(
logpath, maxBytes=5 * (1 << 20), backupCount=10)
)
setDefaultLoggingConfig(logger)
try:
instrctrl = getInstrumentControl(controlclass, logger)
instrctrl.connect()
if instrctrl.name == 'ThermoFTIRCom':
specScanSettings['Apertures'] = positions
instrctrl.initiateMeasurement(specScanSettings)
logger.info(instrctrl.name)
for i, p in enumerate(positions):
if not instrctrl.name == 'ThermoFTIRCom':
x, y, z = p
else:
x, y, z = p.centerX, p.centerY, p.centerZ
width, height, angle = p.width, p.height, p.angle
logger.info(f'{width}, {height}, {angle}')
instrctrl.setAperture(width, height, angle)
logger.info(f"position: {x}, {y}, {z}")
instrctrl.moveToAbsolutePosition(x, y, z)
logger.info("move done")
instrctrl.triggerMeasurement(i)
logger.info("trigger done")
if stopevent.is_set():
instrctrl.disconnect()
return
dataqueue.put(i)
instrctrl.finishMeasurement()
instrctrl.disconnect()
except:
logger.exception('Fatal error in ramanscan')
from ..errors import showErrorMessageAsWidget
showErrorMessageAsWidget('See ramanscanlog in project directory for information')
class SpecScanUI(QtWidgets.QWidget): class SpecScanUI(QtWidgets.QWidget):
imageUpdate = QtCore.pyqtSignal(str, name='imageUpdate') #str = 'df' (= darkfield) or 'bf' (=bright field) imageUpdate = QtCore.pyqtSignal(str, name='imageUpdate') #str = 'df' (= darkfield) or 'bf' (=bright field)
ramanscanUpdate = QtCore.pyqtSignal() ramanscanUpdate = QtCore.pyqtSignal()
......
...@@ -32,8 +32,7 @@ from socket import gethostname ...@@ -32,8 +32,7 @@ from socket import gethostname
try: # when running the witectesting, the paths have to be differently, as it is in the same directory as the other raman com modules try: # when running the witectesting, the paths have to be differently, as it is in the same directory as the other raman com modules
from .instrumentComBase import InstrumentComBase from .instrumentComBase import InstrumentComBase
# from .instrumentConfig import SpecScanParameter from ..specScan import SpecScanParameter
from . import instrumentConfig as ic
from ..errors import showErrorMessageAsWidget from ..errors import showErrorMessageAsWidget
except ImportError: except ImportError:
from instrumentComBase import InstrumentComBase from instrumentComBase import InstrumentComBase
...@@ -98,8 +97,8 @@ class WITecCOM(InstrumentComBase): ...@@ -98,8 +97,8 @@ class WITecCOM(InstrumentComBase):
CLSID = "{C45E77CE-3D66-489A-B5E2-159F443BD1AA}" CLSID = "{C45E77CE-3D66-489A-B5E2-159F443BD1AA}"
magn = 20 magn = 20
specScanParameters = [ic.SpecScanParameter('IntegrationTime (s)', 'double', default=0.5, minVal=0.01, maxVal=100), specScanParameters = [SpecScanParameter('IntegrationTime (s)', 'double', default=0.5, minVal=0.01, maxVal=100),
ic.SpecScanParameter('Accumulations', 'int', default=5, minVal=1, maxVal=100)] SpecScanParameter('Accumulations', 'int', default=5, minVal=1, maxVal=100)]
def __init__(self, logger, hostname=None): def __init__(self, logger, hostname=None):
super().__init__() super().__init__()
...@@ -260,8 +259,8 @@ class WITecCOM(InstrumentComBase): ...@@ -260,8 +259,8 @@ class WITecCOM(InstrumentComBase):
self.advSpec: SpectraHandler = SpectraHandler(self) self.advSpec: SpectraHandler = SpectraHandler(self)
if 'Autofocus' not in [param.name for param in self.specScanParameters]: if 'Autofocus' not in [param.name for param in self.specScanParameters]:
self.specScanParameters.append(ic.SpecScanParameter('Autofocus', 'checkBox', default=False)) self.specScanParameters.append(SpecScanParameter('Autofocus', 'checkBox', default=False))
self.specScanParameters.append(ic.SpecScanParameter('Spectra Batch Size', 'int', default=1000, minVal=1, maxVal=1e6)) self.specScanParameters.append(SpecScanParameter('Spectra Batch Size', 'int', default=1000, minVal=1, maxVal=1e6))
@comErrorRepeater @comErrorRepeater
def getBrightness(self): def getBrightness(self):
......
...@@ -21,7 +21,7 @@ If not, see <https://www.gnu.org/licenses/>. ...@@ -21,7 +21,7 @@ If not, see <https://www.gnu.org/licenses/>.
import os import os
import configparser import configparser
__all__ = ["InstrumentControl", "defaultPath", "simulatedRaman", "SpecScanParameter"] __all__ = ["InstrumentControl", "defaultPath", "simulatedRaman"]
defaultPath = os.path.dirname(os.path.split(__file__)[0]) defaultPath = os.path.dirname(os.path.split(__file__)[0])
...@@ -32,36 +32,6 @@ interface = "SIMULATED_RAMAN_CONTROL" ...@@ -32,36 +32,6 @@ interface = "SIMULATED_RAMAN_CONTROL"
videoImgScale = 1.0 videoImgScale = 1.0
class SpecScanParameter(object):
def __init__(self, name, dtype, default=None, minVal=None, maxVal=None, valList=None, openFileType=None):
self.name = name
self.dtype = dtype
self.value = default
self.minVal = minVal
self.maxVal = maxVal
self.valList = valList
self.openFileType = openFileType
if not self.hasValidType():
print('erroreneous type in setting parameter:', self.dtype)
def hasValidType(self):
if self.dtype in ['int', 'double', 'checkBox', 'combobox', 'selectBtn']:
return True
else:
return False
def value_of(self, obj):
if self.dtype in ['int', 'double']:
return obj.value()
elif self.dtype == 'checkBox':
return obj.isChecked()
elif self.dtype == 'combobox':
return obj.currentText()
elif self.dtype == 'selectBtn':
return obj.text()
try: try:
defaultPath = config["Defaults"]["file_path"] defaultPath = config["Defaults"]["file_path"]
except KeyError: except KeyError:
......
...@@ -24,14 +24,16 @@ Simualted Raman interface module for testing without actual raman system connect ...@@ -24,14 +24,16 @@ Simualted Raman interface module for testing without actual raman system connect
import sys import sys
stdout = sys.stdout stdout = sys.stdout
from time import sleep from time import sleep
from .simulatedStage import SimulatedStage
from ..instrumentComBase import InstrumentComBase from ..instrumentComBase import InstrumentComBase
from ...helperfunctions import cv2imwrite_fix from ...helperfunctions import cv2imwrite_fix
from .simulatedStage import SimulatedStage from ...specScan import SpecScanParameter
class SimulatedRaman(InstrumentComBase): class SimulatedRaman(InstrumentComBase):
magn = 20 magn = 20
specScanParameters = {} specScanParameters = [SpecScanParameter('IntegrationTime (s)', 'double', default=0.5, minVal=0.01, maxVal=100),
SpecScanParameter('Accumulations', 'int', default=5, minVal=1, maxVal=100)]
simulatedInterface = True simulatedInterface = True
def __init__(self, logger, ui: bool = False): def __init__(self, logger, ui: bool = False):
......
"""
GEPARD - Gepard-Enabled PARticle Detection
Copyright (C) 2018 Lars Bittrich and Josef Brandt, Leibniz-Institut für
Polymerforschung Dresden e. V. <bittrich-lars@ipfdd.de>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program, see COPYING.
If not, see <https://www.gnu.org/licenses/>.
Simualted Raman interface module for testing without actual raman system connected
"""
from typing import List
import logging
import logging.handlers
import numpy as np
from .cythonModules import tsp
from .gepardlogging import setDefaultLoggingConfig
from .helperfunctions import getInstrumentControl
class SpecScanParameter(object):
def __init__(self, name, dtype, default=None, minVal=None, maxVal=None, valList=None, openFileType=None):
self.name = name
self.dtype = dtype
self.value = default
self.minVal = minVal
self.maxVal = maxVal
self.valList = valList
self.openFileType = openFileType
if not self.hasValidType():
print('erroreneous type in setting parameter:', self.dtype)
def hasValidType(self):
if self.dtype in ['int', 'double', 'checkBox', 'combobox', 'selectBtn']:
return True
else:
return False
def value_of(self, obj):
if self.dtype in ['int', 'double']:
return obj.value()
elif self.dtype == 'checkBox':
return obj.isChecked()
elif self.dtype == 'combobox':
return obj.currentText()
elif self.dtype == 'selectBtn':
return obj.text()
def reorder(_points, N=20):
"""
Finds an efficient reordering of scan points in meandering horizontal
stripes.
Parameters
----------
_points : list of scan points in 2D
N : integer, optional
The number of horizontal stripes in which the points should be aranged.
The default is 20.
Returns
-------
newind : index array
The new index array to reorder the scan points for efficient travel
between points.
"""
y0, y1 = _points[:, 1].min(), _points[:, 1].max()
y = np.linspace(y0, y1 + .1, N + 1)
allind = np.arange(_points.shape[0])
newind = []
for i, yi in enumerate(y[:-1]):
yi1 = y[i + 1]
indy = allind[(_points[:, 1] >= yi) & (_points[:, 1] < yi1)]
p = _points[indy, :]
indx = p[:, 0].argsort()
if i % 2 == 1:
newind.append(indy[indx])
else:
newind.append(indy[indx[::-1]])
newind = np.concatenate(newind, axis=0)
assert np.unique(newind).shape[0] == allind.shape[0]
return newind
def getShortestPath(_points: np.ndarray) -> List[int]:
"""
:param _points: (N, 2) shape array of x, y coordinates
:returns: list of indices of shortest path to visit all points
"""
print('SHORTEST PATH HERE')
numPoints: int = _points.shape[0]
shortestPath: List[int] = list(np.arange(numPoints, dtype=np.int))
if numPoints >= 4: # the tsp needs at least four points (swap of two pairs of points concurrently)
lmin = None
for i in range(20, 41):
c = reorder(_points, i)
l = np.sum(np.sqrt(np.sum(np.diff(_points[c, :], axis=0) ** 2, axis=1)))
if lmin is None or l < lmin:
lmin = l
cmin = c
if numPoints < 20000:
shortestPath, T = tsp.tspcomp(np.double(_points), np.int32(cmin))
# assert np.all(np.sort(shortestPath) == np.arange(numPoints), dtype=np.int32)
assert np.array_equal(np.sort(shortestPath), np.arange(numPoints, dtype=np.int32))
return list(shortestPath)
def scan(specScanSettings, positions, controlclass, dataqueue, stopevent,
logpath=''):
if logpath != '':
logger = logging.getLogger('RamanScanLogger')
logger.addHandler(
logging.handlers.RotatingFileHandler(
logpath, maxBytes=5 * (1 << 20), backupCount=10)
)
setDefaultLoggingConfig(logger)
try:
instrctrl = getInstrumentControl(controlclass, logger)
instrctrl.connect()
if instrctrl.name == 'ThermoFTIRCom':
specScanSettings['Apertures'] = positions
instrctrl.initiateMeasurement(specScanSettings)
logger.info(instrctrl.name)
for i, p in enumerate(positions):
if not instrctrl.name == 'ThermoFTIRCom':
x, y, z = p
else:
x, y, z = p.centerX, p.centerY, p.centerZ
width, height, angle = p.width, p.height, p.angle
logger.info(f'{width}, {height}, {angle}')
instrctrl.setAperture(width, height, angle)
logger.info(f"position: {x}, {y}, {z}")
instrctrl.moveToAbsolutePosition(x, y, z)
logger.info("move done")
instrctrl.triggerMeasurement(i)
logger.info("trigger done")
if stopevent.is_set():
instrctrl.disconnect()
return
dataqueue.put(i)
instrctrl.finishMeasurement()
instrctrl.disconnect()
except:
logger.exception('Fatal error in ramanscan')
from .errors import showErrorMessageAsWidget
showErrorMessageAsWidget('See ramanscanlog in project directory for information')
...@@ -6,18 +6,20 @@ imports work..). Then they have to be added to the allTests-List direclty below. ...@@ -6,18 +6,20 @@ imports work..). Then they have to be added to the allTests-List direclty below.
import unittest import unittest
import sys import sys
sys.path.append(r'C:\Users\xbrjos\Desktop\Python') import os
scriptFolder = os.path.realpath(__file__)
gepardParentFolder = os.path.dirname(os.path.dirname(os.path.dirname(scriptFolder))) # three levels up..
sys.path.append(gepardParentFolder)
from gepard.unittests.test_coordTransform import TestCoordinateTransform from gepard.unittests.test_coordTransform import TestCoordinateTransform
from gepard.unittests.test_FTIRCom import TestFTIRCom
from gepard.unittests.test_getShortestPath import TestTSP from gepard.unittests.test_getShortestPath import TestTSP
from gepard.unittests.test_simulatedStage import Test_FakeCamera, TestSimulatedStage from gepard.unittests.test_simulatedStage import Test_FakeCamera, TestSimulatedStage
from gepard.unittests.test_specscan import TestSpecScan
from gepard.unittests.test_specscanui import TestSpecScanUI from gepard.unittests.test_specscanui import TestSpecScanUI
from gepard.unittests.test_spectraHandler import TestSpecHandler from gepard.unittests.test_spectraHandler import TestSpecHandler
from gepard.unittests.test_workModes import TestWorkModes from gepard.unittests.test_workModes import TestWorkModes
allTests: list = [TestCoordinateTransform, TestFTIRCom, TestTSP, Test_FakeCamera, TestSimulatedStage, allTests: list = [TestCoordinateTransform, TestTSP, Test_FakeCamera, TestSimulatedStage,
TestSpecScan, TestSpecScanUI, TestSpecHandler, TestWorkModes] TestSpecScanUI, TestSpecHandler, TestWorkModes]
def makeTestSuiteRunnable(suite: unittest.TestSuite): def makeTestSuiteRunnable(suite: unittest.TestSuite):
......
import unittest
import tempfile
import numpy as np
import logging
import os
import csv
from ..instrumentcom.FTIRCom import FTIRCom
class TestFTIRCom(unittest.TestCase):
def setUp(self) -> None:
logger = logging.getLogger('TestLogger')
FTIRCom.simulatedInterface = True
self.ftirCom = FTIRCom(logger)
def test_export_spec_as_csv(self) -> None:
testSpec: np.ndarray = np.zeros((10, 2))
testSpec[:, 1] = np.arange(10)
with tempfile.TemporaryDirectory() as tmpdir:
fname: str = os.path.join(tmpdir, 'testSpec.csv')
self.ftirCom._export_spec_to_csv(fname, testSpec)
self.assertTrue(os.path.exists(fname))
with open(fname, 'r') as csvfile:
index: int = 0
for index, row in enumerate(csv.reader(csvfile, delimiter=';')):
self.assertEqual(len(row), 2)
self.assertEqual(float(row[0]), testSpec[index, 0])
self.assertEqual(float(row[1]), testSpec[index, 1])
self.assertEqual(index, testSpec.shape[0]-1)
# # def test_evaluate_spectra(self) -> None:
# # mockSearch: MagicMock = MagicMock(name='search_library')
# # mockSearch.return_value = {75.0: 'PET',
# # 25.0: 'PTFE',
# # 3.0: 'unknown'}
# # self.ftirCom.dde.search_library = mockSearch
# #
# # mockGetSpec: MagicMock = MagicMock(name='getAllSpectra')
# # mockGetSpec.return_value = np.zeros((10, 2))
# # self.ftirCom.specHandler.getAllSpectra = mockGetSpec
# #
# # partContainer: ParticleContainer = ParticleContainer()
# # partContainer.addEmptyMeasurement()
# # partContainer.setMeasurementScanIndex(0, 0)
# #
# # self.ftirCom.evaluateSpectra(partContainer)
import unittest import unittest
import numpy as np import numpy as np
from ..gui.specscanui import getShortestPath from ..specScan import getShortestPath
class TestTSP(unittest.TestCase): class TestTSP(unittest.TestCase):
def test_tsp(self): def test_shortestPath(self):
for numPoints in range(10): for numPoints in range(10):
if numPoints > 0: if numPoints > 0:
points: np.ndarray = np.array([[0, i] for i in range(numPoints)], dtype=np.float) points: np.ndarray = np.array([[0, i] for i in range(numPoints)], dtype=np.float)
......
import logging import logging
from typing import TYPE_CHECKING
from ..dataset import DataSet from ..dataset import DataSet
from ..__main__ import GEPARDMainWindow from ..__main__ import GEPARDMainWindow
if TYPE_CHECKING:
from ..sampleview import SampleView
def getDefaultLogger() -> logging.Logger: def getDefaultLogger() -> logging.Logger:
...@@ -23,3 +26,8 @@ def getDefaultMainWin() -> GEPARDMainWindow: ...@@ -23,3 +26,8 @@ def getDefaultMainWin() -> GEPARDMainWindow:
gepard.view.pyramid.fromDataset(gepard.view.dataset) gepard.view.pyramid.fromDataset(gepard.view.dataset)
gepard.view._setUpToDataset() gepard.view._setUpToDataset()
return gepard return gepard
def getDefaultSampleview() -> 'SampleView':
gepard: GEPARDMainWindow = getDefaultMainWin()
return gepard.view
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment