Skip to content

Commit

Permalink
Merge pull request #45 from rchillyard/Parallel
Browse files Browse the repository at this point in the history
Parallel
  • Loading branch information
rchillyard authored Mar 18, 2023
2 parents 1ea1a6c + d89a4b8 commit d342738
Show file tree
Hide file tree
Showing 29 changed files with 4,605 additions and 178 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ libraryDependencies ++= Seq(
"io.github.jmcardon" %% "tsec-cipher-jca" % tsecVersion,
"com.phasmidsoftware" %% "flog" % "1.0.8",
"io.spray" %% "spray-json" % "1.3.6",
//noinspection SbtDependencyVersionInspection
scalaModules %% "scala-parser-combinators" % scalaParserCombinatorsVersion,
"com.github.nscala-time" %% "nscala-time" % nScalaTimeVersion,
"ch.qos.logback" % "logback-classic" % "1.4.5" % "runtime",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test"
)
libraryDependencies +=
"org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4"

Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,25 @@ class CrimeFuncSpec extends AnyFlatSpec with Matchers {
// Set up the source
val sy: IO[Source] = IO.fromTry(for (u <- resource[CrimeFuncSpec](crimeFile)) yield Source.fromURL(u))

val fraction = 4
// Set up the parser (we set the predicate only for demonstration purposes)
val parser: RawTableParser = RawTableParser().setPredicate(TableParser.sampler(2))
val parser: RawTableParser = RawTableParser().setPredicate(TableParser.sampler(fraction))

// Create the table
val wsty: IO[RawTable] = parser.parse(sy)

// CONSIDER how is it that this test runs in around 157 seconds yet the timeout is set to 30 seconds?
// CONSIDER how is it that this test runs in around 157 seconds yet the timeout is set to 30 seconds?
matchIO(wsty, Timeout(Span(30, Seconds))) {
case t@HeadedTable(r, _) =>
val analysis = Analysis(t)
println(s"Crime: $analysis")
analysis.rows shouldBe 43650 +- 2000
analysis.rows shouldBe 87211 / fraction +- 2000
r take 10 foreach println
succeed
}
}

