Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replication strategy support #1

Merged
merged 18 commits into from
Jul 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,3 @@ jdk:
- oraclejdk8
scala:
- 2.11.8
services:
- cassandra
57 changes: 44 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,32 @@ authored. Migrations are applied in ascending order and reversed in descending o

### Command Line

Here's the short version:
#####Here's the short version:

Given the configuration:

```
pillar.my_keyspace {
prod {
...
}
development {
...
}
}
```
1. Write migrations, place them in conf/pillar/migrations/myapp.
1. Add pillar settings to conf/application.conf.
1. % pillar initialize myapp
1. % pillar migrate myapp
1. `% pillar initialize -e prod my_keyspace`
1. `% pillar migrate -e prod my_keyspace`

*Note: development is the default environment if nothing is specified*

Or we could compile and run the jar:

```
java -cp "slf4j-simple.jar:pillar-assembly.jar" de.kaufhof.pillar.cli.App -d "path/to/migrations" -e "prod" initialize "my_keyspace"
```

#### Migration Files

Expand Down Expand Up @@ -136,26 +156,37 @@ The Pillar command line interface expects to find migrations in conf/pillar/migr
#### Configuration

Pillar uses the [Typesafe Config][typesafeconfig] library for configuration. The Pillar command-line interface expects
to find an application.conf file in ./conf or ./src/main/resources. Given a data store called faker, the
application.conf might look like the following:
to find an application.conf file in ./conf or ./src/main/resources.
The ReplicationStrategy and ReplicationFactor can be configured per environment. If left out completely,
SimplyStrategy with RF 3 will be used by default.
Given a data store called faker, the application.conf might look like the following:

```
pillar.faker {
development {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_development"
replicationStrategy: "SimpleStrategy"
replicationFactor: 0
}
test {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
}
acceptance_test {
}
```
```
pillar.faker {
development {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_acceptance_test"
cassandra-keyspace-name: "pillar_development"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: [
{dc1: 2},
{dc2: 3}
]
}
}
```

##### SSL & Authentication
You can add ssl options and authentication to each of the environments:
You can optionally add ssl options and authentication to each of the environments:

pillar.faker {
development {
Expand Down Expand Up @@ -225,7 +256,7 @@ The package installs to /opt/pillar by default. The /opt/pillar/bin/pillar execu

data-store The target data store, as defined in application.conf

#### Examples
#### More Examples

Initialize the faker datastore development environment

Expand Down
6 changes: 5 additions & 1 deletion project/PillarBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ object PillarBuild extends Build {
"com.typesafe" % "config" % "1.0.1",
"org.clapper" %% "argot" % "1.0.3",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"org.scalatest" %% "scalatest" % "2.2.0" % "test"
"org.scalatest" %% "scalatest" % "2.2.0" % "test",
"org.cassandraunit" % "cassandra-unit" % "3.0.0.1" % "test",
"com.google.guava" % "guava" % "18.0" % "test",
"ch.qos.logback" % "logback-classic" % "1.1.7" % "test"
)

val rhPackage = TaskKey[File]("rh-package", "Packages the application for Red Hat Package Manager")
Expand Down Expand Up @@ -80,6 +83,7 @@ object PillarBuild extends Build {
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
},
parallelExecution in Test := false,
publishMavenStyle := true,
publishArtifact in Test := false,
pomIncludeRepository := { _ => false },
Expand Down
65 changes: 64 additions & 1 deletion src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,72 @@ pillar.faker {
test {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: [
{dc1: 2},
{dc2: 3}
]
}
acceptance_test {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_acceptance_test"
replicationStrategy: "SimpleStrategy"
replicationFactor: 1
}
}
}

pillar.test {
simpleGood {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategy"
replicationFactor: 1
}
simpleBadStrat {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategee"
replicationFactor: 1
}
simpleBadRep {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategy"
replicationFactor: foo
}
simpleMissingRep {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategy"
}
simpleZeroRep {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategy"
replicationFactor: 0
}
netGood {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: [
{dc1: 2},
{dc2: 3}
]
}
netEmptyRep {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: []
}
netZeroRep {
cassandra-seed-address: "127.0.0.1"
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: [
{dc1: 0},
{dc2: 3}
]
}
}
4 changes: 2 additions & 2 deletions src/main/scala/de/kaufhof/pillar/CassandraMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class CassandraMigrator(registry: Registry) extends Migrator {
selectMigrationsToApply(dateRestriction, appliedMigrations).foreach(_.executeUpStatement(session))
}

override def initialize(session: Session, keyspace: String, replicationOptions: ReplicationOptions = ReplicationOptions.default) {
executeIdempotentCommand(session, "CREATE KEYSPACE %s WITH replication = %s".format(keyspace, replicationOptions.toString()))
override def initialize(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy) {
executeIdempotentCommand(session, s"CREATE KEYSPACE $keyspace WITH replication = ${replicationStrategy.cql}")
executeIdempotentCommand(session,
"""
| CREATE TABLE %s.applied_migrations (
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/de/kaufhof/pillar/Migrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Migrator {
trait Migrator {
def migrate(session: Session, dateRestriction: Option[Date] = None)

def initialize(session: Session, keyspace: String, replicationOptions: ReplicationOptions = ReplicationOptions.default)
def initialize(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy = SimpleStrategy())

def destroy(session: Session, keyspace: String)
}
2 changes: 1 addition & 1 deletion src/main/scala/de/kaufhof/pillar/PrintStreamReporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.Date
import com.datastax.driver.core.Session

class PrintStreamReporter(stream: PrintStream) extends Reporter {
override def initializing(session: Session, keyspace: String, replicationOptions: ReplicationOptions) {
override def initializing(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy) {
stream.println(s"Initializing $keyspace")
}

Expand Down
17 changes: 0 additions & 17 deletions src/main/scala/de/kaufhof/pillar/ReplicationOptions.scala

This file was deleted.

32 changes: 32 additions & 0 deletions src/main/scala/de/kaufhof/pillar/ReplicationStrategy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package de.kaufhof.pillar

/**
* Defines all possible ReplicationStrategy configurations.
* A NetworkTopologyStrategy will require the appropriate snitch.
*/
sealed trait ReplicationStrategy {
def cql: String
override def toString: String = cql
}

final case class SimpleStrategy(replicationFactor: Int = 3) extends ReplicationStrategy {
require(replicationFactor > 0)

override def cql: String = s"{'class' : 'SimpleStrategy', 'replication_factor' : $replicationFactor}"
}

final case class NetworkTopologyStrategy(dataCenters: Seq[CassandraDataCenter]) extends ReplicationStrategy {
require(dataCenters.nonEmpty)

override def cql: String = {
val replicationFacString = dataCenters.map { dc =>
s"'${dc.name}' : ${dc.replicationFactor} "
}.mkString(", ")

s"{'class' : 'NetworkTopologyStrategy', $replicationFacString }"
}
}

final case class CassandraDataCenter(name: String, replicationFactor: Int){
require(replicationFactor > 0 && name.nonEmpty)
}
76 changes: 76 additions & 0 deletions src/main/scala/de/kaufhof/pillar/ReplicationStrategyBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package de.kaufhof.pillar

import java.util.Map.Entry

import com.typesafe.config.ConfigException.BadValue
import com.typesafe.config.{Config, ConfigException, ConfigObject, ConfigValue}

import scala.util.{Failure, Success, Try}

final case class ReplicationStrategyConfigError(msg: String) extends Exception

object ReplicationStrategyBuilder {

/**
* Parses replication settings from a config that looks like:
* {{{
* replicationStrategy: "SimpleStrategy"
* replicationFactor: 3
* }}}
*
* or:
*
* {{{
* replicationStrategy: "NetworkTopologyStrategy"
* replicationFactor: [
* {dc1: 3},
* {dc2: 3}
* ]
* }}}
*
* @param configuration The applications Typesafe config
* @param dataStoreName The target data store, as defined in application.conf
* @param environment The environment, as defined in application.conf (i.e. "pillar.dataStoreName.environment {...})
* @return ReplicationOptions with a default of Simple Strategy with a replication factor of 3.
*/
def getReplicationStrategy(configuration: Config, dataStoreName: String, environment: String): ReplicationStrategy = try {

val repStrategyStr = Try(configuration.getString(s"pillar.$dataStoreName.$environment.replicationStrategy"))

repStrategyStr match {
case Success(repStrategy) => repStrategy match {
case "SimpleStrategy" =>
val repFactor = configuration.getInt(s"pillar.$dataStoreName.$environment.replicationFactor")
SimpleStrategy(repFactor)

case "NetworkTopologyStrategy" =>
import scala.collection.JavaConverters._
val dcConfigBuffer = configuration
.getObjectList(s"pillar.$dataStoreName.$environment.replicationFactor")
.asScala

val dcBuffer = for {
item: ConfigObject <- dcConfigBuffer
entry: Entry[String, ConfigValue] <- item.entrySet().asScala
dcName = entry.getKey
dcRepFactor = entry.getValue.unwrapped().toString.toInt
} yield (dcName, dcRepFactor)

val datacenters = dcBuffer
.map(dc => CassandraDataCenter(dc._1, dc._2))
.toList

NetworkTopologyStrategy(datacenters)

case _ =>
throw new ReplicationStrategyConfigError(s"$repStrategy is not a valid replication strategy.")
}

case Failure(e: ConfigException.Missing) => SimpleStrategy()
case Failure(e) => throw e
}
} catch {
case e: IllegalArgumentException => throw new BadValue(s"pillar.$dataStoreName.$environment", e.getMessage)
case e: Exception => throw e
}
}
2 changes: 1 addition & 1 deletion src/main/scala/de/kaufhof/pillar/Reporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.Date
import com.datastax.driver.core.Session

trait Reporter {
def initializing(session: Session, keyspace: String, replicationOptions: ReplicationOptions)
def initializing(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy)
def migrating(session: Session, dateRestriction: Option[Date])
def applying(migration: Migration)
def reversing(migration: Migration)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/de/kaufhof/pillar/ReportingMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import java.util.Date
import com.datastax.driver.core.Session

class ReportingMigrator(reporter: Reporter, wrapped: Migrator) extends Migrator {
override def initialize(session: Session, keyspace: String, replicationOptions: ReplicationOptions = ReplicationOptions.default) {
reporter.initializing(session, keyspace, replicationOptions)
wrapped.initialize(session, keyspace, replicationOptions)
override def initialize(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy) {
reporter.initializing(session, keyspace, replicationStrategy)
wrapped.initialize(session, keyspace, replicationStrategy)
}

override def migrate(session: Session, dateRestriction: Option[Date] = None) {
Expand Down
Loading