diff --git a/build/build.sh b/build/build.sh index da619d0..c5b5878 100755 --- a/build/build.sh +++ b/build/build.sh @@ -7,10 +7,13 @@ echo "###################################" sudo apt update && sudo apt install -y software-properties-common && sudo add-apt-repository -y ppa:deadsnakes/ppa && - sudo apt install -y python3.7 python3.7-dev python3-dev python3-pip ansible git aptitude + sudo apt install -y python3.7 python3.7-dev python3-dev python3-pip ansible git aptitude cpanminus sudo pip3 install setuptools +# used by report generator to parse DOT file to ascii art +sudo cpanm Graph::Easy + echo "###################################" echo "Installing Umbra" echo "###################################" @@ -38,4 +41,10 @@ cd .. sudo python3.7 setup.py install cd .. +echo "##################################################" +echo "Setup dockprom v3.17.1 for UI monitoring dashboard" +echo "##################################################" + +git clone -b v3.17.1 https://github.com/stefanprodan/dockprom.git + sudo usermod -aG docker $USER diff --git a/docs/imgs/example_fabric_topology.JPG b/docs/imgs/example_fabric_topology.JPG new file mode 100644 index 0000000..93bc1ff Binary files /dev/null and b/docs/imgs/example_fabric_topology.JPG differ diff --git a/docs/imgs/grafana_dashboard.JPG b/docs/imgs/grafana_dashboard.JPG new file mode 100644 index 0000000..16253ca Binary files /dev/null and b/docs/imgs/grafana_dashboard.JPG differ diff --git a/docs/source/architecture.rst b/docs/source/architecture.rst index 54ced06..119d6dd 100644 --- a/docs/source/architecture.rst +++ b/docs/source/architecture.rst @@ -14,8 +14,10 @@ Umbra was designed having in mind the following guidance: Components ********** -Umbra has three independent components: +Umbra has five independent components: * design: defines APIs to implement the topology and events that will be respectively deployed and triggered when testing a blockchain platform. 
-* broker: main component, responsible for the orchestration and management of the scenario (topology and events) +* broker: main component, responsible for the orchestration and management of the scenario (topology and events) * scenario: the actual interface that deploys the topology (i.e. network, containers, virtual switches) +* agent: runs as one of the "peer" in the blockchain network. It can be used to generate stimulus to the network like interrupting a running blockchain transaction (via ``iperf``) and replay packets (via ``tcpreplay``). +* monitor: runs on the host machine. Used to monitor the status/metrics of both host and containers diff --git a/docs/source/example.rst b/docs/source/example.rst index 724f057..0372cf3 100644 --- a/docs/source/example.rst +++ b/docs/source/example.rst @@ -4,6 +4,7 @@ Example To examplify how Umbra can be utilized, an extension was coded to support Hyperledger Fabric v1.4. The details of how that was coded are exposed in the Extensions section. Here is just the explanation of what the Fabric example realizes. +.. image:: /imgs/example_fabric_topology.JPG Building ******** @@ -47,11 +48,11 @@ The executable file named run.sh in the umbra/examples/ folder contains the comm Having that executed, all the instantiation of components from the saved FabricTopology will take place (i.e., peers, orderers, links, etc) and events will be called on them. -The commands bellow respectivelly start and stop the example experiment with Fabric. +The commands below respectively start and stop the example experiment with Fabric. ..
code-block:: bash - $ sudo -H ./run.sh start -c ./fabric/fabric_configs/config_fabric_simple.json + $ sudo -H ./run.sh start -c ./fabric/fabric_configs/Fabric-Simple-01.json $ sudo -H ./run.sh stop @@ -72,4 +73,133 @@ Modifying ********* To modify the Fabric experiment, it is just needed to modify the build_configs.py file, changing how the FabricTopology instance is built, besides changing how the events are scheduled. -If new orgs are added, the file fabric.py inside base_configtx folder needs to be modified accordingly to define the policies in configtx needed for the creation of the Fabric requirements. \ No newline at end of file +If new orgs are added, the file fabric.py inside base_configtx folder needs to be modified accordingly to define the policies in configtx needed for the creation of the Fabric requirements. + + +Changing environment during runtime +*********************************** + +We will use the broker's environment plugin (``umbra/broker/plugins/env.py``) to generate event that modifies the environment behavior. Refer `build_configs `_. + +Following are example usecases: + +.. code-block:: python + + # 1. Kill a container + ev_kill_container = { + "command": "environment_event", + "target_node": , # e.g. "peer0.org2.example.com" + "action": "kill_container", + "action_args": {}, + } + + # 2. Set mem limit + ev_mem_limit_peer1_org1 = { + "command": "environment_event", + "action": "update_memory_limit", + "action_args": { + "mem_limit": , # e.g. 256000000 for 256MB memory + "memswap_limit": -1 + }, + "target_node": , # e.g. "peer0.org2.example.com" + } + + # 3. Set cpu limit. More info at + # https://docs.docker.com/config/containers/resource_constraints/#cpu + # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt + ev_cpu_limit_peer1_org2 = { + "command": "environment_event", + "target_node": , # e.g. 
"peer0.org2.example.com" + "action": "update_cpu_limit", + "action_args": { # refer Docker docs for these values + "cpu_quota": 10000, + "cpu_period": 50000, + "cpu_shares": -1, + "cores": {} + }, + } + + # 4. Update link resources + # Here, we change the resources of s0<-->peer1.org1 interface + # to bandwidth of 3Mbps, with 4ms delay, and packet loss rate of 10% + ev_update_link_res = { + "command": "environment_event", + "action": "update_link", + "action_args": { + "events": [ + { + "group": "links", + "specs": { + "action": "update", + "online": True, + "resources": { + "bw": 3, # Mbps + "delay": "4ms", + "loss": 10, # + } + }, + "targets": ("s0", "peer1.org1.example.com") + }, + ] + }, + } + + # 5. Change link state, e.g. UP or DOWN + # Beginning of test all link should be up. + # Set the "online" key to either True or False + # Example below set the orderer interface to DOWN + ev_update_link_orderer_down = { + "command": "environment_event", + "action": "update_link", + "action_args": { + "events": [ + { + "group": "links", + "specs": { + "action": "update", + "online": , # True=UP, False=DOWN + "resources": None + }, + "targets": ("s0", "orderer.example.com") + }, + ] + }, + } + + +Creating stimulus in the network via agent +****************************************** + +`umbra-agent` currently supports `ping` to send ICMP echo, `iperf` to simulate heavy traffic, and `tcpreplay` to replay pcap packet. Only `ping` is tested thus far, other examples coming soon. + +.. code-block:: python + + # Ping peer0.org1.example.com at 1 packet per second + # for 4 seconds + ev_agent_ping_peer0org1 = { + "agent_name": agent_name, + "id": "100", + "actions": [ + { + 'id': "1", + "tool": "ping", + "output": { + "live": False, + "address": None, + }, + 'parameters': { + "target": , # e.g. 
"peer0.org1.example.com" + "interval": "1", + "duration": "4", + }, + 'schedule': { + "from": 1, + "until": 0, + "duration": 0, + "interval": 0, + "repeat": 0 + }, + }, + ], + } + diff --git a/docs/source/starting.rst b/docs/source/starting.rst index 2f10e4d..baafe7e 100644 --- a/docs/source/starting.rst +++ b/docs/source/starting.rst @@ -1,3 +1,5 @@ +.. sectnum:: + Getting Started =============== @@ -11,7 +13,7 @@ Umbra works on Ubuntu 18.04. To run the getting started example it is recommended a machine with Ubuntu 18.04 installed and containing 4 vCPUs (cores), 4 GB of memory, and at least 5GB of disk available. -1. Install the Main Components +Install the Main Components ****************************** Umbra contains 3 python components (design, broker, scenario), the build script below installs the requirements and the components themselves. @@ -32,7 +34,7 @@ Umbra contains 3 python components (design, broker, scenario), the build script Please note, the script above (build.sh) install docker-ce and adds the $USER to the docker group (no need to user sudo before the docker command). To enable this setting, logout and login again, or execute `su $USER` (to start a new session). You can test if it works simply running `docker ps -a`. -2. Install the Fabric Requirements +Install the Fabric Requirements ********************************** As Umbra is plugin oriented (i.e., each Hyperledger project needs its own umbra-plugin), the build_fabric script below installs all the Fabric (v1.4) components needed to run the Umbra Fabric plugin. @@ -48,7 +50,7 @@ As Umbra is plugin oriented (i.e., each Hyperledger project needs its own umbra- $ cd - -3. Create the Fabric Configs +Create the Fabric Configs **************************** The build_configs script below creates the config files for the Fabric scenario. @@ -63,21 +65,32 @@ Open this file to see what is the scenario created, the topology and its events. $ cd - -4. 
Run the Test +Run the Test *************** +**Optional Grafana Dashboard UI**: Skip this step if not needed. Before running the test, start the monitoring stack first using the command below. + +.. code-block:: bash + + $ cd build/dockprom + $ docker-compose up -d + # If the command is succesful, then the dashboard should be up by now. + # Open a browser and point to :3000 to view the grafana dashboard. + +.. image:: /imgs/grafana_dashboard.JPG + The run.sh script below executes the Fabric scenario (topology and events). In order to run the Mininet, a sudo password will be asked to run the Umbra scenario component. +To run the test with more debug log, add (``-d``) flag. This is required to generate a proper report when stopping the simulation. + .. code-block:: bash $ cd umbra/examples/ - - $ ./run.sh start -c ./fabric/fabric_configs/config_fabric_simple.json + $ ./run.sh start -c ./fabric/fabric_configs/Fabric-Simple-01.json <-d> - -4. Check the Test Logs +Check the Test Logs ********************** As the broker and scenario components save logs during their execution, they can be seen by the commands below. @@ -89,11 +102,12 @@ As the broker and scenario components save logs during their execution, they can $ tail -f logs/scenario.log -4. Stop the Test +Stop the Test **************** -The command below stops all the Umbra processes and clean their breadcrumbs. +The command below stops all the Umbra processes and clean their breadcrumbs. To generate the simulation report, add -r flag. Note that to generate a proper report, the simulation needs to run in debug mode (``-d``). Refer the previous section "Run the Test". .. 
code-block:: bash - $ ./run.sh stop \ No newline at end of file + $ ./run.sh stop <-r> + diff --git a/docs/source/workflow.rst b/docs/source/workflow.rst index dcb1a15..caddb7c 100644 --- a/docs/source/workflow.rst +++ b/docs/source/workflow.rst @@ -77,4 +77,6 @@ Directory Structure * **umbra/common**: contains all the common application models (i.e., protocol buffer messages and services) implemented to be used by the other Umbra components. * **umbra/broker**: contains all the component source code to install and run the orchestration logic of umbra, i.e., how to receive a scenario configuration request, deploy it and trigger the events programmed in the scenario logic. The executable umbra-broker contains plugins, each one specified for a different blockchain platform it supports. * **umbra/design**: contains all the component source code to install and enable APIs for the configuration logic of umbra, i.e., how to specify different topology APIs to build the configuration needed for each blockchain platform to be executed by umbra-broker. -* **umbra/scenario**: contains all the component source code to install and run the plugin that enables Containernet to deploy the topology needed to execute a blockchain platform. \ No newline at end of file +* **umbra/scenario**: contains all the component source code to install and run the plugin that enables Containernet to deploy the topology needed to execute a blockchain platform. 
+* **umbra/agent**: contains source code related to umbra-agent executable +* **umbra/monitor**: contains source code related to umbra-monitor executable diff --git a/examples/fabric/build_configs.py b/examples/fabric/build_configs.py index b5d6b3c..3911055 100644 --- a/examples/fabric/build_configs.py +++ b/examples/fabric/build_configs.py @@ -246,75 +246,32 @@ def build_simple_fabric_cfg(): "chaincode_args": ['b'], } - ev_kill_container_peer0_org1 = { - "command": "environment_event", - "args": { - "node_name": "peer0.org1.example.com", - "action": "kill_container", - "action_args": {}, - }, - "schedule": { - "from": 4, # run on the 4th second, after ev_create_channel - "until": 0, - "duration": 0, - "interval": 0, - "repeat": 0 - }, - } - ev_kill_container_peer0_org2 = { "command": "environment_event", - "args": { - "node_name": "peer0.org2.example.com", - "action": "kill_container", - "action_args": {}, - }, - "schedule": { - "from": 4, # run on the 4th second, after ev_create_channel - "until": 0, - "duration": 0, - "interval": 0, - "repeat": 0 - }, + "target_node": "peer0.org2.example.com", + "action": "kill_container", + "action_args": {}, } ev_mem_limit_peer1_org1 = { "command": "environment_event", - "args": { - "action": "update_memory_limit", - "action_args": { - "mem_limit": 256000000, - "memswap_limit": -1 - }, - "node_name": "peer1.org1.example.com" - }, - "schedule": { - "from": 4, # run on the 4th second, after ev_create_channel - "until": 0, - "duration": 0, - "interval": 0, - "repeat": 0 + "action": "update_memory_limit", + "action_args": { + "mem_limit": 256000000, + "memswap_limit": -1 }, + "target_node": "peer1.org1.example.com", } ev_cpu_limit_peer1_org2 = { "command": "environment_event", - "args": { - "action": "update_cpu_limit", - "action_args": { - "cpu_quota": 10000, - "cpu_period": 50000, - "cpu_shares": -1, - "cores": {} - }, - "node_name": "peer1.org2.example.com" - }, - "schedule": { - "from": 4, # run on the 4th second, after 
ev_create_channel - "until": 0, - "duration": 0, - "interval": 0, - "repeat": 0 + "target_node": "peer1.org2.example.com", + "action": "update_cpu_limit", + "action_args": { + "cpu_quota": 10000, + "cpu_period": 50000, + "cpu_shares": -1, + "cores": {} }, } @@ -327,111 +284,126 @@ def build_simple_fabric_cfg(): qdisc htb 5: root refcnt 2 r2q 10 default 1 direct_packets_stat 0 direct_qlen 1000 qdisc netem 10: parent 5:1 limit 1000 delay 4.0ms loss 10% """ - ev_update_link = { + ev_update_link_res = { "command": "environment_event", - "args": { - "action": "update_link", - "action_args": { - "events": [ - { - "group": "links", - "specs": { - "action": "update", - "online": True, - "resources": { - "bw": 3, - "delay": "4ms", - "loss": 10, - } - }, - "targets": ("s0", "peer1.org1.example.com") + "action": "update_link", + "action_args": { + "events": [ + { + "group": "links", + "specs": { + "action": "update", + "online": True, + "resources": { + "bw": 3, + "delay": "4ms", + "loss": 10, + } }, - { - "group": "links", - "specs": { - "action": "update", - "online": True, - "resources": { - "bw": 3, - "delay": "4ms", - "loss": 10, - } - }, - "targets": ("s0", "peer0.org3.example.com") + "targets": ("s0", "peer1.org1.example.com") + }, + { + "group": "links", + "specs": { + "action": "update", + "online": True, + "resources": { + "bw": 3, + "delay": "4ms", + "loss": 10, + } }, - ] - }, - }, - "schedule": { - "from": 6, - "until": 0, - "duration": 0, - "interval": 0, - "repeat": 0 + "targets": ("s0", "peer0.org3.example.com") + }, + ] }, } + # Find peer1.org1.example.com connection name from scenario.log + # In this case, it is s0-eth2 # $ ip link show s0-eth2 # ensure state DOWN - # ... state DOWN mode DEFAULT + # ... 
mtu 1500 qdisc htb master ovs-system state DOWN ev_update_link_peer1_org1_downlink = { "command": "environment_event", - "args": { - "action": "update_link", - "action_args": { - "events": [ - { - "group": "links", - "specs": { - "action": "update", - "online": False, - "resources": None - }, - "targets": ("s0", "peer1.org1.example.com") + "action": "update_link", + "action_args": { + "events": [ + { + "group": "links", + "specs": { + "action": "update", + "online": False, + "resources": None }, - ] - }, - }, - "schedule": { - "from": 1, - "until": 0, - "duration": 0, - "interval": 0, - "repeat": 0 + "targets": ("s0", "peer1.org1.example.com") + }, + ] }, } ev_update_link_peer1_org1_uplink = { "command": "environment_event", - "args": { - "action": "update_link", - "action_args": { - "events": [ - { - "group": "links", - "specs": { - "action": "update", - "online": True, - "resources": { - "bw": 1, - "delay": "2ms", - # "loss": None, - } - }, - "targets": ("s0", "peer1.org1.example.com") + "action": "update_link", + "action_args": { + "events": [ + { + "group": "links", + "specs": { + "action": "update", + "online": True, + "resources": { + "bw": 1, + "delay": "2ms", + # "loss": None, + } }, - ] - }, + "targets": ("s0", "peer1.org1.example.com") + }, + ] }, - "schedule": { - "from": 3, - "until": 0, - "duration": 0, - "interval": 0, - "repeat": 0 + } + + ev_update_link_orderer_down = { + "command": "environment_event", + "action": "update_link", + "action_args": { + "events": [ + { + "group": "links", + "specs": { + "action": "update", + "online": False, + "resources": None + }, + "targets": ("s0", "orderer.example.com") + }, + ] }, } - ev_agent_v2 = { + ev_update_link_orderer_up = { + "command": "environment_event", + "action": "update_link", + "action_args": { + "events": [ + { + "group": "links", + "specs": { + "action": "update", + "online": True, + "resources": { + "bw": 1, + "delay": "2ms", + # "loss": None, + } + }, + "targets": ("s0", "orderer.example.com") 
+ }, + ] + }, + } + + ev_agent_ping_peer0org1 = { "agent_name": agent_name, "id": "100", "actions": [ @@ -458,7 +430,7 @@ def build_simple_fabric_cfg(): ], } - ev_monitor_v2 = { + ev_monitor_container_peer0org1 = { "id": "101", "actions": [ { @@ -485,34 +457,42 @@ def build_simple_fabric_cfg(): } - scenario.add_event("0", "fabric", ev_info_channels) - scenario.add_event("1", "fabric", ev_create_channel) - scenario.add_event_v2(1, "agent", ev_agent_v2) - scenario.add_event_v2(3, "monitor", ev_monitor_v2) - # scenario.add_event("3", "environment", ev_kill_container_peer0_org1) - # scenario.add_event("4", "environment", ev_kill_container_peer0_org2) - # scenario.add_event("6", "environment", ev_update_link) - # scenario.add_event("0", "environment", ev_update_link_peer1_org1_downlink) - # scenario.add_event("0", "environment", ev_update_link_peer1_org1_uplink) - scenario.add_event("2", "environment", ev_mem_limit_peer1_org1) - scenario.add_event("2", "environment", ev_cpu_limit_peer1_org2) - - scenario.add_event("3", "fabric", ev_join_channel_org1) - scenario.add_event("3", "fabric", ev_join_channel_org2) - scenario.add_event("3", "fabric", ev_join_channel_org3) - scenario.add_event("3", "fabric", ev_join_channel_org4) - scenario.add_event("5", "fabric", ev_info_channel) - # scenario.add_event("5", "fabric", ev_info_channel_config) - - # scenario.add_event("9", "fabric", ev_info_channels) - # scenario.add_event("10", "fabric", ev_info_network) - # scenario.add_event("11", "fabric", ev_chaincode_install_org1) - # scenario.add_event("11", "fabric", ev_chaincode_install_org2) - # scenario.add_event("13", "fabric", ev_chaincode_instantiate_org1) - # scenario.add_event("13", "fabric", ev_chaincode_instantiate_org2) - # scenario.add_event("23", "fabric", ev_chaincode_invoke_org1) - # scenario.add_event("30", "fabric", ev_chaincode_query_org1) - # scenario.add_event("32", "fabric", ev_chaincode_query_org2) + ev_get_topology = { + "command": "current_topology", + } + + + 
scenario.add_event_fabric("0", "fabric", ev_info_channels) + scenario.add_event_fabric("1", "fabric", ev_create_channel) + scenario.add_event_others(1, "agent", ev_agent_ping_peer0org1) + scenario.add_event_others(3, "monitor", ev_monitor_container_peer0org1) + # scenario.add_event_others(3, "environment", ev_kill_container_peer0_org1) + # scenario.add_event_others(4, "environment", ev_kill_container_peer0_org2) + # scenario.add_event_others(6, "environment", ev_update_link_res) + # scenario.add_event_others(3, "environment", ev_update_link_peer1_org1_downlink) + # scenario.add_event_others(3, "environment", ev_update_link_peer1_org1_uplink) + # scenario.add_event_others(1, "environment", ev_update_link_orderer_down) + # scenario.add_event_others(2, "environment", ev_update_link_orderer_up) + scenario.add_event_others(1, "environment", ev_mem_limit_peer1_org1) + scenario.add_event_others(1, "environment", ev_cpu_limit_peer1_org2) + scenario.add_event_others(3, "environment", ev_get_topology) + + scenario.add_event_fabric("3", "fabric", ev_join_channel_org1) + scenario.add_event_fabric("3", "fabric", ev_join_channel_org2) + scenario.add_event_fabric("3", "fabric", ev_join_channel_org3) + scenario.add_event_fabric("3", "fabric", ev_join_channel_org4) + scenario.add_event_fabric("5", "fabric", ev_info_channel) + # scenario.add_event_fabric("5", "fabric", ev_info_channel_config) + + scenario.add_event_fabric("9", "fabric", ev_info_channels) + scenario.add_event_fabric("10", "fabric", ev_info_network) + scenario.add_event_fabric("11", "fabric", ev_chaincode_install_org1) + scenario.add_event_fabric("11", "fabric", ev_chaincode_install_org2) + scenario.add_event_fabric("13", "fabric", ev_chaincode_instantiate_org1) + scenario.add_event_fabric("13", "fabric", ev_chaincode_instantiate_org2) + scenario.add_event_fabric("23", "fabric", ev_chaincode_invoke_org1) + scenario.add_event_fabric("35", "fabric", ev_chaincode_query_org1) + scenario.add_event_fabric("37", "fabric", 
ev_chaincode_query_org2) # Save config file scenario.save() diff --git a/examples/report.py b/examples/report.py new file mode 100644 index 0000000..8972c56 --- /dev/null +++ b/examples/report.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3.7 + +import os +import sys +import logging +import re +import subprocess +from shutil import which +from datetime import date + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +brokerlog_path = "logs/broker.log" +report_path = "logs/REPORT.log" + +OUT = "" + +OUT += "=============================================\n" +OUT += "=== Simulation Report Summary ===\n" +OUT += "=============================================\n" +OUT += "\n\n" + +def get_ts(line): + # 2020-08-23 15:12:10,119 [INFO] umbra.broker.plugins.fabric + # from log format above, timestamp field is at 2nd field + out = " " + line.split(' ')[1] + return out + +def parse_dot(line): + global OUT + OUT += "\n\n\n" + part = line.partition("DOT: ") + + res = "" + if which("graph-easy"): + proc = subprocess.run(["graph-easy"], stdout=subprocess.PIPE, input=part[2], encoding='ascii') + res = proc.stdout + else: + res = "Install Perl script `sudo cpanm Graph::Easy` to parse Dot format" + + OUT += "Topology at" + get_ts(part[0]) + ":\n\n" + OUT += res + "\n\n\n" + +def parse_fabric_cfg(line): + global OUT + part = line.partition("FABRIC_CONFIG: ") + OUT += part[2] + +def parse_fabric_ev(line): + global OUT + ts = get_ts(line) + part = line.partition("FABRIC_EV:") + OUT += "Fabric_event response at" + ts + " " + part[2] + +def parse_environment_ev(line): + global OUT + part = line.partition("START call_scenario:") + ts = get_ts(line) + OUT += "Executing environment_event at" + ts + " " + part[2] + +with open(brokerlog_path, "r") as file: + for line in file: + if "FABRIC_CONFIG" in line: + parse_fabric_cfg(line) + + if ": FABRIC_EV:" in line: # results from executing Fabric event + parse_fabric_ev(line) + + if "START call_scenario:" in line: + 
parse_environment_ev(line) + + if "Scheduling plugin fabric" in line: + part = line.split(' ') + OUT += f"Scheduling fabric event at: {part[1]}\n" + + if "Calling at" in line: + part = line.partition("Calling at ") + OUT += " " + part[2] + + if "DOT" in line: # topology in DOT format + parse_dot(line) + +with open(report_path, 'w') as report_file: + report_file.write(OUT) diff --git a/examples/run.sh b/examples/run.sh index 5567c6e..0c84798 100755 --- a/examples/run.sh +++ b/examples/run.sh @@ -20,6 +20,8 @@ function printHelp() { echo " - one of 'start' or 'stop'" echo " - 'start' - bring up the network specified in the " echo " - 'stop' - stop and clear the started setup" + echo " -r run post-processing script to generate report after setup teardown" + echo " NOTE: -r requires simulation in debug mode. E.g. run.sh start ___ -d" echo " -c - filepath of a config created using umbra-configs" echo " -d enable debug mode to print more logs" echo " run.sh -h (print this message)" @@ -51,12 +53,6 @@ reset() { kill_process_tree 1 $$ fi - echo_bold "Cleaning logs" - files=(./logs/*) - if [ ${#files[@]} -gt 0 ]; then - rm ./logs/* - fi - scenarioPID=`ps -o pid --no-headers -C umbra-scenario` brokerPID=`ps -o pid --no-headers -C umbra-broker` monitorPID=`ps -o pid --no-headers -C umbra-monitor` @@ -85,6 +81,23 @@ reset() { echo_bold "Stopping examples script ${examplesPID}" kill -9 $examplesPID &> /dev/null fi + + # generate report and save all logs if '-r', else delete logs + if [[ ! 
-z "${REPORT}" ]]; then + timenow=`date +"%G_%m_%d_%H-%M-%S"` + echo_bold "Generate report to ./logs/${timenow}/REPORT.log" + python3 report.py + # save log file + echo_bold "-> Saving logs at ./logs/${timenow}" + mkdir ./logs/${timenow} + mv ./logs/*.log ./logs/${timenow} + else + echo_bold "Cleaning logs" + files=(./logs/*) + if [ ${#files[@]} -gt 0 ]; then + rm ./logs/*.log + fi + fi } function clearContainers() { @@ -97,7 +110,7 @@ function clearContainers() { } -while getopts ":h:c:d" opt; do +while getopts ":hc:dr" opt; do case "${opt}" in h | \?) printHelp @@ -109,6 +122,9 @@ while getopts ":h:c:d" opt; do d) DEBUG='--debug' ;; + r) + REPORT=1 + ;; esac done @@ -159,7 +175,8 @@ case "$COMMAND" in echo "Running config ${CONFIG_SOURCE}" echo "########################################" - sleep 6 + #sleep 6 + until nc -z 172.17.0.1 8989; do sleep 1; done examples="/usr/bin/python3.7 ./examples.py --config ${CONFIG_SOURCE}" nohup ${examples} > logs/examples.log 2>&1 & examplesPID=$! diff --git a/umbra/broker/operator.py b/umbra/broker/operator.py index 0c8020c..c118c41 100644 --- a/umbra/broker/operator.py +++ b/umbra/broker/operator.py @@ -79,7 +79,7 @@ async def call_scenario(self, test, command, topology, address): logger.info(f"Deploying Scenario - {command}") scenario = self.serialize_bytes(topology) - deploy = Workflow(id=test, workflow=command, scenario=scenario) + deploy = Workflow(id=test, command=command, scenario=scenario) deploy.timestamp.FromDatetime(datetime.now()) host, port = address.split(":") @@ -133,9 +133,10 @@ async def call_events(self, scenario, info_deploy): topo.fill_config(info_topology) topo.fill_hosts_config(info_hosts) self.topology = topo + logger.debug("DOT: %s", self.topology.to_dot()) self.config_plugins() - events = scenario.get("events") + events = scenario.get("events_fabric") self.schedule_plugins(events) def config_env_event(self, wflow_id): @@ -143,16 +144,40 @@ def config_env_event(self, wflow_id): 
self.plugins["environment"] = self.events_env async def call_env_event(self, wflow_id, scenario): + logger.info("Scheduling environment events...") self.config_env_event(wflow_id) - events = scenario.get("events") - - # filter out non "environment" type events - env_events = {key: value for key, value in events.items() - if value['category'] == "environment"} - await self.events_env.handle(env_events) + env_events = scenario.get("events_others").get("environment") + + # Any better way to get the id of event=current_topology? + # Need it as the key to the 'result' dict which has + # the response of the query for current topology + curr_topo_id = None + for event in env_events: + if event["command"] == "current_topology": + curr_topo_id = event["id"] + + result = await self.events_env.handle(env_events) + + # BUG: what if you have > 1 current_topology events? Above + # `await` will block until you receive results from all tasks. + # Correct behavior would be to straightaway update topology + # after querying topology from umbra-scenario + + # update the topology with the newly received topology + if curr_topo_id: + topo = self.scenario.get_topology() + updated_topo = result[curr_topo_id][1].get("topology") + updated_host = result[curr_topo_id][1].get("hosts") + topo.fill_config(updated_topo) + topo.fill_hosts_config(updated_host) + self.topology = topo + logger.debug("DOT: %s", self.topology.to_dot()) + + return result async def call_agent_event(self, scenario): - agent_events = scenario.get("eventsv2").get("agent") + logger.info("Scheduling agent events...") + agent_events = scenario.get("events_others").get("agent") # '[0]' because we assume only single agent exist, thus all # events should have the same "agent_name" agent_name = agent_events[0].get("agent_name") @@ -178,7 +203,8 @@ async def call_agent_event(self, scenario): channel.close() async def call_monitor_event(self, scenario): - monitor_events = scenario.get("eventsv2").get("monitor") + 
logger.info("Scheduling monitor events...") + monitor_events = scenario.get("events_others").get("monitor") # extract all the actions from monitor_events to # construct the Instruction message @@ -225,12 +251,13 @@ async def run(self, request): status_bytes = self.serialize_bytes(status_info) report.status = status_bytes - await self.call_agent_event(scenario) - await self.call_monitor_event(scenario) - await self.call_env_event(request.id, scenario) + await asyncio.gather( + self.call_agent_event(scenario), + self.call_monitor_event(scenario), + self.call_env_event(request.id, scenario) + ) else: ack,topo_info = await self.call_scenario(request.id, "stop", {}, address) return report - diff --git a/umbra/broker/plugins/env.py b/umbra/broker/plugins/env.py index 464d6cf..f3e68c7 100644 --- a/umbra/broker/plugins/env.py +++ b/umbra/broker/plugins/env.py @@ -10,6 +10,12 @@ logger = logging.getLogger(__name__) class EnvEventHandler(): + """ + Responsible for handling environment related events. 
+ It will construct EnvEvent class based on user-defined events + and schedule it to run using umbra/common/scheduler component + """ + def __init__(self): self.handler = Handler() # url:port address for umbra-scenario @@ -23,24 +29,24 @@ def config(self, address, wflow_id): def build_calls(self, events): calls = {} - for event_id, event_args in events.items(): - params = event_args.get('params', {}) - env_event = EnvEvent(self.address, self.wflow_id, params['command'], params['args']) + for event in events: + ev_id = event.get("id") + env_event = EnvEvent(self.address, self.wflow_id, event) action_call = env_event.call_scenario - action_sched = params.get('schedule', {}) - calls[event_id] = (action_call, action_sched) + calls[ev_id] = (action_call, event.get("schedule")) return calls async def handle(self, events): calls = self.build_calls(events) results = await self.handler.run(calls) + return results class EnvEvent(): - def __init__(self, address, wflow_id, command, wflow_scenario): + def __init__(self, address, wflow_id, wflow_scenario): self.address = address + self.wflow_cmd = wflow_scenario.get("command", "") self.wflow_id = wflow_id - self.command = command self.wflow_scenario = wflow_scenario def parse_bytes(self, msg): @@ -63,9 +69,9 @@ def serialize_bytes(self, msg): async def call_scenario(self): logger.debug(f"START call_scenario: {self.wflow_scenario}") - self.wflow_scenario = self.serialize_bytes(self.wflow_scenario) - deploy = Workflow(id=self.wflow_id, workflow=self.command, scenario=self.wflow_scenario) + deploy = Workflow(id=self.wflow_id, command=self.wflow_cmd, + scenario=self.wflow_scenario) deploy.timestamp.FromDatetime(datetime.now()) host, port = self.address.split(":") diff --git a/umbra/broker/plugins/fabric.py b/umbra/broker/plugins/fabric.py index 584797b..d1656cb 100644 --- a/umbra/broker/plugins/fabric.py +++ b/umbra/broker/plugins/fabric.py @@ -46,11 +46,11 @@ def build_cli(self): os.environ["PATH"] += os.pathsep + 
os.pathsep.join(pathlist) self._cli = Client(net_profile=self._config_sdk) - logger.debug("Fabric Orgs %s", self._cli.organizations) - logger.debug("Fabric Peers %s", self._cli.peers) - logger.debug("Fabric Orderers %s", self._cli.orderers) - logger.debug("Fabric CAs %s", self._cli.CAs) - logger.info("Fabric Client SDK CLI Started") + logger.debug("FABRIC_CONFIG: Fabric Orgs %s", self._cli.organizations.keys()) + logger.debug("FABRIC_CONFIG: Fabric Peers %s", self._cli.peers.keys()) + logger.debug("FABRIC_CONFIG: Fabric Orderers %s", self._cli.orderers.keys()) + logger.debug("FABRIC_CONFIG: Fabric CAs %s", self._cli.CAs.keys()) + logger.info("Fabric Client SDK CLI Started") def schedule(self, events): for _id,event in events.items(): @@ -144,7 +144,7 @@ async def event_create_channel(self, ev): config_yaml=self._configtx_dir, channel_profile=profile ) - logger.info("Create channel response %s", response) + logger.info("FABRIC_EV:create_channel: Create channel response %s", response) return response logger.info("unknown orderer %s and org %s", orderer_name, org_name) @@ -175,7 +175,7 @@ async def event_join_channel(self, ev): peers=peers_fqdn, orderer=orderer_fqdn ) - logger.info("Join channel response %s", response) + logger.info("FABRIC_EV:join_channel: Join channel response %s", response) return response logger.info("unknown orderer %s and org %s", orderer_name, org_name) @@ -202,7 +202,7 @@ async def event_info_channel(self, ev): peers=peers_fqdn, decode=True ) - logger.info("Info channel response %s", response) + logger.info("FABRIC_EV:info_channel: Info channel response %s", response) return response logger.info("unknown org %s and/org peers %s", org_name, peers_names) @@ -227,7 +227,7 @@ async def event_info_channels(self, ev): peers=peers_fqdn, decode=True ) - logger.info("Info channels response %s", response) + logger.info("FABRIC_EV:info_channels: Info channels response %s", response) return response logger.info("unknown org %s and/org peers %s", org_name, 
peers_names) @@ -254,7 +254,7 @@ async def event_info_channel_config(self, ev): peers=peers_fqdn, decode=True ) - logger.info("Info channel config response %s", response) + logger.info("FABRIC_EV:info_channel_config: Info channel config response %s", response) return response logger.info("unknown org %s and/org peers %s", org_name, peers_names) @@ -279,7 +279,7 @@ async def event_info_channel_chaincodes(self, ev): peers=peers_fqdn, decode=True ) - logger.info("Info channel chaincodes response %s", response) + logger.info("FABRIC_EV:info_channel_chaincodes: Info channel chaincodes response %s", response) return response logger.info("unknown org %s and/org peers %s", org_name, peers_names) @@ -296,7 +296,7 @@ async def event_info_network(self, ev): orderer_fqdn, 'mspid' ) - logger.info("Info network response %s", response) + logger.info("FABRIC_EV:info_network: Info network response %s", response) return response logger.info("unknown orderer %s", orderer_name) @@ -326,7 +326,7 @@ async def event_chaincode_install(self, ev): cc_name=chaincode_name, cc_version=chaincode_version ) - logger.info("Chaincode install response %s", response) + logger.info("FABRIC_EV:chaincode_install: Chaincode install response %s", response) return response logger.info("unknown org %s and/or peers %s", org_name, peers_names) @@ -358,7 +358,7 @@ async def event_chaincode_instantiate(self, ev): cc_name=chaincode_name, cc_version=chaincode_version ) - logger.info("Chaincode instantiate response %s", response) + logger.info("FABRIC_EV:chaincode_instantiate: Chaincode instantiate response %s", response) return response logger.info("unknown org %s and/or peers %s", org_name, peers_names) @@ -388,7 +388,7 @@ async def event_chaincode_invoke(self, ev): args=chaincode_args, cc_name=chaincode_name ) - logger.info("Chaincode invoke response %s", response) + logger.info("FABRIC_EV:chaincode_invoke: Chaincode invoke response %s", response) return response logger.info("unknown org %s and/or peers %s", 
org_name, peers_names) @@ -419,8 +419,8 @@ async def event_chaincode_query(self, ev): args=chaincode_args, cc_name=chaincode_name ) - logger.info("Chaincode query response %s", response) + logger.info("FABRIC_EV:chaincode_query: Chaincode query response %s", response) return response logger.info("unknown org %s and/or peers %s", org_name, peers_names) - return None \ No newline at end of file + return None diff --git a/umbra/common/protobuf/build.sh b/umbra/common/protobuf/build.sh index 2a92e74..7b0dfce 100755 --- a/umbra/common/protobuf/build.sh +++ b/umbra/common/protobuf/build.sh @@ -16,4 +16,7 @@ # python3 -m grpc_tools.protoc --proto_path=$(pwd) --python_out=$(pwd) --grpc_python_out=$(pwd) umbra.proto python3 -m grpc_tools.protoc -I. --python_out=. --grpclib_python_out=. umbra.proto - +# NOTE: issue with protobuf import path +# https://github.com/protocolbuffers/protobuf/issues/1491 +# https://github.com/protocolbuffers/protobuf/pull/7470 +sed -i 's/import umbra_pb2/from umbra.common.protobuf import umbra_pb2/g' umbra_grpc.py diff --git a/umbra/common/protobuf/umbra.proto b/umbra/common/protobuf/umbra.proto index c745016..a2a1df8 100644 --- a/umbra/common/protobuf/umbra.proto +++ b/umbra/common/protobuf/umbra.proto @@ -13,6 +13,7 @@ service Broker { service Scenario { rpc Establish(Workflow) returns (Status); + rpc CurrentTopology(Workflow) returns (Status); } service Monitor { @@ -37,7 +38,7 @@ message Report { message Workflow { string id = 1; - string workflow = 2; + string command = 2; bytes scenario = 3; google.protobuf.Timestamp timestamp = 4; } diff --git a/umbra/common/protobuf/umbra_grpc.py b/umbra/common/protobuf/umbra_grpc.py index eb4abce..26f2fdd 100644 --- a/umbra/common/protobuf/umbra_grpc.py +++ b/umbra/common/protobuf/umbra_grpc.py @@ -64,6 +64,10 @@ class ScenarioBase(abc.ABC): async def Establish(self, stream: 'grpclib.server.Stream[umbra_pb2.Workflow, umbra_pb2.Status]') -> None: pass + @abc.abstractmethod + async def CurrentTopology(self, 
stream: 'grpclib.server.Stream[umbra_pb2.Workflow, umbra_pb2.Status]') -> None: + pass + def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: return { '/umbra.Scenario/Establish': grpclib.const.Handler( @@ -72,6 +76,12 @@ def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: umbra_pb2.Workflow, umbra_pb2.Status, ), + '/umbra.Scenario/CurrentTopology': grpclib.const.Handler( + self.CurrentTopology, + grpclib.const.Cardinality.UNARY_UNARY, + umbra_pb2.Workflow, + umbra_pb2.Status, + ), } @@ -84,6 +94,12 @@ def __init__(self, channel: grpclib.client.Channel) -> None: umbra_pb2.Workflow, umbra_pb2.Status, ) + self.CurrentTopology = grpclib.client.UnaryUnaryMethod( + channel, + '/umbra.Scenario/CurrentTopology', + umbra_pb2.Workflow, + umbra_pb2.Status, + ) class MonitorBase(abc.ABC): diff --git a/umbra/common/protobuf/umbra_pb2.py b/umbra/common/protobuf/umbra_pb2.py index ba3022f..8b693a3 100644 --- a/umbra/common/protobuf/umbra_pb2.py +++ b/umbra/common/protobuf/umbra_pb2.py @@ -21,7 +21,7 @@ syntax='proto3', serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x0bumbra.proto\x12\x05umbra\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"U\n\x06\x43onfig\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08scenario\x18\x02 \x01(\x0c\x12-\n\ttimestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"S\n\x06Report\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\x0c\x12-\n\ttimestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"i\n\x08Workflow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08workflow\x18\x02 \x01(\t\x12\x10\n\x08scenario\x18\x03 \x01(\x0c\x12-\n\ttimestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"l\n\x06Status\x12\n\n\x02id\x18\x01 \x01(\t\x12\n\n\x02ok\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x0c\n\x04info\x18\x04 \x01(\x0c\x12-\n\ttimestamp\x18\x05 
\x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xbd\x03\n\x0bInstruction\x12\n\n\x02id\x18\x01 \x01(\t\x12*\n\x07\x61\x63tions\x18\x02 \x03(\x0b\x32\x19.umbra.Instruction.Action\x1aX\n\x05Sched\x12\x0c\n\x04\x66rom\x18\x01 \x01(\r\x12\r\n\x05until\x18\x02 \x01(\r\x12\x10\n\x08\x64uration\x18\x03 \x01(\r\x12\x10\n\x08interval\x18\x04 \x01(\r\x12\x0e\n\x06repeat\x18\x05 \x01(\r\x1a\x9b\x02\n\x06\x41\x63tion\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04tool\x18\x02 \x01(\t\x12\x30\n\x06output\x18\x03 \x01(\x0b\x32 .umbra.Instruction.Action.Output\x12=\n\nparameters\x18\x04 \x03(\x0b\x32).umbra.Instruction.Action.ParametersEntry\x12*\n\x08schedule\x18\x05 \x01(\x0b\x32\x18.umbra.Instruction.Sched\x1a\'\n\x06Output\x12\x0c\n\x04live\x18\x01 \x01(\x08\x12\x0f\n\x07\x61\x64\x64ress\x18\x02 \x01(\t\x1a\x31\n\x0fParametersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x9f\x03\n\nEvaluation\x12\n\n\x02id\x18\x01 \x01(\t\x12(\n\x06source\x18\x02 \x01(\x0b\x32\x18.umbra.Evaluation.Source\x12)\n\x07metrics\x18\x03 \x03(\x0b\x32\x18.umbra.Evaluation.Metric\x12.\n\ttimestamp\x18\x04 \x01(\x0b\x32\x1b.umbra.Evaluation.Timestamp\x1a$\n\x06Source\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x63\x61ll\x18\x02 \x01(\t\x1ax\n\x06Metric\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\x12\x0c\n\x04unit\x18\x03 \x01(\t\x12\x10\n\x06scalar\x18\x04 \x01(\x01H\x00\x12)\n\x06series\x18\x05 \x01(\x0b\x32\x17.google.protobuf.StructH\x00\x42\x07\n\x05value\x1a`\n\tTimestamp\x12)\n\x05start\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12(\n\x04stop\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\">\n\x08Snapshot\x12\n\n\x02id\x18\x01 \x01(\t\x12&\n\x0b\x65valuations\x18\x02 
\x03(\x0b\x32\x11.umbra.Evaluation2]\n\x06\x42roker\x12&\n\x06Manage\x12\r.umbra.Config\x1a\r.umbra.Report\x12+\n\x07Measure\x12\x11.umbra.Evaluation\x1a\r.umbra.Status27\n\x08Scenario\x12+\n\tEstablish\x12\x0f.umbra.Workflow\x1a\r.umbra.Status28\n\x07Monitor\x12-\n\x06Listen\x12\x12.umbra.Instruction\x1a\x0f.umbra.Snapshot25\n\x05\x41gent\x12,\n\x05Probe\x12\x12.umbra.Instruction\x1a\x0f.umbra.Snapshotb\x06proto3' + serialized_pb=b'\n\x0bumbra.proto\x12\x05umbra\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"U\n\x06\x43onfig\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08scenario\x18\x02 \x01(\x0c\x12-\n\ttimestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"S\n\x06Report\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\x0c\x12-\n\ttimestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"h\n\x08Workflow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07\x63ommand\x18\x02 \x01(\t\x12\x10\n\x08scenario\x18\x03 \x01(\x0c\x12-\n\ttimestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"l\n\x06Status\x12\n\n\x02id\x18\x01 \x01(\t\x12\n\n\x02ok\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x0c\n\x04info\x18\x04 \x01(\x0c\x12-\n\ttimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xbd\x03\n\x0bInstruction\x12\n\n\x02id\x18\x01 \x01(\t\x12*\n\x07\x61\x63tions\x18\x02 \x03(\x0b\x32\x19.umbra.Instruction.Action\x1aX\n\x05Sched\x12\x0c\n\x04\x66rom\x18\x01 \x01(\r\x12\r\n\x05until\x18\x02 \x01(\r\x12\x10\n\x08\x64uration\x18\x03 \x01(\r\x12\x10\n\x08interval\x18\x04 \x01(\r\x12\x0e\n\x06repeat\x18\x05 \x01(\r\x1a\x9b\x02\n\x06\x41\x63tion\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04tool\x18\x02 \x01(\t\x12\x30\n\x06output\x18\x03 \x01(\x0b\x32 .umbra.Instruction.Action.Output\x12=\n\nparameters\x18\x04 \x03(\x0b\x32).umbra.Instruction.Action.ParametersEntry\x12*\n\x08schedule\x18\x05 \x01(\x0b\x32\x18.umbra.Instruction.Sched\x1a\'\n\x06Output\x12\x0c\n\x04live\x18\x01 
\x01(\x08\x12\x0f\n\x07\x61\x64\x64ress\x18\x02 \x01(\t\x1a\x31\n\x0fParametersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x9f\x03\n\nEvaluation\x12\n\n\x02id\x18\x01 \x01(\t\x12(\n\x06source\x18\x02 \x01(\x0b\x32\x18.umbra.Evaluation.Source\x12)\n\x07metrics\x18\x03 \x03(\x0b\x32\x18.umbra.Evaluation.Metric\x12.\n\ttimestamp\x18\x04 \x01(\x0b\x32\x1b.umbra.Evaluation.Timestamp\x1a$\n\x06Source\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x63\x61ll\x18\x02 \x01(\t\x1ax\n\x06Metric\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\x12\x0c\n\x04unit\x18\x03 \x01(\t\x12\x10\n\x06scalar\x18\x04 \x01(\x01H\x00\x12)\n\x06series\x18\x05 \x01(\x0b\x32\x17.google.protobuf.StructH\x00\x42\x07\n\x05value\x1a`\n\tTimestamp\x12)\n\x05start\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12(\n\x04stop\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\">\n\x08Snapshot\x12\n\n\x02id\x18\x01 \x01(\t\x12&\n\x0b\x65valuations\x18\x02 \x03(\x0b\x32\x11.umbra.Evaluation2]\n\x06\x42roker\x12&\n\x06Manage\x12\r.umbra.Config\x1a\r.umbra.Report\x12+\n\x07Measure\x12\x11.umbra.Evaluation\x1a\r.umbra.Status2j\n\x08Scenario\x12+\n\tEstablish\x12\x0f.umbra.Workflow\x1a\r.umbra.Status\x12\x31\n\x0f\x43urrentTopology\x12\x0f.umbra.Workflow\x1a\r.umbra.Status28\n\x07Monitor\x12-\n\x06Listen\x12\x12.umbra.Instruction\x1a\x0f.umbra.Snapshot25\n\x05\x41gent\x12,\n\x05Probe\x12\x12.umbra.Instruction\x1a\x0f.umbra.Snapshotb\x06proto3' , dependencies=[google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,]) @@ -136,7 +136,7 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='workflow', full_name='umbra.Workflow.workflow', index=1, + name='command', full_name='umbra.Workflow.command', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, 
default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -169,7 +169,7 @@ oneofs=[ ], serialized_start=257, - serialized_end=362, + serialized_end=361, ) @@ -228,8 +228,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=364, - serialized_end=472, + serialized_start=363, + serialized_end=471, ) @@ -288,8 +288,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=546, - serialized_end=634, + serialized_start=545, + serialized_end=633, ) _INSTRUCTION_ACTION_OUTPUT = _descriptor.Descriptor( @@ -326,8 +326,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=830, - serialized_end=869, + serialized_start=829, + serialized_end=868, ) _INSTRUCTION_ACTION_PARAMETERSENTRY = _descriptor.Descriptor( @@ -364,8 +364,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=871, - serialized_end=920, + serialized_start=870, + serialized_end=919, ) _INSTRUCTION_ACTION = _descriptor.Descriptor( @@ -423,8 +423,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=637, - serialized_end=920, + serialized_start=636, + serialized_end=919, ) _INSTRUCTION = _descriptor.Descriptor( @@ -461,8 +461,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=475, - serialized_end=920, + serialized_start=474, + serialized_end=919, ) @@ -500,8 +500,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1082, - serialized_end=1118, + serialized_start=1081, + serialized_end=1117, ) _EVALUATION_METRIC = _descriptor.Descriptor( @@ -564,8 +564,8 @@ create_key=_descriptor._internal_create_key, fields=[]), ], - serialized_start=1120, - serialized_end=1240, + serialized_start=1119, + serialized_end=1239, ) _EVALUATION_TIMESTAMP = _descriptor.Descriptor( @@ -602,8 +602,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1242, - serialized_end=1338, + serialized_start=1241, + serialized_end=1337, ) _EVALUATION = _descriptor.Descriptor( @@ -654,8 +654,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=923, - serialized_end=1338, + 
serialized_start=922, + serialized_end=1337, ) @@ -693,8 +693,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1340, - serialized_end=1402, + serialized_start=1339, + serialized_end=1401, ) _CONFIG.fields_by_name['timestamp'].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP @@ -849,8 +849,8 @@ index=0, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=1404, - serialized_end=1497, + serialized_start=1403, + serialized_end=1496, methods=[ _descriptor.MethodDescriptor( name='Manage', @@ -885,8 +885,8 @@ index=1, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=1499, - serialized_end=1554, + serialized_start=1498, + serialized_end=1604, methods=[ _descriptor.MethodDescriptor( name='Establish', @@ -898,6 +898,16 @@ serialized_options=None, create_key=_descriptor._internal_create_key, ), + _descriptor.MethodDescriptor( + name='CurrentTopology', + full_name='umbra.Scenario.CurrentTopology', + index=1, + containing_service=None, + input_type=_WORKFLOW, + output_type=_STATUS, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), ]) _sym_db.RegisterServiceDescriptor(_SCENARIO) @@ -911,8 +921,8 @@ index=2, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=1556, - serialized_end=1612, + serialized_start=1606, + serialized_end=1662, methods=[ _descriptor.MethodDescriptor( name='Listen', @@ -937,8 +947,8 @@ index=3, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=1614, - serialized_end=1667, + serialized_start=1664, + serialized_end=1717, methods=[ _descriptor.MethodDescriptor( name='Probe', diff --git a/umbra/common/scheduler.py b/umbra/common/scheduler.py index 407fd63..1800cda 100644 --- a/umbra/common/scheduler.py +++ b/umbra/common/scheduler.py @@ -173,7 +173,7 @@ async def run(self, calls): uid = calls_ids[counter] if isinstance(aw, Exception): - logger.debug(f"Could 
not run _schedule {calls[uid]} - exception {aw}")
+            logger.error(f"Could not run _schedule {calls[uid]} - exception {aw}")
         else:
             if aw:
                 results[uid] = aw.pop()
diff --git a/umbra/design/configs.py b/umbra/design/configs.py
index 8d866a1..c401d75 100644
--- a/umbra/design/configs.py
+++ b/umbra/design/configs.py
@@ -380,7 +380,29 @@ def show(self):
         logger.info("links:")
         for src, dst, data in self.graph.edges(data=True):
             logger.info(f"  src = {src}, dst = {dst}, data = {data}", data)
-    
+
+    def to_dot(self):
+        """
+        Parse networkx graph into Graphviz Dot format
+
+        Return a string in Graphviz Dot format showing the
+        connections between all the nodes in the Topology
+        """
+        dot_fmt = "strict graph {"
+
+        # Populate list of nodes
+        for n in self.graph.nodes():
+            dot_fmt += f"\"{n}\";"
+
+        # Populate the connection between nodes
+        for src, dst, data in self.graph.edges(data=True):
+            if data.get("deploy", {}).get("intf_isup", True):
+                dot_fmt += f"\"{src}\" -- \"{dst}\";"
+
+        dot_fmt += "}"
+
+        return dot_fmt
+
     def build(self):
         nodes = []
         links = []
@@ -1495,7 +1517,7 @@ def __init__(self, name, cfgs_dir, clear_dir=True):
         self.network_mode = "umbra"
 
 
-class Events:
+class EventsFabric:
     def __init__(self):
         self._ids = 1
         self._events = {}
@@ -1517,7 +1539,10 @@ def build(self):
     def parse(self, data):
         self._events = data
 
-class EventsV2:
+class EventsOthers:
+    """
+    Use this Event class for event category of: monitor, agent, and environment
+    """
     def __init__(self):
         self._ev_id = 1;
         self._events = defaultdict(lambda: [])
@@ -1558,30 +1583,36 @@ def __init__(self, id, entrypoint, folder):
         self.id = id
         self.entrypoint = entrypoint
         self.folder = folder
-        self.topology = None
-        self.events = Events()
-        self.eventsv2 = EventsV2()
+        self.topology = None
+        # ideally, we should have a generic Event class for all kinds of
+        # event type: Fabric, Iroha, environment, agent, monitor, etc.
+ # To achieve that, all events should use the umbra/common/scheduler.py + # Currently, FabricEvents (broker/plugin/fabric.py) has custom scheduler + self.events_fabric = EventsFabric() + self.events_others = EventsOthers() def parse(self, data): topo = Topology(None) ack = topo.parse(data.get("topology", {})) if ack: self.topology = topo - self.events.parse(data.get("events", {})) - self.eventsv2.parse(data.get("eventsv2", {})) + self.events_fabric.parse(data.get("events_fabric", {})) + self.events_others.parse(data.get("events_others", {})) self.name = data.get("id", None) self.entrypoint = data.get("entrypoint", None) return True return False - def add_event(self, when, category, params): - self.events.add(when, category, params) + def add_event_fabric(self, when, category, params): + self.events_fabric.add(when, category, params) - def add_event_v2(self, when, category, ev_args): + def add_event_others(self, when, category, ev_args): """ - category: fabric | environment | agent | monitor + Arguments: + when: run the event at ith-second + category: environment | agent | monitor """ - self.eventsv2.add(when, category, ev_args) + self.events_others.add(when, category, ev_args) def set_topology(self, topology): self.topology = topology @@ -1591,14 +1622,14 @@ def get_topology(self): def dump(self): topo_built = self.topology.build() - events_built = self.events.build() - eventsv2_built = self.eventsv2.build() + events_fabric_built = self.events_fabric.build() + events_others_built = self.events_others.build() scenario = { "id": self.id, "entrypoint": self.entrypoint, "topology": topo_built, - "events": events_built, - "eventsv2": eventsv2_built + "events_fabric": events_fabric_built, + "events_others": events_others_built } return scenario diff --git a/umbra/scenario/environment.py b/umbra/scenario/environment.py index 6548cef..4d8947d 100644 --- a/umbra/scenario/environment.py +++ b/umbra/scenario/environment.py @@ -100,7 +100,7 @@ def __init__(self, topo): 
self.switches = {} self.nodes_info = {} logger.debug("Environment Instance Created") - logger.debug(f"{json.dumps(self.topo, indent=4)}") + # logger.debug(f"{json.dumps(self.topo, indent=4)}") def update_link_resources(self, src, dst, resources): src = self.net.get(src) @@ -318,6 +318,7 @@ def parse_info(self, elements, specie): "name": link_name, "src": link.intf1.node.name, "dst": link.intf2.node.name, + "intf_isup": link.intf1.isUp() and link.intf2.isUp(), "src-port": link.intf1.name, "dst-port": link.intf2.name, } @@ -342,8 +343,9 @@ def start(self): self._start_network() logger.info("Experiment running") + self.nodes_info = self.parse_info(self.net.hosts, "hosts") info = { - "hosts": self.nodes_info, + "hosts": self.nodes_info.get("hosts"), "topology": self.net_topo_info(), } return True, info @@ -353,6 +355,15 @@ def _stop_network(self): self.net.stop() logger.info("Stopped network: %r" % self.net) + def get_current_topology(self): + self.nodes_info = self.parse_info(self.net.hosts, "hosts") + info = { + "hosts": self.nodes_info.get("hosts"), + "topology": self.net_topo_info(), + } + + return True, info + def kill_container(self, node_name): err_msg = None ok = True diff --git a/umbra/scenario/main.py b/umbra/scenario/main.py index c6380e2..d6cc63b 100644 --- a/umbra/scenario/main.py +++ b/umbra/scenario/main.py @@ -46,20 +46,22 @@ def loop(self, in_queue, out_queue): elif cmd == "stop": reply = self.stop() elif cmd == "environment_event": - node_name = scenario.get('node_name', None) + target_node = scenario.get('target_node', None) action = scenario.get('action', None) action_args = scenario.get('action_args', None) if action == "kill_container": - reply = self.kill_container(node_name) + reply = self.kill_container(target_node) elif action == "update_cpu_limit": - reply= self.update_cpu_limit(node_name, action_args) + reply= self.update_cpu_limit(target_node, action_args) elif action == "update_memory_limit": - reply= self.update_memory_limit(node_name, 
action_args) + reply= self.update_memory_limit(target_node, action_args) elif action == "update_link": reply= self.update_link(action_args) else: reply = {} + elif cmd == "current_topology": + reply = self.get_current_topology() else: reply = {} @@ -101,9 +103,24 @@ def stop(self): } return ack - def kill_container(self, node_name): - ok, err_msg = self.exp_topo.kill_container(node_name) - logger.info("Terminating container name: %s", node_name) + def get_current_topology(self): + ok, info = "True", {} + if self.exp_topo: + ok, info = self.exp_topo.get_current_topology() + + ack = { + 'ok': str(ok), + 'msg': { + 'info': info, + 'error': "", + } + } + + return ack + + def kill_container(self, target_node): + ok, err_msg = self.exp_topo.kill_container(target_node) + logger.info("Terminating container name: %s", target_node) ack = { 'ok': str(ok), @@ -115,15 +132,15 @@ def kill_container(self, node_name): return ack - def update_cpu_limit(self, node_name, params): + def update_cpu_limit(self, target_node, params): cpu_quota = params.get('cpu_quota', -1) cpu_period = params.get('cpu_period', -1) cpu_shares = params.get('cpu_shares', -1) cores = params.get('cores', None) - ok, err_msg = self.exp_topo.update_cpu_limit(node_name, + ok, err_msg = self.exp_topo.update_cpu_limit(target_node, cpu_quota, cpu_period,cpu_shares, cores) - logger.info("Updating cpu limit of %s with %s", node_name, params) + logger.info("Updating cpu limit of %s with %s", target_node, params) ack = { 'ok': str(ok), @@ -135,13 +152,13 @@ def update_cpu_limit(self, node_name, params): return ack - def update_memory_limit(self, node_name, params): + def update_memory_limit(self, target_node, params): mem_limit = params.get('mem_limit', -1) memswap_limit = params.get('memswap_limit', -1) - ok, err_msg = self.exp_topo.update_memory_limit(node_name, + ok, err_msg = self.exp_topo.update_memory_limit(target_node, mem_limit, memswap_limit) - logger.info("Updating mem limit of %s with %s", node_name, params) 
+        logger.info("Updating mem limit of %s with %s", target_node, params)
 
         ack = {
             'ok': str(ok),
@@ -215,8 +232,8 @@ async def play(self, id, command, scenario):
                 logger.debug("Stopping running playground")
                 await self.call("stop", None)
                 self.stop()
-            
-            self.start()
+
+            self.start()
             reply = await self.call(command, scenario)
 
         elif command == "stop":
@@ -224,9 +241,11 @@
             self.stop()
         elif command == "environment_event":
             reply = await self.call(command, scenario)
+        elif command == "current_topology":
+            reply = await self.call(command, scenario)
         else:
-            logger.debug(f"Unkown playground command {command}")
-            return False, {}
+            logger.warning(f"Unknown playground command {command}")
+            return "False", {}
 
         ack, info = reply.get("ok"), reply.get("msg")
         return ack, info
@@ -259,13 +278,30 @@ async def Establish(self, stream):
         deploy_dict = json_format.MessageToDict(deploy, preserving_proto_field_name=True)
         id = deploy_dict.get("id")
-        command = deploy_dict.get("workflow")
+        command = deploy_dict.get("command")
 
         ok, msg = await self.play(id, command, scenario)
-        logger.debug(f"Playground msg: {msg}")
+        logger.debug(f"command = {command}, Playground msg = {msg}")
 
         error = msg.get("error")
         built_info = self.serialize_bytes(msg.get("info"))
         built = Status(id=id, ok=ok, error=error, info=built_info)
         await stream.send_message(built)
+
+    async def CurrentTopology(self, stream):
+        wflow_raw = await stream.recv_message()
+        event = self.parse_bytes(wflow_raw.scenario)
+
+        wflow_dict = json_format.MessageToDict(wflow_raw, preserving_proto_field_name=True)
+        ev_id = wflow_dict.get("id")
+        command = wflow_dict.get("command")
+
+        # NOTE(review): fixed — original patch called self.play(id, command, scenario),
+        # where `scenario` is undefined (NameError) and `id` is the builtin, not the
+        # parsed workflow id. Use the parsed `ev_id` and the deserialized `event` payload.
+        ok, msg = await self.play(ev_id, command, event)
+        logger.debug(f"command = {command}, Playground msg = {msg}")
+
+        error = msg.get("error")
+        topo_info = self.serialize_bytes(msg.get("info"))
+
+        reply = Status(id=ev_id, ok=ok, error=error, info=topo_info)
+        await stream.send_message(reply)