"""Customer data app built with Reflex.

Everything lives in this single module: the database model and app state,
the reusable UI components, the page views, and the app/page registration
at the bottom of the file.
"""

from datetime import datetime, timedelta
from typing import Literal, Union

import reflex as rx
from reflex.components.radix.themes.base import LiteralAccentColor
from sqlmodel import String, asc, cast, desc, func, or_, select

# Timestamp format written to and parsed from ``Customer.date``.
_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Delivery statuses offered by the add/edit forms.
_STATUS_OPTIONS = ("Delivered", "Pending", "Cancelled")

# Columns offered by the "Sort By" select; also the only values accepted
# when building the ORDER BY clause.
_SORTABLE_FIELDS = (
    "name",
    "email",
    "phone",
    "address",
    "payments",
    "date",
    "status",
)

# status -> (icon, label, color scheme) used by ``status_badge``.
_STATUS_BADGES = {
    "Delivered": ("check", "Delivered", "green"),
    "Pending": ("loader", "Pending", "yellow"),
    "Cancelled": ("ban", "Cancelled", "red"),
}


# ---------------------------------------------------------------------------
# Backend: model and state
# ---------------------------------------------------------------------------


def _get_percentage_change(
    value: Union[int, float], prev_value: Union[int, float]
) -> float:
    """Return the percentage change from ``prev_value`` to ``value``.

    The result is rounded to two decimals. With a zero baseline the change
    is ``0`` when ``value`` is also zero and ``float("inf")`` otherwise.
    """
    if prev_value != 0:
        return round(((value - prev_value) / prev_value) * 100, 2)
    if value == 0:
        return 0
    return float("inf")


class Customer(rx.Model, table=True):
    """The customer model."""

    name: str
    email: str
    phone: str
    address: str
    date: str  # formatted with _DATE_FORMAT
    payments: float
    status: str


class MonthValues(rx.Base):
    """Values for a month."""

    num_customers: int = 0
    total_payments: float = 0.0
    num_delivers: int = 0


def _parse_date(value: str) -> datetime:
    """Parse a ``Customer.date`` string into a ``datetime``."""
    return datetime.strptime(value, _DATE_FORMAT)


def _summarize(users: list) -> MonthValues:
    """Aggregate customer count, payment total and delivered count."""
    return MonthValues(
        num_customers=len(users),
        total_payments=sum(user.payments for user in users),
        num_delivers=sum(1 for user in users if user.status == "Delivered"),
    )


