Skip to content

Commit

Permalink
Issue 223: Fix connection leak (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenqi Mou authored Apr 20, 2021
1 parent e4f7beb commit fff2b36
Show file tree
Hide file tree
Showing 30 changed files with 261 additions and 228 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/python_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ jobs:
java-version: '11' # The JDK version to make available on the path.
- name: Download and Run Pravega standalone
run: |
wget https://github.com/pravega/pravega/releases/download/v0.9.0/pravega-0.9.0.tgz
tar -xzvf pravega-0.9.0.tgz
pravega-0.9.0/bin/pravega-standalone > pravega.log 2>&1 &
wget https://github.com/Tristan1900/pravega/releases/download/ip-fix/pravega-0.10.0-ip-fix.tgz
tar -xzvf pravega-0.10.0-ip-fix.tgz
pravega-0.10.0-2866.172245f-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 &
sleep 120 && echo "Started standalone"
- name: Set up Python
uses: actions/setup-python@v2
Expand All @@ -44,6 +44,7 @@ jobs:
run: tox -c bindings/tox.ini
- name: Upload Pravega standalone logs
uses: actions/upload-artifact@v2
if: always()
with:
name: pravega-standalone-log
path: pravega.log
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pravega-client-config = {path = "./config", version = "0.1"}
async-trait = "0.1"
futures = "0.3"
snafu = "0.6"
tokio = { version = "1.1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = "0.2"
Expand Down
1 change: 1 addition & 0 deletions bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//
// http://www.apache.org/licenses/LICENSE-2.0
//
#![allow(clippy::from_over_into)]

#[macro_use]
extern crate cfg_if;
Expand Down
7 changes: 6 additions & 1 deletion config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ mod tests {
assert_eq!(
config.credentials.get_request_metadata(),
format!("{} {}", "Basic", "ABCDE")
)
);
env::remove_var("pravega_client_auth_method");
env::remove_var("pravega_client_auth_username");
env::remove_var("pravega_client_auth_password");
env::remove_var("pravega_client_auth_token");
}

#[test]
Expand All @@ -241,5 +245,6 @@ mod tests {
.build()
.unwrap();
assert_eq!(config.trustcert, "/");
env::remove_var("pravega_client_tls_cert_path");
}
}
2 changes: 1 addition & 1 deletion controller-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jsonwebtoken = "7"
serde = {version = "1.0", features = ["derive"] }

[build-dependencies]
tonic-build = "0.3"
tonic-build = "0.4"

