Skip to content

Commit

Permalink
refactor: error handle and add reply for event (#18)
Browse files Browse the repository at this point in the history
* feat(logger): setup logger

* refactor(event): shorter event name

* chore: optimize error handle

* refactor(bot): refactor bot define

* chore: optimize error handle

* chore: fix clippy
  • Loading branch information
fu050409 authored Oct 27, 2024
1 parent 6b6c08d commit 6c0822c
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 160 deletions.
44 changes: 39 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/aionbot-adapter-onebot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
aionbot-core = { version = "0.1.0", path = "../aionbot-core" }
anyhow = "1.0.89"
futures-util = "0.3.30"
log = "0.4.22"
serde = { version = "1.0.213", features = ["derive"] }
serde_json = "1.0.132"
tokio = { version = "1.40.0", features = ["sync"] }
Expand Down
121 changes: 121 additions & 0 deletions crates/aionbot-adapter-onebot/src/bot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::{cell::UnsafeCell, sync::Arc};

use futures_util::{SinkExt, StreamExt};
use tokio::{net::TcpStream, sync::broadcast};
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};

use crate::{
event::OnebotEvent,
models::{Action, ActionParams, MessageEvent, MinimalEvent},
};

#[derive(Debug)]
pub struct BotInstance {
pub id: String,
pub ws_stream: Option<WebSocketStream<TcpStream>>,
sender: broadcast::Sender<Box<OnebotEvent>>,
}

#[derive(Debug)]
pub struct Bot {
inner: UnsafeCell<BotInstance>,
}

unsafe impl Send for Bot {}
unsafe impl Sync for Bot {}

impl Bot {
pub fn new(sender: broadcast::Sender<Box<OnebotEvent>>) -> Arc<Self> {
Arc::new(Self {
inner: UnsafeCell::new(BotInstance {
id: String::new(),
ws_stream: None,
sender,
}),
})
}

pub fn id(&self) -> &str {
unsafe { &(*self.inner.get()).id }
}

pub fn set_id(&self, id: String) {
unsafe { (*self.inner.get()).id = id }
}
pub fn set_ws_stream(&self, ws_stream: WebSocketStream<TcpStream>) {
unsafe { (*self.inner.get()).ws_stream = Some(ws_stream) }
}

pub async fn listen(self: Arc<Self>) {
let bot = unsafe { &mut (*self.inner.get()) };
if let Some(ws_stream) = &mut bot.ws_stream {
log::info!("Starting listening for messages from bot {}...", bot.id);
ws_stream
.for_each(|message| async {
if let Ok(Message::Text(message)) = message {
log::debug!("Received event message: {}", message);
match serde_json::from_str::<MinimalEvent>(&message) {
Ok(data) => {
if !data.is_message() {
log::debug!(
"Received non-message event: {}, ignored.",
message
);
return;
}
}
Err(e) => {
log::warn!("Error deserializing event minimally: {}", e);
return;
}
};
let message_event: MessageEvent = match serde_json::from_str(&message) {
Ok(data) => data,
Err(e) => {
log::error!("Error deserializing message: {}", e);
return;
}
};
let event = OnebotEvent {
plain_data: message_event,
bot: self.clone(),
};
if let Err(e) = bot.sender.send(Box::new(event)) {
log::warn!("Error sending event: {}", e);
}
} else {
log::warn!("Received non-text message: {:?}", message)
}
})
.await;
};
}

pub async fn send(&self, event: &OnebotEvent, message: &str) {
if let Some(ws_stream) = &mut unsafe { &mut (*self.inner.get()) }.ws_stream {
ws_stream
.send(Message::Text(
serde_json::to_string(&Action {
action: if event.is_private() {
"send_private_msg".to_string()
} else {
"send_group_msg".to_string()
},
params: ActionParams {
group_id: if event.is_private() {
None
} else {
event.plain_data.group_id
},
user_id: Some(event.plain_data.user_id),
message: message.to_string(),
},
echo: Some("0".to_string()),
})
.unwrap(),
))
.await
.unwrap();
}
}
}
43 changes: 30 additions & 13 deletions crates/aionbot-adapter-onebot/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::sync::Arc;