class State(rx.State):
    """The app state."""

    users: list[Customer] = []
    sort_value: str = ""
    sort_reverse: bool = False
    search_value: str = ""
    # NOTE(review): annotated as ``Customer`` but the handlers below store and
    # read it with mapping access (``[...]`` / ``.update``) - confirm this
    # matches how the installed Reflex version passes event payloads.
    current_user: Customer = Customer()
    # Values for current and previous month
    current_month_values: MonthValues = MonthValues()
    previous_month_values: MonthValues = MonthValues()

    def load_entries(self) -> None:
        """Load customers into ``users`` and refresh the month statistics.

        Applies the current search text (case-insensitive substring match on
        every column except ``id``) and the current sort column/direction.
        """
        with rx.session() as session:
            query = select(Customer)
            if self.search_value:
                search_value = f"%{str(self.search_value).lower()}%"
                text_matches = [
                    getattr(Customer, field).ilike(search_value)
                    for field in Customer.get_fields()
                    if field not in ("id", "payments")
                ]
                query = query.where(
                    or_(
                        *text_matches,
                        # payments is numeric: cast it to a string before
                        # applying the ilike operator
                        cast(Customer.payments, String).ilike(search_value),
                    )
                )

            # sort_value arrives from the client; only accept known columns
            # so it can never resolve to an arbitrary model attribute.
            if self.sort_value in _SORTABLE_FIELDS:
                sort_column = getattr(Customer, self.sort_value)
                if self.sort_value != "payments":
                    # Text columns are ordered case-insensitively.
                    sort_column = func.lower(sort_column)
                direction = desc if self.sort_reverse else asc
                query = query.order_by(direction(sort_column))

            self.users = session.exec(query).all()

        self.get_current_month_values()
        self.get_previous_month_values()

    def get_current_month_values(self):
        """Calculate current month's values from the loaded ``users``."""
        now = datetime.now()
        start_of_month = datetime(now.year, now.month, 1)
        current_month_users = [
            user
            for user in self.users
            if _parse_date(user.date) >= start_of_month
        ]
        self.current_month_values = _summarize(current_month_users)

    def get_previous_month_values(self):
        """Calculate previous month's values from the loaded ``users``."""
        now = datetime.now()
        first_day_of_current_month = datetime(now.year, now.month, 1)
        # Step back one day to land in the previous month, then snap to its
        # first day.
        last_day_of_last_month = first_day_of_current_month - timedelta(days=1)
        start_of_last_month = datetime(
            last_day_of_last_month.year, last_day_of_last_month.month, 1
        )
        # Half-open window [start of last month, start of this month) so that
        # records timestamped during the last day of the month are included.
        previous_month_users = [
            user
            for user in self.users
            if start_of_last_month
            <= _parse_date(user.date)
            < first_day_of_current_month
        ]
        real_values = _summarize(previous_month_users)
        # We add some dummy values to simulate growth/decline.
        # Remove them in production.
        self.previous_month_values = MonthValues(
            num_customers=real_values.num_customers + 3,
            total_payments=real_values.total_payments + 240,
            num_delivers=real_values.num_delivers + 5,
        )

    def sort_values(self, sort_value: str):
        """Set the sort column and reload the table."""
        self.sort_value = sort_value
        self.load_entries()

    def toggle_sort(self):
        """Flip the sort direction and reload the table."""
        self.sort_reverse = not self.sort_reverse
        self.load_entries()

    def filter_values(self, search_value):
        """Set the search text and reload the table."""
        self.search_value = search_value
        self.load_entries()

    def get_user(self, user: Customer):
        """Remember ``user`` as the customer currently being edited."""
        self.current_user = user

    def add_customer_to_db(self, form_data: dict):
        """Insert a customer built from the submitted form.

        Stamps the record with the current time. Returns a window alert when
        a customer with the same email already exists, otherwise a toast
        confirming the insert.
        """
        new_customer = {
            **form_data,
            "date": datetime.now().strftime(_DATE_FORMAT),
        }
        self.current_user = new_customer

        with rx.session() as session:
            duplicate = session.exec(
                select(Customer).where(Customer.email == new_customer["email"])
            ).first()
            if duplicate:
                return rx.window_alert("User with this email already exists")
            session.add(Customer(**new_customer))
            session.commit()

        self.load_entries()
        return rx.toast.info(
            f"User {new_customer['name']} has been added.",
            variant="outline",
            position="bottom-right",
        )

    def update_customer_to_db(self, form_data: dict):
        """Apply the submitted form to the customer selected via ``get_user``.

        Returns a window alert when the customer row can no longer be found,
        otherwise a toast confirming the update.
        """
        self.current_user.update(form_data)

        with rx.session() as session:
            customer = session.exec(
                select(Customer).where(Customer.id == self.current_user["id"])
            ).first()
            found = customer is not None
            if found:
                for field in Customer.get_fields():
                    if field != "id":
                        setattr(customer, field, self.current_user[field])
                session.add(customer)
                session.commit()

        # Reload either way so a stale row disappears from the table.
        self.load_entries()
        if not found:
            return rx.window_alert("Customer no longer exists")
        return rx.toast.info(
            f"User {self.current_user['name']} has been modified.",
            variant="outline",
            position="bottom-right",
        )

    def delete_customer(self, id: int):
        """Delete a customer from the database.

        Returns a window alert when no customer has the given ``id``,
        otherwise a toast confirming the deletion.
        """
        deleted_name = None
        found = False
        with rx.session() as session:
            customer = session.exec(
                select(Customer).where(Customer.id == id)
            ).first()
            if customer is not None:
                found = True
                # Read the name while the row is still attached to the session.
                deleted_name = customer.name
                session.delete(customer)
                session.commit()

        self.load_entries()
        if not found:
            return rx.window_alert("Customer no longer exists")
        return rx.toast.info(
            f"User {deleted_name} has been deleted.",
            variant="outline",
            position="bottom-right",
        )

    @rx.var
    def payments_change(self) -> float:
        """Percentage change in total payments versus the previous month."""
        return _get_percentage_change(
            self.current_month_values.total_payments,
            self.previous_month_values.total_payments,
        )

    @rx.var
    def customers_change(self) -> float:
        """Percentage change in customer count versus the previous month."""
        return _get_percentage_change(
            self.current_month_values.num_customers,
            self.previous_month_values.num_customers,
        )

    @rx.var
    def delivers_change(self) -> float:
        """Percentage change in delivered count versus the previous month."""
        return _get_percentage_change(
            self.current_month_values.num_delivers,
            self.previous_month_values.num_delivers,
        )


