Skip to content

Commit

Permalink
Merge pull request #6 from annaformaniuk/feature/custom_generalizer
Browse files Browse the repository at this point in the history
Feature/custom generalizer
  • Loading branch information
saadsaifse authored Jun 27, 2020
2 parents 7bc0060 + cf8f703 commit 4410173
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 86 deletions.
3 changes: 2 additions & 1 deletion envirocar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .client.api.track_api import TrackAPI
from .client.request_param import BboxSelector, TimeSelector
from .trajectories.preprocessing import Preprocessing
from .trajectories.preprocessing import GeneralizationType
from .trajectories.track_converter import TrackConverter
from .trajectories.track_similarity import TrackSimilarity
from .trajectories.track_generalizer import MinDistanceGeneralizer, MaxDistanceGeneralizer, DouglasPeuckerGeneralizer
from .trajectories.track_similarity import *
26 changes: 26 additions & 0 deletions envirocar/trajectories/geom_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from math import sin, cos, atan2, radians, sqrt
from shapely.geometry import Point

R_EARTH = 6371000 # radius of earth in meters

def measure_distance_spherical(point1, point2):
    """Return the great-circle distance between two shapely Points as a float.

    The haversine formula is evaluated on a sphere of radius ``R_EARTH``
    (meters). ``x`` is read as longitude and ``y`` as latitude, both in
    degrees.

    Raises
    ------
    TypeError
        If either argument is not a shapely ``Point``.
    """
    # isinstance (rather than an exact type comparison) keeps this check
    # consistent with measure_distance_euclidean and accepts Point subclasses.
    if not isinstance(point1, Point) or not isinstance(point2, Point):
        raise TypeError("Only Points are supported as arguments, got {} and {}".format(point1, point2))
    lon1 = float(point1.x)
    lon2 = float(point2.x)
    lat1 = float(point1.y)
    lat2 = float(point2.y)
    delta_lat = radians(lat2 - lat1)
    delta_lon = radians(lon2 - lon1)
    a = sin(delta_lat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(delta_lon / 2) ** 2
    # Floating-point rounding can push `a` marginally above 1 for
    # (near-)antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, a)
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R_EARTH * c


def measure_distance_euclidean(point1, point2):
    """Return the planar distance between two shapely Points as a float.

    The value is whatever ``point1.distance(point2)`` reports, i.e. it is
    expressed in the units of the points' coordinates.

    Raises
    ------
    TypeError
        If either argument is not a shapely ``Point``.
    """
    both_are_points = isinstance(point1, Point) and isinstance(point2, Point)
    if not both_are_points:
        raise TypeError("Only Points are supported as arguments, got {} and {}".format(point1, point2))
    return point1.distance(point2)
46 changes: 1 addition & 45 deletions envirocar/trajectories/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@
from branca.colormap import linear
import enum

class GeneralizationType(enum.Enum):
    """Closed set of generalization strategies selectable by callers."""

    # Geometry-based simplification.
    DouglasPeucker = 0
    # Keep consecutive points at least a given distance apart.
    MinDistance = 1
    # Keep consecutive rows at least a given time span apart.
    MinTimeDelta = 2

class Preprocessing():
def __init__(self):
    """Announce on stdout that the pre-processing helper is being created."""
    # do we need anything?
    print("Initializing pre-processing class")
Expand Down Expand Up @@ -568,43 +563,4 @@ def temporal_filter_date(self, mpd_df, filterdate):

def cluster(self, points_mp):
    """Placeholder for point clustering.

    ``points_mp`` is currently ignored; a fixed notice string is returned
    until the clustering logic is implemented.
    """
    # TODO clustering of points here
    placeholder = 'Clustering function was called. Substitute this string with clustering result'
    return placeholder

def generalize(self, traj, tolerance, generalizationMode):
    """Generalize a trajectory or trajectory collection via its own ``generalize`` method.

    Keyword Arguments:
        traj -- movingpandas trajectory/trajectory collection
        tolerance -- threshold whose type depends on the mode (see below)
        generalizationMode -- mode name forwarded to ``traj.generalize``

    Supported generalization modes include:
        - 'douglas-peucker' (tolerance as float in CRS units or meters if CRS is geographic, e.g. EPSG:4326 WGS84)
        - 'min-time-delta' (tolerance as datetime.timedelta)
        - 'min-distance' (tolerance as float in CRS units or meters if CRS is geographic, e.g. EPSG:4326 WGS84)

    Returns:
        moving pandas trajectory/trajectory collection
    """
    # Note the argument order expected by traj.generalize: mode first, then tolerance.
    generalized = traj.generalize(generalizationMode, tolerance)
    return generalized

def generalize_v04(self, traj, tolerance, generalizationType):
    """Generalize a movingpandas trajectory or trajectory collection.

    Note: This function will only work with movingpandas v0.4-rc1 and above.
    See https://github.com/anitagraser/movingpandas/issues/73

    Keyword Arguments:
        traj -- movingpandas trajectory/trajectory collection
        tolerance -- passed through unchanged for DouglasPeucker and
            MinDistance; interpreted as a number of minutes for MinTimeDelta
        generalizationType -- a GeneralizationType member (DouglasPeucker,
            MinDistance or MinTimeDelta)

    Returns:
        moving pandas trajectory or trajectory collection

    Raises:
        ValueError -- if generalizationType is not a GeneralizationType member
    """
    if not isinstance(generalizationType, GeneralizationType):
        raise ValueError("Invalid generalization type " + str(generalizationType))

    # Enum members are singletons, so identity is equivalent to comparing values.
    if generalizationType is GeneralizationType.DouglasPeucker:
        return mpd.DouglasPeuckerGeneralizer(traj).generalize(tolerance=tolerance)

    if generalizationType is GeneralizationType.MinDistance:
        return mpd.MinDistanceGeneralizer(traj).generalize(tolerance=tolerance)

    if generalizationType is GeneralizationType.MinTimeDelta:
        # The time-based generalizer expects a timedelta, not a bare number.
        minutes = timedelta(minutes=tolerance)
        return mpd.MinTimeDeltaGeneralizer(traj).generalize(tolerance=minutes)

return 'Clustering function was called. Substitute this string with clustering result'
263 changes: 263 additions & 0 deletions envirocar/trajectories/track_generalizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
# -*- coding: utf-8 -*-

from copy import copy, deepcopy
from shapely.geometry import LineString

from movingpandas import Trajectory, TrajectoryCollection
from .geom_utils import measure_distance_spherical, measure_distance_euclidean

class TrackGeneralizer:
    """
    Generalizer base class.

    Subclasses implement ``_generalize_traj``; the base implementation
    returns the trajectory unchanged.
    """

    def __init__(self, traj):
        """
        Create TrackGeneralizer

        Parameters
        ----------
        traj : Trajectory/TrajectoryCollection
        """
        self.traj = traj

    def generalize(self, tolerance, columnNamesToDistributeValues=None):
        """
        Generalize the input Trajectory/TrajectoryCollection.

        Parameters
        ----------
        tolerance : any type
            Tolerance threshold, differs by generalizer
        columnNamesToDistributeValues : list, optional
            List of column names to distribute values to neighboring kept
            rows. Defaults to None (no redistribution), so that
            ``generalize(tolerance=...)`` works without a second argument.

        Returns
        -------
        Trajectory/TrajectoryCollection
            Generalized Trajectory or TrajectoryCollection

        Raises
        ------
        TypeError
            If the wrapped object is neither a Trajectory nor a
            TrajectoryCollection.
        """
        if isinstance(self.traj, Trajectory):
            return self._generalize_traj(self.traj, tolerance, columnNamesToDistributeValues)
        if isinstance(self.traj, TrajectoryCollection):
            return self._generalize_traj_collection(tolerance, columnNamesToDistributeValues)
        raise TypeError(
            "Expected a Trajectory or TrajectoryCollection, got {}".format(type(self.traj).__name__)
        )

    def _generalize_traj_collection(self, tolerance, columnNamesToDistributeValues=None):
        """Generalize every trajectory of the collection and return a shallow copy holding the results."""
        generalized = [
            self._generalize_traj(traj, tolerance, columnNamesToDistributeValues)
            for traj in self.traj.trajectories
        ]
        # Shallow copy: the input collection keeps its own list of trajectories.
        result = copy(self.traj)
        result.trajectories = generalized
        return result

    def _generalize_traj(self, traj, tolerance, columnNamesToDistributeValues=None):
        """Generalize a single trajectory; the base class returns it unchanged."""
        return traj

class MinDistanceGeneralizer(TrackGeneralizer):
    """
    Generalizes based on distance.

    This generalization ensures that consecutive locations are at least a
    certain distance apart. The first and the last row are always kept.

    tolerance : float
        Desired minimum distance between consecutive points. Measured with
        measure_distance_spherical when ``traj.is_latlon`` is true, otherwise
        with measure_distance_euclidean.
    columnNamesToDistributeValues : list of column names to distribute values to neighboring kept rows

    Examples
    --------
    >>> MinDistanceGeneralizer(traj).generalize(tolerance=1.0)
    """

    def _generalize_traj(self, traj, tolerance, columnNamesToDistributeValues=None):
        """Return a new Trajectory containing only the rows selected by the distance rule."""
        # Work on a private copy so value redistribution never touches the caller's data.
        df = traj.df.copy()
        geom_col = traj.get_geom_column_name()
        measure = measure_distance_spherical if traj.is_latlon else measure_distance_euclidean

        keep_rows = [0]
        prev_pt = df.iloc[0][geom_col]
        for position, pt in enumerate(df[geom_col]):
            if position == 0:
                # The first row is already kept; comparing it with itself
                # would only add a duplicate when tolerance <= 0.
                continue
            if measure(pt, prev_pt) >= tolerance:
                keep_rows.append(position)
                prev_pt = pt

        # Always keep the last row, but do not list it twice.
        last_position = len(df) - 1
        if keep_rows[-1] != last_position:
            keep_rows.append(last_position)

        if columnNamesToDistributeValues:
            # Distribute the selected values of dropped rows to the neighboring kept rows:
            # each side receives half of the column sums of the rows dropped in between.
            for above, below in zip(keep_rows, keep_rows[1:]):
                if below - above <= 1:
                    continue
                half = df.iloc[above + 1:below][columnNamesToDistributeValues].sum() / 2
                for position in (above, below):
                    row = df.iloc[position].copy()
                    row[columnNamesToDistributeValues] = row[columnNamesToDistributeValues] + half
                    df.iloc[position] = row

        return Trajectory(df.iloc[keep_rows], traj.id)


class MinTimeDeltaGeneralizer(TrackGeneralizer):
    """
    Generalizes based on time.

    This generalization ensures that consecutive rows are at least a certain
    timedelta apart. Timestamps are taken from the index of ``traj.df``. The
    first and the last row are always kept.

    tolerance : datetime.timedelta
        Desired minimum time difference between consecutive rows
    columnNamesToDistributeValues : list of column names to distribute values to neighboring kept rows

    Examples
    --------
    >>> MinTimeDeltaGeneralizer(traj).generalize(tolerance=timedelta(minutes=10))
    """

    def _generalize_traj(self, traj, tolerance, columnNamesToDistributeValues=None):
        """Return a new Trajectory containing only the rows selected by the time rule."""
        # Work on a private copy so value redistribution never touches the caller's data.
        df = traj.df.copy()

        # Read timestamps straight from the index. The previous code looked up
        # a 't' column on a frame that never received it (KeyError).
        timestamps = df.index
        keep_rows = [0]
        prev_t = timestamps[0]
        for position, t in enumerate(timestamps):
            if position == 0:
                # The first row is already kept.
                continue
            if t - prev_t >= tolerance:
                keep_rows.append(position)
                prev_t = t

        # Always keep the last row, but do not list it twice.
        last_position = len(df) - 1
        if keep_rows[-1] != last_position:
            keep_rows.append(last_position)

        if columnNamesToDistributeValues:
            # Distribute the selected values of dropped rows to the neighboring kept rows:
            # each side receives half of the column sums of the rows dropped in between.
            for above, below in zip(keep_rows, keep_rows[1:]):
                if below - above <= 1:
                    continue
                half = df.iloc[above + 1:below][columnNamesToDistributeValues].sum() / 2
                for position in (above, below):
                    row = df.iloc[position].copy()
                    row[columnNamesToDistributeValues] = row[columnNamesToDistributeValues] + half
                    df.iloc[position] = row

        return Trajectory(df.iloc[keep_rows], traj.id)


class MaxDistanceGeneralizer(TrackGeneralizer):
    """
    Generalizes based on distance.

    Similar to Douglas-Peucker. Single-pass implementation that checks
    whether the provided distance threshold is exceeded: a straight line is
    drawn from the current anchor point to each new point, and a row is kept
    once any point seen since the anchor lies farther than ``tolerance`` from
    that line. The first and the last row are always kept.

    tolerance : float
        Distance tolerance
    columnNamesToDistributeValues : list of column names to distribute values to neighboring kept rows

    Examples
    --------
    >>> MaxDistanceGeneralizer(traj).generalize(tolerance=1.0)
    """

    def _generalize_traj(self, traj, tolerance, columnNamesToDistributeValues=None):
        """Return a new Trajectory containing only the rows selected by the max-distance rule."""
        # Work on a private copy so value redistribution never touches the caller's data.
        df = traj.df.copy()
        geom_col = traj.get_geom_column_name()

        keep_rows = []
        anchor = None
        pts = []
        for position, current_pt in enumerate(df[geom_col]):
            if anchor is None:
                # First row: becomes the initial anchor and is always kept.
                anchor = current_pt
                keep_rows.append(position)
                continue
            line = LineString([anchor, current_pt])
            # any() stops at the first offending point, so a row is recorded at most once.
            if any(line.distance(pt) > tolerance for pt in pts):
                # NOTE(review): the recorded row is the one *before* the point that
                # triggered (the original counter lagged one row behind the
                # iteration), while the anchor moves to the triggering point
                # itself. Preserved as-is to keep results stable; confirm intent.
                keep_rows.append(position - 1)
                anchor = current_pt
                pts = []
            pts.append(current_pt)

        # Always keep the last row, but do not list it twice (single-row input).
        last_position = len(df) - 1
        if keep_rows[-1] != last_position:
            keep_rows.append(last_position)

        if columnNamesToDistributeValues:
            # Distribute the selected values of dropped rows to the neighboring kept rows:
            # each side receives half of the column sums of the rows dropped in between.
            for above, below in zip(keep_rows, keep_rows[1:]):
                if below - above <= 1:
                    continue
                half = df.iloc[above + 1:below][columnNamesToDistributeValues].sum() / 2
                for position in (above, below):
                    row = df.iloc[position].copy()
                    row[columnNamesToDistributeValues] = row[columnNamesToDistributeValues] + half
                    df.iloc[position] = row

        return Trajectory(df.iloc[keep_rows], traj.id)


class DouglasPeuckerGeneralizer(TrackGeneralizer):
    """
    Generalizes using a Douglas-Peucker-style distance tolerance.

    NOTE(review): the implementation below is the same single-pass heuristic
    as MaxDistanceGeneralizer (anchor-to-current line, keep a row once an
    intermediate point deviates by more than ``tolerance``), not the
    recursive Douglas-Peucker algorithm. Confirm whether that is intended.
    The first and the last row are always kept.

    tolerance : float
        Distance tolerance
    columnNamesToDistributeValues : list of column names to distribute values to neighboring kept rows

    Examples
    --------
    >>> DouglasPeuckerGeneralizer(traj).generalize(tolerance=1.0)
    """

    def _generalize_traj(self, traj, tolerance, columnNamesToDistributeValues=None):
        """Return a new Trajectory containing only the rows selected by the distance rule."""
        # Work on a private copy so value redistribution never touches the caller's data.
        df = traj.df.copy()
        # Use the trajectory's geometry column rather than assuming it is named
        # "geometry", consistent with the other generalizers in this module.
        geom_col = traj.get_geom_column_name()

        keep_rows = []
        anchor = None
        pts = []
        for position, current_pt in enumerate(df[geom_col]):
            if anchor is None:
                # First row: becomes the initial anchor and is always kept.
                anchor = current_pt
                keep_rows.append(position)
                continue
            line = LineString([anchor, current_pt])
            # any() stops at the first offending point, so a row is recorded at most once.
            if any(line.distance(pt) > tolerance for pt in pts):
                # NOTE(review): the recorded row is the one *before* the point that
                # triggered (the original counter lagged one row behind the
                # iteration), while the anchor moves to the triggering point
                # itself. Preserved as-is to keep results stable; confirm intent.
                keep_rows.append(position - 1)
                anchor = current_pt
                pts = []
            pts.append(current_pt)

        # Keep the last row, but do not list it twice (single-row input).
        last_position = len(df) - 1
        if keep_rows[-1] != last_position:
            keep_rows.append(last_position)

        if columnNamesToDistributeValues:
            # Distribute the selected values of dropped rows to the neighboring kept rows:
            # each side receives half of the column sums of the rows dropped in between.
            for above, below in zip(keep_rows, keep_rows[1:]):
                if below - above <= 1:
                    continue
                half = df.iloc[above + 1:below][columnNamesToDistributeValues].sum() / 2
                for position in (above, below):
                    row = df.iloc[position].copy()
                    row[columnNamesToDistributeValues] = row[columnNamesToDistributeValues] + half
                    df.iloc[position] = row

        return Trajectory(df.iloc[keep_rows], traj.id)
Loading

0 comments on commit 4410173

Please sign in to comment.