it should "be ingested as a Table[Crime]" in {
it should "be ingested as a Table[Crime]" in {

import CrimeParser._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package com.phasmidsoftware.examples.teamproject

import com.phasmidsoftware.parse._
import com.phasmidsoftware.table.{HeadedTable, Header, Table}
import com.phasmidsoftware.table.{Content, HeadedTable, Header, Table}
import java.net.URL

/**
Expand Down Expand Up @@ -68,7 +68,7 @@ sealed trait TeamProjectTableParser extends StringTableParser[Table[TeamProject]

val rowParser: RowParser[Row, String] = TeamProjectParser.parser

protected def builder(rows: Iterable[TeamProject], header: Header): Table[Row] = HeadedTable(rows, header)
protected def builder(rows: Iterable[TeamProject], header: Header): Table[Row] = HeadedTable(Content(rows), header)
}

object TeamProjectTableParser extends TeamProjectTableParser

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/main/scala/com/phasmidsoftware/parse/Parseable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ object Parseable {
f(w).recoverWith {
case x: IllegalArgumentException =>
Failure(if (w.nonEmpty) InvalidParseException(msg(w), x) else BlankException(x))
case x: NumberFormatException =>
Failure(if (w.nonEmpty) InvalidParseException(msg(w), x) else BlankException(x))
// case x: NumberFormatException =>
// Failure(if (w.nonEmpty) InvalidParseException(msg(w), x) else BlankException(x))
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/phasmidsoftware/parse/RawParsers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package com.phasmidsoftware.parse

import com.phasmidsoftware.table.{HeadedTable, Header, RawRow, RawTable}
import com.phasmidsoftware.table._

/**
* Abstract class to define a raw parser, that's to say a Parser of Seq[String]
Expand Down Expand Up @@ -33,7 +33,7 @@ abstract class RawParsers(maybeHeader: Option[Header], forgiving: Boolean = fals

// protected def builder(rows: Iterable[Row], header: Header): RawTable = HeadedTable(rows.map(r => RawRow(r, header)), header)

protected def builder(rows: Iterable[Row], header: Header): RawTable = HeadedTable(rows, header)
protected def builder(rows: Iterable[Row], header: Header): RawTable = new HeadedTable(Content(rows), header)

protected val rowParser: RowParser[Row, Input] = implicitly[RowParser[Row, String]]
}
Expand Down
15 changes: 8 additions & 7 deletions src/main/scala/com/phasmidsoftware/parse/TableParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.phasmidsoftware.crypto.HexEncryption
import com.phasmidsoftware.parse.AbstractTableParser.logException
import com.phasmidsoftware.parse.TableParser.includeAll
import com.phasmidsoftware.table._
import com.phasmidsoftware.util.FP.partition
import com.phasmidsoftware.util.FP.{partition, sequence}
import com.phasmidsoftware.util._
import org.slf4j.{Logger, LoggerFactory}
import scala.annotation.implicitNotFound
Expand Down Expand Up @@ -190,7 +190,7 @@ case class RawTableParser(override protected val predicate: Try[RawRow] => Boole
val rowParser: RowParser[Row, String] = StandardRowParser.create[Row]

// CONSIDER why do we have a concrete Table type mentioned here?
protected def builder(rows: Iterable[Row], header: Header): Table[Row] = HeadedTable(rows, header)
protected def builder(rows: Iterable[Row], header: Header): Table[Row] = HeadedTable(Content(rows), header)

def setHeader(header: Header): RawTableParser = copy(maybeFixedHeader = Some(header))

Expand Down Expand Up @@ -273,7 +273,7 @@ case class EncryptedHeadedStringTableParser[X: CellParser : ClassTag, A: HexEncr
private val phase2Parser = PlainTextHeadedStringTableParser(None, forgiving, headerRowsToRead)

override def parse(xr: Iterator[String], n: Int): IO[Table[X]] = {
def decryptAndParse(h: Header, xt: RawTable): IO[Table[X]] = for (wt <- decryptTable(xt); xt <- phase2Parser.parseRows(wt.rows.iterator, h)) yield xt
def decryptAndParse(h: Header, xt: RawTable): IO[Table[X]] = for (wt <- decryptTable(xt); xt <- phase2Parser.parseRows(wt.iterator, h)) yield xt

val sr: TeeIterator[String] = new TeeIterator(n)(xr)
val hi: IO[Header] = rowParser.parseHeader(sr.tee)
Expand Down Expand Up @@ -346,7 +346,7 @@ case class EncryptedHeadedStringTableParser[X: CellParser : ClassTag, A: HexEncr

private def decryptTable(xt: RawTable): IO[Table[String]] = {
val wit: Table[IO[String]] = xt.map(row => HexEncryption.decryptRow(keyFunction)(row.ws))
for (ws <- IO.parSequenceN(2)(wit.rows.toSeq)) yield wit.unit(ws)
for (ws <- IO.parSequenceN(2)(wit.toSeq)) yield wit.unit(ws)
}
}

Expand Down Expand Up @@ -376,7 +376,7 @@ sealed abstract class HeadedStringTableParser[X: CellParser : ClassTag](maybeFix

type Row = X

protected def builder(rows: Iterable[X], header: Header): Table[Row] = HeadedTable(rows, header)
protected def builder(rows: Iterable[X], header: Header): Table[Row] = HeadedTable(Content(rows), header)

protected val rowParser: RowParser[X, String] = StandardRowParser.create[X]
}
Expand Down Expand Up @@ -463,11 +463,12 @@ abstract class AbstractTableParser[Table] extends TableParser[Table] {

def processTriedRows(rys: Iterator[Try[Row]]) = if (forgiving) {
val (good, bad) = partition(rys)
// CONSIDER using sequenceRev in order to save time
bad foreach failureHandler //AbstractTableParser.logException[Row]
FP.sequence(good filter predicate)
sequence(good filter predicate)
}
else
FP.sequence(rys filter predicate)
sequence(rys filter predicate)

val q: Seq[Try[Row]] = mapTsToRows.toSeq
for (rs <- processTriedRows(q.iterator)) yield builder(rs.toList, header)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ abstract class CsvTableRenderer[T: CsvRenderer : CsvGenerator, O: Writable] exte
o =>
// CONSIDER can remove o2 here and just use o.
val o2 = sw.writeRawLine(o)(hdr)
for (r <- x.rows.toSeq) yield generateText(sw, tc, o2, r)
for (r <- x.content.toSeq) yield generateText(sw, tc, o2, r)
o2
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ trait HierarchicalRenderers {
}
}


/**
* Method to return a HierarchicalRenderer[ Seq[T] ].
* NOTE: there are no identifiers generated with this HierarchicalRenderer.
Expand Down
98 changes: 81 additions & 17 deletions src/main/scala/com/phasmidsoftware/table/Analysis.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package com.phasmidsoftware.table

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import com.phasmidsoftware.parse.{RawTableParser, TableParser}
import com.phasmidsoftware.util.FP
import com.phasmidsoftware.util.FP.sequence
import scala.io.Source

/**
* Class to represent the analysis of a table.
*
* @param rows the number of rows.
* @param columns the number of columns.
* @param columnMap a map of column names to Column objects (the statistics of a column).
*/
case class Analysis(rows: Int, columns: Int, columnMap: Map[String, Column]) {
override def toString: String = s"Analysis: rows: $rows, columns: $columns, $showColumnMap"

Expand All @@ -14,13 +25,19 @@ case class Analysis(rows: Int, columns: Int, columnMap: Map[String, Column]) {
}

object Analysis {
def apply(table: RawTable): Analysis = {
val wso: Option[Seq[String]] = table.maybeColumnNames

def method1(ws: Seq[String]): Seq[(String, Column)] =
for (w <- ws; z <- FP.sequence(table.column(w)).toSeq; q <- Column.make(z).toSeq) yield w -> q
def apply(table: RawTable): Analysis = {
/**
* Method to create a column map, i.e. a sequence of String->Column pairs.
*
* Complexity of this statement is W * X where W is the number of columns and X is the time to make a Column object.
*
* @param names a sequence of column names.
* @return a sequence of String,Column tuples.
*/
def createColumnMap(names: Seq[String]): Seq[(String, Column)] = for (name <- names; column <- Column.make(table, name)) yield name -> column

val columnMap: Iterable[(String, Column)] = for (ws <- wso.toSeq; z <- method1(ws)) yield z
val columnMap = for (ws <- table.maybeColumnNames.toSeq; z <- createColumnMap(ws)) yield z
new Analysis(table.size, table.head.ws.size, columnMap.toMap)
}
}
Expand All @@ -46,30 +63,77 @@ case class Column(clazz: String, optional: Boolean, maybeStatistics: Option[Stat
}

object Column {
def make(xs: Iterator[String]): Option[Column] = {
val (ws, nulls) = xs.toList.partition(_.nonEmpty)
val optional = nulls.nonEmpty
val co1: Option[Column] = for (xs <- sequence(for (w <- ws) yield w.toIntOption); ys = xs map (_.toDouble)) yield Column("Int", optional, Statistics.make(ys))
lazy val co2: Option[Column] = for (xs <- sequence(for (w <- ws) yield w.toDoubleOption); ys = xs) yield Column("Double", optional, Statistics.make(ys))
co1 orElse co2 orElse Some(Column("String", optional, None))
/**
* Method to make a Column from column values of <code>table</code>, identified by <code>name</code>.
* Some columns cannot be analyzed (e.g., non-numeric columns) and that's why the result is optional.
*
* The complexity of this method is O(N) where N is the number of rows in the table.
*
* @param table the (raw) table from which the column is to be analyzed.
* @param name the name of the column.
* @return an optional Column.
*/
def make(table: RawTable, name: String): Option[Column] = sequence(table.column(name)) flatMap (ws => make(ws))

/**
* Method to make a Column, the analysis of a column of a (raw) Table.
* If the column is numeric (can be parsed as integers or doubles), then we can create a result, otherwise not.
*
* Complexity: O(N) where N is the length of xs.
*
* @param xs an sequence of String values, each corresponding to the column value of a row of the table.
* @return an optional Column.
*/
def make(xs: Seq[String]): Option[Column] = {
val (ws, nulls) = xs.partition(_.nonEmpty)
val nullable: Boolean = nulls.nonEmpty
val co1 = for (xs <- sequence(for (w <- ws) yield w.toIntOption); ys = xs map (_.toDouble)) yield Column("Int", nullable, Statistics.make(ys))
lazy val co2 = for (xs <- sequence(for (w <- ws) yield w.toDoubleOption); ys = xs) yield Column("Double", nullable, Statistics.make(ys))
co1 orElse co2 orElse Some(Column("String", nullable, None))
}
}

/**
* Class to represent the statistics of a column.
*
* @param mu the mean value.
* @param sigma the standard deviation.
* @param min the smallest value.
* @param max the largest value.
*/
case class Statistics(mu: Double, sigma: Double, min: Double, max: Double) {
override def toString: String = s"(range: $min-$max, mean: $mu, stdDev: $sigma)"
}

object Statistics {
def doMake(xs: Seq[Double]): Option[Statistics] = {
val mu = xs.sum / xs.size
val variance = (xs map (_ - mu) map (x => x * x)).sum / xs.size
Some(Statistics(mu, math.sqrt(variance), xs.min, xs.max))
}

def make(xs: Seq[Double]): Option[Statistics] = xs match {
case Nil => None
case h :: Nil => Some(Statistics(h, 0, h, h))
case _ => doMake(xs)
}

private def doMake(xs: Seq[Double]): Option[Statistics] = {
val mu = xs.sum / xs.size
val variance = (xs map (_ - mu) map (x => x * x)).sum / xs.size
Some(Statistics(mu, math.sqrt(variance), xs.min, xs.max))
}
}

object Main extends App {
// TODO merge the two copies of this file into one (it needs to be at the root level of resources)
val crimeFile = "2023-01-metropolitan-street-sample.csv"

// Set up the source
val sy: IO[Source] = IO.fromTry(for (u <- FP.resource[Analysis](crimeFile)) yield Source.fromURL(u))

val fraction = 1
// Set up the parser (we set the predicate only for demonstration purposes)
val parser: RawTableParser = RawTableParser().setPredicate(TableParser.sampler(fraction))

parser.parse(sy).unsafeRunSync() match {
case t@HeadedTable(r, _) =>
val analysis = Analysis(t)
println(s"Crime: $analysis")
r take 10 foreach println
}
}
Loading

0 comments on commit d342738

Please sign in to comment.