-
Notifications
You must be signed in to change notification settings - Fork 290
/
Copy pathJavaDatasetConversion.java
170 lines (134 loc) · 4.52 KB
/
JavaDatasetConversion.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package dataframe;
import org.apache.spark.sql.*;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import static org.apache.spark.sql.functions.col;
//
// Explore interoperability between DataFrame and Dataset. Note that Dataset
// is covered in much greater detail in the 'dataset' directory.
//
/**
 * Explore interoperability between DataFrame and Dataset. Note that Dataset
 * is covered in much greater detail in the 'dataset' directory.
 */
public class JavaDatasetConversion {

  /**
   * A customer record. This must be a JavaBean in order for Spark to infer a
   * schema for it: {@code Serializable}, public getters/setters for every
   * field, and a public no-argument constructor (the JavaBean convention that
   * {@code Encoders.bean} relies on when turning rows back into instances).
   */
  public static class Cust implements Serializable {
    private int id;
    private String name;
    private double sales;
    private double discount;
    private String state;

    /** No-arg constructor required by the JavaBean convention / bean encoder. */
    public Cust() {
    }

    public Cust(int id, String name, double sales, double discount, String state) {
      this.id = id;
      this.name = name;
      this.sales = sales;
      this.discount = discount;
      this.state = state;
    }

    public int getId() {
      return id;
    }

    public void setId(int id) {
      this.id = id;
    }

    public String getName() {
      return name;
    }

    public void setName(String name) {
      this.name = name;
    }

    public double getSales() {
      return sales;
    }

    public void setSales(double sales) {
      this.sales = sales;
    }

    public double getDiscount() {
      return discount;
    }

    public void setDiscount(double discount) {
      this.discount = discount;
    }

    public String getState() {
      return state;
    }

    public void setState(String state) {
      this.state = state;
    }
  }

  /**
   * A smaller JavaBean holding only the (sales, state) projection of a
   * customer record.
   */
  public static class StateSales implements Serializable {
    private double sales;
    private String state;

    /** No-arg constructor required by the JavaBean convention / bean encoder. */
    public StateSales() {
    }

    /** Convenience constructor taking exactly the fields this bean models. */
    public StateSales(double sales, String state) {
      this.sales = sales;
      this.state = state;
    }

    /**
     * Kept for backward compatibility with existing callers. The {@code id},
     * {@code name} and {@code discount} arguments are intentionally ignored:
     * this bean only models the (sales, state) subset.
     */
    public StateSales(int id, String name, double sales, double discount, String state) {
      this(sales, state);
    }

    public double getSales() {
      return sales;
    }

    public void setSales(double sales) {
      this.sales = sales;
    }

    public String getState() {
      return state;
    }

    public void setState(String state) {
      this.state = state;
    }
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("DataFrame-Java-DatasetConversion")
        .master("local[4]")
        .getOrCreate();

    //
    // The Java API requires you to explicitly instantiate an encoder for
    // any JavaBean you want to use for schema inference
    //
    Encoder<Cust> custEncoder = Encoders.bean(Cust.class);

    //
    // Create a container of the JavaBean instances
    //
    List<Cust> data = Arrays.asList(
        new Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
        new Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
        new Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
        new Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
        new Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
    );

    //
    // Use the encoder and the container of JavaBean instances to create a
    // Dataset
    //
    Dataset<Cust> ds = spark.createDataset(data, custEncoder);

    System.out.println("*** here is the schema inferred from the Cust bean");
    ds.printSchema();

    System.out.println("*** here is the data");
    ds.show();

    //
    // Querying a Dataset of any type results in a
    // DataFrame (i.e. Dataset<Row>)
    //
    Dataset<Row> smallerDF =
        ds.select("sales", "state").filter(col("state").equalTo("CA"));

    System.out.println("*** here is the dataframe schema");
    smallerDF.printSchema();

    System.out.println("*** here is the data");
    smallerDF.show();

    //
    // But a Dataset<Row> can be converted back to a Dataset of some other
    // type by using another bean encoder
    //
    Encoder<StateSales> stateSalesEncoder = Encoders.bean(StateSales.class);
    Dataset<StateSales> stateSalesDS = smallerDF.as(stateSalesEncoder);

    System.out.println("*** here is the schema inferred from the StateSales bean");
    stateSalesDS.printSchema();

    System.out.println("*** here is the data");
    stateSalesDS.show();

    spark.stop();
  }
}