-
Notifications
You must be signed in to change notification settings - Fork 54
Getting Started for Cascading Users
In Cascading, input data are read from source taps, and the outputs of the flow are persisted using sink taps. Typically, you specify the sources and sinks while connecting the flow together.
Flow connector = new HadoopFlowConnector().connect(sourceTap, sinkTap, pipe);
In Pigpen, you use load and store functions. For example:
(require '[pigpen.core :as pig])
(->>
(pig/load-tsv "input.tsv")
; do stuff ...
(pig/store-tsv "output.tsv"))
Several load and store functions are available out of the box, but you can also use raw taps directly.
(require '[pigpen.cascading :as cascading])
(import '[cascading.tap.hadoop Hfs]
'[cascading.scheme.hadoop TextLine])
(let [source-tap (Hfs. (TextLine.) "input_path")
sink-tap (Hfs. (TextLine.) "output_path")]
(->> (cascading/load-tap source-tap)
; do stuff ...
(cascading/store-tap sink-tap)))
Cascading operations act on tuples with an arbitrary number of fields. As you build flows, you need to keep track of the input and output fields of each step.
PigPen uses a single field. If you need more than one value, you can use a collection type such as vector or map. During groupings and joins, extra fields are implicitly added by bundling the keys and values of the grouping/join in a vector. This will become more clear in the specific examples for GroupBy and CoGroup below.
Suppose you want to implement the following function:
(fn [x] (* x x))
In Cascading, you would create a class that implements Function:
public class Square extends BaseOperation implements Function {
public Square() {
super(1, new Fields("output_field"));
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
int x = functionCall.getArguments().getInteger(0);
int result = x * x;
functionCall.getOutputCollector().add(new Tuple(result));
}
}
And then use it in your flow:
Pipe pipe = new Each(previous, new Fields("input_field"), new Square(), new Fields("output_field"));
In PigPen, you use map:
;; mock some data
(def previous (pig/return [1 2 3]))
(pig/map (fn [x] (* x x))
previous)
Cascading doesn't have a flatten operation. Instead, it gives you the freedom to emit whatever you want from a function:
class Flatten extends BaseOperation implements Function {
public Flatten() {
super(1, new Fields("output_field"));
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
List<Integer> input = (List<Integer>)functionCall.getArguments().getObject(0);
for (Integer result : input) {
functionCall.getOutputCollector().add(new Tuple(result));
}
}
}
In PigPen, you would use mapcat to achieve the same:
(pig/mapcat identity previous)
Just like with Function, Cascading requires you to write a custom class to implement a filter:
class MyFilter extends BaseOperation implements Filter {
@Override
public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
Map<String, Integer> input = (Map) filterCall.getArguments().getObject(0);
Integer val = input.get("a");
return !(val != null && val == 1);
}
}
In PigPen, you use filter and a function that returns true for all values you want to keep.
(pig/filter (fn [{:keys [a]}] (= a 1)) previous)
Notice that the logic is the opposite of what goes into isRemove in the cascading version, which returns true for values you want to discard. See also pig/remove
.
Cascading's GroupBy is used to group tuple streams by certain group fields, and is usually followed by an Every pipe to perform some aggregation on the groups. For the classic word count example, it would look something like this:
RegexSplitGenerator splitter = new RegexSplitGenerator(new Fields("token"), "[ \\[\\]\\(\\),.]");
Pipe pipe = new Each( "token", new Fields("text"), splitter, Fields.RESULTS);
pipe = new GroupBy(pipe, new Fields("token"));
pipe = new Every(pipe, Fields.ALL, new Count(), Fields.ALL);
The PigPen version is similar:
(->> lines
(pig/mapcat #(clojure.string/split % #"[ \[\]\(\),.]"))
(pig/group-by identity)
(pig/map (fn [[word occurrences]] [word (count occurrences)])))
The main differences are:
- The key for the group is not a separate pre-existing field in the tuple stream, but the result of applying a function to values from the stream. In the example above, the key is the same as the value, so the identity function is used.
- The function following a group-by receives vectors with key and group. In the example above the key is a word, and the group is a collection of occurrences of that word.
Cascading uses CoGroup to join two or more tuple streams via an optional Joiner. As with GroupBy, a CoGroup is typically followed by an Every pipe to perform some aggregation.
Pipe join = new CoGroup(lhsPipe, new Fields("lhs_group_field"), rhsPipe, new Fields("rhs_group_field"), new InnerJoin());
join = new Every(join, new MyAggregator(), Fields.RESULTS);
Whenever a joiner is specified, a single iterator is exposed to the aggregator function representing the cross product of each of the input streams. If you need to have access to each input stream separately, you have to use the special BufferJoin in conjunction with Fields.NONE, and get the iterators from a JoinerClosure. This is rather cumbersome and efforts have been made to provide a friendlier interface for this use case.
PigPen provides separate commands for the two use cases: join and co-group. The join command works like Cascading's CoGroup using a regular Joiner.
(pig/join [(lhs :on f)
(rhs :on g :type :optional)]
(fn [x y] [x y])))
A function (f and g above) is specified as key selector for each of the input streams. You can achieve the effects of left, right, inner and outer joins by specifying each stream's type as either optional or required. By default, join treats all inputs as required (inner join).
The second use case is covered by co-group:
(pig/cogroup [(left :on :a)
(right :on :c)]
(fn [k l r] [k (map :b l) (map :d r)]))
Here, key selectors work the same as in join, but the user function is given a sequence for each of the input streams. In the example, r is a sequence containing all values of the right stream for a given key.
Cascading's Every pipe, which follows GroupBy or CoGroup as seen above, takes a custom aggregator. Like Function and Filter, this requires you to create a new class that implements a specific interface. There are two choices: Buffer and Aggregator.
Buffer is a flexible interface that lets the user iterate over the streams directly. The downside is that only one Every pipe using a Buffer can follow a GroupBy/CoGroup.
Aggregator is more restrictive, imposing a Clojure reduce style of operation where the user logic has access to a single value from the stream and an accumulator. The upside is that several aggregators can be chained together after a GroupBy/CoGroup.
The approach followed by PigPen more closely resembles Buffer. A co-group expects a single user-defined function that will produce all necessary aggregations given the group key and a sequence for each input stream.
Cascading doesn't support native Hadoop combiners, but offers partial aggregation instead. The difference is that values are aggregated in memory, avoiding the extra cost of serialization, deserialization, and sorting, but providing no guarantee that a mapper will fully aggregate all values for a given key, so more data may end up going to the reducers.
If you want to use partial aggregation with Cascading, you have to embed your logic in a class that implements AggregateBy. This obviates the need to use an Aggregator or Buffer, since both map-side and reduce-side aggregation is taken care by AggregateBy.
In PigPen, partial aggregation is used behind the scenes when specifying a fold. This can be done by either using the top level fold command:
(require '[pigpen.fold :as fold])
; count is already defined in fold/count, but it is shown here for illustration purposes
(def fold-count (fold/fold-fn + (fn [acc _] (inc acc))))
(pig/fold fold-count previous)
or by specifying a fold inside a co-group:
;; define some mock data
(def foo (pig/return [{:a 1} {:a 1} {:a 2}]))
(def bar (pig/return [{:b 1} {:b 2} {:b 2}]))
(pig/cogroup [(foo :on :a)
(bar :on :b, :type :required, :fold (fold/count))]
(fn [key foos bar-count]
{:key key, :foos foos, :bars bar-count}))
In either case, you specify a fold function, which takes a reduce function and a combine function. The reduce function produces partial aggregations on the map side, while the combine function produces the final value from the partial aggregates. In the example, count is achieved by counting on the map side, and summing the counts on the reduce side.
Read more about folding data here.
Once you have a PigPen flow defined, getting the corresponding Cascading flow is very easy.
(def flow
(->>
(pig/load-tsv "input.tsv")
; do stuff ...
(pig/store-tsv "output.tsv")))
(def cascading-flow (pigpen.cascading/generate-flow flow))
This will use HadoopFlowConnector by default, but you can specify your own connector.
(import '[cascading.flow.hadoop HadoopFlowConnector])
(def my-connector (HadoopFlowConnector.)) ;; use whatever connector you like here
(def cascading-flow (pigpen.cascading/generate-flow my-connector flow))
The resulting Cascading flow can be invoked the same way any regular Cascading flow.
(.complete cascading-flow)
The easiest way to run your PigPen flow in production is to create a namespace with a main method, and bundle it in an uberjar (or whatever the equivalent is in your build system if not using lein).
(ns pigpen.example
(:gen-class :name pigpen.Example)
(:require [pigpen.core :as pig]
[pigpen.cascading :as pc]))
(defn get-cascading-flow []
;; call your fn here ...
)
(defn -main [& args]
(.complete (get-cascading-flow)))
Then you deploy your uberjar to your hadoop cluster, and invoke the main class.
hadoop jar myproject.jar pigpen.Example