forked from thangbk2209/google_cluster_trace_analysis
timeSeriesTaskUsage.py
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext(appName="Task_usage")
sql_context = SQLContext(sc)
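# Note: on Spark 2.x, SparkSession (from pyspark.sql import SparkSession) is the preferred
# entry point; an equivalent setup would be, as a sketch:
#   spark = SparkSession.builder.appName("Task_usage").getOrCreate()
# after which spark.sql(...) and spark.read would replace sql_context below.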
# folder_path ='/mnt/volume/ggcluster/clusterdata-2011-2/task_usage/'
folder_path = '/home/nguyen/spark-lab/spark-2.1.1-bin-hadoop2.7/analysis/ggcluster/'
# Schema of the task_usage table from the Google cluster trace (clusterdata-2011-2)
dataSchema = StructType([
    StructField('startTime', StringType(), True),
    StructField('endTime', StringType(), True),
    StructField('JobId', LongType(), True),
    StructField('taskIndex', LongType(), True),
    StructField('machineId', LongType(), True),
    StructField('meanCPUUsage', FloatType(), True),
    # canonical memory usage
    StructField('CMU', FloatType(), True),
    # assigned memory usage
    StructField('AssignMem', FloatType(), True),
    # unmapped page cache memory usage
    StructField('unmapped_cache_usage', FloatType(), True),
    # total page cache memory usage
    StructField('page_cache_usage', FloatType(), True),
    StructField('max_mem_usage', FloatType(), True),
    StructField('mean_diskIO_time', FloatType(), True),
    StructField('mean_local_disk_space', FloatType(), True),
    StructField('max_cpu_usage', FloatType(), True),
    StructField('max_disk_io_time', FloatType(), True),
    # cycles per instruction
    StructField('cpi', FloatType(), True),
    # memory accesses per instruction
    StructField('mai', FloatType(), True),
    StructField('sampling_portion', FloatType(), True),
    StructField('agg_type', FloatType(), True),
    StructField('sampled_cpu_usage', FloatType(), True)])
df = (
sql_context.read
.format('com.databricks.spark.csv')
.schema(dataSchema)
.load("/mnt/volume/ggcluster/spark-2.1.1-bin-hadoop2.7/thangbk2209/results/out.csv")
)
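# Note: on Spark 2.x the 'com.databricks.spark.csv' format string maps to the built-in
# CSV data source, so an equivalent read would be, as a sketch:
#   df = sql_context.read.csv("/mnt/volume/.../out.csv", schema=dataSchema)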
df.createOrReplaceTempView("dataFrame")
# df.printSchema()
# Count the number of task-usage records per job, ordered by that count
taskCountPerJob = sql_context.sql("SELECT JobId, COUNT(taskIndex) AS countTask FROM dataFrame GROUP BY JobId ORDER BY countTask")
# taskCountPerJob.show(5000)
schema_df = ["JobId", "countTask"]  # column names of the result, used only by the commented-out write below
taskCountPerJob.toPandas().to_csv('thangbk2209/results/listJobId-countTask.csv', index=False, header=False)
# taskCountPerJob.write.save("results/test.csv", format="csv", columns=schema_df)
sc.stop()
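# Usage sketch (assumes the relative output directory thangbk2209/results/ already exists,
# since pandas' to_csv does not create it):
#   spark-submit timeSeriesTaskUsage.py
# The written CSV has no header row, so it can be reloaded later with, e.g.:
#   pandas.read_csv('thangbk2209/results/listJobId-countTask.csv', names=['JobId', 'countTask'])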