Skip to content

Commit

Permalink
Merge pull request #543 from entur/feature/gbfs-delta
Browse files Browse the repository at this point in the history
Compute delta of GBFS files and refactor vehicles and stations updaters
  • Loading branch information
testower authored Jan 28, 2025
2 parents dace98f + 1ee60bc commit 1f3f6b8
Show file tree
Hide file tree
Showing 31 changed files with 2,671 additions and 258 deletions.
19 changes: 17 additions & 2 deletions src/main/java/org/entur/lamassu/cache/EntityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,27 @@
*/
public interface EntityCache<T extends Entity> extends EntityReader<T> {
/**
* Update multiple entities in the cache with a TTL
* Updates or adds multiple entities to the cache with no expiration (infinite TTL).
* Entities will remain in the cache until explicitly removed.
*
* @param entities Map of entity keys to their corresponding entities
*/
void updateAll(Map<String, T> entities);

/**
* Updates or adds multiple entities to the cache with a specified expiration.
* If ttl is 0 and timeUnit is null, entities will not expire.
*
* @param entities Map of entity keys to their corresponding entities
* @param ttl Cache expiration value, use 0 for no expiration
* @param timeUnit Cache expiration time unit, use null for no expiration
*/
void updateAll(Map<String, T> entities, int ttl, TimeUnit timeUnit);

/**
* Remove multiple entities from the cache
* Removes multiple entities from the cache by their keys.
*
* @param keys Set of entity keys to remove from the cache
*/
void removeAll(Set<String> keys);
}
26 changes: 26 additions & 0 deletions src/main/java/org/entur/lamassu/cache/UpdateContinuityCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.entur.lamassu.cache;

import java.util.Date;

