Commit d7ed0e0f authored by Josef Brandt

Merge branch 'master' into Development

parents 68561321 8074c9d8
@@ -14,3 +14,9 @@ cythonModules/build/
*.pyd
*.html
*.pkl
chemometrics/Assignments.txt
chemometrics/Data.txt
import numpy as np
import cv2
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from scipy import spatial
from itertools import combinations
from random import sample
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from methods import SubsamplingMethod
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
try:
standardizedData = StandardScaler().fit_transform(data.copy())
except ValueError:
        print('StandardScaler failed on the input data:')
print('datashape', data.shape)
print('unique:', np.unique(data))
raise
pca = PCA(n_components=numComp)
princComp: np.ndarray = pca.fit_transform(np.transpose(standardizedData))
return princComp
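# Minimal usage sketch (the _demo_get_pca helper is hypothetical): assumes the
# (numFeatures x numSamples) layout produced by _get_particle_featurematrix
# below, so the transpose inside get_pca yields one row of principal
# components per particle.
def _demo_get_pca() -> None:
    rng = np.random.default_rng(42)
    fakeFeatureMat: np.ndarray = rng.normal(size=(11, 50))  # 11 features, 50 particles
    princComps: np.ndarray = get_pca(fakeFeatureMat, numComp=2)
    assert princComps.shape == (50, 2)  # one 2D point per particle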
def do_DBSCAN_clustering(data: np.ndarray, eps: float = 0.1, min_samples: int = 10) -> tuple:
"""
Does DBSCAN clustering and finds noisy data
:param data: The input array
:param eps:
:param min_samples:
:return: Cluster labels for each point in the dataset given to fit(). Noisy samples are given the label -1.
"""
assert data.shape[1] == 2
standardizedData = StandardScaler().fit_transform(data)
db = DBSCAN(eps=eps, min_samples=min_samples).fit(standardizedData)
return db.labels_, db.core_sample_indices_
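# Minimal usage sketch (hypothetical helper, synthetic data): two dense blobs
# plus one isolated point; DBSCAN should find two clusters and flag the
# isolated point as noise (label -1).
def _demo_dbscan() -> None:
    rng = np.random.default_rng(0)
    blobA: np.ndarray = rng.normal(loc=(0, 0), scale=0.05, size=(50, 2))
    blobB: np.ndarray = rng.normal(loc=(5, 5), scale=0.05, size=(50, 2))
    outlier: np.ndarray = np.array([[20.0, -20.0]])
    labels, coreIndices = do_DBSCAN_clustering(np.vstack((blobA, blobB, outlier)), eps=0.3, min_samples=5)
    assert set(labels) == {0, 1, -1}
    assert labels[-1] == -1  # the isolated point is classified as noise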
def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarray) -> list:
"""
Returns a list with indices of n points that are closest to the indicated refPoint
:param points: np.ndarray, cols: x, y, rows: individual points
:param n: number of points to return
:param refPoint: np.array([x, y]) of reference point
:return: list of point indices
"""
distancesToPoints: np.ndarray = np.linalg.norm(points - refPoint, axis=1)
sortedIndices = np.argsort(distancesToPoints)
return list(sortedIndices[:n])
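# Minimal usage sketch (hypothetical helper): of three candidate points, the
# two nearest to the origin are returned, closest first.
def _demo_closest_points() -> None:
    points: np.ndarray = np.array([[1.0, 0.0], [3.0, 0.0], [0.0, 0.5]])
    refPoint: np.ndarray = np.array([0.0, 0.0])
    assert get_n_points_closest_to_point(points, 2, refPoint) == [2, 0]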
class ChemometricSubsampling(SubsamplingMethod):
def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
super(ChemometricSubsampling, self).__init__(particleContainer, desiredFraction)
@property
def label(self) -> str:
return 'Chemometric Selection'
def apply_subsampling_method(self) -> list:
vectors: np.ndarray = self._get_particle_featurematrix()
try:
princComps: np.ndarray = get_pca(vectors)
        except ValueError:
            print('numParticles:', len(self.particleContainer.particles))
            print('input featurematrix shape', vectors.shape)
            raise
clusterLabels, coreIndices = do_DBSCAN_clustering(princComps)
indices: list = self._get_indices_from_clusterLabels(princComps, clusterLabels, coreIndices)
selectedParticles: list = []
for particle in self.particleContainer.particles:
if particle.index in indices:
selectedParticles.append(particle)
return selectedParticles
def _get_particle_featurematrix(self) -> np.ndarray:
"""
:return: np.ndarray, numRows: Features, numCols: Particles
"""
vectors: list = []
for particle in self.particleContainer.particles:
extractor: FeatureExtractor = FeatureExtractor(particle)
vectors.append(extractor.get_characteristic_vector())
vectors: np.ndarray = np.transpose(np.array(vectors))
assert vectors.shape == (11, len(self.particleContainer.particles)), f'wrong featureMat-shape: {vectors.shape}'
return vectors
def equals(self, otherMethod) -> bool:
equals: bool = False
if type(otherMethod) == ChemometricSubsampling and otherMethod.fraction == self.fraction:
equals = True
return equals
def _get_indices_from_clusterLabels(self, points: np.ndarray, labels: np.ndarray, centerIndices: np.ndarray) -> list:
indices: list = []
allIndices: np.ndarray = np.arange(len(labels))
numPointsPerCluster: dict = self._get_numPoints_per_cluster(labels)
for clusterIndex in set(labels):
nPoints: int = int(numPointsPerCluster[clusterIndex])
indicesInCluster: np.ndarray = allIndices[labels == clusterIndex]
if clusterIndex == -1:
for ind in sample(list(indicesInCluster), nPoints):
# assert ind not in indices
indices.append(ind)
else:
clusterPoints: np.ndarray = points[indicesInCluster]
centerPoint: np.ndarray = np.mean(clusterPoints, axis=0)
indicesToSelect: list = get_n_points_closest_to_point(clusterPoints, nPoints, centerPoint)
for ind in indicesToSelect:
origInd = indicesInCluster[ind]
indices.append(origInd)
assert len(set(indices)) == len(indices), f'The calculated indices contain duplicates, ' \
f'num duplicates: {len(indices) - len(set(indices))}'
return indices
def _get_numPoints_per_cluster(self, labels: np.ndarray, noiseAmpFactor: float = 5) -> dict:
"""
MP Particles are expected to be the minority of all particles. So, if datapoints were classified as noise
(i.e., label = -1), it is likely that MP is in there. The abundancy of points taken from the noise is multiplied
by the noiseAmpFactor
:param labels:
:param noiseAmpFactor:
:return: A dictionary with keys = cluster index (i.e., label) and value = number of points to take from that
"""
pointsPerCluster: dict = {}
        if not isinstance(labels, np.ndarray):
labels = np.array(labels)
individualLabels: set = set(labels)
numPointsToSelect = round(len(labels) * self.fraction)
if numPointsToSelect == 0:
numPointsToSelect = 1
numNoisePoints = len(labels[labels == -1])
numClusteredPoints = len(labels) - numNoisePoints
        # cap the noiseAmpFactor so that the amplified noise fraction cannot exceed 1
        if noiseAmpFactor > 1 / self.fraction:
            noiseAmpFactor = 1 / self.fraction
numAmpPoints = numClusteredPoints + numNoisePoints*noiseAmpFactor
fractionPerCluster = np.clip(numPointsToSelect / numAmpPoints, 0.0, 1.0)
tooFewPoints = numPointsToSelect < len(individualLabels)
totalPointsAdded = 0
for ind in individualLabels:
if ind > -1:
if not tooFewPoints:
pointsToAdd = round(fractionPerCluster * len(labels[labels == ind]))
else:
pointsToAdd = 1 if totalPointsAdded < numPointsToSelect else 0
pointsPerCluster[ind] = pointsToAdd
totalPointsAdded += pointsToAdd
# fill up the rest with noisePoints
if numNoisePoints > 0:
diff: float = np.clip(numPointsToSelect - totalPointsAdded, 0, numNoisePoints)
pointsPerCluster[-1] = diff
totalPointsAdded += diff
        # in case too many points were selected (due to rounding errors), remove points from the biggest cluster until it matches
        while totalPointsAdded > numPointsToSelect:
            indexWithHighestCount = None
            maxCount = 0
            for index, count in pointsPerCluster.items():
                if count > maxCount:
                    maxCount = count
                    indexWithHighestCount = index
            pointsPerCluster[indexWithHighestCount] -= 1
            totalPointsAdded -= 1
        assert abs(totalPointsAdded - numPointsToSelect) <= 1
for clusterIndex in pointsPerCluster.keys():
assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
return pointsPerCluster
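# Worked example for the noise amplification above (the _demo helper and its
# numbers are ours; __init__ is bypassed because _get_numPoints_per_cluster
# only reads self.fraction): 100 points, fraction 0.1, 5 noise points. The 5
# noise points count as 25 amplified points, so the per-cluster fraction is
# 10/120 and the noise cluster ends up oversampled relative to its size.
def _demo_numPoints_per_cluster() -> None:
    labels: np.ndarray = np.array([0] * 80 + [1] * 15 + [-1] * 5)
    method: ChemometricSubsampling = object.__new__(ChemometricSubsampling)
    method.fraction = 0.1
    allocation: dict = method._get_numPoints_per_cluster(labels, noiseAmpFactor=5)
    assert allocation == {0: 7, 1: 1, -1: 2}  # 2 of 5 noise points vs 8 of 95 clustered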
class FeatureExtractor(object):
def __init__(self, particle: pm.Particle):
super(FeatureExtractor, self).__init__()
self.particle: pm.Particle = particle
def get_characteristic_vector(self) -> np.ndarray:
log_hu: np.ndarray = self._get_log_hu_moments()
color: np.ndarray = self._get_color_hash(self.particle.color, desiredLength=4)
vector: np.ndarray = np.hstack((log_hu, color))
        assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
return vector
def _get_log_hu_moments(self) -> np.ndarray:
moments: dict = cv2.moments(self.particle.contour)
resultMoments: np.ndarray = np.zeros((7, 1))
for index, mom in enumerate(cv2.HuMoments(moments)):
if mom != 0:
resultMoments[index] = -1 * np.copysign(1.0, mom) * np.log10(abs(mom))
else:
resultMoments[index] = 0
return resultMoments[:, 0]
    def _get_color_hash(self, color: str, desiredLength: int = 4) -> np.ndarray:
        # note: built-in str hashes are randomized per interpreter session (PYTHONHASHSEED),
        # so these features are not reproducible across runs
        colorArray: list = [int(i) for i in str(abs(hash(color)))[:desiredLength]]
        return np.array(colorArray)
import numpy as np
import cv2
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
# from scipy import spatial
# from itertools import combinations
from random import sample
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
import os
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard.analysis.particleContainer import ParticleContainer
from gepard.analysis import particleAndMeasurement as pm
from methods import SubsamplingMethod
def get_pca(data: np.ndarray, numComp: int = 2) -> np.ndarray:
try:
standardizedData = StandardScaler().fit_transform(data.copy())
except ValueError:
        print('StandardScaler failed on the input data:')
print('datashape', data.shape)
print('unique:', np.unique(data))
raise
pca: PCA = PCA(n_components=numComp)
princComp: np.ndarray = pca.fit_transform(np.transpose(standardizedData))
# print(f'pca explained variance with {numComp} princ comps is {sum(pca.explained_variance_)}')
return princComp
def do_DBSCAN_clustering(data: np.ndarray, eps: float = 0.1, min_samples: int = 10) -> tuple:
"""
Does DBSCAN clustering and finds noisy data
:param data: The input array
:param eps:
:param min_samples:
:return: Cluster labels for each point in the dataset given to fit(). Noisy samples are given the label -1.
"""
assert data.shape[1] == 2
standardizedData = StandardScaler().fit_transform(data)
db = DBSCAN(eps=eps, min_samples=min_samples).fit(standardizedData)
return db.labels_, db.core_sample_indices_
def get_n_points_closest_to_point(points: np.ndarray, n: int, refPoint: np.ndarray) -> list:
"""
Returns a list with indices of n points that are closest to the indicated refPoint
:param points: np.ndarray, cols: x, y, rows: individual points
:param n: number of points to return
:param refPoint: np.array([x, y]) of reference point
:return: list of point indices
"""
distancesToPoints: np.ndarray = np.linalg.norm(points - refPoint, axis=1)
sortedIndices = np.argsort(distancesToPoints)
return list(sortedIndices[:n])
def get_particle_featurematrix(particleContainer: ParticleContainer) -> np.ndarray:
"""
:return: np.ndarray, numRows: Features, numCols: Particles
"""
vectors: list = []
for particle in particleContainer.particles:
vectors.append(get_characteristic_vector(particle))
vectors: np.ndarray = np.array(vectors)
assert vectors.shape[0] == len(particleContainer.particles)
return vectors
def get_characteristic_vector(particle: pm.Particle) -> np.ndarray:
vector: list = []
# vector += list(get_log_hu_moments(particle.contour))
vector.append(float(get_color_index(particle.color)))
vector.append(get_solidity(particle.contour))
vector.append(get_aspect_ratio(particle.contour))
vector.append(get_extent(particle.contour))
vector.append(cv2.contourArea(particle.contour))
# vector.append(get_shape_index(particle.shape))
# vector.append(cv2.arcLength(particle.contour, True))
# vector: np.ndarray = np.hstack((log_hu, color))
# if len(vector) != 11:
# print('error')
# assert len(vector) == 7 + 4, f'wrong feature vector: {vector} with shape: {vector.shape}'
return np.array(vector)
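# Minimal sketch (hypothetical stubs): get_characteristic_vector only reads
# particle.contour and particle.color, so simple stand-ins suffice to build a
# feature matrix without a full gepard ParticleContainer. With the five active
# features above, two particles give a (2, 5) matrix.
class _StubParticle:
    def __init__(self) -> None:
        self.contour: np.ndarray = np.array([[[0, 0]], [[20, 0]], [[20, 10]], [[0, 10]]], dtype=np.int32)
        self.color: str = 'blue'
class _StubContainer:
    particles: list = [_StubParticle(), _StubParticle()]
def _demo_featurematrix() -> None:
    featureMat: np.ndarray = get_particle_featurematrix(_StubContainer())
    assert featureMat.shape == (2, 5)  # numRows: particles, numCols: features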
def get_solidity(contour: np.ndarray) -> float:
area: float = cv2.contourArea(contour)
hull: np.ndarray = cv2.convexHull(contour)
hull_area: float = cv2.contourArea(hull)
    if area == 0 or hull_area == 0:
        raise ValueError('particle contour or convex hull has zero area')
solidity: float = area / hull_area
return solidity
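# Minimal sketch (hypothetical helper): a convex square is its own convex
# hull, so its solidity is exactly 1.0.
def _demo_solidity() -> None:
    square: np.ndarray = np.array([[[0, 0]], [[10, 0]], [[10, 10]], [[0, 10]]], dtype=np.int32)
    assert get_solidity(square) == 1.0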
def get_aspect_ratio(contour: np.ndarray) -> float:
    if contour.shape[0] >= 5:  # at least 5 points are required for cv2.fitEllipse
ellipse = cv2.fitEllipse(contour)
short, long = ellipse[1]
else:
rect = cv2.minAreaRect(contour)
long, short = rect[1]
if short > long:
long, short = short, long
    if short == 0.0:
        raise ValueError('particle contour has a short axis of zero length')
return long/short
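# Minimal sketch (hypothetical helper): a 4-point contour takes the
# cv2.minAreaRect branch, so a 20x10 rectangle yields an aspect ratio of 2.0.
def _demo_aspect_ratio() -> None:
    rect: np.ndarray = np.array([[[0, 0]], [[20, 0]], [[20, 10]], [[0, 10]]], dtype=np.int32)
    assert abs(get_aspect_ratio(rect) - 2.0) < 1e-3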
def get_extent(contour: np.ndarray) -> float:
area: float = float(cv2.contourArea(contour))
x, y, w, h = cv2.boundingRect(contour)
rect_area: float = w * h
extent: float = area / rect_area
return extent
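# Minimal sketch (hypothetical helper): cv2.boundingRect is pixel-based
# (width = max_x - min_x + 1), so even an axis-aligned square has an extent
# slightly below 1.0, here 100 / (11 * 11).
def _demo_extent() -> None:
    square: np.ndarray = np.array([[[0, 0]], [[10, 0]], [[10, 10]], [[0, 10]]], dtype=np.int32)
    assert abs(get_extent(square) - 100.0 / 121.0) < 1e-6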
def get_log_hu_moments(contour: np.ndarray) -> np.ndarray:
moments: dict = cv2.moments(contour)
resultMoments: np.ndarray = np.zeros((7, 1))
for index, mom in enumerate(cv2.HuMoments(moments)):
if mom != 0:
resultMoments[index] = -1 * np.copysign(1.0, mom) * np.log10(abs(mom))
else:
resultMoments[index] = 0
return resultMoments[:, 0]
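# Minimal sketch (hypothetical helper): the first Hu moment is scale
# invariant, so a square and a 2x-scaled copy give the same log-scaled value.
# (The higher moments of a symmetric square are near zero, where the log
# scaling amplifies numerical noise, so only the first one is compared here.)
def _demo_log_hu() -> None:
    square: np.ndarray = np.array([[[0, 0]], [[10, 0]], [[10, 10]], [[0, 10]]], dtype=np.int32)
    scaled: np.ndarray = np.array([[[0, 0]], [[20, 0]], [[20, 20]], [[0, 20]]], dtype=np.int32)
    huSquare: np.ndarray = get_log_hu_moments(square)
    huScaled: np.ndarray = get_log_hu_moments(scaled)
    assert huSquare.shape == (7,)
    assert abs(huSquare[0] - huScaled[0]) < 1e-6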
def get_color_hash(color: str, desiredLength: int = 4) -> np.ndarray:
    # note: built-in str hashes are randomized per interpreter session (PYTHONHASHSEED),
    # so these features are not reproducible across runs
    colorArray: list = [int(i) for i in str(abs(hash(color)))[:desiredLength]]
    return np.array(colorArray)
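# Minimal sketch (hypothetical helper): the hash digits are stable within one
# interpreter session, but differ between runs unless PYTHONHASHSEED is fixed.
def _demo_color_hash() -> None:
    first: np.ndarray = get_color_hash('blue', desiredLength=4)
    second: np.ndarray = get_color_hash('blue', desiredLength=4)
    assert np.array_equal(first, second)
    assert len(first) <= 4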
def get_color_index(color: str) -> int:
colors: list = ['transparent', 'green', 'brown', 'non-determinable', 'undedetermined', 'grey',
'red', 'yellow', 'white', 'blue']
assert color in colors, f'color not found: {color}'
return colors.index(color)
# def get_shape_index(shape: str) -> int:
# shapes: list = ['spherule', 'fibre', 'flake', 'irregular']
# assert shape in shapes
# return shapes.index(shape)
class TrainedSubsampling(SubsamplingMethod):
def __init__(self, particleContainer: ParticleContainer, desiredFraction: float,
path: str = r'C:\Users\xbrjos\Desktop\Python\Subsampling\chemometrics\RandomForestClassifier, score 0.72.pkl'):
super(TrainedSubsampling, self).__init__(particleContainer, desiredFraction)
self.score: float = None
self.clf = None
self.clfPath: str = path
@property
def label(self) -> str:
return 'Trained Random Sampling'
def _load_classifier(self) -> None:
assert os.path.exists(self.clfPath)
fname: str = self.clfPath
with open(fname, "rb") as fp:
self.clf = pickle.load(fp)
name: str = fname.split('.pkl')[0]
name: str = name.split('score')[1]
self.score = float(name)
def _get_measure_indices(self, predictedAssignments: list) -> set:
assignments: np.ndarray = np.array(predictedAssignments)
mpIndices: list = list(np.where(assignments == 1)[0])
nonMpIndices: list = list(np.where(assignments == 0)[0])
numEstimMPParticles: int = len(mpIndices)
numParticlesToMeasure = round(len(predictedAssignments) * self.fraction)
if numParticlesToMeasure <= numEstimMPParticles:
indicesToMeasure = set(sample(mpIndices, numParticlesToMeasure))
else:
remainingIndices: int = int(numParticlesToMeasure - numEstimMPParticles)
indicesToMeasure = set(mpIndices + sample(nonMpIndices, remainingIndices))
assert len(indicesToMeasure) == numParticlesToMeasure
return indicesToMeasure
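# Worked example for the selection rule above (the _demo helper and its
# numbers are ours; __init__ is bypassed because _get_measure_indices only
# reads self.fraction): with fraction 0.5 and 2 of 10 particles predicted as
# MP (label 1), both MP indices are always kept and the remaining 3 slots are
# sampled from the non-MP pool.
def _demo_measure_indices() -> None:
    method: TrainedSubsampling = object.__new__(TrainedSubsampling)
    method.fraction = 0.5
    predictedAssignments: list = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
    measuredIndices: set = method._get_measure_indices(predictedAssignments)
    assert len(measuredIndices) == 5
    assert {0, 1}.issubset(measuredIndices)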
# class ChemometricSubsampling(SubsamplingMethod):
# # def __init__(self, particleContainer: ParticleContainer, desiredFraction: float):
# # super(ChemometricSubsampling, self).__init__(particleContainer, desiredFraction)
#
# @property
# def label(self) -> str:
# return 'Chemometric Selection'
#
# def apply_subsampling_method(self) -> list:
# vectors: np.ndarray = get_particle_featurematrix(self.particleContainer)
# try:
# princComps: np.ndarray = get_pca(vectors)
# except ValueError:
# print('numParticles:', len(self.particleContainer.particles))
# print('input featurematrix shape', vectors.shape)
# clusterLabels, coreIndices = do_DBSCAN_clustering(princComps)
# indices: list = self._get_indices_from_clusterLabels(princComps, clusterLabels, coreIndices)
#
# selectedParticles: list = []
# for particle in self.particleContainer.particles:
# if particle.index in indices:
# selectedParticles.append(particle)
#
# return selectedParticles
#
# def _get_particle_featurematrix(self) -> np.ndarray:
# """
# :return: np.ndarray, numRows: Particles, numCols: Features
# """
# vectors: list = []
# for particle in self.particleContainer.particles:
# # extractor: FeatureExtractor = FeatureExtractor(particle)
# vectors.append(extractor.get_characteristic_vector())
# vectors: np.array(vectors)
# # assert vectors.shape == (11, len(self.particleContainer.particles)), f'wrong featureMat-shape: {vectors.shape}'
# return vectors
#
# def equals(self, otherMethod) -> bool:
# equals: bool = False
# if type(otherMethod) == ChemometricSubsampling and otherMethod.fraction == self.fraction:
# equals = True
# return equals
#
# def _get_indices_from_clusterLabels(self, points: np.ndarray, labels: np.ndarray, centerIndices: np.ndarray) -> list:
# indices: list = []
# allIndices: np.ndarray = np.arange(len(labels))
# numPointsPerCluster: dict = self._get_numPoints_per_cluster(labels)
#
# for clusterIndex in set(labels):
# indToAppend: list = []
# nPoints: int = int(numPointsPerCluster[clusterIndex])
# indicesInCluster: np.ndarray = allIndices[labels == clusterIndex]
# if clusterIndex == -1:
# for ind in sample(list(indicesInCluster), nPoints):
# # assert ind not in indices
# indices.append(ind)
# else:
# clusterPoints: np.ndarray = points[indicesInCluster]
# centerPoint: np.ndarray = np.mean(clusterPoints, axis=0)
# indicesToSelect: list = get_n_points_closest_to_point(clusterPoints, nPoints, centerPoint)
# for ind in indicesToSelect:
# origInd = indicesInCluster[ind]
# indices.append(origInd)
#
# assert len(set(indices)) == len(indices), f'The calculated indices contain duplicates, ' \
# f'num duplicates: {len(indices) - len(set(indices))}'
# return indices
#
# def _get_numPoints_per_cluster(self, labels: np.ndarray, noiseAmpFactor: float = 5) -> dict:
# """
# MP Particles are expected to be the minority of all particles. So, if datapoints were classified as noise
# (i.e., label = -1), it is likely that MP is in there. The abundancy of points taken from the noise is multiplied
# by the noiseAmpFactor
# :param labels:
# :param noiseAmpFactor:
# :return: A dictionary with keys = cluster index (i.e., label) and value = number of points to take from that
# """
# pointsPerCluster: dict = {}
# if type(labels) != np.ndarray:
# labels = np.array(labels)
# individualLabels: set = set(labels)
# numPointsToSelect = round(len(labels) * self.fraction)
# if numPointsToSelect == 0:
# numPointsToSelect = 1
#
# numNoisePoints = len(labels[labels == -1])
# numClusteredPoints = len(labels) - numNoisePoints
#
# # # get max noiseAmpFactor
# if noiseAmpFactor > 1/self.fraction:
# noiseAmpFactor = 1/self.fraction
#
# numAmpPoints = numClusteredPoints + numNoisePoints*noiseAmpFactor
# fractionPerCluster = np.clip(numPointsToSelect / numAmpPoints, 0.0, 1.0)
#
# tooFewPoints = numPointsToSelect < len(individualLabels)
#
# totalPointsAdded = 0
# for ind in individualLabels:
# if ind > -1:
#
# if not tooFewPoints:
# pointsToAdd = round(fractionPerCluster * len(labels[labels == ind]))
# else:
# pointsToAdd = 1 if totalPointsAdded < numPointsToSelect else 0
#
# pointsPerCluster[ind] = pointsToAdd
# totalPointsAdded += pointsToAdd
#
# # fill up the rest with noisePoints
# if numNoisePoints > 0:
# diff: float = np.clip(numPointsToSelect - totalPointsAdded, 0, numNoisePoints)
# pointsPerCluster[-1] = diff
# totalPointsAdded += diff
#
# # just in case too many points were selected (due to rounding errors), keep on deleting until it matches
# while totalPointsAdded > numPointsToSelect:
# indexWithHighestCount = None
# maxCount = 0
# for index in pointsPerCluster.values():
# if pointsPerCluster[index] > maxCount:
# maxCount = pointsPerCluster[index]
# indexWithHighestCount = index
#
# pointsPerCluster[indexWithHighestCount] -= 1
# totalPointsAdded -= 1
#
# if not abs(totalPointsAdded - numPointsToSelect) <= 1:
# print('error')
# assert abs(totalPointsAdded - numPointsToSelect) <= 1
# for clusterIndex in pointsPerCluster.keys():
# assert 0 <= pointsPerCluster[clusterIndex] <= len(labels[labels == clusterIndex])
# return pointsPerCluster
\ No newline at end of file
import matplotlib.pyplot as plt
import numpy as np
from random import sample
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
import pickle
import time
import sys
sys.path.append("C://Users//xbrjos//Desktop//Python")
from gepard import dataset
from gepard.analysis.particleContainer import ParticleContainer
from input_output import get_pkls_from_directory
from chemometricMethods import get_log_hu_moments, get_color_index, get_pca, get_characteristic_vector
from evaluation import is_MP_particle
def test_classification_models(data: tuple) -> None:
names = ["RandomForestClassifier", "NeuralNetClassifier"]
classifiers = [
RandomForestClassifier(n_estimators=1000),
MLPClassifier(alpha=1, max_iter=1000)]
t0 = time.time()
# preprocess dataset, split into training and test part
    X, y = data
X = StandardScaler().fit_transform(X)
X_train, X_test, y_train, y_test = \
train_test_split(X, y, test_size=.3, random_state=42)
    print(f'preprocessing finished after {round(time.time()-t0, 2)} seconds')
# iterate over classifiers
for name, clf in zip(names, classifiers):
t0 = time.time()
clf.fit(X_train, y_train)