Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Droth 3198 update incomplete refactor new branch #1957

Merged
7 changes: 1 addition & 6 deletions aws/fargate/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ else
echo "batchRunType env is not defined"
exit 1
else
if [ "$batchRunType" = "UpdateIncompleteLinkList" ]; then

echo "UpdateIncompleteLinkList"
java $javaParameter -cp /digiroad2.jar fi.liikennevirasto.digiroad2.util.UpdateIncompleteLinkList
elif [ "$batchRunType" = "DataFixture" ] && [[ ! -z "$trafficSignGroup" ]]; then

if [ "$batchRunType" = "DataFixture" ] && [[ ! -z "$trafficSignGroup" ]]; then
echo "DataFixture with trafficSignGroup"
if [[ ! -z "$batchAction" ]]; then
java $javaParameter -cp /digiroad2.jar fi.liikennevirasto.digiroad2.util.DataFixture "$batchAction" "$trafficSignGroup"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,6 @@ class SpeedLimitSaveProjected[T](speedLimitProvider: SpeedLimitService) extends
}
}

class LinkPropertyUpdater(roadLinkService: RoadLinkService) extends Actor {
def receive = {
case w: RoadLinkChangeSet => roadLinkService.updateRoadLinkChanges(w)
case _ => println("linkPropertyUpdater: Received unknown message")
}
}

