@@ -20,3 +20,7 @@ cythonModules/build/
chemometrics/Assignments.txt
chemometrics/Data.txt
chemometrics/Assignments_all.txt
chemometrics/Data_all.txt
@@ -5,7 +5,7 @@ from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
# from scipy import spatial
# from itertools import combinations
from random import sample
from random import sample, random
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
@@ -14,9 +14,10 @@ import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from gepard.analysis import particleCharacterization as pc
from gepard.helperfunctions import cv2imread_fix
from methods import SubsamplingMethod
from helpers import timingDecorator
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
try:
@@ -59,34 +60,57 @@ def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarr
return list(sortedIndices[:n])
def get_particle_featurematrix(particleContainer: ParticleContainer) -> np.ndarray:
def get_particle_featurematrix(particleContainer: ParticleContainer, fullimg: np.ndarray = None) -> np.ndarray:
"""
:return: np.ndarray, numRows: Particles, numCols: Features
"""
vectors: list = []
for particle in particleContainer.particles:
vectors.append(get_characteristic_vector(particle))
firstVecLength: int = 0
for index, particle in enumerate(particleContainer.particles):
particleImg: np.ndarray = None
if fullimg is not None:
particleImg = pc.getParticleImageFromFullimage(particle.contour, fullimg)
vector: np.ndarray = get_characteristic_vector(particle, particleImg)
if index == 0:
firstVecLength = vector.shape[0]
else:
assert vector.shape[0] == firstVecLength, 'particle feature vectors have non-uniform lengths...'
vectors.append(vector)
vectors: np.ndarray = np.array(vectors)
assert vectors.shape[0] == len(particleContainer.particles)
return vectors
def get_characteristic_vector(particle: pm.Particle) -> np.ndarray:
def get_characteristic_vector(particle: pm.Particle, particleImg: np.ndarray = None) -> np.ndarray:
vector: list = []
# vector += list(get_log_hu_moments(particle.contour))
vector += list(get_log_hu_moments(particle.contour))
vector.append(float(get_color_index(particle.color)))
vector.append(get_solidity(particle.contour))
vector.append(get_aspect_ratio(particle.contour))
vector.append(get_extent(particle.contour))
vector.append(cv2.contourArea(particle.contour))
# vector.append(get_shape_index(particle.shape))
# vector.append(cv2.arcLength(particle.contour, True))
vector.append(get_shape_index(particle.shape))
vector.append(cv2.arcLength(particle.contour, True))
for ftHarmonic in get_curvature_ft(particle.contour):
vector.append(ftHarmonic)
if particleImg is not None:
image_features: np.ndarray = get_image_feature_vec(particleImg)
for val in image_features:
vector.append(val)
# vector: np.ndarray = np.hstack((log_hu, color))
# if len(vector) != 11:
# print('error')
# assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
for entry in vector:
try:
float(entry)
except ValueError:
print('non-numeric value found')
raise
assert not np.isnan(entry)
return np.array(vector)
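# Usage sketch (illustrative, not part of the diff): for a gepard Particle with a valid
# OpenCV contour, the assembled vector holds 7 log-Hu moments, the color index, solidity,
# aspect ratio, extent, area, shape index, perimeter and 8 curvature-FT harmonics, plus
# 6 image color statistics when a particle image is passed (22 or 28 entries in total):
# vec: np.ndarray = get_characteristic_vector(particle, particleImg)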
@@ -95,9 +119,9 @@ def get_solidity(contour: np.ndarray) -> float:
hull: np.ndarray = cv2.convexHull(contour)
hull_area: float = cv2.contourArea(hull)
if area == 0 or hull_area == 0:
raise ValueError
solidity: float = area / hull_area
solidity: float = 0
else:
solidity: float = area / hull_area
return solidity
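# Note (derived from the code above): solidity = contour area / convex hull area lies in
# (0, 1], 1 meaning perfectly convex; degenerate contours now yield 0 instead of raising
# a ValueError as before.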
@@ -112,10 +136,11 @@ def get_aspect_ratio(contour: np.ndarray) -> float:
if short > long:
long, short = short, long
if short == 0.0:
raise InvalidParticleError
aspectRatio: float = 1.0
if short > 0.0:
aspectRatio = long/short
return long/short
return aspectRatio
def get_extent(contour: np.ndarray) -> float:
@@ -138,6 +163,23 @@ def get_log_hu_moments(contour: np.ndarray) -> np.ndarray:
return resultMoments[:, 0]
def get_image_feature_vec(particleImg: np.ndarray) -> np.ndarray:
meanStdev: np.ndarray = get_mean_and_stdev(particleImg)
return meanStdev.reshape(meanStdev.size)
def get_mean_and_stdev(img: np.ndarray) -> np.ndarray:
try:
meanStd: tuple = cv2.meanStdDev(img)
colorMean: np.ndarray = np.array([i[0] for i in meanStd[0]])
colorStd: np.ndarray = np.array([i[0] for i in meanStd[1]])
except cv2.error:  # i.e., one-pixel images...
colorMean: np.ndarray = np.array([128, 128, 128])
colorStd: np.ndarray = np.array([0, 0, 0])
# print('invalid here...:', img, img.shape, colorMean, colorStd, np.vstack((colorMean, colorStd)))
return np.vstack((colorMean, colorStd))
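# Illustrative example (assuming a 3-channel BGR image): cv2.meanStdDev returns per-channel
# means and standard deviations, so the stacked result has shape (2, 3) and
# get_image_feature_vec flattens it to a 6-element vector:
# img = np.zeros((10, 10, 3), dtype=np.uint8)
# get_mean_and_stdev(img)  # -> array([[0., 0., 0.], [0., 0., 0.]])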
def get_color_hash(color: str, desiredLength: int = 4) -> np.ndarray:
colorArray: list = [int(i) for i in str(abs(hash(color)))[:desiredLength]]
return np.transpose(np.array(colorArray))
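# Caveat (reviewer note, not from the diff): Python salts str hashes per interpreter
# session (PYTHONHASHSEED), so get_color_hash is not reproducible across runs unless the
# seed is fixed; get_color_index is the deterministic alternative used in
# get_characteristic_vector.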
@@ -150,23 +192,143 @@ def get_color_index(color: str) -> int:
return colors.index(color)
# def get_shape_index(shape: str) -> int:
# shapes: list = ['spherule', 'fibre', 'flake', 'irregular']
# assert shape in shapes
# return shapes.index(shape)
def get_shape_index(shape: str) -> int:
shapes: list = ['spherule', 'fibre', 'flake', 'irregular', None]
assert shape in shapes
return shapes.index(shape)
def get_curvature_ft(contour: np.ndarray, angularSegment: float = 20, numHarmonics: int = 8) -> np.ndarray:
"""
Calculates curvature of the contour and applies discrete fourier transform (dft) to it.
Adapted from Xu et al, "Comparison of shape features for the classification of wear particles",
doi:10.1016/S0952-1976(97)00017-1
:param angularSegment: In degree, section of 360 degree to consider for each angle
:param contour: Particle Contour
:param numHarmonics: Number of FT Harmonics to return
:return: the first numHarmonics odd harmonics of the curvature's discrete Fourier transform
"""
m: int = int(round(contour.shape[0] / (360 / angularSegment)))  # smoothing factor; is this reasonable? That's how it's described in the paper...
numPoints: int = contour.shape[0]
curvature: np.ndarray = np.zeros(numPoints)
for i in range(numPoints):
x, y = contour[i, 0, 0], contour[i, 0, 1]
index_before = i-m-1
index_after = i+m+1
if index_after > numPoints-1:
index_after -= numPoints
x_before, y_before = contour[index_before, 0, 0], contour[index_before, 0, 1]
x_after, y_after = contour[index_after, 0, 0], contour[index_after, 0, 1]
tan_before: float = 0
if x - x_before != 0:
tan_before = np.rad2deg(np.arctan((y-y_before) / (x-x_before)))
tan_after: float = 0
if x_after - x != 0:
tan_after: float = np.rad2deg(np.arctan((y_after-y) / (x_after-x)))
curvature[i] = tan_after - tan_before
dft: np.ndarray = np.fft.fft(curvature)
freqs: np.ndarray = np.fft.fftfreq(numPoints)
mask: np.ndarray = freqs > 0
dft_true: np.ndarray = 2 * np.abs(dft/numPoints)
dft_true = dft_true[mask]
numFTPoints = dft_true.shape[0]
# let's only take the odd ones (they seem to carry more information).
# Also, make sure that exactly numHarmonics values come out; pad with zeros if the contour was very short
indicesToTake: np.ndarray = np.arange(numHarmonics)*2 + 1
final_dfts = np.zeros(numHarmonics)
for i, indexToTake in enumerate(indicesToTake):
if indexToTake < numFTPoints:
final_dfts[i] = dft_true[indexToTake]
return final_dfts
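# Usage sketch (illustrative): for a closed contour as returned by cv2.findContours,
# this yields a fixed-length, approximately rotation-invariant shape descriptor:
# harmonics: np.ndarray = get_curvature_ft(particle.contour)  # length numHarmonics (8)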
class TrainedSubsampling(SubsamplingMethod):
def __init__(self, particleContainer: ParticleContainer, desiredFraction: float,
path: str = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl'):
path: str = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.79.pkl',
fakeScore: float = 0.8):
super(TrainedSubsampling, self).__init__(particleContainer, desiredFraction)
self.score: float = None
self.clf = None
self.clfPath: str = path
self.fraction = desiredFraction
self.fakeClassifier: bool = True
self.fakeScore: float = fakeScore
self.fractionForExtrapolation: float = 0.0
self.predictedMPIndices: list = []
self._predict_MP_Indices()
def equals(self, otherMethod) -> bool:
isEqual: bool = False
if type(otherMethod) == TrainedSubsampling and otherMethod.fraction == self.fraction:
if otherMethod.score == self.score and otherMethod.clf is self.clf:
isEqual = True
return isEqual
@property
def label(self) -> str:
return 'Trained Random Sampling'
label: str = 'Dummy Trained Random Sampling'
if self.fakeClassifier:
label += f' (score {self.fakeScore})'
else:
label += f' (score {self.score})'
return label
def _predict_MP_Indices(self) -> None:
from evaluation import is_MP_particle
if not self.fakeClassifier:
self._load_classifier()
fullimgpath: str = r'C:\Users\xbrjos\Desktop\temp MP\Fullimages'
dsetname: str = self.particleContainer.datasetParent.name
imgPath: str = os.path.join(fullimgpath, dsetname + '.tif')
fullimg = cv2imread_fix(imgPath)
features: np.ndarray = get_particle_featurematrix(self.particleContainer, fullimg)
predictions: np.ndarray = self.clf.predict(features)
else:
self.score = self.fakeScore
particles: list = self.particleContainer.particles
predictions: np.ndarray = np.zeros(len(particles))
for index, particle in enumerate(particles):
if is_MP_particle(particle):
if random() <= self.fakeScore:
predictions[index] = 1
else:
if random() > self.fakeScore:
predictions[index] = 1
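# The block above simulates a classifier of accuracy fakeScore: true MP particles are
# flagged with probability fakeScore, non-MP particles are falsely flagged with
# probability (1 - fakeScore).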
mpIndices = list(np.where(predictions == 1)[0])
nonMPIndices = list(np.where(predictions == 0)[0])
numNonMPIndices = len(nonMPIndices)
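# Linear ramp (derived from the line below): no non-MP particles are added at
# fraction <= 0.05, all of them at fraction >= 0.15, linear in between.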
fracNonMPToTake: float = float(np.clip(-0.5 + 1/0.1 * self.fraction, 0.0, 1.0))
numNonMPToTake: int = int(round(fracNonMPToTake * numNonMPIndices))
self.predictedMPIndices = mpIndices + sample(nonMPIndices, numNonMPToTake)
def get_maximum_achievable_fraction(self) -> float:
maxFrac: float = 0.10
return maxFrac
def apply_subsampling_method(self) -> list:
numParticlesToSelect = round(len(self.particleContainer.particles) * self.fraction)
if numParticlesToSelect > len(self.predictedMPIndices):
numParticlesToSelect = len(self.predictedMPIndices)
indicesToSelect = sample(self.predictedMPIndices, numParticlesToSelect)
selectedParticles: list = []
for index in indicesToSelect:
selectedParticles.append(self.particleContainer.getParticleOfIndex(index))
numOrigParticles = len(self.particleContainer.particles)
numEnhancedParticles = len(self.predictedMPIndices)
self.fractionForExtrapolation = self.fraction * (numOrigParticles/numEnhancedParticles)
return selectedParticles
def _load_classifier(self) -> None:
assert os.path.exists(self.clfPath)
@@ -183,18 +345,37 @@ class TrainedSubsampling(SubsamplingMethod):
mpIndices: list = list(np.where(assignments == 1)[0])
nonMpIndices: list = list(np.where(assignments == 0)[0])
numEstimMPParticles: int = len(mpIndices)
numPredictedMP: int = len(mpIndices)
numParticlesToMeasure = round(len(predictedAssignments) * self.fraction)
if numParticlesToMeasure <= numEstimMPParticles:
if numParticlesToMeasure <= numPredictedMP:
indicesToMeasure = set(sample(mpIndices, numParticlesToMeasure))
else:
remainingIndices: int = int(numParticlesToMeasure - numEstimMPParticles)
remainingIndices: int = int(numParticlesToMeasure - numPredictedMP)
indicesToMeasure = set(mpIndices + sample(nonMpIndices, remainingIndices))
assert len(indicesToMeasure) == numParticlesToMeasure
return indicesToMeasure
def get_theoretic_frac(self) -> float:
"""
The theoretical fraction that also takes the score of the trained model into account.
It is used for extrapolating the MP count from the subsampled particle list.
:return: the fraction to use for extrapolation
"""
# score: float = self.score
# diff: float = 1/self.fraction - 1 # i.e., from 50 % score to 100 % score
# factor: float = 1 + (1 - score)/0.5 * diff
# pow: float = (1-self.fakeScore)
# theoreticFactor: float = (1/factor**pow)
# print('actual fraction, theor. factor is', self.fraction, theoreticFactor)
# return theoreticFactor
return self.fractionForExtrapolation
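# Worked example (illustrative numbers): with 1000 particles in total, 250 candidates in
# predictedMPIndices and fraction = 0.1, apply_subsampling_method() sets
# fractionForExtrapolation = 0.1 * (1000 / 250) = 0.4, which then scales up the MP count
# found in the measured subset.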
# class ChemometricSubsampling(SubsamplingMethod):
# # def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
@@ -328,4 +509,28 @@ class TrainedSubsampling(SubsamplingMethod):
# assert abs(totalPointsAdded - numPointsToSelect) <= 1
# for clusterIndex in pointsPerCluster.keys():
# assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
# return pointsPerCluster
\ No newline at end of file
# return pointsPerCluster
if __name__ == '__main__':
import matplotlib.pyplot as plt
fractions: np.ndarray = np.linspace(0.01, 1, 100)
scores: np.ndarray = np.linspace(0.5, 1.0, 5)
plt.clf()
for score in scores:
# if score == 0.5:
# theorFractions = fractions
# a, b, n = 1, 1, 1.5
# data1 = a * fractions**n / (fractions**n + b)
# data1 -= data1.min()
# data1 /= data1.max()
# theorFactors = 0.5 + 0.5*data1
theorFractions = []
for frac in fractions:
diff: float = 1 / frac - 1 # i.e., from 50 % score to 100 % score
factor: float = 1 + (1 - score) / 0.5 * diff
theorFractions.append(1/factor**0.2)
plt.plot(fractions, theorFractions, label=str(score))
plt.legend()
plt.show()
\ No newline at end of file
@@ -12,9 +12,12 @@ from sklearn.feature_selection import chi2
import pickle
import time
import sys
import os
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis.particleCharacterization import getParticleImageFromFullimage
from gepard.helperfunctions import cv2imread_fix
from input_output import get_pkls_from_directory
from chemometricMethods import get_log_hu_moments, get_color_index, get_pca, get_characteristic_vector
@@ -61,38 +64,49 @@ if __name__ == '__main__':
recreateNew: bool = True
if recreateNew:
pklsInFolders: dict = get_pkls_from_directory(r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets')
fullimgpath: str = r'C:\Users\xbrjos\Desktop\temp MP\Fullimages'
X: list = []
y: list = []
counter = 0
for folder in pklsInFolders.keys():
for pklPath in pklsInFolders[folder]:
if counter < 100:
if counter < 50:
dset: dataset.DataSet = dataset.loadData(pklPath)
print('loaded', dset.name)
print('loaded', dset.name, counter)
imgPath: str = os.path.join(fullimgpath, dset.name + '.tif')
fullimg = cv2imread_fix(imgPath)
print('loaded fullimg', imgPath, counter)
partContainer: ParticleContainer = dset.particleContainer
for particle in partContainer.particles:
features: np.ndarray = get_characteristic_vector(particle)
# features: list = [abs(i) for i in get_log_hu_moments(particle.contour)]
# features.append(get_color_index(particle.color))
firstVecLength: int = 0
for index, particle in enumerate(partContainer.particles):
partImg: np.ndarray = getParticleImageFromFullimage(particle.contour, fullimg)
features: np.ndarray = get_characteristic_vector(particle, partImg)
if index == 0:
firstVecLength = features.shape[0]
else:
assert features.shape[0] == firstVecLength
X.append(features)
y.append(int(is_MP_particle(particle)))
counter += 1
X: np.ndarray = np.array(X)
y: np.ndarray = np.array(y)
X_all: np.ndarray = np.array(X)
y_all: np.ndarray = np.array(y)
MPindices: np.ndarray = np.where(y == 1)[0]
nonMPindices: np.ndarray = np.where(y == 0)[0]
X_all: np.ndarray = SelectKBest(chi2, k=15).fit_transform(abs(X_all), y_all)
MPindices: np.ndarray = np.where(y_all == 1)[0]
nonMPindices: np.ndarray = np.where(y_all == 0)[0]
nonMPindices: list = sample(list(nonMPindices), len(MPindices))
X_MP: list = list(X[MPindices])
y_MP: list = list(y[MPindices])
X_MP: list = list(X_all[MPindices])
y_MP: list = list(y_all[MPindices])
X_nonMP: list = list(X[nonMPindices])
y_nonMP: list = list(y[nonMPindices])
X_nonMP: list = list(X_all[nonMPindices])
y_nonMP: list = list(y_all[nonMPindices])
assert set(y_MP) == {1}
assert set(y_nonMP) == {0}
@@ -100,6 +114,7 @@ if __name__ == '__main__':
X_equalized: np.ndarray = np.array(X_MP + X_nonMP)
y_equalized: np.ndarray = np.array(y_MP + y_nonMP)
sum = X_MP + X_nonMP
dset: tuple = (X_equalized, y_equalized)
@@ -110,15 +125,37 @@ if __name__ == '__main__':
dset: tuple = pickle.load(fp)
X, y = dset
# np.savetxt('Data.txt', X)
# np.savetxt('Assignments.txt', y)
# with open(r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl', "rb") as fp:
# clf: RandomForestClassifier = pickle.load(fp)
# y_predicted = clf.predict(X)
np.savetxt('Data.txt', X)
np.savetxt('Assignments.txt', y)
np.savetxt('Data_all.txt', X_all)
np.savetxt('Assignments_all.txt', y_all)
# princComps = get_pca(X.transpose(), numComp=2)
#
# plt.scatter(princComps[:, 0], princComps[:, 1])
# print(X_equalized.shape)
# X: np.ndarray = SelectKBest(chi2, k=5).fit_transform(X, y)
# X: np.ndarray = SelectKBest(chi2, k=15).fit_transform(X, y)
# print(X_equalized.shape)
test_classification_models((X, y))
X = StandardScaler().fit_transform(X)
clf = RandomForestClassifier(n_estimators=1000)
clf.fit(X, y)
score = clf.score(X_all, y_all)
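# Reviewer note (assumption: an oversight in the script): X was standard-scaled before
# fitting, but X_all is scored unscaled here; transforming X_all with the same fitted
# scaler would make the reported score consistent.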
y_predicted = clf.predict(X_all)
errors: dict = {int(k): 0 for k in np.unique(y_all)}
for j in range(len(y_predicted)):
if y_all[j] != y_predicted[j]:
errors[y_all[j]] += 1
print('num MP Particles in set:', len(X_MP))
print(f'randForest with test size {len(y_all)} has score {round(score, 2)}, errors: {errors}')
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.helperfunctions import cv2imread_fix
from gepard.dataset import DataSet,loadData
from gepard.analysis.particleContainer import ParticleContainer
import cv2
import numpy as np
from scipy import spatial
import os
import matplotlib.pyplot as plt
from helpers import get_filterDimensions_from_dataset, get_center_from_filter_dimensions, convert_length_to_pixels
# from evaluation import is_MP_particle
import evaluation
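# Reading of the metric below (summary, not stated in the code): the filter area is
# rasterized into tiles, particles are counted per tile, and patchiness is the
# coefficient of variation (std / mean) of the per-tile counts within the circular
# filter mask; an even spread gives values near 0, strong clustering values above 1.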
def get_particle_patchiness(dataset: DataSet, numCells: int = 50, onlyMP=False) -> float:
offset, diameter, [width, height] = get_filterDimensions_from_dataset(dataset)
center: np.ndarray = get_center_from_filter_dimensions(offset, diameter)
width: float = convert_length_to_pixels(dataset, width)
height: float = convert_length_to_pixels(dataset, height)
pixelsPerTile: int = max(int(round(width/numCells)), int(round(height/numCells)))
centerX: int = int(round(convert_length_to_pixels(dataset, center[0] / pixelsPerTile)))
centerY: int = int(round(convert_length_to_pixels(dataset, center[1] / pixelsPerTile)))
radius: int = int(round(convert_length_to_pixels(dataset, diameter / pixelsPerTile * 0.5)))
numRows: int = int(np.ceil(height / pixelsPerTile)) + 1
numCols: int = int(np.ceil(width / pixelsPerTile)) + 1
partCount: int = 0
densityImage: np.ndarray = np.zeros((numRows, numCols))
for particle in dataset.particleContainer.particles:
if (onlyMP and evaluation.is_MP_particle(particle)) or not onlyMP:
particleCenter: tuple = np.mean(particle.contour[:, 0, 0]), np.mean(particle.contour[:, 0, 1])
row: int = int(round(particleCenter[1] / pixelsPerTile))
col: int = int(round(particleCenter[0] / pixelsPerTile))
densityImage[row, col] += 1
partCount += 1
mask: np.ndarray = np.zeros_like(densityImage)
cv2.circle(mask, (centerX, centerY), radius, 1, -1)  # cv2 expects the center as (x, y), i.e. (col, row)
relevantData: np.ndarray = densityImage[mask > 0]
mean: np.ndarray = np.round(np.mean(relevantData), 2)
std: np.ndarray = np.round(np.std(relevantData), 2)
ratio: float = round(std/mean, 2)
# plt.imshow(densityImage)
# plt.title(f'MP particle count: {partCount},\ndensity mean: {mean}, density std: {std},\npatchiness = {ratio}')
# plt.axis('off')
# plt.tight_layout()
# plt.show()
# print(f'sample: {dataset.name}, mean: {mean}, std: {std}, ratio = {ratio}')
return ratio
if __name__ == '__main__':
# imgpath: str = r'C:\Users\xbrjos\Desktop\temp MP\Fullimages'
# imgname: str = '181120_MCI_2_ds1+2_all_ kleiner500_10_1.tif'
# imgname: str = '190619_5_PTPH_sld_190321_ds1_50_1_neu.tif'
#191213_P190814_TPHZ_ds1_50_1
# img: np.ndarray = cv2imread_fix(os.path.join(imgpath, imgname))
# gray: np.ndarray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
# _, binimg = cv2.threshold(gray, 20, 1, cv2.THRESH_BINARY_INV)
# distmap: np.ndarray = cv2.distanceTransform(binimg, cv2.DIST_L1, 3)
# plt.imshow(distmap, cmap='gray')
paths: list = [r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets\water\181120_MCI_2_ds1+2_all_ kleiner500_10_1.pkl',
r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets\wastewater, water\191213_P190814_TPHZ_ds1_50_1.pkl',
r'C:\Users\xbrjos\Desktop\temp MP\NewDatasets\Air\191119_RW6_Solling_50_2_neu.pkl']
distances: list = []
allParticles = []
for path in paths:
dset = loadData(path)
get_particle_patchiness(dset, 50, onlyMP=True)
for particle in dset.particleContainer.particles:
if evaluation.is_MP_particle(particle):
allParticles.append(particle.getParticleAssignment())
print(set(allParticles))
\ No newline at end of file
import numpy as np
cimport numpy as np
cimport cython
from cython cimport boundscheck, wraparound
@boundscheck(False)
@wraparound(False)
cdef bint box_overlaps_contour(unsigned int[:] boxTopLeftXY, unsigned int boxSize, unsigned int[:, :, :] contourData):
cdef bint isOverlapping = False
cdef unsigned int xmin, xmax, width, boxXmin, boxXmax, ymin, ymax, height, boxYmin, boxYmax
xmin = np.min(contourData[:, 0, 0])  # x coordinates are at index 0 of the contour's last axis
xmax = np.max(contourData[:, 0, 0])
width = xmax - xmin
boxXmin = boxTopLeftXY[0]
boxXmax = boxTopLeftXY[0] + boxSize
if xmin > (boxXmin-width/2):
if xmax < (boxXmax+width/2):
ymin = np.min(contourData[:, 0, 1])
ymax = np.max(contourData[:, 0, 1])
height = ymax - ymin
boxYmin = boxTopLeftXY[1]
boxYmax = boxTopLeftXY[1] + boxSize
if ymin > (boxYmin-height/2):
if ymax < (boxYmax+height/2):
isOverlapping = True
return isOverlapping
@boundscheck(False)
@wraparound(False)
def def_get_indices_of_overlapping_particles(contours not None, unsigned int[:, :] topLefts, unsigned int boxSize):
cdef Py_ssize_t i, j
cdef unsigned int counter, numParticles, numTopLefts
numParticles = len(contours)
numTopLefts = topLefts.shape[0]
cdef unsigned int[:] overlappingIndices = np.zeros(numParticles, dtype=np.uint32)
cdef unsigned int[:, :, :] currentContour
counter = 0
for i in range(numParticles):
currentContour = contours[i]
for j in range(numTopLefts):
if box_overlaps_contour(topLefts[j, :], boxSize, currentContour):
overlappingIndices[counter] = i
counter += 1
break
return overlappingIndices[:counter]
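# Usage sketch (dtypes taken from the caller in geometricMethods.py): contours must be a
# list of uint32 arrays of shape (N, 1, 2) and topLefts a uint32 array of shape
# (numBoxes, 2); returned are the indices of all contours whose bounding box, padded by
# half its size, falls into one of the boxes:
# indices = def_get_indices_of_overlapping_particles(contours, topLefts, boxSize)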
@@ -3,33 +3,35 @@ cimport numpy as np
cimport numpy.random
cimport cython
from libc.math cimport sin, cos, round, abs
from libc.stdlib cimport rand, srand, RAND_MAX
DTYPE = np.float
ctypedef np.int32_t INT32_t
cdef get_random_topleft(double maxDist, double maxAngle, double radius, double boxSize):
cdef double angle, dist, x, y
cdef np.ndarray[INT32_t, ndim=1] newTopLeft
dist = np.random.rand() * maxDist
angle = np.random.rand() * maxAngle
newTopLeft = np.empty(2, dtype=np.int32)
x = dist*np.cos(angle) + radius - boxSize/2
y = dist*np.sin(angle) + radius - boxSize/2
newTopLeft[0] = np.int32(np.round(x))
newTopLeft[1] = np.int32(np.round(y))
cdef int newTopLeft[2]
dist = rand() / 32767.0 * maxDist
angle = rand() / 32767.0 * maxAngle
x = dist*cos(angle) + radius - boxSize/2
y = dist*sin(angle) + radius - boxSize/2
newTopLeft[0] = int(round(x))
newTopLeft[1] = int(round(y))
return newTopLeft
def get_random_topLefts(int numBoxes, double boxSize, double radius, double maxAngle, int seed=1337, int maxTries=50):
cdef np.ndarray[INT32_t, ndim=2] topLefts
cdef np.ndarray[INT32_t, ndim=1] newTopLeft
cdef int newTopLeft[2]
cdef double maxDist
cdef int outerCounter, counter, x, y, i, j, diffX, diffY, successfullyAdded
cdef bint validSolutionFound, boxOverlaps
np.random.seed(seed)
srand(seed) # setting seed
assert RAND_MAX == 32767  # this value is used in the random function above; for performance reasons it is hard-coded there as a literal
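# maxDist below is the largest distance from the filter center at which a box of side
# boxSize still fits fully inside the circular filter; the subtracted term is the box's
# half-diagonal sqrt(2) * boxSize / 2.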
maxDist = radius - np.sqrt((boxSize/2)**2 + (boxSize/2)**2)
outerCounter = 0
validSolutionFound = False
@@ -47,8 +49,8 @@ def get_random_topLefts(int numBoxes, double boxSize, double radius, double maxA
newTopLeft = get_random_topleft(maxDist, maxAngle, radius, boxSize)
boxOverlaps = False
for j in range(i):
diffX = abs(np.float(newTopLeft[0] - np.float(topLefts[j, 0])))
diffY = abs(np.float(newTopLeft[1] - np.float(topLefts[j, 1])))
diffX = abs(newTopLeft[0] - topLefts[j, 0])
diffY = abs(newTopLeft[1] - topLefts[j, 1])
if diffX < boxSize and diffY < boxSize:
boxOverlaps = True
......
@@ -19,8 +19,14 @@ if len(sys.argv) == 1:
# ext = Extension("getRandomTopLefts", ["getRandomTopLefts.pyx"], extra_compile_args=['-O3'],)
# setup(
# name="get a given number of random topLefts",
# ext_modules=cythonize("randoms.pyx", annotate=True), # accepts a glob pattern
# include_dirs=[np.get_include()]
# )
setup(
name="get a given number of random topLefts",
ext_modules=cythonize("randoms.pyx", annotate=True), # accepts a glob pattern
name="checks which particle contours overlap the boxes",
ext_modules=cythonize("particleBoxOverlap.pyx", annotate=True), # accepts a glob pattern
include_dirs=[np.get_include()]
)
)
\ No newline at end of file
@@ -5,11 +5,13 @@ Created on Wed Jan 22 13:57:28 2020
@author: luna
"""
import pickle
# import pickle
import os
import numpy as np
import matplotlib.pyplot as plt
import time
# import matplotlib.pyplot as plt
import concurrent.futures
import operator
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
@@ -19,6 +21,7 @@ from helpers import ParticleBinSorter
import methods as meth
import geometricMethods as gmeth
from chemometrics import chemometricMethods as cmeth
from chemometrics.imageOperations import get_particle_patchiness
from datasetOperations import ParticleVariations
@@ -32,8 +35,9 @@ def get_methods_to_test(dataset: dataset.DataSet, fractions: list = [], maxTries
:return: list of measurement Objects that are applicable
"""
if len(fractions) == 0:
fractions: list = [0.02, 0.05, 0.1, 0.25, 0.5, 0.7, 0.9]
# fractions: list = [0.02, 0.1, 0.5, 0.9]
fractions: list = [0.02, 0.03, 0.05, 0.1, 0.25, 0.5, 0.7, 0.9]
# fractions: list = [0.02, 0.06, 0.15, 0.2, 0.5]
# fractions: list = [0.01, 0.1, 0.5, 0.9]
methods: list = []
particleContainer = dataset.particleContainer
@@ -46,14 +50,19 @@ def get_methods_to_test(dataset: dataset.DataSet, fractions: list = [], maxTries
methods += boxCreator.get_spiralBoxSubsamplers_for_fraction(fraction)
methods += boxCreator.get_randomBoxSubsamplers_for_fraction(fraction, maxTries=maxTries)
methods += boxCreator.get_randomQuarterBoxSubsamplers_for_fraction(fraction, maxTries=maxTries)
# methods.append(cmeth.ChemometricSubsampling(particleContainer, fraction))
for fakeScore in [0.6, 0.7, 0.8]:
trainedSampling = cmeth.TrainedSubsampling(particleContainer, fraction, fakeScore=fakeScore)
if trainedSampling.config_is_valid():
methods.append(trainedSampling)
return methods
def update_sample(sample, force: bool, index: int):
sample.load_dataset()
t0 = time.time()
methods: list = get_methods_to_test(sample.dataset)
print('getting methods for sample', sample.dataset.name, 'took', round(time.time()-t0, 2), 'seconds')
sample.update_result_with_methods(methods, force)
return sample, index
@@ -70,6 +79,14 @@ def is_MP_particle(particle: Particle) -> bool:
return isMP
def get_number_of_MP_particles(particleList: list) -> int:
numMPParticles = 0
for particle in particleList:
if is_MP_particle(particle):
numMPParticles += 1
return numMPParticles
class TotalResults(object):
def __init__(self):
super(TotalResults, self).__init__()
@@ -91,54 +108,88 @@ class TotalResults(object):
return newResult
def update_all(self, force: bool = False) -> None:
def update_all(self, force: bool = False, multiprocessing: bool = True) -> None:
"""
Updates all samples with all methods and all fractions
:param force: Wether to force an update of an already existing method.
:param force: Whether to force an update of an already existing method.
:param multiprocessing: Whether to spawn multiple processes for computation
:return:
"""
forceList: list = [force]*len(self.sampleResults)
indices: list = list(np.arange(len(self.sampleResults)))
numSamples: int = len(forceList)
numWorkers: int = 4 # in case of a quad-core processor that seems reasonable
chunksize: int = int(round(numSamples / numWorkers * 0.7)) # we want to have slightly more chunks than workers
print(f'multiprocessing with {numSamples} samples and chunksize of {chunksize}')
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(update_sample, self.sampleResults, forceList, indices, chunksize=chunksize)
for index, res in enumerate(results):
updatedSample, processid = res
print(f'returned from process {processid}, iteration index {index}')
self.sampleResults[index] = updatedSample
if multiprocessing:
forceList: list = [force]*len(self.sampleResults)
indices: list = list(np.arange(len(self.sampleResults)))
# numSamples: int = len(forceList)
# numWorkers: int = 4 # in case of a quad-core processor that seems reasonable
# chunksize: int = int(round(numSamples / numWorkers * 0.7)) # we want to have slightly more chunks than workers
# print(f'multiprocessing with {numSamples} samples and chunksize of {chunksize}')
chunksize = 1
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(update_sample, self.sampleResults, forceList, indices, chunksize=chunksize)
for index, res in enumerate(results):
updatedSample, processid = res
print(f'returned from process {processid}, iteration index {index}')
self.sampleResults[index] = updatedSample
else:
for index, sampleResult in enumerate(self.sampleResults):
updatedResult, i = update_sample(sampleResult, True, index)
self.sampleResults[index] = updatedResult
print(f'done updating {updatedResult.dataset.name} at index {index}')
def get_error_vs_fraction_data(self, attributes: list = [], methods: list = []) -> dict:
def get_error_vs_fraction_data(self, attributes: list = [], methods: list = [], partCount: tuple = (0, np.inf)) -> tuple:
"""
:param attributes: A list of attributes that should be used for filtering the samples. Only samples with an
attribute from within that list are considered.
:param methods: A list of methods to extract
:param partCount: Tuple, first val: min particle count, second val: max particle count
:return: Tuple (stats dict, result dict); result: Key: Method Label,
Value: {Dict: Key: Measured Fraction, Value: Tuple (averaged MPCountError, StDev MPCountError) over all samples}
"""
result: dict = {}
allParticles: list = []
numSamples: int = 0
for sample in self.sampleResults:
sample: SampleResult = sample
if attributes == [] or sample.has_any_attribute(attributes):
for res in sample.results:
res: SubsamplingResult = res
method: meth.SubsamplingMethod = res.method
if methods == [] or method.matches_any_pattern(methods):
label: str = method.label
frac: float = method.fraction
error: float = res.mpCountError
stdev: float = res.mpCountErrorStDev
if label not in result.keys():
result[label] = {frac: [(error, stdev)]}
elif frac not in result[label].keys():
result[label][frac] = [(error, stdev)]
else:
result[label][frac].append((error, stdev))
if partCount[0] == 0 and partCount[1] == np.inf:
samplePartCount: int = 1
# samplePatchiness: float = 1.0 # doesn't matter in this case
else:
if sample.dataset is None:
sample.load_dataset()
samplePartCount: int = len(sample.dataset.particleContainer.particles)
# samplePatchiness: float = sample.get_patchiness()
# print(sample.sampleName, samplePatchiness)
if partCount[0] <= samplePartCount < partCount[1]:
numSamples += 1
if sample.dataset is None:
sample.load_dataset()
for particle in sample.dataset.particleContainer.particles:
allParticles.append(particle)
for res in sample.results:
res: SubsamplingResult = res
method: meth.SubsamplingMethod = res.method
if methods == [] or method.matches_any_pattern(methods):
label: str = method.label
frac: float = method.fraction
error: float = res.mpCountError
stdev: float = res.mpCountErrorStDev
if label not in result.keys():
result[label] = {frac: [(error, stdev)]}
elif frac not in result[label].keys():
result[label][frac] = [(error, stdev)]
else:
result[label][frac].append((error, stdev))
numMPParticles: float = get_number_of_MP_particles(allParticles)
stats: dict = {'numSamples': numSamples,
'meanParticleCount': round(len(allParticles) / numSamples),
'meanMPFrac': round(numMPParticles / len(allParticles) * 100, 1)}
for method in result.keys():
methodRes: dict = result[method]
@@ -147,7 +198,7 @@ class TotalResults(object):
meanStd = np.mean([i[1] for i in methodRes[fraction]])
methodRes[fraction] = (meanError, meanStd)
return result
return stats, result
class SubsamplingResult(object):
@@ -200,7 +251,12 @@ class SubsamplingResult(object):
:param subParticles:
:return:
"""
error: float = self._get_mp_count_error(origParticles, subParticles, self.method.fraction)
if type(self.method) == cmeth.TrainedSubsampling:
fraction = self.method.get_theoretic_frac()
else:
fraction = self.method.fraction
error: float = self._get_mp_count_error(origParticles, subParticles, fraction)
self.origParticleCount = len(origParticles)
self.mpCountErrors.append(error)
@@ -214,9 +270,9 @@ class SubsamplingResult(object):
return binSorter.bins, mpCountErrorsPerBin
def _get_mp_count_error(self, allParticles: list, subParticles: list, fractionMeasured: float) -> float:
numMPOrig = self._get_number_of_MP_particles(allParticles)
numMPOrig = get_number_of_MP_particles(allParticles)
self.origMPCount = numMPOrig
numMPEstimate = self._get_number_of_MP_particles(subParticles) / fractionMeasured
numMPEstimate = get_number_of_MP_particles(subParticles) / fractionMeasured
self.estimMPCounts.append(numMPEstimate)
if numMPOrig != 0:
@@ -232,25 +288,19 @@ class SubsamplingResult(object):
assert (exact != 0)
return abs(exact - estimate) / exact * 100
def _get_number_of_MP_particles(self, particleList: list) -> int:
numMPParticles = 0
for particle in particleList:
if is_MP_particle(particle):
numMPParticles += 1
return numMPParticles
class SampleResult(object):
"""
An object the stores all generated results per sample and can update and report on them.
"""
def __init__(self, filepath: str, numVariations: int = 5):
def __init__(self, filepath: str, numVariations: int = 20):
super(SampleResult, self).__init__()
self.filepath: str = filepath
self.dataset: dataset.DataSet = None
self.results: list = []
self.attributes: list = []
self.numVariations: int = numVariations # how often the sample is altered for each method
self.patchiness: float = -1
@property
def sampleName(self) -> str:
@@ -260,6 +310,13 @@
self.dataset = dataset.loadData(self.filepath)
assert self.dataset is not None
def get_patchiness(self) -> float:
if not hasattr(self, "patchiness") or self.patchiness == -1:
if self.dataset is None:
self.load_dataset()
self.patchiness = get_particle_patchiness(self.dataset)
return self.patchiness
def update_result_with_methods(self, methods: list, force: bool = False) -> list:
"""
Updates result with the given method (contains desiredFraction already)
@@ -271,8 +328,8 @@
self.load_dataset()
updatedMethods: list = []
t0 = time.time()
particleVariations: ParticleVariations = ParticleVariations(self.dataset, numVariations=self.numVariations)
needsToBeUpdated: dict = {method: False for method in methods}
for index, particleContainer in enumerate(particleVariations.get_particleContainer_variations()):
@@ -294,11 +351,10 @@
if needsToBeUpdated[method]:
subParticles = method.apply_subsampling_method()
result.add_result(method.particleContainer.particles, subParticles)
if method not in updatedMethods:
updatedMethods.append(method)
updatedMethods.append(method)
# print(f'updated {self.sampleName} with {method.label} at fraction {method.fraction}, '
# f'iteration {index+1}')
# f'iteration {index+1}, took {round(time.time()-t0, 2)}, seconds')
print(f'finished updating sample {self.sampleName}, it took {round(time.time()-t0, 2)} seconds')
return updatedMethods
def set_attribute(self, newAttribute: str) -> None:
@@ -319,8 +375,38 @@
return hasAttr
def has_attribute(self, attribute: str) -> bool:
attributes: list = [attr.lower() for attr in self.attributes]
return attribute.lower() in attributes
hasAttr: bool = False
if attribute.find('$') == -1:
attributes: list = [attr.lower() for attr in self.attributes]
hasAttr = attribute.lower() in attributes
else:
operators: dict = {
'<=': operator.__le__,
'>=': operator.__ge__,
'==': operator.__eq__,
'!=': operator.__ne__,
'<': operator.__lt__,
'>': operator.__gt__
}  # two-character operators first, so '<=' is not mistaken for '<' when scanning the attribute
if self.dataset is None:
self.load_dataset()
valueToCompare: float = 0.0
dsetValue: float = 0.0
for string in operators.keys():
if attribute.find(string) != -1:
valueToCompare = float(attribute.split(string)[1])
if attribute.find('numParticles') != -1:
dsetValue = len(self.dataset.particleContainer.particles)
break
elif attribute.find('mpFraction') != -1:
particles: list = self.dataset.particleContainer.particles
dsetValue = get_number_of_MP_particles(particles) / len(particles)
break
hasAttr = operators[string](dsetValue, valueToCompare)
return hasAttr
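# Usage sketch (attribute strings inferred from the parsing above): plain attributes are
# matched case-insensitively, strings containing '$' are parsed as numeric filters:
# sample.has_attribute('sludge')
# sample.has_attribute('$numParticles>=10000')
# sample.has_attribute('$mpFraction<0.01')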
def _remove_result_of_method(self, method: meth.SubsamplingMethod) -> None:
"""
......
import time
import numpy as np
from itertools import combinations
from methods import SubsamplingMethod
@@ -6,7 +7,7 @@ import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
import helpers
from cythonModules import randoms
from cythonModules import randoms, particleBoxOverlap
def box_overlaps_other_box(topLeft1: list, topLeft2: list, boxSize: float) -> bool:
@@ -21,7 +22,7 @@ def box_overlaps_other_box(topLeft1: list, topLeft2: list, boxSize: float) -> bo
class BoxSelectionSubsamplingMethod(SubsamplingMethod):
possibleBoxNumbers: list = [7, 10, 15]
possibleBoxNumbers: list = [5, 10, 20]
def __init__(self, *args):
super(BoxSelectionSubsamplingMethod, self).__init__(*args)
@@ -53,16 +54,27 @@ class BoxSelectionSubsamplingMethod(SubsamplingMethod):
return abs(topleft[0] - cntStart[0]) + abs(topleft[1] - cntStart[1])
subParticles: list = []
topLefts: list = self.get_topLeft_of_boxes()
boxWidthHeight: tuple = (self.boxSize, self.boxSize)
topLefts: np.ndarray = np.array(self.get_topLeft_of_boxes())
cython: bool = False
if cython:
topLefts = np.round(topLefts).astype(np.uint32)
# contours: np.ndarray = np.array(self.particleContainer.getParticleContours())
contours = [cnt.astype(np.uint32) for cnt in self.particleContainer.getParticleContours()]
boxSize: np.uint32 = np.uint32(round(self.boxSize))
indices = particleBoxOverlap.def_get_indices_of_overlapping_particles(contours, topLefts, boxSize)
for index in indices:
subParticles.append(self.particleContainer.getParticleOfIndex(index))
for particle in self.particleContainer.particles:
cntStart: tuple = (particle.contour[0, 0, 0], particle.contour[0, 0, 1])
sortedTopLefts = sorted(topLefts, key=distanceToCnt)
else:
boxWidthHeight: tuple = (self.boxSize, self.boxSize)
for particle in self.particleContainer.particles:
cntStart: tuple = (particle.contour[0, 0, 0], particle.contour[0, 0, 1])
sortedTopLefts = sorted(topLefts, key=distanceToCnt)
for topLeftXY in sortedTopLefts:
if helpers.box_overlaps_contour(topLeftXY, boxWidthHeight, particle.contour):
subParticles.append(particle)
for topLeftXY in sortedTopLefts:
if helpers.box_overlaps_contour(topLeftXY, boxWidthHeight, particle.contour):
subParticles.append(particle)
break
return subParticles
@@ -263,6 +275,7 @@ class CrossBoxSubSampling(BoxSelectionSubsamplingMethod):
numBoxes: int = 2 * self.numBoxesAcross - 1
totalBoxArea: float = numBoxes * (maxBoxSize ** 2)
maxFraction: float = totalBoxArea / self.filterArea
return maxFraction
def equals(self, otherMethod) -> bool:
@@ -386,7 +399,7 @@ class RandomBoxSampling(BoxSelectionSubsamplingMethod):
@property
def label(self) -> str:
return f'Boxes random layout ({self.numBoxes} boxes)'
return f'Boxes Random layout ({self.numBoxes} boxes)'
def equals(self, otherMethod) -> bool:
equals: bool = False
@@ -396,57 +409,13 @@ class RandomBoxSampling(BoxSelectionSubsamplingMethod):
return equals
def get_topLeft_of_boxes(self) -> list:
#
# valid, topLefts = randoms.get_random_topLefts(self.numBoxes, self.boxSize,
# self.filterDiameter/2, self.__maxAngle,
# seed=self.randomSeed, maxTries=self.maxTries)
#
# if not valid:
# raise AttributeError
#
# topLefts: list = [[topLefts[i, 0], topLefts[i, 1]] for i in range(topLefts.shape[0])]
#
def get_random_topleft() -> list:
angle = np.random.rand() * self.__maxAngle
dist = np.random.rand() * maxDist
x: float = dist * np.cos(angle) + radius - boxSize / 2
y: float = dist * np.sin(angle) + radius - boxSize / 2
return [x, y]
np.random.seed(self.randomSeed)
topLefts: list = []
boxSize: float = self.boxSize
radius: float = self.filterDiameter / 2
maxDist: float = radius - np.sqrt((boxSize / 2) ** 2 + (boxSize / 2) ** 2)
outerCounter: int = 0
validSolutionFound: bool = False
while not validSolutionFound and outerCounter < self.maxTries:
topLefts = []
for i in range(self.numBoxes):
if i == 0:
topLefts.append(get_random_topleft())
else:
counter: int = 0
while counter < 50:
newTopLeft: list = get_random_topleft()
for topLeft2 in topLefts:
if box_overlaps_other_box(newTopLeft, topLeft2, boxSize):
break
else: # i.e., if no break occurred
topLefts.append(newTopLeft)
break
counter += 1
if len(topLefts) == self.numBoxes:
validSolutionFound = True
else:
outerCounter += 1
if not validSolutionFound:
valid, topLefts = randoms.get_random_topLefts(self.numBoxes, self.boxSize,
self.filterDiameter/2, self.__maxAngle,
seed=self.randomSeed, maxTries=self.maxTries)
if not valid:
raise AttributeError
topLefts: list = [[topLefts[i, 0], topLefts[i, 1]] for i in range(topLefts.shape[0])]
return topLefts
@@ -456,7 +425,7 @@ class RandomQuarterBoxes(RandomBoxSampling):
@property
def label(self) -> str:
return f'Boxes random layout (quarter) ({self.numBoxes} boxes)'
return f'Boxes Random layout (quarter) ({self.numBoxes} boxes)'
def determine_max_achievable_frac(method: BoxSelectionSubsamplingMethod, numBoxes: int) -> float:
......
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter, FixedLocator
from scipy import optimize
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
from evaluation import TotalResults
from evaluation import TotalResults, get_number_of_MP_particles, is_MP_particle
from chemometrics.imageOperations import get_particle_patchiness
from helpers import get_filterDimensions_from_dataset, get_center_from_filter_dimensions, convert_length_to_pixels
def get_error_vs_frac_plot(totalResults: TotalResults, attributes: list = [], methods: list = [],
standarddevs=True, fill=True) -> Figure:
def get_error_vs_frac_plot(totalResults: TotalResults, attributes: list = [], methods: list = [], partCounts: list = [],
standarddevs=True, fill=True, poissonRef=True) -> Figure:
if len(attributes) == 0 and len(methods) != 0:
attributes = [[]]*len(methods)
elif len(methods) == 0 and len(attributes) != 0:
methods = [[]]*len(attributes)
if len(partCounts) == 0:
partCounts = [[]]*len(attributes)
assert len(attributes) == len(methods)
fig: Figure = plt.figure(figsize=(10, 5))
numRows: int = 1
numCols: int = 1
if len(attributes) == 0:
attributes = methods = [[]]
elif len(attributes) <= 2:
@@ -23,58 +31,346 @@ def get_error_vs_frac_plot(totalResults: TotalResults, attributes: list = [], me
numRows = 2
numCols = np.ceil(len(attributes)/numRows)
fig: Figure = plt.figure(figsize=(14, 3.5 * numRows))
index = 0
for attrs, meths in zip(attributes, methods):
for attrs, meths, pcounts in zip(attributes, methods, partCounts):
ax = fig.add_subplot(numRows, numCols, index + 1)
errorPerFraction: dict = totalResults.get_error_vs_fraction_data(attributes=attrs,
methods=meths)
if pcounts != []:
stats, errorPerFraction = totalResults.get_error_vs_fraction_data(attributes=attrs, methods=meths,
partCount=pcounts)
else:
stats, errorPerFraction = totalResults.get_error_vs_fraction_data(attributes=attrs, methods=meths)
numSamples = stats['numSamples']
meanParticleCount = stats['meanParticleCount']
meanMPFrac = stats['meanMPFrac']
if poissonRef:
firstMethod: str = list(errorPerFraction.keys())[0]
fractions: list = list(errorPerFraction[firstMethod].keys())
meansCounts: np.ndarray = np.array([frac * meanParticleCount * meanMPFrac/100 for frac in fractions])
stdevs: np.ndarray = 1 / np.sqrt(meansCounts) # mean = variance = stdev**2 in a Poisson distribution
means = stdevs**2
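# Reasoning (sketch): a Poisson-distributed MP count with mean mu has stdev sqrt(mu),
# i.e. a relative spread of 1/sqrt(mu); the reference curve uses that as stdev and its
# square as mean error.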
# Convert to %
means *= 100
stdevs *= 100
fractions = [frac*100 for frac in fractions]
if not standarddevs:
ax.plot(fractions, means, label='Poisson', marker='s', alpha=0.3)
else:
line = ax.errorbar(fractions, means, stdevs, label='Poisson', marker='s', capsize=5, alpha=0.3)
if fill:
color = line[0].get_color()
ax.fill_between(fractions, means-stdevs, means+stdevs, alpha=0.1, facecolor=color)
# print('errorbars:', means-stdevs, means+stdevs)
for methodLabel in errorPerFraction.keys():
errorDict: dict = errorPerFraction[methodLabel]
fractions: list = list(errorDict.keys())
errors: np.ndarray = np.array([errorDict[fraction][0] for fraction in fractions])
stdevs: np.ndarray = np.array([errorDict[fraction][1] for fraction in fractions])
fractions = [i * 100 for i in fractions] # convert to % for plotting
alphascale: float = 1 if methodLabel.find('Random Subsa') == -1 else 0.3
if not standarddevs:
ax.plot(fractions, errors, label=methodLabel, marker='s')
else:
line = ax.errorbar(fractions, errors, stdevs, label=methodLabel, marker='s', capsize=5)
line = ax.errorbar(fractions, errors, stdevs, label=methodLabel, marker='s', capsize=5, alpha=alphascale)
if fill:
color = line[0].get_color()
ax.fill_between(fractions, errors-stdevs, errors+stdevs, alpha=0.2, facecolor=color)
ax.fill_between(fractions, errors-stdevs, errors+stdevs, alpha=0.2*alphascale, facecolor=color)
title: str = ''
if len(attrs) > 0:
for i in range(len(attrs)):
if attrs[i] == 'slush':
attrs[i] = 'sludge'
title = ', '.join(attr for attr in attrs)
print('title is', title)
ax.set_title(title, fontsize=15)
elif pcounts != []:
title += f'{pcounts[0]} <= num. Particles < {pcounts[1]}'
meanNumMP: int = int(round(meanParticleCount * meanMPFrac/100, 0))
title += f' ({numSamples} filters)\nAverage: {meanParticleCount} particles, {meanMPFrac} % MP, {meanNumMP} MP particles'
ax.set_title(title, fontsize=13)
ax.set_xscale('log')
ax.set_xlabel('measured fraction', fontsize=12)
ax.set_ylabel('mpCountError (%)', fontsize=12)
ax.set_xlim([0.9 * min(fractions), 1.05])
ax.xaxis.set_major_formatter(ScalarFormatter())
# ax.xaxis.set_major_locator(FixedLocator([0.02, 0.05, 0.1, 0.2, 0.5, 1.0]))
ax.xaxis.set_major_locator(FixedLocator([2, 5, 10, 20, 50, 100]))
ax.set_xlabel('measured fraction (%)', fontsize=12)
ax.set_ylabel('subsampling-error (%)', fontsize=12)
minX, maxX = 0.9 * min(fractions), 105
ax.hlines([20, 40, 60, 80], minX, maxX, colors='gray', alpha=0.5)
ax.set_xlim([minX, maxX])
ax.set_ylim([0, 100])
ax.legend()
index += 1
fig.tight_layout()
return fig
def get_distance_hist_plots(totalResults: TotalResults, attributes: list = []) -> Figure:
fig: Figure = plt.figure(figsize=(7, 5))
numRows: int = 1
numCols: int = 1
if len(attributes) == 0:
attributes = [[]]
elif len(attributes) <= 2:
numCols = len(attributes)
else:
numRows = 2
numCols = np.ceil(len(attributes) / numRows)
onlyMP: bool = True
ax = fig.add_subplot()
for index, attrs in enumerate(attributes):
# ax = fig.add_subplot(numRows, numCols, index + 1)
allParticles: list = []
densities: list = []
particleCounts: list = []
patchinesses: list = []
for sampleRes in totalResults.sampleResults:
if sampleRes.has_any_attribute(attrs):
if sampleRes.dataset is None:
sampleRes.load_dataset()
dset = sampleRes.dataset
patchiness: float = get_particle_patchiness(dset, onlyMP=onlyMP)
patchinesses.append(patchiness)
if onlyMP:
particleCount: int = 0
for particle in dset.particleContainer.particles:
if is_MP_particle(particle):
particleCount += 1
particleCounts.append(particleCount)
else:
particleCounts.append(len(dset.particleContainer.particles))
# for particle in dset.particleContainer.particles:
# allParticles.append(particle)
#
# offset, diameter, [width, height] = get_filterDimensions_from_dataset(dset)
# center = get_center_from_filter_dimensions(offset, diameter)
# center[0] = convert_length_to_pixels(dset, center[0])
# center[1] = convert_length_to_pixels(dset, center[1])
# histdata = get_distance_point_histogramdata(dset.particleContainer.particles, center)
# densities.append(histdata[1])
# ax.plot(histdata[0], histdata[1])
for i in range(len(attrs)):
if attrs[i] == 'slush':
attrs[i] = 'sludge'
ax.scatter(particleCounts, patchinesses, label=', '.join(attr for attr in attrs))
# numSamples = len(densities)
# partCounts: list = [len(i) for i in allParticles]
# meanParticleCount: float = round(len(allParticles) / numSamples)
# meanParticleCount: float = round(np.mean(partCounts))
# stdParticleCount: float = round(np.std(partCounts))
# mpFracs: list = [get_number_of_MP_particles(i)/len(i) for i in allParticles]
# meanMPFrac: float = round(np.mean(mpFracs) * 100, 1)
# stdMPFrac: float = round(np.std(mpFracs) * 100, 1)
# numMPParticles: float = get_number_of_MP_particles(allParticles)
# meanMPFrac: float = round(numMPParticles / len(allParticles) * 100, 1)
# meanPatchiness: float = round(np.mean(patchinesses), 2)
# title: str = ''
# if len(attrs) > 0:
# title = ', '.join(attr for attr in attrs)
# title += f'\n({numSamples} filters, avg. {meanParticleCount} particles, {meanMPFrac} % MP,'
# title += f'\navg. Particle Patchiness {meanPatchiness})'
# ax.set_title(title, fontSize=13)
# densities: np.ndarray = np.mean(np.array(densities), axis=0)
# densities /= densities.max()
# distances = np.array(histdata[0], dtype=np.float) * dset.pixelscale_df
# ax.plot(distances / 1000, densities)
# ax.set_xlabel('distance from filter center (mm)', fontsize=12)
# ax.set_xlim([0, 6])
# ax.set_ylabel('normalized particle density', fontsize=12)
# ax.set_ylim([0.0, 1.05])
ax.legend(fontsize=15)
ax.set_xscale('log')
if not onlyMP:
ax.set_xticks([1000, 5000, 10000, 50000, 100000])
ax.set_xlabel('Particle Count', fontsize=15)
else:
ax.set_xticks([10, 50, 100, 500])
ax.set_xlabel('MP Particle Count', fontsize=15)
ax.xaxis.set_major_formatter(ScalarFormatter())
ax.set_ylabel('Patchiness', fontsize=15)
for tick in ax.xaxis.get_major_ticks():
tick.label.set_fontsize(15)
for tick in ax.yaxis.get_major_ticks():
tick.label.set_fontsize(15)
fig.tight_layout()
return fig
def get_error_vs_mpfrac_plot(totalResults: TotalResults, attributes: list = []) -> Figure:
def quadratic_fit(x, a, b, c):
return a*x**2 + b*x + c
fig: Figure = plt.figure(figsize=(15, 5))
ax1 = fig.add_subplot(121)
ax2 = fig.add_subplot(122)
dataWithFractions: dict = {}
dataWithNumbers: dict = {}
for index, attrs in enumerate(attributes):
for sampleRes in totalResults.sampleResults:
if sampleRes.has_any_attribute(attrs):
if sampleRes.dataset is None:
sampleRes.load_dataset()