-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathserver.rs
151 lines (133 loc) · 4.65 KB
/
server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#![allow(missing_docs)]
use prost::Message;
use std::net::SocketAddr;
use tokio::sync::oneshot;
use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
use tonic::{transport::Server, Request};
use tonic_reflection::{
pb::v1::{
server_reflection_client::ServerReflectionClient,
server_reflection_request::MessageRequest, server_reflection_response::MessageResponse,
ServerReflectionRequest, ServiceResponse, FILE_DESCRIPTOR_SET,
},
server::Builder,
};
pub(crate) fn get_encoded_reflection_service_fd() -> Vec<u8> {
let mut expected = Vec::new();
prost_types::FileDescriptorSet::decode(FILE_DESCRIPTOR_SET)
.expect("decode reflection service file descriptor set")
.file[0]
.encode(&mut expected)
.expect("encode reflection service file descriptor");
expected
}
#[tokio::test]
async fn test_list_services() {
let response = make_test_reflection_request(ServerReflectionRequest {
host: "".to_string(),
message_request: Some(MessageRequest::ListServices(String::new())),
})
.await;
if let MessageResponse::ListServicesResponse(services) = response {
assert_eq!(
services.service,
vec![ServiceResponse {
name: String::from("grpc.reflection.v1.ServerReflection")
}]
);
} else {
panic!("Expected a ListServicesResponse variant");
}
}
#[tokio::test]
async fn test_file_by_filename() {
let response = make_test_reflection_request(ServerReflectionRequest {
host: "".to_string(),
message_request: Some(MessageRequest::FileByFilename(String::from(
"reflection_v1.proto",
))),
})
.await;
if let MessageResponse::FileDescriptorResponse(descriptor) = response {
let file_descriptor_proto = descriptor
.file_descriptor_proto
.first()
.expect("descriptor");
assert_eq!(
file_descriptor_proto.as_ref(),
get_encoded_reflection_service_fd()
);
} else {
panic!("Expected a FileDescriptorResponse variant");
}
}
#[tokio::test]
async fn test_file_containing_symbol() {
let response = make_test_reflection_request(ServerReflectionRequest {
host: "".to_string(),
message_request: Some(MessageRequest::FileContainingSymbol(String::from(
"grpc.reflection.v1.ServerReflection",
))),
})
.await;
if let MessageResponse::FileDescriptorResponse(descriptor) = response {
let file_descriptor_proto = descriptor
.file_descriptor_proto
.first()
.expect("descriptor");
assert_eq!(
file_descriptor_proto.as_ref(),
get_encoded_reflection_service_fd()
);
} else {
panic!("Expected a FileDescriptorResponse variant");
}
}
async fn make_test_reflection_request(request: ServerReflectionRequest) -> MessageResponse {
// Run a test server
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let addr: SocketAddr = "127.0.0.1:0".parse().expect("SocketAddr parse");
let listener = tokio::net::TcpListener::bind(addr).await.expect("bind");
let local_addr = format!("http://{}", listener.local_addr().expect("local address"));
let jh = tokio::spawn(async move {
let service = Builder::configure()
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
.build_v1()
.unwrap();
Server::builder()
.add_service(service)
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), async {
drop(shutdown_rx.await)
})
.await
.unwrap();
});
// Give the test server a few ms to become available
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Construct client and send request, extract response
let conn = tonic::transport::Endpoint::new(local_addr)
.unwrap()
.connect()
.await
.unwrap();
let mut client = ServerReflectionClient::new(conn);
let request = Request::new(tokio_stream::once(request));
let mut inbound = client
.server_reflection_info(request)
.await
.expect("request")
.into_inner();
let response = inbound
.next()
.await
.expect("steamed response")
.expect("successful response")
.message_response
.expect("some MessageResponse");
// We only expect one response per request
assert!(inbound.next().await.is_none());
// Shut down test server
shutdown_tx.send(()).expect("send shutdown");
jh.await.expect("server shutdown");
response
}