Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Prepare sc-network for ProtocolController/NotificationService #14080

Merged
merged 7 commits into from
May 11, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updates
  • Loading branch information
altonen committed May 9, 2023
commit b42ac09600f734ce5163b382b0b6446a14bf450c
61 changes: 33 additions & 28 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ where
Box::new(DefaultBlockAnnounceValidator)
};

let block_request_protocol_config = {
let (block_request_protocol_config, block_request_protocol_name) = {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
Expand All @@ -816,23 +816,26 @@ where
net_config.network_config.default_peers_set.in_peers as usize +
net_config.network_config.default_peers_set.out_peers as usize,
);
let config_name = protocol_config.name.clone();
spawn_handle.spawn("block-request-handler", Some("networking"), handler.run());
protocol_config
(protocol_config, config_name)
};

let state_request_protocol_config = {
let (state_request_protocol_config, state_request_protocol_name) = {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = StateRequestHandler::new(
&protocol_id,
config.chain_spec.fork_id(),
client.clone(),
net_config.network_config.default_peers_set_num_full as usize,
);
let config_name = protocol_config.name.clone();

spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
protocol_config
(protocol_config, config_name)
};

let warp_sync_protocol_config = match warp_sync_params.as_ref() {
let (warp_sync_protocol_config, warp_request_protocol_name) = match warp_sync_params.as_ref() {
Some(WarpSyncParams::WithProvider(warp_with_provider)) => {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = WarpSyncRequestHandler::new(
Expand All @@ -845,10 +848,12 @@ where
config.chain_spec.fork_id(),
warp_with_provider.clone(),
);
let config_name = protocol_config.name.clone();

spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
Some(protocol_config)
(Some(protocol_config), Some(config_name))
},
_ => None,
_ => (None, None),
};

let light_client_request_protocol_config = {
Expand All @@ -862,27 +867,6 @@ where
protocol_config
};

let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
let (engine, sync_service, block_announce_config) = SyncingEngine::new(
Roles::from(&config.role),
client.clone(),
config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
&net_config,
protocol_id.clone(),
&config.chain_spec.fork_id().map(ToOwned::to_owned),
block_announce_validator,
warp_sync_params,
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_config.name.clone(),
state_request_protocol_config.name.clone(),
warp_sync_protocol_config.as_ref().map(|config| config.name.clone()),
rx,
)?;
let sync_service_import_queue = sync_service.clone();
let sync_service = Arc::new(sync_service);

// install request handlers to `FullNetworkConfiguration`
net_config.add_request_response_protocol(block_request_protocol_config);
net_config.add_request_response_protocol(state_request_protocol_config);
Expand Down Expand Up @@ -910,6 +894,27 @@ where
);
net_config.add_notification_protocol(transactions_handler_proto.set_config());

let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
let (engine, sync_service, block_announce_config) = SyncingEngine::new(
Roles::from(&config.role),
client.clone(),
config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
&net_config,
protocol_id.clone(),
&config.chain_spec.fork_id().map(ToOwned::to_owned),
block_announce_validator,
warp_sync_params,
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_name,
state_request_protocol_name,
warp_request_protocol_name,
rx,
)?;
let sync_service_import_queue = sync_service.clone();
let sync_service = Arc::new(sync_service);

let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
let network_params = sc_network::config::Params::<TBl> {
role: config.role.clone(),
Expand Down