Commit a8488af0 authored by Josef Brandt's avatar Josef Brandt

SampleResult has attribute numVariations

For each of the numVariations variations, the filter (i.e. all particle contours) is rotated by a further angle increment, so that every method can be applied again to the same sample.
parent f83f5d56
......@@ -7,7 +7,6 @@ from scipy import spatial
from itertools import combinations
from random import sample
import time
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
......
import copy
import sys
from typing import Iterator

import numpy as np

sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from cythonModules import rotateContour
class ParticleVariations(object):
    """
    Generates rotated variations of the particles of a ParticleContainer.

    Variation i is rotated by i * (360 / numVariations) degrees around a common center point,
    so that a subsampling method can be applied repeatedly to differently oriented versions
    of the same sample.
    """
    def __init__(self, particleContainer: "ParticleContainer", numVariations: int = 10) -> None:
        """
        :param particleContainer: The container holding the original particles
        :param numVariations: How many variations (rotation steps) shall be generated
        """
        super(ParticleVariations, self).__init__()
        self.origParticleContainer = particleContainer
        self.numVariations = numVariations

    def get_particleContainer_variations(self) -> Iterator["ParticleContainer"]:
        """
        Generator yielding one particleContainer per variation.
        The first yielded container IS the original container (its contours are overwritten in place with
        the result of the rotation by the first angle, i.e. 0 degrees). All further containers are deep copies
        of the original container with rotated contours.
        Nothing is yielded if numVariations is not larger than 0.
        :return: iterator over the varied particleContainers
        """
        if self.numVariations > 0:
            partContainer = self.origParticleContainer
            center: np.ndarray = self._get_center(partContainer.getParticleContours())
            angles: np.ndarray = self._get_angles()
            for i in range(self.numVariations):
                if i > 0:
                    partContainer = copy.deepcopy(self.origParticleContainer)

                for particle in partContainer.particles:
                    contour = np.int32(particle.contour)
                    # builtin float instead of np.float: the alias was removed from numpy (>= 1.24)
                    particle.contour = rotateContour.rotate_contour_around_point(contour, center,
                                                                                 float(angles[i]))
                yield partContainer

    @staticmethod
    def _get_center(contours: list) -> np.ndarray:
        """
        Calculates the rounded mean x and y coordinate of all points of all given contours.
        (Indexing the list as contours[:][0][k] would only evaluate point k of the first contour.)
        :param contours: list of contours, each convertible to an array of (x, y) points
        :return: int32 array (x, y); (0, 0) if there are no contour points at all
        """
        if len(contours) == 0:
            return np.zeros(2, dtype=np.int32)

        points: np.ndarray = np.vstack([np.asarray(contour).reshape(-1, 2) for contour in contours])
        if points.shape[0] == 0:
            return np.zeros(2, dtype=np.int32)
        return np.round(points.mean(axis=0)).astype(np.int32)

    def _get_angles(self) -> np.ndarray:
        """
        :return: the rotation angles in degree, evenly distributed over 360 degrees, starting at 0.
                 Empty if numVariations is not larger than 0.
        """
        if self.numVariations <= 0:
            return np.array([], dtype=float)  # avoids dividing by zero

        angleIncrement: float = 360 / self.numVariations
        return np.arange(self.numVariations) * angleIncrement
