...
 
Commits (14)
......@@ -6,3 +6,11 @@ __pycache__/
*.png
*.res
cythonModules/build/
*.c
*.pyd
*.html
import numpy as np
import cv2
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from scipy import spatial
from itertools import combinations
from random import sample
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from methods import SubsamplingMethod
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
    """
    Standardizes the given data and projects it onto its first principal components.
    :param data: feature matrix; it is transposed before the PCA step, i.e. rows are
        treated as features and columns as observations (matching the output of
        ChemometricSubsampling._get_particle_featurematrix)
    :param numComp: number of principal components to keep
    :return: array of principal-component scores
    """
    try:
        scaled: np.ndarray = StandardScaler().fit_transform(data.copy())
    except ValueError:
        # Emit diagnostics for degenerate inputs, then propagate the error.
        print('first standardscaler attempt failed, retrying..')
        print('datashape', data.shape)
        print('unique:', np.unique(data))
        raise
    projector = PCA(n_components=numComp)
    return projector.fit_transform(scaled.T)
def do_DBSCAN_clustering(data: np.ndarray, eps: float = 0.1, min_samples: int = 10) -> tuple:
    """
    Does DBSCAN clustering and finds noisy data
    :param data: The input array
    :param eps: neighbourhood radius passed to DBSCAN (applied to the standardized data)
    :param min_samples: minimum number of neighbours for a point to be a core point
    :return: Cluster labels for each point in the dataset given to fit(). Noisy samples are given the label -1.
    """
    assert data.shape[1] == 2
    scaledPoints: np.ndarray = StandardScaler().fit_transform(data)
    clustering = DBSCAN(eps=eps, min_samples=min_samples)
    clustering.fit(scaledPoints)
    return clustering.labels_, clustering.core_sample_indices_
def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarray) -> list:
    """
    Returns a list with indices of n points that are closest to the indicated refPoint
    :param points: np.ndarray, cols: x, y, rows: individual points
    :param n: number of points to return
    :param refPoint: np.array([x, y]) of reference point
    :return: list of point indices
    """
    deltas: np.ndarray = points - refPoint
    # Squared distances sort identically to the Euclidean norms (monotonic map),
    # so the square root can be skipped.
    squaredDistances: np.ndarray = np.sum(deltas * deltas, axis=1)
    rankedIndices: np.ndarray = np.argsort(squaredDistances)
    return list(rankedIndices[:n])
class ChemometricSubsampling(SubsamplingMethod):
    """
    Subsampling method that projects particle feature vectors into PCA space,
    clusters them with DBSCAN and then selects representative particles from
    each cluster (and, preferentially, from the noise points, where MP
    particles are expected).
    """
    def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
        super(ChemometricSubsampling, self).__init__(particleContainer, desiredFraction)

    @property
    def label(self) -> str:
        return 'Chemometric Selection'

    def apply_subsampling_method(self) -> list:
        """
        Runs the full chemometric pipeline (features -> PCA -> DBSCAN -> selection).
        :return: list of the selected particle objects
        """
        vectors: np.ndarray = self._get_particle_featurematrix()
        try:
            princComps: np.ndarray = get_pca(vectors)
        except ValueError:
            print('numParticles:', len(self.particleContainer.particles))
            print('input featurematrix shape', vectors.shape)
            # Bugfix: previously execution fell through after printing and
            # crashed with a NameError on princComps; re-raise the original
            # error instead so the caller sees the real failure.
            raise
        clusterLabels, coreIndices = do_DBSCAN_clustering(princComps)
        indices: list = self._get_indices_from_clusterLabels(princComps, clusterLabels, coreIndices)
        selectedParticles: list = []
        for particle in self.particleContainer.particles:
            if particle.index in indices:
                selectedParticles.append(particle)
        return selectedParticles

    def _get_particle_featurematrix(self) -> np.ndarray:
        """
        :return: np.ndarray, numRows: Features, numCols: Particles
        """
        vectors: list = []
        for particle in self.particleContainer.particles:
            extractor: FeatureExtractor = FeatureExtractor(particle)
            vectors.append(extractor.get_characteristic_vector())
        vectors: np.ndarray = np.transpose(np.array(vectors))
        assert vectors.shape == (11, len(self.particleContainer.particles)), f'wrong featureMat-shape: {vectors.shape}'
        return vectors

    def equals(self, otherMethod) -> bool:
        """Two instances are equal if they share the type and the measured fraction."""
        equals: bool = False
        if type(otherMethod) == ChemometricSubsampling and otherMethod.fraction == self.fraction:
            equals = True
        return equals

    def _get_indices_from_clusterLabels(self, points: np.ndarray, labels: np.ndarray, centerIndices: np.ndarray) -> list:
        """
        Selects particle indices per cluster: random draws from the noise
        cluster (-1), otherwise the points closest to the cluster center.
        :param points: the PCA-space coordinates, one row per particle
        :param labels: cluster label per particle (-1 = noise)
        :param centerIndices: DBSCAN core-sample indices (currently unused here)
        :return: list of selected particle indices
        """
        indices: list = []
        allIndices: np.ndarray = np.arange(len(labels))
        numPointsPerCluster: dict = self._get_numPoints_per_cluster(labels)
        for clusterIndex in set(labels):
            nPoints: int = int(numPointsPerCluster[clusterIndex])
            indicesInCluster: np.ndarray = allIndices[labels == clusterIndex]
            if clusterIndex == -1:
                # Noise points: no meaningful center, draw randomly.
                for ind in sample(list(indicesInCluster), nPoints):
                    indices.append(ind)
            else:
                clusterPoints: np.ndarray = points[indicesInCluster]
                centerPoint: np.ndarray = np.mean(clusterPoints, axis=0)
                indicesToSelect: list = get_n_points_closest_to_point(clusterPoints, nPoints, centerPoint)
                for ind in indicesToSelect:
                    # Map the index within the cluster back to the global index.
                    origInd = indicesInCluster[ind]
                    indices.append(origInd)
        assert len(set(indices)) == len(indices), f'The calculated indices contain duplicates, ' \
                                                  f'num duplicates: {len(indices) - len(set(indices))}'
        return indices

    def _get_numPoints_per_cluster(self, labels: np.ndarray, noiseAmpFactor: float = 5) -> dict:
        """
        MP Particles are expected to be the minority of all particles. So, if datapoints were classified as noise
        (i.e., label = -1), it is likely that MP is in there. The abundancy of points taken from the noise is multiplied
        by the noiseAmpFactor
        :param labels: cluster label per particle (-1 = noise)
        :param noiseAmpFactor: oversampling factor for the noise cluster
        :return: A dictionary with keys = cluster index (i.e., label) and value = number of points to take from that
        """
        pointsPerCluster: dict = {}
        if type(labels) != np.ndarray:
            labels = np.array(labels)

        individualLabels: set = set(labels)
        numPointsToSelect = round(len(labels) * self.fraction)
        if numPointsToSelect == 0:
            numPointsToSelect = 1

        numNoisePoints = len(labels[labels == -1])
        numClusteredPoints = len(labels) - numNoisePoints

        # Cap the amplification so the noise cluster cannot demand more points
        # than the overall fraction allows.
        if noiseAmpFactor > 1/self.fraction:
            noiseAmpFactor = 1/self.fraction
        numAmpPoints = numClusteredPoints + numNoisePoints*noiseAmpFactor
        fractionPerCluster = np.clip(numPointsToSelect / numAmpPoints, 0.0, 1.0)

        tooFewPoints = numPointsToSelect < len(individualLabels)

        totalPointsAdded = 0
        for ind in individualLabels:
            if ind > -1:
                if not tooFewPoints:
                    pointsToAdd = round(fractionPerCluster * len(labels[labels == ind]))
                else:
                    # Fewer requested points than clusters: one point per
                    # cluster until the budget is exhausted.
                    pointsToAdd = 1 if totalPointsAdded < numPointsToSelect else 0
                pointsPerCluster[ind] = pointsToAdd
                totalPointsAdded += pointsToAdd

        # fill up the rest with noisePoints
        if numNoisePoints > 0:
            diff: int = int(np.clip(numPointsToSelect - totalPointsAdded, 0, numNoisePoints))
            pointsPerCluster[-1] = diff
            totalPointsAdded += diff

        # just in case too many points were selected (due to rounding errors), keep on deleting until it matches
        while totalPointsAdded > numPointsToSelect:
            indexWithHighestCount = None
            maxCount = 0
            # Bugfix: the original looped over .values() and then used each
            # value as a dict key, which removed points from the wrong cluster
            # or raised a KeyError. Iterate over the items instead.
            for clusterIndex, count in pointsPerCluster.items():
                if count > maxCount:
                    maxCount = count
                    indexWithHighestCount = clusterIndex
            pointsPerCluster[indexWithHighestCount] -= 1
            totalPointsAdded -= 1

        if not abs(totalPointsAdded - numPointsToSelect) <= 1:
            print('error')

        for clusterIndex in pointsPerCluster.keys():
            assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
        return pointsPerCluster
