# -*- coding: utf-8 -*-
"""
GEPARD - Gepard-Enabled PARticle Detection
Copyright (C) 2018  Lars Bittrich and Josef Brandt,
Leibniz-Institut für Polymerforschung Dresden e. V.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program, see COPYING. If not, see
<https://www.gnu.org/licenses/>.
"""
import numpy as np
import cv2
cv2.useOptimized()
from time import time
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy import ndimage as ndi
from skimage.feature import peak_local_max
from skimage.morphology import watershed
from random import random

from .errors import InvalidParticleError


def closeHolesOfSubImage(subimg):
    # Add padding to the thresholded image
    subimg = cv2.copyMakeBorder(subimg, 1, 1, 1, 1, 0)

    # Copy the thresholded image.
    im_floodfill = subimg.copy()

    # Mask used for flood filling.
    # Notice the size needs to be 2 pixels larger than the image.
    h, w = subimg.shape[:2]
    mask = np.zeros((h+2, w+2), np.uint8)

    # Floodfill from point (0, 0)
    cv2.floodFill(im_floodfill, mask, (0, 0), 255)

    # Invert floodfilled image
    im_floodfill_inv = cv2.bitwise_not(im_floodfill)

    # Combine the two images to get the foreground.
    im_out = subimg | im_floodfill_inv
    return im_out[1:-1, 1:-1]
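
# Illustrative note (not part of the original pipeline): closeHolesOfSubImage fills the
# internal holes of a single binary blob. Flood-filling the padded copy from the corner
# marks everything reachable from outside the blob; inverting that result leaves exactly
# the enclosed holes, which are then OR-ed back onto the input. A minimal sketch of the
# effect:
#
#   ring = np.zeros((32, 32), np.uint8)
#   cv2.circle(ring, (16, 16), 12, 255, 2)      # hollow ring
#   disk = closeHolesOfSubImage(ring)           # same ring, but with its hole filled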

class Parameter(object):
    def __init__(self, name, dtype, value=None, minval=None, maxval=None,
                 decimals=0, stepsize=1, helptext=None, show=False,
                 linkedParameter=None):
        self.name = name
        self.dtype = dtype
        self.value = value
        self.valrange = (minval, maxval)
        self.decimals = decimals
        self.stepsize = stepsize
        self.helptext = helptext
        self.show = show
        self.linkedParameter = linkedParameter


class MeasurementPoint(object):
    def __init__(self, particleIndex, x, y):
        self.particleIndex = particleIndex
        self.x = x
        self.y = y


class Segmentation(object):
    def __init__(self, dataset=None, parent=None):
        self.cancelcomputation = False
        self.parent = parent
        self.defaultParams = {'adaptiveHistEqu': False,
                              'claheTileSize': 128,
                              'contrastCurve': np.array([[50, 0], [100, 200], [200, 255]]),
                              'activateContrastCurve': True,
                              'blurRadius': 9,
                              'activateLowThresh': True,
                              'lowThresh': 0.2,
                              'activateUpThresh': False,
                              'upThresh': 0.5,
                              'invertThresh': False,
                              'maxholebrightness': 0.5,
                              'minparticlearea': 20,
                              'maxparticlearea': 10000,
                              'minparticledistance': 20,
                              'closeBackground': True,
                              'measurefrac': 1,
                              'compactness': 0.,
                              'seedRad': 3}

        if dataset is not None:
            self.detectParams = dataset.detectParams
            for key in self.defaultParams:
                if key not in self.detectParams:
                    self.detectParams[key] = self.defaultParams[key]
        else:
            self.detectParams = self.defaultParams
        self.initializeParameters()

    def initializeParameters(self):
        parlist = [Parameter("adaptiveHistEqu", np.bool, self.detectParams['adaptiveHistEqu'],
                             helptext="Adaptive histogram equalization",
                             show=False, linkedParameter='claheTileSize'),
                   Parameter("claheTileSize", int, self.detectParams['claheTileSize'], 1, 2048, 1, 1,
                             helptext="Tile size for adaptive histogram adjustment\n"
                                      "The image will be split into tiles of size approx. (NxN)",
                             show=True),
                   Parameter("contrastCurve", np.ndarray, self.detectParams['contrastCurve'],
                             helptext="Contrast curve"),
                   Parameter("activateContrastCurve", np.bool, self.detectParams['activateContrastCurve'],
                             helptext="Activate contrast curve", show=True, linkedParameter='contrastCurve'),
                   Parameter("blurRadius", int, self.detectParams['blurRadius'], 3, 99, 1, 2,
                             helptext="Blur radius", show=True),
                   Parameter("invertThresh", np.bool, self.detectParams['invertThresh'],
                             helptext="Invert the current threshold", show=False),
                   Parameter("activateLowThresh", np.bool, self.detectParams['activateLowThresh'],
                             helptext="Activate lower threshold", show=False, linkedParameter='lowThresh'),
                   Parameter("lowThresh", float, self.detectParams['lowThresh'], .01, .9, 2, .02,
                             helptext="Lower threshold", show=True),
                   Parameter("activateUpThresh", np.bool, self.detectParams['activateUpThresh'],
                             helptext="Activate upper threshold", show=False, linkedParameter='upThresh'),
                   Parameter("upThresh", float, self.detectParams['upThresh'], .01, 1.0, 2, .02,
                             helptext="Upper threshold", show=True),
                   Parameter("maxholebrightness", float, self.detectParams['maxholebrightness'], 0, 1, 2, 0.02,
                             helptext="Close holes brighter than..", show=True),
                   Parameter("minparticlearea", int, self.detectParams['minparticlearea'], 1, 1000, 0, 50,
                             helptext="Min. particle pixel area", show=False),
                   Parameter("maxparticlearea", int, self.detectParams['maxparticlearea'], 10, 1E9, 0, 50,
                             helptext="Max. particle pixel area", show=True),
                   Parameter("minparticledistance", int, self.detectParams['minparticledistance'], 5, 1000, 0, 5,
                             helptext="Min. distance between particles", show=False),
                   Parameter("measurefrac", float, self.detectParams['measurefrac'], 0, 1, 2, stepsize=0.05,
                             helptext="Measure fraction of particles", show=False),
                   Parameter("closeBackground", np.bool, self.detectParams['closeBackground'],
                             helptext="Close holes in sure background", show=False),
                   Parameter("sure_fg", None, helptext="Show sure foreground", show=True),
                   Parameter("compactness", float, self.detectParams['compactness'], 0, 1, 2, 0.05,
                             helptext="Watershed compactness", show=False),
                   Parameter("watershed", None, helptext="Show watershed markers", show=True),
                   ]

        # make each parameter accessible via self.name
        # the variables are defined as properties; because of how the local context in
        # for loops works, the actual getter and setter functions are created inside a
        # separate local function, so each closure captures its own parameter
        def makeGetter(p):
            return lambda: p.value

        def makeSetter(p):
            def setter(value):
                p.value = value
            return setter

        for p in parlist:
            # the parameters are written directly into the instance dictionary under their name
            self.__dict__[p.name] = property(makeGetter(p), makeSetter(p))
        self.parlist = parlist

    def setParameters(self, **kwargs):
        for key in kwargs:
            self.__dict__[key] = kwargs[key]

    def convert2Gray(self, img):
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        return gray

    def calculateHist(self, gray):
        hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
        return hist

    def calculateHistFunction(self, points):
        # interpolate the contrast-curve control points with splines parameterized by
        # arc length and sample them into a 256-entry lookup table
        t = np.linspace(0, 1, 800)
        x0 = np.concatenate(([-1.], points[:, 0], [256.]))
        y0 = np.concatenate(([0.], points[:, 1], [255.]))
        t0 = np.concatenate(([0.], np.cumsum(np.sqrt(np.diff(x0)**2 + np.diff(y0)**2))))
        t0 /= t0[-1]
        fx = InterpolatedUnivariateSpline(t0, x0, k=3)
        fy = InterpolatedUnivariateSpline(t0, y0, k=3)
        x = fx(t)
        y = fy(t)
        arr = np.zeros(256, dtype=np.uint8)
        xi = np.arange(256)
        ind = np.searchsorted(xi, x)
        arr[ind[ind < 256]] = y[ind < 256]
        arr[xi > points[:, 0].max()] = 255
        arr[xi < points[:, 0].min()] = 0
        arr[arr > 255] = 255.
        arr[arr < 0] = 0.
        return xi, arr
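
    # Usage sketch (assumption, mirroring how apply2Image uses this below): the returned
    # `arr` is a 256-entry lookup table built from the spline through the contrast-curve
    # control points; applying it to a uint8 image is plain fancy indexing:
    #
    #   xi, lut = seg.calculateHistFunction(seg.detectParams['contrastCurve'])
    #   stretched = lut[gray]       # gray: uint8 image with values 0..255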

    def closeHoles(self, thresh):
        n, labels, stats, centroids = cv2.connectedComponentsWithStats(thresh, 8, cv2.CV_32S)
        newthresh = np.zeros_like(thresh)
        for label in range(1, n):
            up = stats[label, cv2.CC_STAT_TOP]
            left = stats[label, cv2.CC_STAT_LEFT]
            width = stats[label, cv2.CC_STAT_WIDTH]
            height = stats[label, cv2.CC_STAT_HEIGHT]
            subimg = np.uint8(255 * (labels[up:(up+height), left:(left+width)] == label))
            newthresh[up:(up+height), left:(left+width)] += closeHolesOfSubImage(subimg)
        return newthresh

    def closeBrightHoles(self, thresh, grayimage, maxbrightness):
        n, labels, stats, centroids = cv2.connectedComponentsWithStats(thresh, 8, cv2.CV_32S)
        maxbrightness = np.uint8(maxbrightness * 255)
        print('num comps in brightHoles:', n)
        for label in range(1, n):
            up = stats[label, cv2.CC_STAT_TOP]
            left = stats[label, cv2.CC_STAT_LEFT]
            width = stats[label, cv2.CC_STAT_WIDTH]
            height = stats[label, cv2.CC_STAT_HEIGHT]
            subimg = np.uint8(255 * (labels[up:(up+height), left:(left+width)] == label))

            # Add padding to the thresholded image
            subimg = cv2.copyMakeBorder(subimg, 1, 1, 1, 1, 0)

            # Copy the thresholded image.
            im_floodfill = subimg.copy()

            # Mask used for flood filling.
            # Notice the size needs to be 2 pixels larger than the image.
            h, w = subimg.shape[:2]
            mask = np.zeros((h+2, w+2), np.uint8)

            # Floodfill from point (0, 0)
            cv2.floodFill(im_floodfill, mask, (0, 0), 255)

            indices = np.where(im_floodfill == 0)[0]
            if len(indices) > 0:
                if np.mean(grayimage[indices[0]]) > maxbrightness:
                    # close hole and add closed image to thresh:
                    im_floodfill_inv = cv2.bitwise_not(im_floodfill)
                    # Combine the two images to get the foreground.
                    im_out = subimg | im_floodfill_inv
                    thresh[up:(up+height), left:(left+width)] += im_out[1:-1, 1:-1]
        return thresh

    def getEdgeBorders(self, image):
        edges = abs(cv2.Laplacian(image, cv2.CV_64F))
        edges = cv2.blur(edges, (5, 5))
        edges = edges**0.6
        edges = edges/edges.max()
        return edges

    def filterThresholdByAreas(self, thresh, minarea, maxarea):
        newthresh = np.zeros_like(thresh)
        n, labels, stats, centroids = cv2.connectedComponentsWithStats(thresh, 8, cv2.CV_32S)
        print('num comps:', n)
        for label in range(1, n):
            area = stats[label, cv2.CC_STAT_AREA]
            if minarea < area < maxarea:
                up = stats[label, cv2.CC_STAT_TOP]
                left = stats[label, cv2.CC_STAT_LEFT]
                width = stats[label, cv2.CC_STAT_WIDTH]
                height = stats[label, cv2.CC_STAT_HEIGHT]
                subthresh = np.uint8(255 * (labels[up:(up+height), left:(left+width)] == label))
                newthresh[up:(up+height), left:(left+width)] += subthresh
        return newthresh

    def getSureForeground(self, thresh, mindistance):
        sure_fg = np.zeros_like(thresh)
        n, labels, stats, centroids = cv2.connectedComponentsWithStats(thresh, 8, cv2.CV_32S)
        for label in range(1, n):
            up = stats[label, cv2.CC_STAT_TOP]
            left = stats[label, cv2.CC_STAT_LEFT]
            width = stats[label, cv2.CC_STAT_WIDTH]
            height = stats[label, cv2.CC_STAT_HEIGHT]
            subimg = np.uint8(255 * (labels[up:(up+height), left:(left+width)] == label))
            subdist = cv2.distanceTransform(subimg, cv2.DIST_L2, 3)
            subfg = np.uint8(peak_local_max(subdist, mindistance, indices=False))
            if subfg.max() > 0 and random() < self.measurefrac:
                # i.e., at least one maximum value was added
                sure_fg[up:(up+height), left:(left+width)] += subfg
            elif random() < self.measurefrac:
                # simply take the maximum of subdist
                submax = np.where(subdist == subdist.max())
                sure_fg[up+submax[0][0], left+submax[1][0]] = 1

        sure_fg = cv2.dilate(sure_fg, np.ones((3, 3)))
        return sure_fg
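
    # Note on the seeding step above: every connected component gets its own distance
    # transform, and peak_local_max places one seed per local maximum that is at least
    # `mindistance` pixels away from the next; components too small to contain such a
    # peak fall back to their single distance maximum, so small particles are not lost.
    # With measurefrac < 1, components are skipped at random so that only roughly that
    # fraction of particles receives seeds (and is measured later).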

    def getMeasurementPoints(self, binParticle, numPoints=1):
        binParticle = cv2.copyMakeBorder(binParticle, 1, 1, 1, 1, 0)
        dist = cv2.distanceTransform(np.uint8(binParticle), cv2.DIST_L2, 3)
        ind = np.argmax(dist)
        y = [ind//dist.shape[1]-1]
        x = [ind%dist.shape[1]-1]

        for i in range(numPoints-1):
            binParticle.flat[ind] = 0
            dist = cv2.distanceTransform(np.uint8(binParticle), cv2.DIST_L2, 3)
            ind = np.argmax(dist)
            y.append(ind//dist.shape[1]-1)
            x.append(ind%dist.shape[1]-1)

        return y, x
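
    # Usage sketch (assumption): for a binary particle mask, getMeasurementPoints returns
    # coordinates that lie deepest inside the particle (maxima of the distance transform),
    # i.e. the preferred spots for placing measurement points:
    #
    #   ys, xs = seg.getMeasurementPoints(binParticle, numPoints=2)
    #   # ys/xs are lists of row/column indices relative to binParticle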

    def apply2Image(self, img, seedpoints, deletepoints, seedradius, dataset, return_step=None):
        t0 = time()
        # convert to gray image and do histogram normalization
        gray = self.convert2Gray(img)
        print("gray")

        if self.adaptiveHistEqu:
            numTilesX = round(img.shape[1]/self.claheTileSize)
            numTilesY = round(img.shape[0]/self.claheTileSize)
            clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(numTilesY, numTilesX))
            gray = clahe.apply(gray)
            if return_step == "claheTileSize":
                return gray, 0
            print("adaptive Histogram Adjustment")
        if self.cancelcomputation:
            return None, None, None

        if self.activateContrastCurve:
            xi, arr = self.calculateHistFunction(self.contrastCurve)
            gray = arr[gray]
            print("contrast curve")
        if self.cancelcomputation:
            return None, None, None
        # return even if inactive!
        if return_step == "activateContrastCurve":
            return gray, 0

        # image blur for noise-reduction
        blur = cv2.medianBlur(gray, self.blurRadius)
        blur = np.uint8(blur*(255/blur.max()))
        if return_step == "blurRadius":
            return blur, 0
        print("blur")
        if self.cancelcomputation:
            return None, None, None

        # thresholding
        if self.activateLowThresh and not self.activateUpThresh:
            thresh = cv2.threshold(blur, int(255*self.lowThresh), 255, cv2.THRESH_BINARY)[1]
            if self.invertThresh:
                thresh = 255-thresh
            if return_step == "lowThresh":
                return thresh, 0
            print("lower threshold")
            if self.cancelcomputation:
                return None, None, None

        elif self.activateLowThresh and self.activateUpThresh:
            lowerLimit, upperLimit = np.round(self.lowThresh*255), np.round(self.upThresh*255)
            thresh = np.zeros_like(blur)
            thresh[np.where(np.logical_and(blur >= lowerLimit, blur <= upperLimit))] = 255
            if self.invertThresh:
                thresh = 255-thresh
            if return_step == "lowThresh" or return_step == "upThresh":
                return thresh, 0
            print("between threshold")
            if self.cancelcomputation:
                return None, None, None

        elif not self.activateLowThresh and self.activateUpThresh:
            thresh = np.zeros_like(blur)
            thresh[np.where(blur <= np.round(self.upThresh*255))] = 255
            if self.invertThresh:
                thresh = 255-thresh
            if return_step == "upThresh":
                return thresh, 0
            print("upper threshold")
            if self.cancelcomputation:
                return None, None, None

        else:   # no checkbox checked
            if self.parent is not None:
                self.parent.raiseWarning('No thresholding method selected!\nAborted detection..')
            print('NO THRESHOLDING SELECTED!')
            return blur, 0

        # close holes brighter than self.maxholebrightness
        thresh = self.closeBrightHoles(thresh, blur, self.maxholebrightness)
        print("closed holes")
        if return_step == 'maxholebrightness':
            return thresh, 0
        if self.cancelcomputation:
            return None, None, None

        thresh = self.filterThresholdByAreas(thresh, self.minparticlearea, self.maxparticlearea)
        print('filter threshold by areas')
        if return_step == 'maxparticlearea':
            return thresh, 0
        if self.cancelcomputation:
            return None, None, None

        # get sure_fg
        '''the peak_local_max function takes the min distance between peaks. Unfortunately,
        that means that individual particles smaller than that distance are consequently
        disregarded. Hence, we need a connected-components approach.'''
        sure_fg = self.getSureForeground(thresh, self.minparticledistance)
        sure_bg = cv2.dilate(thresh, np.ones((5, 5)), iterations=1)
        if self.closeBackground:
            sure_bg = self.closeHoles(sure_bg)

        # modify sure_fg and sure_bg with seedpoints and deletepoints
        if len(deletepoints) > 0:
            h, w = sure_fg.shape[:2]
            mask = np.zeros((h+2, w+2), np.uint8)
            for p in np.int32(deletepoints):
                if 0 < p[0] < h and 0 < p[1] < w:
                    # point has to be within the image, otherwise the floodFill fails
                    cv2.floodFill(sure_fg, mask, tuple([p[0], p[1]]), 0)

        for p in np.int32(seedpoints):
            cv2.circle(sure_fg, tuple([p[0], p[1]]), int(p[2]), 1, -1)
        for p in np.int32(deletepoints):
            cv2.circle(sure_fg, tuple([p[0], p[1]]), int(p[2]), 0, -1)
            cv2.circle(sure_bg, tuple([p[0], p[1]]), int(p[2]), 0, -1)

        print("sure_fg, sure_bg")
        if self.cancelcomputation:
            return None, None, None

        # pixels that are neither sure foreground nor sure background stay unknown (0)
        unknown = cv2.subtract(sure_bg, sure_fg)
        ret, markers = cv2.connectedComponents(sure_fg)
        markers = markers+1
        markers[unknown == 255] = 0
        print("connectedComponents")
        if self.cancelcomputation:
            return None, None, None

        if return_step == "sure_fg":
            img = np.zeros_like(sure_fg)
            # dilation of sure_fg is included in self.getSureForeground
            img[np.nonzero(sure_fg)] |= 1
            img[np.nonzero(sure_bg)] |= 2
            return img, 1

        dist_transform = cv2.distanceTransform(thresh, cv2.DIST_L2, 5)
        print("distanceTransform")
        if self.cancelcomputation:
            return None, None, None

        # only the skimage watershed is integrated now; the OpenCV watershed is
        # commented out above, in case we ever need it again for something...
        markers = ndi.label(sure_fg)[0]
        markers = watershed(-dist_transform, markers, mask=sure_bg,
                            compactness=self.compactness, watershed_line=True)
        # labels = 0 for background, 1, ... for particles
        print("watershed")
        if self.cancelcomputation:
            return None, None, None

        if return_step == "watershed":
            return np.uint8(255*(markers != 0)), 0

        if cv2.__version__ > '3.5':
            contours, hierarchy = cv2.findContours(markers, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE)
        else:
            temp, contours, hierarchy = cv2.findContours(markers, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE)
        print("contours")
        if self.cancelcomputation:
            return None, None, None

        # TODO: AH, this should be imported at the beginning of the file, but there it always crashes..
        from .analysis.particleCharacterization import getParticleStatsWithPixelScale

        particlestats = []
        measurementPoints = {}
        # keep only outer contours (no parent in the hierarchy)
        tmpcontours = [contours[i] for i in range(len(contours)) if hierarchy[0, i, 3] < 0]
        contours = []
        particleIndex = 0
        for cnt in tmpcontours:
            label = markers[cnt[0, 0, 1], cnt[0, 0, 0]]
            if label == 0:
                continue

            try:
                stats = getParticleStatsWithPixelScale(cnt, img, dataset)
            except InvalidParticleError:
                print('invalid contour in detection, skipping particle. Contour is:', cnt)
                continue
            particlestats.append(stats)

            x0, x1 = cnt[:, 0, 0].min(), cnt[:, 0, 0].max()
            y0, y1 = cnt[:, 0, 1].min(), cnt[:, 0, 1].max()
            subimg = (markers[y0:y1+1, x0:x1+1]).copy()
            subimg[subimg != label] = 0
            y, x = self.getMeasurementPoints(subimg)
            contours.append(cnt)

            measurementPoints[particleIndex] = []
            for index in range(0, len(x)):
                newMeasPoint = MeasurementPoint(particleIndex, x[index] + x0, y[index] + y0)
                measurementPoints[particleIndex].append(newMeasPoint)
            particleIndex += 1

        if return_step is not None:
            raise NotImplementedError(f"this particular return_step: {return_step} is not implemented yet")

        tf = time()
        print("particle detection took:", tf-t0, "seconds")
        return measurementPoints, contours, particlestats
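
# Usage sketch (hypothetical, not part of the module): outside the GEPARD GUI a full
# detection run might look roughly like this, assuming `dataset` is a loaded GEPARD
# dataset (providing detectParams and the pixel scale) and `img` an RGB mosaic image:
#
#   seg = Segmentation(dataset)
#   points, contours, stats = seg.apply2Image(img, seedpoints=[], deletepoints=[],
#                                             seedradius=seg.detectParams['seedRad'],
#                                             dataset=dataset)
#   # points: dict of MeasurementPoint lists per particle index,
#   # contours: outer contours of the detected particles,
#   # stats: per-particle statistics from getParticleStatsWithPixelScale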