Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry Packet Extraction #80

Merged
merged 5 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { memo } from 'react';

import InstrumentationSymbol from '../../../static/instrumentation/instrumentationSymbol.svg';
import InstrumentationSymbol from '../../../static/instrumentation/InstrumentationSymbol.svg';
import { Stack, Typography } from '@mui/material';
import { Handle, Position } from 'reactflow';

Expand Down
39 changes: 39 additions & 0 deletions services/telemetry/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions services/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ clap = { version = "4.3.0", features = ["derive"] }
sha1 = "0.10.6"
serde = { version = "1.0.190", features = ["derive"] }
serde_json = "1.0.107"
regex = "1.10.5"
98 changes: 68 additions & 30 deletions services/telemetry/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::io::prelude::*;
use std::string::FromUtf8Error;
use std::thread::spawn;
use std::collections::HashMap;
use regex::Regex;
use std::str;

use crate::data::DataPacket;

Expand All @@ -16,41 +18,58 @@ use std::thread::sleep;
/// Takes one line of a packet as a Vec<u8>
/// Updates the state with information in that line
async fn extract_packets(state_ref: Arc<Mutex<State>>, received_line: Vec<u8>) {
let line = String::from_utf8_lossy(&received_line);
let data = match str::from_utf8(&received_line) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8: {}", e)
};
let mut packet_data: HashMap<String, String> = HashMap::new();

let words: Vec<&str> = line.split_whitespace().collect();
let callsign_regex = Regex::new(r"([A-Z0-9]+)-\d").unwrap();
let lat_long_regex = Regex::new(r"(\d{4}\.\d{2}N)/(\d{5}\.\d{2}W)").unwrap();
let altitude_regex = Regex::new(r"/A=(\d{6})\*").unwrap();

// Search for the word "alt" in the words
if let Some(alt_index) = words.iter().position(|&word| word == "alt") {
// Extract numbers after "alt" until a non-number is hit
let mut altitude = String::new();
for word in &words[(alt_index + 1)..] {
if let Some(_) = word.chars().next().unwrap().to_digit(10) {
altitude.push_str(word);
altitude.push(' ');
} else {
break;
}
}
let mut packet_data: HashMap<String, String> = HashMap::new();
packet_data.insert("altitude".to_string(), altitude.to_string());
let packet_id: u64 = match state_ref.lock().unwrap().last_packet_id {
Some(id) => id.checked_add(1).unwrap_or(1), // Add 1 if possible, otherwise set to 1
None => 1, // If None, set to 1
};
packet_id.checked_add(1);
if let Some(callsign_caps) = callsign_regex.captures(data) {
let callsign = &callsign_caps[1];
packet_data.insert("callsign".to_string(), callsign.to_string());
println!("Callsign: {}", callsign);
} else {
packet_data.insert("callsign".to_string(), "".to_string());
println!("Failed to get callsign");
}

if let Some(lat_long_caps) = lat_long_regex.captures(data) {
let latitude = &lat_long_caps[1];
let longitude = &lat_long_caps[2];
packet_data.insert("latitude".to_string(), latitude.to_string());
packet_data.insert("longitude".to_string(), longitude.to_string());
println!("Latitude: {}", latitude);
println!("Longitude: {}", longitude);
} else {
packet_data.insert("latitude".to_string(), "".to_string());
packet_data.insert("longitude".to_string(), "".to_string());
println!("Failed to get lat long")
}

let mut packet = DataPacket{
id: packet_id, data: packet_data
};
state_ref.lock().unwrap().append_packet(packet);
if let Some(altitude_caps) = altitude_regex.captures(data) {
let altitude = &altitude_caps[1];
packet_data.insert("altitude".to_string(), altitude.to_string());
println!("Altitude: {}", altitude);
} else {
// prints here and above are for logging
println!("{}", line);
packet_data.insert("altitude".to_string(), "".to_string());
println!("Failed to get altitude");
}
}

let packet_id: u64 = match state_ref.lock().unwrap().last_packet_id {
Some(id) => id.checked_add(1).unwrap_or(1), // Add 1 if possible, otherwise set to 1
None => 1, // If None, set to 1
};
packet_id.checked_add(1);

let mut packet = DataPacket{
id: packet_id, data: packet_data
};
state_ref.lock().unwrap().append_packet(packet);
}
/// starts telemetry by calling decode.sh which in turn calls linux direwolf and rtl_fm binaries
///
/// calls extract_packets when new data received from rocket
Expand All @@ -67,11 +86,30 @@ pub async fn start_telemetry(state_ref: Arc<Mutex<State>>) -> io::Result<()> {

let mut child_out = BufReader::new(decode_aprs.stdout.as_mut().unwrap());
let mut readbuf = vec![0; 256];
let mut buffer = Vec::new();

loop {
// read() blocks until data is available. This makes this not polling.
let mut b = child_out.read(&mut readbuf);
extract_packets(Arc::clone(&state_ref), readbuf.clone()).await;
let mut bytes_read = child_out.read(&mut readbuf);
match bytes_read {
Ok(value) => {
let int_bytes_read: i32 = value as i32;
if (int_bytes_read > 0) {
buffer.extend_from_slice(&readbuf[..value]);
// Check for two successive newlines
if let Some(pos) = buffer.windows(2).position(|window| window == b"\n\n") {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when I run the code it couldn't find two consecutive new lines. We should test this with the brb on Thursday

I did this before the if statement and it worked fine with the mock data

 extract_packets(Arc::clone(&state_ref), buffer.clone()).await;
 buffer.clear();

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JackCotter Could we add a Locked attribute to the packet so we can more easily tell on the frontend if we've got a lock or not

// Extract the data before the two newlines
let data_to_process = buffer[..pos].to_vec();
extract_packets(Arc::clone(&state_ref), data_to_process).await;
// Remove the processed data and the two newlines from the buffer
buffer.drain(..pos + 2);
}
}
}
Err(e) => {
println!("An error occurred: {}", e);
}
}
}
Ok(())
}