Skip to content

Commit ebebab3

Browse files
committed
Asyc message handler
1 parent 7520fe2 commit ebebab3

File tree

4 files changed

+141
-156
lines changed

4 files changed

+141
-156
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition = "2021"
88
[dependencies]
99
base64 = "0.22.0"
1010
hudsucker = { git = "https://github.com/omjadas/hudsucker.git" }
11+
once_cell = "1.19.0"
1112
prost-reflect = { version = "0.13.1", features = ["serde"] }
1213
reqwest = { version = "0.12.3", features = ["json"] }
1314
serde = { version = "1.0.198", features = ["derive"] }

src/main.rs

+129-148
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,16 @@ use hudsucker::{
55
tokio_tungstenite::tungstenite::Message,
66
*,
77
};
8+
use once_cell::sync::Lazy;
89
use prost_reflect::{DynamicMessage, SerializeOptions, Value};
910
use serde_json::{json, Map, Value as JsonValue};
10-
use std::{
11-
error::Error,
12-
future::Future,
13-
io::Read,
14-
sync::{Arc, Mutex},
15-
};
11+
use std::{error::Error, future::Future, sync::Mutex};
1612
use std::{format, net::SocketAddr};
1713
use tracing::*;
1814
mod parser;
1915
mod settings;
20-
use parser::{Action, LiqiMessage};
16+
use parser::{Action, LiqiMessage, Parser};
17+
use settings::Settings;
2118

2219
use crate::parser::my_serialize;
2320

@@ -28,157 +25,162 @@ async fn shutdown_signal() {
2825
}
2926

3027
#[derive(Clone)]
31-
struct ActionHandler {
32-
parser: Arc<Mutex<parser::Parser>>,
33-
settings: Arc<settings::Settings>,
34-
client: reqwest::Client,
35-
}
28+
struct ActionHandler;
3629

3730
pub const SERIALIZE_OPTIONS: SerializeOptions = SerializeOptions::new()
3831
.skip_default_fields(false)
3932
.use_proto_field_name(true);
4033

4134
pub const RANDOM_MD5: &str = "0123456789abcdef0123456789abcdef";
4235

