# -*- coding: utf-8 -*- """ GEPARD - Gepard-Enabled PARticle Detection Copyright (C) 2018 Lars Bittrich and Josef Brandt, Leibniz-Institut für Polymerforschung Dresden e. V. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program, see COPYING. If not, see . """ import os import pickle import numpy as np import sys import cv2 from copy import copy from typing import List, Dict, Tuple, TYPE_CHECKING from .analysis.particleContainer import ParticleContainer from .legacyConvert import legacyConversion, currentVersion from .helperfunctions import cv2imwrite_fix, cv2imread_fix from .coordTransform import CoordTransfer if TYPE_CHECKING: from .coordTransform import TrayMarker, ImageMarker # for legacy pickle import the old module name dataset must be found # (no relative import) from . import dataset from . import analysis sys.modules['dataset'] = dataset sys.modules['analysis'] = analysis sys.modules['gepardevaluation.dataset'] = dataset sys.modules['gepardevaluation.analysis.particleContainer'] = analysis.particleContainer sys.modules['gepardevaluation.analysis.particleAndMeasurement'] = analysis.particleAndMeasurement def loadData(fname): retds = None with open(fname, "rb") as fp: ds = pickle.load(fp) ds.fname = fname ds.readin = True retds = DataSet(fname) retds.version = 0 retds.__dict__.update(ds.__dict__) retds.updatePath() if retds.version < currentVersion: legacyConversion(retds) elif retds.zvalimg == "saved": retds.loadZvalImg() retds.maxdim = None retds.afterLoad() return retds def saveData(dataset, fname): with open(fname, "wb") as fp: # zvalimg is rather large and thus it is saved separately in a tif file # only onces after its creation zvalimg = dataset.zvalimg if zvalimg is not None: dataset.zvalimg = "saved" dataset.beforeSave() pickle.dump(dataset, fp, protocol=-1) dataset.zvalimg = zvalimg dataset.afterSave() def arrayCompare(a1, a2): if a1.shape != a2.shape: return False if a1.dtype != np.float32 and a1.dtype != np.float64: return np.all(a1 == a2) ind = np.isnan(a1) if not np.any(ind): return np.all(a1 == a2) return np.all(a1[~ind] == a2[~ind]) def listCompare(l1, l2): if len(l1) != len(l2): return False for l1i, l2i in zip(l1, l2): if isinstance(l1i, np.ndarray): if not isinstance(l2i, np.ndarray) or not arrayCompare(l1i, l2i): return False elif isinstance(l1i, (list, tuple)): if not isinstance(l2i, (list, tuple)) or not listCompare(l1i, l2i): return False elif l1i != l2i and ((~np.isnan(l1i)) or (~np.isnan(l2i))): return False return True def recursiveDictCompare(d1, d2): for key in d1: if not key in d2: print("key missing in d2:", key, flush=True) return False a = d1[key] b = d2[key] print(key, type(a), type(b), flush=True) if isinstance(a, np.ndarray): if not isinstance(b, np.ndarray) or not arrayCompare(a, b): print("data is different!", a, b) return False elif isinstance(a, dict): if not isinstance(b, dict): print("data is different!", a, b) return False if not recursiveDictCompare(a, b): return False elif isinstance(a, (list, tuple)): if not isinstance(b, (list, tuple)) or not listCompare(a, b): print("data is different!", a, b) return False elif a != b: if (a is not None) and (b is not None): print("data is different!", a, b) return False return True class DataSet(object): uid: int = 0 @classmethod def getUID(cls): """ returns a unique number managed by the dataset. will be saved and restored on load :return int: """ cls.uid += 1 return cls.uid def __init__(self, fname, newProject=False): self.fname = fname self.path, self.name = None, None # parameters specifically for optical scan self.version = currentVersion self.lastpos = None # x, y of center of lower left corner tile of image self.maxdim = None # x0, y0, x1, y1. x0, y0 = lastpos, x1, y1 = center of upper right corner tile of image. Only used during optical scan. self.pixelscale_df = None # µm / pixel --> scale of DARK FIELD camera (used for image stitching) self.pixelscale_bf = None # µm / pixel of DARK FIELD camera (set to same as bright field, if both use the same camera) self.imagedim_bf = None # width, height, angle of BRIGHT FIELD camera self.imagedim_df = None # width, height, angle of DARK FIELD camera (set to same as bright field, if both use the same camera) self.imagescanMode = 'df' # was the fullimage acquired in dark- or brightfield? self.fitpoints: np.ndarray = np.array([]) # manually adjusted positions aquired to define the specimen geometry self.fitindices = [] # which of the five positions in the ui are already known self.boundary = [] # scan boundary computed by a circle around the fitpoints + manual adjustments self.grid = [] # scan grid positions for optical scan self.zpositions = np.array([0]) # z-positions for optical scan self.heightmapParams: Tuple[float, float, float] = None # convert x, y coordinate to z coordinate (see self.mapHeight) self.zvalimg = None self.importedWithPlaceholderZValImg: bool = False self.coordinatetransform: CoordTransfer = None self.signx = 1. self.signy = -1. self.trayMarkers: List['TrayMarker'] = [] self.imageMarkers: List['ImageMarker'] = [] # tiling parameters self.pyramidParams = None # navigation params (e.g. visited particles) self.particleNavigationParams = None # parameters specifically for raman scan self.pshift = None # shift of spectrometer scan position relative to image center self.seedpoints: np.ndarray = np.array([]) # shape Nx3 array of N points, each storing x, y and radius self.seeddeletepoints: np.ndarray = np.array([]) # shape Nx3 array of N points, each storing x, y and radius self.particleContainer = ParticleContainer() self.particleDetectionDone = False self.hqiThresholds: Dict[int, Tuple[float, float]] = {} # key: SpecSeriesIndex, Value: (minHQI, maxHQI) self.colorSeed = 'default' self._particleTypeGroups: Dict[str, List[str]] = {} # Key: Groupname, Value: List of Assignments belonging to the group self.resultsUploadedToSQL = [] self.readin = True # a value that is always set to True at loadData # and mark that the coordinate system might be changed in the meantime self.mode = "prepare" if newProject: self.fname = self.newProject(fname) self.updatePath() @property def opticalScanDone(self) -> bool: return os.path.exists(self.getZvalImageName()) @property def specscandone(self) -> bool: return len(self.particleContainer.completedSpecScans) > 0 @property def coordinateSystemConfigured(self) -> bool: hasPixelScale = None not in [self.pixelscale_df, self.pixelscale_bf] hasDims = self.imagedim_df is not None and self.imagedim_bf is not None hasPos = self.lastpos is not None and self.maxdim is not None return hasPixelScale and hasDims and hasPos def invalidateCoordinateSystem(self) -> None: self.pixelscale_bf = self.pixelscale_df = None self.imagedim_bf = self.imagedim_df = None self.lastpos = self.maxdim = None def deleteAllMeasurements(self) -> None: self.particleContainer.completedSpecScans = [] def __eq__(self, other): return recursiveDictCompare(self.__dict__, other.__dict__) def afterLoad(self): self.particleContainer.datasetParent = self self.updatePath() DataSet.uid = self.uid def beforeSave(self): self.particleContainer.datasetParent = None self.uid = DataSet.uid def afterSave(self): self.afterLoad() def getPyramidParams(self): return self.pyramidParams def setPyramidParams(self, pyramid_params): self.pyramidParams = pyramid_params def getParticleNavigationParams(self): return self.particleNavigationParams def setParticleNavigationParams(self, part_nav_params): self.particleNavigationParams = part_nav_params def getPixelScale(self, mode=None): if mode is None: mode = self.imagescanMode return (self.pixelscale_df if mode == "df" else self.pixelscale_bf) def saveZvalImg(self): if self.zvalimg is not None: cv2imwrite_fix(self.getZvalImageName(), self.zvalimg) self.zvalimg = "saved" def loadZvalImg(self): if os.path.exists(self.getZvalImageName()): self.zvalimg = cv2imread_fix(self.getZvalImageName(), cv2.IMREAD_GRAYSCALE) def getZval(self, pixelpos): assert self.zvalimg is not None if self.zvalimg == "saved": self.loadZvalImg() i, j = int(round(pixelpos[1])), int(round(pixelpos[0])) if i >= self.zvalimg.shape[0]: print('error in getZval:', self.zvalimg.shape, i, j) i = self.zvalimg.shape[0] - 1 if j >= self.zvalimg.shape[1]: print('error in getZval:', self.zvalimg.shape, i, j) j = self.zvalimg.shape[1] - 1 zp = self.zvalimg[i, j] z0, z1 = self.zpositions[0], self.zpositions[-1] z = zp / 255. * (z1 - z0) + z0 return z def mapHeight(self, x, y, force=False): if not force: assert not self.readin assert self.heightmapParams is not None z = self.heightmapParams[0] * x + self.heightmapParams[1] * y + self.heightmapParams[2] return z def calculateHeightmapParams(self, softwareZ: float, userZ: float) -> Tuple[np.ndarray, float]: """Calculates the heightmapPparamters from the given fitPoints, corrected by relative coordinate offsets. :param softwareZ: The current z-position, as indicated by the instrument software :param userZ: A current "user-z" position, ideally the same as software-z, or dependening on the instrument also a bit differently.. """ points: np.ndarray = np.float32(self.fitpoints) # convert z to software z, which is relative to current user z points[:, 2] += softwareZ - userZ A = np.ones((points.shape[0], 3)) A[:, :2] = points[:, :2] b = points[:, 2] sol = np.linalg.lstsq(A, b, rcond=None)[0] self.heightmapParams = sol error: float = sol[0] * points[:, 0] + sol[1] * points[:, 1] + sol[2] - points[:, 2] return points, error def calculateLastPosAndMaxDimFromGrid(self) -> None: grid = np.asarray(self.grid) p0 = [grid[:, 0].min(), grid[:, 1].max()] p1 = [grid[:, 0].max(), grid[:, 1].min()] self.lastpos = p0 self.maxdim = p0 + p1 def calculateImageDimsFromZImg(self) -> None: """ Calcualtes the Imagedims, as if the entire image was obtained in a single shot. """ self.imagedim_df = np.array([self.zvalimg.shape[1] * self.pixelscale_df, self.zvalimg.shape[0] * self.pixelscale_df, 0.0]) self.imagedim_bf = np.array([self.zvalimg.shape[1] * self.pixelscale_bf, self.zvalimg.shape[0] * self.pixelscale_bf, 0.0]) def setZPositions(self, zLevels: np.ndarray) -> None: """Sets the zPositions, i.e., what z-Level OVER the calculated height map position (see self.mapHeight) corresponds each pixel of the zlevel-image""" self.zpositions = zLevels def mapToPixel(self, p, mode='df', force=False): if not force: assert not self.readin p0 = copy(self.lastpos) if self.coordinatetransform is not None: z = 0. if len(p) < 3 else p[2] T, pc = self.coordinatetransform.rotMatrix, self.coordinatetransform.offset p = (np.dot(np.array([p[0], p[1], z]) - pc, T.T)) assert mode in ['df', 'bf'], f'mapToPixel mode: {mode} not understood' pixelscale: float = self.pixelscale_bf if mode == 'bf' else self.pixelscale_df p0[0] -= self.signx * self.imagedim_df[0] / 2 p0[1] -= self.signy * self.imagedim_df[1] / 2 x, y = self.signx * (p[0] - p0[0]) / pixelscale, self.signy * (p[1] - p0[1]) / pixelscale return x, y def mapToLength(self, pixelpos, mode='df', force=False, returnz=False): if not force: assert not self.readin if mode not in ['df', 'bf']: raise ValueError(f'mapToLength mode: {mode} not understood') p0 = copy(self.lastpos) pixelscale = self.pixelscale_df if mode == 'df' else self.pixelscale_bf imgdim = self.imagedim_df if mode == 'df' else self.imagedim_bf p0[0] -= self.signx * imgdim[0] / 2 p0[1] -= self.signy * imgdim[1] / 2 x, y = (p0[0] + self.signx * pixelpos[0] * pixelscale), (p0[1] + self.signy * pixelpos[1] * pixelscale) z = None if (returnz and self.zvalimg is not None) or self.coordinatetransform is not None: z = self.mapHeight(x, y, force) z += self.getZval(pixelpos) if self.coordinatetransform is not None: x, y, z = tuple(self.coordinatetransform.applyTransform(np.array([x, y, z]))) if returnz: return x, y, z return x, y def mapToLengthWithoutCoordTransfer(self, pixelpos: tuple, mode: str = 'df') -> tuple: """ Temporarily resets the cooridnate transform and returns original coordinates of the dataset. Used for determining a new coordinate transfer. :param pixelpos: Tuple of x, y pixel coordinates :param mode: Acquisition mode of optical image ('df' or 'bf') :return: Tuple (x, y, z) world coordinates """ orgCoordTransform = self.coordinatetransform self.coordinatetransform = None x, y, z = self.mapToLength(pixelpos, mode, force=True, returnz=True) self.coordinatetransform = orgCoordTransform return x, y, z def mapToLengthSpectrometer(self, pixelpos, microscopeMode='df', noz=False): p0x, p0y, z = self.mapToLength(pixelpos, mode=microscopeMode, returnz=True) x, y = p0x + self.pshift[0], p0y + self.pshift[1] return x, y, z def saveNewImageMarkers(self, imgMarkerList: List['ImageMarker']) -> None: self.imageMarkers = imgMarkerList self.save() def newProject(self, fname): path = os.path.split(fname)[0] name = os.path.splitext(os.path.basename(fname))[0] newpath = os.path.join(path, name) fname = os.path.join(newpath, name + ".pkl") if not os.path.exists(newpath): os.mkdir(newpath) # for new projects a directory will be created elif os.path.exists(fname): # if this project is already there, load it instead self.__dict__.update(loadData(fname).__dict__) return fname def getTilePath(self): scandir = os.path.join(self.path, "tiles") if not os.path.exists(scandir): os.mkdir(scandir) return scandir def getScanPath(self): scandir = os.path.join(self.path, "scanimages") if not os.path.exists(scandir): os.mkdir(scandir) return scandir def getDetectParamsPath(self) -> str: return os.path.join(self.path, "detection.graph") def updatePath(self): self.path = os.path.split(self.fname)[0] self.name = os.path.splitext(os.path.basename(self.fname))[0] if hasattr(self.particleContainer, 'completedSpecScans'): # Will be false for older versions. Will be created in legacy convert for measSeries in self.particleContainer.completedSpecScans: measSeries.setUpToDataset(self) def getSpectraFileName(self): return os.path.join(self.path, 'spectra.npy') def getProjectFilePath(self) -> str: return self.fname def getImageName(self): return os.path.join(self.path, 'fullimage.tif') def getZvalImageName(self): return os.path.join(self.path, "zvalues.tif") def getLegacyImageName(self): return os.path.join(self.path, "fullimage.png") def getBackgroundImageName(self): return os.path.join(self.path, "background.bmp") def getParticleTypeGroups(self) -> Dict[str, List[str]]: return self._particleTypeGroups def setParticleTypeGroups(self, newGroups: Dict[str, List[str]]): self._particleTypeGroups = newGroups def getTmpImageName(self): return os.path.join(self.path, "tmp.bmp") def save(self): saveData(self, self.fname) def saveBackup(self): inc = 0 while True: directory = os.path.dirname(self.fname) filename = self.name + '_backup_' + str(inc) + '.pkl' path = os.path.join(directory, filename) if os.path.exists(path): inc += 1 else: saveData(self, path) return filename