Skip to content

Commit

Permalink
Add socket sharing
Browse files Browse the repository at this point in the history
  • Loading branch information
sansyrox committed Oct 13, 2021
1 parent e26975e commit 8163dbc
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ actix-web = "4.0.0-beta.8"
actix-files = "0.6.0-beta.4"
futures-util = "0.3.15"
matchit = "0.4.3"
actix-http = "3.0.0-beta.8"
socket2 = { version = "0.4.1", features = ["all"] }

[package.metadata.maturin]
name = "robyn"
36 changes: 33 additions & 3 deletions robyn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,33 @@
import argparse
import asyncio

from .robyn import Server
from .robyn import Server, SocketHeld
from .responses import static_file, jsonify
from .dev_event_handler import EventHandler
from .log_colors import Colors
from multiprocessing import Process


from watchdog.observers import Observer


def spawned_process(handlers, socket, name):
import asyncio
import uvloop

uvloop.install()
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

# create a server
server = Server()

for i in handlers:
server.add_route(i[0], i[1], i[2], i[3])

server.start(socket, name)
asyncio.get_event_loop().run_forever()


class Robyn:
"""This is the python wrapper for the Robyn binaries.
Expand All @@ -22,6 +40,8 @@ def __init__(self, file_object):
self.directory_path = directory_path
self.server = Server(directory_path)
self.dev = self._is_dev()
self.routes = []
self.headers = []

def _is_dev(self):
parser = argparse.ArgumentParser()
Expand All @@ -40,8 +60,8 @@ def add_route(self, route_type, endpoint, handler):

""" We will add the status code here only
"""
self.server.add_route(
route_type, endpoint, handler, asyncio.iscoroutinefunction(handler)
self.routes.append(
( route_type, endpoint, handler, asyncio.iscoroutinefunction(handler) )
)

def add_directory(self, route, directory_path, index_file=None, show_files_listing=False):
Expand All @@ -59,7 +79,17 @@ def start(self, url="127.0.0.1", port="5000"):
:param port [int]: [reperesents the port number at which the server is listening]
"""
socket = SocketHeld(f"0.0.0.0:{port}", port)
if not self.dev:
for i in range(2):
copied = socket.try_clone()
p = Process(
target=spawned_process,
args=(self.routes, copied, f"Process {i}"),
)
p.start()

input("Press Cntrl + C to stop \n")
self.server.start(url, port)
else:
event_handler = EventHandler(self.file_path)
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod processor;
mod router;
mod server;
mod shared_socket;
mod types;

use server::Server;
use shared_socket::SocketHeld;

// pyO3 module
use pyo3::prelude::*;
Expand All @@ -12,6 +14,7 @@ use pyo3::prelude::*;
pub fn robyn(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
// the pymodule class to make the rustPyFunctions available
m.add_class::<Server>()?;
m.add_class::<SocketHeld>()?;
pyo3::prepare_freethreaded_python();
Ok(())
}
28 changes: 25 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use crate::processor::{apply_headers, handle_request};
use crate::router::Router;
use crate::shared_socket::SocketHeld;
use crate::types::Headers;
use actix_files::Files;
use std::convert::TryInto;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, RwLock};
use std::thread;
// pyO3 module
use actix_http::KeepAlive;
use actix_web::*;
use dashmap::DashMap;
use pyo3::prelude::*;
use pyo3::types::PyAny;

// hyper modules
use socket2::{Domain, Protocol, Socket, Type};

static STARTED: AtomicBool = AtomicBool::new(false);

#[derive(Clone)]
Expand Down Expand Up @@ -41,15 +46,28 @@ impl Server {
}
}

pub fn start(&mut self, py: Python, url: String, port: u16) {
pub fn start(
&mut self,
py: Python,
url: String,
port: u16,
socket: &PyCell<SocketHeld>,
name: String,
) -> PyResult<()> {
if STARTED
.compare_exchange(false, true, SeqCst, Relaxed)
.is_err()
{
println!("Already running...");
return;
return Ok(());
}

let borrow = socket.try_borrow_mut()?;
let held_socket: &SocketHeld = &*borrow;

let raw_socket = held_socket.get_socket();
println!("Got our socket {:?}", raw_socket);

let router = self.router.clone();
let headers = self.headers.clone();
let directories = self.directories.clone();
Expand Down Expand Up @@ -103,7 +121,10 @@ impl Server {
})
}))
})
.bind(addr)
.keep_alive(KeepAlive::Os)
.workers(1)
.client_timeout(0)
.listen(raw_socket.try_into().unwrap())
.unwrap()
.run()
.await
Expand All @@ -112,6 +133,7 @@ impl Server {
});

event_loop.call_method0("run_forever").unwrap();
Ok(())
}

pub fn add_directory(
Expand Down
37 changes: 37 additions & 0 deletions src/shared_socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use pyo3::prelude::*;

use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;

#[pyclass]
#[derive(Debug)]
pub struct SocketHeld {
pub socket: Socket,
}

#[pymethods]
impl SocketHeld {
#[new]
pub fn new(address: String, port: i32) -> PyResult<SocketHeld> {
let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;

let address: SocketAddr = address.parse()?;
socket.set_reuse_address(true)?;
//socket.set_reuse_port(true)?;
socket.bind(&address.into())?;
socket.listen(1024)?;

Ok(SocketHeld { socket })
}

pub fn try_clone(&self) -> PyResult<SocketHeld> {
let copied = self.socket.try_clone()?;
Ok(SocketHeld { socket: copied })
}
}

impl SocketHeld {
pub fn get_socket(&self) -> Socket {
self.socket.try_clone().unwrap()
}
}
7 changes: 7 additions & 0 deletions test_python/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
return {"Hello": "World"}
22 changes: 22 additions & 0 deletions test_python/mysite/manage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys


def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)


if __name__ == '__main__':
main()
Empty file.
16 changes: 16 additions & 0 deletions test_python/mysite/mysite/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
ASGI config for mysite project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/3.2/howto/deployment/asgi/
"""

import os

from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

application = get_asgi_application()
Loading

0 comments on commit 8163dbc

Please sign in to comment.