diff --git a/book/core-concepts/decoders-and-encoders.md b/book/core-concepts/decoders-and-encoders.md index 6e5f72035c..988f330ada 100644 --- a/book/core-concepts/decoders-and-encoders.md +++ b/book/core-concepts/decoders-and-encoders.md @@ -59,74 +59,51 @@ Let's make that a bit more concrete. Let's say we are sending in two messages; e Wallaroo's `TCPSource` takes a `Decoder` that can process a framed message protocol. You'll need to implement three methods. Below is a `Decoder` that we can use to process our stream of strings that we layed out in the previous section. ```python -class Decoder(object): - def header_length(self): - return 4 - - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - - def decode(self, bs): - return bs.decode("utf-8") +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decode(self, bs): + return bs.decode("utf-8") ``` -`header_length` will return the fixed size of our payload field. +`header_length` is the fixed size of our payload field. -`payload_length` is used to decode our message header to determine how long the payload is going to be. In this case, we are relying on the Python `struct` package to decode the bytes. If you aren't familiar with `struct`, you can check out [the documentation](https://docs.python.org/2/library/struct.html) to learn more. Remember, when using `struct`, don't forget to import it! +`length_fmt` is used internally to decode our message header to determine how long the payload is going to be. We rely on the Python `struct` package to decode the bytes. If you aren't familiar with `struct`, you can check out [the documentation](https://docs.python.org/2/library/struct.html) to learn more. Remember, when using `struct`, don't forget to import it! `decode` takes a series of bytes that represent your payload and turns it into an application message. In this case, our application message is a string, so we take the incoming byte stream `bs` and convert it to UTF-8 Python string. Here's a slightly more complicated example taken from our [Alphabet Popularity Contest example](https://github.com/WallarooLabs/wallaroo/tree/{{ book.wallaroo_version }}/examples/python/alphabet). ```python -class Decoder(object): - def header_length(self): - return 4 - - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - - def decode(self, bs): - (letter, vote_count) = struct.unpack(">sI", bs) - return Votes(letter, vote_count) -``` - -Like we did in our previous example, we're using a 4-byte payload length header: - -```python -class Decoder(object): - def header_length(self): - return 4 +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decode(self, bs): + (letter, vote_count) = struct.unpack(">sI", bs) + return Votes(letter, vote_count) ``` -We're once again use `struct` to decode our header into a payload length: +Like we did in our previous example, we're using a 4-byte integer payload length header: ```python -class Decoder(object): - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] +@wallaroo.decoder(header_length=4, length_fmt=">I") ``` But this time, we are doing something slightly more complicated in our payload. Our payload is two items, a string representing a letter and some votes for that letter. We'll unpack those using `struct` and create a domain specific object `Votes` to return. ```python -class Decoder(object): - def decode(self, bs): - (letter, vote_count) = struct.unpack(">sI", bs) - return Votes(letter, vote_count) +def decode(self, bs): + (letter, vote_count) = struct.unpack(">sI", bs) + return Votes(letter, vote_count) ``` ## Creating an Encoder Wallaroo encoders do the opposite of decoders. A decoder takes a stream of bytes and turns in into messages for Wallaroo to process. An encoder receives a stream of messages and converts them into a stream of bytes for sending to another system. -Here's a quick example `Encoder`: +Here's a quick example encoder: ```python -class Encoder(object): - def encode(self, data): - # data is a string - return data + "\n" +@wallaroo.encoder +def encode(self, data): + # data is a string + return data + "\n" ``` This is just about the simplest encoder you could have. It's from the [Reverse Word example](https://github.com/WallarooLabs/wallaroo/tree/{{ book.wallaroo_version }}/examples/python/reverse). It takes a string that we want to send to an external system as an input, adds a newline at the end and returns it for sending. @@ -139,10 +116,10 @@ class Votes(object): self.letter = letter self.votes = votes -class Encoder(object): - def encode(self, data): - # data is a Votes - return struct.pack(">IsQ", 9, data.letter, data.votes) +@wallaroo.encoder +def encode(self, data): + # data is a Votes + return struct.pack(">IsQ", 9, data.letter, data.votes) ``` Let's take a look at what is happening here. First of all, we are once again using the Python `struct` package. In this case, though, we are creating our own packed binary message. It's a framed message with a 4-byte header for the payload length, plus the payload: diff --git a/book/core-concepts/partitioning.md b/book/core-concepts/partitioning.md index 420eb601ab..714c19c6b5 100644 --- a/book/core-concepts/partitioning.md +++ b/book/core-concepts/partitioning.md @@ -11,6 +11,7 @@ class Stock(object): self.price = price +@wallaroo.state class Stocks(object): def __init__(self): self.stocks = {} @@ -46,6 +47,7 @@ To do this, a _partition function_ is used to determine which _state part_ a par In order to take advantage of state partitioning, state objects need to be broken down. In the stock example there is already a class that represents an individual stock, so it can be used as the partitioned state. ```python +@wallaroo.state class Stock(object): def __init__(self, symbol, price): self.symbol = symbol @@ -55,12 +57,12 @@ class Stock(object): Since the computation only has one stock in its state now, there is no need to do a dictionary look up. Instead, the computation can update the particular Stock's state right away. ```python -class UpdateStock(object): - def compute(self, stock, state): - state.symbol = stock.symbol - state.price = stock.price +@wallaroo.state_computation +def compute(self, stock, state): + state.symbol = stock.symbol + state.price = stock.price - return (None, True) + return (None, True) ``` ### Partition Key @@ -73,7 +75,7 @@ Currently, the partition keys for a particular partition need to be defined alon The partition function takes in message data and returns a partition key. In the example, the message symbol would be extracted from the message data and returned as the key. ```python -class SymbolPartitionFunction(object): - def partition(self, data): - return data.symbol +@wallaroo.partition +def partition(self, data): + return data.symbol ``` diff --git a/book/core-concepts/state.md b/book/core-concepts/state.md index 9a088364ea..b371138707 100644 --- a/book/core-concepts/state.md +++ b/book/core-concepts/state.md @@ -23,6 +23,7 @@ So what is a state object? Let's explain via an example. Imagine we are writing Wallaroo allows you to define your own state objects that match your domain. For example, our example application might define a state object as: ```python +@wallaroo.state class Stock(object): def __init__(self, symbol, price): self.symbol = symbol diff --git a/book/core-concepts/working-with-state.md b/book/core-concepts/working-with-state.md index 3d4f3671a1..9a2671bcfe 100644 --- a/book/core-concepts/working-with-state.md +++ b/book/core-concepts/working-with-state.md @@ -7,6 +7,7 @@ Wallaroo's state objects allow developers to define their own domain-specific da Imagine a word counting application. We'll have a state object for each different word. Each word and count object would look something like: ```python +@wallaroo.state class WordAndCount(object): def __init__(self, word="", count=0): self.word = word diff --git a/book/python/api.md b/book/python/api.md index 61958a6f70..c0deb2d4d2 100644 --- a/book/python/api.md +++ b/book/python/api.md @@ -4,7 +4,7 @@ The Wallaroo Python API allows developers to create Wallaroo applications in Pyt ## Overview -In order to create a Wallaroo application in Python, developers need to create classes that provide the required interfaces for each step in their pipline, and then connect them together in a topology structure that is returned by the entry-point function `application_setup`. +In order to create a Wallaroo application in Python, developers need to create functions and classes that provide the required interfaces for each step in their pipline, and then connect them together in a topology structure that is returned by the entry-point function `application_setup`. The recommended way to create your topology structure is by using the [ApplicationBuilder](#wallarooapplicationbuilder) in the `wallaroo` module. @@ -21,7 +21,6 @@ The recommended way to create your topology structure is by using the [Applicati * [KafkaSinkEncoder](#kafkasinkencoder) * [kafkaSourceDecoder](#kafkasourcedecoder) * [State](#state) -* [StateBuilder](#statebuilder) * [StateComputation](#statecomputation) * [TCPSourceConfig](#tcpsourceconfig) * [TCPSinkConfig](#tcpsinkconfig) @@ -32,7 +31,7 @@ After Machida loaded a Wallaroo Python application module, it executes its entry The `wallaroo` module provides an [ApplicationBuilder](#wallarooapplicationbuilder) that facilitates the creation of this data structure. When `ApplicationBuilder` is used, a topology can be built using its methods, and then its structure can be return by calling `ApplicationBuilder.build()`. -For a simple application with a `Decoder`, `Computation`, and `Encoder`, this function may look like +For a simple application with a decoder, computation, and encoder, this function may look like ```python def application_setup(args): @@ -41,9 +40,9 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("My Application") ab.new_pipeline("pipeline 1", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to(Computation) - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to(computation) + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) ab.done() return ab.build() ``` @@ -74,35 +73,36 @@ Close the current pipeline object. This is necessary if you intend to add another pipeline. -##### `to(computation_cls)` +##### `to(computation_f)` -Add a stateless computation _class_ to the current pipeline. Only a single instance of the computation will be created. +Add a stateless computation _function_ to the current pipeline. -Note that this method takes a _class_, rather than an _instance_ as its argument. `computation_cls` must be a [Computation](#computation). +Note that this method takes a _function_ decorated by `@computation`. See [Computation](#computation). -##### `to_parallel(computation_cls)` +##### `to_parallel(computation_f)` -Add a stateless computation _class_ to the current pipeline. Creates one copy of the computation per worker in the cluster allowing you to parallelize stateless work. +Add a stateless computation _function_ to the current pipeline. +Creates one copy of the computation per worker in the cluster allowing you to parallelize stateless work. -Note that this method takes a _class_, rather than an _instance_ as its argument. `computation_cls` must be a [Computation](#computation). +Note that this method takes a _function_, decorated by `@computation`. See [Computation](#computation). -##### `to_stateful(computation, state_builder, state_name)` +##### `to_stateful(computation, state_class, state_name)` -Add a StateComputation _instance_, along with a [StateBuilder](#statebuilder) _instance_ and `state_name` to the current pipeline. +Add a state computation _function_, along with a [State](#state) _class_ and `state_name` to the current pipeline. `computation` must be a [StateComputation](#statecomputation). -`state_builder` must be a [StateBuilder](#statebuilder). +`state_class` must be a [State](#state). `state_name` must be a str. `state_name` is the name of the state object that we will run computations against. You can share the object across pipelines by using the same name. Using different names for different objects, keeps them separate and in this way, acts as a sort of namespace. -##### `to_state_partition(computation, state_builder, state_partition_name, partition_function, partition_keys)` +##### `to_state_partition(computation, state, state_partition_name, partition_function, partition_keys)` -Add a partitioned StateComputation to the current pipeline. +Add a partitioned state computation to the current pipeline. `computation` must be a [StateComputation](#statecomputation). -`state_builder` must be [StateBuilder](#statebuilder). +`state` must be [State](#state). `state_partition_name` must be a str. `state_partition_name` is the name of the collection of state object that we will run computations against. You can share state partitions across pipelines by using the same name. Using different names for different partitions, keeps them separate and in this way, acts as a sort of namespace. @@ -116,7 +116,7 @@ Add a partitioned stateful computation to the current pipeline. `computation` must be a [StateComputation](#statecomputation). -`state_builder` must be [StateBuilder](#statebuilder). +`state` must be [State](#state). `state_partition_name` must be a str. `state_partition_name` is the name of the collection of state object that we will run computations against. You can share state partitions across pipelines by using the same name. Using different names for different partitions, keeps them separate and in this way, acts as a sort of namespace. @@ -136,11 +136,7 @@ Return the complete list of topology tuples. This is the topology structure Wall A stateless computation is a simple function that takes input, returns an output, and does not modify any variables outside of its scope. e.g. a stateless computation has _no side effects_. -A `Computation` class must provide the `name` and `compute` methods. - -##### `name()` - -Return the name of the computaiton as a string. +A `computation` class must be wrapped by the `@computation(name)` decorator, which takes thename of the computation as an argument. ##### `compute(data)` @@ -155,34 +151,28 @@ Use `data` to perform a computation and return a series of new outputs. `data` i A Computation that doubles an integer, or returns 0 if its input was not an int: ```python -class Computation(object): - def name(self): - return "double" - - def compute(data): - if isinstance(data, int): - return data*2 - else - return 0 +@computation("double") +def compute(data): + if isinstance(data, int): + return data*2 + else + return 0 ``` A Computation that returns both its input integer and double that value. If the incoming data isn't an integer, we filter aka drop the message by returning `None`. ```python -class Computation(object): - def name(self): - return "double" - - def compute_multi(data): - if isinstance(data, int): - return [data, data*2] - else - return None +@computation_multi("doubledouble"): +def compute_multi(data): + if isinstance(data, int): + return [data, data*2] + else + return None ``` ### Data -Data is the object that is passed to a [Computation](#computation) and [StateComputation](#statecomputation)'s `compute` method. It is a plain Python object and can be as simple or as complex as you would like it to. +Data is the object that is passed to a [Computation](#computation) and a [StateComputation](#statecomputation)'s method. It is a plain Python object and can be as simple or as complex as you would like it to be. It is important to ensure that data returned is always immutable or unique to avoid any unexpected behvaiour due to the asynchronous execution nature of Wallaroo. @@ -192,30 +182,26 @@ Partition Keys must correctly support the `__eq__` (`==`) and `__hash__` operato ### PartitionFunction -A PartitionFunction class must provide the `partition` method. - -##### `partition(data)` - -Return the appropriate [Key](#key) for `data`. +A partition function class must be wrapped in the `@partition_function` decorator and return the appropriate [Key](#key) for `data`. #### Example PartitionFunction An example that partitions words for a word-count based on their first character, and buckets all other cases to the empty string key ```python -PartitionFunction(object): - def partition(self, data): - try: - return data[0].lower() if data[0].isalpha() else '' - except: - return '' +@partition_function +def partition(self, data): + try: + return data[0].lower() if data[0].isalpha() else '' + except: + return '' ``` -### TCPSinkEncoder +### TCP Sink Encoder -The `TCPSinkEncoder` is responsible for taking the output of the last computation in a pipeline and converting it into a `bytes` for Wallaroo to send out over a TCP connection. +The TCP Sink Encoder is responsible for taking the output of the last computation in a pipeline and converting it into a `bytes` for Wallaroo to send out over a TCP connection. -To do this, a `TCPSinkEncoder` class must provide the `encode(data)` method. +To do this, wrap your function in the `@encode` decorator. ##### `encode(data)` @@ -226,31 +212,26 @@ Return a `bytes` that can be sent over the network. It is up to the developer to A complete `TCPSinkEncoder` example that takes a list of integers and encodes it to a sequence of big-endian Longs preceded by a big-endian short representing the number of integers in the list: ```python -class Encoder(object): - def encode(self, data): - fmt = '>H{}'.format('L'*len(data)) - return struct.pack(fmt, len(data), *data) +@encoder +def encode(self, data): + fmt = '>H{}'.format('L'*len(data)) + return struct.pack(fmt, len(data), *data) ``` -### TCPSourceDecoder +### TCP Source Decoder -The `TCPSourceDecoder` is responsible for two tasks: +The TCP Source Decoder is responsible for two tasks: 1. Telling Wallaroo _how many bytes to read_ from its input connection. 2. Converting those bytes into an object that the rest of the application can process. -To do this, a `TCPSourceDecoder` class must implement the following three methods: - -##### `header_length()` - -Return an integer representing the number of bytes from the beginning of an incoming message to return to the function that reads the payload length. +To do this, wrap your function with the `@decoder(header_length, fmt)` decorator. -##### `payload_length(bs)` +##### `header_length` -Return an integer representing the number of bytes after the payload length bytes to return to the function that decodes the payload. +An integer representing the number of bytes from the beginning of an incoming message to return to the function that reads the payload length. -`bs` is a `bytes` of length `header_length()`, and it's up to the user to provide a method for converting that into an integer value. - -A common encoding used in Wallaroo is a big-endien 32-bit unsigned integer, which can be decoded with the [struct module's](https://docs.python.org/2/library/struct.html) help: +##### `fmt` +A format string that represents how the message header is encoded. A common encoding used in Wallaroo is a big-endian 32-bit unsigned integer, which can be decoded with the [struct module's](https://docs.python.org/2/library/struct.html) help: ```python struct.unpack('>L', bs) @@ -267,64 +248,54 @@ Return a python a python object of the type the next step in the pipeline expect A complete TCPSourceDecoder example that decodes messages with a 32-bit unsigned integer _payload_length_ and a character followed by a 32-bt unsigned int in its _payload_: ```python -class Decoder(object): - def header_length(self): - return 4 - - def payload_length(self, bs): - return struct.unpack('>L', bs) - - def decode(self, bs): - return struct.unpack('>1sL', bs) +@decoder(header_length=4, length_fmt=">L") +def decode(self, bs): + return struct.unpack('>1sL', bs) ``` -### KafkaSinkEncoder + ### Kafka Sink Encoder -The `KafkaSinkEncoder` is responsible for taking the output of the last computation in a pipeline and converting it into a `bytes` for Wallaroo to send out to a Kafka sink, along with a `key` or `None`. - -To do this, a `KafkaSinkEncoder` class must provide the `encode(data)` method. - -##### `encode(data)` +The Kafka Sink Encoder is responsible for taking the output of the last computation in a pipeline and converting it into a `bytes` for Wallaroo to send out to a Kafka sink, along with a `key` or `None`. -Return a tuple of `(bytes, key)` that can be sent over the network. It is up to the developer to determine how to translate `data` into a `bytes` and `key`, and what information to keep or discard. +To do this, create an encoder function, in the same way you would for a TCP Sink Encoder, but Return a tuple of `(bytes, key)` that can be sent over the network. It is up to the developer to determine how to translate `data` into a `bytes` and `key`, and what information to keep or discard. #### Example KafkaSinkEncoder A complete `KafkaSinkEncoder` example that takes a word and sends it to the partition corresponding to the first letter of the word: ```python -class Encoder(object): - def encode(self, data): - word = data[:] - letter_key = data[0] - return (word, letter_key) +@encoder +def encode(self, data): + word = data[:] + letter_key = data[0] + return (word, letter_key) ``` -### KafakSourceDecoder +### Kafka Source Decoder -The `TCPSourceDecoder` is responsible for converting bytes into an object that the rest of the application can process. +The Kafka Source Decoder is responsible for converting bytes into an object that the rest of the application can process. -To do this, a `KafkaSourceDecoder` class must implement the following method: +To do this, create a decoder function, in the same way you would for a TCP Source Decoder. ##### `decode(bs)` Return Python object of the type the next step in the pipeline expects. -`bs` is a `bytes` of the length returned by [payload_length](#payload-length(bs)), and it is up to the developer to translate that into a Python object. +`bs` is a `bytes` of the length returned by parsing the header, and it is up to the developer to translate that into a Python object. #### Example KafkaSourceDecoder A complete KafkaSourceDecoder example that decodes messages with a 32-bit unsigned int in its _payload_: ```python -class Decoder(object): - def decode(self, bs): - return struct.unpack('>1sL', bs) +@decoder(header_length=4, length_fmt=">L") +def decode(self, bs): + return struct.unpack('>1sL', bs) ``` ### State -State is an object that is passed to the [StateCompution's](#statecomputation) `compute` method. It is a plain Python object and can be as simple or as complex as you would like it to. +State is an object that is passed to the [StateCompution's](#statecomputation) `compute` method. It is a plain Python object and can be as simple or as complex as you would like it to. The class definition must be wrapped in the `@state` decorator. A common issue that arises with asynchronous execution, is that when references to mutable objects are passed to the next step, if another update to the state precedes the execution of the next step, it will then execute with the latest state (that is, it will execute with the "wrong" state). Therefore, anything returned by a [Computation](#computation) or [StateComputation](#statecomputation) ought to be either unique, or immutable. @@ -337,6 +308,7 @@ In either case, it is up to the developer to provide a side-effect safe value fo An AlphabetCounts keeps a count for how many times each letter in the English alphabet has been seen ```python +@state AlphabetCounts(objects): def __init__(self, initial_counts): self.data = dict(initial_counts) @@ -354,39 +326,17 @@ AlphabetCounts(objects): return self.data[c] ``` -### StateBuilder - -A StateBuilder is used by Wallaroo to create an initial state object for a [StateComputation](#statecomputation). - -##### `build()` - -Return a new [State](#state) instance. - -#### Example StateBuilder - -Return a AlphabetCounts object initialized with a count of zero for each letter in the English alphabet - -```python -class StateBuilder(object): - def build(self): - return AlphabetCounts([(c, 0) for c in string.ascii_lowercase]) -``` - ### StateComputation -A StateComputation is similar to a [Computation](#computation), except that its `compute` method takes an additional argument: `state`. +A State Computation is similar to a [Computation](#computation), except that it takes an additional argument: `state`. In order to provide resilience, Wallaroo needs to keep track of state changes, or side effects, and it does so by making `state` an explicit object that is given as input to any StateComputation step. -Like Computation, a StateComputation class must provide the `name` and `compute` methods. - -##### `name()` - -Return the name of the computation as a string. +Similarly to a Computation, a StateComputation class must be wrapped in the `@state_computation` decorator. ##### `compute(data, state)` -`data` is anything that was returned by the previous step in the pipeline, and `state` is provided by the [StateBuilder](#statebuilder) that was defined for this step in the pipeline definition. +`data` is anything that was returned by the previous step in the pipeline, and `state` is the [State](#state) that was defined for this step in the pipeline definition. Returns a tuple. The first element is a message that we will send on to our next step. It should be a new object. Returning `None` will stop processing and no messages will be sent to the next step. The second element is a boolean value instructing Wallaroo to save our updated state so that in the event of a crash, we can recover to this point. Return `True` to save `state`. Return `False` to not save `state`. @@ -397,23 +347,20 @@ Why wouldn't we always return `True`? There are two answers: ##### `compute_multi(data, state)` -Same as `compute` but the first element of the return tuple is a list of messages to send on to our next step. Allows taking a single input message and creating multiple outputs. Each item in the list will arrive individually at the next step; i.e. not as a list. +Same as `compute` but the first element of the return tuple is a list of messages to send on to our next step. Allows taking a single input message and creating multiple outputs. Each item in the list will arrive individually at the next step; i.e. not as a list. Wrap this function in `@state_computation_multi`. #### Example StateComputation An example StateComputation that keeps track of the maximum integer value it has seen so far: ```python -class StateComputation(object): - def name(self): - return "max" - - def compute(self, data, state): - try: - state.update(data) - except TypeError: - pass - return (state.get_max(), True) +@state_computation(name='max'): +def compute(self, data, state): + try: + state.update(data) + except TypeError: + pass + return (state.get_max(), True) ``` ### TCPSourceConfig diff --git a/book/python/word-count.md b/book/python/word-count.md index 568d160065..4cab444dcf 100644 --- a/book/python/word-count.md +++ b/book/python/word-count.md @@ -22,11 +22,11 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("Word Count Application") ab.new_pipeline("Split and Count", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to_parallel(Split) - ab.to_state_partition(CountWord(), WordTotalsBuilder(), "word totals", - WordPartitionFunction(), word_partitions) - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to_parallel(split) + ab.to_state_partition(count_word, WordTotals, "word totals", + partition, word_partitions) + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() ``` @@ -35,10 +35,10 @@ By now, hopefully, most of this looks somewhat familiar. We're building on conce ```python ab = wallaroo.ApplicationBuilder("Word Count Application") ab.new_pipeline("Split and Count", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) ``` -Upon receiving some textual input, our word count application will route it to a stateless computation called `Split`. `Split` is responsible for breaking the text down into individual words. You might notice something a little different about how we set up this stateful computation. In our previous example, we called `to` on our application builder. In this case, we are calling `to_parallel`. What's the difference between `to` and `to_parallel`? The `to` method creates a single instance of the stateless computation. No matter how many workers we might run in our Wallaroo cluster, there will only be a single instance of the computation. Every message that is processed by the computation will need to be routed the worker running that computation. `to_parallel` is different. By doing `to_parallel(Split)`, we are placing an instance of the `Split` computation on every worker in our cluster. +Upon receiving some textual input, our word count application will route it to a stateless computation called `split`. `split` is responsible for breaking the text down into individual words. You might notice something a little different about how we set up this stateful computation. In our previous example, we called `to` on our application builder. In this case, we are calling `to_parallel`. What's the difference between `to` and `to_parallel`? The `to` method creates a single instance of the stateless computation. No matter how many workers we might run in our Wallaroo cluster, there will only be a single instance of the computation. Every message that is processed by the computation will need to be routed the worker running that computation. `to_parallel` is different. By doing `to_parallel(split)`, we are placing the `split` computation on every worker in our cluster. ### `A to` vs `to_parallel` digression @@ -74,19 +74,19 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("Word Count Application") ab.new_pipeline("Split and Count", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to_parallel(Split) - ab.to_state_partition(CountWord(), WordTotalsBuilder(), "word totals", - WordPartitionFunction(), word_partitions) - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to_parallel(split) + ab.to_state_partition(count_word, WordTotals, "word totals", + partition, word_partitions) + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() ``` Beyond `to_parallel`, there's nothing new in our word count application. After we split our text chunks into words, they get routed to a state partition where they are counted. ```python -ab.to_state_partition(CountWord(), WordTotalsBuilder(), "word totals", - WordPartitionFunction(), word_partitions) + ab.to_state_partition(count_word, WordTotals, "word totals", + partition, word_partitions) ``` Note we setup up 27 partitions to count our words, one for each letter plus one called "!" which will handle any "word" that doesn't start with a letter: @@ -97,8 +97,8 @@ word_partitions.append("!") ... -ab.to_state_partition(CountWord(), WordTotalsBuilder(), "word totals", - WordPartitionFunction(), word_partitions) +ab.to_state_partition(count_word, WordTotals, "word totals", + partition, word_partitions) ``` ### Splitting words @@ -106,55 +106,50 @@ ab.to_state_partition(CountWord(), WordTotalsBuilder(), "word totals", Our word splitting is mostly uninteresting, except for one huge difference, our previous examples had one output for each input. When splitting text into words, we take one input and produce multiple outputs. Let's see how that is done. ```python -class Split(object): - def name(self): - return "split into words" +@wallaroo.computation_multi(name="split into words") +def split(data): + punctuation = " !\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~" - def compute_multi(self, data): - punctuation = " !\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~" + words = [] - words = [] + for line in data.split("\n"): + clean_line = line.lower().strip(punctuation) + for word in clean_line.split(' '): + clean_word = word.strip(punctuation) + words.append(clean_word) - for line in data.split("\n"): - clean_line = line.lower().strip(punctuation) - for word in clean_line.split(' '): - clean_word = word.strip(punctuation) - words.append(clean_word) - - return words + return words ``` -Did you catch what is going on? Previously, we've seen our stateless computations have a `compute` method. `Split` doesn't have a `compute` method. It has a `compute_multi` method. Why both `compute` and `compute_multi`? The answer lies in Python's type system. +Did you catch what is going on? Previously, we've seen our stateless computations wrapped by the `computation` decorator. Why do we have both `computation` and `computation_multi`? The answer lies in Python's type system. -### `compute` vs `compute_multi` +### `computation` vs `computation_multi` Wallaroo's Python API allows a programmer to indicate that the output of a computation is meant to be treated as a single output by using the `compute` method. This allows us, for example, to split some text into words and have that list of words treated as a single item by Wallaroo. In our word splitting case, that isn't what we want. We want each word to be handled individually. `compute_multi` lets us tell Wallaroo that each of these words is a new message and should be handled individually. -By using `compute_multi`, each word will be handled individually. This allows us to then route each one based on its first letter for counting. If you look below, you can see that our word partitioning function is expecting words, not a list, which makes sense. +By using `computation_multi`, each word will be handled individually. This allows us to then route each one based on its first letter for counting. If you look below, you can see that our word partitioning function is expecting words, not a list, which makes sense. ```python -class WordPartitionFunction(object): - def partition(self, data): - if data[0] >= 'a' or data[0] <= 'z': - return data[0] - else: - return "!" +@wallaroo.partition +def partition(data): + if data[0] >= 'a' or data[0] <= 'z': + return data[0] + else: + return "!" ``` ### Our counting guts -The next three classes are the core of our word counting application. By this point, our messages has been split into individual words and run through our `WordPartitionFunction` and will arrive at a state computation based on the first letter of the word. +The next three classes are the core of our word counting application. By this point, our messages has been split into individual words and run through our `partition` function and will arrive at a state computation based on the first letter of the word. -Let's take a look at we have. `CountWord` is a `StateComputation`. When it's run, we update our `word_totals` state to reflect the new incoming `word`. Then, it returns a tuple of the return value from `word_totals.get_count` and `True`. The return value of `get_count` is an instance of the `WordCount` class containing the word and its current count. +Let's take a look at we have. `CountWord` is a State Computation. When it's run, we update our `word_totals` state to reflect the new incoming `word`. Then, it returns a tuple of the return value from `word_totals.get_count` and `True`. The return value of `get_count` is an instance of the `WordCount` class containing the word and its current count. ```python -class CountWord(): - def name(self): - return "Count Word" +@wallaroo.state_computation(name="Count Word") +def count_word(word, word_totals): + word_totals.update(word) + return (word_totals.get_count(word), True) - def compute(self, word, word_totals): - word_totals.update(word) - return (word_totals.get_count(word), True) class WordCount(object): def __init__(self, word, count): @@ -165,6 +160,7 @@ class WordCount(object): `WordTotals` isn't all that interesting. When we `update`, we check to see if we have seen the word before and if not, add it to our map of words and set the count to one. If we have seen the word before, we increment its count. `get_count` looks up a word and returns a `WordCount` for it. ```python +@wallaroo.state class WordTotals(object): def __init__(self): self.word_totals = {} @@ -184,9 +180,9 @@ class WordTotals(object): By this point, our word has almost made it to the end of the line. The only thing left is the sink and encoding. We don't do anything fancy with our encoding. We take the word, its count and format it into a single line of text that our receiver can record. ```python -class Encoder(object): - def encode(self, data): - return data.word + " => " + str(data.count) + "\n" +@wallaroo.encoder +def encoder(data): + return data.word + " => " + str(data.count) + "\n" ``` ### Running `word_count.py` diff --git a/book/python/writing-your-own-application.md b/book/python/writing-your-own-application.md index 1cf9a10107..52cd4f72d9 100644 --- a/book/python/writing-your-own-application.md +++ b/book/python/writing-your-own-application.md @@ -15,13 +15,10 @@ The `reverse.py` application is going to receive text as its input, reverse it, Let's start with the computation, because that's the purpose of the application: ```python -class Reverse(object): - def name(self): - return "reverse" - - def compute(self, data): - print "compute", data - return data[::-1] +@wallaroo.computation(name='reverse'): +def reverse(self, data): + print "compute", data + return data[::-1] ``` A Computation has no state, so it only needs to define its name, and how to convert input data into output data. In this case, string reversal is performed with a slice notation. @@ -33,11 +30,11 @@ Note that there is a `print` statement in the `compute` method (and in other met Next, we are going to define how the output gets constructed for the sink. It is important to remember that Wallaroo sends its output over the network, so data going through the sink needs to be of type `bytes`. In Python2, this is the same as `str`. ```python -class Encoder(object): - def encode(self, data): - # data is a string - print "encode", data - return data + "\n" +@wallaroo.encoder +def encode(self, data): + # data is a string + print "encode", data + return data + "\n" ``` ### SourceDecoder @@ -45,18 +42,10 @@ class Encoder(object): Now, we also need to decode the incoming bytes of the source. ```python -class Decoder(object): - def header_length(self): - print "header_length" - return 4 - - def payload_length(self, bs): - print "payload_length", bs - return struct.unpack(">I", bs)[0] - - def decode(self, bs): - print "decode", bs - return bs.decode("utf-8") +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decode(self, bs): + print "decode", bs + return bs.decode("utf-8") ``` This one is different. Wallaroo handles _streams of bytes_, and in order to do that efficiently, it uses a method called message framing. This means that Wallaroo requires input data to follow a special structure, as well as for the application to provide the mechanism with which to decode this data. @@ -90,7 +79,7 @@ An application is constructed of pipelines which, in turn, are constructed from ```python ab = wallaroo.ApplicationBuilder("Reverse Word") ab.new_pipeline("reverse", - wallaroo.TCPSourceConfig("localhost", "7002", Decoder())) + wallaroo.TCPSourceConfig("localhost", "7002", decoder)) ``` Each pipeline must have a source, and each source must have a decoder, so `new_pipeline` takes a name and a `TCPSourceConfig` instance as its arguments. @@ -98,13 +87,13 @@ Each pipeline must have a source, and each source must have a decoder, so `new_p Next, we add the computation step: ```python -ab.to(Reverse) +ab.to(reverse) ``` And finally, we add the sink, using a `TCPSinkConfig`: ```python -ab.to_sink(wallaroo.TCPSinkConfig("localhost", "7010", Encoder())) +ab.to_sink(wallaroo.TCPSinkConfig("localhost", "7010", encoder)) ``` ### The `application_setup` Entry Point @@ -118,9 +107,9 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("Reverse Word") ab.new_pipeline("reverse", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to(Reverse) - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to(reverse) + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() ``` diff --git a/book/python/writing-your-own-stateful-application.md b/book/python/writing-your-own-stateful-application.md index c6c89b8a93..b785cdf90f 100644 --- a/book/python/writing-your-own-stateful-application.md +++ b/book/python/writing-your-own-stateful-application.md @@ -19,13 +19,10 @@ As with the Reverse Word example, we will list the components required: The computation here is fairly straightforward: given a data object and a state object, update the state with the new data, and return some data that tells Wallaroo what to do next. ```python -class AddVotes(object): - def name(self): - return "add votes" - - def compute(self, data, state): - state.update(data) - return (state.get_votes(data.letter), True) +@wallaroo.state_computation(name='add votes') +def add_votes(self, data, state): + state.update(data) + return (state.get_votes(data.letter), True) ``` Let's dig into that tuple that we are returning: @@ -53,6 +50,7 @@ class Votes(object): And a state map: ```python +@wallaroo.state class AllVotes(object): def __init__(self): self.votes_by_letter = {} @@ -73,22 +71,14 @@ class AllVotes(object): This map is the `state` object that `AddVotes.compute` above takes. An important thing to note here is that `get_votes` returns a _new_ `Votes` instance. This is important, as this is the value that is returned eventually passed to `Encoder.encode`, and if we passed a reference to a mutable object here, there is no guarantee that `Encoder.encode` will execute before another update to this object. -Lastly, a stateful application's pipeline is going to need a `StateBuilder`, so let's create one: - -```python -class LetterStateBuilder(object): - def build(self): - return AllVotes() -``` - ### Encoder The encoder is going to receive a `Votes` instance and encode into a string with the letter, followed by the vote count as a big-endian 64-bit unsigned integer: ```python -class Encoder(object): - def encode(self, data): - # data is a Votes - return struct.pack(">IsQ", 9, data.letter, data.votes) +@wallaroo.encoder +def encode(self, data): + # data is a Votes + return struct.pack(">IsQ", 9, data.letter, data.votes) ``` ### Decoder @@ -96,16 +86,10 @@ class Encoder(object): The decoder, like the one in Reverse Word, is going to use a `header_length` of 4 bytes to denote a big-endian 32-bit unsigned integer. Then, for the data, it is expecting a single character followed by a big-endian 32-bit unsigned integer. Here we use the `struct` module to unpack these integers from the bytes string. ```python -class Decoder(object): - def header_length(self): - return 4 - - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - - def decode(self, bs): - (letter, vote_count) = struct.unpack(">sI", bs) - return Votes(letter, vote_count) +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decode(self, bs): + (letter, vote_count) = struct.unpack(">sI", bs) + return Votes(letter, vote_count) ``` ### Application Setup @@ -119,25 +103,25 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("alphabet") ab.new_pipeline("alphabet", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to_stateful(AddVotes(), LetterStateBuilder(), "letter state") - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to_stateful(add_votes, AllVotes, "letter state") + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() ``` The only difference between this setup and the stateless Reverse Word's one is that while in Reverse Word we used: ```python -ab.to(Reverse) +ab.to(reverse) ``` here we use: ```python -ab.to_stateful(AddVotes(), LetterStateBuilder(), "letter state") +ab.to_stateful(add_votes, AllVotes, "letter state") ``` -That is, while the stateless computation constructor `to` took only a computation class as its argument, the stateful computation constructor `to_stateful` takes a computation _instance_, as well as a state-builder _instance_, along with the name of that state. +That is, while the stateless computation constructor `to` took only a computation class as its argument, the stateful computation constructor `to_stateful` takes a computation _function_, as well as a state _class_, along with the name of that state. ### Miscellaneous diff --git a/examples/python/alphabet/alphabet.py b/examples/python/alphabet/alphabet.py index fe85929e04..0532ab41f9 100644 --- a/examples/python/alphabet/alphabet.py +++ b/examples/python/alphabet/alphabet.py @@ -25,9 +25,9 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("alphabet") ab.new_pipeline("alphabet", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to_stateful(AddVotes(), LetterStateBuilder(), "letter state") - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to_stateful(add_votes, AllVotes, "letter state") + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() @@ -39,17 +39,13 @@ def deserialize(bs): return pickle.loads(bs) -class LetterStateBuilder(object): - def build(self): - return AllVotes() - - class Votes(object): def __init__(self, letter, votes): self.letter = letter self.votes = votes +@wallaroo.state class AllVotes(object): def __init__(self): self.votes_by_letter = {} @@ -57,7 +53,6 @@ def __init__(self): def update(self, votes): letter = votes.letter vote_count = votes.votes - votes_for_letter = self.votes_by_letter.get(letter, Votes(letter, 0)) votes_for_letter.votes += vote_count self.votes_by_letter[letter] = votes_for_letter @@ -68,28 +63,19 @@ def get_votes(self, letter): return Votes(letter, vbl.votes) -class Decoder(object): - def header_length(self): - return 4 - - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - - def decode(self, bs): - (letter, vote_count) = struct.unpack(">sI", bs) - return Votes(letter, vote_count) - +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decoder(bs): + (letter, vote_count) = struct.unpack(">sI", bs) + return Votes(letter, vote_count) -class AddVotes(object): - def name(self): - return "add votes" - def compute(self, data, state): - state.update(data) - return (state.get_votes(data.letter), True) +@wallaroo.state_computation(name="add votes") +def add_votes(data, state): + state.update(data) + return (state.get_votes(data.letter), True) -class Encoder(object): - def encode(self, data): - # data is a Votes - return struct.pack(">IsQ", 9, data.letter, data.votes) +@wallaroo.encoder +def encoder(data): + # data is a Votes + return struct.pack(">IsQ", 9, data.letter, data.votes) diff --git a/examples/python/alphabet_partitioned/alphabet_partitioned.py b/examples/python/alphabet_partitioned/alphabet_partitioned.py index 1f95ad55b2..e8eaf71de1 100644 --- a/examples/python/alphabet_partitioned/alphabet_partitioned.py +++ b/examples/python/alphabet_partitioned/alphabet_partitioned.py @@ -27,10 +27,10 @@ def application_setup(args): letter_partitions = list(string.ascii_lowercase) ab = wallaroo.ApplicationBuilder("alphabet") ab.new_pipeline("alphabet", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to_state_partition(AddVotes(), LetterStateBuilder(), "letter state", - LetterPartitionFunction(), letter_partitions) - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to_state_partition(add_votes, TotalVotes, "letter state", + partition, letter_partitions) + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() @@ -42,16 +42,12 @@ def deserialize(bs): return pickle.loads(bs) -class LetterPartitionFunction(object): - def partition(self, data): - return data.letter[0] - - -class LetterStateBuilder(object): - def build(self): - return TotalVotes() +@wallaroo.partition +def partition(data): + return data.letter[0] +@wallaroo.state class TotalVotes(object): def __init__(self): self.letter = 'X' @@ -71,32 +67,19 @@ def __init__(self, letter, votes): self.votes = votes -class Decoder(object): - def header_length(self): - return 4 - - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - - def decode(self, bs): - (letter, vote_count) = struct.unpack(">1sI", bs) - return Votes(letter, vote_count) - +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decoder(bs): + (letter, vote_count) = struct.unpack(">sI", bs) + return Votes(letter, vote_count) -class AddVotes(object): - def name(self): - return "add votes" - def compute(self, data, state): - state.update(data) - return (state.get_votes(), True) +@wallaroo.state_computation(name="add votes") +def add_votes(data, state): + state.update(data) + return (state.get_votes(), True) -class Encoder(object): - def encode(self, data): - # data is a Votes - letter = data.letter - votes = data.votes - print "letter is " + str(letter) - print "votes is " + str(votes) - return struct.pack(">IsQ", 9, data.letter, data.votes) +@wallaroo.encoder +def encoder(data): + # data is a Votes + return struct.pack(">IsQ", 9, data.letter, data.votes) diff --git a/examples/python/celsius-kafka/celsius.py b/examples/python/celsius-kafka/celsius.py index 7a044a9ea3..95707ae943 100644 --- a/examples/python/celsius-kafka/celsius.py +++ b/examples/python/celsius-kafka/celsius.py @@ -29,41 +29,35 @@ def application_setup(args): ab.new_pipeline("convert", wallaroo.KafkaSourceConfig(in_topic, in_brokers, in_log_level, - Decoder())) + decoder)) - ab.to(Multiply) - ab.to(Add) + ab.to(multiply) + ab.to(add) ab.to_sink(wallaroo.KafkaSinkConfig(out_topic, out_brokers, out_log_level, out_max_produce_buffer_ms, out_max_message_size, - Encoder())) + encoder)) return ab.build() -class Decoder(object): - def decode(self, bs): - if len(bs) < 4: - return 0.0 - return struct.unpack('>f', bs[:4])[0] +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decoder(bs): + if len(bs) < 4: + return 0.0 + return struct.unpack('>f', bs[:4])[0] -class Multiply(object): - def name(self): - return "multiply by 1.8" +@wallaroo.computation(name="multiply by 1.8") +def multiply(data): + return data * 1.8 - def compute(self, data): - return data * 1.8 +@wallaroo.computation(name="add 32") +def add(data): + return data + 32 -class Add(object): - def name(self): - return "add 32" - def compute(self, data): - return data + 32 - - -class Encoder(object): - def encode(self, data): - # data is a float - return (struct.pack('>f', data), None) +@wallaroo.encoder +def encoder(data): + # data is a float + return (struct.pack('>f', data), None) diff --git a/examples/python/celsius/celsius.py b/examples/python/celsius/celsius.py index 09f10147de..ef50b86d3c 100644 --- a/examples/python/celsius/celsius.py +++ b/examples/python/celsius/celsius.py @@ -24,41 +24,28 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("Celsius to Fahrenheit") ab.new_pipeline("Celsius Conversion", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to(Multiply) - ab.to(Add) - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to(multiply) + ab.to(add) + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() -class Decoder(object): - def header_length(self): - return 4 +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decoder(bs): + return struct.unpack('>f', bs)[0] - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - def decode(self, bs): - return struct.unpack('>f', bs)[0] +@wallaroo.computation(name="multiply by 1.8") +def multiply(data): + return data * 1.8 -class Multiply(object): - def name(self): - return "multiply by 1.8" +@wallaroo.computation(name="add 32") +def add(data): + return data + 32 - def compute(self, data): - return data * 1.8 - -class Add(object): - def name(self): - return "add 32" - - def compute(self, data): - return data + 32 - - -class Encoder(object): - def encode(self, data): - # data is a float - return struct.pack('>If', 4, data) +@wallaroo.encoder +def encoder(data): + return struct.pack('>If', 4, data) diff --git a/examples/python/market_spread/market_spread.py b/examples/python/market_spread/market_spread.py index 13098c0204..32e6c33844 100644 --- a/examples/python/market_spread/market_spread.py +++ b/examples/python/market_spread/market_spread.py @@ -27,10 +27,6 @@ SIDETYPE_SELL = 2 -def test_python(): - return "hello python" - - def str_to_partition(stringable): ret = 0 for x in range(0, len(stringable)): @@ -56,19 +52,19 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("market-spread") ab.new_pipeline( "Orders", - wallaroo.TCPSourceConfig(order_host, order_port, OrderDecoder()) + wallaroo.TCPSourceConfig(order_host, order_port, order_decoder) ).to_state_partition_u64( - CheckOrder(), SymbolDataBuilder(), "symbol-data", - SymbolPartitionFunction(), symbol_partitions - ).to_sink(wallaroo.TCPSinkConfig(out_host, out_port, - OrderResultEncoder()) - ).new_pipeline( + check_order, SymbolData, "symbol-data", + symbol_partition_function, symbol_partitions + ).to_sink(wallaroo.TCPSinkConfig(out_host, out_port, + order_result_encoder) + ).new_pipeline( "Market Data", wallaroo.TCPSourceConfig(nbbo_host, nbbo_port, - MarketDataDecoder()) + market_data_decoder) ).to_state_partition_u64( - UpdateMarketData(), SymbolDataBuilder(), "symbol-data", - SymbolPartitionFunction(), symbol_partitions + update_market_data, SymbolData, "symbol-data", + symbol_partition_function, symbol_partitions ).done() return ab.build() @@ -85,34 +81,25 @@ class MarketSpreadError(Exception): pass +@wallaroo.state class SymbolData(object): - def __init__(self, last_bid, last_offer, should_reject_trades): + def __init__(self, last_bid=0.0, last_offer=0.0, should_reject_trades=True): self.last_bid = last_bid self.last_offer = last_offer self.should_reject_trades = should_reject_trades -class SymbolDataBuilder(object): - def build(self): - return SymbolData(0.0, 0.0, True) - - -class SymbolPartitionFunction(object): - def partition(self, data): - return str_to_partition(data.symbol) +@wallaroo.partition +def symbol_partition_function(data): + return str_to_partition(data.symbol) -class CheckOrder(object): - def name(self): - return "Check Order" - - def compute(self, data, state): - if state.should_reject_trades: - ts = int(time.time() * 100000) - return (OrderResult(data, state.last_bid, - state.last_offer, ts), - False) - return (None, False) +@wallaroo.state_computation(name="Check Order") +def check_order(data, state): + if state.should_reject_trades: + ts = int(time.time() * 100000) + return (OrderResult(data, state.last_bid, state.last_offer, ts), False) + return (None, False) class Order(object): @@ -127,38 +114,32 @@ def __init__(self, side, account, order_id, symbol, qty, price, self.transact_time = transact_time -class OrderDecoder(object): - def header_length(self): - return 4 - - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - - def decode(self, bs): - """ - 0 - 1b - FixType (U8) - 1 - 1b - side (U8) - 2 - 4b - account (U32) - 6 - 6b - order id (String) - 12 - 4b - symbol (String) - 16 - 8b - order qty (F64) - 24 - 8b - price (F64) - 32 - 21b - transact_time (String) - """ - order_type = struct.unpack(">B", bs[0:1])[0] - if order_type != FIXTYPE_ORDER: - raise MarketSpreadError("Wrong Fix message type. Did you connect " - "the senders the wrong way around?") - side = struct.unpack(">B", bs[1:2])[0] - account = struct.unpack(">I", bs[2:6])[0] - order_id = struct.unpack("6s", bs[6:12])[0] - symbol = struct.unpack("4s", bs[12:16])[0] - qty = struct.unpack(">d", bs[16:24])[0] - price = struct.unpack(">d", bs[24:32])[0] - transact_time = struct.unpack("21s", bs[32:53])[0] - - return Order(side, account, order_id, symbol, qty, price, - transact_time) +@wallaroo.decoder(header_length=4, length_fmt=">I") +def order_decoder(bs): + """ + 0 - 1b - FixType (U8) + 1 - 1b - side (U8) + 2 - 4b - account (U32) + 6 - 6b - order id (String) + 12 - 4b - symbol (String) + 16 - 8b - order qty (F64) + 24 - 8b - price (F64) + 32 - 21b - transact_time (String) + """ + order_type = struct.unpack(">B", bs[0:1])[0] + if order_type != FIXTYPE_ORDER: + raise MarketSpreadError("Wrong Fix message type. Did you connect " + "the senders the wrong way around?") + side = struct.unpack(">B", bs[1:2])[0] + account = struct.unpack(">I", bs[2:6])[0] + order_id = struct.unpack("6s", bs[6:12])[0] + symbol = struct.unpack("4s", bs[12:16])[0] + qty = struct.unpack(">d", bs[16:24])[0] + price = struct.unpack(">d", bs[24:32])[0] + transact_time = struct.unpack("21s", bs[32:53])[0] + + return Order(side, account, order_id, symbol, qty, price, + transact_time) class OrderResult(object): @@ -169,20 +150,20 @@ def __init__(self, order, last_bid, last_offer, timestamp): self.timestamp = timestamp -class OrderResultEncoder(object): - def encode(self, data): - p = struct.pack(">HI6s4sddddQ", - data.order.side, - data.order.account, - data.order.order_id, - data.order.symbol, - data.order.qty, - data.order.price, - data.bid, - data.offer, - data.timestamp) - out = struct.pack('>I{}s'.format(len(p)), len(p), p) - return out +@wallaroo.encoder +def order_result_encoder(data): + p = struct.pack(">HI6s4sddddQ", + data.order.side, + data.order.account, + data.order.order_id, + data.order.symbol, + data.order.qty, + data.order.price, + data.bid, + data.offer, + data.timestamp) + out = struct.pack('>I{}s'.format(len(p)), len(p), p) + return out class MarketDataMessage(object): @@ -194,44 +175,35 @@ def __init__(self, symbol, transact_time, bid, offer): self.mid = (bid + offer) / 2.0 -class MarketDataDecoder(object): - def header_length(self): - return 4 - - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - - def decode(self, bs): - """ - 0 - 1b - FixType (U8) - 1 - 4b - symbol (String) - 5 - 21b - transact_time (String) - 26 - 8b - bid_px (F64) - 34 - 8b - offer_px (F64) - """ - order_type = struct.unpack(">B", bs[0:1])[0] - if order_type != FIXTYPE_MARKET_DATA: - raise MarketSpreadError("Wrong Fix message type. Did you connect " - "the senders the wrong way around?") - symbol = struct.unpack(">4s", bs[1:5])[0] - transact_time = struct.unpack(">21s", bs[5:26])[0] - bid = struct.unpack(">d", bs[26:34])[0] - offer = struct.unpack(">d", bs[34:42])[0] - return MarketDataMessage(symbol, transact_time, bid, offer) - - -class UpdateMarketData(object): - def name(self): - return "Update Market Data" - - def compute(self, data, state): - offer_bid_difference = data.offer - data.bid - - should_reject_trades = ((offer_bid_difference >= 0.05) or - ((offer_bid_difference / data.mid) >= 0.05)) - - state.last_bid = data.bid - state.last_offer = data.offer - state.should_reject_trades = should_reject_trades - - return (None, True) +@wallaroo.decoder(header_length=4, length_fmt=">I") +def market_data_decoder(bs): + """ + 0 - 1b - FixType (U8) + 1 - 4b - symbol (String) + 5 - 21b - transact_time (String) + 26 - 8b - bid_px (F64) + 34 - 8b - offer_px (F64) + """ + order_type = struct.unpack(">B", bs[0:1])[0] + if order_type != FIXTYPE_MARKET_DATA: + raise MarketSpreadError("Wrong Fix message type. Did you connect " + "the senders the wrong way around?") + symbol = struct.unpack(">4s", bs[1:5])[0] + transact_time = struct.unpack(">21s", bs[5:26])[0] + bid = struct.unpack(">d", bs[26:34])[0] + offer = struct.unpack(">d", bs[34:42])[0] + return MarketDataMessage(symbol, transact_time, bid, offer) + + +@wallaroo.state_computation(name="Update Market Data") +def update_market_data(data, state): + offer_bid_difference = data.offer - data.bid + + should_reject_trades = ((offer_bid_difference >= 0.05) or + ((offer_bid_difference / data.mid) >= 0.05)) + + state.last_bid = data.bid + state.last_offer = data.offer + state.should_reject_trades = should_reject_trades + + return (None, True) diff --git a/examples/python/reverse/reverse.py b/examples/python/reverse/reverse.py index 299c228606..d568b36f91 100644 --- a/examples/python/reverse/reverse.py +++ b/examples/python/reverse/reverse.py @@ -24,37 +24,23 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("Reverse Word") ab.new_pipeline("reverse", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to(Reverse) - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to(reverse) + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() -class Decoder(object): - def header_length(self): - print "header_length" - return 4 +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decoder(bs): + return bs.decode("utf-8") - def payload_length(self, bs): - print "payload_length", bs - return struct.unpack(">I", bs)[0] - def decode(self, bs): - print "decode", bs - return bs.decode("utf-8") +@wallaroo.computation(name="reverse") +def reverse(data): + return data[::-1] -class Reverse(object): - def name(self): - return "reverse" - - def compute(self, data): - print "compute", data - return data[::-1] - - -class Encoder(object): - def encode(self, data): - # data is a string - print "encode", data - return data + "\n" +@wallaroo.encoder +def encoder(data): + # data is a string + return data + "\n" diff --git a/examples/python/word_count/word_count.py b/examples/python/word_count/word_count.py index 2703f519e7..1818f1c6b6 100644 --- a/examples/python/word_count/word_count.py +++ b/examples/python/word_count/word_count.py @@ -27,41 +27,35 @@ def application_setup(args): ab = wallaroo.ApplicationBuilder("Word Count Application") ab.new_pipeline("Split and Count", - wallaroo.TCPSourceConfig(in_host, in_port, Decoder())) - ab.to_parallel(Split) - ab.to_state_partition(CountWord(), WordTotalsBuilder(), "word totals", - WordPartitionFunction(), word_partitions) - ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder())) + wallaroo.TCPSourceConfig(in_host, in_port, decoder)) + ab.to_parallel(split) + ab.to_state_partition(count_word, WordTotals, "word totals", + partition, word_partitions) + ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) return ab.build() +@wallaroo.computation_multi(name="split into words") +def split(data): + punctuation = " !\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~" -class Split(object): - def name(self): - return "split into words" + words = [] - def compute_multi(self, data): - punctuation = " !\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~" + for line in data.split("\n"): + clean_line = line.lower().strip(punctuation) + for word in clean_line.split(' '): + clean_word = word.strip(punctuation) + words.append(clean_word) - words = [] + return words - for line in data.split("\n"): - clean_line = line.lower().strip(punctuation) - for word in clean_line.split(' '): - clean_word = word.strip(punctuation) - words.append(clean_word) - return words - - -class CountWord(object): - def name(self): - return "Count Word" - - def compute(self, word, word_totals): - word_totals.update(word) - return (word_totals.get_count(word), True) +@wallaroo.state_computation(name="Count Word") +def count_word(word, word_totals): + word_totals.update(word) + return (word_totals.get_count(word), True) +@wallaroo.state class WordTotals(object): def __init__(self): self.word_totals = {} @@ -82,30 +76,19 @@ def __init__(self, word, count): self.count = count -class WordTotalsBuilder(object): - def build(self): - return WordTotals() - - -class WordPartitionFunction(object): - def partition(self, data): - if data[0] >= 'a' and data[0] <= 'z': - return data[0] - else: - return "!" - - -class Decoder(object): - def header_length(self): - return 4 +@wallaroo.partition +def partition(data): + if data[0] >= 'a' or data[0] <= 'z': + return data[0] + else: + return "!" - def payload_length(self, bs): - return struct.unpack(">I", bs)[0] - def decode(self, bs): - return bs.decode("utf-8") +@wallaroo.decoder(header_length=4, length_fmt=">I") +def decoder(bs): + return bs.decode("utf-8") -class Encoder(object): - def encode(self, data): - return data.word + " => " + str(data.count) + "\n" +@wallaroo.encoder +def encoder(data): + return data.word + " => " + str(data.count) + "\n" diff --git a/machida/wallaroo.py b/machida/wallaroo.py index b98dafe072..cd762c99fc 100644 --- a/machida/wallaroo.py +++ b/machida/wallaroo.py @@ -14,9 +14,10 @@ import argparse +from functools import wraps import inspect import pickle - +import struct def serialize(o): return pickle.dumps(o) @@ -47,9 +48,9 @@ def to(self, computation): return self def to_parallel(self, computation): - if not inspect.isclass(computation): - raise WallarooParameterError("Expecting a Computation class. Got " - "an instance instead.") + if inspect.isclass(computation): + raise WallarooParameterError("Expecting a Computation Builder " + "instance. Got a class instead.") self._actions.append(("to_parallel", computation)) return self @@ -57,9 +58,9 @@ def to_stateful(self, computation, state_builder, state_name): if inspect.isclass(computation): raise WallarooParameterError("Expecting a Computation Builder " "instance. Got a class instead.") - if inspect.isclass(state_builder): - raise WallarooParameterError("Expecting a State Builder " - "instance. Got a class instead.") + if not inspect.isclass(state_builder): + raise WallarooParameterError("Expecting a State class." + "Got an instance instead.") self._actions.append(("to_stateful", computation, state_builder, state_name)) return self @@ -69,9 +70,9 @@ def to_state_partition_u64(self, computation, state_builder, state_name, if inspect.isclass(computation): raise WallarooParameterError("Expecting a Computation Builder " "instance. Got a class instead.") - if inspect.isclass(state_builder): - raise WallarooParameterError("Expecting a State Builder " - "instance. Got a class instead.") + if not inspect.isclass(state_builder): + raise WallarooParameterError("Expecting a State class." + "Got an instance instead.") self._actions.append(("to_state_partition_u64", computation, state_builder, state_name, partition_function, partition_keys)) @@ -82,9 +83,9 @@ def to_state_partition(self, computation, state_builder, state_name, if inspect.isclass(computation): raise WallarooParameterError("Expecting a Computation Builder " "instance. Got a class instead.") - if inspect.isclass(state_builder): - raise WallarooParameterError("Expecting a State Builder " - "instance. Got a class instead.") + if not inspect.isclass(state_builder): + raise WallarooParameterError("Expecting a State class." + "Got an instance instead.") if not isinstance(partition_keys, list): raise WallarooParameterError("Expecting a partition_keys list. " "Got a {} instead.".format( @@ -105,6 +106,100 @@ def build(self): return self._actions +def computation(name): + def wrapped(computation_function): + @wraps(computation_function) + class C: + def name(self): + return name + def compute(self, data): + return computation_function(data) + + return C + + return wrapped + + +def state_computation(name): + def wrapped(computation_function): + @wraps(computation_function) + class C: + def name(self): + return name + def compute(self, data, state): + return computation_function(data, state) + + return C() + + return wrapped + + +def computation_multi(name): + def wrapped(computation_function): + @wraps(computation_function) + class C: + def name(self): + return name + def compute_multi(self, data): + return computation_function(data) + + return C + + return wrapped + + +def state_computation_multi(name): + def wrapped(computation_function): + @wraps(computation_function) + class C: + def name(self): + return name + def compute_multi(self, data, state): + return computation_function(data, state) + + return C + + return wrapped + + +def state(clz): + clz.build = classmethod(lambda(c): clz()) + return clz + + +def partition(fn): + @wraps(fn) + class C: + def partition(self, data): + return fn(data) + return C() + + +def decoder(header_length, length_fmt): + def wrapped(decoder_function): + @wraps(decoder_function) + class C: + def header_length(self): + return header_length + def payload_length(self, bs): + return struct.unpack(length_fmt, bs)[0] + def decode(self, bs): + return decoder_function(bs) + + return C() + + return wrapped + + +def encoder(encoder_function): + class C: + @wraps(encoder_function) + def encode(self, data): + return encoder_function(data) + + return C() + + class TCPSourceConfig(object): def __init__(self, host, port, decoder): self._host = host