class FeatureExtractor(object):
    """
    Computes an 11-element characteristic vector for a particle:
    7 log-scaled Hu moments of its contour plus 4 digits derived from a
    stable hash of its color name.
    """
    def __init__(self, particle: pm.Particle):
        super(FeatureExtractor, self).__init__()
        self.particle: pm.Particle = particle

    def get_characteristic_vector(self) -> np.ndarray:
        """
        :return: np.ndarray of length 11 (7 Hu moments + 4 color-hash digits)
        """
        log_hu: np.ndarray = self._get_log_hu_moments()
        color: np.ndarray = self._get_color_hash(self.particle.color, desiredLength=4)
        vector: np.ndarray = np.hstack((log_hu, color))
        assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
        return vector

    def _get_log_hu_moments(self) -> np.ndarray:
        """
        :return: the 7 Hu moments of the particle contour, log10-scaled with
            the original sign preserved; exactly-zero moments stay zero.
        """
        moments: dict = cv2.moments(self.particle.contour)
        resultMoments: np.ndarray = np.zeros((7, 1))
        for index, mom in enumerate(cv2.HuMoments(moments)):
            if mom != 0:
                resultMoments[index] = -1 * np.copysign(1.0, mom) * np.log10(abs(mom))
            else:
                resultMoments[index] = 0
        return resultMoments[:, 0]

    def _get_color_hash(self, color: str, desiredLength: int = 4) -> np.ndarray:
        """
        Maps a color name to a reproducible vector of single digits.
        Bugfix: the builtin hash() is salted per interpreter run
        (PYTHONHASHSEED), so the extracted features were not reproducible
        across program runs; an md5 digest is stable. Note that the digit
        values differ from those of the old hash()-based implementation.
        :param color: color name (may also be None)
        :param desiredLength: number of digits to return
        :return: np.ndarray of desiredLength digits
        """
        import hashlib
        digest: str = hashlib.md5(str(color).encode('utf-8')).hexdigest()
        stableHash: int = int(digest, 16)  # 128-bit value -> always enough digits
        colorArray: list = [int(i) for i in str(stableHash)[:desiredLength]]
        return np.transpose(np.array(colorArray))
import numpy as np
cimport numpy as np
cimport cython

# Bugfix: np.float (a deprecated alias for the builtin float) was removed in
# NumPy 1.20; use the concrete 64-bit float type instead.
DTYPE = np.float64
ctypedef np.int32_t INT32_t

def rotate_contour_around_point(np.ndarray[INT32_t, ndim=3] contour, np.ndarray[INT32_t, ndim=1] refPoint, double angleDegree):
    """
    Rotates a point around another one... All coordinates in pixel space (integers)
    :param contour: Array of points to be rotated, [:, 0, 0] = x, [:, 0, 1] = y
    :param refPoint: The reference point around which the first point is rotated, tuple of x and y
    :param angleDegree: The angle in degree to rotate (counter-clockwise)
    :return: Array of the rotated point, [:, 0, 0] = x, [:, 0, 1] = y
    """
    cdef int i
    cdef double theta, sin, cos, x, y
    cdef np.ndarray[INT32_t, ndim=3] newContour
    theta = np.deg2rad(angleDegree)
    sin = np.sin(theta)
    cos = np.cos(theta)
    newContour = np.zeros_like(contour)
    for i in range(contour.shape[0]):
        # Standard 2D rotation about refPoint, rounded back to pixel coords.
        x = cos * (contour[i, 0, 0]-refPoint[0]) - sin * (contour[i, 0, 1]-refPoint[1]) + refPoint[0]
        y = sin * (contour[i, 0, 0]-refPoint[0]) + cos * (contour[i, 0, 1]-refPoint[1]) + refPoint[1]
        newContour[i, 0, 0] = round(x)
        newContour[i, 0, 1] = round(y)
    return newContour
# Build script for the rotateContour Cython extension.
from setuptools import setup
from setuptools import Extension
from Cython.Build import cythonize
import numpy as np
import sys

# Default to an in-place build when invoked without command-line arguments,
# so a plain `python setup.py` produces the .pyd/.so next to the sources.
if len(sys.argv) == 1:
    sys.argv.append("build_ext")
    sys.argv.append("--inplace")

ext = Extension("rotateContour", ["rotateContour.pyx"], extra_compile_args=['-O3'],)

setup(
    name="rotate contour around reference point",
    ext_modules=cythonize([ext], annotate=True),  # accepts a glob pattern
    include_dirs=[np.get_include()]  # numpy headers needed by the cimport in the .pyx
)
\ No newline at end of file
import copy
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from cythonModules import rotateContour
class ParticleVariations(object):
    """
    Generates rotated variations of a particle container so that subsampling
    methods can be evaluated on multiple orientations of the same sample.
    """
    def __init__(self, particleContainer: ParticleContainer, numVariations: int = 10) -> None:
        """
        :param particleContainer: the container whose particles are rotated
        :param numVariations: how many variations to generate (first one is unrotated)
        """
        super(ParticleVariations, self).__init__()
        self.origParticleContainer = particleContainer
        self.numVariations = numVariations

    def get_particleContainer_variations(self) -> ParticleContainer:
        """
        Generator yielding numVariations particle containers: first the
        unrotated original, then deep copies rotated by equally spaced angles.
        """
        if self.numVariations > 0:
            partContainer: ParticleContainer = self.origParticleContainer
            contours: list = partContainer.getParticleContours()
            # NOTE(review): contours[:][0][0] is simply contours[0][0], i.e. the
            # first point of the FIRST contour only -- the rotation center is
            # presumably meant to be the mean over all contour points. Kept
            # as-is to preserve behavior; confirm intent before changing.
            center: tuple = round(np.mean(contours[:][0][0])),\
                            round(np.mean(contours[:][0][1]))
            center: np.ndarray = np.array(center, dtype=np.int32)
            angles = self._get_angles()
            for i in range(self.numVariations):
                if i > 0:
                    partContainer = copy.deepcopy(self.origParticleContainer)
                    for particle in partContainer.particles:
                        contour = np.int32(particle.contour)
                        # Bugfix: np.float was removed in NumPy 1.20; the
                        # builtin float performs the identical conversion.
                        particle.contour = rotateContour.rotate_contour_around_point(contour,
                                                                                    center, float(angles[i]))
                yield partContainer

    def _get_angles(self) -> np.ndarray:
        """Equally spaced rotation angles (degrees) covering the full circle."""
        angleIncrement: float = 360 / self.numVariations
        return np.arange(self.numVariations) * angleIncrement
