Commit 046c9871 authored by Josef Brandt's avatar Josef Brandt

Merge branch 'Development'

parents 8074c9d8 deabc28f
......@@ -20,3 +20,7 @@ cythonModules/build/
chemometrics/Assignments.txt
chemometrics/Data.txt
chemometrics/Assignments_all.txt
chemometrics/Data_all.txt
......@@ -5,7 +5,7 @@ from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
# from scipy import spatial
# from itertools import combinations
from random import sample
from random import sample, random
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
......@@ -14,9 +14,10 @@ import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from gepard.analysis import particleCharacterization as pc
from gepard.helperfunctions import cv2imread_fix
from methods import SubsamplingMethod
from helpers import timingDecorator
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
try:
......@@ -59,34 +60,57 @@ def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarr
return list(sortedIndices[:n])
def get_particle_featurematrix(particleContainer: ParticleContainer) -> np.ndarray:
def get_particle_featurematrix(particleContainer: ParticleContainer, fullimg: np.ndarray = None) -> np.ndarray:
"""
:return: np.ndarray, numRows: Particles, numCols: Features
"""
vectors: list = []
for particle in particleContainer.particles:
vectors.append(get_characteristic_vector(particle))
firstVecLength: int = 0
for index, particle in enumerate(particleContainer.particles):
particleImg: np.ndarray = None
if fullimg is not None:
particleImg = pc.getParticleImageFromFullimage(particle.contour, fullimg)
vector: np.ndarray = get_characteristic_vector(particle, particleImg)
if index == 0:
firstVecLength = vector.shape[0]
else:
assert vector.shape[0] == firstVecLength, 'particle feature vectors have non-uniform lengths...'
vectors.append(vector)
vectors: np.ndarray = np.array(vectors)
assert vectors.shape[0] == len(particleContainer.particles)
return vectors
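# A minimal usage sketch (hedged; `dset` and `fullimg` are illustrative names for a loaded
# GEPARD dataset and its stitched full image, not part of this commit):
#   features = get_particle_featurematrix(dset.particleContainer, fullimg)
#   features.shape  # -> (numParticles, numFeaturesPerParticle)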
def get_characteristic_vector(particle: pm.Particle) -> np.ndarray:
def get_characteristic_vector(particle: pm.Particle, particleImg: np.ndarray = None) -> np.ndarray:
vector: list = []
# vector += list(get_log_hu_moments(particle.contour))
vector += list(get_log_hu_moments(particle.contour))
vector.append(float(get_color_index(particle.color)))
vector.append(get_solidity(particle.contour))
vector.append(get_aspect_ratio(particle.contour))
vector.append(get_extent(particle.contour))
vector.append(cv2.contourArea(particle.contour))
# vector.append(get_shape_index(particle.shape))
# vector.append(cv2.arcLength(particle.contour, True))
vector.append(get_shape_index(particle.shape))
vector.append(cv2.arcLength(particle.contour, True))
for ftHarmonic in get_curvature_ft(particle.contour):
vector.append(ftHarmonic)
if particleImg is not None:
image_features: np.ndarray = get_image_feature_vec(particleImg)
for val in image_features:
vector.append(val)
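# with the defaults this yields 22 entries per particle (7 log-Hu moments, color index,
# solidity, aspect ratio, extent, area, shape index, perimeter, 8 curvature harmonics),
# plus 6 color mean/std values whenever a particle image is supplied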
# vector: np.ndarray = np.hstack((log_hu, color))
# if len(vector) != 11:
# print('error')
# assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
for entry in vector:
try:
float(entry)
except ValueError:
print('non-numeric value found')
raise
assert not np.isnan(entry)
return np.array(vector)
......@@ -95,9 +119,9 @@ def get_solidity(contour: np.ndarray) -> float:
hull: np.ndarray = cv2.convexHull(contour)
hull_area: float = cv2.contourArea(hull)
if area == 0 or hull_area == 0:
raise ValueError
solidity: float = area / hull_area
solidity: float = 0
else:
solidity: float = area / hull_area
return solidity
......@@ -112,10 +136,11 @@ def get_aspect_ratio(contour: np.ndarray) -> float:
if short > long:
long, short = short, long
if short == 0.0:
raise InvalidParticleError
aspectRatio: float = 1.0
if short > 0.0:
aspectRatio = long/short
return long/short
return aspectRatio
def get_extent(contour: np.ndarray) -> float:
......@@ -138,6 +163,23 @@ def get_log_hu_moments(contour: np.ndarray) -> np.ndarray:
return resultMoments[:, 0]
def get_image_feature_vec(particleImg: np.ndarray) -> np.ndarray:
meanStdev: np.ndarray = get_mean_and_stdev(particleImg)
return meanStdev.reshape(meanStdev.size)
def get_mean_and_stdev(img: np.ndarray) -> np.ndarray:
try:
meanStd: tuple = cv2.meanStdDev(img)
colorMean: np.ndarray = np.array([i[0] for i in meanStd[0]])
colorStd: np.ndarray = np.array([i[0] for i in meanStd[1]])
except cv2.error:  # i.e., one-pixel images
colorMean: np.ndarray = np.array([128, 128, 128])
colorStd: np.ndarray = np.array([0, 0, 0])
# print('invalid here...:', img, img.shape, colorMean, colorStd, np.vstack((colorMean, colorStd)))
return np.vstack((colorMean, colorStd))
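# for a 3-channel (BGR) image this is a (2, 3) array: row 0 holds the per-channel means,
# row 1 the per-channel standard deviations; get_image_feature_vec flattens it to 6 values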
def get_color_hash(color: str, desiredLength: int = 4) -> np.ndarray:
colorArray: list = [int(i) for i in str(abs(hash(color)))[:desiredLength]]
return np.transpose(np.array(colorArray))
......@@ -150,23 +192,143 @@ def get_color_index(color: str) -> int:
return colors.index(color)
# def get_shape_index(shape: str) -> int:
# shapes: list = ['spherule', 'fibre', 'flake', 'irregular']
# assert shape in shapes
# return shapes.index(shape)
def get_shape_index(shape: str) -> int:
shapes: list = ['spherule', 'fibre', 'flake', 'irregular', None]
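# None is included so that particles without an assigned shape still map to a valid index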
assert shape in shapes
return shapes.index(shape)
def get_curvature_ft(contour: np.ndarray, angularSegment: float = 20, numHarmonics: int = 8) -> np.ndarray:
"""
Calculates the curvature of the contour and applies a discrete Fourier transform (DFT) to it.
Adapted from Xu et al, "Comparison of shape features for the classification of wear particles",
doi:10.1016/S0952-1976(97)00017-1
:param contour: Particle contour
:param angularSegment: In degrees, the section of 360 degrees to consider for each angle
:param numHarmonics: Number of FT harmonics to return
:return: the computed FT harmonics of the contour curvature
"""
m: int = int(round(contour.shape[0] / (360 / angularSegment)))  # smoothing factor, as described in the paper
numPoints: int = contour.shape[0]
curvature: np.ndarray = np.zeros(numPoints)
for i in range(numPoints):
x, y = contour[i, 0, 0], contour[i, 0, 1]
index_before = i-m-1
index_after = i+m+1
if index_after > numPoints-1:
index_after -= numPoints
x_before, y_before = contour[index_before, 0, 0], contour[index_before, 0, 1]
x_after, y_after = contour[index_after, 0, 0], contour[index_after, 0, 1]
tan_before: float = 0
if x - x_before != 0:
tan_before = np.rad2deg(np.arctan((y-y_before) / (x-x_before)))
tan_after: float = 0
if x_after - x != 0:
tan_after: float = np.rad2deg(np.arctan((y_after-y) / (x_after-x)))
curvature[i] = tan_after - tan_before
dft: np.ndarray = np.fft.fft(curvature)
freqs: np.ndarray = np.fft.fftfreq(numPoints)
mask: np.ndarray = freqs > 0
dft_true: np.ndarray = 2 * np.abs(dft/numPoints)
dft_true = dft_true[mask]
numFTPoints = dft_true.shape[0]
# take only the odd harmonics (they seem to carry more information).
# Also, ensure that exactly numHarmonics values come out; pad with zeros if the contour was very short
indicesToTake: np.ndarray = np.arange(numHarmonics)*2 + 1
final_dfts = np.zeros(numHarmonics)
for i, indexToTake in enumerate(indicesToTake):
if indexToTake < numFTPoints:
final_dfts[i] = dft_true[indexToTake]
return final_dfts
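# Minimal self-check sketch (hypothetical, not part of this commit): a synthetic circular
# contour in OpenCV layout (N, 1, 2) should yield exactly numHarmonics values:
#   angles = np.linspace(0, 2 * np.pi, 72, endpoint=False)
#   circle = np.stack((100 + 50 * np.cos(angles), 100 + 50 * np.sin(angles)), axis=1)
#   contour = np.round(circle).astype(np.int32).reshape(-1, 1, 2)
#   assert get_curvature_ft(contour).shape == (8,)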
class TrainedSubsampling(SubsamplingMethod):
def __init__(self, particleContainer: ParticleContainer, desiredFraction: float,
path: str = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl'):
path: str = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.79.pkl',
fakeScore: float = 0.8):
super(TrainedSubsampling, self).__init__(particleContainer, desiredFraction)
self.score: float = None
self.clf = None
self.clfPath: str = path
self.fraction = desiredFraction
self.fakeClassifier: bool = True
self.fakeScore: float = fakeScore
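# fakeScore emulates a classifier of that accuracy: in _predict_MP_Indices each particle
# is labelled correctly with probability fakeScore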
self.fractionForExtrapolation: float = 0.0
self.predictedMPIndices: list = []
self._predict_MP_Indices()
def equals(self, otherMethod) -> bool:
isEqual: bool = False
if type(otherMethod) == TrainedSubsampling and otherMethod.fraction == self.fraction:
if otherMethod.score == self.score and otherMethod.clf is self.clf:
isEqual = True
return isEqual
@property
def label(self) -> str:
return 'Trained Random Sampling'
label: str = 'Dummy Trained Random Sampling'
if self.fakeClassifier:
label += f' (score {self.fakeScore})'
else:
label += f' (score {self.score})'
return label
def _predict_MP_Indices(self) -> None:
from evaluation import is_MP_particle
if not self.fakeClassifier:
self._load_classifier()
fullimgpath: str = r'C:\Users\xbrjos\Desktop\temp MP\Fullimages'
dsetname: str = self.particleContainer.datasetParent.name
imgPath: str = os.path.join(fullimgpath, dsetname + '.tif')
fullimg = cv2imread_fix(imgPath)
features: np.ndarray = get_particle_featurematrix(self.particleContainer, fullimg)
predictions: np.ndarray = self.clf.predict(features)
else:
self.score = self.fakeScore
particles: list = self.particleContainer.particles
predictions: np.ndarray = np.zeros(len(particles))
for index, particle in enumerate(particles):
if is_MP_particle(particle):
if random() <= self.fakeScore:
predictions[index] = 1
else:
if random() > self.fakeScore:
predictions[index] = 1
mpIndices = list(np.where(predictions == 1)[0])
nonMPIndices = list(np.where(predictions == 0)[0])
numNonMPIndices = len(nonMPIndices)
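# linear ramp: take none of the predicted non-MP particles below a 5 % target fraction
# and all of them above 15 % (-0.5 + 10 * 0.05 = 0, -0.5 + 10 * 0.15 = 1)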
fracNonMPToTake: float = float(np.clip(-0.5 + 1/0.1 * self.fraction, 0.0, 1.0))
numNonMPToTake: int = int(round(fracNonMPToTake * numNonMPIndices))
self.predictedMPIndices = mpIndices + sample(nonMPIndices, numNonMPToTake)
def get_maximum_achievable_fraction(self) -> float:
maxFrac: float = 0.10
return maxFrac
def apply_subsampling_method(self) -> list:
numParticlesToSelect = round(len(self.particleContainer.particles) * self.fraction)
if numParticlesToSelect > len(self.predictedMPIndices):
numParticlesToSelect = len(self.predictedMPIndices)
indicesToSelect = sample(self.predictedMPIndices, numParticlesToSelect)
selectedParticles: list = []
for index in indicesToSelect:
selectedParticles.append(self.particleContainer.getParticleOfIndex(index))
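# the subsample was drawn from the classifier-reduced population, so express the nominal
# fraction relative to that population; it is later used to extrapolate the MP count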
numOrigParticles = len(self.particleContainer.particles)
numEnhancedParticles = len(self.predictedMPIndices)
self.fractionForExtrapolation = self.fraction * (numOrigParticles/numEnhancedParticles)
return selectedParticles
def _load_classifier(self) -> None:
assert os.path.exists(self.clfPath)
......@@ -183,18 +345,37 @@ class TrainedSubsampling(SubsamplingMethod):
mpIndices: list = list(np.where(assignments == 1)[0])
nonMpIndices: list = list(np.where(assignments == 0)[0])
numEstimMPParticles: int = len(mpIndices)
numPredictedMP: int = len(mpIndices)
numParticlesToMeasure = round(len(predictedAssignments) * self.fraction)
if numParticlesToMeasure <= numEstimMPParticles:
if numParticlesToMeasure <= numPredictedMP:
indicesToMeasure = set(sample(mpIndices, numParticlesToMeasure))
else:
remainingIndices: int = int(numParticlesToMeasure - numEstimMPParticles)
remainingIndices: int = int(numParticlesToMeasure - numPredictedMP)
indicesToMeasure = set(mpIndices + sample(nonMpIndices, remainingIndices))
assert len(indicesToMeasure) == numParticlesToMeasure
return indicesToMeasure
def get_theoretic_frac(self) -> float:
"""
The theoretical fraction that also takes the score of the trained model into account.
It is used for extrapolating the mpCount of the subsampled particle list.
:return: the fraction to use for extrapolation
"""
# score: float = self.score
# diff: float = 1/self.fraction - 1 # i.e., from 50 % score to 100 % score
# factor: float = 1 + (1 - score)/0.5 * diff
# pow: float = (1-self.fakeScore)
# theoreticFactor: float = (1/factor**pow)
# print('actual fraction, theor. factor is', self.fraction, theoreticFactor)
# return theoreticFactor
return self.fractionForExtrapolation
# class ChemometricSubsampling(SubsamplingMethod):
# # def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
......@@ -328,4 +509,28 @@ class TrainedSubsampling(SubsamplingMethod):
# assert abs(totalPointsAdded - numPointsToSelect) <= 1
# for clusterIndex in pointsPerCluster.keys():
# assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
# return pointsPerCluster
\ No newline at end of file
# return pointsPerCluster
if __name__ == '__main__':
import matplotlib.pyplot as plt
fractions: np.ndarray = np.linspace(0.01, 1, 100)
scores: np.ndarray = np.linspace(0.5, 1.0, 5)
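# sweep nominal fractions against classifier scores to visualize how the correction
# factor sketched (commented out) in get_theoretic_frac would dampen the effective fraction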
plt.clf()
for score in scores:
# if score == 0.5:
# theorFractions = fractions
# a, b, n = 1, 1, 1.5
# data1 = a * fractions**n / (fractions**n + b)
# data1 -= data1.min()
# data1 /= data1.max()
# theorFactors = 0.5 + 0.5*data1
theorFractions = []
for frac in fractions:
diff: float = 1 / frac - 1 # i.e., from 50 % score to 100 % score
factor: float = 1 + (1 - score) / 0.5 * diff
theorFractions.append(1/factor**0.2)
plt.plot(fractions, theorFractions, label=str(score))
plt.legend()
plt.show()
\ No newline at end of file
......@@ -12,9 +12,12 @@ from sklearn.feature_selection import chi2
import pickle
import time
import sys
import os
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis.particleCharacterization import getParticleImageFromFullimage
from gepard.helperfunctions import cv2imread_fix
from input_output import get_pkls_from_directory
from chemometricMethods import get_log_hu_moments, get_color_index, get_pca, get_characteristic_vector
......@@ -61,38 +64,49 @@ if __name__ == '__main__':
recreateNew: bool = True
if recreateNew:
pklsInFolders: dict = get_pkls_from_directory(r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets')
fullimgpath: str = r'C:\Users\xbrjos\Desktop\temp MP\Fullimages'
X: list = []
y: list = []
counter = 0
for folder in pklsInFolders.keys():
for pklPath in pklsInFolders[folder]:
if counter < 100:
if counter < 50:
dset: dataset.DataSet = dataset.loadData(pklPath)
print('loaded', dset.name)
print('loaded', dset.name, counter)
imgPath: str = os.path.join(fullimgpath, dset.name + '.tif')
fullimg = cv2imread_fix(imgPath)
print('loaded fullimg', imgPath, counter)
partContainer: ParticleContainer = dset.particleContainer
for particle in partContainer.particles:
features: np.ndarray = get_characteristic_vector(particle)
# features: list = [abs(i) for i in get_log_hu_moments(particle.contour)]
# features.append(get_color_index(particle.color))
firstVecLength: int = 0
for index, particle in enumerate(partContainer.particles):
partImg: np.ndarray = getParticleImageFromFullimage(particle.contour, fullimg)
features: np.ndarray = get_characteristic_vector(particle, partImg)
if index == 0:
firstVecLength = features.shape[0]
else:
assert features.shape[0] == firstVecLength
X.append(features)
y.append(int(is_MP_particle(particle)))
counter += 1
X: np.ndarray = np.array(X)
y: np.ndarray = np.array(y)
X_all: np.ndarray = np.array(X)
y_all: np.ndarray = np.array(y)
MPindices: np.ndarray = np.where(y == 1)[0]
nonMPindices: np.ndarray = np.where(y == 0)[0]
X_all: np.ndarray = SelectKBest(chi2, k=15).fit_transform(abs(X_all), y_all)
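# chi2 requires non-negative feature values, hence the abs() on the feature matrix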
MPindices: np.ndarray = np.where(y_all == 1)[0]
nonMPindices: np.ndarray = np.where(y_all == 0)[0]
nonMPindices: list = sample(list(nonMPindices), len(MPindices))
X_MP: list = list(X[MPindices])
y_MP: list = list(y[MPindices])
X_MP: list = list(X_all[MPindices])
y_MP: list = list(y_all[MPindices])
X_nonMP: list = list(X[nonMPindices])
y_nonMP: list = list(y[nonMPindices])
X_nonMP: list = list(X_all[nonMPindices])
y_nonMP: list = list(y_all[nonMPindices])
assert set(y_MP) == {1}
assert set(y_nonMP) == {0}
......@@ -100,6 +114,7 @@ if __name__ == '__main__':
X_equalized: np.ndarray = np.array(X_MP + X_nonMP)
y_equalized: np.ndarray = np.array(y_MP + y_nonMP)
sum = X_MP + X_nonMP
dset: tuple = (X_equalized, y_equalized)
......@@ -110,15 +125,37 @@ if __name__ == '__main__':
dset: tuple = pickle.load(fp)
X, y = dset
# np.savetxt('Data.txt', X)
# np.savetxt('Assignments.txt', y)
# with open(r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl', "rb") as fp:
# clf: RandomForestClassifier = pickle.load(fp)
# y_predicted = clf.predict(X)
np.savetxt('Data.txt', X)
np.savetxt('Assignments.txt', y)
np.savetxt('Data_all.txt', X_all)
np.savetxt('Assignments_all.txt', y_all)
# princComps = get_pca(X.transpose(), numComp=2)
#
# plt.scatter(princComps[:, 0], princComps[:, 1])
# print(X_equalized.shape)
# X: np.ndarray = SelectKBest(chi2, k=5).fit_transform(X, y)
# X: np.ndarray = SelectKBest(chi2, k=15).fit_transform(X, y)
# print(X_equalized.shape)
test_classification_models((X, y))
X = StandardScaler().fit_transform(X)
clf = RandomForestClassifier(n_estimators=1000)
clf.fit(X, y)
score = clf.score(X_all, y_all)
y_predicted = clf.predict(X_all)
errors: dict = {int(k): 0 for k in np.unique(y_all)}
for j in range(len(y_predicted)):
if y_all[j] != y_predicted[j]:
errors[y_all[j]] += 1
print('num MP Particles in set:', len(X_MP))
print(f'randForest with test size {len(y_all)} has score {round(score, 2)}, errors: {errors}')
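# note: the classifier was fit on the standardized, class-balanced subset but is scored
# on the raw, unscaled full set (which includes the training samples), so this score is
# only a rough sanity check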
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.helperfunctions import cv2imread_fix
from gepard.dataset import DataSet,loadData
from gepard.analysis.particleContainer import ParticleContainer
import cv2
import numpy as np
from scipy import spatial
import os
import matplotlib.pyplot as plt
from helpers import get_filterDimensions_from_dataset, get_center_from_filter_dimensions, convert_length_to_pixels
# from evaluation import is_MP_particle
import evaluation
def get_particle_patchiness(dataset: DataSet, numCells: int = 50, onlyMP=False) -> float:
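"""
Patchiness is the coefficient of variation (std / mean) of per-tile particle counts
inside the circular filter area; higher values indicate a patchier particle distribution.
"""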
offset, diameter, [width, height] = get_filterDimensions_from_dataset(dataset)
center: np.ndarray = get_center_from_filter_dimensions(offset, diameter)
width: float = convert_length_to_pixels(dataset, width)
height: float = convert_length_to_pixels(dataset, height)
pixelsPerTile: int = max(int(round(width/numCells)), int(round(height/numCells)))
centerX: int = int(round(convert_length_to_pixels(dataset, center[0] / pixelsPerTile)))
centerY: int = int(round(convert_length_to_pixels(dataset, center[1] / pixelsPerTile)))
radius: int = int(round(convert_length_to_pixels(dataset, diameter / pixelsPerTile * 0.5)))
numRows: int = int(np.ceil(height / pixelsPerTile)) + 1
numCols: int = int(np.ceil(width / pixelsPerTile)) + 1
partCount: int = 0
densityImage: np.ndarray = np.zeros((numRows, numCols))
for particle in dataset.particleContainer.particles:
if (onlyMP and evaluation.is_MP_particle(particle)) or not onlyMP:
particleCenter: tuple = np.mean(particle.contour[:, 0, 0]), np.mean(particle.contour[:, 0, 1])
row: int = int(round(particleCenter[1] / pixelsPerTile))
col: int = int(round(particleCenter[0] / pixelsPerTile))
densityImage[row, col] += 1
partCount += 1
mask: np.ndarray = np.zeros_like(densityImage)
cv2.circle(mask, (centerY, centerX), radius, 1, -1)
relevantData: np.ndarray = densityImage[mask > 0]
mean: np.ndarray = np.round(np.mean(relevantData), 2)
std: np.ndarray = np.round(np.std(relevantData), 2)
ratio: float = round(std/mean, 2)
# plt.imshow(densityImage)
# plt.title(f'MP particle count: {partCount},\ndensity mean: {mean}, density std: {std},\npatchiness = {ratio}')
# plt.axis('off')
# plt.tight_layout()
# plt.show()
# print(f'sample: {dataset.name}, mean: {mean}, std: {std}, ratio = {ratio}')
return ratio
if __name__ == '__main__':
# imgpath: str = r'C:\Users\xbrjos\Desktop\temp MP\Fullimages'
# imgname: str = '181120_MCI_2_ds1+2_all_ kleiner500_10_1.tif'
# imgname: str = '190619_5_PTPH_sld_190321_ds1_50_1_neu.tif'
#191213_P190814_TPHZ_ds1_50_1
# img: np.ndarray = cv2imread_fix(os.path.join(imgpath, imgname))
# gray: np.ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
# _, binimg = cv2.threshold(gray, 20, 1, cv2.THRESH_BINARY_INV)
# distmap: np.ndarray = cv2.distanceTransform(binimg, cv2.DIST_L1, 3)
# plt.imshow(distmap, cmap='gray')
paths: list = [r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets\water\181120_MCI_2_ds1+2_all_ kleiner500_10_1.pkl',
r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets\wastewater, water\191213_P190814_TPHZ_ds1_50_1.pkl',
r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets\Air\191119_RW6_Solling_50_2_neu.pkl']
distances: list = []
allParticles = []
for path in paths:
dset = loadData(path)
get_particle_patchiness(dset, 50, onlyMP=True)
for particle in dset.particleContainer.particles:
if evaluation.is_MP_particle(particle):
allParticles.append(particle.getParticleAssignment())
print(set(allParticles))
\ No newline at end of file
import numpy as np
cimport numpy as np
cimport cython
from cython cimport boundscheck, wraparound
@boundscheck(False)
@wraparound(False)
cdef bint box_overlaps_contour(unsigned int[:] boxTopLeftXY, unsigned int boxSize, unsigned int[:, :, :] contourData):
cdef bint isOverlapping = False
cdef unsigned int xmin, xmax, width, boxXmin, boxXmax, ymin, ymax, height, boxYmin, boxYmax
xmin = np.min(contourData[:, 0, 1])
xmax = np.max(contourData[:, 0, 1])
width = xmax - xmin
boxXmin = boxTopLeftXY[0]
boxXmax = boxTopLeftXY[0] + boxSize