Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
waruto210 committed Feb 1, 2023
1 parent db47a89 commit 1c45f7d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/connector/src/source/nexmark/source/combined_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub fn new_combined_event(event: Event) -> CombinedEvent {
}

pub(crate) fn get_event_data_types(event_type: Option<EventType>) -> Vec<DataType> {
match event_type {
let mut fields = match event_type {
None => {
vec![
DataType::Int64,
Expand All @@ -79,7 +79,10 @@ pub(crate) fn get_event_data_types(event_type: Option<EventType>) -> Vec<DataTyp
Some(EventType::Person) => get_person_struct_type().fields,
Some(EventType::Auction) => get_auction_struct_type().fields,
Some(EventType::Bid) => get_bid_struct_type().fields,
}
};
// _row_id
fields.push(DataType::Int64);
fields
}

pub(crate) fn get_person_struct_type() -> StructType {
Expand Down Expand Up @@ -188,16 +191,21 @@ pub(crate) fn combined_event_to_row(e: CombinedEvent) -> OwnedRow {
e.bid
.map(bid_to_datum)
.map(|fields| StructValue::new(fields).into()),
// _row_id
None,
];

OwnedRow::new(fields)
}

pub(crate) fn event_to_row(e: Event) -> OwnedRow {
let fields = match e {
let mut fields = match e {
Event::Person(p) => person_to_datum(p),
Event::Auction(a) => auction_to_datum(a),
Event::Bid(b) => bid_to_datum(b),
};
// _row_id
fields.push(None);
OwnedRow::new(fields)
}

Expand Down
60 changes: 60 additions & 0 deletions src/connector/src/source/nexmark/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,68 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::Bytes;
use nexmark::event::Event;

use crate::source::nexmark::source::combined_event::CombinedEvent;
use crate::source::{SourceMessage, SourceMeta, SplitId};
#[derive(Clone, Debug)]
pub struct NexmarkMeta {
pub timestamp: Option<i64>,
}
#[derive(Clone, Debug)]
pub struct NexmarkMessage {
pub split_id: SplitId,
pub sequence_number: String,
pub payload: Bytes,
}

impl From<NexmarkMessage> for SourceMessage {
fn from(msg: NexmarkMessage) -> Self {
SourceMessage {
payload: Some(msg.payload),
offset: msg.sequence_number.clone(),
split_id: msg.split_id,
meta: SourceMeta::Nexmark(NexmarkMeta {
timestamp: Some(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64,
),
}),
}
}
}

impl NexmarkMessage {
pub fn new_single_event(split_id: SplitId, offset: u64, event: Event) -> Self {
NexmarkMessage {
split_id,
sequence_number: offset.to_string(),
payload: match &event {
Event::Person(p) => serde_json::to_string(p),
Event::Auction(a) => serde_json::to_string(a),
Event::Bid(b) => serde_json::to_string(b),
}
.unwrap()
.into(),
}
}

pub fn new_combined_event(split_id: SplitId, offset: u64, event: Event) -> Self {
let combined_event = match event {
Event::Person(p) => CombinedEvent::person(p),
Event::Auction(a) => CombinedEvent::auction(a),
Event::Bid(b) => CombinedEvent::bid(b),
};
let combined_event = serde_json::to_string(&combined_event).unwrap();
NexmarkMessage {
split_id,
sequence_number: offset.to_string(),
payload: combined_event.into(),
}
}
}

0 comments on commit 1c45f7d

Please sign in to comment.