-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrequetage.py
137 lines (119 loc) · 5.31 KB
/
requetage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding: utf-8 -*-
"""
This script:
- connects to the Cassandra cluster and selects a keyspace
- sends queries to the Cassandra table containing the data preprocessed with Spark
- builds the queries in Python:
1. select the cities lying inside a 500 km radius circle centered on the seism epicenter
2. select the telephone numbers, latitudes and longitudes recorded for these cities, for the 10-minute slots going from the (rounded) seism time back to one hour earlier
Create:
- CREATE KEYSPACE test WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': 2 };
- create table cassandraresult (tel text,lat text,longi text, PRIMARY KEY (tel, lat));
- create table test_spark_bigText(t timestamp, id_ville text, tels text, primary key ((t,id_ville)));
"""
from cassandra.cluster import Cluster
# Connect to the cluster. Cluster() is called without arguments, so the driver
# defaults are used (presumably a node on localhost -- confirm for deployment).
cluster = Cluster()
# `session` is a module-level global: Requetage() below reads it directly.
session = cluster.connect()
# PARAMETERS: make "test" the current keyspace for this session
session.execute("USE test;")
#-------------------------------------------------------------------------------------------------#
# select cities, send queries to Cassandra
#-------------------------------------------------------------------------------------------------#
from selection_villes import findListVilles, getClosest
import datetime
import time
import math
from cassandra.query import BatchStatement
from cassandra.query import SimpleStatement
import os
# round a datetime onto the 10-minute grid, e.g. 23:44 -> 23:40
def round_up(tm):
    """Snap a datetime back onto the 10-minute grid.

    Despite its name, the function rounds *down*: 23:44 becomes 23:40.
    Seconds and microseconds are cleared so the result lies exactly on a
    slot boundary.

    Args:
        tm: a ``datetime.datetime``.

    Returns:
        A ``datetime.datetime`` whose minute is a multiple of 10 and whose
        second and microsecond are both zero.
    """
    # ceil(m/10 - 1) equals floor(m/10) except when m is already a multiple
    # of 10, where it yields the *previous* slot (23:40 -> 23:30, and
    # 00:00 -> 23:50 of the day before).
    # NOTE(review): that boundary behaviour is kept unchanged because the
    # caller's choice of Cassandra slots depends on it -- confirm it is intended.
    upmins = math.ceil(tm.minute / 10.0 - 1) * 10
    diffmins = upmins - tm.minute
    newtime = tm + datetime.timedelta(minutes=diffmins)
    # Clearing only `second` would leave sub-second residue in the result.
    return newtime.replace(second=0, microsecond=0)
# insert the results of the queries into the Cassandra table "cassandraresult"
def insertbatch(rowsToAdd, session, seismetime, warnedTime):
    """Write one batch of rows to the ``cassandraresult`` table.

    Args:
        rowsToAdd: sequence of indexable rows. As used by the INSERT below,
            ``row[0]`` goes to ``lat``, ``row[1]`` to ``longi`` and
            ``row[2]`` to ``tel``.
        session: object whose ``execute`` method receives the batch
            (the Cassandra session in this script).
        seismetime: value stored in the ``seismeTime`` column of every row.
        warnedTime: value stored, converted with ``str``, in the
            ``warnedTime`` column of every row.

    Returns:
        None. Nothing is sent when ``rowsToAdd`` is empty.
    """
    # An empty batch carries no data; skip the round trip to the cluster.
    if not rowsToAdd:
        return

    # The statement text and the warned-time string are the same for every
    # row, so build them once instead of once per row.
    insert = SimpleStatement("INSERT INTO cassandraresult(seismeTime,tel,lat,longi,warnedTime) values(%s , %s , %s , %s , %s)")
    warned = str(warnedTime)

    batch = BatchStatement()
    for row in rowsToAdd:
        batch.add(insert, (seismetime, row[2], row[0], row[1], warned))
    session.execute(batch)
