[Tour of Beam] Learning content for "Introduction" module #23085
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
# | ||
|
||
sdk: Go | ||
content: | ||
- introduction |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
<!-- | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
### Creating PCollection | ||
|
||
Now that you know how to create a Beam pipeline and pass parameters into it, it is time to learn how to create an initial `PCollection` and fill it with data. | ||
|
||
There are several options: | ||
|
||
→ You can create a PCollection of data stored in an in-memory collection class in your driver program. | ||
|
||
→ You can also read the data from a variety of external sources, such as local or cloud-based files, databases, or other sources, using Beam-provided I/O adapters. | ||
|
||
Through the tour, most of the examples use either a `PCollection` created from in-memory data or data read from one of the cloud buckets "beam-examples" or "dataflow-samples". These buckets contain sample data sets specifically created for educational purposes. | ||
|
||
We encourage you to take a look, explore these data sets and use them while learning Apache Beam. | ||
|
||
### Creating a PCollection from in-memory data | ||
|
||
You can use the Beam-provided `beam.Create` transform to create a `PCollection` from in-memory Go values. In the Go SDK, you apply `beam.Create` to the pipeline's root scope, which `beam.NewPipelineWithRoot` returns alongside the pipeline. | ||
|
||
The following example code shows how to do this: | ||
|
||
``` | ||
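// Assumes the context, beam, log, and beamx packages are imported, as in the playground example. | ||
func main() { | ||
	ctx := context.Background() | ||
|
||
	// First, create the pipeline and its root scope. | ||
	p, s := beam.NewPipelineWithRoot() | ||
|
||
	// Now create a PCollection from a list of strings. | ||
	words := beam.Create(s, "To", "be", "or", "not", "to", "be", "that", "is", "the", "question") | ||
|
||
	// Create a numerical PCollection. | ||
	numbers := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) | ||
|
||
	// Placeholder so the snippet compiles; apply further transforms to words and numbers here. | ||
	_, _ = words, numbers | ||
|
||
	// Run the pipeline. | ||
	if err := beamx.Run(ctx, p); err != nil { | ||
		log.Exitf(ctx, "Failed to execute job: %v", err) | ||
	} | ||
} | ||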
``` | ||
|
||
### Playground exercise | ||
|
||
You can find the complete code of this example in the playground window, where you can run the pipeline and experiment with it. | ||
|
||
One difference you will notice is that it also contains a function that outputs `PCollection` elements to the console. Don’t worry if you don’t quite understand it yet; the `ParDo` transform is explained later in the course. Feel free, however, to use it in exercises and challenges to explore results. | ||
Review comment: One difference you will notice is that it also contains a function to output |
||
|
||
Do you also notice the order in which the elements of the `PCollection` appear in the console? Why is that? You can also run the example several times to see if the output stays the same or changes. | ||
Review comment: Do you also notice the order elements of the PCollection appear in the console? Why is that? You can also run the example several times to see if the output stays the same or changes. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// beam-playground: | ||
// name: ParDo | ||
// description: ParDo example. | ||
// multifile: false | ||
// context_line: 32 | ||
// categories: | ||
// - Quickstart | ||
// complexity: BASIC | ||
// tags: | ||
// - hellobeam | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/log" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" | ||
"fmt" | ||
) | ||
|
||
func main() { | ||
p, s := beam.NewPipelineWithRoot() | ||
|
||
words := beam.Create(s, "Hello", "world", "it's", "Beam") | ||
|
||
output(s, words) | ||
|
||
err := beamx.Run(context.Background(), p) | ||
if err != nil { | ||
log.Exitf(context.Background(), "Failed to execute job: %v", err) | ||
} | ||
} | ||
|
||
func output(s beam.Scope, input beam.PCollection) { | ||
beam.ParDo0(s, func(element interface{}) { | ||
fmt.Println(element) | ||
}, input) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
# | ||
|
||
id: from-memory | ||
name: Creating in-memory PCollections | ||
taskName: ParDo |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
# | ||
|
||
id: creating-collections | ||
name: Creating Collections | ||
content: | ||
- from-memory | ||
- reading-from-text | ||
- reading-from-csv |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
<!-- | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
|
||
### Reading from a CSV file | ||
|
||
Data processing pipelines often work with tabular data. In many examples and challenges throughout the course, you’ll be working with one of the datasets stored as CSV files in the "beam-examples" or "dataflow-samples" buckets. | ||
Review comment: as csv files in the "beam-examples" or "dataflow-samples" buckets. |
||
|
||
Loading data from a CSV file takes two steps: | ||
* Load text lines using the `textio.Read` transform | ||
Review comment: Loading data from csv file takes two steps:
|
||
* Parse the lines of text into a tabular format | ||
Review comment: Parse |
||
|
||
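The parsing step above can be illustrated on a single line of text. The block below is a minimal sketch, assuming plain comma-separated lines without quoted fields; `parseField` and the sample lines are hypothetical and are not part of the Beam SDK or the dataset.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseField is a hypothetical helper, not part of the Beam SDK. It splits one
// CSV line on commas and parses the field at the given index as a float64.
// It reports false when the line is too short or the field is not numeric
// (for example, a header row). Splitting on commas assumes that no quoted
// field contains a comma.
func parseField(line string, index int) (float64, bool) {
	fields := strings.Split(strings.TrimSpace(line), ",")
	if index < 0 || index >= len(fields) {
		return 0, false
	}
	value, err := strconv.ParseFloat(strings.TrimSpace(fields[index]), 64)
	if err != nil {
		return 0, false
	}
	return value, true
}

func main() {
	// Made-up sample lines, not taken from the real dataset.
	lines := []string{"cost,passenger_count", "5.8,1", "4.6,2", "oops"}
	for _, line := range lines {
		// Lines that cannot be parsed are skipped instead of counted as zero.
		if cost, ok := parseField(line, 0); ok {
			fmt.Println(cost)
		}
	}
}
```

In a pipeline, a function like this could be called from the function passed to `beam.ParDo` on the output of `textio.Read`.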
### Playground exercise | ||
|
||
Try to experiment with the example in the playground window and modify the code to process other fields from the New York taxi rides dataset. | ||
Review comment: Try to experiment with an example in the playground window and modify the code to process other fields from the New York taxi rides dataset. |
||
|
||
Here is a small list of fields and an example record from this dataset: | ||
|
||
| cost | passenger_count | ... | | ||
|------|-----------------|-----| | ||
| 5.8 | 1 | ... | | ||
| 4.6 | 2 | ... | | ||
| 24 | 1 | ... | | ||
|
||
Overview [file](https://storage.googleapis.com/apache-beam-samples/nyc_taxi/misc/sample1000.csv) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
// beam-playground: | ||
// name: CSV | ||
// description: CSV example. | ||
// multifile: false | ||
// context_line: 44 | ||
// categories: | ||
// - Quickstart | ||
// complexity: BASIC | ||
// tags: | ||
// - hellobeam | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/log" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" | ||
"strconv" | ||
"strings" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" | ||
|
||
) | ||
|
||
// less reports whether a sorts before b; top.Largest uses it to rank the costs. | ||
func less(a, b float64) bool { | ||
	return a < b | ||
} | ||
|
||
func main() { | ||
p, s := beam.NewPipelineWithRoot() | ||
|
||
file := Read(s, "gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv") | ||
|
||
cost := applyTransform(s, file) | ||
|
||
// Keep only the top 10 costs, as ordered by less, so the console output stays short. | ||
fixedSizeElements := top.Largest(s, cost, 10, less) | ||
Review comment: Please add a comment explaining this |
||
|
||
output(s, "Total cost: ", fixedSizeElements) | ||
|
||
err := beamx.Run(context.Background(), p) | ||
if err != nil { | ||
log.Exitf(context.Background(), "Failed to execute job: %v", err) | ||
} | ||
} | ||
|
||
// Read reads from the filename(s) specified by a glob string and returns a PCollection<string>. | ||
func Read(s beam.Scope, glob string) beam.PCollection { | ||
return textio.Read(s, glob) | ||
} | ||
|
||
// applyTransform extracts the cost field (column index 16) from each CSV line and parses it as a float64. | ||
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { | ||
return beam.ParDo(s, func(line string) float64 { | ||
taxi := strings.Split(strings.TrimSpace(line), ",") | ||
if len(taxi) > 16 { | ||
cost, _ := strconv.ParseFloat(taxi[16],64) | ||
return cost | ||
} | ||
return 0.0 | ||
}, input) | ||
} | ||
|
||
func output(s beam.Scope, prefix string, input beam.PCollection) { | ||
beam.ParDo0(s, func(elements []float64) { | ||
for _, element := range elements { | ||
fmt.Println(prefix,element) | ||
} | ||
}, input) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
# | ||
|
||
id: from-csv | ||
name: Creating PCollections from csv files | ||
taskName: CSV |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
<!-- | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
### Reading from text file | ||
|
||
You can use one of the Beam-provided I/O adapters to read from an external source. The adapters vary in their exact usage, but all of them read from some external data source and return a `PCollection` whose elements represent the data records in that source. | ||
Review comment: You can use one of the Beam-provided I/O adapters to read from an external source. The adapters vary in their exact usage, but all of them read from some external data source and return a |
||
|
||
Each data source adapter has a Read transform; to read, you apply that transform to the pipeline. In the Go SDK, this means passing the pipeline's scope to the transform. | ||
|
||
`textio.Read`, for example, reads from an external text file and returns a `PCollection` whose elements are of type `string`. Each string represents one line from the text file. Here’s how you would apply `textio.Read` to your pipeline to create a `PCollection`: | ||
|
||
``` | ||
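// Assumes the context, beam, textio, log, and beamx packages are imported, as in the playground example. | ||
func main() { | ||
	ctx := context.Background() | ||
|
||
	// First, create the pipeline and its root scope. | ||
	p, s := beam.NewPipelineWithRoot() | ||
|
||
	// Now create the PCollection by reading a text file. Each line of the input file becomes a separate element. | ||
	lines := textio.Read(s, "gs://some/inputData.txt") | ||
|
||
	// Placeholder so the snippet compiles; apply further transforms to lines here. | ||
	_ = lines | ||
|
||
	// Run the pipeline. | ||
	if err := beamx.Run(ctx, p); err != nil { | ||
		log.Exitf(ctx, "Failed to execute job: %v", err) | ||
	} | ||
} | ||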
``` | ||
|
||
### Playground exercise | ||
|
||
In the playground window, you can find an example that reads the Shakespeare play King Lear from a text file stored in a Google Cloud Storage bucket and fills a `PCollection` with individual lines and then with individual words. Try it out and see what the output is. | ||
Review comment: In the playground window, you can find an example that reads the Shakespeare play King Lear from the text file stored in the Google Storage bucket and fills PCollection with individual lines and then with individual words. Try it out and see what the output is. |
||
|
||
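Splitting a line into individual words can be sketched with the standard library alone. The block below is a minimal sketch under stated assumptions: `splitWords` and its regular expression are hypothetical and are not taken from the playground example, which may tokenize differently.

```go
package main

import (
	"fmt"
	"regexp"
)

// wordRE matches runs of ASCII letters and apostrophes. This pattern is an
// assumption made for illustration; other tokenization rules are possible.
var wordRE = regexp.MustCompile(`[a-zA-Z']+`)

// splitWords is a hypothetical helper, not part of the Beam SDK. It returns
// the words found in one line of text, in the order they appear.
func splitWords(line string) []string {
	return wordRE.FindAllString(line, -1)
}

func main() {
	for _, word := range splitWords("Nothing will come of nothing: speak again.") {
		fmt.Println(word)
	}
}
```

In a pipeline, a function like this could be applied to every line read by `textio.Read`, with each returned word emitted as a separate element.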
One difference you will see is that the output is much shorter than the input file itself. This is because the number of elements in the output `PCollection` is limited with the `top.Largest(s,lines,10,less)` transform. Limiting the output this way is a technique you can use to troubleshoot and debug pipelines with large input datasets; `Sample.fixedSizeGlobally`, available in the Java SDK, is another transform that serves the same purpose. | ||
Review comment: One of the differences you will see is that the output is much shorter than the input file itself. This is because the number of elements in the output |
||
|
||
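To make the idea of keeping only the N largest elements under a `less` function concrete, here is a minimal standard-library sketch. `largestN` is a hypothetical helper written for illustration; it is not the implementation of `top.Largest`, and the sample lines are made up.

```go
package main

import (
	"fmt"
	"sort"
)

// largestN is a hypothetical helper, not part of the Beam SDK. It returns the
// n largest strings of lines under the ordering defined by less, largest first.
func largestN(lines []string, n int, less func(a, b string) bool) []string {
	// Copy the input so the caller's slice is not reordered.
	sorted := append([]string(nil), lines...)
	// Sort in descending order: element i comes first when element j is less than it.
	sort.Slice(sorted, func(i, j int) bool { return less(sorted[j], sorted[i]) })
	if n > len(sorted) {
		n = len(sorted)
	}
	return sorted[:n]
}

func main() {
	lines := []string{"pear", "apple", "fig", "kiwi"}
	less := func(a, b string) bool { return a < b }
	fmt.Println(largestN(lines, 2, less)) // prints "[pear kiwi]"
}
```

Because the comparison is passed in as a function, the same helper can rank lines by length or by any other rule without changing the selection logic.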
Overview [file](https://storage.googleapis.com/apache-beam-samples/shakespeare/kinglear.txt) |
Review comment: You can find the complete code of this example in the playground window where you can run the pipeline and experiment with it.