Skip to content

Commit

Permalink
Merge pull request #191 from Tauffer-Consulting/fix/container-resources
Browse files Browse the repository at this point in the history
Fix/container resources
  • Loading branch information
vinicvaz authored Nov 29, 2023
2 parents 6750087 + 9fcfa88 commit a29f1e2
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 63 deletions.
15 changes: 15 additions & 0 deletions frontend/src/@types/piece/piece.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,26 @@ export interface PieceSchema {
$defs: Definitions;
}

interface ContainerResources {
limits: {
cpu: number;
memory: number;
};
requests: {
cpu: number;
memory: number;
};
use_gpu?: boolean;
}

export interface Piece {
id: number;
name: string;
description: string;

container_resources: ContainerResources;
tags: string[];

repository_id: number;

input_schema: PieceSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import * as yup from "yup";

import { type IWorkflowPieceData, storageAccessModes } from "../context/types";
import { type GenerateWorkflowsParams } from "../context/workflowsEditor";
import { containerResourcesSchema } from "../schemas/containerResourcesSchemas";
import { extractDefaultInputValues, extractDefaultValues } from "../utils";
import {
extractDefaultContainerResources,
extractDefaultInputValues,
} from "../utils";
import {
type Differences,
importJsonWorkflow,
Expand Down Expand Up @@ -352,8 +354,9 @@ export const WorkflowsEditorComponent: React.FC = () => {
const defaultInputs = extractDefaultInputValues(
piece as unknown as Piece,
);
const defaultContainerResources = extractDefaultValues(
containerResourcesSchema as any,

const defaultContainerResources = extractDefaultContainerResources(
piece?.container_resources,
);

const currentWorkflowPieces = await getForageWorkflowPieces();
Expand Down
30 changes: 28 additions & 2 deletions frontend/src/features/workflowEditor/utils/jsonSchema.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
// Extract default values from Schema

import { type IWorkflowPieceData } from "../context/types";
import { isEmpty } from "utils";

import { defaultContainerResources } from "../components/SidebarForm/ContainerResourceForm";
import {
type IContainerResourceFormData,
type IWorkflowPieceData,
} from "../context/types";

import { getFromUpstream } from "./getFromUpstream";

Expand Down Expand Up @@ -85,7 +91,7 @@ export const extractDefaultValues = (
) => {
output = output === null ? {} : output;

if (schema) {
if (!isEmpty(schema) && "properties" in schema) {
const properties = schema.properties;
for (const [key, value] of Object.entries(properties)) {
if (value?.from_upstream === "always") {
Expand All @@ -104,3 +110,23 @@ export const extractDefaultValues = (

return output;
};

export const extractDefaultContainerResources = (
cr?: Piece["container_resources"],
): IContainerResourceFormData => {
if (cr && !isEmpty(cr) && "limits" in cr && "requests" in cr) {
return {
cpu: {
max: Number(cr.limits.cpu),
min: Number(cr.requests.cpu),
},
memory: {
max: Number(cr.limits.memory),
min: Number(cr.requests.memory),
},
useGpu: cr?.use_gpu ?? false,
};
} else {
return defaultContainerResources;
}
};
1 change: 1 addition & 0 deletions rest/constants/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .container_resources import ContainerResourcesModel
11 changes: 11 additions & 0 deletions rest/constants/schemas/container_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pydantic import BaseModel, Field

class SystemRequirementsModel(BaseModel):
cpu: int = Field(default=128)
memory: int = Field(memory=100)


class ContainerResourcesModel(BaseModel):
requests: SystemRequirementsModel = Field(default=SystemRequirementsModel(cpu=100, memory=128))
limits: SystemRequirementsModel = Field(default=SystemRequirementsModel(cpu=100, memory=128))
use_gpu: bool = False
30 changes: 30 additions & 0 deletions rest/database/alembic/versions/93da7356c3d7_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""empty message
Revision ID: 93da7356c3d7
Revises: f7214a10a4df
Create Date: 2023-11-29 07:55:27.576939
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '93da7356c3d7'
down_revision = 'f7214a10a4df'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('piece', sa.Column('tags', sa.ARRAY(sa.String()), server_default='{}', nullable=False))
op.add_column('piece', sa.Column('container_resources', sa.JSON(), server_default=sa.text("'{}'::jsonb"), nullable=False))
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('piece', 'container_resources')
op.drop_column('piece', 'tags')
# ### end Alembic commands ###
4 changes: 3 additions & 1 deletion rest/database/models/piece.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from database.models.base import Base, BaseDatabaseModel
from sqlalchemy.orm import relationship
from sqlalchemy import Column, String, Integer, JSON, ForeignKey, text
from sqlalchemy import Column, String, Integer, JSON, ForeignKey, text, ARRAY

class Piece(Base, BaseDatabaseModel):
__tablename__ = "piece"
Expand All @@ -13,6 +13,8 @@ class Piece(Base, BaseDatabaseModel):
input_schema = Column(JSON, nullable=False, server_default=text("'{}'::jsonb"))
output_schema = Column(JSON, nullable=False, server_default=text("'{}'::jsonb")) # Using server default empty JSON object to avoid null value in database
secrets_schema = Column(JSON, nullable=False, server_default=text("'{}'::jsonb"))
tags = Column(ARRAY(String), nullable=False, server_default="{}")
container_resources = Column(JSON, nullable=False, server_default=text("'{}'::jsonb"))
style = Column(JSON, nullable=True)
source_url = Column(String, nullable=True)
repository_id = Column(Integer, ForeignKey('piece_repository.id', ondelete='cascade'), nullable=False)
Expand Down
2 changes: 2 additions & 0 deletions rest/schemas/responses/piece.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class GetPiecesResponse(BaseModel):
input_schema: Optional[Dict] = None
output_schema: Optional[Dict] = None
secrets_schema: Optional[Dict] = None
container_resources: Optional[Dict] = None
tags: Optional[List[str]] = None
style: Optional[Dict] = None
source_url: Optional[str] = None
repository_url: str
Expand Down
18 changes: 9 additions & 9 deletions rest/services/piece_service.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
from typing import List
import json
from schemas.requests.piece import ListPiecesFilters
from schemas.exceptions.base import ResourceNotFoundException
from clients.github_rest_client import GithubRestClient

from constants.schemas import ContainerResourcesModel
from core.logger import get_configured_logger
from core.settings import settings
from repository.user_repository import UserRepository
from repository.workspace_repository import WorkspaceRepository
from repository.piece_repository_repository import PieceRepositoryRepository
from database.models import Piece, PieceRepository
from database.models.enums import RepositorySource
from clients.local_files_client import LocalFilesClient
from repository.piece_repository import PieceRepository
from schemas.responses.piece import GetPiecesResponse
Expand Down Expand Up @@ -46,7 +42,7 @@ def list_pieces(
Returns:
List[GetPiecesResponse]: List of all pieces data
"""

piece_repository = self.piece_repository_repository.find_by_id(id=piece_repository_id)
if not piece_repository:
raise ResourceNotFoundException(message="Workspace or Piece Repository not found")
Expand All @@ -58,13 +54,13 @@ def list_pieces(
filters=filters.model_dump(exclude_none=True),
)
return [
GetPiecesResponse(**piece.to_dict(),repository_url=piece_repository.url) for piece in pieces
GetPiecesResponse(**piece.to_dict(), repository_url=piece_repository.url) for piece in pieces
]


def check_pieces_to_update_github(
self,
repository_id: int,
self,
repository_id: int,
compiled_metadata: dict,
dependencies_map: dict,
) -> None:
Expand Down Expand Up @@ -106,6 +102,8 @@ def _update_pieces_from_metadata(self, piece_metadata: dict, dependencies_map: d
piece_style = piece_metadata.get("style")
name = piece_metadata.get("name")
style = get_frontend_node_style(module_name=name, **piece_style)

container_resources = ContainerResourcesModel(**piece_metadata.get("container_resources", {}))
new_piece = Piece(
name=piece_metadata.get("name"),
dependency=piece_metadata.get("dependency"),
Expand All @@ -115,6 +113,8 @@ def _update_pieces_from_metadata(self, piece_metadata: dict, dependencies_map: d
input_schema=piece_metadata.get("input_schema", {}),
output_schema=piece_metadata.get("output_schema", {}),
secrets_schema=piece_metadata.get("secrets_schema", {}),
container_resources=container_resources.model_dump(),
tags=piece_metadata.get("tags", []),
style=style,
repository_id=repository_id
)
Expand Down
21 changes: 3 additions & 18 deletions src/domino/base_piece.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,6 @@ def format_xcom(self, output_obj: pydantic.BaseModel) -> dict:
self.logger.info(f"Piece {self.__class__.__name__} is not returning a valid XCOM object. Auto-generating a base XCOM for it...")
xcom_obj = dict()

# Add arguments types to XCOM
# TODO - this is a temporary solution. We should find a better way to do this
# output_schema = output_obj.model_json_schema()
# for k, v in output_schema["properties"].items():
# if "type" in v:
# # Get file-path and directory-path types
# if v["type"] == "string" and "format" in v:
# v_type = v["format"]
# else:
# v_type = v["type"]
# elif "anyOf" in v:
# if "$ref" in v["anyOf"][0]:
# type_model = v["anyOf"][0]["$ref"].split("/")[-1]
# v_type = output_schema["definitions"][type_model]["type"]
# xcom_obj[f"{k}_type"] = v_type

# Serialize self.display_result and add it to XCOM
if isinstance(self.display_result, dict):
if "file_type" not in self.display_result:
Expand All @@ -185,6 +169,7 @@ def format_xcom(self, output_obj: pydantic.BaseModel) -> dict:
self.display_result["file_path"] = None
self.display_result["file_type"] = "txt"
self.display_result["base64_content"] = base64_content

xcom_obj["display_result"] = self.display_result

# Update XCOM with extra metadata
Expand Down Expand Up @@ -240,7 +225,7 @@ def run_piece_function(
self,
piece_input_data: dict,
piece_input_model: pydantic.BaseModel,
piece_output_model: pydantic.BaseModel,
piece_output_model: pydantic.BaseModel,
piece_secrets_model: Optional[pydantic.BaseModel] = None,
airflow_context: Optional[dict] = None
):
Expand Down Expand Up @@ -397,7 +382,7 @@ def piece_function(self):
It should have all the necessary content for auto-generating json schemas.
All arguments should be type annotated and docstring should carry description for each argument.
"""
raise NotImplementedError("This method must be implemented in the child class!")
raise NotImplementedError("This method must be implemented in the child class!")

def serialize_display_result_file(self, file_path: Union[str, Path], file_type: DisplayResultFileType) -> dict:
"""
Expand Down
Loading

0 comments on commit a29f1e2

Please sign in to comment.