# ---------------------------------------------------------------------------
# Components: status badges and form fields
# ---------------------------------------------------------------------------


def _badge(icon: str, text: str, color_scheme: str):
    """Build a pill-shaped badge with a leading icon."""
    return rx.badge(
        rx.icon(icon, size=16),
        text,
        color_scheme=color_scheme,
        radius="full",
        variant="soft",
        size="3",
    )


def status_badge(status: str):
    """Return the badge for ``status``; unknown values render as "Pending"."""
    return _badge(*_STATUS_BADGES.get(status, _STATUS_BADGES["Pending"]))


def form_field(
    label: str,
    placeholder: str,
    type: str,
    name: str,
    icon: str,
    default_value: str = "",
) -> rx.Component:
    """Build a labelled form field: an icon + label row above an input.

    ``name`` is set on the form field wrapper; ``placeholder``, ``type`` and
    ``default_value`` are forwarded to the input.
    """
    return rx.form.field(
        rx.flex(
            rx.hstack(
                rx.icon(icon, size=16, stroke_width=1.5),
                rx.form.label(label),
                align="center",
                spacing="2",
            ),
            rx.form.control(
                rx.input(
                    placeholder=placeholder,
                    type=type,
                    default_value=default_value,
                ),
                as_child=True,
            ),
            direction="column",
            spacing="1",
        ),
        name=name,
        width="100%",
    )


# ---------------------------------------------------------------------------
# Components: statistics cards
# ---------------------------------------------------------------------------


def _arrow_badge(arrow_icon: str, percentage_change: float, arrow_color: str):
    """Build the trend badge: an arrow icon followed by ``<change>%``."""
    return rx.badge(
        rx.icon(
            tag=arrow_icon,
            color=rx.color(arrow_color, 9),
        ),
        rx.text(
            f"{percentage_change}%",
            size="2",
            color=rx.color(arrow_color, 9),
            weight="medium",
        ),
        color_scheme=arrow_color,
        radius="large",
        align_items="center",
    )


def stats_card(
    stat_name: str,
    value: int,
    prev_value: int,
    percentage_change: float,
    icon: str,
    icon_color: LiteralAccentColor,
    extra_char: str = "",
) -> rx.Component:
    """Build one statistics card.

    Shows the stat name with its icon, a trend badge (up when ``value`` is
    greater than ``prev_value``, down otherwise), the current value and the
    previous value, each prefixed with ``extra_char``.
    """
    title_row = rx.hstack(
        rx.hstack(
            rx.icon(
                tag=icon,
                size=22,
                color=rx.color(icon_color, 11),
            ),
            rx.text(
                stat_name,
                size="4",
                weight="medium",
                color=rx.color("gray", 11),
            ),
            spacing="2",
            align="center",
        ),
        rx.cond(
            value > prev_value,
            _arrow_badge("trending-up", percentage_change, "grass"),
            _arrow_badge("trending-down", percentage_change, "tomato"),
        ),
        justify="between",
        width="100%",
    )
    values_row = rx.hstack(
        rx.heading(
            f"{extra_char}{value:,}",
            size="7",
            weight="bold",
        ),
        rx.text(
            f"from {extra_char}{prev_value:,}",
            size="3",
            color=rx.color("gray", 10),
        ),
        spacing="2",
        align_items="end",
    )
    return rx.card(
        rx.hstack(
            rx.vstack(
                title_row,
                values_row,
                align_items="start",
                justify="between",
                width="100%",
            ),
            align_items="start",
            width="100%",
            justify="between",
        ),
        size="3",
        width="100%",
        max_width="22rem",
    )


def stats_cards_group() -> rx.Component:
    """Build the row of customers / payments / delivers statistics cards."""
    return rx.flex(
        stats_card(
            "Total Customers",
            State.current_month_values.num_customers,
            State.previous_month_values.num_customers,
            State.customers_change,
            "users",
            "blue",
        ),
        stats_card(
            "Total Payments",
            State.current_month_values.total_payments,
            State.previous_month_values.total_payments,
            State.payments_change,
            "dollar-sign",
            "orange",
            "$",
        ),
        stats_card(
            "Total Delivers",
            State.current_month_values.num_delivers,
            State.previous_month_values.num_delivers,
            State.delivers_change,
            "truck",
            "ruby",
        ),
        spacing="5",
        width="100%",
        wrap="wrap",
        display=["none", "none", "flex"],
    )


