Skip to content


Add replication strategy support (#1)
Browse files Browse the repository at this point in the history
* Basic setup for replication configuration support for Pillar.

* Removed ReplicationOptions as it was redundant.

* Error handling for reading ReplicationStrategy from our config.

* Tweaked the error handling.

* Modified the CommandExecutorTest and added a new test.

* Add qualifications for config params.

* added cassandra unit and tweaked app default params

* Improved README
  • Loading branch information
j-potts authored and markglh committed Jul 14, 2016
1 parent 81566a3 commit e7429d5
Show file tree
Hide file tree
Showing 25 changed files with 1,125 additions and 128 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,3 @@ jdk:
- oraclejdk8
- 2.11.8
- cassandra
57 changes: 44 additions & 13 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,32 @@ authored. Migrations are applied in ascending order and reversed in descending o

### Command Line

Here's the short version:
#####Here's the short version:

Given the configuration:

pillar.my_keyspace {
prod {
development {
1. Write migrations, place them in conf/pillar/migrations/myapp.
1. Add pillar settings to conf/application.conf.
1. % pillar initialize myapp
1. % pillar migrate myapp
1. `% pillar initialize -e prod my_keyspace`
1. `% pillar migrate -e prod my_keyspace`

*Note: development is the default environment if nothing is specified*

Or we could compile and run the jar:

java -cp "slf4j-simple.jar:pillar-assembly.jar" de.kaufhof.pillar.cli.App -d "path/to/migrations" -e "prod" initialize "my_keyspace"

#### Migration Files

Expand Down Expand Up @@ -136,26 +156,37 @@ The Pillar command line interface expects to find migrations in conf/pillar/migr
#### Configuration

Pillar uses the [Typesafe Config][typesafeconfig] library for configuration. The Pillar command-line interface expects
to find an application.conf file in ./conf or ./src/main/resources. Given a data store called faker, the
application.conf might look like the following:
to find an application.conf file in ./conf or ./src/main/resources.
The ReplicationStrategy and ReplicationFactor can be configured per environment. If left out completely,
SimplyStrategy with RF 3 will be used by default.
Given a data store called faker, the application.conf might look like the following:

pillar.faker {
development {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_development"
replicationStrategy: "SimpleStrategy"
replicationFactor: 0
test {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
acceptance_test {
pillar.faker {
development {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_acceptance_test"
cassandra-keyspace-name: "pillar_development"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: [
{dc1: 2},
{dc2: 3}

##### SSL & Authentication
You can add ssl options and authentication to each of the environments:
You can optionally add ssl options and authentication to each of the environments:

pillar.faker {
development {
Expand Down Expand Up @@ -225,7 +256,7 @@ The package installs to /opt/pillar by default. The /opt/pillar/bin/pillar execu

data-store The target data store, as defined in application.conf

#### Examples
#### More Examples

Initialize the faker datastore development environment

Expand Down
6 changes: 5 additions & 1 deletion project/PillarBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ object PillarBuild extends Build {
"com.typesafe" % "config" % "1.0.1",
"org.clapper" %% "argot" % "1.0.3",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"org.scalatest" %% "scalatest" % "2.2.0" % "test"
"org.scalatest" %% "scalatest" % "2.2.0" % "test",
"org.cassandraunit" % "cassandra-unit" % "" % "test",
"" % "guava" % "18.0" % "test",
"ch.qos.logback" % "logback-classic" % "1.1.7" % "test"

val rhPackage = TaskKey[File]("rh-package", "Packages the application for Red Hat Package Manager")
Expand Down Expand Up @@ -80,6 +83,7 @@ object PillarBuild extends Build {
Some("releases" at nexus + "service/local/staging/deploy/maven2")
parallelExecution in Test := false,
publishMavenStyle := true,
publishArtifact in Test := false,
pomIncludeRepository := { _ => false },
Expand Down
65 changes: 64 additions & 1 deletion src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,72 @@ pillar.faker {
test {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: [
{dc1: 2},
{dc2: 3}
acceptance_test {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_acceptance_test"
replicationStrategy: "SimpleStrategy"
replicationFactor: 1

pillar.test {
simpleGood {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategy"
replicationFactor: 1
simpleBadStrat {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategee"
replicationFactor: 1
simpleBadRep {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategy"
replicationFactor: foo
simpleMissingRep {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategy"
simpleZeroRep {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "SimpleStrategy"
replicationFactor: 0
netGood {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: [
{dc1: 2},
{dc2: 3}
netEmptyRep {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: []
netZeroRep {
cassandra-seed-address: ""
cassandra-keyspace-name: "pillar_test"
replicationStrategy: "NetworkTopologyStrategy"
replicationFactor: [
{dc1: 0},
{dc2: 3}
4 changes: 2 additions & 2 deletions src/main/scala/de/kaufhof/pillar/CassandraMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class CassandraMigrator(registry: Registry) extends Migrator {
selectMigrationsToApply(dateRestriction, appliedMigrations).foreach(_.executeUpStatement(session))

override def initialize(session: Session, keyspace: String, replicationOptions: ReplicationOptions = ReplicationOptions.default) {
executeIdempotentCommand(session, "CREATE KEYSPACE %s WITH replication = %s".format(keyspace, replicationOptions.toString()))
override def initialize(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy) {
executeIdempotentCommand(session, s"CREATE KEYSPACE $keyspace WITH replication = ${replicationStrategy.cql}")
| CREATE TABLE %s.applied_migrations (
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/de/kaufhof/pillar/Migrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Migrator {
trait Migrator {
def migrate(session: Session, dateRestriction: Option[Date] = None)

def initialize(session: Session, keyspace: String, replicationOptions: ReplicationOptions = ReplicationOptions.default)
def initialize(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy = SimpleStrategy())

def destroy(session: Session, keyspace: String)
2 changes: 1 addition & 1 deletion src/main/scala/de/kaufhof/pillar/PrintStreamReporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.Date
import com.datastax.driver.core.Session

class PrintStreamReporter(stream: PrintStream) extends Reporter {
override def initializing(session: Session, keyspace: String, replicationOptions: ReplicationOptions) {
override def initializing(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy) {
stream.println(s"Initializing $keyspace")

Expand Down
17 changes: 0 additions & 17 deletions src/main/scala/de/kaufhof/pillar/ReplicationOptions.scala

This file was deleted.

32 changes: 32 additions & 0 deletions src/main/scala/de/kaufhof/pillar/ReplicationStrategy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package de.kaufhof.pillar

* Defines all possible ReplicationStrategy configurations.
* A NetworkTopologyStrategy will require the appropriate snitch.
sealed trait ReplicationStrategy {
def cql: String
override def toString: String = cql

final case class SimpleStrategy(replicationFactor: Int = 3) extends ReplicationStrategy {
require(replicationFactor > 0)

override def cql: String = s"{'class' : 'SimpleStrategy', 'replication_factor' : $replicationFactor}"

final case class NetworkTopologyStrategy(dataCenters: Seq[CassandraDataCenter]) extends ReplicationStrategy {

override def cql: String = {
val replicationFacString = { dc =>
s"'${}' : ${dc.replicationFactor} "
}.mkString(", ")

s"{'class' : 'NetworkTopologyStrategy', $replicationFacString }"

final case class CassandraDataCenter(name: String, replicationFactor: Int){
require(replicationFactor > 0 && name.nonEmpty)
76 changes: 76 additions & 0 deletions src/main/scala/de/kaufhof/pillar/ReplicationStrategyBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package de.kaufhof.pillar

import java.util.Map.Entry

import com.typesafe.config.ConfigException.BadValue
import com.typesafe.config.{Config, ConfigException, ConfigObject, ConfigValue}

import scala.util.{Failure, Success, Try}

final case class ReplicationStrategyConfigError(msg: String) extends Exception

object ReplicationStrategyBuilder {

* Parses replication settings from a config that looks like:
* {{{
* replicationStrategy: "SimpleStrategy"
* replicationFactor: 3
* }}}
* or:
* {{{
* replicationStrategy: "NetworkTopologyStrategy"
* replicationFactor: [
* {dc1: 3},
* {dc2: 3}
* ]
* }}}
* @param configuration The applications Typesafe config
* @param dataStoreName The target data store, as defined in application.conf
* @param environment The environment, as defined in application.conf (i.e. "pillar.dataStoreName.environment {...})
* @return ReplicationOptions with a default of Simple Strategy with a replication factor of 3.
def getReplicationStrategy(configuration: Config, dataStoreName: String, environment: String): ReplicationStrategy = try {

val repStrategyStr = Try(configuration.getString(s"pillar.$dataStoreName.$environment.replicationStrategy"))

repStrategyStr match {
case Success(repStrategy) => repStrategy match {
case "SimpleStrategy" =>
val repFactor = configuration.getInt(s"pillar.$dataStoreName.$environment.replicationFactor")

case "NetworkTopologyStrategy" =>
import scala.collection.JavaConverters._
val dcConfigBuffer = configuration

val dcBuffer = for {
item: ConfigObject <- dcConfigBuffer
entry: Entry[String, ConfigValue] <- item.entrySet().asScala
dcName = entry.getKey
dcRepFactor = entry.getValue.unwrapped().toString.toInt
} yield (dcName, dcRepFactor)

val datacenters = dcBuffer
.map(dc => CassandraDataCenter(dc._1, dc._2))


case _ =>
throw new ReplicationStrategyConfigError(s"$repStrategy is not a valid replication strategy.")

case Failure(e: ConfigException.Missing) => SimpleStrategy()
case Failure(e) => throw e
} catch {
case e: IllegalArgumentException => throw new BadValue(s"pillar.$dataStoreName.$environment", e.getMessage)
case e: Exception => throw e
2 changes: 1 addition & 1 deletion src/main/scala/de/kaufhof/pillar/Reporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.Date
import com.datastax.driver.core.Session

trait Reporter {
def initializing(session: Session, keyspace: String, replicationOptions: ReplicationOptions)
def initializing(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy)
def migrating(session: Session, dateRestriction: Option[Date])
def applying(migration: Migration)
def reversing(migration: Migration)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/de/kaufhof/pillar/ReportingMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import java.util.Date
import com.datastax.driver.core.Session

class ReportingMigrator(reporter: Reporter, wrapped: Migrator) extends Migrator {
override def initialize(session: Session, keyspace: String, replicationOptions: ReplicationOptions = ReplicationOptions.default) {
reporter.initializing(session, keyspace, replicationOptions)
wrapped.initialize(session, keyspace, replicationOptions)
override def initialize(session: Session, keyspace: String, replicationStrategy: ReplicationStrategy) {
reporter.initializing(session, keyspace, replicationStrategy)
wrapped.initialize(session, keyspace, replicationStrategy)

override def migrate(session: Session, dateRestriction: Option[Date] = None) {
Expand Down

0 comments on commit e7429d5

Please sign in to comment.