use aionbot_core::event::Event;
use anyhow::{anyhow, Result};

use crate::{models::MessageEvent, ws::Bot};
use crate::{bot::Bot, models::MessageEvent};

#[derive(Clone, Debug)]
pub struct OnebotEvent {
Expand All @@ -11,37 +12,53 @@ pub struct OnebotEvent {
}

impl Event for OnebotEvent {
fn get_type(&self) -> &str {
fn event_type(&self) -> &str {
&self.plain_data.message_type
}

fn get_content(&self) -> Box<dyn std::any::Any> {
println!("Event::get_content");
fn content(&self) -> Box<dyn std::any::Any> {
let content = self
.plain_data
.message
.iter()
.map(|segment| segment.data.text.clone())
.collect::<Vec<String>>();
let result: &str = content.join("").leak();
println!("Event::get_content result: {}", result);
Box::new(result)
}

fn get_plain_data(&self) -> Box<dyn std::any::Any> {
fn plain_data(&self) -> Box<dyn std::any::Any> {
Box::new(self.plain_data.clone())
}

fn get_emitter_id(&self) -> &str {
fn emitter_id(&self) -> &str {
self.plain_data.user_id.to_string().leak()
}

fn get_channel_id(&self) -> &str {
self.plain_data
.group_id
.expect("Channel ID is not set, this event is most likely a private message")
.to_string()
.leak()
fn channel_id(&self) -> Result<&str> {
if let Some(group_id) = self.plain_data.group_id {
Ok(group_id.to_string().leak())
} else {
Err(anyhow!(
"Group ID not found in this event, \
perhaps this is not message from channel?"
))
}
}

fn reply<'s, 'a>(
&'s self,
message: Box<dyn ToString + Send + Sync>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>>
where
's: 'a,
{
let bot = self.bot.clone();
Box::pin(async move {
let message = message.to_string();
bot.send(self, &message).await;
Ok(())
})
}
}

Expand Down
7 changes: 5 additions & 2 deletions crates/aionbot-adapter-onebot/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub extern crate aionbot_core;

pub mod bot;
pub mod event;
pub mod models;
pub mod ws;
Expand Down Expand Up @@ -56,9 +57,8 @@ impl Runtime for OnebotRuntime {
}

async fn prepare(&mut self) -> Result<()> {
println!("Preparing Onebot runtime");
log::debug!("Preparing for Onebot runtime...");
self.onebot = Some(ws::Onebot::new().listen(Default::default()).await?);
println!("Onebot runtime prepared");
Ok(())
}

Expand All @@ -67,12 +67,15 @@ impl Runtime for OnebotRuntime {
}

async fn finalize(&mut self) -> Result<()> {
log::debug!("Finalizing Onebot runtime..");
self.receiver = Some(self.onebot.as_ref().cloned().unwrap().subscribe().await);
Ok(())
}

async fn run(&mut self) -> Result<RuntimeStatus> {
log::debug!("Waiting for Onebot runtime event loop...");
let event = self.receiver.as_mut().unwrap().recv().await?;
log::debug!("Received Onebot event of type [{}].", event.event_type());
Ok(RuntimeStatus::Event(event))
}
}
28 changes: 28 additions & 0 deletions crates/aionbot-adapter-onebot/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MinimalEvent {
pub time: i64,
pub self_id: i64,
pub post_type: String,
}

impl MinimalEvent {
pub fn is_message(&self) -> bool {
self.post_type == "message"
}

pub fn is_notice(&self) -> bool {
self.post_type == "notice"
}

pub fn is_request(&self) -> bool {
self.post_type == "request"
}

pub fn is_meta_event(&self) -> bool {
self.post_type == "meta_event"
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Sender {
pub user_id: Option<i64>,
Expand All @@ -21,7 +46,10 @@ pub struct Anonymous {

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MessageData {
#[serde(default)]
pub text: String,
#[serde(default)]
pub qq: String,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down
Loading

0 comments on commit 6c0822c

Please sign in to comment.