[[bin]]
name = "controller-cli"
Expand Down
4 changes: 2 additions & 2 deletions controller-client/proto/Controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ message TxnStatus {

message PingTxnStatus {
enum Status {
reserved 3;
OK = 0;
LEASE_TOO_LARGE = 1;
MAX_EXECUTION_TIME_EXCEEDED = 2;
SCALE_GRACE_TIME_EXCEEDED = 3 [deprecated=true];
DISCONNECTED = 4;
COMMITTED = 5;
ABORTED = 6;
Expand Down Expand Up @@ -240,9 +240,9 @@ message TxnId {
}

message CreateTxnRequest {
reserved 3;
StreamInfo streamInfo = 1;
int64 lease = 2;
int64 scaleGracePeriod = 3 [deprecated=true];
}

message CreateTxnResponse {
Expand Down
7 changes: 1 addition & 6 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#![allow(clippy::multiple_crate_versions)]
#![allow(dead_code)]
#![allow(clippy::similar_names)]
#![allow(clippy::upper_case_acronyms)]

use std::result::Result as StdResult;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -867,7 +868,6 @@ impl ControllerClientImpl {
let request = CreateTxnRequest {
stream_info: Some(StreamInfo::from(stream)),
lease: lease.as_millis() as i64,
scale_grace_period: 0,
};
let op_status: StdResult<tonic::Response<CreateTxnResponse>, tonic::Status> = self
.get_controller_client()
Expand Down Expand Up @@ -941,11 +941,6 @@ impl ControllerClientImpl {
operation: operation_name.into(),
error_msg: "Ping transaction failed, Reason:MaxExecutionTimeExceeded".into(),
}),
Status::ScaleGraceTimeExceeded => Err(ControllerError::OperationError {
can_retry: false, // do not retry.
operation: operation_name.into(),
error_msg: "Ping transaction failed, Reason:ScaleGraceTimeExceeded".into(),
}),
Status::Disconnected => Err(ControllerError::OperationError {
can_retry: false, // do not retry.
operation: operation_name.into(),
Expand Down
48 changes: 22 additions & 26 deletions controller-client/src/model_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ impl From<TxId> for TxnId {
}
}
}

impl Into<SegmentId> for ScopedSegment {
fn into(self) -> SegmentId {
impl From<ScopedSegment> for SegmentId {
fn from(segment: ScopedSegment) -> SegmentId {
SegmentId {
stream_info: Some(StreamInfo {
scope: self.scope.name,
stream: self.stream.name,
scope: segment.scope.name,
stream: segment.stream.name,
}),
segment_id: self.segment.number,
segment_id: segment.segment.number,
}
}
}
Expand All @@ -54,12 +53,11 @@ impl<'a> From<&'a ScopedSegment> for SegmentId {
}
}
}

impl Into<StreamInfo> for ScopedStream {
fn into(self) -> StreamInfo {
impl From<ScopedStream> for StreamInfo {
fn from(stream: ScopedStream) -> StreamInfo {
StreamInfo {
scope: self.scope.name,
stream: self.stream.name,
scope: stream.scope.name,
stream: stream.stream.name,
}
}
}
Expand Down Expand Up @@ -98,30 +96,28 @@ impl<'a> From<&'a StreamConfiguration> for StreamConfig {
}
}
}

impl Into<StreamConfig> for StreamConfiguration {
fn into(self) -> StreamConfig {
impl From<StreamConfiguration> for StreamConfig {
fn from(config: StreamConfiguration) -> StreamConfig {
StreamConfig {
stream_info: Some(self.scoped_stream.into()),
stream_info: Some(config.scoped_stream.into()),
scaling_policy: Some(ScalingPolicy {
scale_type: self.scaling.scale_type as i32,
target_rate: self.scaling.target_rate,
scale_factor: self.scaling.scale_factor,
min_num_segments: self.scaling.min_num_segments,
scale_type: config.scaling.scale_type as i32,
target_rate: config.scaling.target_rate,
scale_factor: config.scaling.scale_factor,
min_num_segments: config.scaling.min_num_segments,
}),
retention_policy: Some(RetentionPolicy {
retention_type: self.retention.retention_type as i32,
retention_param: self.retention.retention_param,
retention_type: config.retention.retention_type as i32,
retention_param: config.retention.retention_param,
}),
}
}
}

impl Into<crate::controller::StreamCut> for pravega_client_shared::StreamCut {
fn into(self) -> crate::controller::StreamCut {
impl From<pravega_client_shared::StreamCut> for crate::controller::StreamCut {
fn from(cut: pravega_client_shared::StreamCut) -> crate::controller::StreamCut {
crate::controller::StreamCut {
stream_info: Some(self.scoped_stream.into()),
cut: self.segment_offset_map,
stream_info: Some(cut.scoped_stream.into()),
cut: cut.segment_offset_map,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions integration_test/src/disconnection_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn create_scope_stream(controller_client: &dyn ControllerClient) {
})
.await
.expect("create scope");
assert!(result, true);
assert!(result);

let stream_name = Stream::from("testStream".to_owned());
let request = StreamConfiguration {
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn create_scope_stream(controller_client: &dyn ControllerClient) {
})
.await
.expect("create stream");
assert!(result, true);
assert!(result);
}

fn test_retry_with_unexpected_reply() {
Expand Down
1 change: 1 addition & 0 deletions integration_test/src/event_stream_reader_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn test_event_stream_reader(config: PravegaStandaloneServiceConfig) {
test_multiple_readers(&client_factory);
test_reader_offline(&client_factory);
test_segment_rebalance(&client_factory);
info!("test event stream reader finished");
}

fn test_read_large_events(client_factory: &ClientFactory, rt: &Runtime) {
Expand Down
66 changes: 40 additions & 26 deletions integration_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,12 @@ mod test {
fn integration_test() {
trace::init();
// metric::metric_init(PROMETHEUS_SCRAPE_PORT.parse::<SocketAddr>().unwrap());
let span = info_span!("integration test", auth = true, tls = true);
span.in_scope(|| {
info!("Running integration test");
let config = PravegaStandaloneServiceConfig::new(false, true, true);
run_tests(config);
});
info!("Running integration test");
let config = PravegaStandaloneServiceConfig::new(false, true, true);
run_tests(config);

let span = info_span!("integration test", auth = false, tls = false);
span.in_scope(|| {
info!("Running integration test");
let config = PravegaStandaloneServiceConfig::new(false, false, false);
run_tests(config);
});
let config = PravegaStandaloneServiceConfig::new(false, false, false);
run_tests(config);

// disconnection test will start its own Pravega Standalone.
disconnection_tests::disconnection_test_wrapper();
Expand All @@ -98,20 +91,41 @@ mod test {
env::set_var("pravega_client_auth_username", "admin");
env::set_var("pravega_client_auth_password", "1111_aaaa");
}
controller_tests::test_controller_apis(config.clone());

tablemap_tests::test_tablemap(config.clone());

event_stream_writer_tests::test_event_stream_writer(config.clone());

tablesynchronizer_tests::test_tablesynchronizer(config.clone());

transactional_event_stream_writer_tests::test_transactional_event_stream_writer(config.clone());

byte_stream_tests::test_byte_stream(config.clone());

event_stream_reader_tests::test_event_stream_reader(config.clone());

let span = info_span!("controller test", auth = config.auth, tls = config.tls);
span.in_scope(|| {
info!("Running controller test");
controller_tests::test_controller_apis(config.clone());
});
let span = info_span!("table map test", auth = config.auth, tls = config.tls);
span.in_scope(|| {
info!("Running table map test");
tablemap_tests::test_tablemap(config.clone());
});
let span = info_span!("event stream writer test", auth = config.auth, tls = config.tls);
span.in_scope(|| {
info!("Running event stream writer test");
event_stream_writer_tests::test_event_stream_writer(config.clone());
});
let span = info_span!("synchronizer test", auth = config.auth, tls = config.tls);
span.in_scope(|| {
info!("Running synchronizer test");
tablesynchronizer_tests::test_tablesynchronizer(config.clone());
});
let span = info_span!("transaction test", auth = config.auth, tls = config.tls);
span.in_scope(|| {
info!("Running transaction test");
transactional_event_stream_writer_tests::test_transactional_event_stream_writer(config.clone());
});
let span = info_span!("byte stream test", auth = config.auth, tls = config.tls);
span.in_scope(|| {
info!("Running byte stream test");
byte_stream_tests::test_byte_stream(config.clone());
});
let span = info_span!("event reader test", auth = config.auth, tls = config.tls);
span.in_scope(|| {
info!("Running event reader test");
event_stream_reader_tests::test_event_stream_reader(config.clone());
});
// Shut down Pravega standalone
pravega.stop().unwrap();
wait_for_standalone_with_timeout(false, 30);
Expand Down
Loading

0 comments on commit fff2b36

Please sign in to comment.