...
@@ -14,3 +14,9 @@ cythonModules/build/
*.pyd
*.html
*.pkl
chemometrics/Assignments.txt
chemometrics/Data.txt
import numpy as np
import cv2
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from scipy import spatial
from itertools import combinations
from random import sample
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from methods import SubsamplingMethod
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
try:
standardizedData = StandardScaler().fit_transform(data.copy())
except ValueError:
print('StandardScaler failed on the input data:')
print('datashape', data.shape)
print('unique:', np.unique(data))
raise
pca = PCA(n_components=numComp)
princComp: np.ndarray = pca.fit_transform(np.transpose(standardizedData))
return princComp
def do_DBSCAN_clustering(data: np.ndarray, eps: float = 0.1, min_samples: int = 10) -> tuple:
"""
Does DBSCAN clustering and finds noisy data
:param data: The input array
:param eps: DBSCAN neighbourhood radius (on the standardized data)
:param min_samples: minimum number of neighbours for a point to count as a core point
:return: Cluster labels for each point in the dataset given to fit(). Noisy samples are given the label -1.
"""
assert data.shape[1] == 2
standardizedData = StandardScaler().fit_transform(data)
db = DBSCAN(eps=eps, min_samples=min_samples).fit(standardizedData)
return db.labels_, db.core_sample_indices_
def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarray) -> list:
"""
Returns a list with indices of n points that are closest to the indicated refPoint
:param points: np.ndarray, cols: x, y, rows: individual points
:param n: number of points to return
:param refPoint: np.array([x, y]) of reference point
:return: list of point indices
"""
distancesToPoints: np.ndarray = np.linalg.norm(points - refPoint, axis=1)
sortedIndices = np.argsort(distancesToPoints)
return list(sortedIndices[:n])
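# A minimal usage sketch for the helper above (hypothetical demo values,
# not part of the subsampling pipeline):
def _example_closest_points() -> None:
    points: np.ndarray = np.array([[0.0, 0.0], [1.0, 1.0], [5.0, 5.0], [0.5, 0.2]])
    refPoint: np.ndarray = np.array([0.0, 0.0])
    # distances to the origin are 0.0, 1.41, 7.07 and 0.54, so indices 0 and 3 win
    assert get_n_points_closest_to_point(points, n=2, refPoint=refPoint) == [0, 3]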
class ChemometricSubsampling(SubsamplingMethod):
def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
super(ChemometricSubsampling, self).__init__(particleContainer, desiredFraction)
@property
def label(self) -> str:
return 'Chemometric Selection'
def apply_subsampling_method(self) -> list:
vectors: np.ndarray = self._get_particle_featurematrix()
try:
princComps: np.ndarray = get_pca(vectors)
except ValueError:
print('numParticles:', len(self.particleContainer.particles))
print('input featurematrix shape', vectors.shape)
raise
clusterLabels, coreIndices = do_DBSCAN_clustering(princComps)
indices: list = self._get_indices_from_clusterLabels(princComps, clusterLabels, coreIndices)
selectedParticles: list = []
for particle in self.particleContainer.particles:
if particle.index in indices:
selectedParticles.append(particle)
return selectedParticles
def _get_particle_featurematrix(self) -> np.ndarray:
"""
:return: np.ndarray, numRows: Features, numCols: Particles
"""
vectors: list = []
for particle in self.particleContainer.particles:
extractor: FeatureExtractor = FeatureExtractor(particle)
vectors.append(extractor.get_characteristic_vector())
vectors: np.ndarray = np.transpose(np.array(vectors))
assert vectors.shape == (11, len(self.particleContainer.particles)), f'wrong featureMat-shape: {vectors.shape}'
return vectors
def equals(self, otherMethod) -> bool:
equals: bool = False
if type(otherMethod) == ChemometricSubsampling and otherMethod.fraction == self.fraction:
equals = True
return equals
def _get_indices_from_clusterLabels(self, points: np.ndarray, labels: np.ndarray, centerIndices: np.ndarray) -> list:
indices: list = []
allIndices: np.ndarray = np.arange(len(labels))
numPointsPerCluster: dict = self._get_numPoints_per_cluster(labels)
for clusterIndex in set(labels):
indToAppend: list = []
nPoints: int = int(numPointsPerCluster[clusterIndex])
indicesInCluster: np.ndarray = allIndices[labels == clusterIndex]
if clusterIndex == -1:
for ind in sample(list(indicesInCluster), nPoints):
# assert ind not in indices
indices.append(ind)
else:
clusterPoints: np.ndarray = points[indicesInCluster]
centerPoint: np.ndarray = np.mean(clusterPoints, axis=0)
indicesToSelect: list = get_n_points_closest_to_point(clusterPoints, nPoints, centerPoint)
for ind in indicesToSelect:
origInd = indicesInCluster[ind]
indices.append(origInd)
assert len(set(indices)) == len(indices), f'The calculated indices contain duplicates, ' \
f'num duplicates: {len(indices) - len(set(indices))}'
return indices
def _get_numPoints_per_cluster(self, labels: np.ndarray, noiseAmpFactor: float = 5) -> dict:
"""
MP Particles are expected to be the minority of all particles. So, if datapoints were classified as noise
(i.e., label = -1), it is likely that MP is in there. The abundancy of points taken from the noise is multiplied
by the noiseAmpFactor
:param labels:
:param noiseAmpFactor:
:return: A dictionary with keys = cluster index (i.e., label) and value = number of points to take from that
"""
pointsPerCluster: dict = {}
if type(labels) != np.ndarray:
labels = np.array(labels)
individualLabels: set = set(labels)
numPointsToSelect = round(len(labels) * self.fraction)
if numPointsToSelect == 0:
numPointsToSelect = 1
numNoisePoints = len(labels[labels == -1])
numClusteredPoints = len(labels) - numNoisePoints
# # get max noiseAmpFactor
if noiseAmpFactor > 1/self.fraction:
noiseAmpFactor = 1/self.fraction
numAmpPoints = numClusteredPoints + numNoisePoints*noiseAmpFactor
fractionPerCluster = np.clip(numPointsToSelect / numAmpPoints, 0.0, 1.0)
tooFewPoints = numPointsToSelect < len(individualLabels)
totalPointsAdded = 0
for ind in individualLabels:
if ind > -1:
if not tooFewPoints:
pointsToAdd = round(fractionPerCluster * len(labels[labels == ind]))
else:
pointsToAdd = 1 if totalPointsAdded < numPointsToSelect else 0
pointsPerCluster[ind] = pointsToAdd
totalPointsAdded += pointsToAdd
# fill up the rest with noisePoints
if numNoisePoints > 0:
diff: float = np.clip(numPointsToSelect - totalPointsAdded, 0, numNoisePoints)
pointsPerCluster[-1] = diff
totalPointsAdded += diff
# just in case too many points were selected (due to rounding errors), keep on deleting until it matches
while totalPointsAdded > numPointsToSelect:
indexWithHighestCount = None
maxCount = 0
for index in pointsPerCluster.keys():
if pointsPerCluster[index] > maxCount:
maxCount = pointsPerCluster[index]
indexWithHighestCount = index
pointsPerCluster[indexWithHighestCount] -= 1
totalPointsAdded -= 1
if abs(totalPointsAdded - numPointsToSelect) > 1:
print(f'warning: {totalPointsAdded} points selected, but {numPointsToSelect} were requested')
# assert abs(totalPointsAdded - numPointsToSelect) <= 1
for clusterIndex in pointsPerCluster.keys():
assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
return pointsPerCluster
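# Worked example for the noise amplification above (illustrative numbers):
# with fraction = 0.1 and 1000 labelled points, 100 of them noise (-1) and two
# clusters of 450 points each, numPointsToSelect = 100. noiseAmpFactor = 5 stays
# below the cap 1/fraction = 10, so numAmpPoints = 900 + 100*5 = 1400 and
# fractionPerCluster = 100/1400 ≈ 0.071. Each cluster contributes
# round(0.071 * 450) = 32 points, and the remaining 100 - 64 = 36 points are
# filled up from the noise cluster: the noise is thus sampled at 36/100 = 36%,
# roughly five times the per-cluster rate of 32/450 ≈ 7%.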
class FeatureExtractor(object):
def __init__(self, particle: pm.Particle):
super(FeatureExtractor, self).__init__()
self.particle: pm.Particle = particle
def get_characteristic_vector(self) -> np.ndarray:
log_hu: np.ndarray = self._get_log_hu_moments()
color: np.ndarray = self._get_color_hash(self.particle.color, desiredLength=4)
vector: np.ndarray = np.hstack((log_hu, color))
assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
return vector
def _get_log_hu_moments(self) -> np.ndarray:
moments: dict = cv2.moments(self.particle.contour)
resultMoments: np.ndarray = np.zeros((7, 1))
for index, mom in enumerate(cv2.HuMoments(moments)):
if mom != 0:
resultMoments[index] = -1 * np.copysign(1.0, mom) * np.log10(abs(mom))
else:
resultMoments[index] = 0
return resultMoments[:, 0]
def _get_color_hash(self, color: str, desiredLength: int = 4) -> np.ndarray:
colorArray: list = [int(i) for i in str(abs(hash(color)))[:desiredLength]]
return np.transpose(np.array(colorArray))
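# Note: Python's built-in hash() is salted per interpreter session for strings
# (PYTHONHASHSEED), so _get_color_hash is not reproducible across runs. A stable
# digest-based alternative could look like this (hypothetical sketch, not wired
# into FeatureExtractor):
def get_stable_color_hash(color: str, desiredLength: int = 4) -> np.ndarray:
    import hashlib  # local import to keep the sketch self-contained
    digest: str = hashlib.md5(color.encode('utf-8')).hexdigest()
    # map the first hex digits to the range 0..9, like the digits of abs(hash())
    return np.array([int(char, 16) % 10 for char in digest[:desiredLength]])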
import numpy as np
import cv2
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
# from scipy import spatial
# from itertools import combinations
from random import sample
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
import os
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from methods import SubsamplingMethod
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
try:
standardizedData = StandardScaler().fit_transform(data.copy())
except ValueError:
print('StandardScaler failed on the input data:')
print('datashape', data.shape)
print('unique:', np.unique(data))
raise
pca: PCA = PCA(n_components=numComp)
princComp: np.ndarray = pca.fit_transform(np.transpose(standardizedData))
# print(f'pca explained variance with {numComp} princ comps is {sum(pca.explained_variance_)}')
return princComp
def do_DBSCAN_clustering(data: np.ndarray, eps: float = 0.1, min_samples: int = 10) -> tuple:
"""
Does DBSCAN clustering and finds noisy data
:param data: The input array
:param eps: DBSCAN neighbourhood radius (on the standardized data)
:param min_samples: minimum number of neighbours for a point to count as a core point
:return: Cluster labels for each point in the dataset given to fit(). Noisy samples are given the label -1.
"""
assert data.shape[1] == 2
standardizedData = StandardScaler().fit_transform(data)
db = DBSCAN(eps=eps, min_samples=min_samples).fit(standardizedData)
return db.labels_, db.core_sample_indices_
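# A minimal sketch of the clustering helper (hypothetical data): two dense blobs
# plus one far-away point; the outlier should be flagged as noise (-1).
def _example_dbscan() -> None:
    rng = np.random.default_rng(0)
    blob1: np.ndarray = rng.normal((0.0, 0.0), 0.05, size=(50, 2))
    blob2: np.ndarray = rng.normal((5.0, 5.0), 0.05, size=(50, 2))
    outlier: np.ndarray = np.array([[50.0, -50.0]])
    labels, coreIndices = do_DBSCAN_clustering(np.vstack((blob1, blob2, outlier)))
    assert labels[-1] == -1  # the isolated point ends up as noise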
def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarray) -> list:
"""
Returns a list with indices of n points that are closest to the indicated refPoint
:param points: np.ndarray, cols: x, y, rows: individual points
:param n: number of points to return
:param refPoint: np.array([x, y]) of reference point
:return: list of point indices
"""
distancesToPoints: np.ndarray = np.linalg.norm(points - refPoint, axis=1)
sortedIndices = np.argsort(distancesToPoints)
return list(sortedIndices[:n])
def get_particle_featurematrix(particleContainer: ParticleContainer) -> np.ndarray:
"""
:return: np.ndarray, numRows: Particles, numCols: Features
"""
vectors: list = []
for particle in particleContainer.particles:
vectors.append(get_characteristic_vector(particle))
vectors: np.ndarray = np.array(vectors)
assert vectors.shape[0] == len(particleContainer.particles)
return vectors
def get_characteristic_vector(particle: pm.Particle) -> np.ndarray:
vector: list = []
# vector += list(get_log_hu_moments(particle.contour))
vector.append(float(get_color_index(particle.color)))
vector.append(get_solidity(particle.contour))
vector.append(get_aspect_ratio(particle.contour))
vector.append(get_extent(particle.contour))
vector.append(cv2.contourArea(particle.contour))
# vector.append(get_shape_index(particle.shape))
# vector.append(cv2.arcLength(particle.contour, True))
# vector: np.ndarray = np.hstack((log_hu, color))
# if len(vector) != 11:
# print('error')
# assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
return np.array(vector)
def get_solidity(contour: np.ndarray) -> float:
area: float = cv2.contourArea(contour)
hull: np.ndarray = cv2.convexHull(contour)
hull_area: float = cv2.contourArea(hull)
if area == 0 or hull_area == 0:
raise ValueError('contour area or hull area is zero; cannot compute solidity')
solidity: float = area / hull_area
return solidity
def get_aspect_ratio(contour: np.ndarray) -> float:
if contour.shape[0] >= 5:  # at least 5 points are required for ellipse fitting
ellipse = cv2.fitEllipse(contour)
short, long = ellipse[1]
else:
rect = cv2.minAreaRect(contour)
long, short = rect[1]
if short > long:
long, short = short, long
if short == 0.0:
raise ValueError('short axis is zero; cannot compute aspect ratio')  # note: the original InvalidParticleError was never defined or imported
return long/short
def get_extent(contour: np.ndarray) -> float:
area: float = float(cv2.contourArea(contour))
x, y, w, h = cv2.boundingRect(contour)
rect_area: float = w * h
extent: float = area / rect_area
return extent
def get_log_hu_moments(contour: np.ndarray) -> np.ndarray:
moments: dict = cv2.moments(contour)
resultMoments: np.ndarray = np.zeros((7, 1))
for index, mom in enumerate(cv2.HuMoments(moments)):
if mom != 0:
resultMoments[index] = -1 * np.copysign(1.0, mom) * np.log10(abs(mom))
else:
resultMoments[index] = 0
return resultMoments[:, 0]
def get_color_hash(color: str, desiredLength: int = 4) -> np.ndarray:
colorArray: list = [int(i) for i in str(abs(hash(color)))[:desiredLength]]
return np.transpose(np.array(colorArray))
def get_color_index(color: str) -> int:
colors: list = ['transparent', 'green', 'brown', 'non-determinable', 'undedetermined', 'grey',
'red', 'yellow', 'white', 'blue']
assert color in colors, f'color not found: {color}'
return colors.index(color)
# def get_shape_index(shape: str) -> int:
# shapes: list = ['spherule', 'fibre', 'flake', 'irregular']
# assert shape in shapes
# return shapes.index(shape)
class TrainedSubsampling(SubsamplingMethod):
def __init__(self, particleContainer: ParticleContainer, desiredFraction: float,
path: str = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl'):
super(TrainedSubsampling, self).__init__(particleContainer, desiredFraction)
self.score: float = None
self.clf = None
self.clfPath: str = path
@property
def label(self) -> str:
return 'Trained Random Sampling'
def _load_classifier(self) -> None:
assert os.path.exists(self.clfPath)
fname: str = self.clfPath
with open(fname, "rb") as fp:
self.clf = pickle.load(fp)
name: str = fname.split('.pkl')[0]
name: str = name.split('score')[1]
self.score = float(name)
def _get_measure_indices(self, predictedAssignments: list) -> set:
indicesToMeasure: set = set([])
assignments: np.ndarray = np.array(predictedAssignments)
mpIndices: list = list(np.where(assignments == 1)[0])
nonMpIndices: list = list(np.where(assignments == 0)[0])
numEstimMPParticles: int = len(mpIndices)
numParticlesToMeasure = round(len(predictedAssignments) * self.fraction)
if numParticlesToMeasure <= numEstimMPParticles:
indicesToMeasure = set(sample(mpIndices, numParticlesToMeasure))
else:
remainingIndices: int = int(numParticlesToMeasure - numEstimMPParticles)
indicesToMeasure = set(mpIndices + sample(nonMpIndices, remainingIndices))
assert len(indicesToMeasure) == numParticlesToMeasure
return indicesToMeasure
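# Illustration of the quota logic above (hypothetical numbers): with
# fraction = 0.5 and predicted assignments [1, 1, 0, 0, 0, 0], three particles
# have to be measured; both predicted-MP indices {0, 1} are always kept and one
# of the four non-MP indices is drawn at random to fill the quota.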
# class ChemometricSubsampling(SubsamplingMethod):
# # def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
# # super(ChemometricSubsampling, self).__init__(particleContainer, desiredFraction)
#
# @property
# def label(self) -> str:
# return 'Chemometric Selection'
#
# def apply_subsampling_method(self) -> list:
# vectors: np.ndarray = get_particle_featurematrix(self.particleContainer)
# try:
# princComps: np.ndarray = get_pca(vectors)
# except ValueError:
# print('numParticles:', len(self.particleContainer.particles))
# print('input featurematrix shape', vectors.shape)
# clusterLabels, coreIndices = do_DBSCAN_clustering(princComps)
# indices: list = self._get_indices_from_clusterLabels(princComps, clusterLabels, coreIndices)
#
# selectedParticles: list = []
# for particle in self.particleContainer.particles:
# if particle.index in indices:
# selectedParticles.append(particle)
#
# return selectedParticles
#
# def _get_particle_featurematrix(self) -> np.ndarray:
# """
# :return: np.ndarray, numRows: Particles, numCols: Features
# """
# vectors: list = []
# for particle in self.particleContainer.particles:
# # extractor: FeatureExtractor = FeatureExtractor(particle)
# vectors.append(extractor.get_characteristic_vector())
# vectors: np.array(vectors)
# # assert vectors.shape == (11, len(self.particleContainer.particles)), f'wrong featureMat-shape: {vectors.shape}'
# return vectors
#
# def equals(self, otherMethod) -> bool:
# equals: bool = False
# if type(otherMethod) == ChemometricSubsampling and otherMethod.fraction == self.fraction:
# equals = True
# return equals
#
# def _get_indices_from_clusterLabels(self, points: np.ndarray, labels: np.ndarray, centerIndices: np.ndarray) -> list:
# indices: list = []
# allIndices: np.ndarray = np.arange(len(labels))
# numPointsPerCluster: dict = self._get_numPoints_per_cluster(labels)
#
# for clusterIndex in set(labels):
# indToAppend: list = []
# nPoints: int = int(numPointsPerCluster[clusterIndex])
# indicesInCluster: np.ndarray = allIndices[labels == clusterIndex]
# if clusterIndex == -1:
# for ind in sample(list(indicesInCluster), nPoints):
# # assert ind not in indices
# indices.append(ind)
# else:
# clusterPoints: np.ndarray = points[indicesInCluster]
# centerPoint: np.ndarray = np.mean(clusterPoints, axis=0)
# indicesToSelect: list = get_n_points_closest_to_point(clusterPoints, nPoints, centerPoint)
# for ind in indicesToSelect:
# origInd = indicesInCluster[ind]
# indices.append(origInd)
#
# assert len(set(indices)) == len(indices), f'The calculated indices contain duplicates, ' \
# f'num duplicates: {len(indices) - len(set(indices))}'
# return indices
#
# def _get_numPoints_per_cluster(self, labels: np.ndarray, noiseAmpFactor: float = 5) -> dict:
# """
#         MP particles are expected to be a minority of all particles. So, if datapoints were classified as noise
#         (i.e., label = -1), it is likely that MP is among them. The number of points taken from the noise cluster
#         is amplified by the noiseAmpFactor.
#         :param labels:
#         :param noiseAmpFactor:
#         :return: A dictionary with keys = cluster index (i.e., label) and value = number of points to take from that cluster
# """
# pointsPerCluster: dict = {}
# if type(labels) != np.ndarray:
# labels = np.array(labels)
# individualLabels: set = set(labels)
# numPointsToSelect = round(len(labels) * self.fraction)
# if numPointsToSelect == 0:
# numPointsToSelect = 1
#
# numNoisePoints = len(labels[labels == -1])
# numClusteredPoints = len(labels) - numNoisePoints
#
# # # get max noiseAmpFactor
# if noiseAmpFactor > 1/self.fraction:
# noiseAmpFactor = 1/self.fraction
#
# numAmpPoints = numClusteredPoints + numNoisePoints*noiseAmpFactor
# fractionPerCluster = np.clip(numPointsToSelect / numAmpPoints, 0.0, 1.0)
#
# tooFewPoints = numPointsToSelect < len(individualLabels)
#
# totalPointsAdded = 0
# for ind in individualLabels:
# if ind > -1:
#
# if not tooFewPoints:
# pointsToAdd = round(fractionPerCluster * len(labels[labels == ind]))
# else:
# pointsToAdd = 1 if totalPointsAdded < numPointsToSelect else 0
#
# pointsPerCluster[ind] = pointsToAdd
# totalPointsAdded += pointsToAdd
#
# # fill up the rest with noisePoints
# if numNoisePoints > 0:
# diff: float = np.clip(numPointsToSelect - totalPointsAdded, 0, numNoisePoints)
# pointsPerCluster[-1] = diff
# totalPointsAdded += diff
#
# # just in case too many points were selected (due to rounding errors), keep on deleting until it matches
# while totalPointsAdded > numPointsToSelect:
# indexWithHighestCount = None
# maxCount = 0
#             for index in pointsPerCluster.keys():
# if pointsPerCluster[index] > maxCount:
# maxCount = pointsPerCluster[index]
# indexWithHighestCount = index
#
# pointsPerCluster[indexWithHighestCount] -= 1
# totalPointsAdded -= 1
#
# if not abs(totalPointsAdded - numPointsToSelect) <= 1:
# print('error')
# assert abs(totalPointsAdded - numPointsToSelect) <= 1
# for clusterIndex in pointsPerCluster.keys():
# assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
# return pointsPerCluster
\ No newline at end of file
import matplotlib.pyplot as plt
import numpy as np
from random import sample
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
import pickle
import time
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from gepard.analysis.particleContainer import ParticleContainer
from input_output import get_pkls_from_directory
from chemometricMethods import get_log_hu_moments, get_color_index, get_pca, get_characteristic_vector
from evaluation import is_MP_particle
def test_classification_models(dataset: tuple) -> None:
names = ["RandomForestClassifier", "NeuralNetClassifier"]
classifiers = [
RandomForestClassifier(n_estimators=1000),
MLPClassifier(alpha=1, max_iter=1000)]
t0 = time.time()
# preprocess dataset, split into training and test part
X, y = dataset
X = StandardScaler().fit_transform(X)
X_train, X_test, y_train, y_test = \
train_test_split(X, y, test_size=.3, random_state=42)
print(f'preprocessing finished after {round(time.time()-t0, 2)} seconds')
# iterate over classifiers
for name, clf in zip(names, classifiers):
t0 = time.time()
clf.fit(X_train, y_train)
print(f'fitting {name} took {round(time.time()-t0, 2)} seconds')
t0 = time.time()
score = clf.score(X_test, y_test)
with open(f'{name}, score {round(score, 2)}.pkl', "wb") as fp:
pickle.dump(clf, fp, protocol=-1)
y_predicted = clf.predict(X_test)
print(f'finished getting score and prediction after {round(time.time() - t0, 2)} seconds')
errors: dict = {int(k): 0 for k in np.unique(y_test)}
for j in range(len(y_predicted)):
if y_test[j] != y_predicted[j]:
errors[y_test[j]] += 1
print(f'{name} with test size {len(y_test)} has score {round(score, 2)}, errors: {errors}')
if __name__ == '__main__':
recreateNew: bool = True
if recreateNew:
pklsInFolders: dict = get_pkls_from_directory(r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets')
X: list = []
y: list = []
counter = 0
for folder in pklsInFolders.keys():
for pklPath in pklsInFolders[folder]:
if counter < 100:
dset: dataset.DataSet = dataset.loadData(pklPath)
print('loaded', dset.name)
partContainer: ParticleContainer = dset.particleContainer
for particle in partContainer.particles:
features: np.ndarray = get_characteristic_vector(particle)
# features: list = [abs(i) for i in get_log_hu_moments(particle.contour)]
# features.append(get_color_index(particle.color))
X.append(features)
y.append(int(is_MP_particle(particle)))
counter += 1
X: np.ndarray = np.array(X)
y: np.ndarray = np.array(y)
MPindices: np.ndarray = np.where(y == 1)[0]
nonMPindices: np.ndarray = np.where(y == 0)[0]
nonMPindices: list = sample(list(nonMPindices), len(MPindices))
X_MP: list = list(X[MPindices])
y_MP: list = list(y[MPindices])
X_nonMP: list = list(X[nonMPindices])
y_nonMP: list = list(y[nonMPindices])
assert set(y_MP) == {1}
assert set(y_nonMP) == {0}
assert len(X_MP) == len(X_nonMP) == len(y_MP) == len(y_nonMP)
X_equalized: np.ndarray = np.array(X_MP + X_nonMP)
y_equalized: np.ndarray = np.array(y_MP + y_nonMP)
dset: tuple = (X_equalized, y_equalized)
with open('particleClassificaion.pkl', "wb") as fp:
pickle.dump(dset, fp, protocol=-1)
else:
with open('particleClassificaion.pkl', "rb") as fp:
dset: tuple = pickle.load(fp)
X, y = dset
# np.savetxt('Data.txt', X)
# np.savetxt('Assignments.txt', y)
# princComps = get_pca(X.transpose(), numComp=2)
#
# plt.scatter(princComps[:, 0], princComps[:, 1])
# print(X_equalized.shape)
# X: np.ndarray = SelectKBest(chi2, k=5).fit_transform(X, y)
# print(X_equalized.shape)
test_classification_models((X, y))
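# Note on the equalization above: undersampling the non-MP majority down to the
# MP count yields a balanced 50/50 training set, so a classifier score such as
# the 0.72 in the pickled filename is measured against a 0.5 chance level.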
import numpy as np
cimport numpy as np
cimport numpy.random
cimport cython
DTYPE = np.float64  # np.float is deprecated; use the explicit 64-bit type
ctypedef np.int32_t INT32_t
cdef get_random_topleft(double maxDist, double maxAngle, double radius, double boxSize):
cdef double angle, dist, x, y
cdef np.ndarray[INT32_t, ndim=1] newTopLeft
dist = np.random.rand() * maxDist
angle = np.random.rand() * maxAngle
newTopLeft = np.empty(2, dtype=np.int32)
x = dist*np.cos(angle) + radius - boxSize/2
y = dist*np.sin(angle) + radius - boxSize/2
newTopLeft[0] = np.int32(np.round(x))
newTopLeft[1] = np.int32(np.round(y))
return newTopLeft
def get_random_topLefts(int numBoxes, double boxSize, double radius, double maxAngle, int seed=1337, int maxTries=50):
cdef np.ndarray[INT32_t, ndim=2] topLefts
cdef np.ndarray[INT32_t, ndim=1] newTopLeft
cdef double maxDist
cdef int outerCounter, counter, x, y, i, j, diffX, diffY, successfullyAdded
cdef bint validSolutionFound, boxOverlaps
np.random.seed(seed)
maxDist = radius - np.sqrt((boxSize/2)**2 + (boxSize/2)**2)
outerCounter = 0
validSolutionFound = False
while not validSolutionFound and outerCounter < maxTries:
successfullyAdded = 0
topLefts = np.empty((numBoxes, 2), dtype=np.int32)
for i in range(numBoxes):
if i == 0:
topLefts[0, :] = get_random_topleft(maxDist, maxAngle, radius, boxSize)
successfullyAdded += 1
else:
counter = 0
while counter < 50:
newTopLeft = get_random_topleft(maxDist, maxAngle, radius, boxSize)
boxOverlaps = False
for j in range(i):
diffX = abs(newTopLeft[0] - topLefts[j, 0])
diffY = abs(newTopLeft[1] - topLefts[j, 1])
if diffX < boxSize and diffY < boxSize:
boxOverlaps = True
break
if boxOverlaps:
counter += 1
else:
topLefts[i, :] = newTopLeft
successfullyAdded += 1
break
if successfullyAdded == numBoxes:
validSolutionFound = True
else:
outerCounter += 1
return validSolutionFound, topLefts
\ No newline at end of file
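# Usage sketch for the compiled extension above (assumes it was built via the
# setup.py below and is importable as cythonModules.randoms):
#
#     import numpy as np
#     from cythonModules import randoms
#
#     valid, topLefts = randoms.get_random_topLefts(10, 50.0, 250.0, 2*np.pi,
#                                                   seed=1337, maxTries=50)
#     if valid:
#         print(topLefts.shape)  # -> (10, 2) int32 top-left corners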
@@ -9,10 +9,18 @@ if len(sys.argv) == 1:
sys.argv.append("build_ext")
sys.argv.append("--inplace")
ext = Extension("rotateContour", ["rotateContour.pyx"], extra_compile_args=['-O3'],)
# ext = Extension("rotateContour", ["rotateContour.pyx"], extra_compile_args=['-O3'],)
# setup(
# name="rotate contour around reference point",
# ext_modules=cythonize([ext], annotate=True), # accepts a glob pattern
# include_dirs=[np.get_include()]
# )
# ext = Extension("getRandomTopLefts", ["getRandomTopLefts.pyx"], extra_compile_args=['-O3'],)
setup(
name="rotate contour around reference point",
ext_modules=cythonize([ext], annotate=True), # accepts a glob pattern
name="get a given number of random topLefts",
ext_modules=cythonize("randoms.pyx", annotate=True), # accepts a glob pattern
include_dirs=[np.get_include()]
)
\ No newline at end of file
)
@@ -2,23 +2,28 @@ import copy
import numpy as np
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from gepard.analysis.particleContainer import ParticleContainer
from cythonModules import rotateContour
from helpers import get_filterDimensions_from_dataset, get_center_from_filter_dimensions, convert_length_to_pixels
class ParticleVariations(object):
def __init__(self, particleContainer: ParticleContainer, numVariations: int = 10) -> None:
def __init__(self, dataset: dataset.DataSet, numVariations: int = 10) -> None:
super(ParticleVariations, self).__init__()
self.origParticleContainer = particleContainer
self.dataset: dataset.DataSet = dataset
self.origParticleContainer: ParticleContainer = self.dataset.particleContainer
self.numVariations = numVariations
def get_particleContainer_variations(self) -> ParticleContainer:
if self.numVariations > 0:
offset, diameter, [width, height] = get_filterDimensions_from_dataset(self.dataset)
diameter: float = convert_length_to_pixels(self.dataset, diameter)
offset: tuple = convert_length_to_pixels(self.dataset, offset[0]), \
convert_length_to_pixels(self.dataset, offset[1])
center: np.ndarray = get_center_from_filter_dimensions(offset, diameter)
partContainer: ParticleContainer = self.origParticleContainer
contours: list = partContainer.getParticleContours()
center: tuple = round(np.mean(contours[:][0][0])),\
round(np.mean(contours[:][0][1]))
center: np.ndarray = np.array(center, dtype=np.int32)
angles = self._get_angles()
for i in range(self.numVariations):
if i > 0:
...
@@ -9,14 +9,16 @@ import pickle
import os
import numpy as np
import matplotlib.pyplot as plt
import concurrent.futures
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from gepard.analysis.particleAndMeasurement import Particle
from helpers import ParticleBinSorter
import methods as meth
import geometricMethods as gmeth
import chemometricMethods as cmeth
from chemometrics import chemometricMethods as cmeth
from datasetOperations import ParticleVariations
@@ -24,13 +26,51 @@ def get_name_from_directory(dirPath: str) -> str:
return str(os.path.basename(dirPath).split('.')[0])
class TotalResults(object):
# methods: list = [meth.RandomSampling, meth.SizeBinFractioning, gmeth.CrossBoxSubSampling,
# gmeth.SpiralBoxSubsampling, cmeth.ChemometricSubsampling]
# measuredFractions: list = [0.01, 0.05, 0.1, 0.15, 0.2, 0.5, 0.75, 0.9]
# measuredFractions: list = [0.1, 0.15, 0.2, 0.5, 0.75, 0.9]
measuredFractions: list = [0.1, 0.3, 0.5, 0.9]
def get_methods_to_test(dataset: dataset.DataSet, fractions: list = [], maxTries: int = 100) -> list:
"""
:param fraction: The desired fraction to measure
:return: list of measurement Objects that are applicable
"""
if len(fractions) == 0:
fractions: list = [0.02, 0.05, 0.1, 0.25, 0.5, 0.7, 0.9]
# fractions: list = [0.02, 0.1, 0.5, 0.9]
methods: list = []
particleContainer = dataset.particleContainer
for fraction in fractions:
methods.append(meth.RandomSampling(particleContainer, fraction))
methods.append(meth.SizeBinFractioning(particleContainer, fraction))
boxCreator: gmeth.BoxSelectionCreator = gmeth.BoxSelectionCreator(dataset)
methods += boxCreator.get_crossBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_spiralBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_randomBoxSubsamplers_for_fraction(fraction, maxTries=maxTries)
methods += boxCreator.get_randomQuarterBoxSubsamplers_for_fraction(fraction, maxTries=maxTries)
# methods.append(cmeth.ChemometricSubsampling(particleContainer, fraction))
return methods
def update_sample(sample, force: bool, index: int):
sample.load_dataset()
methods: list = get_methods_to_test(sample.dataset)
sample.update_result_with_methods(methods, force)
return sample, index
def is_MP_particle(particle: Particle) -> bool:
# TODO: UPDATE PATTERNS -> ARE THESE REASONABLE???
isMP: bool = False
mpPatterns = ['poly', 'rubber', 'pb', 'pr', 'pg', 'py', 'pv']
assignment = particle.getParticleAssignment()
for pattern in mpPatterns:
if assignment.lower().find(pattern) != -1:
isMP = True
break
return isMP
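# Illustration (hypothetical assignments): 'Polyethylene' and 'PB' match the
# 'poly' and 'pb' patterns and count as MP, whereas e.g. 'Cellulose' matches
# none of the patterns and returns False.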
class TotalResults(object):
def __init__(self):
super(TotalResults, self).__init__()
self.sampleResults: list = []
@@ -57,22 +97,28 @@ class TotalResults(object):
:param force: Whether to force an update of an already existing method.
:return:
"""
for index, sample in enumerate(self.sampleResults):
sample.load_dataset()
possibleMethods: list = []
for fraction in self.measuredFractions:
for method in self._get_methods_for_fraction(sample.dataset, fraction):
possibleMethods.append(method)
sample.update_result_with_methods(possibleMethods, force=force)
print(f'processed {index+1} of {len(self.sampleResults)} samples')
forceList: list = [force]*len(self.sampleResults)
indices: list = list(np.arange(len(self.sampleResults)))
numSamples: int = len(forceList)
numWorkers: int = 4  # in case of a quadcore processor that seems reasonable
chunksize: int = int(round(numSamples / numWorkers * 0.7))  # we want slightly more chunks than workers
print(f'multiprocessing with {numSamples} samples and chunksize of {chunksize}')
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(update_sample, self.sampleResults, forceList, indices, chunksize=chunksize)
for index, res in enumerate(results):
updatedSample, processid = res
print(f'returned from process {processid}, iteration index {index}')
self.sampleResults[index] = updatedSample
def get_error_vs_fraction_data(self, attributes: list = [], methods: list = []) -> dict:
"""
:param attributes: A list of attributes that should be used for filtering the samples. Only samples with an
attribute from within that list are considered.
:return: Dict: Key: Method Label,
Value: {Dict: Key: Measured Fraction, Value: Tuple (averaged MPCountError, StDev MPCountError) over all samples}
"""
result: dict = {}
for sample in self.sampleResults:
@@ -85,49 +131,40 @@ class TotalResults(object):
label: str = method.label
frac: float = method.fraction
error: float = res.mpCountError
stdev: float = res.mpCountErrorStDev
if label not in result.keys():
result[label] = {frac: [error]}
result[label] = {frac: [(error, stdev)]}
elif frac not in result[label].keys():
result[label][frac] = [error]
result[label][frac] = [(error, stdev)]
else:
result[label][frac].append(error)
result[label][frac].append((error, stdev))
for method in result.keys():
methodRes: dict = result[method]
for fraction in methodRes.keys():
methodRes[fraction] = np.mean(methodRes[fraction])
meanError = np.mean([i[0] for i in methodRes[fraction]])
meanStd = np.mean([i[1] for i in methodRes[fraction]])
methodRes[fraction] = (meanError, meanStd)
return result
def _get_methods_for_fraction(self, dataset: dataset.DataSet, fraction: float) -> list:
"""
:param fraction: The desired fraction to measure
:return: list of measurement Objects that are applicable
"""
particleContainer = dataset.particleContainer
methods: list = [meth.RandomSampling(particleContainer, fraction),
meth.SizeBinFractioning(particleContainer, fraction)]
boxCreator: gmeth.BoxSelectionCreator = gmeth.BoxSelectionCreator(dataset)
methods += boxCreator.get_crossBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_spiralBoxSubsamplers_for_fraction(fraction)
methods.append(cmeth.ChemometricSubsampling(particleContainer, fraction))
return methods
class SubsamplingResult(object):
"""
Stores all interesting results from a subsampling experiment
"""
# TODO: UPDATE PATTERNS -> ARE THESE REASONABLE???
mpPatterns = ['poly', 'rubber', 'pb', 'pr', 'pg', 'py', 'pv']
# # # TODO: UPDATE PATTERNS -> ARE THESE REASONABLE???
# mpPatterns = ['poly', 'rubber', 'pb', 'pr', 'pg', 'py', 'pv']
def __init__(self, subsamplingMethod: meth.SubsamplingMethod):
super(SubsamplingResult, self).__init__()
self.method: meth.SubsamplingMethod = subsamplingMethod
self.mpCountErrors: list = []
# self.origParticleCount: int = None
# self.subSampledParticleCount: int = None
self.origParticleCount: int = 0
self.subSampledParticleCount: int = 0
self.origMPCount: int = 0
self.estimMPCounts: list = []
# self.mpCountErrorPerBin: tuple = None
@property
@@ -137,12 +174,24 @@ class SubsamplingResult(object):
error = float(np.mean(self.mpCountErrors))
return error
@property
def mpCountErrorStDev(self) -> float:
stdev: float = 0.0
if len(self.mpCountErrors) > 0:
stdev = float(np.std(self.mpCountErrors))
return stdev
@property
def estimMPCount(self) -> float:
return float(np.mean(self.estimMPCounts))
def reset_results(self) -> None:
"""
Deletes all results
:return:
"""
self.mpCountErrors = []
self.estimMPCounts = []
def add_result(self, origParticles: list, subParticles: list) -> None:
"""
@@ -151,7 +200,9 @@
:param subParticles:
:return:
"""
self.mpCountErrors.append(self._get_mp_count_error(origParticles, subParticles, self.method.fraction))
error: float = self._get_mp_count_error(origParticles, subParticles, self.method.fraction)
self.origParticleCount = len(origParticles)
self.mpCountErrors.append(error)
def _get_mp_count_error_per_bin(self, allParticles: list, subParticles: list, fractionMeasured: float) -> tuple:
binSorter = ParticleBinSorter()
@@ -164,7 +215,9 @@
def _get_mp_count_error(self, allParticles: list, subParticles: list, fractionMeasured: float) -> float:
numMPOrig = self._get_number_of_MP_particles(allParticles)
self.origMPCount = numMPOrig
numMPEstimate = self._get_number_of_MP_particles(subParticles) / fractionMeasured
self.estimMPCounts.append(numMPEstimate)
if numMPOrig != 0:
mpCountError = self._get_error_from_values(numMPOrig, numMPEstimate)
@@ -182,19 +235,16 @@
def _get_number_of_MP_particles(self, particleList: list) -> int:
numMPParticles = 0
for particle in particleList:
assignment = particle.getParticleAssignment()
for pattern in self.mpPatterns:
if assignment.lower().find(pattern) != -1:
numMPParticles += 1
break
if is_MP_particle(particle):
numMPParticles += 1
return numMPParticles
class SampleResult(object):
"""
An object the actually stores all generated results per sample and can update and report on them.
An object that stores all generated results per sample and can update and report on them.
"""
def __init__(self, filepath: str, numVariations: int = 10):
def __init__(self, filepath: str, numVariations: int = 5):
super(SampleResult, self).__init__()
self.filepath: str = filepath
self.dataset: dataset.DataSet = None
@@ -213,7 +263,7 @@
def update_result_with_methods(self, methods: list, force: bool = False) -> list:
"""
Updates result with the given method (contains desiredFraction already)
:param method: The SubsamplingMethod Object
:param methods: List of the SubsamplingMethod Objects to use
:param force: Whether to force an update. If False, the result is not updated if it is already present.
:return: list of updated methods
"""
@@ -221,8 +271,7 @@
self.load_dataset()
updatedMethods: list = []
particleVariations: ParticleVariations = ParticleVariations(self.dataset.particleContainer,
numVariations=self.numVariations)
particleVariations: ParticleVariations = ParticleVariations(self.dataset, numVariations=self.numVariations)
needsToBeUpdated: dict = {method: False for method in methods}
@@ -247,8 +296,8 @@
result.add_result(method.particleContainer.particles, subParticles)
if method not in updatedMethods:
updatedMethods.append(method)
print(f'updated {self.sampleName} with {method.label} at fraction {method.fraction}, '
f'iteration {index+1}')
# print(f'updated {self.sampleName} with {method.label} at fraction {method.fraction}, '
# f'iteration {index+1}')
return updatedMethods
@@ -295,9 +344,3 @@
requestedResult = result
break
return requestedResult
# def _get_result_of_method(self, method: meth.SubsamplingMethod) -> SubsamplingResult:
# return None
import numpy as np
from itertools import combinations
from methods import SubsamplingMethod
from copy import deepcopy
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
import helpers
from cythonModules import randoms
def box_overlaps_other_box(topLeft1: list, topLeft2: list, boxSize: float) -> bool:
"""
Returns true if the two specified boxes overlap
:param topLeft1: (x, y) of the first box's top-left corner
:param topLeft2: (x, y) of the second box's top-left corner
:param boxSize: edge length of the (square) boxes
:return:
"""
return abs(topLeft1[0] - topLeft2[0]) < boxSize and abs(topLeft1[1] - topLeft2[1]) < boxSize
class BoxSelectionSubsamplingMethod(SubsamplingMethod):
possibleBoxNumbers: list = [7, 10, 15]
def __init__(self, *args):
super(BoxSelectionSubsamplingMethod, self).__init__(*args)
self.filterDiameter: float = 500
self.offset: tuple = (0, 0)
self.numBoxes: int = 1
self.maxFractions: dict = {}
@property
def label(self) -> str:
@@ -22,6 +38,16 @@ class BoxSelectionSubsamplingMethod(SubsamplingMethod):
def filterArea(self) -> float:
return np.pi * (self.filterDiameter / 2) ** 2
@property
def boxSize(self) -> float:
totalBoxArea: float = self.filterArea * self.fraction
boxArea: float = totalBoxArea / self.numBoxes
return boxArea ** 0.5
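# Worked example for the boxSize property (illustrative numbers): with the
# default filterDiameter of 500, filterArea = pi * 250**2 ≈ 196350. For
# fraction = 0.1 and numBoxes = 10, each box covers 196350 * 0.1 / 10 ≈ 1963.5
# area units, i.e. boxSize ≈ 44.3 per side.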
@property
def noBoxOverlap(self) -> bool:
return not self._boxes_are_overlapping(self.get_topLeft_of_boxes())
def apply_subsampling_method(self) -> list:
def distanceToCnt(topleft: tuple):
return abs(topleft[0] - cntStart[0]) + abs(topleft[1] - cntStart[1])
@@ -37,7 +63,6 @@ class BoxSelectionSubsamplingMethod(SubsamplingMethod):
for topLeftXY in sortedTopLefts:
if helpers.box_overlaps_contour(topLeftXY, boxWidthHeight, particle.contour):
subParticles.append(particle)
break
return subParticles
@@ -55,8 +80,48 @@ class BoxSelectionSubsamplingMethod(SubsamplingMethod):
newTopLefts.append((topLeft[0] + self.offset[0], topLeft[1] + self.offset[1]))
return newTopLefts
def equals(self, otherMethod) -> bool:
raise NotImplementedError
def _boxes_are_overlapping(self, topLefts: list) -> bool:
"""
Calculates if there is any overlap of the boxes
:return:
"""
overlaps: bool = False
boxSize = self.boxSize
for topLeft1, topLeft2 in combinations(topLefts, 2):
if box_overlaps_other_box(topLeft1, topLeft2, boxSize):
overlaps = True
break
return overlaps
def _get_max_distance_of_box_to_center(self, boxCenter: tuple, center: tuple = (0, 0)) -> float:
"""
Calculates the maximal distance of all points in a box to the given center
:param boxCenter:
:param center:
:return:
"""
center = np.array(center)
boxSize = self.boxSize
coords: np.ndarray = np.array([[boxCenter[0] - 0.5 * boxSize, boxCenter[1] - 0.5 * boxSize],
[boxCenter[0] + 0.5 * boxSize, boxCenter[1] - 0.5 * boxSize],
[boxCenter[0] - 0.5 * boxSize, boxCenter[1] + 0.5 * boxSize],
[boxCenter[0] + 0.5 * boxSize, boxCenter[1] + 0.5 * boxSize]])
distances: np.ndarray = np.linalg.norm(coords - center, axis=1)
return np.max(distances)
def get_maximum_achievable_fraction(self) -> float:
if len(self.maxFractions) == 0:
self.update_max_fractions()
if self.numBoxes not in self.maxFractions.keys():
self.maxFractions[self.numBoxes] = determine_max_achievable_frac(self, self.numBoxes)
return self.maxFractions[self.numBoxes]
def update_max_fractions(self) -> None:
for boxNum in self.possibleBoxNumbers:
self.maxFractions[boxNum] = determine_max_achievable_frac(self, boxNum)
class BoxSelectionCreator(object):
@@ -71,10 +136,7 @@ class BoxSelectionCreator(object):
:return list of CrossBoxSubsamplers:
"""
crossBoxSubsamplers = []
offset, diameter, widthHeight = helpers.get_filterDimensions_from_dataset(self.dataset)
diameter = helpers.convert_length_to_pixels(self.dataset, diameter)
offset = helpers.convert_length_to_pixels(self.dataset, offset[0]), \
helpers.convert_length_to_pixels(self.dataset, offset[1])
diameter, offset = self._get_diameter_and_offset()
for numBoxesAcross in [3, 5]:
newBoxSelector: CrossBoxSubSampling = CrossBoxSubSampling(self.dataset.particleContainer, desiredFraction)
@@ -82,8 +144,7 @@
newBoxSelector.offset = offset
newBoxSelector.numBoxesAcross = numBoxesAcross
maxFraction: float = newBoxSelector.get_maximum_achievable_fraction()
if desiredFraction <= maxFraction:
if newBoxSelector.config_is_valid():
crossBoxSubsamplers.append(newBoxSelector)
return crossBoxSubsamplers
@@ -95,10 +156,7 @@
:return list of SpiralBoxSelectors:
"""
spiralBoxSubsamplers = []
offset, diameter, widthHeight = helpers.get_filterDimensions_from_dataset(self.dataset)
diameter = helpers.convert_length_to_pixels(self.dataset, diameter)
offset = helpers.convert_length_to_pixels(self.dataset, offset[0]), \
helpers.convert_length_to_pixels(self.dataset, offset[1])
diameter, offset = self._get_diameter_and_offset()
for numBoxes in SpiralBoxSubsampling.possibleBoxNumbers:
newBoxSelector: SpiralBoxSubsampling = SpiralBoxSubsampling(self.dataset.particleContainer, desiredFraction)
@@ -106,11 +164,54 @@
newBoxSelector.offset = offset
newBoxSelector.numBoxes = numBoxes
if newBoxSelector.noBoxOverlap:
if newBoxSelector.config_is_valid():
spiralBoxSubsamplers.append(newBoxSelector)
return spiralBoxSubsamplers
def get_randomBoxSubsamplers_for_fraction(self, desiredFraction: float, maxTries: int = 100) -> list:
randomBoxSamplers: list = []
diameter, offset = self._get_diameter_and_offset()
randomBoxSampler: RandomBoxSampling = RandomBoxSampling(None, desiredFraction)
randomBoxSampler.maxTries = maxTries
randomBoxSampler.update_max_fractions()
for numBoxes in randomBoxSampler.possibleBoxNumbers:
randomBoxSampler.numBoxes = numBoxes
if randomBoxSampler.config_is_valid():
newSampler: RandomBoxSampling = deepcopy(randomBoxSampler)
newSampler.particleContainer = self.dataset.particleContainer
newSampler.filterDiameter = diameter
newSampler.offset = offset
randomBoxSamplers.append(newSampler)
return randomBoxSamplers
def get_randomQuarterBoxSubsamplers_for_fraction(self, desiredFraction: float, maxTries: int = 100) -> list:
randomBoxSamplers: list = []
diameter, offset = self._get_diameter_and_offset()
randomBoxSampler: RandomQuarterBoxes = RandomQuarterBoxes(None, desiredFraction)
randomBoxSampler.maxTries = maxTries
randomBoxSampler.update_max_fractions()
for numBoxes in randomBoxSampler.possibleBoxNumbers:
randomBoxSampler.numBoxes = numBoxes
if randomBoxSampler.config_is_valid():
newSampler: RandomBoxSampling = deepcopy(randomBoxSampler)
newSampler.particleContainer = self.dataset.particleContainer
newSampler.filterDiameter = diameter
newSampler.offset = offset
randomBoxSamplers.append(newSampler)
return randomBoxSamplers
def _get_diameter_and_offset(self) -> tuple:
offset, diameter, widthHeight = helpers.get_filterDimensions_from_dataset(self.dataset)
diameter: float = helpers.convert_length_to_pixels(self.dataset, diameter)
offset: tuple = helpers.convert_length_to_pixels(self.dataset, offset[0]), \
helpers.convert_length_to_pixels(self.dataset, offset[1])
return diameter, offset
class CrossBoxSubSampling(BoxSelectionSubsamplingMethod):
def __init__(self, particleContainer, desiredFraction: float = 0.1) -> None:
@@ -199,8 +300,6 @@ class CrossBoxSubSampling(BoxSelectionSubsamplingMethod):
class SpiralBoxSubsampling(BoxSelectionSubsamplingMethod):
possibleBoxNumbers: list = [7, 10, 15]
def __init__(self, particleContainer, desiredFraction: float = 0.1) -> None:
super(SpiralBoxSubsampling, self).__init__(particleContainer, desiredFraction)
self.numBoxes = 10
@@ -209,16 +308,6 @@ class SpiralBoxSubsampling(BoxSelectionSubsamplingMethod):
def label(self) -> str:
return f'Boxes SpiralLayout ({self.numBoxes} boxes)'
@property
def noBoxOverlap(self) -> bool:
return not self._boxes_are_overlapping(self.get_topLeft_of_boxes())
@property
def boxSize(self) -> float:
totalBoxArea: float = self.filterArea * self.fraction
boxArea: float = totalBoxArea / self.numBoxes
return boxArea ** 0.5
@property
def spiralSlope(self) -> float:
return self.armDistance / (2 * np.pi)
@@ -243,8 +332,8 @@
topLefts.append(newPoint)
theta += boxDistance / (slope * np.sqrt(1 + theta ** 2))
boxDistance *= 1.05
topLefts = self._move_and_scale_toplefts(topLefts)
if len(topLefts) > 1:
topLefts = self._move_and_scale_toplefts(topLefts)
return self._apply_offset_to_toplefts(topLefts)
def equals(self, otherMethod) -> bool:
@@ -268,7 +357,7 @@
lastBoxCenter: tuple = (xCoordsBoxMiddles[-1], yCoordsBoxMiddles[-1])
distanceLastCenter: float = np.linalg.norm(lastBoxCenter)
maxDistanceInLastBox: float = self._get_max_distance_of_boxCenter_to_center(lastBoxCenter)
maxDistanceInLastBox: float = self._get_max_distance_of_box_to_center(lastBoxCenter)
halfBoxDistance: float = maxDistanceInLastBox - distanceLastCenter
desiredDistanceTotal: float = self.filterDiameter / 2
desiredDistanceCenter: float = desiredDistanceTotal - halfBoxDistance
@@ -283,37 +372,123 @@
newTopLefts = zip(np.round(xCoords), np.round(yCoords))
return list(tuple(newTopLefts))
def _get_max_distance_of_boxCenter_to_center(self, boxCenter: tuple, center: tuple = (0, 0)) -> float:
"""
Calculates the maximal distance of a box to the given center
:param topLeft:
:param boxSize:
:return:
"""
center = np.array(center)
boxSize = self.boxSize
coords: np.ndarray = np.array([[boxCenter[0] - 0.5 * boxSize, boxCenter[1] - 0.5 * boxSize],
[boxCenter[0] + 0.5 * boxSize, boxCenter[1] - 0.5 * boxSize],
[boxCenter[0] - 0.5 * boxSize, boxCenter[1] + 0.5 * boxSize],
[boxCenter[0] + 0.5 * boxSize, boxCenter[1] + 0.5 * boxSize]])
distances: np.ndarray = np.linalg.norm(coords - center, axis=1)
return np.max(distances)
def _get_xy_at_angle(self, theta: float, centerXY: tuple = (0, 0)) -> tuple:
distance: float = self.spiralSlope * theta
return distance * np.cos(theta) + centerXY[0], distance * np.sin(theta) + centerXY[1]
def _boxes_are_overlapping(self, topLefts: list) -> bool:
"""
Calculates if there is any overlap of the boxes
:return:
"""
overlaps: bool = False
for topLeft1, topLeft2 in combinations(topLefts, 2):
if abs(topLeft1[0] - topLeft2[0]) < self.boxSize and abs(topLeft1[1] - topLeft2[1]) < self.boxSize:
overlaps = True
break
return overlaps
class RandomBoxSampling(BoxSelectionSubsamplingMethod):
def __init__(self, particleContainer, desiredFraction=0.1, maxAngle=2*np.pi):
super(RandomBoxSampling, self).__init__(particleContainer, desiredFraction)
self.numBoxes: int = 10
self.maxTries: int = 50
self.__maxAngle: float = maxAngle
@property
def label(self) -> str:
return f'Boxes random layout ({self.numBoxes} boxes)'
def equals(self, otherMethod) -> bool:
equals: bool = False
if type(otherMethod) == type(self) and otherMethod.fraction == self.fraction:
if otherMethod.numBoxes == self.numBoxes and otherMethod.__maxAngle == self.__maxAngle:
equals = True
return equals
def get_topLeft_of_boxes(self) -> list:
#
# valid, topLefts = randoms.get_random_topLefts(self.numBoxes, self.boxSize,
# self.filterDiameter/2, self.__maxAngle,
# seed=self.randomSeed, maxTries=self.maxTries)
#
# if not valid:
# raise AttributeError
#
# topLefts: list = [[topLefts[i, 0], topLefts[i, 1]] for i in range(topLefts.shape[0])]
#
def get_random_topleft() -> list:
angle = np.random.rand() * self.__maxAngle
dist = np.random.rand() * maxDist
x: float = dist * np.cos(angle) + radius - boxSize / 2
y: float = dist * np.sin(angle) + radius - boxSize / 2
return [x, y]
np.random.seed(self.randomSeed)
topLefts: list = []
boxSize: float = self.boxSize
radius: float = self.filterDiameter / 2
maxDist: float = radius - np.sqrt((boxSize / 2) ** 2 + (boxSize / 2) ** 2)
outerCounter: int = 0
validSolutionFound: bool = False
while not validSolutionFound and outerCounter < self.maxTries:
topLefts = []
for i in range(self.numBoxes):
if i == 0:
topLefts.append(get_random_topleft())
else:
counter: int = 0
while counter < 50:
newTopLeft: list = get_random_topleft()
for topLeft2 in topLefts:
if box_overlaps_other_box(newTopLeft, topLeft2, boxSize):
break
else: # i.e., if no break occurred
topLefts.append(newTopLeft)
break
counter += 1
if len(topLefts) == self.numBoxes:
validSolutionFound = True
else:
outerCounter += 1
if not validSolutionFound:
raise AttributeError
return topLefts
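# Geometry note for the sampler above: maxDist = radius - sqrt(2) * boxSize/2
# is the largest distance of a box center from the filter center at which the
# box still fits inside the circular filter; e.g. for radius 250 and
# boxSize 44.3, maxDist ≈ 250 - 31.3 = 218.7.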
class RandomQuarterBoxes(RandomBoxSampling):
def __init__(self, particleContainer, desiredFraction=0.1, maxAngle=0.5*np.pi):
super(RandomQuarterBoxes, self).__init__(particleContainer, desiredFraction, maxAngle)
@property