modify tunnel.py with print_conn_err() and is_internet_connected()
mikeqfu committed Nov 10, 2020
1 parent aa5325b commit a2df5ad
Showing 1 changed file with 97 additions and 70 deletions.
167 changes: 97 additions & 70 deletions pyrcs/other_assets/tunnel.py
@@ -21,7 +21,7 @@
 from pyhelpers.text import find_similar_str

 from pyrcs.utils import cd_dat, get_catalogue, get_last_updated_date, homepage_url, \
-    parse_tr
+    parse_tr, print_conn_err, is_internet_connected


 class Tunnels:
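
The two helpers added to this import, print_conn_err() and is_internet_connected(), live in pyrcs.utils and their bodies are not part of this diff. Going only by how they are called below (is_internet_connected() takes no arguments and returns a truth value; print_conn_err(verbose=...) reports a failed connection), a minimal sketch might look like this:

    import socket

    def is_internet_connected():
        # Hypothetical sketch, not the actual pyrcs.utils implementation:
        # probe a well-known public DNS server over TCP.
        try:
            socket.create_connection(("8.8.8.8", 53), timeout=5).close()
            return True
        except OSError:
            return False

    def print_conn_err(verbose=False):
        # Hypothetical sketch: report a failed connection if verbosity is on.
        if verbose:
            print("The Internet connection is not currently available.")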
@@ -33,6 +33,9 @@ class Tunnels:
     :param update: whether to check on update and proceed to update the package data,
         defaults to ``False``
     :type update: bool
+    :param verbose: whether to print relevant information in console as the function runs,
+        defaults to ``True``
+    :type verbose: bool or int

     **Example**::
@@ -47,28 +50,32 @@ class Tunnels:
         http://www.railwaycodes.org.uk/tunnels/tunnels0.shtm
     """

-    def __init__(self, data_dir=None, update=False):
+    def __init__(self, data_dir=None, update=False, verbose=True):
         """
         Constructor method.
         """
         self.Name = 'Railway tunnel lengths'
-        self.Key = 'Tunnels'
-        self.LUDKey = 'Last updated date'
         self.HomeURL = homepage_url()
         self.SourceURL = urllib.parse.urljoin(self.HomeURL, '/tunnels/tunnels0.shtm')
+
+        self.Key = 'Tunnels'
+
+        self.LUDKey = 'Last updated date'
+        self.Date = get_last_updated_date(self.SourceURL, parsed=True, as_date_type=False,
+                                          verbose=verbose)
+
         self.Catalogue = get_catalogue(self.SourceURL, update=update,
                                        confirmation_required=False)
+
         self.P1Key, self.P2Key, self.P3Key, self.P4Key = list(self.Catalogue.keys())[1:]
-        self.Date = get_last_updated_date(self.SourceURL, parsed=True, as_date_type=False)

-        if data_dir:
-            self.DataDir = validate_input_data_dir(data_dir)
-        else:
-            self.DataDir = cd_dat("other-assets", self.Key.lower())
+        self.DataDir = validate_input_data_dir(data_dir) if data_dir \
+            else cd_dat("other-assets", self.Key.lower())
+
         self.CurrentDataDir = copy.copy(self.DataDir)

-    def cdd_tunnels(self, *sub_dir, **kwargs):
+    def _cdd_tnl(self, *sub_dir, **kwargs):
         """
         Change directory to package data directory and sub-directories (and/or a file).
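
With the new verbose argument threaded through to get_last_updated_date(), construction can now be silenced. A usage sketch in the style of the class docstring (the printed URL is the one shown in the **Example** above; any status messages depend on connectivity):

    >>> from pyrcs.other_assets.tunnel import Tunnels

    >>> tunnels = Tunnels(verbose=False)  # suppress console output while initialising
    >>> print(tunnels.SourceURL)
    http://www.railwaycodes.org.uk/tunnels/tunnels0.shtm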
@@ -164,7 +171,7 @@ def parse_length(x):
                 measurement.measures.Distance(yd=yards).m
         return length, add_info

-    def collect_tunnel_lengths_by_page(self, page_no, update=False, verbose=False):
+    def collect_lengths_by_page(self, page_no, update=False, verbose=False):
         """
         Collect data of railway tunnel lengths for a page number from source web page.
@@ -186,13 +193,13 @@ def collect_tunnel_lengths_by_page(self, page_no, update=False, verbose=False):
             >>> tunnels = Tunnels()

-            >>> tunnel_len_1 = tunnels.collect_tunnel_lengths_by_page(page_no=1)
+            >>> tunnel_len_1 = tunnels.collect_lengths_by_page(page_no=1)
             >>> type(tunnel_len_1)
             <class 'dict'>
             >>> print(list(tunnel_len_1.keys()))
             ['Page 1 (A-F)', 'Last updated date']

-            >>> tunnel_len_4 = tunnels.collect_tunnel_lengths_by_page(page_no=4)
+            >>> tunnel_len_4 = tunnels.collect_lengths_by_page(page_no=4)
             >>> type(tunnel_len_4)
             <class 'dict'>
             >>> print(list(tunnel_len_4.keys()))
@@ -202,72 +209,82 @@ def collect_tunnel_lengths_by_page(self, page_no, update=False, verbose=False):
         assert page_no in range(1, 5)
         page_name = find_similar_str(str(page_no), list(self.Catalogue.keys()))

-        pickle_filename = \
-            re.sub(r"[()]", "", re.sub(r"[ -]", "-", page_name)).lower() + ".pickle"
-        path_to_pickle = self.cdd_tunnels(pickle_filename)
+        pickle_filename_ = re.sub(r"[()]", "", re.sub(r"[ -]", "-", page_name)).lower()
+        path_to_pickle = self._cdd_tnl(pickle_filename_ + ".pickle")

         if os.path.isfile(path_to_pickle) and not update:
             page_railway_tunnel_lengths = load_pickle(path_to_pickle)

         else:
             url = self.Catalogue[page_name]

-            try:
-                last_updated_date = get_last_updated_date(url)
-            except Exception as e:
-                print("Failed to find the last updated date for tunnel lengths data of "
-                      "{}. {}".format(page_name, e))
-                last_updated_date = None
+            page_railway_tunnel_lengths = None

             if verbose == 2:
                 print("Collecting data of {} on {}".format(self.Key.lower(), page_name),
                       end=" ... ")

             try:
                 source = requests.get(url, headers=fake_requests_headers())
-                parsed_text = bs4.BeautifulSoup(source.text, 'lxml')
-
-                headers = []
-                temp_header = parsed_text.find('table')
-                while temp_header.find_next('th'):
-                    header = [x.text for x in temp_header.find_all('th')]
-                    if len(header) > 0:
-                        crossed = [re.match('^Between.*', x) for x in header]
-                        if any(crossed):
-                            idx = list(itertools.compress(range(len(crossed)), crossed))
-                            assert len(idx) == 1
-                            header.remove(header[idx[0]])
-                            header[idx[0]:idx[0]] = ['Station_O', 'Station_D']
-                    headers.append(header)
-                    temp_header = temp_header.find_next('table')
-
-                tbl_lst = operator.itemgetter(
-                    1, len(parsed_text.find_all('h3')) + 1)(parsed_text.find_all('table'))
-                tbl_lst = [parse_tr(header, x.find_all('tr'))
-                           for header, x in zip(headers, tbl_lst)]
-                tbl_lst = [
-                    [[item.replace('\xa0', '') for item in record] for record in tbl]
-                    for tbl in tbl_lst]
-
-                tunnel_lengths = [pd.DataFrame(tbl, columns=header)
-                                  for tbl, header in zip(tbl_lst, headers)]
-
-                for i in range(len(tunnel_lengths)):
-                    tunnel_lengths[i][['Length_metres', 'Length_notes']] = \
-                        tunnel_lengths[i].Length.map(
-                            self.parse_length).apply(pd.Series)
-
-                if len(tunnel_lengths) == 1:
-                    tunnel_lengths_data = tunnel_lengths[0]
-                else:
-                    tunnel_lengths_data = dict(
-                        zip([x.text for x in parsed_text.find_all('h3')], tunnel_lengths))
-
-            except Exception as e:
-                print("Failed to collect tunnel lengths data of {}. {}".format(
-                    page_name, e))
-                tunnel_lengths_data = None
-
-            page_railway_tunnel_lengths = {page_name: tunnel_lengths_data,
-                                           self.LUDKey: last_updated_date}
-
-            save_pickle(page_railway_tunnel_lengths, path_to_pickle, verbose=verbose)
+            except requests.ConnectionError:
+                print("Failed. ") if verbose == 2 else ""
+                print_conn_err(verbose=verbose)
+
+            else:
+                try:
+                    parsed_text = bs4.BeautifulSoup(source.text, 'lxml')
+
+                    headers = []
+                    temp_header = parsed_text.find('table')
+                    while temp_header.find_next('th'):
+                        header = [x.text for x in temp_header.find_all('th')]
+                        if len(header) > 0:
+                            crossed = [re.match('^Between.*', x) for x in header]
+                            if any(crossed):
+                                idx = list(itertools.compress(range(len(crossed)), crossed))
+                                assert len(idx) == 1
+                                header.remove(header[idx[0]])
+                                header[idx[0]:idx[0]] = ['Station_O', 'Station_D']
+                        headers.append(header)
+                        temp_header = temp_header.find_next('table')
+
+                    tbl_lst = operator.itemgetter(
+                        1, len(parsed_text.find_all('h3')) + 1)(
+                        parsed_text.find_all('table'))
+                    tbl_lst = [
+                        parse_tr(header, x.find_all('tr'))
+                        for header, x in zip(headers, tbl_lst)]
+                    tbl_lst = [
+                        [[item.replace('\xa0', '') for item in record] for record in tbl]
+                        for tbl in tbl_lst]
+
+                    tunnel_lengths = [pd.DataFrame(tbl, columns=header)
+                                      for tbl, header in zip(tbl_lst, headers)]
+
+                    for i in range(len(tunnel_lengths)):
+                        tunnel_lengths[i][['Length_metres', 'Length_notes']] = \
+                            tunnel_lengths[i].Length.map(
+                                self.parse_length).apply(pd.Series)
+
+                    if len(tunnel_lengths) == 1:
+                        tunnel_lengths_data = tunnel_lengths[0]
+                    else:
+                        tunnel_lengths_data = dict(
+                            zip([x.text for x in parsed_text.find_all('h3')],
+                                tunnel_lengths))
+
+                    last_updated_date = get_last_updated_date(url)
+
+                    print("Done. ") if verbose == 2 else ""
+
+                    page_railway_tunnel_lengths = {page_name: tunnel_lengths_data,
+                                                   self.LUDKey: last_updated_date}
+
+                    save_pickle(page_railway_tunnel_lengths, path_to_pickle,
+                                verbose=verbose)
+
+                except Exception as e:
+                    print("Failed. {}".format(e))

         return page_railway_tunnel_lengths

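Two details of this hunk can be checked in isolation: the nested re.sub() calls that turn a catalogue page name into a pickle slug, and the while-loop that replaces a "Between ..." column header with origin/destination station columns. A self-contained check (the page name comes from the doctest above; the literal 'Between' header text is an assumption about the source tables):

    import itertools
    import re

    page_name = 'Page 1 (A-F)'
    # Spaces and hyphens become '-', parentheses are dropped, then lower-cased.
    slug = re.sub(r"[()]", "", re.sub(r"[ -]", "-", page_name)).lower()
    print(slug + ".pickle")  # page-1-a-f.pickle

    header = ['Tunnel', 'Between', 'Length']  # assumed raw column headers
    crossed = [re.match('^Between.*', x) for x in header]
    if any(crossed):
        idx = list(itertools.compress(range(len(crossed)), crossed))
        header.remove(header[idx[0]])
        header[idx[0]:idx[0]] = ['Station_O', 'Station_D']
    print(header)  # ['Tunnel', 'Station_O', 'Station_D', 'Length']
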
@@ -312,10 +329,20 @@ def fetch_tunnel_lengths(self, update=False, pickle_it=False, data_dir=None,
             ['Page 1 (A-F)', 'Page 2 (G-P)', 'Page 3 (Q-Z)', 'Page 4 (others)']
         """

-        verbose_ = False if data_dir or not verbose else True
+        verbose_ = False if (data_dir or not verbose) else (2 if verbose == 2 else True)

         page_data = [
-            self.collect_tunnel_lengths_by_page(page_no, update, verbose=verbose_)
-            for page_no in range(1, 5)]
+            self.collect_lengths_by_page(
+                x, update=update, verbose=verbose_ if is_internet_connected() else False)
+            for x in range(1, 5)]
+
+        if all(x is None for x in page_data):
+            if update:
+                print_conn_err(verbose=verbose)
+                print("No data of the {} has been freshly collected.".format(
+                    self.Key.lower()))
+            page_data = [self.collect_lengths_by_page(x, update=False, verbose=verbose_)
+                         for x in range(1, 5)]

         railway_tunnel_lengths = {
             self.Key: {next(iter(x)): next(iter(x.values())) for x in page_data},
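
The rewritten verbose_ expression now preserves the level-2 verbosity that collect_lengths_by_page() uses for its progress messages, instead of flattening everything to True. A quick check of the expression on its own:

    def resolve_verbose(data_dir, verbose):
        # Mirrors the expression in fetch_tunnel_lengths() above.
        return False if (data_dir or not verbose) else (2 if verbose == 2 else True)

    print(resolve_verbose(None, True))    # True
    print(resolve_verbose(None, 2))       # 2
    print(resolve_verbose(None, False))   # False
    print(resolve_verbose('dat', True))   # False; a custom data_dir silences page-level output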