From 5dec089fabb6c05903cf241ddd5fe3fae31c5c77 Mon Sep 17 00:00:00 2001 From: Josef Brandt Date: Wed, 18 Mar 2020 20:02:26 +0100 Subject: [PATCH] KennardStone basically working, but slow.. --- chemometricMethods.py | 136 ++++++++++++++++++++++++++----- tests/test_chemometricMethods.py | 97 ++++++++++++++++++---- 2 files changed, 194 insertions(+), 39 deletions(-) diff --git a/chemometricMethods.py b/chemometricMethods.py index 962fcbe..1ff1982 100644 --- a/chemometricMethods.py +++ b/chemometricMethods.py @@ -4,7 +4,7 @@ from sklearn.preprocessing import StandardScaler from sklearn.decomposition import PCA from scipy import spatial from itertools import combinations - +import time import sys sys.path.append("C://Users//xbrjos//Desktop//Python") from gepard.analysis.particleContainer import ParticleContainer @@ -65,7 +65,59 @@ class KennardStone(object): self.fraction: float = desiredFraction def get_sampled_indices(self) -> list: - pass + t0 = time.time() + selectedIndices: set = set([]) + numIndices: int = round(self.data.shape[0] * self.fraction) + if numIndices < 2: + raise ValueError(f'Not enough indices to generate (min = 2), requested: {numIndices}') + else: + distMat = spatial.distance_matrix(self.data, self.data) + i, j = np.unravel_index(distMat.argmax(), distMat.shape) + remainingIndices: set = set(np.arange(self.data.shape[0])) + selectedIndices.add(i) + selectedIndices.add(j) + remainingIndices.remove(i) + remainingIndices.remove(j) + for _ in range(numIndices-2): + minDist = 0.0 + for j in remainingIndices: + dist = np.min([distMat[j][i] for i in selectedIndices]) + if dist > minDist: + minj = j + minDist = dist + selectedIndices.add(minj) + remainingIndices.remove(minj) + + assert len(np.unique(list(selectedIndices))) == len(selectedIndices) + print('selecting indices time:', np.round(time.time() - t0, 2), 'seconds') + return list(selectedIndices) + # def get_sampled_indices(self) -> list: + # t0 = time.time() + # numIndices: int = round(self.data.shape[0] * self.fraction) + # if numIndices < 2: + # raise ValueError(f'Not enough indices to generate (min = 2), requested: {numIndices}') + # else: + # startInd = self._get_start_indices() + # selectedPoints = np.zeros((numIndices, 2)) + # selectedPoints[0, :] = self.data[startInd[0], :] + # selectedPoints[1, :] = self.data[startInd[1], :] + # + # if numIndices > 2: + # data: np.ndarray = np.delete(self.data, startInd, 0) + # for i in range(numIndices-2): + # newIndex: int = self._get_point_furthest_from_other_points(selectedPoints[:i+2, :], data) + # selectedPoints[i+2, :] = data[newIndex, :] + # data = np.delete(data, newIndex, 0) + # + # selectedIndices = [] + # assert numIndices == selectedPoints.shape[0] + # for i in range(numIndices): + # newInd = np.where(self.data == selectedPoints[i])[0][0] + # selectedIndices.append(newInd) + # + # assert len(np.unique(selectedIndices)) == len(selectedIndices) + # print('selecting indices time:', np.round(time.time()-t0, 2), 'seconds') + # return selectedIndices def _get_start_indices(self) -> list: """ @@ -73,29 +125,71 @@ class KennardStone(object): Adapted from https://stackoverflow.com/questions/50468643/finding-two-most-far-away-points-in-plot-with-many-points-in-python/50469147 :return: """ + assert self.data.shape[1] == 2 candidates = self.data[spatial.ConvexHull(self.data).vertices] - import matplotlib.pyplot as plt - # plt.scatter(self.data[:, 0], self.data[:, 1]) - # plt.plot(candidates[:, 0], candidates[:, 1]) - # plt.show() dist_mat = spatial.distance_matrix(candidates, candidates) i, j = np.unravel_index(dist_mat.argmax(), dist_mat.shape) index1 = np.where(self.data == candidates[i])[0][0] index2 = np.where(self.data == candidates[j])[0][0] + assert index1 != index2 return sorted([index1, index2]) - def _get_point_furthest_from_other_points(self, refPoints: np.ndarray, otherPoints: np.ndarray) -> int: - index: int = -1 - maxDist: float = 0.0 - for i in range(otherPoints.shape[0]): - point = otherPoints[i, :] - dist = 0 - for j in range(refPoints.shape[0]): - point2 = refPoints[j] - dist += (point[0]-point2[0])**2 + (point[1]-point2[1])**2 - if dist > maxDist: - maxDist = dist - index = i - - return index - + # def _get_point_furthest_from_other_points(self, refPoints: np.ndarray, otherPoints: np.ndarray) -> int: + # assert refPoints.shape[1] == 2 + # assert otherPoints.shape[1] == 2 + # dist_mat = spatial.distance_matrix(refPoints, otherPoints) + # i, index = np.unravel_index(dist_mat.argmax(), dist_mat.shape) + # return index + + +class BoundingAreaHierarchy(object): + def __init__(self, points: np.ndarray): + super(BoundingAreaHierarchy, self).__init__() + self.points: np.ndarray = points + self.tree: BAHNode = BAHNode(points) + self._populate_tree() + + # def _populate_tree(self) -> None: + # + + +# class BAHNode(object): +# def __init__(self, parent, startxy: tuple, width: float, height: float, points: np.ndarray) -> None: +# super(BAHNode, self).__init__() +# self.maxPointsPerNode: int = 10 +# self.parent: BAHNode = parent +# self.children: list = [] # if empty, we reached the lowest node level +# self.points: np.ndarray = np.array([]) # if empty, we are not at the lowest level and have to check children +# self.x0: float = startxy[0] +# self.y0: float = startxy[1] +# self.width: float = width +# self.height: float = height +# self.x1: float = self.x0 + width +# self.y1: float = self.y0 + height +# self._create_child_nodes(points) +# +# def _create_child_nodes(self, points:np.ndarray): +# if points.shape[0] > 0: # avoid testing for children in case of a testrun (empty array is provided as points) +# pointsInNode: np.ndarray = self._get_points_in_area(points) +# if pointsInNode.shape[0] > self.maxPointsPerNode: +# childWidth: float = self.width/2 +# childHeight:float = self.height/2 +# +# self.children.append(BAHNode(self, (self.x0, self.y1), +# childWidth, childHeight, pointsInNode)) +# +# self.children.append(BAHNode(self, (self.x0 + childWidth, self.y1), +# childWidth, childHeight, pointsInNode)) +# +# self.children.append(BAHNode(self, (self.x0, self.y1 + childHeight), +# childWidth, childHeight, pointsInNode)) +# +# self.children.append(BAHNode(self, (self.x0 + childWidth, self.y1 + childHeight), +# childWidth, childHeight, pointsInNode)) +# +# +# def _get_points_in_area(self, points:np.ndarray) -> np.ndarray: +# assert points.shape[1] == 2 +# cond1: np.ndarray = np.logical_and(points[:, 0] >= self.x0, points[:, 0] < self.x1) +# cond2: np.ndarray = np.logical_and(points[:, 1] >= self.y0, points[:, 1] < self.y1) +# return points[np.logical_and(cond1, cond2)] diff --git a/tests/test_chemometricMethods.py b/tests/test_chemometricMethods.py index 4a39ea4..a08129b 100644 --- a/tests/test_chemometricMethods.py +++ b/tests/test_chemometricMethods.py @@ -53,12 +53,33 @@ class TestKennardStone(unittest.TestCase): self.kennardStone: cmeth.KennardStone = cmeth.KennardStone(np.array([]), 0.1) def test_get_sampled_indices(self): - pass + numDataSets: int = 400 + self.kennardStone.data = np.random.rand(numDataSets, 2) + self.kennardStone.fraction = 0.1 + selectedIndices = self.kennardStone.get_sampled_indices() + self.assertEqual(len(selectedIndices), numDataSets*self.kennardStone.fraction) + self.assertEqual(len(np.unique(selectedIndices)), numDataSets*self.kennardStone.fraction) + + plt.scatter(self.kennardStone.data[:, 0], self.kennardStone.data[:, 1]) + plt.scatter(self.kennardStone.data[selectedIndices, 0], self.kennardStone.data[selectedIndices, 1]) + plt.show() + + self.kennardStone.fraction = 0.1 + numDataSets = 2 + self.kennardStone.data = np.random.rand(numDataSets, 2) + self.assertRaises(ValueError, self.kennardStone.get_sampled_indices) + + numDataSets = 20 + self.kennardStone.data = np.random.rand(numDataSets, 2) + selectedIndices = self.kennardStone.get_sampled_indices() + self.assertEqual(len(selectedIndices), 2) + self.assertEqual(len(np.unique(selectedIndices)), 2) def test_get_start_indices(self): points: list = [[0, 0], [10, 10]] - for _ in range(10): + for _ in range(100): points.append([np.random.rand()*5 + 2.5, np.random.rand()*5 + 2.5]) + self.kennardStone.data = np.array(points) startIndices: list = self.kennardStone._get_start_indices() self.assertEqual(startIndices, [0, 1]) @@ -73,22 +94,22 @@ class TestKennardStone(unittest.TestCase): startIndices = self.kennardStone._get_start_indices() self.assertEqual(startIndices, [4, len(points) - 1]) - def test_get_point_furthest_from_other_points(self): - otherPoints: list = [[0, 0], [10, 0], [0, 10], [10, 10]] - refPoints: list = [[2, 2]] - indexOfFurthestPoint = self.kennardStone._get_point_furthest_from_other_points(np.array(refPoints), - np.array(otherPoints)) - self.assertEqual(indexOfFurthestPoint, 3) - - refPoints: list = [[9, 9]] - indexOfFurthestPoint = self.kennardStone._get_point_furthest_from_other_points(np.array(refPoints), - np.array(otherPoints)) - self.assertEqual(indexOfFurthestPoint, 0) - - refPoints: list = [[2, 2], [3, 3], [-1, -5]] - indexOfFurthestPoint = self.kennardStone._get_point_furthest_from_other_points(np.array(refPoints), - np.array(otherPoints)) - self.assertEqual(indexOfFurthestPoint, 3) + # def test_get_point_furthest_from_other_points(self): + # otherPoints: list = [[0, 0], [10, 0], [0, 10], [10, 10]] + # refPoints: list = [[2, 2]] + # indexOfFurthestPoint = self.kennardStone._get_point_furthest_from_other_points(np.array(refPoints), + # np.array(otherPoints)) + # self.assertEqual(indexOfFurthestPoint, 3) + # + # refPoints: list = [[9, 9]] + # indexOfFurthestPoint = self.kennardStone._get_point_furthest_from_other_points(np.array(refPoints), + # np.array(otherPoints)) + # self.assertEqual(indexOfFurthestPoint, 0) + # + # refPoints: list = [[2, 2], [3, 3], [-1, -5]] + # indexOfFurthestPoint = self.kennardStone._get_point_furthest_from_other_points(np.array(refPoints), + # np.array(otherPoints)) + # self.assertEqual(indexOfFurthestPoint, 3) class TestChemometricSubsampling(unittest.TestCase): @@ -120,3 +141,43 @@ class TestChemometricSubsampling(unittest.TestCase): # plt.scatter(princComp[:, 0], princComp[:, 1]) # plt.title(dset.name) # plt.show() + + +# class TestBAH(unittest.TestCase): +# # def setUp(self) -> None: +# # self.bah = cmeth.BoundingAreaHierarchy() +# # +# def test_get_points_in_area(self): +# points: np.ndarray = np.array([[0, 0], [0, 10], [10, 0], [10, 10]]) +# topLeftXY = (0, 0) +# width, height = 5, 5 +# bahNode: cmeth.BAHNode = cmeth.BAHNode(None, topLeftXY, width, height, np.array([])) +# ponitsInNode: np.ndarray = bahNode._get_points_in_area(points) +# self.assertEqual(ponitsInNode.shape[0], 1) +# self.assertTrue([0, 0] in ponitsInNode) +# +# width, height = 10, 10 +# bahNode = cmeth.BAHNode(None, topLeftXY, width, height, np.array([])) +# ponitsInNode: np.ndarray = bahNode._get_points_in_area(points) +# self.assertEqual(ponitsInNode.shape[0], 1) +# self.assertTrue([0, 0] in ponitsInNode) +# +# width, height = 10.1, 10.1 +# bahNode = cmeth.BAHNode(None, topLeftXY, width, height, np.array([])) +# ponitsInNode: np.ndarray = bahNode._get_points_in_area(points) +# self.assertEqual(ponitsInNode.shape[0], 4) +# for point in points: +# self.assertTrue(point in ponitsInNode) +# +# topLeftXY = (-5, -5) +# bahNode = cmeth.BAHNode(None, topLeftXY, width, height, np.array([])) +# ponitsInNode: np.ndarray = bahNode._get_points_in_area(points) +# self.assertEqual(ponitsInNode.shape[0], 1) +# self.assertTrue([0, 0] in ponitsInNode) +# +# width, height = 10, 20 +# bahNode = cmeth.BAHNode(None, topLeftXY, width, height, np.array([])) +# ponitsInNode: np.ndarray = bahNode._get_points_in_area(points) +# self.assertEqual(ponitsInNode.shape[0], 2) +# for point in points[:2]: +# self.assertTrue(point in ponitsInNode) \ No newline at end of file -- GitLab