Commit 7faee41

ADD Code

1 parent f5bc356 commit 7faee41

5 files changed: +456 −0 lines changed
Binary file not shown.
@@ -0,0 +1,240 @@
# Databricks notebook source
# Example RDD
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# COMMAND ----------

# MAGIC %md
# MAGIC # Transformations

# COMMAND ----------

# MAGIC %md
# MAGIC ## `Map` Function

# COMMAND ----------

# 1. map
print("### 1. map ###")
print("Description: Return a new RDD by applying a function to all elements of this RDD.")

# Example 1: Multiply each element by 2
simple_map = rdd.map(lambda x: x * 2).collect()
print("map example 1 (multiply by 2):", simple_map)  # [2, 4, 6, 8, 10]

# Example 2: Count the words in each sentence
sentences = ["Hello world", "Apache Spark", "RDD transformations Wide Vs Narrow Spark"]
# "Hello world" => split(" ") => ["Hello", "world"]
sentence_rdd = sc.parallelize(sentences)
words_map = sentence_rdd.map(lambda sentence: len(sentence.split(" "))).collect()
print("map example 2 (word count per sentence):", words_map)  # [2, 2, 6]

# COMMAND ----------

# MAGIC %md
# MAGIC ## `Filter` Function

# COMMAND ----------

# 2. filter
print("\n### 2. filter ###")
print("Description: Return a new RDD containing only the elements that satisfy a predicate.")

# Example 1: Keep only the even numbers
simple_filter = rdd.filter(lambda x: x % 2 == 0).collect()
print("filter example 1 (even numbers):", simple_filter)  # [2, 4]

# Example 2: Keep sentences containing the word 'Spark'
words_filter = sentence_rdd.filter(lambda sentence: "Spark" in sentence).collect()
print("filter example 2 (sentences with 'Spark'):", words_filter)

# COMMAND ----------

# MAGIC %md
# MAGIC ## `FlatMap` Function

# COMMAND ----------

# 3. flatMap
print("\n### 3. flatMap ###")
print("Description: Return a new RDD by applying a function to all elements of this RDD and then flattening the results.")

# Example 1: Split sentences into words (map vs. flatMap)
sentences_mapped = sentence_rdd.map(lambda sentence: sentence.split(" ")).collect()
print("map keeps one list per sentence:", sentences_mapped)

simple_flatMap = sentence_rdd.flatMap(lambda sentence: sentence.split(" ")).collect()
print("flatMap flattens everything into one list of words:", simple_flatMap)

# Example 2: Flatten a list of lists
nested_lists = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
nested_rdd = sc.parallelize(nested_lists)
flatten_list = nested_rdd.flatMap(lambda x: x).collect()
print("flatMap example (flatten list of lists):", flatten_list)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

# COMMAND ----------

# MAGIC %md
# MAGIC ## `Reduce` Function

# COMMAND ----------

# 4. reduce
print("\n### 4. reduce ###")
print("Description: Reduces the elements of this RDD using the specified commutative and associative binary operator.")

# Example 1: Sum of elements
simple_reduce = rdd.reduce(lambda x, y: x + y)
print("reduce example 1 (sum of elements):", simple_reduce)  # 15

# Example 2: Find the longest word in a list of words
words = ["cat", "elephant", "rat", "hippopotamus"]
words_rdd = sc.parallelize(words)
words_rdd_reduced = words_rdd.reduce(lambda x, y: x if len(x) > len(y) else y)
print("reduce example 2 (longest word):", words_rdd_reduced)  # hippopotamus

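# COMMAND ----------

# Note: reduce is an action, not a transformation; it returns a plain Python
# value immediately instead of a new RDD. Partial results are combined across
# partitions in no guaranteed order, which is why the operator must be
# associative and commutative. A cautionary sketch with a non-commutative
# operator (subtraction), whose result depends on the partitioning:
print("reduce with subtraction (partition-dependent):",
      sc.parallelize([1, 2, 3, 4], 2).reduce(lambda x, y: x - y))
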
# COMMAND ----------

# MAGIC %md
# MAGIC ## `groupByKey` Function

# COMMAND ----------

# 5. groupByKey
print("\n### 5. groupByKey ###")
print("Description: Group the values for each key in the RDD into a single sequence.")

# Example 1: Group values by key
pairs = [(1, 'a'), (1, 'ali'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
pairs_rdd = sc.parallelize(pairs)
simple_groupByKey = pairs_rdd.groupByKey().mapValues(list).collect()
print("groupByKey example 1 (group values by key):", simple_groupByKey)

# Example 2: Group counts by word
words_pairs = [("cat", 1), ("car", 2), ("dog", 3), ("deer", 4), ("elephant", 5), ("elephant", 20)]
words_rdd = sc.parallelize(words_pairs)
# mapValues(list) converts the grouped values (which are iterable) into lists.
words_grouped = words_rdd.groupByKey().mapValues(list).collect()
print("groupByKey example 2 (group counts by word):", words_grouped)

# COMMAND ----------

# MAGIC %md
# MAGIC ## `reduceByKey` Function

# COMMAND ----------

# 6. reduceByKey
print("\n### 6. reduceByKey ###")
print("Description: Merge the values for each key using an associative and commutative reduce function.")
pairs = [(1, 'a'), (1, '_a'), (2, 'b'), (2, '_b'), (3, 'c'), (4, 'd'), (5, 'e')]
pairs_rdd = sc.parallelize(pairs)

# Example 1: Concatenate values with the same key
simple_reduceByKey = pairs_rdd.reduceByKey(lambda x, y: x + y).collect()
print("reduceByKey example 1 (merge values by key):", simple_reduceByKey)

# Example 2: Count the occurrences of each word in a list
word_list = ["cat", "cat", "dog", "elephant", "dog", "dog"]
word_pairs_rdd = sc.parallelize(word_list).map(lambda word: (word, 1))
word_counts = word_pairs_rdd.reduceByKey(lambda x, y: x + y).collect()
print("reduceByKey example 2 (word count):", word_counts)  # e.g. [('cat', 2), ('dog', 3), ('elephant', 1)]

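# COMMAND ----------

# groupByKey ships every individual value across the network before grouping,
# while reduceByKey combines values within each partition first, so far less
# data is shuffled. A minimal side-by-side sketch of the same word count:
wc_pairs = sc.parallelize(["cat", "cat", "dog"]).map(lambda w: (w, 1))
via_group = wc_pairs.groupByKey().mapValues(sum).collect()       # groups all values, then sums
via_reduce = wc_pairs.reduceByKey(lambda x, y: x + y).collect()  # combines map-side first
print(via_group, via_reduce)  # both yield [('cat', 2), ('dog', 1)] (key order may vary)
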
# COMMAND ----------

# MAGIC %md
# MAGIC ## `join` Function

# COMMAND ----------

# 7. join
print("\n### 7. join ###")
print("Description: Perform an inner join of this RDD and another one.")

# Example 1: Join two RDDs by key
fruits = sc.parallelize([(1, "apple"), (2, "banana")])
colors = sc.parallelize([(1, "red"), (2, "yellow")])
fruits_color_join = fruits.join(colors).collect()
print("join example 1 (fruits with colors):", fruits_color_join)  # e.g. [(1, ('apple', 'red')), (2, ('banana', 'yellow'))]

# Example 2: Join employee data with department data
employees = sc.parallelize([(1, "John"), (2, "Jane"), (3, "Joe")])
departments = sc.parallelize([(1, "HR"), (2, "Finance")])
employees_department_join = employees.join(departments).collect()
print("join example 2 (employee-department join):", employees_department_join)

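# COMMAND ----------

# The inner join above silently drops (3, "Joe") because key 3 has no matching
# department. A sketch with leftOuterJoin, which keeps unmatched keys and fills
# the missing side with None:
left_join = employees.leftOuterJoin(departments).collect()
print("leftOuterJoin example:", left_join)  # key 3 appears as (3, ('Joe', None))
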
# COMMAND ----------

# MAGIC %md
# MAGIC ## `cogroup` Function

# COMMAND ----------

# MAGIC %md
# MAGIC TableA:
# MAGIC
# MAGIC | id | value  |
# MAGIC |----|--------|
# MAGIC | 1  | apple  |
# MAGIC | 2  | banana |
# MAGIC | 3  | orange |
# MAGIC
# MAGIC TableB:
# MAGIC
# MAGIC | id | color  |
# MAGIC |----|--------|
# MAGIC | 1  | red    |
# MAGIC | 2  | yellow |
# MAGIC
# MAGIC Result of cogroup:
# MAGIC
# MAGIC | id | value  | color   |
# MAGIC |----|--------|---------|
# MAGIC | 1  | apple  | red     |
# MAGIC | 2  | banana | yellow  |
# MAGIC | 3  | orange | (empty) |

# COMMAND ----------

# 8. cogroup
# The cogroup function in PySpark groups data from two RDDs that share the same key.
# It combines the values of matching keys from both RDDs into a tuple of lists.
print("\n### 8. cogroup ###")
print("Description: Group data from two RDDs sharing the same key.")

# Example 1: Cogroup two RDDs
fruits_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (3, "orange")])
colors_rdd = sc.parallelize([(1, "red"), (2, "yellow")])
cogrouped_fruits_colors = fruits_rdd.cogroup(colors_rdd).mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
print("cogroup example 1 (fruits with colors):", cogrouped_fruits_colors)
# Key 3 has no color, so its second list is empty: (3, (['orange'], []))

# Example 2: Cogroup sales data with target data
sales_rdd = sc.parallelize([("store1", 100), ("store2", 200)])
targets_rdd = sc.parallelize([("store1", 150), ("store3", 250)])
cogrouped_sales_targets = sales_rdd.cogroup(targets_rdd).mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
print("cogroup example 2 (sales-targets cogroup):", cogrouped_sales_targets)

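# COMMAND ----------

# cogroup keeps every key present in either RDD. For a flat, per-pair view of
# the same data, fullOuterJoin is a close relative (a sketch):
full_join = sales_rdd.fullOuterJoin(targets_rdd).collect()
print("fullOuterJoin example:", full_join)  # unmatched sides appear as None
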
# COMMAND ----------

# MAGIC %md
# MAGIC ## `distinct` Function

# COMMAND ----------

# 9. distinct
print("\n### 9. distinct ###")
print("Description: Return a new RDD containing the distinct elements in this RDD.")

# Example: Unique words from a list of words
words = ["cat", "dog", "cat", "elephant", "dog"]
words_rdd = sc.parallelize(words)
unique_words = words_rdd.distinct().collect()
print("distinct example (unique words):", unique_words)  # e.g. ['cat', 'dog', 'elephant']
@@ -0,0 +1,133 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # Immutable RDDs

# COMMAND ----------

# Test Immutable RDDs
numbers = [1, 2, 3, 4, 5]
numbers_rdd = sc.parallelize(numbers)
# Calling id() twice returns the same value: the RDD itself never changes.
print(f"Original RDD ID: {numbers_rdd.id()}")
print(f"Original RDD ID: {numbers_rdd.id()}")

# Apply a transformation: multiply each number by 2.
# This produces a brand-new RDD with its own ID.
transformed_rdd = numbers_rdd.map(lambda x: x * 2)
print(f"Transformed RDD ID: {transformed_rdd.id()}")

# Collect the results to trigger the computation
result = transformed_rdd.collect()
print(f"Transformed RDD result: {result}")

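# COMMAND ----------

# The source RDD is untouched by the transformation above; collecting it again
# still yields the original data (a quick check):
print(f"Original data unchanged: {numbers_rdd.collect()}")  # [1, 2, 3, 4, 5]
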
# COMMAND ----------

# MAGIC %scala
# MAGIC // Test Immutable RDDs
# MAGIC val numbers = List(1, 2, 3, 4, 5)
# MAGIC val numbersRdd = sc.parallelize(numbers)
# MAGIC // The ID is stable no matter how many times we ask for it.
# MAGIC println(s"Original RDD ID: ${numbersRdd.id}")
# MAGIC println(s"Original RDD ID: ${numbersRdd.id}")
# MAGIC println(s"Original RDD ID: ${numbersRdd.id}")

# COMMAND ----------

# MAGIC %scala
# MAGIC
# MAGIC // numbersRdd = numbersRdd.map(x => x * 2) // Oops! Won't compile: numbersRdd is an immutable val
# MAGIC
# MAGIC // Apply a transformation: multiply each number by 2
# MAGIC val transformedRdd = numbersRdd.map(x => x * 2)
# MAGIC println(s"Transformed RDD ID: ${transformedRdd.id}")
# MAGIC
# MAGIC // Collect the results to trigger the computation
# MAGIC val result = transformedRdd.collect()
# MAGIC println(s"Transformed RDD result: ${result.mkString(", ")}")

# COMMAND ----------

# MAGIC %md
# MAGIC # Immutable DF Example

# COMMAND ----------

# Create an RDD
data = [("John", 28), ("Smith", 44), ("Adam", 65), ("Henry", 23)]
rdd = sc.parallelize(data)

# Show the original RDD
print("Original RDD:")
for row in rdd.collect():
    print(row)

# COMMAND ----------

print(f"Original RDD ID: {rdd.id()}")

# Rebinding the Python variable does not mutate the original RDD;
# filter() returns a new RDD with a new ID.
rdd = rdd.filter(lambda x: x[1] > 30)
print(f"RDD ID after filter: {rdd.id()}")

# Filter rows where the age is greater than 30
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
print(f"Transformed RDD ID: {filtered_rdd.id()}")

# Show the transformed RDD
print("Filtered RDD:")
for row in filtered_rdd.collect():
    print(row)

# COMMAND ----------

# MAGIC %scala
# MAGIC // Create an RDD
# MAGIC val data = Seq(("John", 28), ("Smith", 44), ("Adam", 65), ("Henry", 23))
# MAGIC val rdd = sc.parallelize(data)
# MAGIC
# MAGIC // Show the original RDD
# MAGIC println("Original RDD:")
# MAGIC rdd.collect().foreach(println)
# MAGIC
# MAGIC // rdd = rdd.filter{ case (name, age) => age > 30 } // Won't compile: rdd is a val
# MAGIC
# MAGIC // Filter rows where the age is greater than 30
# MAGIC val filteredRdd = rdd.filter{ case (name, age) => age > 30 }
# MAGIC println(s"Transformed RDD ID: ${filteredRdd.id}")
# MAGIC
# MAGIC // Show the transformed RDD
# MAGIC println("Filtered RDD:")
# MAGIC filteredRdd.collect().foreach(println)

# COMMAND ----------
# MAGIC %md
# MAGIC # Spark Lazy Evaluation

# COMMAND ----------

# Create an RDD
rdd = sc.parallelize([
    ("John", 28),
    ("Smith", 44),
    ("Adam", 65),
    ("Henry", 23)
])

# Apply a map transformation: keep the name and age, plus a boolean flag
# indicating whether the person is older than 30
mapped_rdd = rdd.map(lambda x: (x[0], x[1], x[1] > 30))

# Filter the RDD to include only people older than 30
filtered_rdd = mapped_rdd.filter(lambda x: x[2])

# Convert the filtered RDD to a DataFrame
df = spark.createDataFrame(filtered_rdd, ["Name", "Age", "OlderThan30"])

# Select only the name and age columns
final_df = df.select("Name", "Age")

# Nothing has executed yet; collect() is the action that triggers
# all of the transformations above.
results = final_df.collect()
print(results)
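
# COMMAND ----------

# A quick way to see the lineage Spark has recorded before anything runs
# (a sketch; toDebugString shows the RDD's chain of transformations):
print(filtered_rdd.toDebugString().decode("utf-8"))
# For the DataFrame side, final_df.explain() prints the query plan instead.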
Binary file not shown.
