Skip to content

Commit

Permalink
add server instrumentation for method calls
Browse files Browse the repository at this point in the history
  • Loading branch information
fbandersnatch committed Mar 6, 2019
1 parent 7c53f68 commit a8268aa
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 13 deletions.
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ path = "examples/hello_world/client.rs"
name = "greeter_server"
path = "examples/hello_world/server.rs"

[[example]]
name = "greeter_client_instrument"
path = "examples/hello_world_instrument/client.rs"

[[example]]
name = "greeter_server_instrument"
path = "examples/hello_world_instrument/server.rs"

[dev-dependencies]
serde_json = "1.0"
serde = "1.0"
Expand Down
54 changes: 42 additions & 12 deletions compiler/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,24 +379,35 @@ impl<'a> MethodGen<'a> {
w.fn_def(&sig);
}

fn write_bind(&self, w: &mut CodeWriter) {
fn write_bind(&self, w: &mut CodeWriter, instrument: bool) {
let add = match self.method_type().0 {
MethodType::Unary => "add_unary_handler",
MethodType::ClientStreaming => "add_client_streaming_handler",
MethodType::ServerStreaming => "add_server_streaming_handler",
MethodType::Duplex => "add_duplex_streaming_handler",
};
w.block(
&format!(
"builder = builder.{}(&{}, move |ctx, req, resp| {{",
add,
self.const_method_name()
),
"});",
|w| {
w.write_line(&format!("instance.{}(ctx, req, resp)", self.name()));
},

let line = &format!(
"builder = builder.{}(&{}, move |ctx, req, resp| {{",
add,
self.const_method_name()
);

if instrument {
w.block(line, "});", |w| {
w.write_line("&p.before();");
w.write_line(&format!(
"let res = instance.{}(ctx, req, resp);",
self.name()
));
w.write_line("&p.after();");
w.write_line("res");
});
} else {
w.block(line, "});", |w| {
w.write_line(&format!("instance.{}(ctx, req, resp)", self.name()));
});
}
}
}

Expand Down Expand Up @@ -489,7 +500,26 @@ impl<'a> ServiceGen<'a> {
w.write_line("let mut builder = ::grpcio::ServiceBuilder::new();");
for method in &self.methods {
w.write_line("let mut instance = s.clone();");
method.write_bind(w);
method.write_bind(w, false);
}
w.write_line("builder.build()");
});

// server with instrumentation
w.write_line("");

let s = format!(
"create_instrumented_{}<S: {} + Send + Clone + 'static, P: {} + Clone + Send + Copy + 'static>(s: S, p: P) -> {}",
to_snake_case(&self.service_name()),
self.service_name(),
fq_grpc("ServerInstrumenter"),
fq_grpc("Service")
);
w.pub_fn(&s, |w| {
w.write_line("let mut builder = ::grpcio::ServiceBuilder::new();");
for method in &self.methods {
w.write_line("let mut instance = s.clone();");
method.write_bind(w, true);
}
w.write_line("builder.build()");
});
Expand Down
38 changes: 38 additions & 0 deletions examples/hello_world_instrument/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

extern crate grpcio;
extern crate grpcio_proto;
#[macro_use]
extern crate log;

#[path = "../log_util.rs"]
mod log_util;

use std::sync::Arc;

use grpcio::{ChannelBuilder, EnvBuilder};
use grpcio_proto::example::helloworld::HelloRequest;
use grpcio_proto::example::helloworld_grpc::GreeterClient;

fn main() {
let _guard = log_util::init_log(None);
let env = Arc::new(EnvBuilder::new().build());
let ch = ChannelBuilder::new(env).connect("localhost:50051");
let client = GreeterClient::new(ch);

let mut req = HelloRequest::new();
req.set_name("world".to_owned());
let reply = client.say_hello(&req).expect("rpc");
info!("Greeter received: {}", reply.get_message());
}
85 changes: 85 additions & 0 deletions examples/hello_world_instrument/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

extern crate futures;
extern crate grpcio;
extern crate grpcio_proto;
#[macro_use]
extern crate log;

#[path = "../log_util.rs"]
mod log_util;

use std::io::Read;
use std::sync::Arc;
use std::{io, thread};

use futures::sync::oneshot;
use futures::Future;
use grpcio::{Environment, RpcContext, ServerBuilder, UnarySink, ServerInstrumenter};

use grpcio_proto::example::helloworld::{HelloReply, HelloRequest};
use grpcio_proto::example::helloworld_grpc::{self, Greeter};

#[derive(Clone)]
struct GreeterService;

impl Greeter for GreeterService {
fn say_hello(&mut self, ctx: RpcContext, req: HelloRequest, sink: UnarySink<HelloReply>) {
let msg = format!("Hello {}", req.get_name());
let mut resp = HelloReply::new();
resp.set_message(msg);
let f = sink
.success(resp)
.map_err(move |e| error!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

#[derive(Copy)]
#[derive(Clone)]
struct MyServerInstrumenter;

impl ServerInstrumenter for MyServerInstrumenter {
fn before(&self) {
println!("Hello world!");
}

fn after(&self) {
println!("Goodbye.");
}
}


fn main() {
let _guard = log_util::init_log(None);
let env = Arc::new(Environment::new(1));
let service = helloworld_grpc::create_instrumented_greeter(GreeterService, MyServerInstrumenter);
let mut server = ServerBuilder::new(env)
.register_service(service)
.bind("127.0.0.1", 50_051)
.build()
.unwrap();
server.start();
for &(ref host, port) in server.bind_addrs() {
info!("listening on {}:{}", host, port);
}
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
info!("Press ENTER to exit...");
let _ = io::stdin().read(&mut [0]).unwrap();
tx.send(())
});
let _ = rx.wait();
let _ = server.shutdown().wait();
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,4 @@ pub use crate::env::{EnvBuilder, Environment};
pub use crate::error::{Error, Result};
pub use crate::log_util::redirect_log;
pub use crate::metadata::{Metadata, MetadataBuilder, MetadataIter};
pub use crate::server::{Server, ServerBuilder, Service, ServiceBuilder, ShutdownFuture};
pub use crate::server::{Server, ServerBuilder, Service, ServiceBuilder, ShutdownFuture, ServerInstrumenter};
5 changes: 5 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ impl<F> Handler<F> {
}
}

pub trait ServerInstrumenter {
fn before(&self);
fn after(&self);
}

pub trait CloneableHandler: Send {
fn handle(&mut self, ctx: RpcContext, reqs: Option<MessageReader>);
fn box_clone(&self) -> Box<CloneableHandler>;
Expand Down

0 comments on commit a8268aa

Please sign in to comment.