 
Commits (7)
@@ -14,3 +14,9 @@ cythonModules/build/
*.pyd
*.html
*.pkl
chemometrics/Assignments.txt
chemometrics/Data.txt
import numpy as np
import cv2
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from scipy import spatial
from itertools import combinations
from random import sample
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from methods import SubsamplingMethod
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
try:
standardizedData = StandardScaler().fit_transform(data.copy())
except ValueError:
print('StandardScaler failed on the input data, diagnostics below:')
print('datashape', data.shape)
print('unique:', np.unique(data))
raise
pca = PCA(n_components=numComp)
princComp: np.ndarray = pca.fit_transform(np.transpose(standardizedData))
return princComp
def do_DBSCAN_clustering(data: np.ndarray, eps: float = 0.1, min_samples: int = 10) -> tuple:
"""
Performs DBSCAN clustering and identifies noisy data
:param data: The input array, shape (numPoints, 2)
:param eps: maximum distance between two samples for them to be considered neighbors
:param min_samples: minimum number of samples in a neighborhood for a point to count as a core point
:return: Cluster labels for each point in the dataset given to fit(). Noisy samples are given the label -1.
"""
assert data.shape[1] == 2
standardizedData = StandardScaler().fit_transform(data)
db = DBSCAN(eps=eps, min_samples=min_samples).fit(standardizedData)
return db.labels_, db.core_sample_indices_
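# Illustrative sketch (not part of the original code): how get_pca and do_DBSCAN_clustering
# compose. The make_blobs data and the demo function name are assumptions for demonstration;
# get_pca expects a (numFeatures, numSamples) matrix, matching the feature matrices built below.
def _demo_pca_dbscan_pipeline() -> None:
    from sklearn.datasets import make_blobs
    samples, _ = make_blobs(n_samples=200, n_features=5, centers=3, random_state=42)
    featureMat: np.ndarray = np.transpose(samples)  # (numFeatures, numSamples) layout
    princComps: np.ndarray = get_pca(featureMat)  # project onto 2 principal components
    labels, coreIndices = do_DBSCAN_clustering(princComps)
    print('cluster labels found (-1 = noise):', set(labels))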
def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarray) -> list:
"""
Returns a list with indices of n points that are closest to the indicated refPoint
:param points: np.ndarray, cols: x, y, rows: individual points
:param n: number of points to return
:param refPoint: np.array([x, y]) of reference point
:return: list of point indices
"""
distancesToPoints: np.ndarray = np.linalg.norm(points - refPoint, axis=1)
sortedIndices = np.argsort(distancesToPoints)
return list(sortedIndices[:n])
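# Example (illustrative): for points [[0, 0], [0, 1], [1, 0], [1, 1]] and refPoint [0, 0],
# get_n_points_closest_to_point(points, 2, refPoint) returns [0, 1]
# (distances are 0, 1, 1, sqrt(2); np.argsort is stable on ties).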
class ChemometricSubsampling(SubsamplingMethod):
def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
super(ChemometricSubsampling, self).__init__(particleContainer, desiredFraction)
@property
def label(self) -> str:
return 'Chemometric Selection'
def apply_subsampling_method(self) -> list:
vectors: np.ndarray = self._get_particle_featurematrix()
try:
princComps: np.ndarray = get_pca(vectors)
except ValueError:
print('numParticles:', len(self.particleContainer.particles))
print('input featurematrix shape', vectors.shape)
raise  # re-raise, otherwise princComps would be undefined below
clusterLabels, coreIndices = do_DBSCAN_clustering(princComps)
indices: list = self._get_indices_from_clusterLabels(princComps, clusterLabels, coreIndices)
selectedParticles: list = []
for particle in self.particleContainer.particles:
if particle.index in indices:
selectedParticles.append(particle)
return selectedParticles
def _get_particle_featurematrix(self) -> np.ndarray:
"""
:return: np.ndarray, numRows: Features, numCols: Particles
"""
vectors: list = []
for particle in self.particleContainer.particles:
extractor: FeatureExtractor = FeatureExtractor(particle)
vectors.append(extractor.get_characteristic_vector())
vectors: np.ndarray = np.transpose(np.array(vectors))
assert vectors.shape == (11, len(self.particleContainer.particles)), f'wrong featureMat-shape: {vectors.shape}'
return vectors
def equals(self, otherMethod) -> bool:
equals: bool = False
if type(otherMethod) == ChemometricSubsampling and otherMethod.fraction == self.fraction:
equals = True
return equals
def _get_indices_from_clusterLabels(self, points: np.ndarray, labels: np.ndarray, centerIndices: np.ndarray) -> list:
indices: list = []
allIndices: np.ndarray = np.arange(len(labels))
numPointsPerCluster: dict = self._get_numPoints_per_cluster(labels)
for clusterIndex in set(labels):
indToAppend: list = []
nPoints: int = int(numPointsPerCluster[clusterIndex])
indicesInCluster: np.ndarray = allIndices[labels == clusterIndex]
if clusterIndex == -1:
for ind in sample(list(indicesInCluster), nPoints):
# assert ind not in indices
indices.append(ind)
else:
clusterPoints: np.ndarray = points[indicesInCluster]
centerPoint: np.ndarray = np.mean(clusterPoints, axis=0)
indicesToSelect: list = get_n_points_closest_to_point(clusterPoints, nPoints, centerPoint)
for ind in indicesToSelect:
origInd = indicesInCluster[ind]
indices.append(origInd)
assert len(set(indices)) == len(indices), f'The calculated indices contain duplicates, ' \
f'num duplicates: {len(indices) - len(set(indices))}'
return indices
def _get_numPoints_per_cluster(self, labels: np.ndarray, noiseAmpFactor: float = 5) -> dict:
"""
MP particles are expected to be the minority of all particles. So, if datapoints were classified as noise
(i.e., label = -1), it is likely that MP is among them. The number of points taken from the noise is therefore
amplified by the noiseAmpFactor.
:param labels: cluster label of each datapoint (-1 = noise)
:param noiseAmpFactor: amplification factor for the number of points drawn from the noise cluster
:return: A dictionary with keys = cluster index (i.e., label) and value = number of points to take from that cluster
"""
pointsPerCluster: dict = {}
if type(labels) != np.ndarray:
labels = np.array(labels)
individualLabels: set = set(labels)
numPointsToSelect = round(len(labels) * self.fraction)
if numPointsToSelect == 0:
numPointsToSelect = 1
numNoisePoints = len(labels[labels == -1])
numClusteredPoints = len(labels) - numNoisePoints
# cap the noiseAmpFactor so that it cannot exceed 1/fraction
if noiseAmpFactor > 1/self.fraction:
noiseAmpFactor = 1/self.fraction
numAmpPoints = numClusteredPoints + numNoisePoints*noiseAmpFactor
fractionPerCluster = np.clip(numPointsToSelect / numAmpPoints, 0.0, 1.0)
tooFewPoints = numPointsToSelect < len(individualLabels)
totalPointsAdded = 0
for ind in individualLabels:
if ind > -1:
if not tooFewPoints:
pointsToAdd = round(fractionPerCluster * len(labels[labels == ind]))
else:
pointsToAdd = 1 if totalPointsAdded < numPointsToSelect else 0
pointsPerCluster[ind] = pointsToAdd
totalPointsAdded += pointsToAdd
# fill up the rest with noisePoints
if numNoisePoints > 0:
diff: float = np.clip(numPointsToSelect - totalPointsAdded, 0, numNoisePoints)
pointsPerCluster[-1] = diff
totalPointsAdded += diff
# just in case too many points were selected (due to rounding errors), keep on deleting until it matches
while totalPointsAdded > numPointsToSelect:
indexWithHighestCount = None
maxCount = 0
for index in pointsPerCluster.keys():
if pointsPerCluster[index] > maxCount:
maxCount = pointsPerCluster[index]
indexWithHighestCount = index
pointsPerCluster[indexWithHighestCount] -= 1
totalPointsAdded -= 1
assert abs(totalPointsAdded - numPointsToSelect) <= 1, \
f'selected {totalPointsAdded} points, expected {numPointsToSelect}'
for clusterIndex in pointsPerCluster.keys():
assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
return pointsPerCluster
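# Worked example (illustrative): with fraction = 0.1, one cluster of 90 points (label 0),
# 10 noise points (label -1) and noiseAmpFactor = 5: numPointsToSelect = round(100 * 0.1) = 10,
# numAmpPoints = 90 + 10*5 = 140, fractionPerCluster = 10/140 ≈ 0.071, so cluster 0 gets
# round(0.071 * 90) = 6 points and the noise cluster is filled up with the remaining 4.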
class FeatureExtractor(object):
def __init__(self, particle: pm.Particle):
super(FeatureExtractor, self).__init__()
self.particle: pm.Particle = particle
def get_characteristic_vector(self) -> np.ndarray:
log_hu: np.ndarray = self._get_log_hu_moments()
color: np.ndarray = self._get_color_hash(self.particle.color, desiredLength=4)
vector: np.ndarray = np.hstack((log_hu, color))
assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
return vector
def _get_log_hu_moments(self) -> np.ndarray:
moments: dict = cv2.moments(self.particle.contour)
resultMoments: np.ndarray = np.zeros((7, 1))
for index, mom in enumerate(cv2.HuMoments(moments)):
if mom != 0:
resultMoments[index] = -1 * np.copysign(1.0, mom) * np.log10(abs(mom))
else:
resultMoments[index] = 0
return resultMoments[:, 0]
def _get_color_hash(self, color: str, desiredLength: int = 4) -> np.ndarray:
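# note: Python's built-in hash() of strings is salted per process (PYTHONHASHSEED),
# so this encoding is only reproducible within a single run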
colorArray: list = [int(i) for i in str(abs(hash(color)))[:desiredLength]]
return np.transpose(np.array(colorArray))
import numpy as np
import cv2
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
# from scipy import spatial
# from itertools import combinations
from random import sample
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
import os
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from gepard.analysis import particleCharacterization as pc
from methods import SubsamplingMethod
from helpers import timingDecorator
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
try:
standardizedData = StandardScaler().fit_transform(data.copy())
except ValueError:
print('StandardScaler failed on the input data, diagnostics below:')
print('datashape', data.shape)
print('unique:', np.unique(data))
raise
pca: PCA = PCA(n_components=numComp)
princComp: np.ndarray = pca.fit_transform(np.transpose(standardizedData))
# print(f'pca explained variance with {numComp} princ comps is {sum(pca.explained_variance_)}')
return princComp
def do_DBSCAN_clustering(data: np.ndarray, eps: float = 0.1, min_samples: int = 10) -> tuple:
"""
Performs DBSCAN clustering and identifies noisy data
:param data: The input array, shape (numPoints, 2)
:param eps: maximum distance between two samples for them to be considered neighbors
:param min_samples: minimum number of samples in a neighborhood for a point to count as a core point
:return: Cluster labels for each point in the dataset given to fit(). Noisy samples are given the label -1.
"""
assert data.shape[1] == 2
standardizedData = StandardScaler().fit_transform(data)
db = DBSCAN(eps=eps, min_samples=min_samples).fit(standardizedData)
return db.labels_, db.core_sample_indices_
def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarray) -> list:
"""
Returns a list with indices of n points that are closest to the indicated refPoint
:param points: np.ndarray, cols: x, y, rows: individual points
:param n: number of points to return
:param refPoint: np.array([x, y]) of reference point
:return: list of point indices
"""
distancesToPoints: np.ndarray = np.linalg.norm(points - refPoint, axis=1)
sortedIndices = np.argsort(distancesToPoints)
return list(sortedIndices[:n])
def get_particle_featurematrix(particleContainer: ParticleContainer) -> np.ndarray:
"""
:return: np.ndarray, numRows: Particles, numCols: Features
"""
vectors: list = []
for particle in particleContainer.particles:
vectors.append(get_characteristic_vector(particle))
vectors: np.ndarray = np.array(vectors)
assert vectors.shape[0] == len(particleContainer.particles)
return vectors
def get_characteristic_vector(particle: pm.Particle) -> np.ndarray:
vector: list = []
# vector += list(get_log_hu_moments(particle.contour))
vector.append(float(get_color_index(particle.color)))
vector.append(get_solidity(particle.contour))
vector.append(get_aspect_ratio(particle.contour))
vector.append(get_extent(particle.contour))
vector.append(cv2.contourArea(particle.contour))
# vector.append(get_shape_index(particle.shape))
# vector.append(cv2.arcLength(particle.contour, True))
# vector: np.ndarray = np.hstack((log_hu, color))
# if len(vector) != 11:
# print('error')
# assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
return np.array(vector)
def get_solidity(contour: np.ndarray) -> float:
area: float = cv2.contourArea(contour)
hull: np.ndarray = cv2.convexHull(contour)
hull_area: float = cv2.contourArea(hull)
if area == 0 or hull_area == 0:
solidity: float = 0
else:
solidity: float = area / hull_area
return solidity
def get_aspect_ratio(contour: np.ndarray) -> float:
if contour.shape[0] >= 5:  # at least 5 points are required for ellipse fitting
ellipse = cv2.fitEllipse(contour)
short, long = ellipse[1]
else:
rect = cv2.minAreaRect(contour)
long, short = rect[1]
if short > long:
long, short = short, long
aspectRatio: float = 1.0
if short > 0.0:
aspectRatio = long/short
return aspectRatio
def get_extent(contour: np.ndarray) -> float:
area: float = float(cv2.contourArea(contour))
x, y, w, h = cv2.boundingRect(contour)
rect_area: float = w * h
extent: float = area / rect_area if rect_area > 0 else 0.0  # guard against degenerate contours
return extent
def get_log_hu_moments(contour: np.ndarray) -> np.ndarray:
moments: dict = cv2.moments(contour)
resultMoments: np.ndarray = np.zeros((7, 1))
for index, mom in enumerate(cv2.HuMoments(moments)):
if mom != 0:
resultMoments[index] = -1 * np.copysign(1.0, mom) * np.log10(abs(mom))
else:
resultMoments[index] = 0
return resultMoments[:, 0]
def get_color_hash(color: str, desiredLength: int = 4) -> np.ndarray:
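# note: Python's built-in hash() of strings is salted per process (PYTHONHASHSEED),
# so this encoding is only reproducible within a single run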
colorArray: list = [int(i) for i in str(abs(hash(color)))[:desiredLength]]
return np.transpose(np.array(colorArray))
def get_color_index(color: str) -> int:
# 'undedetermined' is kept as spelled here; it presumably matches the stored particle color labels
colors: list = ['transparent', 'green', 'brown', 'non-determinable', 'undedetermined', 'grey',
'red', 'yellow', 'white', 'blue']
assert color in colors, f'color not found: {color}'
return colors.index(color)
# def get_shape_index(shape: str) -> int:
# shapes: list = ['spherule', 'fibre', 'flake', 'irregular']
# assert shape in shapes
# return shapes.index(shape)
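# Illustrative sketch (not part of the original code): building one feature vector from a
# synthetic square contour. Setting .contour and .color directly on a pm.Particle follows
# how the tests in this repository construct particles.
def _demo_characteristic_vector() -> None:
    particle: pm.Particle = pm.Particle()
    particle.contour = np.array([[[0, 0]], [[10, 0]], [[10, 10]], [[0, 10]]], dtype=np.int32)
    particle.color = 'red'
    vector: np.ndarray = get_characteristic_vector(particle)
    print('color index, solidity, aspect ratio, extent, area:', vector)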
class TrainedSubsampling(SubsamplingMethod):
def __init__(self, particleContainer: ParticleContainer, desiredFraction: float,
path: str = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl'):
super(TrainedSubsampling, self).__init__(particleContainer, desiredFraction)
self.score: float = None
self.clf = None
self.clfPath: str = path
self.fraction = desiredFraction
# @property
# def fraction(self) -> float:
# return self.desiredFraction/2
def equals(self, otherMethod) -> bool:
isEqual: bool = False
if type(otherMethod) == TrainedSubsampling and otherMethod.fraction == self.fraction:
if otherMethod.score == self.score and otherMethod.clf is self.clf:
isEqual = True
return isEqual
@property
def label(self) -> str:
return 'Trained Random Sampling'
def get_maximum_achievable_fraction(self) -> float:
return 1.0
def apply_subsampling_method(self) -> list:
self._load_classifier()
features: np.ndarray = get_particle_featurematrix(self.particleContainer)
predictions: np.ndarray = self.clf.predict(features)
indicesToSelect: set = self._get_measure_indices(list(predictions))
selectedParticles: list = []
for particle in self.particleContainer.particles:
if particle.index in indicesToSelect:
selectedParticles.append(particle)
return selectedParticles
# def _make_subparticles_match_fraction(self, subParticles: list) -> list:
# return subParticles
def _load_classifier(self) -> None:
assert os.path.exists(self.clfPath)
fname: str = self.clfPath
with open(fname, "rb") as fp:
self.clf = pickle.load(fp)
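# the score is parsed from the file name below, e.g. "RandomForestClassifier, score 0.72.pkl"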
name: str = fname.split('.pkl')[0]
name: str = name.split('score')[1]
self.score = float(name)
def _get_measure_indices(self, predictedAssignments: list) -> set:
indicesToMeasure: set = set([])
assignments: np.ndarray = np.array(predictedAssignments)
mpIndices: list = list(np.where(assignments == 1)[0])
nonMpIndices: list = list(np.where(assignments == 0)[0])
numEstimMPParticles: int = len(mpIndices)
numParticlesToMeasure = round(len(predictedAssignments) * self.fraction)
if numParticlesToMeasure <= numEstimMPParticles:
indicesToMeasure = set(sample(mpIndices, numParticlesToMeasure))
else:
remainingIndices: int = int(numParticlesToMeasure - numEstimMPParticles)
indicesToMeasure = set(mpIndices + sample(nonMpIndices, remainingIndices))
assert len(indicesToMeasure) == numParticlesToMeasure
return indicesToMeasure
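# Example (illustrative): with predictedAssignments = [1, 1] + [0]*8 and fraction = 0.5,
# numParticlesToMeasure = round(10 * 0.5) = 5; both predicted MP indices {0, 1} are always
# kept and 3 further indices are sampled at random from the predicted non-MP ones.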
def get_theoretic_frac(self) -> float:
"""
The theoretical fraction that also takes the score of the trained model into account.
It is used for extrapolating the mpCount of the subsampled particle list.
:return:
"""
score: float = self.score
diff: float = 1/self.fraction - 1 # i.e., from 50 % score to 100 % score
factor: float = 1 + (1 - score)/0.5 * diff
return 1 / factor
# return self.fraction
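# Worked example (illustrative): fraction = 0.1 and score = 0.72 give diff = 9 and
# factor = 1 + 0.28/0.5 * 9 = 6.04, so the theoretic fraction is 1/6.04 ≈ 0.166.
# A score of 0.5 (random guessing) recovers the plain fraction, while a perfect
# score of 1.0 yields 1.0, i.e. counts are extrapolated as if fully measured.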
# class ChemometricSubsampling(SubsamplingMethod):
# # def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
# # super(ChemometricSubsampling, self).__init__(particleContainer, desiredFraction)
#
# @property
# def label(self) -> str:
# return 'Chemometric Selection'
#
# def apply_subsampling_method(self) -> list:
# vectors: np.ndarray = get_particle_featurematrix(self.particleContainer)
# try:
# princComps: np.ndarray = get_pca(vectors)
# except ValueError:
# print('numParticles:', len(self.particleContainer.particles))
# print('input featurematrix shape', vectors.shape)
# clusterLabels, coreIndices = do_DBSCAN_clustering(princComps)
# indices: list = self._get_indices_from_clusterLabels(princComps, clusterLabels, coreIndices)
#
# selectedParticles: list = []
# for particle in self.particleContainer.particles:
# if particle.index in indices:
# selectedParticles.append(particle)
#
# return selectedParticles
#
# def _get_particle_featurematrix(self) -> np.ndarray:
# """
# :return: np.ndarray, numRows: Particles, numCols: Features
# """
# vectors: list = []
# for particle in self.particleContainer.particles:
# # extractor: FeatureExtractor = FeatureExtractor(particle)
# vectors.append(extractor.get_characteristic_vector())
# vectors: np.array(vectors)
# # assert vectors.shape == (11, len(self.particleContainer.particles)), f'wrong featureMat-shape: {vectors.shape}'
# return vectors
#
# def equals(self, otherMethod) -> bool:
# equals: bool = False
# if type(otherMethod) == ChemometricSubsampling and otherMethod.fraction == self.fraction:
# equals = True
# return equals
#
# def _get_indices_from_clusterLabels(self, points: np.ndarray, labels: np.ndarray, centerIndices: np.ndarray) -> list:
# indices: list = []
# allIndices: np.ndarray = np.arange(len(labels))
# numPointsPerCluster: dict = self._get_numPoints_per_cluster(labels)
#
# for clusterIndex in set(labels):
# indToAppend: list = []
# nPoints: int = int(numPointsPerCluster[clusterIndex])
# indicesInCluster: np.ndarray = allIndices[labels == clusterIndex]
# if clusterIndex == -1:
# for ind in sample(list(indicesInCluster), nPoints):
# # assert ind not in indices
# indices.append(ind)
# else:
# clusterPoints: np.ndarray = points[indicesInCluster]
# centerPoint: np.ndarray = np.mean(clusterPoints, axis=0)
# indicesToSelect: list = get_n_points_closest_to_point(clusterPoints, nPoints, centerPoint)
# for ind in indicesToSelect:
# origInd = indicesInCluster[ind]
# indices.append(origInd)
#
# assert len(set(indices)) == len(indices), f'The calculated indices contain duplicates, ' \
# f'num duplicates: {len(indices) - len(set(indices))}'
# return indices
#
# def _get_numPoints_per_cluster(self, labels: np.ndarray, noiseAmpFactor: float = 5) -> dict:
# """
# MP Particles are expected to be the minority of all particles. So, if datapoints were classified as noise
# (i.e., label = -1), it is likely that MP is in there. The abundancy of points taken from the noise is multiplied
# by the noiseAmpFactor
# :param labels:
# :param noiseAmpFactor:
# :return: A dictionary with keys = cluster index (i.e., label) and value = number of points to take from that
# """
# pointsPerCluster: dict = {}
# if type(labels) != np.ndarray:
# labels = np.array(labels)
# individualLabels: set = set(labels)
# numPointsToSelect = round(len(labels) * self.fraction)
# if numPointsToSelect == 0:
# numPointsToSelect = 1
#
# numNoisePoints = len(labels[labels == -1])
# numClusteredPoints = len(labels) - numNoisePoints
#
# # # get max noiseAmpFactor
# if noiseAmpFactor > 1/self.fraction:
# noiseAmpFactor = 1/self.fraction
#
# numAmpPoints = numClusteredPoints + numNoisePoints*noiseAmpFactor
# fractionPerCluster = np.clip(numPointsToSelect / numAmpPoints, 0.0, 1.0)
#
# tooFewPoints = numPointsToSelect < len(individualLabels)
#
# totalPointsAdded = 0
# for ind in individualLabels:
# if ind > -1:
#
# if not tooFewPoints:
# pointsToAdd = round(fractionPerCluster * len(labels[labels == ind]))
# else:
# pointsToAdd = 1 if totalPointsAdded < numPointsToSelect else 0
#
# pointsPerCluster[ind] = pointsToAdd
# totalPointsAdded += pointsToAdd
#
# # fill up the rest with noisePoints
# if numNoisePoints > 0:
# diff: float = np.clip(numPointsToSelect - totalPointsAdded, 0, numNoisePoints)
# pointsPerCluster[-1] = diff
# totalPointsAdded += diff
#
# # just in case too many points were selected (due to rounding errors), keep on deleting until it matches
# while totalPointsAdded > numPointsToSelect:
# indexWithHighestCount = None
# maxCount = 0
# for index in pointsPerCluster.values():
# if pointsPerCluster[index] > maxCount:
# maxCount = pointsPerCluster[index]
# indexWithHighestCount = index
#
# pointsPerCluster[indexWithHighestCount] -= 1
# totalPointsAdded -= 1
#
# if not abs(totalPointsAdded - numPointsToSelect) <= 1:
# print('error')
# assert abs(totalPointsAdded - numPointsToSelect) <= 1
# for clusterIndex in pointsPerCluster.keys():
# assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
# return pointsPerCluster
\ No newline at end of file
import matplotlib.pyplot as plt
import numpy as np
from random import sample
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
import pickle
import time
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from gepard.analysis.particleContainer import ParticleContainer
from input_output import get_pkls_from_directory
from chemometricMethods import get_log_hu_moments, get_color_index, get_pca, get_characteristic_vector
from evaluation import is_MP_particle
def test_classification_models(dataset: tuple) -> None:
names = ["RandomForestClassifier", "NeuralNetClassifier"]
classifiers = [
RandomForestClassifier(n_estimators=1000),
MLPClassifier(alpha=1, max_iter=1000)]
t0 = time.time()
# preprocess dataset, split into training and test part
X, y = dataset
X = StandardScaler().fit_transform(X)
X_train, X_test, y_train, y_test = \
train_test_split(X, y, test_size=.3, random_state=42)
print(f'preprocessing finished after {round(time.time()-t0, 2)} seconds')
# iterate over classifiers
for name, clf in zip(names, classifiers):
t0 = time.time()
clf.fit(X_train, y_train)
print(f'fitting {name} took {round(time.time()-t0, 2)} seconds')
t0 = time.time()
score = clf.score(X_test, y_test)
with open(f'{name}, score {round(score, 2)}.pkl', "wb") as fp:
pickle.dump(clf, fp, protocol=-1)
y_predicted = clf.predict(X_test)
print(f'finished getting score and prediction after {round(time.time() - t0, 2)} seconds')
errors: dict = {int(k): 0 for k in np.unique(y_test)}
for j in range(len(y_predicted)):
if y_test[j] != y_predicted[j]:
errors[y_test[j]] += 1
print(f'{name} with test size {len(y_test)} has score {round(score, 2)}, errors: {errors}')
if __name__ == '__main__':
recreateNew: bool = False
if recreateNew:
pklsInFolders: dict = get_pkls_from_directory(r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets')
X: list = []
y: list = []
counter = 0
for folder in pklsInFolders.keys():
for pklPath in pklsInFolders[folder]:
if counter < 100:
dset: dataset.DataSet = dataset.loadData(pklPath)
print('loaded', dset.name)
partContainer: ParticleContainer = dset.particleContainer
for particle in partContainer.particles:
features: np.ndarray = get_characteristic_vector(particle)
# features: list = [abs(i) for i in get_log_hu_moments(particle.contour)]
# features.append(get_color_index(particle.color))
X.append(features)
y.append(int(is_MP_particle(particle)))
counter += 1
X: np.ndarray = np.array(X)
y: np.ndarray = np.array(y)
MPindices: np.ndarray = np.where(y == 1)[0]
nonMPindices: np.ndarray = np.where(y == 0)[0]
nonMPindices: list = sample(list(nonMPindices), len(MPindices))
X_MP: list = list(X[MPindices])
y_MP: list = list(y[MPindices])
X_nonMP: list = list(X[nonMPindices])
y_nonMP: list = list(y[nonMPindices])
assert set(y_MP) == {1}
assert set(y_nonMP) == {0}
assert len(X_MP) == len(X_nonMP) == len(y_MP) == len(y_nonMP)
X_equalized: np.ndarray = np.array(X_MP + X_nonMP)
y_equalized: np.ndarray = np.array(y_MP + y_nonMP)
dset: tuple = (X_equalized, y_equalized)
with open('particleClassificaion.pkl', "wb") as fp:
pickle.dump(dset, fp, protocol=-1)
else:
with open('particleClassificaion.pkl', "rb") as fp:
dset: tuple = pickle.load(fp)
X, y = dset
with open(r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl', "rb") as fp:
clf: RandomForestClassifier = pickle.load(fp)
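# note: X is not standardized here, whereas test_classification_models trains on
# StandardScaler-transformed data; predictions on raw features may differ accordingly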
y_predicted = clf.predict(X)
# np.savetxt('Data.txt', X)
# np.savetxt('Assignments.txt', y)
# princComps = get_pca(X.transpose(), numComp=2)
#
# plt.scatter(princComps[:, 0], princComps[:, 1])
# print(X_equalized.shape)
# X: np.ndarray = SelectKBest(chi2, k=5).fit_transform(X, y)
# print(X_equalized.shape)
# test_classification_models((X, y))
@@ -3,33 +3,35 @@ cimport numpy as np
cimport numpy.random
cimport cython
from libc.math cimport sin, cos, round, abs
from libc.stdlib cimport rand, srand, RAND_MAX
DTYPE = np.float64  # np.float is deprecated in newer NumPy versions
ctypedef np.int32_t INT32_t
cdef get_random_topleft(double maxDist, double maxAngle, double radius, double boxSize):
cdef double angle, dist, x, y
cdef np.ndarray[INT32_t, ndim=1] newTopLeft
dist = np.random.rand() * maxDist
angle = np.random.rand() * maxAngle
newTopLeft = np.empty(2, dtype=np.int32)
x = dist*np.cos(angle) + radius - boxSize/2
y = dist*np.sin(angle) + radius - boxSize/2
newTopLeft[0] = np.int32(np.round(x))
newTopLeft[1] = np.int32(np.round(y))
cdef int newTopLeft[2]
dist = rand() / 32767.0 * maxDist
angle = rand() / 32767.0 * maxAngle
x = dist*cos(angle) + radius - boxSize/2
y = dist*sin(angle) + radius - boxSize/2
newTopLeft[0] = int(round(x))
newTopLeft[1] = int(round(y))
return newTopLeft
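# Note: drawing dist uniformly in [0, maxDist] concentrates box positions near the filter
# centre (point density scales with 1/r); drawing maxDist * sqrt(uniform) would yield an
# area-uniform distribution instead.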
def get_random_topLefts(int numBoxes, double boxSize, double radius, double maxAngle, int seed=1337, int maxTries=50):
cdef np.ndarray[INT32_t, ndim=2] topLefts
cdef np.ndarray[INT32_t, ndim=1] newTopLeft
cdef int newTopLeft[2]
cdef double maxDist
cdef int outerCounter, counter, x, y, i, j, diffX, diffY, successfullyAdded
cdef bint validSolutionFound, boxOverlaps
np.random.seed(seed)
srand(seed) # seed the C random number generator with the passed-in seed
assert RAND_MAX == 32767 # this value is hard-coded as a literal in get_random_topleft above for performance reasons
maxDist = radius - np.sqrt((boxSize/2)**2 + (boxSize/2)**2)
outerCounter = 0
validSolutionFound = False
@@ -47,8 +49,8 @@ def get_random_topLefts(int numBoxes, double boxSize, double radius, double maxA
newTopLeft = get_random_topleft(maxDist, maxAngle, radius, boxSize)
boxOverlaps = False
for j in range(i):
diffX = abs(np.float(newTopLeft[0] - np.float(topLefts[j, 0])))
diffY = abs(np.float(newTopLeft[1] - np.float(topLefts[j, 1])))
diffX = abs(newTopLeft[0] - topLefts[j, 0])
diffY = abs(newTopLeft[1] - topLefts[j, 1])
if diffX < boxSize and diffY < boxSize:
boxOverlaps = True
@@ -18,7 +18,7 @@ from gepard.analysis.particleAndMeasurement import Particle
from helpers import ParticleBinSorter
import methods as meth
import geometricMethods as gmeth
import chemometricMethods as cmeth
from chemometrics import chemometricMethods as cmeth
from datasetOperations import ParticleVariations
@@ -26,14 +26,14 @@ def get_name_from_directory(dirPath: str) -> str:
return str(os.path.basename(dirPath).split('.')[0])
def get_methods_to_test(dataset: dataset.DataSet, fractions: list = []) -> list:
def get_methods_to_test(dataset: dataset.DataSet, fractions: list = [], maxTries: int = 100) -> list:
"""
:param fractions: The desired fractions to measure
:param maxTries: maximum number of tries for finding a valid random box layout
:return: list of measurement objects that are applicable
"""
if len(fractions) == 0:
fractions: list = [0.02, 0.05, 0.1, 0.25, 0.5, 0.7, 0.9]
# fractions: list = [0.02, 0.1, 0.5, 0.9]
# fractions: list = [0.02, 0.05, 0.1, 0.25, 0.5, 0.7, 0.9]
fractions: list = [0.1, 0.3, 0.5]
methods: list = []
particleContainer = dataset.particleContainer
@@ -44,8 +44,9 @@ def get_methods_to_test(dataset: dataset.DataSet, fractions: list = []) -> list:
boxCreator: gmeth.BoxSelectionCreator = gmeth.BoxSelectionCreator(dataset)
methods += boxCreator.get_crossBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_spiralBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_randomBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_randomQuarterBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_randomBoxSubsamplers_for_fraction(fraction, maxTries=maxTries)
methods += boxCreator.get_randomQuarterBoxSubsamplers_for_fraction(fraction, maxTries=maxTries)
methods.append(cmeth.TrainedSubsampling(particleContainer, fraction))
# methods.append(cmeth.ChemometricSubsampling(particleContainer, fraction))
return methods
@@ -91,27 +92,34 @@ class TotalResults(object):
return newResult
def update_all(self, force: bool = False) -> None:
def update_all(self, force: bool = False, multiprocessing: bool = True) -> None:
"""
Updates all samples with all methods and all fractions
:param force: Wether to force an update of an already existing method.
:param force: Whether to force an update of an already existing method.
:param multiprocessing: Whether to spawn multiple processes for computation
:return:
"""
forceList: list = [force]*len(self.sampleResults)
indices: list = list(np.arange(len(self.sampleResults)))
numSamples: int = len(forceList)
numWorkers: int = 4  # seems reasonable for a quad-core processor
chunksize: int = int(round(numSamples / numWorkers * 0.7)) # we want to have slightly more chunks than workers
print(f'multiprocessing with {numSamples} samples and chunksize of {chunksize}')
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(update_sample, self.sampleResults, forceList, indices, chunksize=chunksize)
for index, res in enumerate(results):
updatedSample, processid = res
print(f'returned from process {processid}, iteration index {index}')
self.sampleResults[index] = updatedSample
if multiprocessing:
forceList: list = [force]*len(self.sampleResults)
indices: list = list(np.arange(len(self.sampleResults)))
numSamples: int = len(forceList)
numWorkers: int = 4  # seems reasonable for a quad-core processor
chunksize: int = int(round(numSamples / numWorkers * 0.7)) # we want to have slightly more chunks than workers
print(f'multiprocessing with {numSamples} samples and chunksize of {chunksize}')
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(update_sample, self.sampleResults, forceList, indices, chunksize=chunksize)
for index, res in enumerate(results):
updatedSample, processid = res
print(f'returned from process {processid}, iteration index {index}')
self.sampleResults[index] = updatedSample
else:
for index, sampleResult in enumerate(self.sampleResults):
updatedResult, i = update_sample(sampleResult, True, index)
self.sampleResults[index] = updatedResult
print(f'done updating {updatedResult.dataset.name} at index {index}')
def get_error_vs_fraction_data(self, attributes: list = [], methods: list = []) -> dict:
"""
@@ -200,7 +208,12 @@ class SubsamplingResult(object):
:param subParticles:
:return:
"""
error: float = self._get_mp_count_error(origParticles, subParticles, self.method.fraction)
if type(self.method) == cmeth.TrainedSubsampling:
fraction = self.method.get_theoretic_frac()
else:
fraction = self.method.fraction
error: float = self._get_mp_count_error(origParticles, subParticles, fraction)
self.origParticleCount = len(origParticles)
self.mpCountErrors.append(error)
@@ -169,11 +169,12 @@ class BoxSelectionCreator(object):
return spiralBoxSubsamplers
def get_randomBoxSubsamplers_for_fraction(self, desiredFraction: float) -> list:
def get_randomBoxSubsamplers_for_fraction(self, desiredFraction: float, maxTries: int = 100) -> list:
randomBoxSamplers: list = []
diameter, offset = self._get_diameter_and_offset()
randomBoxSampler: RandomBoxSampling = RandomBoxSampling(None, desiredFraction)
randomBoxSampler.maxTries = maxTries
randomBoxSampler.update_max_fractions()
for numBoxes in randomBoxSampler.possibleBoxNumbers:
randomBoxSampler.numBoxes = numBoxes
@@ -186,10 +187,11 @@ class BoxSelectionCreator(object):
return randomBoxSamplers
def get_randomQuarterBoxSubsamplers_for_fraction(self, desiredFraction: float) -> list:
def get_randomQuarterBoxSubsamplers_for_fraction(self, desiredFraction: float, maxTries: int = 100) -> list:
randomBoxSamplers: list = []
diameter, offset = self._get_diameter_and_offset()
randomBoxSampler: RandomQuarterBoxes = RandomQuarterBoxes(None, desiredFraction)
randomBoxSampler.maxTries = maxTries
randomBoxSampler.update_max_fractions()
for numBoxes in randomBoxSampler.possibleBoxNumbers:
@@ -394,57 +396,14 @@ class RandomBoxSampling(BoxSelectionSubsamplingMethod):
return equals
def get_topLeft_of_boxes(self) -> list:
#
# valid, topLefts = randoms.get_random_topLefts(self.numBoxes, self.boxSize,
# self.filterDiameter/2, self.__maxAngle,
# seed=self.randomSeed, maxTries=self.maxTries)
#
# if not valid:
# raise AttributeError
#
# topLefts: list = [[topLefts[i, 0], topLefts[i, 1]] for i in range(topLefts.shape[0])]
#
def get_random_topleft() -> list:
angle = np.random.rand() * self.__maxAngle
dist = np.random.rand() * maxDist
x: float = dist * np.cos(angle) + radius - boxSize / 2
y: float = dist * np.sin(angle) + radius - boxSize / 2
return [x, y]
np.random.seed(self.randomSeed)
topLefts: list = []
boxSize: float = self.boxSize
radius: float = self.filterDiameter / 2
maxDist: float = radius - np.sqrt((boxSize / 2) ** 2 + (boxSize / 2) ** 2)
outerCounter: int = 0
validSolutionFound: bool = False
while not validSolutionFound and outerCounter < self.maxTries:
topLefts = []
for i in range(self.numBoxes):
if i == 0:
topLefts.append(get_random_topleft())
else:
counter: int = 0
while counter < 50:
newTopLeft: list = get_random_topleft()
for topLeft2 in topLefts:
if box_overlaps_other_box(newTopLeft, topLeft2, boxSize):
break
else: # i.e., if no break occurred
topLefts.append(newTopLeft)
break
counter += 1
if len(topLefts) == self.numBoxes:
validSolutionFound = True
else:
outerCounter += 1
if not validSolutionFound:
valid, topLefts = randoms.get_random_topLefts(self.numBoxes, self.boxSize,
self.filterDiameter/2, self.__maxAngle,
seed=self.randomSeed, maxTries=self.maxTries)
if not valid:
raise AttributeError
topLefts: list = [[topLefts[i, 0], topLefts[i, 1]] for i in range(topLefts.shape[0])]
return topLefts
@@ -2,7 +2,7 @@ import os
import pickle
from evaluation import TotalResults
from helpers import timingDecorator
from chemometrics.chemometricMethods import TrainedSubsampling
def load_results(fname: str) -> TotalResults:
res: TotalResults = None
@@ -17,6 +17,10 @@ def save_results(fname: str, result: TotalResults) -> None:
for sampleRes in result.sampleResults:
storedDsets[sampleRes.sampleName] = sampleRes.dataset
sampleRes.dataset = None
for subsamplingRes in sampleRes.results:
subsamplingRes.method.particleContainer = None
if type(subsamplingRes.method) == TrainedSubsampling:
subsamplingRes.method.clf = None
with open(fname, "wb") as fp:
pickle.dump(result, fp, protocol=-1)
@@ -18,6 +18,15 @@ class SubsamplingMethod(object):
self.particleContainer = particleConatainer
self.fraction: float = desiredFraction
# @property
# def fraction(self) -> float:
# """
# The TrainedSubsampling, e.g., changes its fraction depending on the quality of its training.
# All "regular" methods just return the desired Fraction.
# :return:
# """
# return self.desiredFraction
@property
def label(self) -> str:
"""
@@ -14,25 +14,27 @@ SET GEPARD TO EVALUATION BRANCH (WITHOUT THE TILING STUFF), OTHERWISE SOME OF TH
if __name__ == '__main__':
results: TotalResults = TotalResults()
pklsInFolders = get_pkls_from_directory(r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets')
counter = 0
for folder in pklsInFolders.keys():
for samplePath in pklsInFolders[folder]:
newSampleResult: SampleResult = results.add_sample(samplePath)
for attr in get_attributes_from_foldername(folder):
newSampleResult.set_attribute(attr)
if counter < 10:
newSampleResult: SampleResult = results.add_sample(samplePath)
for attr in get_attributes_from_foldername(folder):
newSampleResult.set_attribute(attr)
counter += 1
t0 = time.time()
results.update_all()
results.update_all(multiprocessing=False)
print('updating all took', time.time()-t0, 'seconds')
save_results('results2.res', results)
# results: TotalResults = load_results('results1.res')
save_results('results_test.res', results)
# results: TotalResults = load_results('results2.res')
plot: Figure = get_error_vs_frac_plot(results, attributes=[[]],
methods=[['random', 'trained']], standarddevs=True)
# plot: Figure = get_error_vs_frac_plot(results, attributes=[['air', 'water'], ['sediment', 'soil', 'beach', 'slush']],
# methods=[['Boxes random']]*2)
# methods=[['random layout (7', 'random layout (1']]*2)
# methods=[[]]*2)
# methods=[['Random Subsampling', 'Sizebin']] * 2)
plot: Figure = get_error_vs_frac_plot(results,
attributes=[['air', 'water'], ['sediment', 'soil', 'beach', 'slush']],
methods=[['layout (7', 'layout (10', 'layout (15', 'cross', 'random subsampling', 'sizebin']] * 2)
# methods=[['layout (7', 'layout (10', 'layout (15', 'cross', 'random subsampling', 'sizebin']] * 2)
plot.show()
import numpy as np
import random
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
import gepard
from gepard.dataset import DataSet
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis.particleAndMeasurement import Particle, Measurement
def setMaxDim(dataset: DataSet, imgSize: float, minX: float, maxX: float, minY: float, maxY: float) -> None:
@@ -27,5 +29,46 @@ def get_default_ParticleContainer() -> ParticleContainer:
x = 10*i
contours.append(np.array([[[x, 0]], [[x+10, 0]], [[x+10, 10]], [[x, 10]]], dtype=np.int32))
particleContainer.setParticleContours(contours)
particleContainer.particles[0].color = 'red'
particleContainer.particles[1].color = 'blue'
particleContainer.particles[2].color = 'green'
particleContainer.particles[3].color = 'transparent'
return particleContainer
def get_MP_particles(numParticles) -> list:
mpParticles = []
for _ in range(numParticles):
mpParticles.append(get_MP_particle())
return mpParticles
def get_non_MP_particles(numParticles) -> list:
nonMPParticles = []
for _ in range(numParticles):
nonMPParticles.append(get_non_MP_particle())
return nonMPParticles
def get_MP_particle() -> Particle:
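# note: re-seeding with a fixed value on every call makes the polymer choice below deterministic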
random.seed(15203018)
polymerNames = ['Poly (methyl methacrylate',
'Polyethylene',
'Silicone rubber',
'PB15',
'PY13',
'PR20']
polymName = random.sample(polymerNames, 1)[0]
newParticle: Particle = Particle()
newMeas = Measurement()
newMeas.setAssignment(polymName)
newParticle.addMeasurement(newMeas)
return newParticle
def get_non_MP_particle() -> Particle:
newParticle: Particle = Particle()
newParticle.addMeasurement(Measurement())
return newParticle
@@ -2,6 +2,9 @@ import unittest
import cv2
import numpy as np
import sys
# import os
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
@@ -12,13 +15,12 @@ from gepard.analysis import particleAndMeasurement as pm
from gepard.analysis.particleContainer import ParticleContainer
from gepard import dataset
import chemometricMethods as cmeth
from chemometrics import chemometricMethods as cmeth
from helpers_for_test import get_default_ParticleContainer, get_non_MP_particles, get_MP_particles
from evaluation import SubsamplingResult
class TestFeatureExtractor(unittest.TestCase):
def setUp(self) -> None:
self.extractor: cmeth.FeatureExtractor = cmeth.FeatureExtractor(None)
class TestParticleFeatures(unittest.TestCase):
def test_get_contour_moments(self):
imgs = []
imgA: np.ndarray = np.zeros((200, 200), dtype=np.uint8)
@@ -40,8 +42,7 @@ class TestFeatureExtractor(unittest.TestCase):
contours, hierarchy = cv2.findContours(img, 1, 2)
particle: pm.Particle = pm.Particle()
particle.contour = contours[0]
self.extractor.particle = particle
hu: np.ndarray = self.extractor._get_log_hu_moments()
hu: np.ndarray = cmeth.get_log_hu_moments(particle.contour)
moments[:, i] = hu
# The first six hu moments are supposed to be invariant to scale, rotation and translation
@@ -54,116 +55,195 @@ class TestFeatureExtractor(unittest.TestCase):
for color in ['red', 'green', 'violet', 'blue', 'Blue', 'non-determinable', None]:
for numDigits in [4, 6, 8]:
hashNumber: int = abs(hash(color))
hashArray: np.ndarray = self.extractor._get_color_hash(color, numDigits)
hashArray: np.ndarray = cmeth.get_color_hash(color, numDigits)
self.assertEqual(len(hashArray), numDigits)
for i in range(numDigits):
self.assertEqual(hashArray[i], int(str(hashNumber)[i]))
def test_get_color_index(self):
colors: list = ['transparent', 'green', 'brown', 'non-determinable', 'undedetermined', 'grey',
'red', 'yellow', 'white', 'blue']
for index, color in enumerate(colors):
self.assertEqual(cmeth.get_color_index(color), index)
def test_get_particle_featurematrix(self):
particleContainer: ParticleContainer = get_default_ParticleContainer()
features: np.ndarray = cmeth.get_particle_featurematrix(particleContainer)
self.assertEqual(features.shape[0], len(particleContainer.particles))
class TestChemometricSubsampling(unittest.TestCase):
class TestTrainedSubsampling(unittest.TestCase):
def setUp(self) -> None:
self.particleContainer: ParticleContainer = ParticleContainer(None)
self.numParticles: int = 5
self.particleContainer.initializeParticles(self.numParticles)
img: np.ndarray = np.zeros((20, 20), dtype=np.uint8)
cv2.putText(img, 'A', (2, 2), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=1, color=1, thickness=2)
contours, hierarchy = cv2.findContours(img, 1, 2)
self.particleContainer.setParticleContours([contours[0] for _ in range(self.numParticles)])
self.chemSubs: cmeth.ChemometricSubsampling = cmeth.ChemometricSubsampling(self.particleContainer,
desiredFraction=0.1)
particleContainer: ParticleContainer = get_default_ParticleContainer()
self.trainedSampling: cmeth.TrainedSubsampling = cmeth.TrainedSubsampling(particleContainer, 0.1)
def test_get_particle_featurematrix(self):
features: np.ndarray = self.chemSubs._get_particle_featurematrix()
self.assertEqual(features.shape, (11, self.numParticles))
for i in range(6):
diff: np.ndarray = features[i, :] - np.mean(features[i, :])
self.assertFalse(np.any(diff > 0.1))
def test_label(self):
self.assertEqual(type(self.trainedSampling.label), str)
def test_load_classifier(self):
self.assertTrue(self.trainedSampling.clf is None)
self.assertTrue(self.trainedSampling.score is None)
self.trainedSampling.clfPath = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl'
self.trainedSampling._load_classifier()
self.assertEqual(type(self.trainedSampling.clf), RandomForestClassifier)
self.assertEqual(self.trainedSampling.score, 0.72)
self.trainedSampling.clfPath = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\NeuralNetClassifier, score 0.7.pkl'
self.trainedSampling._load_classifier()
self.assertEqual(type(self.trainedSampling.clf), MLPClassifier)
self.assertEqual(self.trainedSampling.score, 0.7)
def test_get_measure_indices(self):
for mpFrac in [0.001, 0.01, 0.05]:
for numMPParticles in [1, 10, 100]:
numNonMP: int = int(numMPParticles * 1/mpFrac) - numMPParticles
predictedAssignments: list = [1]*numMPParticles + [0]*numNonMP # i.e., mpFrac *100 % mp particles
for frac in [0.01, 0.1, 0.5]:
self.trainedSampling.fraction = frac
indicesToMeasure: set = self.trainedSampling._get_measure_indices(predictedAssignments)
self.assertEqual(type(indicesToMeasure), set) # ensures no duplicates
numParticlesToMeasure = round(len(predictedAssignments) * frac)
self.assertEqual(len(indicesToMeasure), numParticlesToMeasure)
if numParticlesToMeasure <= numMPParticles:
for index in indicesToMeasure:  # all of the measured particles have to be MP particles
self.assertTrue(index < numMPParticles)
else:
for index in range(numMPParticles): # all MP Particles should be measured
self.assertTrue(index in indicesToMeasure)
def test_get_theoretic_fraction(self):
for frac in [0.1, 0.3, 0.5, 0.9, 1.0]:
for score in [0.5, 0.7, 1.0]:
self.trainedSampling.fraction = frac
self.trainedSampling.score = score
score: float = self.trainedSampling.score
diff: float = 1 / self.trainedSampling.fraction - 1 # i.e., from 50 % score to 100 % score
factor: float = 1 + (1 - score) / 0.5 * diff
self.assertEqual(self.trainedSampling.get_theoretic_frac(), 1/factor)
# def test_make_subparticles_match_fraction(self):
# self.trainedSampling.desiredFraction = 0.5
# result: SubsamplingResult = SubsamplingResult(self.trainedSampling)
#
# allParticles: list = get_MP_particles(10) + get_non_MP_particles(990)
# subParticles: list = get_MP_particles(10) + get_non_MP_particles(490) # half of particles but ALL mp particles
# self.trainedSampling.particleContainer.particles = allParticles + subParticles
#
# self.trainedSampling.score = 1.0 # i.e., perfect prediction
# # modSubParticles: list = self.trainedSampling._make_subparticles_match_fraction(subParticles)
# result.add_result(subParticles, allParticles)
# self.assertEqual(result.mpCountError, 0)
#
# self.trainedSampling.score = 0.5 # i.e., completely random, no prediction quality
# # modSubParticles: list = self.trainedSampling._make_subparticles_match_fraction(subParticles)
# result.add_result(subParticles, allParticles)
# self.assertEqual(result.mpCountError, 100)
# class TestChemometricSubsampling(unittest.TestCase):
# def setUp(self) -> None:
# self.particleContainer: ParticleContainer = ParticleContainer(None)
# self.numParticles: int = 5
# self.particleContainer.initializeParticles(self.numParticles)
# img: np.ndarray = np.zeros((20, 20), dtype=np.uint8)
# cv2.putText(img, 'A', (2, 2), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
# fontScale=1, color=1, thickness=2)
# contours, hierarchy = cv2.findContours(img, 1, 2)
# self.particleContainer.setParticleContours([contours[0] for _ in range(self.numParticles)])
# self.chemSubs: cmeth.ChemometricSubsampling = cmeth.ChemometricSubsampling(self.particleContainer,
# desiredFraction=0.1)
#
# def test_get_numPoints_per_cluster(self):
# def get_orig_points_per_cluster(index):
# return (index+1)*50
#
# # numPointsPerCluster: int = 50
# for frac in [0.01, 0.1, 0.5, 0.9]:
# self.chemSubs.fraction = frac
# for numClusters in [1, 5, 10]:
# for numNoisePoints in [0, 10, 15]:
# labels: list = []
# for clusterIndex in range(numClusters):
# # for _ in range(numPointsPerCluster):
# for _ in range(get_orig_points_per_cluster(clusterIndex)):
# labels.append(clusterIndex)
# for _ in range(numNoisePoints):
# labels.append(-1)
#
# labels: np.ndarray = np.array(labels)
# numTotal: int = len(labels)
# origFrac: float = self.chemSubs.fraction
#
# noiseAmpFactor = np.clip(5, 0, 1/frac)
# pointsPerCluster: dict = self.chemSubs._get_numPoints_per_cluster(labels,
# noiseAmpFactor=noiseAmpFactor)
# numPointsToMeasure = round(numTotal*origFrac)
# if numPointsToMeasure == 0:
# numPointsToMeasure = 1
#
# self.assertTrue(abs(sum(list(pointsPerCluster.values())) - numPointsToMeasure) <= 1)
#
# if numNoisePoints == 0:
# fractionPerCluster: float = frac
# else:
# # fractionPerCluster: float = numPointsToMeasure / (numClusters*numPointsPerCluster +
# # numNoisePoints*noiseAmpFactor)
# fractionPerCluster: float = numPointsToMeasure / (len(labels) - numNoisePoints +
# numNoisePoints * noiseAmpFactor)
#
# tooFewPoints = numPointsToMeasure < (numClusters + (1 if numNoisePoints > 0 else 0))
#
# pointsFound: int = 0
# for clusterIndex in pointsPerCluster.keys():
# if clusterIndex > -1:
# if not tooFewPoints:
# pointsExpected = round(fractionPerCluster * get_orig_points_per_cluster(clusterIndex))
# if pointsExpected == 0:
# pointsExpected = 1
#
# diff = abs(pointsPerCluster[clusterIndex] - pointsExpected)
# self.assertTrue(diff <= 1)
# else:
# if pointsFound < numPointsToMeasure:
# self.assertEqual(pointsPerCluster[clusterIndex], 1)
# else:
# self.assertEqual(pointsPerCluster[clusterIndex], 0)
#
# pointsFound += pointsPerCluster[clusterIndex]
#
# if numNoisePoints > 0:
# self.assertTrue(abs(pointsPerCluster[-1] - (numPointsToMeasure - pointsFound)) <= 1)
#
# def test_get_n_points_closest_to_center(self):
# points: np.ndarray = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
# refPoint: np.ndarray = np.array([0, 0])
# closestPoints: list = cmeth.get_n_points_closest_to_point(points, 3, refPoint)
# self.assertEqual(len(closestPoints), 3)
# self.assertTrue(0 in closestPoints)
# self.assertTrue(1 in closestPoints)
# self.assertTrue(2 in closestPoints)
#
# refPoint = np.array([2, 2])
# closestPoints = cmeth.get_n_points_closest_to_point(points, 3, refPoint)
# self.assertEqual(len(closestPoints), 3)
# self.assertTrue(1 in closestPoints)
# self.assertTrue(2 in closestPoints)
# self.assertTrue(3 in closestPoints)
#
# refPoint = np.array([2, 0.5])
# closestPoints = cmeth.get_n_points_closest_to_point(points, 2, refPoint)
# self.assertEqual(len(closestPoints), 2)
# self.assertTrue(2 in closestPoints)
# self.assertTrue(3 in closestPoints)
# def test_clustering(self):
# fname = r'C:\Users\xbrjos\Desktop\temp MP\190326_MCII_WWTP_SB_50_1\190326_MCII_WWTP_SB_50_1.pkl'
@@ -7,18 +7,16 @@ Created on Wed Jan 22 13:58:25 2020
"""
import unittest
import random
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
import gepard
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis.particleAndMeasurement import Particle, Measurement
from evaluation import TotalResults, SampleResult, SubsamplingResult, get_methods_to_test
import methods as meth
import geometricMethods as gmeth
from helpers_for_test import get_default_ParticleContainer, get_default_DataSet
from chemometrics.chemometricMethods import TrainedSubsampling
from helpers_for_test import get_default_ParticleContainer, get_default_DataSet, get_MP_particles, get_non_MP_particles, get_MP_particle, get_non_MP_particle
class TestTotalResults(unittest.TestCase):
@@ -254,13 +252,13 @@ class TestSampleResult(unittest.TestCase):