......@@ -6,17 +6,18 @@ Created on Wed Jan 22 13:57:28 2020
@author: luna
"""
import pickle
import sys
import os
import numpy as np
import matplotlib.pyplot as plt
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from helpers import ParticleBinSorter
import methods as meth
import geometricMethods as gmeth
import chemometricMethods as cmeth
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from datasetOperations import ParticleVariations
def get_name_from_directory(dirPath: str) -> str:
......@@ -26,9 +27,9 @@ def get_name_from_directory(dirPath: str) -> str:
class TotalResults(object):
# methods: list = [meth.RandomSampling, meth.SizeBinFractioning, gmeth.CrossBoxSubSampling,
# gmeth.SpiralBoxSubsampling, cmeth.ChemometricSubsampling]
measuredFractions: list = [0.01, 0.05, 0.1, 0.15, 0.2, 0.5, 0.75, 0.9]
# measuredFractions: list = [0.01, 0.05, 0.1, 0.15, 0.2, 0.5, 0.75, 0.9]
# measuredFractions: list = [0.1, 0.15, 0.2, 0.5, 0.75, 0.9]
# measuredFractions: list = [0.1, 0.3, 0.5, 0.9]
measuredFractions: list = [0.1, 0.3, 0.5, 0.9]
def __init__(self):
super(TotalResults, self).__init__()
......@@ -58,11 +59,12 @@ class TotalResults(object):
"""
for index, sample in enumerate(self.sampleResults):
sample.load_dataset()
possibleMethods: list = []
for fraction in self.measuredFractions:
possibleMethods = self._get_methods_for_fraction(sample.dataset, fraction)
for curMethod in possibleMethods:
print(f'updating {sample.sampleName} with {curMethod.label} at fraction {fraction}')
sample.update_result_with_method(curMethod, force=force)
for method in self._get_methods_for_fraction(sample.dataset, fraction):
possibleMethods.append(method)
sample.update_result_with_methods(possibleMethods, force=force)
print(f'processed {index+1} of {len(self.sampleResults)} samples')
def get_error_vs_fraction_data(self, attributes: list = [], methods: list = []) -> dict:
......@@ -110,20 +112,95 @@ class TotalResults(object):
methods += boxCreator.get_crossBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_spiralBoxSubsamplers_for_fraction(fraction)
methods.append(cmeth.ChemometricSubsampling(particleContainer, fraction))
# methods = [cmeth.ChemometricSubsampling(particleContainer, fraction)]
return methods
class SubsamplingResult(object):
    """
    Stores all interesting results from a subsampling experiment.
    One MP count error (in percent) is collected per application of the method, the property
    mpCountError reports their mean.
    """
    # TODO: UPDATE PATTERNS -> ARE THESE REASONABLE???
    # A particle counts as MP particle if its lower-cased assignment contains any of these patterns.
    mpPatterns = ['poly', 'rubber', 'pb', 'pr', 'pg', 'py', 'pv']

    def __init__(self, subsamplingMethod: "meth.SubsamplingMethod"):
        """
        :param subsamplingMethod: The method object the results belong to (its fraction is read in add_result)
        """
        super(SubsamplingResult, self).__init__()
        self.method: meth.SubsamplingMethod = subsamplingMethod
        self.mpCountErrors: list = []  # one entry per call to add_result

    @property
    def mpCountError(self) -> float:
        """
        :return: mean of all stored mp count errors, 0.0 if no result was added so far
        """
        if not self.mpCountErrors:
            return 0.0
        return float(np.mean(self.mpCountErrors))

    def reset_results(self) -> None:
        """
        Deletes all results
        :return:
        """
        self.mpCountErrors = []

    def add_result(self, origParticles: list, subParticles: list) -> None:
        """
        Takes the particle lists from a subsampling method and appends the calculated results.
        :param origParticles: all particles of the sample
        :param subParticles: the particles that were selected by the subsampling method
        :return:
        """
        self.mpCountErrors.append(self._get_mp_count_error(origParticles, subParticles, self.method.fraction))

    def _get_mp_count_error_per_bin(self, allParticles: list, subParticles: list, fractionMeasured: float) -> tuple:
        """
        Calculates the mp count error separately for each bin of the ParticleBinSorter.
        :return: tuple (bins of the binSorter, list of mp count errors per bin)
        """
        binSorter = ParticleBinSorter()
        allParticlesInBins = binSorter.sort_particles_into_bins(allParticles)
        subParticlesInBins = binSorter.sort_particles_into_bins(subParticles)
        mpCountErrorsPerBin = [self._get_mp_count_error(allParticleBin, subParticleBin, fractionMeasured)
                               for allParticleBin, subParticleBin in zip(allParticlesInBins, subParticlesInBins)]
        return binSorter.bins, mpCountErrorsPerBin

    def _get_mp_count_error(self, allParticles: list, subParticles: list, fractionMeasured: float) -> float:
        """
        Extrapolates the MP count of the subsample to the entire sample and compares it to the actual count.
        :param allParticles: all particles of the sample
        :param subParticles: the measured particles
        :param fractionMeasured: the fraction that was measured, used for extrapolating the subsample count
        :return: the error in percent
        :raises ValueError: if the subsample contains MP particles, although the entire sample contains none
        """
        numMPOrig = self._get_number_of_MP_particles(allParticles)
        numMPEstimate = self._get_number_of_MP_particles(subParticles) / fractionMeasured

        if numMPOrig != 0:
            return self._get_error_from_values(numMPOrig, numMPEstimate)
        if numMPEstimate == 0:
            return 0.0
        # >0 particles in subsample, whereas none in entire sample. This cannot be!
        raise ValueError('MP particles found in the subsample, although the entire sample contains none.')

    def _get_error_from_values(self, exact: float, estimate: float) -> float:
        """
        :return: absolute deviation of estimate from exact, in percent of exact
        """
        assert exact != 0, 'the exact value must not be zero'
        return abs(exact - estimate) / exact * 100

    def _get_number_of_MP_particles(self, particleList: list) -> int:
        """
        :param particleList: list of particles, each offering getParticleAssignment()
        :return: number of particles whose assignment matches any of the mpPatterns (case insensitive)
        """
        numMPParticles = 0
        for particle in particleList:
            assignment = particle.getParticleAssignment().lower()  # lower-case once, not once per pattern
            if any(pattern in assignment for pattern in self.mpPatterns):
                numMPParticles += 1
        return numMPParticles
