Programming Guide

Yuzhen Huang edited this page Apr 25, 2019 · 7 revisions

Overview

At a high level, a Tangram application consists of a user program written in C++, a Master program, and a cluster of Worker programs. To create an application, a user writes the user program with the high-level Tangram library, then compiles and links the user code with the Worker program. The user program can then be submitted by a Python launching script, which launches the Master and a cluster of Workers. The Master analyzes the DAG of MapUpdate plans and coordinates with the Workers to complete the program.

Collection

The main data abstraction Tangram provides is the collection, a group of elements partitioned across the nodes of the cluster that can be operated on in parallel. The collection in Tangram corresponds to the resilient distributed dataset (RDD) in Spark, but a collection in Tangram can be mutable in a MapUpdate plan, which offers more generality.

Creating Collections

Collections in Tangram are created from a file or a folder in the Hadoop file system, or from a vector of elements in the C++ user program. In addition, an empty placeholder collection can also be created.

// 1. Load a file from HDFS as a Tangram collection with each line being an element.
// User needs to provide a parse function to determine how 
// a line should be parsed into an object in the collection.
// The collection is of type Collection<String>.
auto c1 = Context::load("/path/to/file/in/hdfs", [](const std::string& s) { return s; });

// 2. Distribute a C++ vector to a Tangram collection with 2 partitions.
auto c2 = Context::distribute(
    std::vector<std::string>{"b", "a", "n", "a", "n", "a"}, 2);

// 3. Create a placeholder collection of type ObjT with 10 partitions.
auto c3 = Context::placeholder<ObjT>(10);

User-defined Type in Collection

Users can define their own object type to be stored in a collection. For example, we can define a Vertex type with id and outlinks as internal fields:

struct Vertex {
  using KeyT = int;

  Vertex() = default;
  Vertex(KeyT id) : id(id) {}

  KeyT Key() const { return id; }

  // fields
  KeyT id;
  std::vector<int> outlinks;

  // serialization functions
  friend SArrayBinStream &operator<<(SArrayBinStream &stream,
                                     const Vertex &vertex) {
    stream << vertex.id << vertex.outlinks;
    return stream;
  }
  friend SArrayBinStream &operator>>(SArrayBinStream &stream,
                                     Vertex &vertex) {
    stream >> vertex.id >> vertex.outlinks;
    return stream;
  }
};

A few things worth mentioning:

  1. Since we want the Vertex collection to be indexable (i.e., one can find a specific vertex by its key), the user-defined type should define a KeyT specifying the key type and a Key() function that returns the key (i.e., the unique name of the object).
  2. We need to provide a constructor taking a key as its argument. With this constructor, Tangram can automatically create a Vertex object when a given key does not exist.
  3. We need to provide the serialization/deserialization functions so that a Vertex can be serialized to/deserialized from disk or remote machines.
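To make the serialization contract concrete, here is a minimal standalone sketch of a round trip, using std::stringstream in place of Tangram's SArrayBinStream (an assumption for illustration only; the real stream is binary):

```cpp
// Standalone sketch of the serialize/deserialize contract for Vertex.
// std::stringstream stands in for SArrayBinStream (illustrative assumption).
#include <sstream>
#include <string>
#include <vector>

struct Vertex {
  using KeyT = int;
  Vertex() = default;
  Vertex(KeyT id) : id(id) {}
  KeyT Key() const { return id; }
  KeyT id = 0;
  std::vector<int> outlinks;
};

// Serialize: write the id, then the outlink count, then each outlink.
std::string Serialize(const Vertex& v) {
  std::ostringstream os;
  os << v.id << ' ' << v.outlinks.size();
  for (int o : v.outlinks) os << ' ' << o;
  return os.str();
}

// Deserialize: read the fields back in the same order they were written.
Vertex Deserialize(const std::string& s) {
  std::istringstream is(s);
  Vertex v;
  std::size_t n = 0;
  is >> v.id >> n;
  v.outlinks.resize(n);
  for (std::size_t i = 0; i < n; ++i) is >> v.outlinks[i];
  return v;
}
```

The key invariant, which also holds for the operator<< and operator>> pair above, is that deserialization reads the fields in exactly the order serialization wrote them.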

MapUpdate

MapUpdate is the main API in Tangram. It is similar to, but more powerful than, MapReduce in several important aspects:

  1. Mutable. While map in MapUpdate is functional (similar to map in MapReduce), update allows for asynchronous in-place modification to the stateful collection. In MapUpdate, the update collection is mutable, and other collections, if different from the update collection, are considered immutable.

  2. Side-Input. MapUpdate allows the side-input collections to be specified explicitly and access to the side-input collections is fine-grained, which improves efficiency in many applications such as machine learning and graph analytics.

  3. Iterative and Asynchronous. MapUpdate provides support for iteration and consistency protocols including BSP, SSP and ASP (configurable by SetStaleness).
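The staleness-based protocols can be summarized by one condition; the sketch below shows it in plain C++ (the function and its names are illustrative, not Tangram's internal API). With staleness 0 the condition degenerates to BSP; a large bound approaches ASP.

```cpp
// Sketch of the SSP (Stale Synchronous Parallel) condition that a staleness
// setting configures: a worker at iteration `my_iter` may proceed only if it
// is no more than `staleness` iterations ahead of the slowest worker.
#include <algorithm>
#include <vector>

bool CanProceed(const std::vector<int>& worker_iters, int my_iter,
                int staleness) {
  int slowest = *std::min_element(worker_iters.begin(), worker_iters.end());
  return my_iter <= slowest + staleness;
}
```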

Let's use word count as an example. We will construct a MapUpdate plan for word count:

Context::mapupdate(lines, wordcount, 
    [](std::string line, Output<std::string, int>* o) {
      boost::char_separator<char> sep(" \t\n");
      boost::tokenizer<boost::char_separator<char>> tok(line, sep);
      for (auto& w : tok) {
        o->Add(w, 1);
      }
    },
    [](WC* wc, int c) {
      wc->count += c;
    });

Suppose we have two collections. The lines collection stores the lines of the document. The wordcount collection is empty and will store the final word/count key/value pairs.

MapUpdate requires 4 parameters. The first two are the map collection and the update collection. Here, the map collection is lines and the update collection is wordcount. The next two parameters are the map function and the update function respectively.

The map function takes two parameters: the first is the element and the second is an Output object to which our output key/value pairs are emitted. In the map function, we split the line into words and, for each word, emit a <word, 1> pair to the Output object. The update function takes two parameters: the first is a pointer to the WC object, and the second is the emitted value corresponding to the key from the map function. In the update function, we simply increase the count of the word by c.

Tangram applies the map function to each object/element in the map collection. It shuffles the output of the map function and groups the key/value pairs by key. The shuffled key/value pairs are then applied to the corresponding update collection by invoking the update function.
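The map -> shuffle/group-by-key -> update pipeline can be sketched locally with plain STL containers, outside Tangram, to show the semantics the runtime implements in a distributed way:

```cpp
// Local sketch of map -> shuffle -> update for word count.
// STL containers stand in for Tangram collections (illustrative only).
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

std::map<std::string, int> WordCount(const std::vector<std::string>& lines) {
  // map phase: emit a (word, 1) pair per token, as the map function does
  std::vector<std::pair<std::string, int>> emitted;
  for (const auto& line : lines) {
    std::istringstream iss(line);
    std::string w;
    while (iss >> w) emitted.emplace_back(w, 1);
  }
  // shuffle + update phase: group by key and accumulate the emitted values,
  // as the update function does for each WC object
  std::map<std::string, int> wordcount;
  for (const auto& kv : emitted) wordcount[kv.first] += kv.second;
  return wordcount;
}
```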

Configurations

A MapUpdate plan can be configured with a number of options.

Combiner

A combine function can be set as follows:

->SetCombine([](int *a, int b) { return *a += b; })
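A combiner pre-aggregates pairs destined for the same key on the sender side before the shuffle, so fewer messages cross the network. The standalone sketch below illustrates the effect (the function and its shape are illustrative; Tangram applies SetCombine internally):

```cpp
// Sketch of sender-side combining: pairs with the same key are merged
// locally before shuffling, mirroring the *a += b combine function above.
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

std::vector<std::pair<std::string, int>>
Combine(const std::vector<std::pair<std::string, int>>& emitted) {
  std::unordered_map<std::string, int> local;
  for (const auto& kv : emitted) local[kv.first] += kv.second;
  return {local.begin(), local.end()};
}
```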

Iteration

A MapUpdate plan can be executed iteratively. The number of iterations can be set:

->SetIter(5)

Staleness

For an iterative MapUpdate plan, a staleness parameter specifying how stale a partition is allowed to be can be set:

->SetStaleness(0)

Checkpoint Interval

For an iterative MapUpdate plan, a checkpoint interval can be set:

->SetCheckpointInterval(5, "/path/to/store/checkpoint")

MapUpdate with Partition

Similar to mapPartitions in Spark, Tangram also offers mappartupdate, which is a partition-level version of mapupdate.

  Context::mappartupdate(
      c1, c2,
      [](TypedPartition<std::string> *p, Output<std::string, int> *o) {
        for (auto &elem : *p) {
          o->Add(elem, 1);
        }
      },
      [](ObjT *obj, int m) {
        obj->b += m;
      });

In this example, the user can access a whole partition of the data through TypedPartition<std::string> *p.

Side-Input

A side-input collection can be used with mappartwithupdate, which takes an additional side-input collection argument.

  Context::mappartwithupdate(
      c1, c2, c2,
      [](TypedPartition<std::string> *p, TypedCache<ObjT> *typed_cache,
         Output<std::string, int> *o) {
        // Access model from TypedCache<ObjT> *typed_cache
        for (int part_id = 0; part_id < 10; ++part_id) {
          auto part = typed_cache->GetPartition(part_id);
          auto *with_p = static_cast<TypedPartition<ObjT> *>(part.get());
          LOG(INFO) << "partition: " << part_id
                    << ", size: " << with_p->GetSize();
          for (auto &elem : *with_p) {
            LOG(INFO) << "elem: " << elem.a << " " << elem.b;
          }
          typed_cache->ReleasePart(part_id);
        }
        for (auto &elem : *p) {
          o->Add(elem, 1);
        }
      },
      [](ObjT *obj, int m) {
        obj->b += m;
      })
      ->SetIter(10)
      ->SetStaleness(0)
      ->SetCheckpointInterval(5);

In the above example, the side-input collection and the update collection are the same collection (c2).

For more details, please refer to the LR example or PageRank example.

Dataflow

Users can define multiple MapUpdate plans in their applications. The dependencies will be resolved by Tangram automatically. Two MapUpdate plans that do not conflict, i.e., that do not update the same collection, can run simultaneously.
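The scheduling rule stated above can be sketched as a simple predicate (Plan and its id fields are illustrative, not Tangram's internal representation):

```cpp
// Sketch of the conflict rule: two plans conflict exactly when they update
// the same collection; non-conflicting plans may run simultaneously.
struct Plan {
  int map_collection_id;
  int update_collection_id;
};

bool Conflict(const Plan& a, const Plan& b) {
  return a.update_collection_id == b.update_collection_id;
}
```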

API

Tangram provides high-level dataflow-like APIs in C++.

The signature of the API can be found in wiki/API.

The complete Tangram library API can be found in core/plan/context.hpp.