# select tel, lat and long for the cities in the seism area: perform the queries
def Requetage(SeismeLatitude, SeismeLongitude, timestampTdT):
    """Query the subscribers to warn for a seism and store them in Cassandra.

    Steps:
      1. ``findListVilles`` gives the cities for the epicenter coordinates.
      2. Seven time slots are built: ``round_up`` of the seism time, then six
         more slots, each 10 minutes earlier than the previous one.
      3. For every (city, slot) pair the ``tels`` column of
         ``test_spark_bigtext`` is read, split on ``|`` into entries (the
         last piece is dropped), and each entry is split on ``/``.
      4. Entries are written through ``insertbatch`` in batches of at most
         10000 rows, recording the elapsed time after each batch.
      5. A summary is printed, including the elapsed time at which more
         than 80% of the rows had been handed to ``insertbatch``.

    Uses the module-level ``session`` for all Cassandra calls.

    Args:
        SeismeLatitude: epicenter latitude, passed to ``findListVilles``.
        SeismeLongitude: epicenter longitude, passed to ``findListVilles``.
        timestampTdT: seism time as a ``'%Y-%m-%d %H:%M'`` string.

    Returns:
        The result of the last SELECT that was executed, or ``[]`` when no
        query was run.

    Raises:
        ValueError: if ``timestampTdT`` does not match ``'%Y-%m-%d %H:%M'``.
    """
    batch_limit = 10000
    time_format = '%Y-%m-%d %H:%M'
    start = datetime.datetime.now()

    def elapsed():
        # Seconds spent since the function was entered.
        return (datetime.datetime.now() - start).total_seconds()

    # select the cities
    Villes = findListVilles(SeismeLatitude, SeismeLongitude)

    # Build the slot labels. The local is called `slot`, not `time`, so the
    # imported `time` module is not shadowed.
    slot = round_up(datetime.datetime.strptime(timestampTdT, time_format))
    Intervalles = [slot.strftime(time_format)]
    for _ in range(6):
        slot = slot - datetime.timedelta(minutes=10)
        Intervalles.append(slot.strftime(time_format))

    Result = []
    Warnedtab = []      # (cumulative row count, elapsed seconds) per batch
    WarnedCounter = 0

    # request on CASSANDRA, batch size = batch_limit
    for ville in Villes:
        for t in Intervalles:
            Result = session.execute("SELECT tels FROM test_spark_bigtext WHERE T = %s AND Id_Ville = %s;", (t, ville))
            # Single-argument print calls produce the same text on Python 2 and 3.
            print("ville :  %s" % (ville,))
            print("t :  %s" % (t,))
            if not Result:
                continue

            rows = Result[0].tels.split("|")
            Batch = []
            # The last piece of the split is dropped, as before.
            for row in rows[:-1]:
                Batch.append(row.split("/"))
                # Testing len(Batch) (instead of a counter that was never
                # reset) makes every full batch flush, not only the first one.
                if len(Batch) == batch_limit:
                    warnedTime = elapsed()
                    insertbatch(Batch, session, timestampTdT, warnedTime)
                    WarnedCounter = WarnedCounter + len(Batch)
                    Warnedtab.append((WarnedCounter, warnedTime))
                    Batch = []

            # Flush whatever is left (fewer than batch_limit rows).
            warnedTime = elapsed()
            WarnedCounter = WarnedCounter + len(Batch)
            Warnedtab.append((WarnedCounter, warnedTime))
            if Batch:
                insertbatch(Batch, session, timestampTdT, warnedTime)

    timediff = elapsed()
    if Warnedtab:
        totalWarned = Warnedtab[-1][0]
        threshold = 0.8 * totalWarned
        # Elapsed time of the first batch that pushed the cumulative count
        # above 80% of the total; 0 when no batch did.
        delai = next((seconds for count, seconds in Warnedtab if count > threshold), 0)
        print("Contacted cities : " + str(Villes))
        print("Intervalles : " + str(Intervalles))
        print("Messages sent : " + str(totalWarned))
        print("Total process time : " + str(timediff) + " seconds")
        print("Time to warn 80% : " + str(delai) + " seconds")
    else:
        print(" No one has been warned ! Tooo baaaad !!")
    return Result
#------------------------------------------------------------------------------------------------#
# Query
# Seism info, read interactively with raw_input (sample values are given in
# the trailing comments):
Lat_seism = float(raw_input('latitude : '))#35.01
Long_seism = float(raw_input('longitude : '))#135.0
time_seism = raw_input('datetime YYYY-MM-DD HH:MM: ')#'2015-01-25 10:50'
# The triple-quoted string below is disabled code kept as a bare string
# literal: it is evaluated as a string and discarded, never executed.
'''
#IPaddresses of the 5 clusters
IPaddressesTables=['172.31.53.38','172.31.53.39','172.31.53.40','172.31.53.41', '172.31.53.41']
# Shut the closest node down
_,nodeToCut=getClosest(Lat_seism, Long_seism)
os.system("nodetool -h "+IPaddressesTables[nodeToCut]+" stopdaemon")
# parametre
session.execute("USE test;")
'''
# Run the query; Requetage parses time_seism with the '%Y-%m-%d %H:%M' format
# and returns the result of its last SELECT.
Result = Requetage(Lat_seism, Long_seism, time_seism)