class ProhibitionSaveProjected[T](prohibitionProvider: ProhibitionService) extends Actor {
def receive = {
Expand Down Expand Up @@ -368,9 +362,6 @@ object Digiroad2Context {
eventbus.subscribe(speedLimitUpdater, "speedLimits:persistUnknownLimits")
eventbus.subscribe(speedLimitUpdater, "speedLimits:update")

val linkPropertyUpdater = system.actorOf(Props(classOf[LinkPropertyUpdater], roadLinkService), name = "linkPropertyUpdater")
eventbus.subscribe(linkPropertyUpdater, "linkProperties:changed")

val prohibitionSaveProjected = system.actorOf(Props(classOf[ProhibitionSaveProjected[PersistedLinearAsset]], prohibitionService), name = "prohibitionSaveProjected")
eventbus.subscribe(prohibitionSaveProjected, "prohibition:saveProjectedProhibition")

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class CsvDataImporterSpec extends AuthenticatedApiSpec with BeforeAndAfter {
val roadLink = Seq(RoadLink(1, Seq(Point(2, 2), Point(4, 4)), 3.5, Municipality, 1, TrafficDirection.BothDirections, Motorway, None, None, Map("MUNICIPALITYCODE" -> BigInt(408))))

when(mockRoadLinkService.getClosestRoadlinkForCarTrafficFromVVH(any[User], any[Point], any[Boolean])).thenReturn(roadLink)
when(mockRoadLinkService.enrichRoadLinksFromVVH(any[Seq[VVHRoadlink]], any[Seq[ChangeInfo]])).thenReturn(roadLink)
when(mockRoadLinkService.enrichRoadLinksFromVVH(any[Seq[VVHRoadlink]])).thenReturn(roadLink)

def runWithRollback(test: => Unit): Unit = sTestTransactions.runWithRollback()(test)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class MassTransitStopCsvImporterSpec extends AuthenticatedApiSpec with BeforeAnd
val roadLink = Seq(RoadLink(1, Seq(Point(2, 2), Point(4, 4)), 3.5, Municipality, 1, TrafficDirection.BothDirections, Motorway, None, None))

when(mockRoadLinkService.getClosestRoadlinkForCarTrafficFromVVH(any[User], any[Point], any[Boolean])).thenReturn(roadLink)
when(mockRoadLinkService.enrichRoadLinksFromVVH(any[Seq[VVHRoadlink]], any[Seq[ChangeInfo]])).thenReturn(roadLink)
when(mockRoadLinkService.enrichRoadLinksFromVVH(any[Seq[VVHRoadlink]])).thenReturn(roadLink)

def runWithRollback(test: => Unit): Unit = sTestTransactions.runWithRollback()(test)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class CacheClient {
case e: Exception => logger.error("Retrieval of cached value failed", e); throw e
}
}

}

object Caching extends CacheClient {
Expand All @@ -66,5 +67,11 @@ object Caching extends CacheClient {
f
}
}

def flush(): Boolean = {
val result = client.flush()
result.get()
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import fi.liikennevirasto.digiroad2._
import fi.liikennevirasto.digiroad2.client.Caching
import fi.liikennevirasto.digiroad2.dao.RoadLinkDAO.LinkAttributesDao
import fi.liikennevirasto.digiroad2.util.ChangeLanesAccordingToVvhChanges.vvhClient
import fi.liikennevirasto.digiroad2.util.UpdateIncompleteLinkList.generateProperties
import org.joda.time.format.{DateTimeFormat, ISODateTimeFormat}
import org.joda.time.{DateTime, DateTimeZone}
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -220,10 +221,10 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
val (changeInfos, complementaryLinks, vvhRoadLinks)= Await.result(fut, Duration.Inf)
if (newTransaction)
withDynTransaction {
(enrichRoadLinksFromVVH(vvhRoadLinks ++ complementaryLinks, changeInfos), changeInfos)
(enrichRoadLinksFromVVH(vvhRoadLinks ++ complementaryLinks), changeInfos)
}
else
(enrichRoadLinksFromVVH(vvhRoadLinks ++ complementaryLinks, changeInfos), changeInfos)
(enrichRoadLinksFromVVH(vvhRoadLinks ++ complementaryLinks), changeInfos)
}


Expand Down Expand Up @@ -258,6 +259,7 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
fetchChangedVVHRoadlinksBetweenDates(since, until)
}


/**
* This method returns road links by municipality.
*
Expand Down Expand Up @@ -461,15 +463,15 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
)
withDynTransaction {
LogUtils.time(logger, "TEST LOG enrichRoadLinksFromVVH")(
(enrichRoadLinksFromVVH(links, changes), changes)
(enrichRoadLinksFromVVH(links), changes)
)
}
}

def getRoadLinksAndChangesFromVVHWithPolygon(polygon :Polygon): (Seq[RoadLink], Seq[ChangeInfo])= {
val (changes, links) = Await.result(vvhClient.roadLinkChangeInfo.fetchByPolygonF(polygon).zip(vvhClient.roadLinkData.fetchByPolygonF(polygon)), atMost = Duration.Inf)
withDynTransaction {
(enrichRoadLinksFromVVH(links, changes), changes)
(enrichRoadLinksFromVVH(links), changes)
}
}

Expand All @@ -483,7 +485,7 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
val (complementaryLinks, changes, links) = Await.result(futures, Duration.Inf)

withDynTransaction {
(enrichRoadLinksFromVVH(links ++ complementaryLinks, changes), changes)
(enrichRoadLinksFromVVH(links ++ complementaryLinks), changes)
}
}

Expand Down Expand Up @@ -537,10 +539,10 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,

if(newTransaction){
withDynTransaction {
(enrichRoadLinksFromVVH(links ++ complementaryLinks, changes), changes)
(enrichRoadLinksFromVVH(links ++ complementaryLinks), changes)
}
}
else (enrichRoadLinksFromVVH(links ++ complementaryLinks, changes), changes)
else (enrichRoadLinksFromVVH(links ++ complementaryLinks), changes)

}

Expand Down Expand Up @@ -576,8 +578,8 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
else enrichRoadLinksFromVVH(links ++ complementaryLinks)
}

def reloadRoadLinksWithComplementaryAndChangesFromVVH(municipalities: Int): (Seq[RoadLink], Seq[ChangeInfo], Seq[RoadLink])= {
val fut = for{
def reloadRoadLinksWithComplementaryAndChangesFromVVH(municipalities: Int): (Seq[RoadLink], Seq[ChangeInfo], Seq[RoadLink]) = {
val fut = for {
f1Result <- vvhClient.complementaryData.fetchWalkwaysByMunicipalitiesF(municipalities)
f2Result <- vvhClient.roadLinkChangeInfo.fetchByMunicipalityF(municipalities)
f3Result <- vvhClient.roadLinkData.fetchByMunicipalityF(municipalities)
Expand All @@ -586,9 +588,23 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
val (complementaryLinks, changes, links) = Await.result(fut, Duration.Inf)

withDynTransaction {
(enrichRoadLinksFromVVH(links, changes), changes, enrichRoadLinksFromVVH(complementaryLinks, changes))
(enrichRoadLinksFromVVH(links), changes, enrichRoadLinksFromVVH(complementaryLinks))
}
}

def getLinksWithComplementaryAndGenerateProperties(municipalities: Int): (Seq[RoadLink], Seq[ChangeInfo], Seq[RoadLink])= {
val fut = for{
f1Result <- vvhClient.complementaryData.fetchWalkwaysByMunicipalitiesF(municipalities)
f2Result <- vvhClient.roadLinkChangeInfo.fetchByMunicipalityF(municipalities)
f3Result <- vvhClient.roadLinkData.fetchByMunicipalityF(municipalities)
} yield (f1Result, f2Result, f3Result)

val (complementaryLinks, changes, links) = Await.result(fut, Duration.Inf)

withDynTransaction {
(generateProperties(links, changes), changes, generateProperties(complementaryLinks, changes))
}
}

/**
* This method returns road links and change data by municipality.
Expand Down Expand Up @@ -626,10 +642,10 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
val ((links, links2), changes) = Await.result(links1F.zip(links2F).zip(changeF), atMost = Duration.apply(120, TimeUnit.SECONDS))
if(newTransaction)
withDynTransaction {
(enrichRoadLinksFromVVH(links ++ links2, changes), changes)
(enrichRoadLinksFromVVH(links ++ links2), changes)
}
else
(enrichRoadLinksFromVVH(links ++ links2, changes), changes)
(enrichRoadLinksFromVVH(links ++ links2), changes)
}

/**
Expand Down Expand Up @@ -990,30 +1006,6 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
}
}

/**
* Updates road link data in OTH db. Used by Digiroad2Context LinkPropertyUpdater Akka actor.
*/
def updateRoadLinkChanges(roadLinkChangeSet: RoadLinkChangeSet): Unit = {

withDynTransaction {
LogUtils.time(logger, s"updateRoadLinkChanges updateIncompleteLinks incompleteLinks: ${roadLinkChangeSet.incompleteLinks.size},") {
updateIncompleteLinks(roadLinkChangeSet.incompleteLinks)
}
}

withDynTransaction {
LogUtils.time(logger, s"updateRoadLinkChanges updateAutoGeneratedProperties adjustedRoadLinks: ${roadLinkChangeSet.adjustedRoadLinks.size},") {
updateAutoGeneratedProperties(roadLinkChangeSet.adjustedRoadLinks)
}
}

withDynTransaction {
LogUtils.time(logger, s"updateRoadLinkChanges fillRoadLinkAttributes roadLinks: ${roadLinkChangeSet.roadLinks.size}, changes: ${roadLinkChangeSet.changes.size},") {
fillRoadLinkAttributes(roadLinkChangeSet.roadLinks, roadLinkChangeSet.changes)
}
}
}

def updateAutoGeneratedProperties(adjustedRoadLinks: Seq[AdjustedRoadLinksAndVVHRoadLink]): Unit = {
def createUsernameForAutogenerated(modifiedBy: Option[String]): Option[String] = {
modifiedBy match {
Expand Down Expand Up @@ -1116,7 +1108,7 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
/**
* Updates incomplete road link list (incomplete = functional class or link type missing). Used by RoadLinkService.updateRoadLinkChanges.
*/
protected def updateIncompleteLinks(incompleteLinks: Seq[IncompleteLink]): Unit = {
def updateIncompleteLinks(incompleteLinks: Seq[IncompleteLink]): Unit = {

val insertLinkPropertyPS = dynamicSession.prepareStatement(
s"""insert into incomplete_link(id, link_id, municipality_code, administrative_class)
Expand Down Expand Up @@ -1300,7 +1292,7 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
}
}.toMap
}

def resolveChanges(changesToBeProcessed: Seq[ChangeInfo]): Unit = {
changesToBeProcessed.foreach { change =>
ChangeType.apply(change.changeType) match {
Expand Down Expand Up @@ -1342,64 +1334,12 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
.sortWith(_.vvhTimeStamp < _.vvhTimeStamp)
resolveChanges(changesToBeProcessed)
}

/**
* This method performs formatting operations to given vvh road links:
* - auto-generation of functional class and link type by feature class
* - information transfer from old link to new link from change data
* It also passes updated links and incomplete links to be saved to db by actor.
*
* @param allVvhRoadLinks
* @param changes
* @return Road links
*/
def enrichRoadLinksFromVVH(allVvhRoadLinks: Seq[VVHRoadlink], changes: Seq[ChangeInfo] = Nil): Seq[RoadLink] = {
val vvhRoadLinks = allVvhRoadLinks.filterNot(_.featureClass == FeatureClass.WinterRoads)
def autoGenerateProperties(roadLink: RoadLink): RoadLink = {
val vvhRoadLink = vvhRoadLinks.find(_.linkId == roadLink.linkId)
vvhRoadLink.get.featureClass match {
case FeatureClass.TractorRoad => roadLink.copy(functionalClass = 7, linkType = TractorRoad, modifiedBy = Some("automatic_generation"), modifiedAt = Some(DateTimePropertyFormat.print(DateTime.now())))
case FeatureClass.DrivePath | FeatureClass.CarRoad_IIIb => roadLink.copy(functionalClass = 6, linkType = SingleCarriageway, modifiedBy = Some("automatic_generation"), modifiedAt = Some(DateTimePropertyFormat.print(DateTime.now())))
case FeatureClass.CycleOrPedestrianPath => roadLink.copy(functionalClass = 8, linkType = CycleOrPedestrianPath, modifiedBy = Some("automatic_generation"), modifiedAt = Some(DateTimePropertyFormat.print(DateTime.now())))
case FeatureClass.SpecialTransportWithoutGate => roadLink.copy(functionalClass = UnknownFunctionalClass.value, linkType = SpecialTransportWithoutGate, modifiedBy = Some("auto_generation"), modifiedAt = Some(DateTimePropertyFormat.print(DateTime.now())))
case FeatureClass.SpecialTransportWithGate => roadLink.copy(functionalClass = UnknownFunctionalClass.value, linkType = SpecialTransportWithGate, modifiedBy = Some("auto_generation"), modifiedAt = Some(DateTimePropertyFormat.print(DateTime.now())))
case FeatureClass.CarRoad_IIIa => vvhRoadLink.get.administrativeClass match {
case State => roadLink.copy(functionalClass = 4, linkType = SingleCarriageway, modifiedBy = Some("automatic_generation"), modifiedAt = Some(DateTimePropertyFormat.print(DateTime.now())))
case Municipality | Private => roadLink.copy(functionalClass = 5, linkType = SingleCarriageway, modifiedBy = Some("automatic_generation"), modifiedAt = Some(DateTimePropertyFormat.print(DateTime.now())))
case _ => roadLink
}
case _ => roadLink //similar logic used in roadaddressbuilder
}
}
def toIncompleteLink(roadLink: RoadLink): IncompleteLink = {
val vvhRoadLink = vvhRoadLinks.find(_.linkId == roadLink.linkId)
IncompleteLink(roadLink.linkId, vvhRoadLink.get.municipalityCode, roadLink.administrativeClass)
}

def canBeAutoGenerated(roadLink: RoadLink): Boolean = {
vvhRoadLinks.find(_.linkId == roadLink.linkId).get.featureClass match {
case FeatureClass.AllOthers => false
case _ => true
}
}

val roadLinkDataByLinkId: Seq[RoadLink] = LogUtils.time(logger,"TEST LOG roadLinkDataByLinkId"){getRoadLinkDataByLinkIds(vvhRoadLinks)}
val (incompleteLinks, completeLinks) = roadLinkDataByLinkId.partition(isIncomplete)
val (linksToAutoGenerate, incompleteOtherLinks) = incompleteLinks.partition(canBeAutoGenerated)
val autoGeneratedLinks = linksToAutoGenerate.map(autoGenerateProperties)
val (changedLinks, stillIncompleteLinks) = LogUtils.time(logger,"TEST LOG fillIncompleteLinksWithPreviousLinkData"){ fillIncompleteLinksWithPreviousLinkData(incompleteOtherLinks, changes)}
val changedPartiallyIncompleteLinks = stillIncompleteLinks.filter(isPartiallyIncomplete)
val stillIncompleteLinksInUse = stillIncompleteLinks.filter(_.constructionType == ConstructionType.InUse)

val adjustedRoadLinks = autoGeneratedLinks ++ changedLinks ++ changedPartiallyIncompleteLinks
val vvhRoadLinksGroupBy = allVvhRoadLinks.groupBy(_.linkId)
val pair = adjustedRoadLinks.map(r=>AdjustedRoadLinksAndVVHRoadLink(r,vvhRoadLinksGroupBy(r.linkId).last))

eventbus.publish("linkProperties:changed", RoadLinkChangeSet(pair, stillIncompleteLinksInUse.map(toIncompleteLink), changes, roadLinkDataByLinkId))

completeLinks ++ autoGeneratedLinks ++ changedLinks ++ stillIncompleteLinks
def enrichRoadLinksFromVVH(allVvhRoadLinks: Seq[VVHRoadlink]): Seq[RoadLink] = {
LogUtils.time(logger,"TEST LOG roadLinkDataByLinkId"){getRoadLinkDataByLinkIds(allVvhRoadLinks)}
}

/**
* Uses old road link ids from change data to fetch their OTH overridden properties from db.
* Used by RoadLinkSErvice.fillIncompleteLinksWithPreviousLinkData.
Expand Down Expand Up @@ -1739,7 +1679,7 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
)
}

private def getCachedRoadLinksAndChanges(municipalityCode: Int): (Seq[RoadLink], Seq[ChangeInfo]) = {
def getCachedRoadLinksAndChanges(municipalityCode: Int): (Seq[RoadLink], Seq[ChangeInfo]) = {
val (roadLinks, changes, _) = getCachedRoadLinks(municipalityCode)
(roadLinks, changes)
}
Expand All @@ -1756,6 +1696,7 @@ class RoadLinkService(val vvhClient: VVHClient, val eventbus: DigiroadEventBus,
reloadRoadLinksWithComplementaryAndChangesFromVVH(municipalityCode)
)("links:" + municipalityCode)
}

/**
* Call reloadRoadNodesFromVVH
*/
Expand Down
Loading