class SampleResult(object):
"""
An object that actually stores all generated results per sample and can update and report on them.
"""
def __init__(self, filepath: str):
def __init__(self, filepath: str, numVariations: int = 10):
super(SampleResult, self).__init__()
self.filepath: str = filepath
self.dataset: dataset.DataSet = None
self.results: list = []
self.attributes: list = []
self.numVariations: int = numVariations # how often the sample is altered for each method
@property
def sampleName(self) -> str:
......@@ -133,25 +210,47 @@ class SampleResult(object):
self.dataset = dataset.loadData(self.filepath)
assert self.dataset is not None
def update_result_with_method(self, method: meth.SubsamplingMethod, force: bool = False) -> None:
def update_result_with_methods(self, methods: list, force: bool = False) -> list:
"""
Updates result with the given method (contains desiredFraction already)
:param method: The SubsamplingMethod Object
:param force: Whether to force an update. If False, the result is not updated if it is already present.
:return:
:return: list of updated methods
"""
if not self._result_is_already_present(method) or force:
if force:
self._remove_result_of_method(method)
if self.dataset is None:
self.load_dataset()
method.particleContainer = self.dataset.particleContainer
newResult: SubsamplingResult = SubsamplingResult(method)
self.results.append(newResult)
newResult.update()
if self.dataset is None and len(methods) > 0:
self.load_dataset()
updatedMethods: list = []
particleVariations: ParticleVariations = ParticleVariations(self.dataset.particleContainer,
numVariations=self.numVariations)
needsToBeUpdated: dict = {method: False for method in methods}
for index, particleContainer in enumerate(particleVariations.get_particleContainer_variations()):
for method in methods:
result: SubsamplingResult = self._get_result_of_method(method)
method: meth.SubsamplingMethod = method
method.particleContainer = particleContainer
if index == 0:
if result is None:
result = SubsamplingResult(method)
self.results.append(result)
result.reset_results()
needsToBeUpdated[method] = True
elif force:
result.reset_results()
needsToBeUpdated[method] = True
if needsToBeUpdated[method]:
subParticles = method.apply_subsampling_method()
result.add_result(method.particleContainer.particles, subParticles)
if method not in updatedMethods:
updatedMethods.append(method)
print(f'updated {self.sampleName} with {method.label} at fraction {method.fraction}, '
f'iteration {index+1}')
return updatedMethods
def set_attribute(self, newAttribute: str) -> None:
"""
......@@ -184,85 +283,21 @@ class SampleResult(object):
if method.equals(result.method):
self.results.remove(result)
def _result_is_already_present(self, method: meth.SubsamplingMethod) -> bool:
def _get_result_of_method(self, method: meth.SubsamplingMethod) -> SubsamplingResult:
"""
Checks, if a result with the given method (method type AND measured fraction) is already present.
:param method: The method object, specifying the subsampling method and the measured fraction
:return:
"""
isPresent: bool = False
requestedResult: SubsamplingResult = None
for result in self.results:
if method.equals(result.method):
isPresent = True
requestedResult = result
break
return isPresent
return requestedResult
# def _get_result_of_method(self, method: meth.SubsamplingMethod) -> SubsamplingResult:
# return None
class SubsamplingResult(object):
    """
    Holds the outcome of a single run of a subsampling method: the particle counts before and after
    subsampling and the resulting MP count errors (overall and per bin).
    """
    def __init__(self, subsamplingMethod: meth.SubsamplingMethod):
        super(SubsamplingResult, self).__init__()
        self.method: meth.SubsamplingMethod = subsamplingMethod
        self.fraction = self.method.fraction
        # all of the following are filled by update()
        self.origParticleCount: int = None
        self.subSampledParticleCount: int = None
        self.mpCountError: float = None
        self.mpCountErrorPerBin: tuple = None
        # TODO: UPDATE PATTERNS -> ARE THESE REASONABLE???
        self.mpPatterns = ['poly', 'rubber', 'pb', 'pr', 'pg', 'py', 'pv']

    def update(self) -> None:
        """
        Applies the subsampling method and refreshes all stored result values.
        The method needs to have a particleContainer assigned.
        :return:
        """
        container = self.method.particleContainer
        assert container is not None
        allParticles: list = container.particles
        self.origParticleCount = len(allParticles)

        measuredParticles: list = self.method.apply_subsampling_method()
        self.subSampledParticleCount = len(measuredParticles)

        measuredFraction: float = self.method.fraction
        self.mpCountError = self._get_mp_count_error(allParticles, measuredParticles, measuredFraction)
        self.mpCountErrorPerBin = self._get_mp_count_error_per_bin(allParticles, measuredParticles,
                                                                   measuredFraction)

    def _get_mp_count_error_per_bin(self, allParticles: list, subParticles: list, fractionMeasured: float) -> tuple:
        """
        Sorts both particle lists into the bins of a ParticleBinSorter and calculates the error per bin.
        :return: tuple (bins of the sorter, list with one mp count error per bin)
        """
        sorter = ParticleBinSorter()
        binnedPairs = zip(sorter.sort_particles_into_bins(allParticles),
                          sorter.sort_particles_into_bins(subParticles))
        errorsPerBin = [self._get_mp_count_error(allInBin, subInBin, fractionMeasured)
                        for allInBin, subInBin in binnedPairs]
        return sorter.bins, errorsPerBin

    def _get_mp_count_error(self, allParticles: list, subParticles: list, fractionMeasured: float) -> float:
        """
        Compares the MP count extrapolated from the subsample with the MP count of all particles.
        :return: the error in percent, 0 if neither list contains MP particles
        """
        numMPOrig = self._get_number_of_MP_particles(allParticles)
        numMPEstimate = self._get_number_of_MP_particles(subParticles) / fractionMeasured

        if numMPOrig == 0:
            if numMPEstimate != 0:
                raise Exception  # >0 particles in subsample, whereas none in entire sample. This cannot be!
            return 0
        return self._get_error_from_values(numMPOrig, numMPEstimate)

    def _get_error_from_values(self, exact: float, estimate: float) -> float:
        """
        :return: deviation of the estimate from the exact value, in percent of the exact value
        """
        assert(exact != 0)
        deviation = abs(exact - estimate)
        return deviation / exact * 100

    def _get_number_of_MP_particles(self, particleList: list) -> int:
        """
        Counts the particles whose lower-cased assignment contains at least one of the mpPatterns.
        """
        def matches_any_pattern(assignment) -> bool:
            return any(assignment.lower().find(pattern) != -1 for pattern in self.mpPatterns)

        return sum(1 for particle in particleList if matches_any_pattern(particle.getParticleAssignment()))
......@@ -14,7 +14,7 @@ class SubsamplingMethod(object):
def __init__(self, particleConatainer, desiredFraction: float = 0.2):
super(SubsamplingMethod, self).__init__()
self.particleContainer = particleConatainer
self.fraction = desiredFraction
self.fraction: float = desiredFraction
@property
def label(self) -> str:
......
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
import gepard
from gepard.analysis.particleContainer import ParticleContainer
def get_default_ParticleContainer() -> ParticleContainer:
    """
    Creates a ParticleContainer for testing, initialized with four particles.
    The assigned contours are squares with an edge length of 10, placed side by side along the x-axis
    (starting at x = 0, 10, 20 and 30), each stored as int32 array of four [[x, y]] points.
    :return: the prepared particleContainer
    """
    numParticles = 4
    squares: list = [np.array([[[left, 0]], [[left+10, 0]], [[left+10, 10]], [[left, 10]]], dtype=np.int32)
                     for left in range(0, 10*numParticles, 10)]

    container: ParticleContainer = ParticleContainer(None)
    container.initializeParticles(numParticles)
    container.setParticleContours(squares)
    return container
import unittest
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from datasetOperations import ParticleVariations
from helpers_for_test import get_default_ParticleContainer
class TestParticleVariations(unittest.TestCase):
    """
    Tests for the ParticleVariations class of datasetOperations.
    """
    def test_get_particleContainer_variations(self):
        """
        The generator has to yield numVariations containers: first the original container itself,
        then copies of it. Every yielded particle contour has to be unique within one run.
        """
        particleContainer: ParticleContainer = get_default_ParticleContainer()

        for numVariations in [0, 1, 10, 20]:
            particleVariations: ParticleVariations = ParticleVariations(particleContainer, numVariations)
            if numVariations == 0:
                self.assertEqual(len(list(particleVariations.get_particleContainer_variations())), 0)
                continue

            foundContours: set = set()
            index: int = -1  # stays -1 if nothing is yielded, making the final count check fail cleanly
            for index, partContainer in enumerate(particleVariations.get_particleContainer_variations()):
                if index == 0:
                    self.assertIs(partContainer, particleContainer)
                else:
                    self.assertIsNot(partContainer, particleContainer)

                for particle in partContainer.particles:
                    # tobytes() replaces the deprecated ndarray.tostring() (removed in numpy 2.0)
                    contourHash = hash(particle.contour.tobytes())
                    self.assertNotIn(contourHash, foundContours)
                    foundContours.add(contourHash)

            self.assertEqual(index, numVariations-1)

    def test_get_angles(self):
        """
        The angles have to be evenly distributed over 360 degrees, starting at 0.
        """
        particleVariations: ParticleVariations = ParticleVariations(None, 2)
        angles: list = list(particleVariations._get_angles())
        self.assertEqual(angles, [0, 180])

        particleVariations.numVariations = 4
        angles = list(particleVariations._get_angles())
        self.assertEqual(angles, [0, 90, 180, 270])
......@@ -8,14 +8,17 @@ Created on Wed Jan 22 13:58:25 2020
import unittest
import random
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
import gepard
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis.particleAndMeasurement import Particle, Measurement
from evaluation import TotalResults, SampleResult, SubsamplingResult
import methods as meth
import geometricMethods as gmeth
from helpers_for_test import get_default_ParticleContainer
class TestTotalResults(unittest.TestCase):
......@@ -107,30 +110,30 @@ class TestTotalResults(unittest.TestCase):
firstMethod: meth.RandomSampling = meth.RandomSampling(None, 0.1)
firstResult: SubsamplingResult = SubsamplingResult(firstMethod)
firstResult.mpCountError = 0.8
firstResult.mpCountErrors = [0.8]
secondMethod: gmeth.CrossBoxSubSampling = gmeth.CrossBoxSubSampling(None, 0.1)
secondMethod.numBoxesAcross = 3
secondResult: SubsamplingResult = SubsamplingResult(secondMethod)
secondResult.mpCountError = 0.6
secondResult.mpCountErrors = [0.6]
thirdMethod: gmeth.CrossBoxSubSampling = gmeth.CrossBoxSubSampling(None, 0.1)
thirdMethod.numBoxesAcross = 5
self.assertEqual(thirdMethod.fraction, 0.1)
thirdResult: SubsamplingResult = SubsamplingResult(thirdMethod)
thirdResult.mpCountError = 0.4
thirdResult.mpCountErrors = [0.4]
thirdMethod2: gmeth.CrossBoxSubSampling = gmeth.CrossBoxSubSampling(None, 0.1)
thirdMethod2.numBoxesAcross = 5
self.assertEqual(thirdMethod2.fraction, 0.1)
thirdResult2: SubsamplingResult = SubsamplingResult(thirdMethod)
thirdResult2.mpCountError = 0.8
thirdResult2.mpCountErrors = [0.8]
thirdMethod3: gmeth.CrossBoxSubSampling = gmeth.CrossBoxSubSampling(None, 0.2)
thirdMethod3.numBoxesAcross = 5
self.assertEqual(thirdMethod3.fraction, 0.2)
thirdResult3: SubsamplingResult = SubsamplingResult(thirdMethod3)
thirdResult3.mpCountError = 0.5
thirdResult3.mpCountErrors = [0.5]
firstSample.results = [firstResult, secondResult, thirdResult, thirdResult3]
secondSample.results = [firstResult, secondResult, thirdResult2, thirdResult3]
......@@ -177,9 +180,12 @@ class TestTotalResults(unittest.TestCase):
class TestSampleResult(unittest.TestCase):
def setUp(self) -> None:
particleContainer = get_default_ParticleContainer()
self.sampleResult: SampleResult = SampleResult('fakePath/fakeFile.pkl')
self.sampleResult.dataset = gepard.dataset.DataSet('fakePath/fakeFile.pkl')
self.sampleResult.results.append(SubsamplingResult(meth.RandomSampling(None, 0.1)))
self.sampleResult.dataset.particleContainer = particleContainer
self.sampleResult.results.append(SubsamplingResult(meth.RandomSampling(particleContainer, 0.1)))
newMethod = gmeth.SpiralBoxSubsampling(None, 0.1)
newMethod.numBoxes = 10
......@@ -213,29 +219,40 @@ class TestSampleResult(unittest.TestCase):
self.assertEqual(method.fraction, 0.3)
self.assertEqual(method.numBoxes, 10)
def test_result_is_already_present(self):
newMethod: meth.SubsamplingMethod = meth.RandomSampling(None, 0.1)
self.assertTrue(self.sampleResult._result_is_already_present(newMethod))
newMethod: meth.SubsamplingMethod = meth.RandomSampling(None, 0.2)
self.assertFalse(self.sampleResult._result_is_already_present(newMethod))
def test_get_result_of_method(self):
particleContainer = get_default_ParticleContainer()
newMethod: meth.SubsamplingMethod = meth.RandomSampling(particleContainer, 0.1)
result: SubsamplingResult = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is not None)
self.assertTrue(result.method.equals(newMethod))
newMethod = meth.RandomSampling(particleContainer, 0.2)
result = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is None)
newMethod = gmeth.SpiralBoxSubsampling(particleContainer, 0.1)
result = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is not None)
self.assertTrue(result.method.equals(newMethod))
newMethod: meth.SubsamplingMethod = gmeth.SpiralBoxSubsampling(None, 0.1)
self.assertTrue(self.sampleResult._result_is_already_present(newMethod))
newMethod: meth.SubsamplingMethod = gmeth.SpiralBoxSubsampling(None, 0.2)
self.assertFalse(self.sampleResult._result_is_already_present(newMethod))
newMethod = gmeth.SpiralBoxSubsampling(particleContainer, 0.2)
result = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is None)
newMethod: meth.SubsamplingMethod = gmeth.CrossBoxSubSampling(None, 0.3)
self.assertFalse(self.sampleResult._result_is_already_present(newMethod))
newMethod = gmeth.CrossBoxSubSampling(particleContainer, 0.3)
result = self.sampleResult._get_result_of_method(newMethod)
self.assertTrue(result is None)
def test_remove_result_of_method(self):
particleContainer = get_default_ParticleContainer()
numOrigResults = len(self.sampleResult.results)
self.sampleResult._remove_result_of_method(meth.RandomSampling(None, 0.1))
self.sampleResult._remove_result_of_method(meth.RandomSampling(particleContainer, 0.1))
self.assertEqual(len(self.sampleResult.results), numOrigResults-1)
self.sampleResult._remove_result_of_method(gmeth.SpiralBoxSubsampling(None, 0.1))
self.sampleResult._remove_result_of_method(gmeth.SpiralBoxSubsampling(particleContainer, 0.1))
self.assertEqual(len(self.sampleResult.results), numOrigResults-2)
self.sampleResult._remove_result_of_method(gmeth.SpiralBoxSubsampling(None, 0.2)) # this is one is not present...
self.sampleResult._remove_result_of_method(gmeth.SpiralBoxSubsampling(particleContainer, 0.2)) # this is one is not present...
self.assertEqual(len(self.sampleResult.results), numOrigResults-2)
def test_attributes(self):
......@@ -257,11 +274,68 @@ class TestSampleResult(unittest.TestCase):
self.assertFalse(self.sampleResult.has_any_attribute(['water', 'sediment']))
self.assertFalse(self.sampleResult.has_any_attribute(['beach']))
def test_update_result_with_methods(self):
particleContainer = get_default_ParticleContainer()
for numVariations in [1, 5, 20]:
self.sampleResult.numVariations = numVariations
for result in self.sampleResult.results:
result.reset_results()
methods: list = []
method1 = gmeth.SpiralBoxSubsampling(particleContainer, 0.1)
method1.numBoxes = 10
methods.append(method1)
method2 = gmeth.SpiralBoxSubsampling(particleContainer, 0.1)
method2.numBoxes = 15
methods.append(method2)
updatedMethods: list = self.sampleResult.update_result_with_methods(methods)
self.assertEqual(len(updatedMethods), 0) # because the methods were there already
for result in self.sampleResult.results:
self.assertEqual(len(result.mpCountErrors), 0) # because the added results haven't set any
updatedMethods = self.sampleResult.update_result_with_methods(methods, force=True)
self.assertEqual(len(updatedMethods), 2) # because now we force the update
self.assertTrue(method1 in updatedMethods)
self.assertTrue(method2 in updatedMethods)
firstParticleContainer: ParticleContainer = None
for result in self.sampleResult.results:
if result.method.equals(method1) or result.method.equals(method2):
if firstParticleContainer is None:
firstParticleContainer = result.method.particleContainer
else:
self.assertTrue(result.method.particleContainer is not firstParticleContainer)
self.assertEqual(len(result.mpCountErrors), numVariations)
else:
self.assertEqual(len(result.mpCountErrors), 0) # these were not updated
class TestSubsamplingResult(unittest.TestCase):
def setUp(self):
self.subsamplingResult: SubsamplingResult = SubsamplingResult(meth.RandomSampling(None, 0.1))
def test_add_result(self):
self.assertEqual(len(self.subsamplingResult.mpCountErrors), 0)
origParticles: list = self._get_MP_particles(100)
subParticles: list = self._get_MP_particles(15) # at fraction of 0.1, 10 particles would be expected
self.subsamplingResult.add_result(origParticles, subParticles)
self.assertEqual(len(self.subsamplingResult.mpCountErrors), 1)
self.assertEqual(self.subsamplingResult.mpCountErrors[0], 50)
subParticles = self._get_MP_particles(10) # at fraction of 0.1, 10 particles would be expected