# ---------------------------------------------------------------------------
# Views: navbar
# ---------------------------------------------------------------------------


def navbar():
    """Build the top bar: app title badge, logo and color-mode button."""
    return rx.flex(
        rx.badge(
            rx.icon(tag="table-2", size=28),
            rx.heading("Customer Data App", size="6"),
            color_scheme="green",
            radius="large",
            align="center",
            variant="surface",
            padding="0.65rem",
        ),
        rx.spacer(),
        rx.hstack(
            rx.logo(),
            rx.color_mode.button(),
            align="center",
            spacing="3",
        ),
        spacing="2",
        flex_direction=["column", "column", "row"],
        align="center",
        width="100%",
        top="0px",
        padding_top="2em",
    )


# ---------------------------------------------------------------------------
# Views: customer table and dialogs
# ---------------------------------------------------------------------------


def show_customer(user: Customer):
    """Show a customer in a table row."""
    return rx.table.row(
        rx.table.cell(user.name),
        rx.table.cell(user.email),
        rx.table.cell(user.phone),
        rx.table.cell(user.address),
        rx.table.cell(f"${user.payments:,}"),
        rx.table.cell(user.date),
        rx.table.cell(
            rx.match(
                user.status,
                ("Delivered", status_badge("Delivered")),
                ("Pending", status_badge("Pending")),
                ("Cancelled", status_badge("Cancelled")),
                status_badge("Pending"),
            )
        ),
        rx.table.cell(
            rx.hstack(
                update_customer_dialog(user),
                rx.icon_button(
                    rx.icon("trash-2", size=22),
                    on_click=lambda: State.delete_customer(user.id),
                    size="2",
                    variant="solid",
                    color_scheme="red",
                ),
            )
        ),
        style={"_hover": {"bg": rx.color("gray", 3)}},
        align="center",
    )


def _customer_dialog(
    trigger: rx.Component,
    *,
    badge_icon: str,
    badge_color: str,
    title: str,
    description: str,
    submit_label: str,
    on_submit,
    user=None,
) -> rx.Component:
    """Build the customer form dialog shared by the add and edit flows.

    ``trigger`` opens the dialog. When ``user`` is given its fields pre-fill
    the form; otherwise every input starts empty and no status is selected.
    The form is submitted to ``on_submit``.
    """
    if user is None:
        name = email = phone = address = payments = ""
        status_props = {}
    else:
        name = user.name
        email = user.email
        phone = user.phone
        address = user.address
        payments = user.payments.to(str)
        status_props = {"default_value": user.status}

    header = rx.hstack(
        rx.badge(
            rx.icon(tag=badge_icon, size=34),
            color_scheme=badge_color,
            radius="full",
            padding="0.65rem",
        ),
        rx.vstack(
            rx.dialog.title(
                title,
                weight="bold",
                margin="0",
            ),
            rx.dialog.description(description),
            spacing="1",
            height="100%",
            align_items="start",
        ),
        height="100%",
        spacing="4",
        margin_bottom="1.5em",
        align_items="center",
        width="100%",
    )

    fields = rx.flex(
        form_field("Name", "Customer Name", "text", "name", "user", name),
        form_field(
            "Email", "user@reflex.dev", "email", "email", "mail", email
        ),
        form_field("Phone", "Customer Phone", "tel", "phone", "phone", phone),
        form_field(
            "Address", "Customer Address", "text", "address", "home", address
        ),
        form_field(
            "Payment ($)",
            "Customer Payment",
            "number",
            "payments",
            "dollar-sign",
            payments,
        ),
        # Status
        rx.vstack(
            rx.hstack(
                rx.icon("truck", size=16, stroke_width=1.5),
                rx.text("Status"),
                align="center",
                spacing="2",
            ),
            rx.radio(
                list(_STATUS_OPTIONS),
                name="status",
                direction="row",
                as_child=True,
                required=True,
                **status_props,
            ),
        ),
        direction="column",
        spacing="3",
    )

    actions = rx.flex(
        rx.dialog.close(
            rx.button(
                "Cancel",
                variant="soft",
                color_scheme="gray",
            ),
        ),
        rx.form.submit(
            rx.dialog.close(
                rx.button(submit_label),
            ),
            as_child=True,
        ),
        padding_top="2em",
        spacing="3",
        mt="4",
        justify="end",
    )

    return rx.dialog.root(
        rx.dialog.trigger(trigger),
        rx.dialog.content(
            header,
            rx.flex(
                rx.form.root(
                    fields,
                    actions,
                    on_submit=on_submit,
                    reset_on_submit=False,
                ),
                width="100%",
                direction="column",
                spacing="4",
            ),
            style={"max_width": 450},
            box_shadow="lg",
            padding="1.5em",
            border=f"2px solid {rx.color('accent', 7)}",
            border_radius="25px",
        ),
    )