36+
static PARSER: Mutex<Lazy<Parser>> = Mutex::new(Lazy::<Parser, _>::new(Parser::new));
37+
static CLIENT: Lazy<reqwest::Client> = Lazy::new(|| {
38+
reqwest::ClientBuilder::new()
39+
.danger_accept_invalid_certs(true)
40+
.build()
41+
.expect("Failed to create reqwest client")
42+
});
43+
static SETTINGS: Lazy<Settings> = Lazy::new(Settings::new);
44+
4345
impl WebSocketHandler for ActionHandler {
4446
async fn handle_message(&mut self, _ctx: &WebSocketContext, msg: Message) -> Option<Message> {
4547
let direction_char = match _ctx {
4648
WebSocketContext::ClientToServer { .. } => '\u{2191}',
4749
WebSocketContext::ServerToClient { .. } => '\u{2193}',
4850
};
49-
if let Message::Binary(buf) = &msg {
50-
// convert binary message to hex string
51-
let hex = buf
52-
.iter()
53-
.map(|b| {
54-
if *b >= 0x20 && *b <= 0x7e {
55-
format!("{}", *b as char)
56-
} else {
57-
format!("{:02x} ", b)
51+
let msg_clone = msg.clone();
52+
tokio::spawn(async move {
53+
if let Message::Binary(buf) = msg_clone {
54+
// convert binary message to hex string
55+
let hex = buf
56+
.iter()
57+
.map(|b| {
58+
if *b >= 0x20 && *b <= 0x7e {
59+
format!("{}", *b as char)
60+
} else {
61+
format!("{:02x} ", b)
62+
}
63+
})
64+
.collect::<String>();
65+
debug!("{} {}", direction_char, hex);
66+
let mut parser = PARSER.lock().unwrap();
67+
let parsed = parser.parse(&buf);
68+
let parsed = match parsed {
69+
Ok(parsed) => parsed,
70+
Err(e) => {
71+
error!("Failed to parse message: {:?}", e);
72+
return;
5873
}
59-
})
60-
.collect::<String>();
61-
debug!("{} {}", direction_char, hex);
62-
let mut parser = self.parser.lock().unwrap();
63-
let parsed = parser.parse(buf);
64-
let parsed = match parsed {
65-
Ok(parsed) => parsed,
66-
Err(e) => {
67-
error!("Failed to parse message: {:?}", e);
68-
return Some(msg);
74+
};
75+
info!(
76+
"监听到: {}, {}, {:?}, {}",
77+
direction_char, parsed.id, parsed.msg_type, parsed.method_name
78+
);
79+
if direction_char == '\u{2193}' {
80+
return;
81+
}
82+
if let Err(e) = send_message(parsed) {
83+
error!("Failed to send message: {:?}", e);
6984
}
70-
};
71-
info!(
72-
"监听到: {}, {}, {:?}, {}",
73-
direction_char, parsed.id, parsed.msg_type, parsed.method_name
74-
);
75-
if direction_char == '\u{2193}' {
76-
return Some(msg);
77-
}
78-
if let Err(e) = self.send_message(parsed) {
79-
error!("Failed to send message: {:?}", e);
8085
}
81-
}
86+
});
8287
Some(msg)
8388
}
8489
}
8590

86-
impl ActionHandler {
87-
fn send_message(&self, mut parsed: LiqiMessage) -> Result<(), Box<dyn Error>> {
88-
let settings = self.settings.clone();
89-
let json_data: JsonValue;
90-
if !settings.is_method(&parsed.method_name) {
91+
fn send_message(mut parsed: LiqiMessage) -> Result<(), Box<dyn Error>> {
92+
let json_data: JsonValue;
93+
if !SETTINGS.is_method(&parsed.method_name) {
94+
return Ok(());
95+
}
96+
if parsed.method_name == ".lq.ActionPrototype" {
97+
let name = parsed
98+
.data
99+
.get("name")
100+
.ok_or("No name field")?
101+
.as_str()
102+
.ok_or("name is not a string")?
103+
.to_owned();
104+
if !SETTINGS.is_action(&name) {
91105
return Ok(());
92106
}
93-
if parsed.method_name == ".lq.ActionPrototype" {
94-
let name = parsed
95-
.data
107+
let data = parsed.data.get_mut("data").ok_or("No data field")?;
108+
if name == "ActionNewRound" {
109+
data.as_object_mut()
110+
.ok_or("data is not an object")?
111+
.insert("md5".to_string(), json!(RANDOM_MD5));
112+
}
113+
json_data = data.take();
114+
} else if parsed.method_name == ".lq.FastTest.syncGame" {
115+
let game_restore = parsed
116+
.data
117+
.get("game_restore")
118+
.ok_or("No game_restore field")?
119+
.get("actions")
120+
.ok_or("No actions field")?
121+
.as_array()
122+
.ok_or("actions is not an array")?;
123+
let mut actions: Vec<Action> = vec![];
124+
for item in game_restore.iter() {
125+
let action_name = item
96126
.get("name")
97127
.ok_or("No name field")?
98128
.as_str()
99-
.ok_or("name is not a string")?
100-
.to_owned();
101-
if !settings.is_action(&name) {
102-
return Ok(());
103-
}
104-
let data = parsed.data.get_mut("data").ok_or("No data field")?;
105-
if name == "ActionNewRound" {
106-
data.as_object_mut()
107-
.ok_or("data is not an object")?
108-
.insert("md5".to_string(), json!(RANDOM_MD5));
109-
}
110-
json_data = data.take();
111-
} else if parsed.method_name == ".lq.FastTest.syncGame" {
112-
let game_restore = parsed
113-
.data
114-
.get("game_restore")
115-
.ok_or("No game_restore field")?
116-
.get("actions")
117-
.ok_or("No actions field")?
118-
.as_array()
119-
.ok_or("actions is not an array")?;
120-
let mut actions: Vec<Action> = vec![];
121-
for item in game_restore.iter() {
122-
let action_name = item
123-
.get("name")
124-
.ok_or("No name field")?
125-
.as_str()
126-
.ok_or("name is not a string")?;
127-
let action_data = item
128-
.get("data")
129-
.ok_or("No data field")?
130-
.as_str()
131-
.unwrap_or("data is not a string");
132-
if action_data.is_empty() {
133-
let action = Action {
134-
name: action_name.to_string(),
135-
data: JsonValue::Object(Map::new()),
136-
};
137-
actions.push(action);
138-
} else {
139-
let b64 = BASE64_STANDARD.decode(action_data)?;
140-
let parser = self.parser.lock().unwrap();
141-
let action_type = parser
142-
.pool
143-
.get_message_by_name(action_name)
144-
.ok_or("Invalid action type")?;
145-
let mut action_obj = DynamicMessage::decode(action_type, b64.as_ref())?;
146-
if action_name == ".lq.ActionNewRound" {
147-
action_obj.set_field_by_name("md5", Value::String(RANDOM_MD5.to_string()));
148-
}
149-
let value: JsonValue = my_serialize(action_obj)?;
150-
let action = Action {
151-
name: action_name.to_string(),
152-
data: value,
153-
};
154-
actions.push(action);
129+
.ok_or("name is not a string")?;
130+
let action_data = item
131+
.get("data")
132+
.ok_or("No data field")?
133+
.as_str()
134+
.unwrap_or("data is not a string");
135+
if action_data.is_empty() {
136+
let action = Action {
137+
name: action_name.to_string(),
138+
data: JsonValue::Object(Map::new()),
139+
};
140+
actions.push(action);
141+
} else {
142+
let b64 = BASE64_STANDARD.decode(action_data)?;
143+
let parser = PARSER.lock().unwrap();
144+
let action_type = parser
145+
.pool
146+
.get_message_by_name(action_name)
147+
.ok_or("Invalid action type")?;
148+
let mut action_obj = DynamicMessage::decode(action_type, b64.as_ref())?;
149+
if action_name == ".lq.ActionNewRound" {
150+
action_obj.set_field_by_name("md5", Value::String(RANDOM_MD5.to_string()));
155151
}
152+
let value: JsonValue = my_serialize(action_obj)?;
153+
let action = Action {
154+
name: action_name.to_string(),
155+
data: value,
156+
};
157+
actions.push(action);
156158
}
157-
let mut map = Map::new();
158-
map.insert(
159-
"sync_game_actions".to_string(),
160-
serde_json::to_value(actions)?,
161-
);
162-
json_data = JsonValue::Object(map);
163-
} else {
164-
json_data = parsed.data;
165159
}
160+
let mut map = Map::new();
161+
map.insert(
162+
"sync_game_actions".to_string(),
163+
serde_json::to_value(actions)?,
164+
);
165+
json_data = JsonValue::Object(map);
166+
} else {
167+
json_data = parsed.data;
168+
}
166169

167-
// post data to API, no verification
168-
let client = self.client.clone();
169-
let future = client.post(&settings.api_url).json(&json_data).send();
170-
171-
handle_future(future);
172-
info!("已发送: {}", json_data);
170+
// post data to API, no verification
171+
let client = CLIENT.clone();
172+
let future = client.post(&SETTINGS.api_url).json(&json_data).send();
173173

174-
if let Some(liqi_data) = json_data.get("liqi") {
175-
let res = client.post(&settings.api_url).json(liqi_data).send();
176-
handle_future(res);
177-
info!("已发送: {:?}", liqi_data);
178-
}
174+
handle_future(future);
175+
info!("已发送: {}", json_data);
179176

180-
Ok(())
177+
if let Some(liqi_data) = json_data.get("liqi") {
178+
let res = client.post(&SETTINGS.api_url).json(liqi_data).send();
179+
handle_future(res);
180+
info!("已发送: {:?}", liqi_data);
181181
}
182+
183+
Ok(())
182184
}
183185

184186
fn handle_future(
@@ -221,33 +223,12 @@ async fn main() {
221223
请遵守当地法律法规,对于使用本程序所产生的任何后果,作者概不负责!
222224
\x1b[0m"
223225
);
224-
let parser = parser::Parser::new();
225-
let settings = settings::Settings::new();
226-
let settings = match settings {
227-
Ok(settings) => settings,
228-
Err(e) => {
229-
error!("{}", e);
230-
// press any key to exit
231-
println!("按任意键退出");
232-
let mut stdin = std::io::stdin();
233-
let _ = stdin.read(&mut [0u8]).unwrap_or_default();
234-
return;
235-
}
236-
};
237-
let client = reqwest::ClientBuilder::new()
238-
.danger_accept_invalid_certs(true)
239-
.build()
240-
.expect("Failed to create reqwest client");
241226

242227
let proxy = Proxy::builder()
243228
.with_addr(SocketAddr::from(([127, 0, 0, 1], 23410)))
244229
.with_rustls_client()
245230
.with_ca(ca)
246-
.with_websocket_handler(ActionHandler {
247-
parser: Arc::new(Mutex::new(parser)),
248-
settings: Arc::new(settings),
249-
client,
250-
})
231+
.with_websocket_handler(ActionHandler)
251232
.with_graceful_shutdown(shutdown_signal())
252233
.build();
253234

src/settings.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,29 @@ pub struct Settings {
1717
}
1818

1919
impl Settings {
20-
pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
21-
let cur_exe =
22-
std::env::current_exe().map_err(|e| format!("无法获取当前程序路径: {}", e))?;
20+
pub fn new() -> Self {
21+
let cur_exe = std::env::current_exe()
22+
.expect("无法获取当前程序路径")
23+
.canonicalize()
24+
.expect("无法获取当前程序路径的绝对路径");
2325
let exe_dir = cur_exe
2426
.parent()
25-
.ok_or("无法获取当前程序路径")?
27+
.expect("无法获取当前程序路径的父目录")
2628
.to_str()
27-
.ok_or("无法转换路径为字符串")?;
29+
.expect("无法将目录转换为UTF-8字符串");
2830
// read settings from file
2931
let settings = std::fs::read_to_string(std::path::Path::new(exe_dir).join("settings.json"))
3032
.or_else(
3133
// read pwd
3234
|_| std::fs::read_to_string("settings.json"),
3335
)
34-
.map_err(|e| format!("无法读取settings.json: {}", e))?;
36+
.expect("无法读取settings.json");
3537
let mut settings: Settings =
36-
serde_json::from_str(&settings).map_err(|e| format!("无法解析settings.json: {}", e))?;
38+
serde_json::from_str(&settings).expect("无法解析settings.json");
3739
info!("已载入配置: {:?}", settings);
3840
settings.methods_set = settings.send_method.iter().cloned().collect();
3941
settings.actions_set = settings.send_action.iter().cloned().collect();
40-
Ok(settings)
42+
settings
4143
}
4244

4345
pub fn is_method(&self, method: &str) -> bool {

0 commit comments

Comments
 (0)