/**
* Interface for tracking GBFS update continuity.
* Used to store timestamps of last successful updates to detect missed updates.
*/
public interface UpdateContinuityCache {
/**
* Get the timestamp of the last successful update.
*
* @param systemId ID of the system to check
* @return Timestamp of last update or null if no previous update exists
*/
Date getLastUpdateTime(String systemId);

/**
* Store the timestamp of a successful update. If timestamp is null, entry
* is removed from cache.
*
* @param systemId ID of the system being updated
* @param timestamp Timestamp of the successful update
*/
void setLastUpdateTime(String systemId, Date timestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public T get(String key) {
return null;
}

@Override
public void updateAll(Map<String, T> entities) {
cache.putAll(entities);
}

@Override
public void updateAll(Map<String, T> entities, int ttl, TimeUnit timeUnit) {
cache.putAll(entities, ttl, timeUnit);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.entur.lamassu.cache.impl;

import java.util.Date;
import org.entur.lamassu.cache.UpdateContinuityCache;
import org.redisson.api.RMapCache;

/**
* Redis-backed implementation of UpdateContinuityCache using Redisson.
*/
public class RedisUpdateContinuityCache implements UpdateContinuityCache {

private final RMapCache<String, Date> cache;

public RedisUpdateContinuityCache(RMapCache<String, Date> cache) {
this.cache = cache;
}

@Override
public Date getLastUpdateTime(String systemId) {
return cache.get(systemId);
}

@Override
public void setLastUpdateTime(String systemId, Date timestamp) {
if (timestamp == null) {
cache.remove(systemId);
} else {
cache.put(systemId, timestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import java.util.Set;
import org.entur.gbfs.validation.model.ValidationResult;
import org.entur.lamassu.cache.StationSpatialIndexId;
import org.entur.lamassu.cache.UpdateContinuityCache;
import org.entur.lamassu.cache.VehicleSpatialIndexId;
import org.entur.lamassu.cache.impl.RedisUpdateContinuityCache;
import org.entur.lamassu.config.project.LamassuProjectInfoConfiguration;
import org.entur.lamassu.model.entities.GeofencingZones;
import org.entur.lamassu.model.entities.PricingPlan;
Expand Down Expand Up @@ -45,6 +47,8 @@ public class RedissonCacheConfig {
public static final String STATION_SPATIAL_INDEX_KEY = "stationSpatialIndex";
public static final String VALIDATION_REPORTS_CACHE_KEY = "validationReportsCache";
public static final String CACHE_READY_KEY = "cacheReady";
public static final String VEHICLE_STATUS_BASES_KEY = "vehicleStatusBases";
public static final String STATION_STATUS_BASES_KEY = "stationStatusBases";

private final String serializationVersion;
private final Config redissonConfig;
Expand Down Expand Up @@ -188,4 +192,22 @@ public RListMultimap<String, ValidationResult> validationResultsCache(
public RBucket<Boolean> cacheReady(RedissonClient redissonClient) {
return redissonClient.getBucket(CACHE_READY_KEY + "_" + serializationVersion);
}

@Bean
public UpdateContinuityCache vehicleUpdateContinuityCache(
RedissonClient redissonClient
) {
return new RedisUpdateContinuityCache(
redissonClient.getMapCache(VEHICLE_STATUS_BASES_KEY + "_" + serializationVersion)
);
}

@Bean
public UpdateContinuityCache stationUpdateContinuityCache(
RedissonClient redissonClient
) {
return new RedisUpdateContinuityCache(
redissonClient.getMapCache(STATION_STATUS_BASES_KEY + "_" + serializationVersion)
);
}
}
213 changes: 213 additions & 0 deletions src/main/java/org/entur/lamassu/delta/BaseGBFSFileDeltaCalculator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
*
*
* * Licensed under the EUPL, Version 1.2 or – as soon they will be approved by
* * the European Commission - subsequent versions of the EUPL (the "Licence");
* * You may not use this work except in compliance with the Licence.
* * You may obtain a copy of the Licence at:
* *
* * https://joinup.ec.europa.eu/software/page/eupl
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the Licence is distributed on an "AS IS" basis,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the Licence for the specific language governing permissions and
* * limitations under the Licence.
*
*/

package org.entur.lamassu.delta;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public abstract class BaseGBFSFileDeltaCalculator<S, T>
implements GBFSFileDeltaCalculator<S, T> {

private static final List<String> EXCLUDE_METHODS = List.of(
"toString",
"hashCode",
"equals"
);

@Override
public final GBFSFileDelta<T> calculateDelta(@Nullable S base, @NotNull S compare) {
List<GBFSEntityDelta<T>> entityDeltas = getEntityDeltas(base, compare);
return getGBFSFileDelta(base, compare, entityDeltas);
}

private @NotNull GBFSFileDelta<T> getGBFSFileDelta(
S base,
@NotNull S compare,
List<GBFSEntityDelta<T>> entityDeltas
) {
return new GBFSFileDelta<>(
getNullableLastUpdated(base),
getLastUpdated(compare),
getFileName(),
entityDeltas
);
}

private @NotNull List<GBFSEntityDelta<T>> getEntityDeltas(S base, @NotNull S compare) {
List<T> baseEntities = getBaseEntities(base);
Map<String, T> baseEntityMap = getBaseEntityMap(baseEntities);
List<String> baseEntityIds = getEntityIds(baseEntityMap);
List<T> compareEntities = getEntities(compare);
List<String> compareEntityIds = getEntityIds(compareEntities);

return Stream
.of(
getDeletedEntityDeltas(baseEntities, compareEntityIds),
getKeptEntityDeltas(compareEntities, baseEntityMap, baseEntityIds)
)
.flatMap(Collection::stream)
.toList();
}

private @NotNull List<String> getEntityIds(Map<String, ?> entityMap) {
return entityMap.keySet().stream().toList();
}

private @NotNull List<String> getEntityIds(List<T> entities) {
return entities.stream().map(this::getEntityId).toList();
}

private @NotNull List<T> getBaseEntities(S base) {
return base != null ? getEntities(base) : List.of();
}

private @NotNull Map<String, T> getBaseEntityMap(List<T> baseEntities) {
return baseEntities.stream().collect(Collectors.toMap(this::getEntityId, v -> v));
}

private @NotNull List<GBFSEntityDelta<T>> getDeletedEntityDeltas(
List<T> baseEntities,
List<String> compareEntityIds
) {
return baseEntities
.stream()
.map(this::getEntityId)
.filter(id -> !compareEntityIds.contains(id))
.map(id -> new GBFSEntityDelta<T>(id, DeltaType.DELETE, null))
.toList();
}

private @NotNull List<GBFSEntityDelta<T>> getKeptEntityDeltas(
List<T> compareEntities,
Map<String, T> baseEntityMap,
List<String> baseEntityIds
) {
return compareEntities
.stream()
// We do not need to return a delta for entities that haven't changed. We trust the implementation
// of equals from the gbfs model here.
.filter(entity -> !entity.equals(baseEntityMap.get(getEntityId(entity))))
.map(entity -> {
var entityId = getEntityId(entity);
// If the entity exists in the base, then this delta is an update, and we can compute
// the entity delta
if (baseEntityIds.contains(entityId)) {
return new GBFSEntityDelta<>(
entityId,
DeltaType.UPDATE,
getEntityDelta(baseEntityMap.get(entityId), entity)
);
// Otherwise, this is a new entity, and the "delta" contains the entire entity
} else {
return new GBFSEntityDelta<>(entityId, DeltaType.CREATE, entity);
}
})
.toList();
}

private T getEntityDelta(T base, T compare) {
T delta = createEntity();
Method[] methods = base.getClass().getDeclaredMethods();
for (Method method : methods) {
if (isMethodEligibleForDelta(method)) {
Method setter = getSetter(methods, method.getName());
if (setter != null) {
copyValueToDelta(method, setter, compare, delta);
}
}
}
return delta;
}

private boolean isMethodEligibleForDelta(Method method) {
return !EXCLUDE_METHODS.contains(method.getName()) && isGetter(method);
}

private boolean isGetter(Method method) {
return method.getParameterCount() == 0;
}

private void copyValueToDelta(Method getter, Method setter, T source, T target) {
try {
setter.invoke(target, getter.invoke(source));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new GBFSDeltaException(
"Failed to set value for field " + getter.getName(),
e
);
}
}

private @Nullable Method getSetter(Method[] methods, String getterName) {
String setterName = getterName.replace("get", "set");
return Arrays
.stream(methods)
.filter(method1 -> method1.getName().equals(setterName))
.findFirst()
.orElse(null);
}

private Long getNullableLastUpdated(S instance) {
if (instance == null) {
return null;
}
return getLastUpdated(instance);
}

/**
* Get a list of enumerable entities from the GBFS file instance
* @param instance The GBFS file instance
* @return List of enumerable entities of type T
*/
protected abstract List<T> getEntities(S instance);

/**
* Get the id of the entity
* @param entity The entity
* @return The entity's id
*/
protected abstract String getEntityId(T entity);

/**
* Create a new instance of the entity of type T
* @return An instance of T
*/
protected abstract T createEntity();

/**
* Get the last updated time of the GBFS file instance
* @param instance The GBFS file instance
* @return The last updated time
*/
protected abstract long getLastUpdated(S instance);

/**
* Get the file name of the GBFS file
* @return The file name
*/
protected abstract String getFileName();
}
39 changes: 39 additions & 0 deletions src/main/java/org/entur/lamassu/delta/DeltaType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
*
*
* * Licensed under the EUPL, Version 1.2 or – as soon they will be approved by
* * the European Commission - subsequent versions of the EUPL (the "Licence");
* * You may not use this work except in compliance with the Licence.
* * You may obtain a copy of the Licence at:
* *
* * https://joinup.ec.europa.eu/software/page/eupl
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the Licence is distributed on an "AS IS" basis,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the Licence for the specific language governing permissions and
* * limitations under the Licence.
*
*/

package org.entur.lamassu.delta;

/**
* Enum representing the type of delta.
*/
public enum DeltaType {
/**
* A new entity was created
*/
CREATE,

/**
* En existing entity was updated
*/
UPDATE,

/**
* An existing entity was deleted
*/
DELETE,
}
Loading

0 comments on commit 1f3f6b8

Please sign in to comment.