From ab2bb4b6cc52b0a74618f87114de77d4b25acbbf Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 17:18:32 +0200 Subject: [PATCH 01/10] style: Reindented code properly --- pyroengine/engine/predictor.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pyroengine/engine/predictor.py b/pyroengine/engine/predictor.py index f0552234..cb37b845 100644 --- a/pyroengine/engine/predictor.py +++ b/pyroengine/engine/predictor.py @@ -23,11 +23,12 @@ def __init__(self): normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) img_size = 448 - self.tf = transforms.Compose([transforms.Resize(size=img_size), - transforms.CenterCrop(size=img_size), - transforms.ToTensor(), - normalize - ]) + self.tf = transforms.Compose([ + transforms.Resize(size=img_size), + transforms.CenterCrop(size=img_size), + transforms.ToTensor(), + normalize + ]) def predict(self, im): """Run prediction""" From f88d2b2dfaa335787ea2a2ccad2d422cb20aa2f7 Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 17:18:54 +0200 Subject: [PATCH 02/10] feat: Added a pending alert cache mechanism --- pyroengine/engine/engine.py | 232 ++++++++++++++++++++++++------------ 1 file changed, 158 insertions(+), 74 deletions(-) diff --git a/pyroengine/engine/engine.py b/pyroengine/engine/engine.py index 543f53c4..23be3819 100644 --- a/pyroengine/engine/engine.py +++ b/pyroengine/engine/engine.py @@ -4,7 +4,15 @@ # See LICENSE or go to for full license details. import io +import os +import json import logging +from PIL import Image +from requests.exceptions import ConnectionError +from datetime import datetime, timedelta +from collections import deque +from typing import Optional, Dict + from pyroclient import client from .predictor import PyronearPredictor @@ -16,97 +24,119 @@ class PyronearEngine: not based on this image. Args: - detection_threshold (float): wildfire detection threshold in [0, 1] - api_url (str): url of the pyronear API - pi_zero_credentials (Dict): api credectials for each pizero, the dictionary should as the one in the example - save_every_n_frame (int): Send one frame over N to the api for our dataset - latitude (float): device latitude - longitude (float): device longitude + detection_thresh: wildfire detection threshold in [0, 1] + api_url: url of the pyronear API + client_creds: api credectials for each pizero, the dictionary should as the one in the example + frame_saving_period: Send one frame over N to the api for our dataset + latitude: device latitude + longitude: device longitude + cache_size: maximum number of alerts to save in cache + alert_relaxation: number of consecutive positive detections required to send the first alert, and also + the number of consecutive negative detections before stopping the alert + cache_backup_period: number of minutes between each cache backup to disk Examples: - >>> pi_zero_credentials ={} - >>> pi_zero_credentials['pi_zero_id_1']={'login':'log1', 'password':'pwd1'} - >>> pi_zero_credentials['pi_zero_id_2']={'login':'log2', 'password':'pwd2'} - >>> pyroEngine = PyronearEngine(0.6, 'https://api.pyronear.org', pi_zero_credentials, 50) - >>> pyroEngine.run() + >>> client_creds ={} + >>> client_creds['pi_zero_id_1']={'login':'log1', 'password':'pwd1'} + >>> client_creds['pi_zero_id_2']={'login':'log2', 'password':'pwd2'} + >>> pyroEngine = PyronearEngine(0.6, 'https://api.pyronear.org', client_creds, 50) """ def __init__( self, - detection_threshold=0.5, - api_url=None, - pi_zero_credentials=None, - save_evry_n_frame=None, - latitude=None, - longitude=None - ): + detection_thresh: float = 0.5, + api_url: Optional[str] = None, + client_creds: Optional[Dict[str, str]] = None, + frame_saving_period: Optional[int] = None, + latitude: Optional[float] = None, + longitude: Optional[float] = None, + cache_size: int = 100, + alert_relaxation: int = 3, + cache_backup_period: int = 60, + ) -> None: """Init engine""" # Engine Setup self.pyronearPredictor = PyronearPredictor() - self.detection_threshold = detection_threshold - self.detection_counter = {} - self.event_appening = {} - self.frames_counter = {} - self.save_evry_n_frame = save_evry_n_frame - if pi_zero_credentials is not None: - for pi_zero_id in pi_zero_credentials.keys(): - self.detection_counter[pi_zero_id] = 0 - self.event_appening[pi_zero_id] = False - self.frames_counter[pi_zero_id] = 0 - else: - self.detection_counter['-1'] = 0 - self.event_appening['-1'] = False + self.detection_thresh = detection_thresh + self.frame_saving_period = frame_saving_period + self.alert_relaxation = alert_relaxation # API Setup - self.use_api = False self.api_url = api_url self.latitude = latitude self.longitude = longitude - if self.api_url is not None: - self.use_api = True - self.init_api(pi_zero_credentials) + + # Var initialization self.stream = io.BytesIO() + self.consec_dets = {} + self.ongoing_alert = {} + self.frames_counter = {} + if isinstance(client_creds, dict): + for pi_zero_id in client_creds.keys(): + self.consec_dets[pi_zero_id] = 0 + self.frames_counter[pi_zero_id] = 0 + self.ongoing_alert[pi_zero_id] = False + else: + self.consec_dets['-1'] = 0 - def predict(self, frame, pi_zero_id=None): + if self.api_url is not None: + # Instantiate clients for each camera + self.api_client = {} + for _id, vals in client_creds.items(): + self.api_client[_id] = client.Client(self.api_url, vals['login'], vals['password']) + + # Restore pending alerts cache + self.pending_alerts = deque([], cache_size) + self.load_cache_from_disk() + self.cache_backup_period = cache_backup_period + self.last_cache_dump = datetime.utcnow() + + def predict(self, frame: Image.Image, pi_zero_id: Optional[int] = None) -> float: """ run prediction on comming frame""" - res = self.pyronearPredictor.predict(frame.convert('RGB')) # run prediction + prob = self.pyronearPredictor.predict(frame.convert('RGB')) # run prediction if pi_zero_id is None: - logging.info(f"Wildfire detection score ({res:.2%})") + logging.info(f"Wildfire detection score ({prob:.2%})") else: self.heartbeat(pi_zero_id) - logging.info(f"Wildfire detection score ({res:.2%}), on device {pi_zero_id}") + logging.info(f"Wildfire detection score ({prob:.2%}), on device {pi_zero_id}") - if res > self.detection_threshold: + # Alert + if prob > self.detection_thresh: if pi_zero_id is None: pi_zero_id = '-1' # add default key value - if not self.event_appening[pi_zero_id]: - self.detection_counter[pi_zero_id] += 1 - # Ensure counter max value is 3 - if self.detection_counter[pi_zero_id] > 3: - self.detection_counter[pi_zero_id] = 3 + # Don't increment beyond relaxation + if not ongoing_alert[pi_zero_id] and self.consec_dets[pi_zero_id] < self.alert_relaxation: + self.consec_dets[pi_zero_id] += 1 - # If counter reach 3, start sending alerts - if self.detection_counter[pi_zero_id] == 3: - self.event_appening[pi_zero_id] = True + if self.consec_dets[pi_zero_id] == self.alert_relaxation: + self.ongoing_alert[pi_zero_id] = True - if self.use_api and self.event_appening[pi_zero_id]: - frame.save(self.stream, format='JPEG') - # Send alert to the api - self.send_alert(pi_zero_id) - self.stream.seek(0) # "Rewind" the stream to the beginning so we can read its content + if isinstance(self.api_url, str) and self.ongoing_alert[pi_zero_id]: + # Save the alert in cache to avoid connection issues + self.save_to_cache(frame, pi_zero_id) + # No wildfire else: - if self.detection_counter[pi_zero_id] > 0: - self.detection_counter[pi_zero_id] -= 1 - - if self.detection_counter[pi_zero_id] == 0 and self.event_appening[pi_zero_id]: - # Stop event - self.event_appening[pi_zero_id] = False + if self.consec_dets[pi_zero_id] > 0: + self.consec_dets[pi_zero_id] -= 1 + # Consider event as finished + if self.consec_dets[pi_zero_id] == 0: + self.ongoing_alert[pi_zero_id] = False + + # Uploading pending alerts + if len(self.pending_alerts) > 0: + self.upload_pending_alerts() + + # Check if it's time to backup pending alerts + ts = datetime.utcnow() + if ts > self.last_cache_dump + timedelta(minutes=self.cache_backup_period): + self.save_cache_to_disk() + self.last_cache_dump = ts # save frame - if self.use_api and self.save_evry_n_frame: + if isinstance(self.api_url, str) and isinstance(self.frame_saving_period, int) and isinstance(pi_zero_id, int): self.frames_counter[pi_zero_id] += 1 - if self.frames_counter[pi_zero_id] == self.save_evry_n_frame: + if self.frames_counter[pi_zero_id] == self.frame_saving_period: # Reset frame counter self.frames_counter[pi_zero_id] = 0 # Send frame to the api @@ -114,32 +144,86 @@ def predict(self, frame, pi_zero_id=None): self.save_frame(pi_zero_id) self.stream.seek(0) # "Rewind" the stream to the beginning so we can read its content - return res + return prob - def init_api(self, pi_zero_credentials): - """Setup api""" - self.api_client = {} - for pi_zero_id in pi_zero_credentials.keys(): - self.api_client[pi_zero_id] = client.Client(self.api_url, pi_zero_credentials[pi_zero_id]['login'], - pi_zero_credentials[pi_zero_id]['password']) - - def send_alert(self, pi_zero_id): + def send_alert(self, pi_zero_id: int) -> None: """Send alert""" - logging.info("Send alert !") + logging.info("Sending alert...") # Create a media media_id = self.api_client[pi_zero_id].create_media_from_device().json()["id"] # Create an alert linked to the media and the event self.api_client[pi_zero_id].send_alert_from_device(lat=self.latitude, lon=self.longitude, media_id=media_id) self.api_client[pi_zero_id].upload_media(media_id=media_id, media_data=self.stream.getvalue()) - def save_frame(self, pi_zero_id): + def upload_frame(self, pi_zero_id: int) -> None: """Save frame""" - logging.info("Upload media for dataset") + logging.info("Uploading media...") # Create a media media_id = self.api_client[pi_zero_id].create_media_from_device().json()["id"] # Send media self.api_client[pi_zero_id].upload_media(media_id=media_id, media_data=self.stream.getvalue()) - def heartbeat(self, pi_zero_id): + def heartbeat(self, pi_zero_id: int) -> None: """Updates last ping of device""" self.api_client[pi_zero_id].heartbeat() + + def save_to_cache(self, frame: Image.Image, pi_zero_id: int) -> None: + # Store information in the queue + self.pending_alerts.append( + {"frame": frame, "pi_zero_id": pi_zero_id, "ts": datetime.utcnow()} + ) + + def upload_pending_alerts(self) -> None: + + for _ in range(len(self.pending_alerts)): + # try to upload the oldest element + frame_info = self.pending_alerts[0] + + try: + frame_info['frame'].save(self.stream, format='JPEG') + # Send alert to the api + self.send_alert(frame_info['pi_zero_id']) + # No need to upload it anymore + self.pending_alerts.popleft() + logging.info(f"Alert sent by device {frame_info['pi_zero_id']}") + except ConnectionError: + logging.warning(f"Unable to upload cache for device {frame_info['pi_zero_id']}") + self.stream.seek(0) # "Rewind" the stream to the beginning so we can read its content + break + + def save_cache_to_disk(self) -> None: + + # Remove previous dump + if os.path.exists('pending_alerts.json'): + with open('pending_alerts.json', 'rb') as f: + data = json.load(f) + + for entry in data: + os.remove(entry['frame_path']) + os.remove('pending_alerts.json') + + data_ = [] + for idx, info in enumerate(self.pending_alerts): + # Save frame to disk + info['frame'].save(f"pending_frame{idx}.jpg") + + # Save path in JSON + data.append({"frame_path": f"pending_frame{idx}.jpg", "pi_zero_id": info["pi_zero_id"], "ts": info['ts']}) + + # JSON dump + if len(data) > 0: + with open('pending_alerts.json', 'w') as f: + json.dump(data, f) + + def load_cache_from_disk(self) -> None: + # Read json + if os.path.exists('pending_alerts.json'): + with open('pending_alerts.json', 'rb') as f: + data = json.load(f) + + for entry in data: + # Open image + frame = Image.open(entry['frame_path'], mode='r') + self.pending_alerts.append( + {"frame": frame, "pi_zero_id": entry['pi_zero_id'], "ts": entry['ts']} + ) From 7f66e5df43a4940038681d6c171f02fe7f3c1f09 Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 17:19:03 +0200 Subject: [PATCH 03/10] chore: Updated requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 85d1085e..9b405b38 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ pyrovision >= 0.1.2 python-dotenv >= 0.15.0 +requests>=2.25.1 pyroclient@git+https://github.com/pyronear/pyro-api.git#egg=pyroclient&subdirectory=client pandas>=0.25.2 psutil From dab99105c57a3fc27167a49fffd0ab3e5e2655b5 Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 17:19:12 +0200 Subject: [PATCH 04/10] test: Updated unittest --- test/test_engine.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/test_engine.py b/test/test_engine.py index 1ff433a0..26b912b2 100644 --- a/test/test_engine.py +++ b/test/test_engine.py @@ -27,6 +27,9 @@ def test_engine(self): self.assertGreater(res, 0.5) + # Check backup + engine.save_cache_to_disk() + if __name__ == '__main__': unittest.main() From 7787bdd923e4fd50d0fc47b3835754bc1db37378 Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 17:39:54 +0200 Subject: [PATCH 05/10] fix: Fixed typos --- pyroengine/engine/engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyroengine/engine/engine.py b/pyroengine/engine/engine.py index 23be3819..bfc5299b 100644 --- a/pyroengine/engine/engine.py +++ b/pyroengine/engine/engine.py @@ -105,7 +105,7 @@ def predict(self, frame: Image.Image, pi_zero_id: Optional[int] = None) -> float pi_zero_id = '-1' # add default key value # Don't increment beyond relaxation - if not ongoing_alert[pi_zero_id] and self.consec_dets[pi_zero_id] < self.alert_relaxation: + if not self.ongoing_alert[pi_zero_id] and self.consec_dets[pi_zero_id] < self.alert_relaxation: self.consec_dets[pi_zero_id] += 1 if self.consec_dets[pi_zero_id] == self.alert_relaxation: @@ -202,7 +202,7 @@ def save_cache_to_disk(self) -> None: os.remove(entry['frame_path']) os.remove('pending_alerts.json') - data_ = [] + data = [] for idx, info in enumerate(self.pending_alerts): # Save frame to disk info['frame'].save(f"pending_frame{idx}.jpg") From b5c846c85767130ba43b8593d177f2b3dd0c96e6 Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 17:42:44 +0200 Subject: [PATCH 06/10] feat: Made backup path flexible --- pyroengine/engine/engine.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pyroengine/engine/engine.py b/pyroengine/engine/engine.py index bfc5299b..32511442 100644 --- a/pyroengine/engine/engine.py +++ b/pyroengine/engine/engine.py @@ -8,6 +8,7 @@ import json import logging from PIL import Image +from pathlib import Path from requests.exceptions import ConnectionError from datetime import datetime, timedelta from collections import deque @@ -89,6 +90,7 @@ def __init__( self.load_cache_from_disk() self.cache_backup_period = cache_backup_period self.last_cache_dump = datetime.utcnow() + self._backup_folder = Path("data/") # with Docker, the path has to be a bind volume def predict(self, frame: Image.Image, pi_zero_id: Optional[int] = None) -> float: """ run prediction on comming frame""" @@ -194,31 +196,37 @@ def upload_pending_alerts(self) -> None: def save_cache_to_disk(self) -> None: # Remove previous dump - if os.path.exists('pending_alerts.json'): - with open('pending_alerts.json', 'rb') as f: + json_path = _backup_folder.joinpath('pending_alerts.json') + if json_path.is_file(): + with open(json_path, 'rb') as f: data = json.load(f) for entry in data: os.remove(entry['frame_path']) - os.remove('pending_alerts.json') + os.remove(json_path) data = [] for idx, info in enumerate(self.pending_alerts): # Save frame to disk - info['frame'].save(f"pending_frame{idx}.jpg") + info['frame'].save(_backup_folder.joinpath(f"pending_frame{idx}.jpg")) # Save path in JSON - data.append({"frame_path": f"pending_frame{idx}.jpg", "pi_zero_id": info["pi_zero_id"], "ts": info['ts']}) + data.append({ + "frame_path": str(_backup_folder.joinpath(f"pending_frame{idx}.jpg")), + "pi_zero_id": info["pi_zero_id"], + "ts": info['ts'] + }) # JSON dump if len(data) > 0: - with open('pending_alerts.json', 'w') as f: + with open(json_path, 'w') as f: json.dump(data, f) def load_cache_from_disk(self) -> None: # Read json - if os.path.exists('pending_alerts.json'): - with open('pending_alerts.json', 'rb') as f: + json_path = _backup_folder.joinpath('pending_alerts.json') + if json_path.is_file(): + with open(json_path, 'rb') as f: data = json.load(f) for entry in data: From 005d102755dcf2d30b22c889b17180835b9d2111 Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 17:59:22 +0200 Subject: [PATCH 07/10] fix: Fixed typo --- pyroengine/engine/engine.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyroengine/engine/engine.py b/pyroengine/engine/engine.py index 32511442..eeeb7af5 100644 --- a/pyroengine/engine/engine.py +++ b/pyroengine/engine/engine.py @@ -196,7 +196,7 @@ def upload_pending_alerts(self) -> None: def save_cache_to_disk(self) -> None: # Remove previous dump - json_path = _backup_folder.joinpath('pending_alerts.json') + json_path = self._backup_folder.joinpath('pending_alerts.json') if json_path.is_file(): with open(json_path, 'rb') as f: data = json.load(f) @@ -208,11 +208,11 @@ def save_cache_to_disk(self) -> None: data = [] for idx, info in enumerate(self.pending_alerts): # Save frame to disk - info['frame'].save(_backup_folder.joinpath(f"pending_frame{idx}.jpg")) + info['frame'].save(self._backup_folder.joinpath(f"pending_frame{idx}.jpg")) # Save path in JSON data.append({ - "frame_path": str(_backup_folder.joinpath(f"pending_frame{idx}.jpg")), + "frame_path": str(self._backup_folder.joinpath(f"pending_frame{idx}.jpg")), "pi_zero_id": info["pi_zero_id"], "ts": info['ts'] }) @@ -224,7 +224,7 @@ def save_cache_to_disk(self) -> None: def load_cache_from_disk(self) -> None: # Read json - json_path = _backup_folder.joinpath('pending_alerts.json') + json_path = self._backup_folder.joinpath('pending_alerts.json') if json_path.is_file(): with open(json_path, 'rb') as f: data = json.load(f) From 3bc4b6b4ae602f40040010fbf8a63372c8c9889c Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 18:06:16 +0200 Subject: [PATCH 08/10] fix: Fixed backup location setting --- pyroengine/engine/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyroengine/engine/engine.py b/pyroengine/engine/engine.py index eeeb7af5..cc47a55a 100644 --- a/pyroengine/engine/engine.py +++ b/pyroengine/engine/engine.py @@ -87,10 +87,10 @@ def __init__( # Restore pending alerts cache self.pending_alerts = deque([], cache_size) + self._backup_folder = Path("data/") # with Docker, the path has to be a bind volume self.load_cache_from_disk() self.cache_backup_period = cache_backup_period self.last_cache_dump = datetime.utcnow() - self._backup_folder = Path("data/") # with Docker, the path has to be a bind volume def predict(self, frame: Image.Image, pi_zero_id: Optional[int] = None) -> float: """ run prediction on comming frame""" From 17791d3f8d53f878c96b3db20ac842919b234b5f Mon Sep 17 00:00:00 2001 From: frgfm Date: Sun, 12 Sep 2021 18:25:10 +0200 Subject: [PATCH 09/10] fix: Fixed no-API initialization --- pyroengine/engine/engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyroengine/engine/engine.py b/pyroengine/engine/engine.py index cc47a55a..15f4f4a5 100644 --- a/pyroengine/engine/engine.py +++ b/pyroengine/engine/engine.py @@ -78,6 +78,7 @@ def __init__( self.ongoing_alert[pi_zero_id] = False else: self.consec_dets['-1'] = 0 + self.ongoing_alert['-1'] = 0 if self.api_url is not None: # Instantiate clients for each camera From 4f8fc836703214cf519bdfb49cdc69148d7e9319 Mon Sep 17 00:00:00 2001 From: frgfm Date: Sat, 18 Sep 2021 15:58:05 +0200 Subject: [PATCH 10/10] docs: Fixed docstring typo --- pyroengine/engine/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyroengine/engine/engine.py b/pyroengine/engine/engine.py index 15f4f4a5..5edaa2e8 100644 --- a/pyroengine/engine/engine.py +++ b/pyroengine/engine/engine.py @@ -27,7 +27,7 @@ class PyronearEngine: Args: detection_thresh: wildfire detection threshold in [0, 1] api_url: url of the pyronear API - client_creds: api credectials for each pizero, the dictionary should as the one in the example + client_creds: api credectials for each pizero, the dictionary should be as the one in the example frame_saving_period: Send one frame over N to the api for our dataset latitude: device latitude longitude: device longitude