......@@ -6,16 +6,18 @@ Created on Wed Jan 22 13:57:28 2020
@author: luna
"""
import pickle
import sys
import os
import numpy as np
import matplotlib.pyplot as plt
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from helpers import ParticleBinSorter
import methods as meth
import geometricMethods as gmeth
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
import chemometricMethods as cmeth
from datasetOperations import ParticleVariations
def get_name_from_directory(dirPath: str) -> str:
......@@ -23,9 +25,11 @@ def get_name_from_directory(dirPath: str) -> str:
class TotalResults(object):
methods: list = [meth.RandomSampling, meth.SizeBinFractioning, gmeth.CrossBoxSubSampling,
gmeth.SpiralBoxSubsampling]
measuredFreactions: list = [0.05, 0.1, 0.15, 0.2, 0.3, 0.5, 0.9]
# methods: list = [meth.RandomSampling, meth.SizeBinFractioning, gmeth.CrossBoxSubSampling,
# gmeth.SpiralBoxSubsampling, cmeth.ChemometricSubsampling]
# measuredFractions: list = [0.01, 0.05, 0.1, 0.15, 0.2, 0.5, 0.75, 0.9]
# measuredFractions: list = [0.1, 0.15, 0.2, 0.5, 0.75, 0.9]
measuredFractions: list = [0.1, 0.3, 0.5, 0.9]
def __init__(self):
super(TotalResults, self).__init__()
......@@ -47,18 +51,20 @@ class TotalResults(object):
return newResult
def update_all(self) -> None:
def update_all(self, force: bool = False) -> None:
"""
Updates all samples with all methods and all fractions
:param force: Wether to force an update of an already existing method.
:return:
"""
for index, sample in enumerate(self.sampleResults):
sample.load_dataset()
for fraction in self.measuredFreactions:
possibleMethods = self._get_methods_for_fraction(sample.dataset, fraction)
for curMethod in possibleMethods:
# print(f'updating {sample.sampleName} with {curMethod.label} at fraction {fraction}')
sample.update_result_with_method(curMethod)
possibleMethods: list = []
for fraction in self.measuredFractions:
for method in self._get_methods_for_fraction(sample.dataset, fraction):
possibleMethods.append(method)
sample.update_result_with_methods(possibleMethods, force=force)
print(f'processed {index+1} of {len(self.sampleResults)} samples')
def get_error_vs_fraction_data(self, attributes: list = [], methods: list = []) -> dict:
......@@ -102,23 +108,99 @@ class TotalResults(object):
particleContainer = dataset.particleContainer
methods: list = [meth.RandomSampling(particleContainer, fraction),
meth.SizeBinFractioning(particleContainer, fraction)]
boxCreator: gmeth.BoxSelectionCreator = gmeth.BoxSelectionCreator(dataset)
methods += boxCreator.get_crossBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_spiralBoxSubsamplers_for_fraction(fraction)
methods.append(cmeth.ChemometricSubsampling(particleContainer, fraction))
return methods
class SubsamplingResult(object):
    """
    Stores all interesting results from a subsampling experiment
    """
    # TODO: UPDATE PATTERNS -> ARE THESE REASONABLE???
    # Substrings that mark a particle assignment as microplastic.
    mpPatterns = ['poly', 'rubber', 'pb', 'pr', 'pg', 'py', 'pv']

    def __init__(self, subsamplingMethod: meth.SubsamplingMethod):
        super(SubsamplingResult, self).__init__()
        self.method: meth.SubsamplingMethod = subsamplingMethod
        # One entry per evaluated sample variation; see add_result().
        self.mpCountErrors: list = []

    @property
    def mpCountError(self) -> float:
        """Mean MP count error (%) over all recorded runs, 0.0 if none yet."""
        error: float = 0.0
        if len(self.mpCountErrors) > 0:
            error = float(np.mean(self.mpCountErrors))
        return error

    def reset_results(self) -> None:
        """
        Deletes all results
        :return:
        """
        self.mpCountErrors = []

    def add_result(self, origParticles: list, subParticles: list) -> None:
        """
        Takes the particle lists from a subsampling method and appends the calculated results.
        :param origParticles: all particles of the full sample
        :param subParticles: the particles selected by the subsampling method
        :return:
        """
        self.mpCountErrors.append(self._get_mp_count_error(origParticles, subParticles, self.method.fraction))

    def _get_mp_count_error_per_bin(self, allParticles: list, subParticles: list, fractionMeasured: float) -> tuple:
        """Computes the MP count error separately for each particle size bin."""
        binSorter = ParticleBinSorter()
        allParticlesInBins = binSorter.sort_particles_into_bins(allParticles)
        subParticlesInBins = binSorter.sort_particles_into_bins(subParticles)
        mpCountErrorsPerBin = []
        for allParticleBin, subParticleBin in zip(allParticlesInBins, subParticlesInBins):
            mpCountErrorsPerBin.append(self._get_mp_count_error(allParticleBin, subParticleBin, fractionMeasured))
        return binSorter.bins, mpCountErrorsPerBin

    def _get_mp_count_error(self, allParticles: list, subParticles: list, fractionMeasured: float) -> float:
        """
        Relative error (%) of the MP count extrapolated from the subsample
        against the true MP count of the full particle list.
        """
        numMPOrig = self._get_number_of_MP_particles(allParticles)
        numMPEstimate = self._get_number_of_MP_particles(subParticles) / fractionMeasured
        if numMPOrig != 0:
            mpCountError = self._get_error_from_values(numMPOrig, numMPEstimate)
        elif numMPEstimate == 0:
            mpCountError = 0
        else:
            # Bugfix: raise a specific, informative error instead of a bare
            # Exception (still caught by any existing `except Exception`).
            raise ValueError('Subsample contains MP particles, but the full sample does not. This cannot be!')
        return mpCountError

    def _get_error_from_values(self, exact: float, estimate: float) -> float:
        """Relative deviation of estimate from exact, in percent."""
        assert (exact != 0)
        return abs(exact - estimate) / exact * 100

    def _get_number_of_MP_particles(self, particleList: list) -> int:
        """Counts particles whose assignment contains any of the MP patterns."""
        numMPParticles = 0
        for particle in particleList:
            assignment = particle.getParticleAssignment()
            for pattern in self.mpPatterns:
                if assignment.lower().find(pattern) != -1:
                    numMPParticles += 1
                    break
        return numMPParticles
class SampleResult(object):
"""
An object the actually stores all generated results per sample and can update and report on them.
"""
def __init__(self, filepath: str):
def __init__(self, filepath: str, numVariations: int = 10):
super(SampleResult, self).__init__()
self.filepath: str = filepath
self.dataset: dataset.DataSet = None
self.results: list = []
self.attributes: list = []
self.numVariations: int = numVariations # how often the sample is altered for each method
@property
def sampleName(self) -> str:
......@@ -128,25 +210,47 @@ class SampleResult(object):
self.dataset = dataset.loadData(self.filepath)
assert self.dataset is not None
def update_result_with_method(self, method: meth.SubsamplingMethod, force: bool = False) -> None:
def update_result_with_methods(self, methods: list, force: bool = False) -> list:
"""
Updates result with the given method (contains desiredFraction already)
:param method: The SubsamplingMethod Object
:param force: Wether to force an update. If False, the result is not updated, if it is already present.
:return:
:return: list of updated methods
"""
if not self._result_is_already_present(method) or force:
if force:
self._remove_result_of_method(method)
if self.dataset is None:
self.load_dataset()
method.particleContainer = self.dataset.particleContainer
newResult: SubsamplingResult = SubsamplingResult(method)
self.results.append(newResult)
newResult.update()
if self.dataset is None and len(methods) > 0:
self.load_dataset()
updatedMethods: list = []
particleVariations: ParticleVariations = ParticleVariations(self.dataset.particleContainer,
numVariations=self.numVariations)
needsToBeUpdated: dict = {method: False for method in methods}
for index, particleContainer in enumerate(particleVariations.get_particleContainer_variations()):
for method in methods:
result: SubsamplingResult = self._get_result_of_method(method)
method: meth.SubsamplingMethod = method
method.particleContainer = particleContainer
if index == 0:
if result is None:
result = SubsamplingResult(method)
self.results.append(result)
result.reset_results()
needsToBeUpdated[method] = True
elif force:
result.reset_results()
needsToBeUpdated[method] = True
if needsToBeUpdated[method]:
subParticles = method.apply_subsampling_method()
result.add_result(method.particleContainer.particles, subParticles)
if method not in updatedMethods:
updatedMethods.append(method)
print(f'updated {self.sampleName} with {method.label} at fraction {method.fraction}, '
f'iteration {index+1}')
return updatedMethods
def set_attribute(self, newAttribute: str) -> None:
"""
......@@ -156,7 +260,6 @@ class SampleResult(object):
"""
if not self.has_attribute(newAttribute):
self.attributes.append(newAttribute)
print(f'sample {self.filepath} has now attribute {newAttribute}')
def has_any_attribute(self, listOfAttributes: list) -> bool:
hasAttr: bool = False
......@@ -180,85 +283,21 @@ class SampleResult(object):
if method.equals(result.method):
self.results.remove(result)
def _result_is_already_present(self, method: meth.SubsamplingMethod) -> bool:
def _get_result_of_method(self, method: meth.SubsamplingMethod) -> SubsamplingResult:
"""
Checks, if a result with the given method (method type AND measured fraction) is already present.
:param method: The method object, specifying the subsampling method and the measured fraction
:return:
"""
isPresent: bool = False
requestedResult: SubsamplingResult = None
for result in self.results:
if method.equals(result.method):
isPresent = True
requestedResult = result
break
return isPresent
return requestedResult
# def _get_result_of_method(self, method: meth.SubsamplingMethod) -> SubsamplingResult:
# return None
class SubsamplingResult(object):
    """
    Stores all interesting results from a subsampling experiment
    """
    # NOTE(review): this appears to be the pre-refactor version of
    # SubsamplingResult (a newer variant exists earlier in this file);
    # code kept unchanged, comments only.
    def __init__(self, subsamplingMethod: meth.SubsamplingMethod):
        super(SubsamplingResult, self).__init__()
        self.method: meth.SubsamplingMethod = subsamplingMethod
        self.fraction = self.method.fraction
        # The following result fields are populated by update().
        self.origParticleCount: int = None
        self.subSampledParticleCount: int = None
        self.mpCountError: float = None
        self.mpCountErrorPerBin: tuple = None

    def update(self) -> None:
        """
        Updates all results from the method.
        :return:
        """
        assert self.method.particleContainer is not None
        origParticles: list = self.method.particleContainer.particles
        self.origParticleCount = len(origParticles)
        subParticles: list = self.method.apply_subsampling_method()
        self.subSampledParticleCount = len(subParticles)
        fraction: float = self.method.fraction
        self.mpCountError = self._get_mp_count_error(origParticles, subParticles, fraction)
        # print(f'{self.origParticleCount} particles, thereof {self.subSampledParticleCount} measured, error: {self.mpCountError}')
        self.mpCountErrorPerBin = self._get_mp_count_error_per_bin(origParticles, subParticles, fraction)
        # print(f'method {self.method.label} updated, result is {self.mpCountError}')

    def _get_mp_count_error_per_bin(self, allParticles: list, subParticles: list, fractionMeasured: float) -> tuple:
        # Computes the MP count error separately per particle size bin.
        binSorter = ParticleBinSorter()
        allParticlesInBins = binSorter.sort_particles_into_bins(allParticles)
        subParticlesInBins = binSorter.sort_particles_into_bins(subParticles)
        mpCountErrorsPerBin = []
        for allParticleBin, subParticleBin in zip(allParticlesInBins, subParticlesInBins):
            mpCountErrorsPerBin.append(self._get_mp_count_error(allParticleBin, subParticleBin, fractionMeasured))
        return binSorter.bins, mpCountErrorsPerBin

    def _get_mp_count_error(self, allParticles: list, subParticles: list, fractionMeasured: float) -> float:
        # Relative error of the extrapolated MP count (as a fraction here,
        # unlike the newer class version which reports percent).
        numMPOrig = self._get_number_of_MP_particles(allParticles)
        numMPEstimate = self._get_number_of_MP_particles(subParticles) / fractionMeasured
        if numMPOrig != 0:
            mpCountError = self._get_error_from_values(numMPOrig, numMPEstimate)
        elif numMPEstimate == 0:
            mpCountError = 0
        else:
            raise Exception  # >0 particles in subsample, whereas none in entire sample. This cannot be!
        return mpCountError

    def _get_error_from_values(self, exact: float, estimate: float) -> float:
        # Relative deviation of estimate from exact (a fraction, not percent).
        assert(exact != 0)
        return abs(exact - estimate) / exact

    def _get_number_of_MP_particles(self, particleList: list) -> int:
        # Counts particles whose assignment contains any MP pattern substring.
        mpPatterns = ['poly', 'rubber', 'pb', 'pr', 'pg', 'py', 'pv']
        numMPParticles = 0
        for particle in particleList:
            assignment = particle.getParticleAssignment()
            for pattern in mpPatterns:
                if assignment.lower().find(pattern) != -1:
                    numMPParticles += 1
                    break
        return numMPParticles
from PyQt5 import QtGui, QtWidgets, QtCore
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
import gepard
from gepard import dataset
import helpers
import numpy as np
from cythonModules import rotateContour as rc
class FilterView(QtWidgets.QGraphicsView):
......@@ -14,6 +15,7 @@ class FilterView(QtWidgets.QGraphicsView):
self.setWindowTitle('FilterView')
self.dataset: dataset.DataSet = None
self.rotation: int = 0
scene = QtWidgets.QGraphicsScene(self)
scene.setItemIndexMethod(QtWidgets.QGraphicsScene.NoIndex)
......@@ -47,6 +49,20 @@ class FilterView(QtWidgets.QGraphicsView):
self._update_particle_contours()
self._fit_to_window()
@helpers.timingDecorator
def update_rotation(self, newRotation: int) -> None:
    """
    Rotates all particle contours around the filter center to match the new
    rotation angle, then redraws the contours.
    :param newRotation: the new rotation in degrees
    """
    if newRotation != self.rotation:
        # Bugfix: np.float was removed in NumPy 1.20; builtin float is the
        # identical conversion.
        angle: float = float(newRotation - self.rotation)
        center: np.ndarray = np.array([self.filter.circleOffset[0] + self.filter.diameter/2,
                                       self.filter.circleOffset[1] + self.filter.diameter/2], dtype=np.int32)
        for particle in self.dataset.particleContainer.particles:
            particle.contour = rc.rotate_contour_around_point(particle.contour, center, angle)
        self._update_particle_contours()
        self.rotation = newRotation
@helpers.timingDecorator
def _update_particle_contours(self) -> None:
self._remove_particle_contours()
if self.dataset is not None:
......
......@@ -25,14 +25,24 @@ class MainView(QtWidgets.QWidget):
loadDsetBtn = QtWidgets.QPushButton('Load Dataset')
loadDsetBtn.released.connect(self._load_dataset)
self.rotationSpinBox = QtWidgets.QSpinBox()
self.rotationSpinBox.setMinimum(0)
self.rotationSpinBox.setMaximum(359)
self.rotationSpinBox.setValue(0)
self.rotationSpinBox.setMaximumWidth(50)
self.rotationSpinBox.valueChanged.connect(self._update_fiter_rotation)
self.controlGroup = QtWidgets.QGroupBox()
self.controlGroupLayout = QtWidgets.QHBoxLayout()
self.controlGroup.setLayout(self.controlGroupLayout)
self.controlGroupLayout.addWidget(loadDsetBtn)
self.controlGroupLayout.addWidget(QtWidgets.QLabel('Filter Rotation'))
self.controlGroupLayout.addWidget(self.rotationSpinBox)
self.controlGroupLayout.addWidget(QtWidgets.QLabel('Select Subsampling Mode:'))
self.controlGroupLayout.addWidget(self.modeSelector)
self.controlGroupLayout.addWidget(self.activeModeControl)
self.controlGroupLayout.addStretch()
self.layout.addWidget(self.controlGroup)
self.filterView = FilterView()
......@@ -62,10 +72,11 @@ class MainView(QtWidgets.QWidget):
self.activeMode = requestedMode
self.activeModeControl = self.activeMode.get_control_groupBox()
self.controlGroupLayout.insertWidget(2, self.activeModeControl)
self.controlGroupLayout.insertWidget(3, self.activeModeControl)
self.activeMode.update_measure_viewItems()
@helpers.timingDecorator
def _load_dataset(self) -> None:
fname = QtWidgets.QFileDialog.getOpenFileName(self, 'Select .pkl file', filter='pkl file (*.pkl)')
if fname[0] != '':
......@@ -91,6 +102,10 @@ class MainView(QtWidgets.QWidget):
self.filterView.update_from_dataset(dset)
self.activeMode.update_measure_viewItems()
def _update_fiter_rotation(self):
    # Slot for rotationSpinBox.valueChanged: applies the new rotation and then
    # re-sends the measured particles, since their contours were rotated.
    # NOTE(review): method name has a typo ("fiter"); kept because the signal
    # is connected to this exact name.
    self.filterView.update_rotation(self.rotationSpinBox.value())
    self.activeMode.send_measuredParticles_to_filterview()
if __name__ == '__main__':
import sys
......
......@@ -9,6 +9,7 @@ class MeasureMode(QtCore.QObject):
self.filterView: FilterView = relatedFilterView
self.uiControls: QtWidgets.QGroupBox = QtWidgets.QGroupBox()
self.boxGenerator: BoxSelectionSubsamplingMethod = None
self.subParticles: list = []
def get_control_groupBox(self) -> QtWidgets.QGroupBox:
    # Exposes this mode's UI controls for embedding into the main window layout.
    return self.uiControls
......@@ -16,7 +17,7 @@ class MeasureMode(QtCore.QObject):
def update_measure_viewItems(self) -> None:
raise NotImplementedError
def _send_measuredParticles_to_filterview(self) -> None:
def send_measuredParticles_to_filterview(self) -> None:
if self.boxGenerator.particleContainer is not None:
subParticles = self.boxGenerator.apply_subsampling_method()
self.filterView.update_measured_particles(subParticles)
......@@ -44,7 +45,7 @@ class CrossBoxMode(MeasureMode):
topLefts: list = self.boxGenerator.get_topLeft_of_boxes()
boxSize = self.boxGenerator.boxSize
self.filterView.update_measure_boxes(topLefts, boxSize)
self._send_measuredParticles_to_filterview()
self.send_measuredParticles_to_filterview()
class CrossBoxesControls(QtWidgets.QGroupBox):
......@@ -103,7 +104,7 @@ class SpiralBoxMode(MeasureMode):
topLefts: list = self.boxGenerator.get_topLeft_of_boxes()
boxSize = self.boxGenerator.boxSize
self.filterView.update_measure_boxes(topLefts, boxSize)
self._send_measuredParticles_to_filterview()
self.send_measuredParticles_to_filterview()
class SpiralBoxControls(QtWidgets.QGroupBox):
......
......@@ -3,24 +3,42 @@ import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
import time
def timingDecorator(callingFunction):
    """
    A wrapper function for timing the duration of the given function.
    :param callingFunction: the function to be timed
    :return: wrapped function that prints its runtime on every call
    """
    import functools

    # Bugfix: without functools.wraps the wrapper clobbered the wrapped
    # function's __name__/__doc__, which breaks introspection and debugging.
    @functools.wraps(callingFunction)
    def wrapper(*args, **kwargs):
        t0 = time.time()
        ret = callingFunction(*args, **kwargs)
        print(f'{callingFunction.__name__} took {np.round(time.time()-t0, 2)} seconds')
        return ret
    return wrapper
class ParticleBinSorter(object):
def __init__(self):
    super(ParticleBinSorter, self).__init__()
    # Size thresholds separating the particle bins (presumably µm -- confirm);
    # particles larger than the last threshold go into an extra overflow bin.
    self.bins = [5, 10, 20, 50, 100, 200, 500]
def sort_particles_into_bins(self, particleList):
    """
    Distributes the given particles over the size bins.
    :param particleList: particles providing getParticleSize()
    :return: list of lists, one list of particles per size bin
    """
    binnedParticles = self._get_empty_bins()
    for currentParticle in particleList:
        targetBin = self._get_binIndex_of_particle(currentParticle)
        binnedParticles[targetBin].append(currentParticle)
    return binnedParticles
def _get_empty_bins(self):
return [[] for _ in range(len(self.bins)+1)]
def _get_binIndex_of_particle(self, particle):
size = particle.getParticleSize()
binIndex = 0
......@@ -114,7 +132,7 @@ def get_polygon_area(polygon: QtGui.QPolygonF) -> float:
return area
def get_filterDimensions_from_dataset(dataset) -> tuple:
def get_filterDimensions_from_dataset(dataset: dataset.DataSet) -> tuple:
"""
Processes the datasets boundary items to calculate diameter and offset (coord system offset of circular filter
with respect to actual dataset). This is used to set circular filter dimensions to use in the geometric
......
......@@ -4,6 +4,7 @@ from evaluation import TotalResults
def load_results(fname: str) -> TotalResults:
# TODO: REMVOE DATASET FROM SAMPLERESULTS, OTHERWISE THE FILESIZE IS GOING TO BE HUGE
res: TotalResults = None
if os.path.exists(fname):
with open(fname, "rb") as fp:
......
......@@ -14,7 +14,7 @@ class SubsamplingMethod(object):
def __init__(self, particleConatainer, desiredFraction: float = 0.2):
super(SubsamplingMethod, self).__init__()
self.particleContainer = particleConatainer
self.fraction = desiredFraction
self.fraction: float = desiredFraction
@property
def label(self) -> str:
......
......@@ -10,33 +10,57 @@ SET GEPARD TO EVALUATION BRANCH (WITHOUT THE TILING STUFF), OTHERWISE SOME OF TH
"""
# results: TotalResults = TotalResults()
# pklsInFolders = get_pkls_from_directory(r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets')
#
# for folder in pklsInFolders.keys():
# for samplePath in pklsInFolders[folder]:
# newSampleResult: SampleResult = results.add_sample(samplePath)
# for attr in get_attributes_from_foldername(folder):
# newSampleResult.set_attribute(attr)
#
# t0 = time.time()
# results.update_all()
# print('updating all took', time.time()-t0, 'seconds')
#
results: TotalResults = TotalResults()
pklsInFolders = get_pkls_from_directory(r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets')
for folder in pklsInFolders.keys():
for samplePath in pklsInFolders[folder]:
newSampleResult: SampleResult = results.add_sample(samplePath)
for attr in get_attributes_from_foldername(folder):
newSampleResult.set_attribute(attr)
t0 = time.time()
results.update_all()
print('updating all took', time.time()-t0, 'seconds')
save_results('results1.res', results)
# results: TotalResults = load_results('results1.res')
# results.update_all(force=True)
# save_results('results1.res', results)
results: TotalResults = load_results('results1.res')
errorPerFraction: dict = results.get_error_vs_fraction_data(methods=['spiral', 'cross'])
plt.clf()
errorPerFraction: dict = results.get_error_vs_fraction_data(attributes=['air', 'water'],
methods=['chemo', 'sizeBin', 'random'])
plt.subplot(121)
for methodLabel in errorPerFraction.keys():
fractions: list = list(errorPerFraction[methodLabel].keys())
errors: list = list(errorPerFraction[methodLabel].values())
plt.plot(fractions, errors)
plt.scatter(fractions, errors, label=methodLabel)
plt.title('Air/Water sample', fontSize=15)
plt.xscale('log')
plt.xlabel('measured fraction', fontsize=12)
plt.ylabel('mpCountError (%)', fontsize=12)
plt.ylim([0, 100])
plt.legend()
errorPerFraction: dict = results.get_error_vs_fraction_data(attributes=['sediment', 'soil', 'beach', 'slush'],
methods=['chemo', 'sizeBin', 'random'])
plt.subplot(122)
for methodLabel in errorPerFraction.keys():
fractions: list = list(errorPerFraction[methodLabel].keys())
errors: list = list(errorPerFraction[methodLabel].values())
plt.plot(fractions, errors, label=methodLabel)
plt.plot(fractions, errors)
plt.scatter(fractions, errors, label=methodLabel)
plt.title('Spiral or Box Layouts')
plt.title('Sediment/Beach/Slush sample', fontSize=15)
plt.xscale('log')
plt.xlabel('measured fraction')
plt.ylabel('mpCountError')
plt.xlabel('measured fraction', fontsize=12)
plt.ylabel('mpCountError (%)', fontsize=12)
plt.ylim([0, 100])
plt.legend()
plt.show()
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
import gepard
from gepard.analysis.particleContainer import ParticleContainer
def get_default_ParticleContainer() -> ParticleContainer:
    """Create a ParticleContainer holding four 10x10 square particles, laid out side by side
    along the x-axis (squares start at x = 0, 10, 20, 30)."""
    numParticles: int = 4
    container: ParticleContainer = ParticleContainer(None)
    container.initializeParticles(numParticles)
    squareContours: list = []
    for index in range(numParticles):
        xStart: int = 10 * index
        square: np.ndarray = np.array([[[xStart, 0]],
                                       [[xStart + 10, 0]],
                                       [[xStart + 10, 10]],
                                       [[xStart, 10]]], dtype=np.int32)
        squareContours.append(square)
    container.setParticleContours(squareContours)
    return container
import unittest
import cv2
import numpy as np
import sys
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis import particleAndMeasurement as pm
from gepard.analysis.particleContainer import ParticleContainer
from gepard import dataset
import chemometricMethods as cmeth
class TestFeatureExtractor(unittest.TestCase):
    """Tests for the chemometric FeatureExtractor (hu moments and color hashing)."""

    def setUp(self) -> None:
        self.extractor: cmeth.FeatureExtractor = cmeth.FeatureExtractor(None)

    def test_get_contour_moments(self):
        """The log-scaled hu moments must be invariant to translation, rotation and scaling."""
        imgs = []
        imgA: np.ndarray = np.zeros((200, 200), dtype=np.uint8)
        cv2.putText(imgA, 'A', (25, 175), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=7, color=1, thickness=5)
        imgs.append(imgA.copy())

        imgA_translated: np.ndarray = np.zeros((200, 200), dtype=np.uint8)
        cv2.putText(imgA_translated, 'A', (10, 180), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=7, color=1, thickness=5)
        imgs.append(imgA_translated)

        imgs.append(cv2.rotate(imgA, cv2.ROTATE_90_CLOCKWISE))
        imgs.append(cv2.rotate(imgA, cv2.ROTATE_180))
        imgs.append(cv2.resize(imgA, None, fx=0.5, fy=0.5, interpolation=cv2.INTER_LINEAR))
        imgs.append(cv2.resize(imgA, None, fx=1.5, fy=1.5, interpolation=cv2.INTER_LINEAR))

        moments: np.ndarray = np.zeros((7, len(imgs)))  # we prepare an empty array for 7 hu moments per image
        for i, img in enumerate(imgs):
            contours, hierarchy = cv2.findContours(img, 1, 2)
            particle: pm.Particle = pm.Particle()
            particle.contour = contours[0]
            self.extractor.particle = particle
            hu: np.ndarray = self.extractor._get_log_hu_moments()
            moments[:, i] = hu

        # The first six hu moments are supposed to be invariant to scale, rotation and translation.
        # Small errors can occur, as the test image is of low resolution...
        # FIX: compare absolute deviations from the mean; the previous one-sided check
        # (diff > 0.1) silently accepted arbitrarily large NEGATIVE deviations.
        for i in range(6):
            diff: np.ndarray = np.abs(moments[i, :] - np.mean(moments[i, :]))
            self.assertFalse(np.any(diff > 0.1))

    def test_get_color_hash(self):
        """The color hash array must contain the first numDigits digits of abs(hash(color))."""
        for color in ['red', 'green', 'violet', 'blue', 'Blue', 'non-determinable', None]:
            for numDigits in [4, 6, 8]:
                hashNumber: int = abs(hash(color))
                hashArray: np.ndarray = self.extractor._get_color_hash(color, numDigits)
                self.assertEqual(len(hashArray), numDigits)
                for i in range(numDigits):
                    self.assertEqual(hashArray[i], int(str(hashNumber)[i]))
class TestChemometricSubsampling(unittest.TestCase):
    """Tests for the chemometric (clustering-based) subsampling method."""

    def setUp(self) -> None:
        # five identical particles, each with the contour of a rendered letter 'A'
        self.particleContainer: ParticleContainer = ParticleContainer(None)
        self.numParticles: int = 5
        self.particleContainer.initializeParticles(self.numParticles)
        img: np.ndarray = np.zeros((20, 20), dtype=np.uint8)
        cv2.putText(img, 'A', (2, 2), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=1, color=1, thickness=2)
        contours, hierarchy = cv2.findContours(img, 1, 2)
        self.particleContainer.setParticleContours([contours[0] for _ in range(self.numParticles)])
        self.chemSubs: cmeth.ChemometricSubsampling = cmeth.ChemometricSubsampling(self.particleContainer,
                                                                                   desiredFraction=0.1)

    def test_get_particle_featurematrix(self):
        """Feature matrix is (11 features x numParticles); the first six rows are the
        hu moments, which must be (near) identical for identical particles."""
        features: np.ndarray = self.chemSubs._get_particle_featurematrix()
        self.assertEqual(features.shape, (11, self.numParticles))
        # FIX: compare absolute deviations from the mean; the previous one-sided check
        # (diff > 0.1) silently accepted arbitrarily large NEGATIVE deviations.
        for i in range(6):
            diff: np.ndarray = np.abs(features[i, :] - np.mean(features[i, :]))
            self.assertFalse(np.any(diff > 0.1))

    def test_get_numPoints_per_cluster(self):
        """Points to measure must be distributed over clusters proportionally to cluster
        size, with noise points (label -1) weighted by noiseAmpFactor."""
        def get_orig_points_per_cluster(index):
            # clusters of increasing size: 50, 100, 150, ...
            return (index + 1) * 50

        for frac in [0.01, 0.1, 0.5, 0.9]:
            self.chemSubs.fraction = frac
            for numClusters in [1, 5, 10]:
                for numNoisePoints in [0, 10, 15]:
                    labels: list = []
                    for clusterIndex in range(numClusters):
                        for _ in range(get_orig_points_per_cluster(clusterIndex)):
                            labels.append(clusterIndex)
                    for _ in range(numNoisePoints):
                        labels.append(-1)
                    labels: np.ndarray = np.array(labels)
                    numTotal: int = len(labels)
                    origFrac: float = self.chemSubs.fraction
                    noiseAmpFactor = np.clip(5, 0, 1/frac)
                    pointsPerCluster: dict = self.chemSubs._get_numPoints_per_cluster(labels,
                                                                                      noiseAmpFactor=noiseAmpFactor)
                    numPointsToMeasure = round(numTotal * origFrac)
                    if numPointsToMeasure == 0:
                        numPointsToMeasure = 1  # at least one point has to be measured
                    # total of distributed points has to match the requested number (+- rounding)
                    self.assertTrue(abs(sum(list(pointsPerCluster.values())) - numPointsToMeasure) <= 1)
                    if numNoisePoints == 0:
                        fractionPerCluster: float = frac
                    else:
                        # noise points count noiseAmpFactor-fold into the effective total
                        fractionPerCluster: float = numPointsToMeasure / (len(labels) - numNoisePoints +
                                                                          numNoisePoints * noiseAmpFactor)
                    tooFewPoints = numPointsToMeasure < (numClusters + (1 if numNoisePoints > 0 else 0))
                    pointsFound: int = 0
                    for clusterIndex in pointsPerCluster.keys():
                        if clusterIndex > -1:
                            if not tooFewPoints:
                                pointsExpected = round(fractionPerCluster * get_orig_points_per_cluster(clusterIndex))
                                if pointsExpected == 0:
                                    pointsExpected = 1
                                diff = abs(pointsPerCluster[clusterIndex] - pointsExpected)
                                self.assertTrue(diff <= 1)
                            else:
                                # with too few points, the first clusters get one point each,
                                # the remaining ones get none
                                if pointsFound < numPointsToMeasure:
                                    self.assertEqual(pointsPerCluster[clusterIndex], 1)
                                else:
                                    self.assertEqual(pointsPerCluster[clusterIndex], 0)
                            pointsFound += pointsPerCluster[clusterIndex]
                    if numNoisePoints > 0:
                        # the noise cluster receives whatever remains (+- rounding)
                        self.assertTrue(abs(pointsPerCluster[-1] - (numPointsToMeasure - pointsFound)) <= 1)

    def test_get_n_points_closest_to_center(self):
        """get_n_points_closest_to_point must return the indices of the n nearest points."""
        points: np.ndarray = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
        refPoint: np.ndarray = np.array([0, 0])
        closestPoints: list = cmeth.get_n_points_closest_to_point(points, 3, refPoint)
        self.assertEqual(len(closestPoints), 3)
        self.assertTrue(0 in closestPoints)
        self.assertTrue(1 in closestPoints)
        self.assertTrue(2 in closestPoints)

        refPoint = np.array([2, 2])
        closestPoints = cmeth.get_n_points_closest_to_point(points, 3, refPoint)
        self.assertEqual(len(closestPoints), 3)
        self.assertTrue(1 in closestPoints)
        self.assertTrue(2 in closestPoints)
        self.assertTrue(3 in closestPoints)

        refPoint = np.array([2, 0.5])
        closestPoints = cmeth.get_n_points_closest_to_point(points, 2, refPoint)
        self.assertEqual(len(closestPoints), 2)
        self.assertTrue(2 in closestPoints)
        self.assertTrue(3 in closestPoints)
# def test_clustering(self):
# fname = r'C:\Users\xbrjos\Desktop\temp MP\190326_MCII_WWTP_SB_50_1\190326_MCII_WWTP_SB_50_1.pkl'
# # fname = r'C:\Users\xbrjos\Desktop\temp MP\190313_Soil_5_A_50_5_1_50_1\190313_Soil_5_A_50_5_1_50_1.pkl'
# # fname = r'C:\Users\xbrjos\Desktop\temp MP\190201_BSB_Stroomi_ds2_R1_R2_50\190201_BSB_Stroomi_ds2_R1_R2_50.pkl'
# dset: dataset.Dataset = dataset.loadData(fname)
# self.chemSubs.particleContainer = dset.particleContainer
# features: np.ndarray = self.chemSubs._get_particle_featurematrix()
# princComps: np.ndarray = cmeth.get_pca(features)
#
# X = StandardScaler().fit_transform(princComps)
#
# #############################################################################
# # Compute DBSCAN
# db = DBSCAN(eps=0.1, min_samples=10).fit(X)
#
# core_samples_mask = np.zeros_like(db.labels_, dtype=bool)
# core_samples_mask[db.core_sample_indices_] = True
# labels = db.labels_
#
# # Number of clusters in labels, ignoring noise if present.
# n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
# n_noise_ = list(labels).count(-1)
#
# print('Estimated number of clusters: %d' % n_clusters_)
# print('Estimated number of noise points: %d' % n_noise_)
#
# # Black removed and is used for noise instead.
# unique_labels = set(labels)
# colors = [plt.cm.Spectral(each)
# for each in np.linspace(0, 1, len(unique_labels))]
# for k, col in zip(unique_labels, colors):
# if k == -1:
# # Black used for noise.
# col = [0, 0, 0, 1]
#
# class_member_mask = (labels == k)
#
# xy = X[class_member_mask & core_samples_mask]
# plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
# markeredgecolor='k', markersize=14)
#
# xy = X[class_member_mask & ~core_samples_mask]
# plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
# markeredgecolor='k', markersize=6)
#
# plt.title('Estimated number of clusters: %d' % n_clusters_)
# plt.show()
\ No newline at end of file
import unittest
import numpy as np
from cythonModules import rotateContour as rc
class CythonTester(unittest.TestCase):
    """Tests for the cython contour-rotation module."""

    def test_rotate_contour(self):
        """Rotate a triangle contour by 90/180/270 degrees around two reference
        points and verify the resulting coordinates."""
        triangle: np.ndarray = np.array([[[0, 5]],
                                         [[5, 5]],
                                         [[5, 0]]], dtype=np.int32)

        # rotations around the origin: expected (x, y) for all three contour points
        origin: np.ndarray = np.array([0, 0], dtype=np.int32)
        expectationsAroundOrigin: dict = {
            90.0: [(-5, 0), (-5, 5), (0, 5)],
            180.0: [(0, -5), (-5, -5), (-5, 0)],
            270.0: [(5, 0), (5, -5), (0, -5)],
        }
        for angle, expectedPoints in expectationsAroundOrigin.items():
            rotated = rc.rotate_contour_around_point(triangle, origin, angle)
            for pointIndex, (expectedX, expectedY) in enumerate(expectedPoints):
                self.assertAlmostEqual(rotated[pointIndex, 0, 0], expectedX)
                self.assertAlmostEqual(rotated[pointIndex, 0, 1], expectedY)

        # rotations around the corner point (5, 5): expected (x, y) per checked point index
        corner: np.ndarray = np.array([5, 5], dtype=np.int32)
        expectationsAroundCorner: dict = {
            90.0: {0: (5, 0), 2: (10, 5)},
            180.0: {0: (10, 5), 2: (5, 10)},
            270.0: {0: (5, 10), 2: (0, 5)},
        }
        for angle, expectedByIndex in expectationsAroundCorner.items():
            rotated = rc.rotate_contour_around_point(triangle, corner, angle)
            for pointIndex, (expectedX, expectedY) in expectedByIndex.items():
                self.assertAlmostEqual(rotated[pointIndex, 0, 0], expectedX)
                self.assertAlmostEqual(rotated[pointIndex, 0, 1], expectedY)
import unittest
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from datasetOperations import ParticleVariations
from helpers_for_test import get_default_ParticleContainer
class TestParticleVariations(unittest.TestCase):
    """Tests for ParticleVariations (rotated copies of a particle container)."""

    def test_get_particleContainer_variations(self):
        """Every requested variation must yield a container whose particle contours
        differ from all previously seen ones; the first variation is the original."""
        particleContainer: ParticleContainer = get_default_ParticleContainer()
        contours = particleContainer.getParticleContours()
        # NOTE(review): contours[:][0][0] is identical to contours[0][0] (first point of the
        # first contour) -- presumably the mean over all x resp. y coordinates was intended.
        # 'center' is not used below, so the computation is kept as-is; confirm intent.
        center: tuple = round(np.mean(contours[:][0][0])), \
                        round(np.mean(contours[:][0][1]))
        center: np.ndarray = np.array(center, dtype=np.int32)
        for numVariations in [0, 1, 10, 20]:
            particleVariations: ParticleVariations = ParticleVariations(particleContainer, numVariations)
            foundContours: list = []
            if numVariations == 0:
                self.assertEqual(len(list(particleVariations.get_particleContainer_variations())), 0)
            else:
                for index, partContainer in enumerate(particleVariations.get_particleContainer_variations()):
                    if index == 0:
                        # the first "variation" is the original container itself
                        self.assertTrue(partContainer is particleContainer)
                    else:
                        self.assertFalse(partContainer is particleContainer)
                    contours = []
                    for particle in partContainer.particles:
                        contours.append(particle.contour)
                        # FIX: tobytes() replaces ndarray.tostring(), which is deprecated
                        # and removed in numpy 2.0
                        contourHash = hash(particle.contour.tobytes())
                        self.assertTrue(contourHash not in foundContours)
                        foundContours.append(contourHash)
                self.assertEqual(index, numVariations-1)

    def test_get_angles(self):
        """numVariations rotation angles must be spread evenly over 360 degrees, starting at 0."""
        particleVariations: ParticleVariations = ParticleVariations(None, 2)
        angles: list = list(particleVariations._get_angles())
        self.assertEqual(angles, [0, 180])
        particleVariations.numVariations = 4
        angles: list = list(particleVariations._get_angles())
        self.assertEqual(angles, [0, 90, 180, 270])
......@@ -8,14 +8,17 @@ Created on Wed Jan 22 13:58:25 2020
import unittest
import random
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
import gepard
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis.particleAndMeasurement import Particle, Measurement
from evaluation import TotalResults, SampleResult, SubsamplingResult
import methods as meth
import geometricMethods as gmeth
from helpers_for_test import get_default_ParticleContainer
class TestTotalResults(unittest.TestCase):
......@@ -62,7 +65,9 @@ class TestTotalResults(unittest.TestCase):
possibleRandomMethods = 2
possibleCrossBoxMethods = 2
possibleSpiralBoxMethods = 3
totalPossible = possibleCrossBoxMethods + possibleRandomMethods + possibleSpiralBoxMethods
possibleChemometricMethods = 1
totalPossible = possibleCrossBoxMethods + possibleRandomMethods + \
possibleSpiralBoxMethods + possibleChemometricMethods
self.assertEqual(len(methods), totalPossible)
self.assertTrue(containsMethod(methods, meth.RandomSampling(dset, desiredFraction)))
self.assertTrue(containsMethod(methods, meth.SizeBinFractioning(dset, desiredFraction)))
......@@ -74,7 +79,9 @@ class TestTotalResults(unittest.TestCase):
possibleRandomMethods = 2
possibleCrossBoxMethods = 1
possibleSpiralBoxMethods = 0
totalPossible = possibleCrossBoxMethods + possibleRandomMethods + possibleSpiralBoxMethods
possibleChemometricMethods = 1
totalPossible = possibleCrossBoxMethods + possibleRandomMethods + \
possibleSpiralBoxMethods + possibleChemometricMethods
self.assertEqual(len(methods), totalPossible)
self.assertTrue(containsMethod(methods, meth.RandomSampling(dset, desiredFraction)))
self.assertTrue(containsMethod(methods, meth.SizeBinFractioning(dset, desiredFraction)))
......@@ -86,7 +93,9 @@ class TestTotalResults(unittest.TestCase):
possibleRandomMethods = 2
possibleCrossBoxMethods = 0
possibleSpiralBoxMethods = 0
totalPossible = possibleCrossBoxMethods + possibleRandomMethods + possibleSpiralBoxMethods
possibleChemometricMethods = 1
totalPossible = possibleCrossBoxMethods + possibleRandomMethods + \
possibleSpiralBoxMethods + possibleChemometricMethods
self.assertEqual(len(methods), totalPossible)
self.assertTrue(containsMethod(methods, meth.RandomSampling(dset, desiredFraction)))
self.assertTrue(containsMethod(methods, meth.SizeBinFractioning(dset, desiredFraction)))
......@@ -101,30 +110,30 @@ class TestTotalResults(unittest.TestCase):
firstMethod: meth.RandomSampling = meth.RandomSampling(None, 0.1)
firstResult: SubsamplingResult = SubsamplingResult(firstMethod)
firstResult.mpCountError = 0.8
firstResult.mpCountErrors = [0.8]
secondMethod: gmeth.CrossBoxSubSampling = gmeth.CrossBoxSubSampling(None, 0.1)
secondMethod.numBoxesAcross = 3
secondResult: SubsamplingResult = SubsamplingResult(secondMethod)
secondResult.mpCountError = 0.6
secondResult.mpCountErrors = [0.6]
thirdMethod: gmeth.CrossBoxSubSampling = gmeth.CrossBoxSubSampling(None, 0.1)
thirdMethod.numBoxesAcross = 5
self.assertEqual(thirdMethod.fraction, 0.1)
thirdResult: SubsamplingResult = SubsamplingResult(thirdMethod)
thirdResult.mpCountError = 0.4
thirdResult.mpCountErrors = [0.4]
thirdMethod2: gmeth.CrossBoxSubSampling = gmeth.CrossBoxSubSampling(None, 0.1)
thirdMethod2.numBoxesAcross = 5
self.assertEqual(thirdMethod2.fraction, 0.1)
thirdResult2: SubsamplingResult = SubsamplingResult(thirdMethod)
thirdResult2.mpCountError = 0.8
thirdResult2.mpCountErrors = [0.8]
thirdMethod3: gmeth.CrossBoxSubSampling = gmeth.CrossBoxSubSampling(None, 0.2)
thirdMethod3.numBoxesAcross = 5
self.assertEqual(thirdMethod3.fraction, 0.2)
thirdResult3: SubsamplingResult = SubsamplingResult(thirdMethod3)
thirdResult3.mpCountError = 0.5
thirdResult3.mpCountErrors = [0.5]
firstSample.results = [firstResult, secondResult, thirdResult, thirdResult3]
secondSample.results = [firstResult, secondResult, thirdResult2, thirdResult3]
......@@ -171,9 +180,12 @@ class TestTotalResults(unittest.TestCase):
class TestSampleResult(unittest.TestCase):
def setUp(self) -> None:
particleContainer = get_default_ParticleContainer()
self.sampleResult: SampleResult = SampleResult('fakePath/fakeFile.pkl')
self.sampleResult.dataset = gepard.dataset.DataSet('fakePath/fakeFile.pkl')
self.sampleResult.results.append(SubsamplingResult(meth.RandomSampling(None, 0.1)))
self.sampleResult.dataset.particleContainer = particleContainer
self.sampleResult.results.append(SubsamplingResult(meth.RandomSampling(particleContainer, 0.1)))
newMethod = gmeth.SpiralBoxSubsampling(None, 0.1)
newMethod.numBoxes = 10
......@@ -207,29 +219,40 @@ class TestSampleResult(unittest.TestCase):
self.assertEqual(method.fraction, 0.3)
self.assertEqual(method.numBoxes, 10)
def test_result_is_already_present(self):
newMethod: meth.SubsamplingMethod = meth.RandomSampling(None, 0.1)
self.assertTrue(self.sampleResult._result_is_already_present(newMethod))
newMethod: meth.SubsamplingMethod = meth.RandomSampling(None, 0.2)
self.assertFalse(self.sampleResult._result_is_already_present(newMethod))
def test_get_result_of_method(self):
particleContainer = get_default_ParticleContainer()
newMethod: meth.SubsamplingMethod = meth.RandomSampling(particleContainer, 0.1)
result: SubsamplingResult = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is not None)
self.assertTrue(result.method.equals(newMethod))
newMethod = meth.RandomSampling(particleContainer, 0.2)
result = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is None)
newMethod = gmeth.SpiralBoxSubsampling(particleContainer, 0.1)
result = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is not None)
self.assertTrue(result.method.equals(newMethod))
newMethod: meth.SubsamplingMethod = gmeth.SpiralBoxSubsampling(None, 0.1)
self.assertTrue(self.sampleResult._result_is_already_present(newMethod))
newMethod: meth.SubsamplingMethod = gmeth.SpiralBoxSubsampling(None, 0.2)
self.assertFalse(self.sampleResult._result_is_already_present(newMethod))
newMethod = gmeth.SpiralBoxSubsampling(particleContainer, 0.2)
result = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is None)
newMethod: meth.SubsamplingMethod = gmeth.CrossBoxSubSampling(None, 0.3)
self.assertFalse(self.sampleResult._result_is_already_present(newMethod))
newMethod = gmeth.CrossBoxSubSampling(particleContainer, 0.3)
result = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is None)
def test_remove_result_of_method(self):
particleContainer = get_default_ParticleContainer()
numOrigResults = len(self.sampleResult.results)
self.sampleResult._remove_result_of_method(meth.RandomSampling(None, 0.1))
self.sampleResult._remove_result_of_method(meth.RandomSampling(particleContainer, 0.1))