def add_customer_button() -> rx.Component:
    """Build the "Add Customer" button and the dialog it opens."""
    return _customer_dialog(
        rx.button(
            rx.icon("plus", size=26),
            rx.text("Add Customer", size="4", display=["none", "none", "block"]),
            size="3",
        ),
        badge_icon="users",
        badge_color="grass",
        title="Add New Customer",
        description="Fill the form with the customer's info",
        submit_label="Submit Customer",
        on_submit=State.add_customer_to_db,
    )


def update_customer_dialog(user):
    """Build the "Edit" button for ``user`` and the dialog it opens.

    Clicking the button also stores ``user`` in the state via
    ``State.get_user`` so the submit handler knows which row to update.
    """
    return _customer_dialog(
        rx.button(
            rx.icon("square-pen", size=22),
            rx.text("Edit", size="3"),
            color_scheme="blue",
            size="2",
            variant="solid",
            on_click=lambda: State.get_user(user),
        ),
        badge_icon="square-pen",
        badge_color="green",
        title="Edit Customer",
        description="Edit the customer's info",
        submit_label="Update Customer",
        on_submit=State.update_customer_to_db,
        user=user,
    )


def _header_cell(text: str, icon: str):
    """Build a table column header with a leading icon."""
    return rx.table.column_header_cell(
        rx.hstack(
            rx.icon(icon, size=18),
            rx.text(text),
            align="center",
            spacing="2",
        ),
    )


def main_table():
    """Build the toolbar (add / sort / search) and the customer table."""
    sort_icon_props = {
        "size": 28,
        "stroke_width": 1.5,
        "cursor": "pointer",
        "on_click": State.toggle_sort,
    }
    toolbar = rx.flex(
        add_customer_button(),
        rx.spacer(),
        rx.hstack(
            rx.cond(
                State.sort_reverse,
                rx.icon("arrow-down-z-a", **sort_icon_props),
                rx.icon("arrow-down-a-z", **sort_icon_props),
            ),
            rx.select(
                list(_SORTABLE_FIELDS),
                placeholder="Sort By: Name",
                size="3",
                on_change=lambda sort_value: State.sort_values(sort_value),
            ),
            rx.input(
                placeholder="Search here...",
                size="3",
                on_change=lambda value: State.filter_values(value),
            ),
            spacing="3",
            align="center",
        ),
        spacing="3",
        wrap="wrap",
        width="100%",
        padding_bottom="1em",
    )
    table = rx.table.root(
        rx.table.header(
            rx.table.row(
                _header_cell("Name", "user"),
                _header_cell("Email", "mail"),
                _header_cell("Phone", "phone"),
                _header_cell("Address", "home"),
                _header_cell("Payments", "dollar-sign"),
                _header_cell("Date", "calendar"),
                _header_cell("Status", "truck"),
                _header_cell("Actions", "cog"),
            ),
        ),
        rx.table.body(rx.foreach(State.users, show_customer)),
        variant="surface",
        size="3",
        width="100%",
        # Load the rows when the table is mounted.
        on_mount=State.load_entries,
    )
    return rx.fragment(toolbar, table)


# ---------------------------------------------------------------------------
# Page and app
# ---------------------------------------------------------------------------


def index() -> rx.Component:
    """Build the index page: navbar, statistics cards and customer table."""
    return rx.vstack(
        navbar(),
        stats_cards_group(),
        rx.box(
            main_table(),
            width="100%",
        ),
        width="100%",
        spacing="6",
        padding_x=["1.5em", "1.5em", "3em"],
    )


# Create app instance and add index page.
app = rx.App(
    theme=rx.theme(
        appearance="dark",
        has_background=True,
        radius="large",
        accent_color="grass",
    ),
)
app.add_page(
    index,
    title="Customer Data App",
    description="A simple app to manage customer data.",
)