dynamic keys support #2265

Merged
merged 53 commits into from
Jul 17, 2018

aturley
Contributor

@aturley aturley commented Jul 16, 2018

This PR contains the commits to support dynamic keys in Wallaroo.

aturley and others added 30 commits July 13, 2018 18:20
This change is in LocalPartitionRouter, and it calculates the MD5 sum
of a key if the key is a string. This is very preliminary work toward
getting consistent hashing to work.
This attempts to add routes for workers for state entities with string
keys using consistent hashing. It currently fails to find the correct
route in router.pony:1140. I'm sure I've forgotten to do something
simple.

This doesn't really work yet, and it assumes a fixed keyset, so even
if it worked it wouldn't really be taking advantage of consistent
hashing.
This change starts to actually create routers that route messages
based on the key, not based on a target step id. It sets up the
routers and the LocalPartitionRouter should use them now. I've added a
new DeliveryMsg type that can forward this data to another worker.

There are still some pieces that are missing. The delivery message
needs to know the message key, but there's no way to pass this to the
router's `route` method. And once the message is sent to another
worker it won't be handled correctly because I haven't made any
changes to the DataRouter.
This changes the DeliveryMsg trait to take arguments that let it look
up its own target_id. This will be useful for routing messages by key.
This adds logic for delivering messages. It appears that the routing
is not set up correctly because it doesn't work yet.
This will help us route messages via consistent hashing. It gives us a
way to link a key to a step.
This change implements the ability to send messages to remote workers
using consistent hashing.

Note that the message is only sent to the remote worker; it is still
not delivered to the correct state step.
This makes Wallaroo use consistent hashing to determine which worker
should handle a message. The partition function for the message type
must return a String. The original partitioning system is still in
place for objects with non-String keys.

This currently only works in situations where the cluster is initially
set up to use multiple workers; in other words, it does not support
multiple-worker scenarios with autoscale.
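
As an illustration of the idea above, here is a minimal sketch of
picking a worker for a string key with consistent hashing. Only the use
of MD5 on string keys comes from these commits; the ring layout, the
`HashRing` class, `points_per_worker`, and the worker names are
assumptions made for the example and are not Wallaroo's actual
implementation.

```python
import bisect
import hashlib


def key_hash(key: str) -> int:
    """Map a string key to a 64-bit integer using the first 8 MD5 bytes."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Hypothetical ring: each worker owns several points on a circle."""

    def __init__(self, workers, points_per_worker=64):
        self._points = sorted(
            (key_hash(f"{worker}#{i}"), worker)
            for worker in workers
            for i in range(points_per_worker)
        )
        self._hashes = [h for h, _ in self._points]

    def worker_for(self, key: str) -> str:
        # First ring point at or after the key's hash, wrapping around.
        idx = bisect.bisect_left(self._hashes, key_hash(key)) % len(self._points)
        return self._points[idx][1]


workers = ["worker-1", "worker-2", "worker-3"]
ring = HashRing(workers)
owner = ring.worker_for("a")
```

Every worker that builds the ring from the same worker list computes the
same owner for a key, so no shared lookup table of keys is needed.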
This constrains all partition keys to a single Key type
so that we can rely solely on our consistent hashing scheme
to route messages destined for a state partition.
This commit removes all code that was still relying
on the pre-consistent hashing strategy, which we
kept around as a short-term strategy to get things to
compile as we changed the partition routing infrastructure.
During autoscale events, we now use the new partitioning scheme
to determine which steps need to be migrated and where to migrate
them based on which keys fall into which partitions.
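
A rough sketch of how a migration plan could be derived from the
partitioning scheme when a worker joins: compute each key's owner before
and after, and migrate only the keys whose owner changed. The ring
construction and every name here (`build_ring`, `owner`, `migrations`,
`w1`..`w3`) are hypothetical; the commits do not show Wallaroo's actual
migration code.

```python
import bisect
import hashlib


def _h(text: str) -> int:
    return int.from_bytes(hashlib.md5(text.encode("utf-8")).digest()[:8], "big")


def build_ring(workers, points_per_worker=64):
    """Sorted list of (hash, worker) points; an assumed ring layout."""
    return sorted((_h(f"{w}#{i}"), w) for w in workers for i in range(points_per_worker))


def owner(ring, key):
    hashes = [h for h, _ in ring]
    return ring[bisect.bisect_left(hashes, _h(key)) % len(ring)][1]


def migrations(keys, old_workers, new_workers):
    """Return {key: (old_owner, new_owner)} for keys whose owner changes."""
    old_ring, new_ring = build_ring(old_workers), build_ring(new_workers)
    moves = {}
    for key in keys:
        before, after = owner(old_ring, key), owner(new_ring, key)
        if before != after:
            moves[key] = (before, after)
    return moves


keys = [chr(c) for c in range(ord("a"), ord("z") + 1)]
moves = migrations(keys, ["w1", "w2"], ["w1", "w2", "w3"])
```

With this kind of ring, adding a worker only moves keys onto the new
worker; keys never shuffle between the existing workers.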
This is required for consistent hashing because all keys need to be
strings.
This change is required for consistent hashing because all the keys
need to be strings.
The partition API has changed for consistent hashing; all tests have
been updated to use the new API.
We no longer support U64 partition keys so this shouldn't be there.
These applications were breaking integration tests.
These applications were breaking integration tests.
When running the `run` method we were calling `step.run(...)`, which
caused the message to be processed asynchronously. It now
calls `step.process_message(...)` and things are done synchronously.
…ions

The consistent hashing changes have changed the way that information
about state entities is stored. This commit adds four new queries.

state-entity -- gets a list of the state entities on the queried worker

state-entity-count -- gets the number of state entities on the queried
worker

stateless-partitions -- gets a list of the stateless partitions in the
entire cluster

stateless-partitions-count -- gets the number of stateless partitions
in the entire cluster
This change passes the StateStepCreator to the producers that need to
have a reference to it. These producers are
* DataReceiver
* Step
* TCPSource

I'm ignoring the KafkaSource for now.
The `StateStepCreator` now creates a new step when it receives a
`report_unknown_key` message.

The `Step` object is currently configured in parts as part of topology
initialization. This change doesn't do any of that configuration, so
the new step is not expected to work as of this commit.
The `StateStepCreator` now initializes new `Step`s after they are
created and only informs the requesting producers about the step's
existence when the initialization is complete. This ensures that only
steps that are ready to process messages are handed off to producers.

The `StateStepCreator` also keeps a list of producers that are waiting
for a particular new step and notifies all of them when the step is
ready. This way we correctly handle a situation where multiple reports
of the same missing key come in before the step to handle that key is
completely set up.
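
The waiting-list behavior described above can be sketched roughly as
follows. This is a synchronous Python stand-in, not the Pony actor; the
class names, `step_ready`, and `initialization_complete` are
hypothetical and chosen only for the example.

```python
class ProducerSketch:
    """Hypothetical producer that records steps it has been told about."""

    def __init__(self):
        self.known = {}

    def step_ready(self, ident, step):
        self.known[ident] = step


class StateStepCreatorSketch:
    """Hands a step to producers only after its initialization completes."""

    def __init__(self):
        self.ready = {}    # (state_name, key) -> initialized step
        self.waiting = {}  # (state_name, key) -> producers awaiting the step

    def report_unknown_key(self, producer, state_name, key):
        ident = (state_name, key)
        if ident in self.ready:
            producer.step_ready(ident, self.ready[ident])
        else:
            # Remember everyone who asked while the step is being set up.
            self.waiting.setdefault(ident, []).append(producer)

    def initialization_complete(self, state_name, key, step):
        ident = (state_name, key)
        self.ready[ident] = step
        for producer in self.waiting.pop(ident, []):
            producer.step_ready(ident, step)


creator = StateStepCreatorSketch()
p1, p2 = ProducerSketch(), ProducerSketch()
creator.report_unknown_key(p1, "counts", "a")
creator.report_unknown_key(p2, "counts", "a")
known_before = (dict(p1.known), dict(p2.known))
creator.initialization_complete("counts", "a", "step-for-a")
```

Neither producer sees the step until initialization is reported, and
both are notified once it is.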
This is a rough pass at storing message data if the key is not known
and then reprocessing the message data once a new step has been
created to handle the key. This commit is only designed to work with a
message that has a locally-handled key that comes in over TCP.

This doesn't seem to work. For some reason I can't convert the data
that I'm trying to reprocess into the correct type.
This will now reprocess a message from a TCP source after adding a new
step.

Unfortunately now the new step seems to have an issue in
`StepIdRouter._route_with_target_id(...)`, where it fails because it
can't get a route to the next step its producer.
aturley and others added 23 commits July 13, 2018 18:50
When an unknown state/key combination is encountered in a
LocalPartitionRouter it sends a message to the producer to let it know
that the key is missing. The producer calls the state step creator,
which creates a new step and triggers a router update. When the
producer receives a new router it tries to send any messages that were
queued because they were for unknown keys.

This commit only makes this work locally for TCP Sources.
Dynamic keys now work for certain fixed multi-worker
topologies. `DataReceiver`s, `DataRouter`s, and `ForwardHashedMsg`s
now correctly report unknown keys so that new state steps can be
created for them, and pending messages are routed again.

This has only been tested in alphabet.pony.
Unknown keys were not getting removed from the pending message store
after they were delivered, so they would be delivered each time a new
unknown key was processed, which caused duplicate messages.
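
A minimal sketch of the fix described above, assuming a store keyed by
partition key: retrieving the pending messages for a key also removes
them, so a later router update cannot deliver them again.
`PendingMessageStore`, `on_router_update`, and the message values are
hypothetical names for illustration only.

```python
from collections import defaultdict


class PendingMessageStore:
    """Hypothetical store of messages waiting for a step for their key."""

    def __init__(self):
        self._pending = defaultdict(list)

    def add(self, key, message):
        self._pending[key].append(message)

    def retrieve(self, key):
        # pop() removes the entry, so a later update cannot redeliver it.
        return self._pending.pop(key, [])


def on_router_update(store, known_keys, deliver):
    """Deliver queued messages for every key the new router can handle."""
    for key in list(known_keys):
        for message in store.retrieve(key):
            deliver(key, message)


delivered = []
store = PendingMessageStore()
store.add("a", "msg-1")
on_router_update(store, {"a"}, lambda k, m: delivered.append((k, m)))
store.add("b", "msg-2")
on_router_update(store, {"a", "b"}, lambda k, m: delivered.append((k, m)))
```

After the second update, the message for `a` has been delivered only
once even though `a` is still a known key.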
This commit makes dynamic keys work with autoscale.
This commit makes it possible to add new keys dynamically when using
Kafka sources.
The Python and Go APIs have been updated to support using Pony Strings
as keys. In Python the partition function returns a Python string. In
Go the partition function returns a `[]byte` (byte slice). This change
allows keys to be arbitrary sequences of bytes.
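
For illustration, a partition function in the Python style described
above might look like the sketch below. The `Order` type and its fields
are hypothetical, and how the function is registered with Wallaroo is
deliberately left out because these commits do not show it.

```python
from dataclasses import dataclass


@dataclass
class Order:
    # Hypothetical message type; any object carrying a key field would do.
    symbol: str
    quantity: int


def partition(order: Order) -> str:
    """Return the partition key as a Python string."""
    return order.symbol


key = partition(Order(symbol="AAPL", quantity=100))
```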
This change updates the Go word count application to use `[]byte`
keys. This will help folks understand the change.
The `route_with` method was the same for all `Rerouter`s except that
they would provide a router, so now `Rerouter` has a common
`route_with` implementation and classes that implement the `Rerouter`
trait must provide a `router` method that returns the router. This
reduces the amount of code needed for `Rerouter` classes and puts all
of the logical changes to `route_with` in one place. This will make it
easier to maintain the code.
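
The refactoring above follows a familiar shape: the shared method lives
on the trait and implementers supply only the part that differs. A
Python sketch of that shape, assuming a `route(data, producer_id)`
signature that is invented for the example (the Pony signatures are not
shown in these commits):

```python
from abc import ABC, abstractmethod


class Rerouter(ABC):
    """Sketch of the trait: subclasses only say which router to use."""

    @abstractmethod
    def router(self):
        """Return the router that `route_with` should delegate to."""

    def route_with(self, data, producer_id):
        # Shared logic lives here once instead of in every implementer.
        return self.router().route(data, producer_id)


class EchoRouter:
    # Hypothetical router that just reports what it was asked to route.
    def route(self, data, producer_id):
        return (producer_id, data)


class SourceRerouter(Rerouter):
    def __init__(self, router):
        self._router = router

    def router(self):
        return self._router


result = SourceRerouter(EchoRouter()).route_with("msg", 7)
```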
There was a bug that prevented Go applications from building because
the partition key size was incorrectly set.
The new partitioning API requires that keys be `[]byte`s. The Go
version of Market Spread now uses this partitioning scheme.
Wallaroo now supports dynamic keys, so applications no longer need to
supply a list of partition keys in the builder step. This will make
the API easier to use.

Example and demo code has been updated to use these changes where
appropriate.
- All workers get their own `external_port` so they can be queried
- New observability query to get state entities from all workers and
  amalgamate them together
- Updates to correctness/tests/* to match changes to integration API
- Updated verifications for autoscale tests to match how the new
  migration protocol and observability queries work.
- see `/testing/tools/try_until`
- machida is reintroduced as a build dependency
- Python tests are enabled
- An ENV based option to pause a test after failure and before workers
  are destroyed is added.
  To enable this, set `env pause_for_user=true`
  To disable, `unset pause_for_user`
  Default: unset, so it doesn't happen in CI.
- Retry test_migration until it passes or times out to deal with
  spurious test errors due to observability channel race conditions.
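
The retry-until-pass-or-timeout approach in the last item can be
sketched as below. This is not the repository's `try_until` tool;
`retry_until`, its parameters, and `flaky_check` are hypothetical names
used only to show the pattern.

```python
import time


def retry_until(check, timeout_s=30.0, interval_s=0.5):
    """Call `check` until it returns True; raise TimeoutError at the deadline.

    Returns the number of attempts that were made.
    """
    deadline = time.monotonic() + timeout_s
    attempts = 0
    while True:
        attempts += 1
        try:
            if check():
                return attempts
        except AssertionError:
            pass  # treat a failed assertion like a failed check and retry
        if time.monotonic() >= deadline:
            raise TimeoutError(f"check did not pass after {attempts} attempts")
        time.sleep(interval_s)


calls = {"n": 0}


def flaky_check():
    # Fails twice, then passes, like a test racing an observability query.
    calls["n"] += 1
    return calls["n"] >= 3


attempts = retry_until(flaky_check, timeout_s=5.0, interval_s=0.0)
```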
These code changes were mostly cosmetic (renaming and moving classes
and files).
The new worker now gets propagated through the _state_builders, to the
StateSubpartitions, to the KeyDistributions, and updates the
HashPartitions object. This had been causing the
"initialization/LocalTopologyEquality" test to fail.
The test was removing the wrong key, so the equality was failing.
Routing is now determined by the HashPartitions object, so it doesn't
really make sense to have a test that updates the routing directly.
There was a bug where the StateStepCreator would create a new state
step for a state/key multiple times because it would check to see if a
step existed in a map that was only updated after the step was
initialized. Step initialization is asynchronous, so other messages
for that state/key could have come in before the initialization was
complete.

We now store the state/key when we first receive it and check that
data structure every time we receive a new `unknown_key` message to
see if we've seen it before. If we have, we know that it is being
handled and we do nothing.
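
The race and its fix can be illustrated with two small stand-ins. Both
classes and their method names are hypothetical; they model only the
bookkeeping described above, with step initialization represented by a
separate `initialization_done` call.

```python
class BuggyCreator:
    """Checks a set that is only updated after initialization finishes."""

    def __init__(self):
        self.initialized = set()
        self.steps_created = 0

    def unknown_key(self, state_name, key):
        if (state_name, key) not in self.initialized:
            self.steps_created += 1  # starts another (duplicate) step

    def initialization_done(self, state_name, key):
        self.initialized.add((state_name, key))


class FixedCreator:
    """Records the state/key as soon as it is first reported."""

    def __init__(self):
        self.seen = set()
        self.steps_created = 0

    def unknown_key(self, state_name, key):
        ident = (state_name, key)
        if ident in self.seen:
            return  # already being handled; do nothing
        self.seen.add(ident)
        self.steps_created += 1


buggy, fixed = BuggyCreator(), FixedCreator()
for creator in (buggy, fixed):
    # Two reports arrive before initialization has completed.
    creator.unknown_key("counts", "a")
    creator.unknown_key("counts", "a")
buggy.initialization_done("counts", "a")
```

In this run the buggy version starts two steps for the same state/key,
while the fixed version starts one.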

Fixes 2209
adds pcre2 to Circle CI Dockerfile and updates config to use latest
Docker image with pcre2
@aturley aturley merged commit becbea1 into master Jul 17, 2018
@aturley aturley deleted the d-keys-rebase branch July 17, 2018 12:39
@aturley aturley mentioned this pull request Jul 17, 2018
5 participants