diff --git a/scripts/examples/daphnelib/data-exchange-pandas.py b/scripts/examples/daphnelib/data-exchange-pandas.py index b50ddbbf0..7ff549ecf 100644 --- a/scripts/examples/daphnelib/data-exchange-pandas.py +++ b/scripts/examples/daphnelib/data-exchange-pandas.py @@ -23,8 +23,6 @@ # Transfer data to DaphneLib F = dc.from_pandas(df, verbose=True) -#print(F) - print("How DAPHNE sees the data from pandas:") F.print().compute(verbose=True) diff --git a/src/api/daphnelib/DaphneLibResult.h b/src/api/daphnelib/DaphneLibResult.h index 4dbd0428b..4f20a665e 100644 --- a/src/api/daphnelib/DaphneLibResult.h +++ b/src/api/daphnelib/DaphneLibResult.h @@ -23,8 +23,4 @@ struct DaphneLibResult { int64_t rows; int64_t cols; int64_t vtc; - //Added for Frame handling - int64_t* vtcs; - char** labels; - void** columns; }; \ No newline at end of file diff --git a/src/api/python/context/daphne_context.py b/src/api/python/context/daphne_context.py index 82e42a0ea..e9b985c90 100644 --- a/src/api/python/context/daphne_context.py +++ b/src/api/python/context/daphne_context.py @@ -106,6 +106,7 @@ def from_numpy(self, mat: np.array, shared_memory=True, verbose=False) -> Matrix print(f"Overall Execution time: \n{end_time - start_time} seconds\n") return Matrix(self, 'readMatrix', unnamed_params, named_params, local_data=mat) +<<<<<<< HEAD def from_pandas(self, df: pd.DataFrame, shared_memory=True, verbose=False) -> Frame: """Generates a `DAGNode` representing a frame with data given by a pandas `DataFrame`. @@ -275,6 +276,22 @@ def from_tensor(self, tensor: tf.Tensor, shared_memory=True, verbose=False): return self.from_pandas(df, shared_memory, verbose) # Using the existing from_pandas method """ +======= + + def from_pandas(self, df: pd.DataFrame) -> Frame: + """Generates a `DAGNode` representing a frame with data given by a pandas `DataFrame`. + :param df: The pandas DataFrame. + :param args: unnamed parameters + :param kwargs: named parameters + :return: A Frame + """ + + # Data transfer via files. + unnamed_params = ['"src/api/python/tmp/{file_name}.csv\"'] + named_params = [] + return Frame(self, 'readFrame', unnamed_params, named_params, local_data=df) + +>>>>>>> parent of dbd6c738 (CleanUp) def fill(self, arg, rows:int, cols:int) -> Matrix: named_input_nodes = {'arg':arg, 'rows':rows, 'cols':cols} return Matrix(self, 'fill', [], named_input_nodes=named_input_nodes) diff --git a/src/api/python/operator/nodes/frame.py b/src/api/python/operator/nodes/frame.py index a32348b3f..5e3014f71 100644 --- a/src/api/python/operator/nodes/frame.py +++ b/src/api/python/operator/nodes/frame.py @@ -32,7 +32,7 @@ import json import os -from typing import Union, TYPE_CHECKING, Dict, Iterable, Optional, Sequence, List +from typing import Union, TYPE_CHECKING, Dict, Iterable, Optional, Sequence if TYPE_CHECKING: # to avoid cyclic dependencies during runtime @@ -40,26 +40,27 @@ class Frame(OperationNode): _pd_dataframe: pd.DataFrame - _column_names: Optional[List[str]] = None # Add this line def __init__(self, daphne_context: "DaphneContext", operation: str, unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None, named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, - local_data: pd.DataFrame = None, brackets: bool = False, - column_names: Optional[List[str]] = None) -> "Frame": # Modify this line + local_data: pd.DataFrame = None, brackets: bool = False) -> "Frame": is_python_local_data = False if local_data is not None: self._pd_dataframe = local_data is_python_local_data = True else: self._pd_dataframe = None +<<<<<<< HEAD self._column_names = column_names # Add this line +======= + +>>>>>>> parent of dbd6c738 (CleanUp) super().__init__(daphne_context, operation, unnamed_input_nodes, named_input_nodes, OutputType.FRAME, is_python_local_data, brackets) - def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_input_vars: Dict[str, str]) -> str: code_line = super().code_line(var_name, unnamed_input_vars, named_input_vars).format(file_name=var_name, TMP_PATH = TMP_PATH) @@ -83,11 +84,19 @@ def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_inpu ) return code_line +<<<<<<< HEAD def compute(self, type="shared memory", isTensor=False, verbose=False) -> Union[pd.DataFrame]: if self._is_pandas(): return self._pd_dataframe else: return super().compute(type, isTensor, verbose) +======= + def compute(self) -> Union[pd.DataFrame]: + if self._is_pandas(): + return self._pd_dataframe + else: + return super().compute() +>>>>>>> parent of dbd6c738 (CleanUp) def _is_pandas(self) -> bool: return self._pd_dataframe is not None diff --git a/src/api/python/operator/operation_node.py b/src/api/python/operator/operation_node.py index 2e685e354..e0ab4b177 100644 --- a/src/api/python/operator/operation_node.py +++ b/src/api/python/operator/operation_node.py @@ -89,6 +89,7 @@ def compute(self, type="shared memory", isTensor=False, verbose=False): # Still a hard copy function that creates tmp files to execute self._script.execute() self._script.clear(self) +<<<<<<< HEAD if(verbose): # Print the overall timing @@ -143,6 +144,9 @@ def compute(self, type="shared memory", isTensor=False, verbose=False): print(f"Compute Function Execution time: \n{comp_end_time - comp_start_time} seconds\n") elif self._output_type == OutputType.FRAME and type=="files": +======= + if self._output_type == OutputType.FRAME: +>>>>>>> parent of dbd6c738 (CleanUp) df = pd.read_csv(result) with open(result + ".meta", "r") as f: fmd = json.load(f) diff --git a/src/api/python/script_building/script.py b/src/api/python/script_building/script.py index 287c403dd..e346c756e 100644 --- a/src/api/python/script_building/script.py +++ b/src/api/python/script_building/script.py @@ -60,14 +60,8 @@ def build_code(self, dag_root: DAGNode, type="shared memory"): else: raise RuntimeError(f"unknown way to transfer the data: '{type}'") elif dag_root.output_type == OutputType.FRAME: - if type == "files": - self.add_code(f'writeFrame({baseOutVarString},"{TMP_PATH}/{baseOutVarString}.csv");') - return TMP_PATH + "/" + baseOutVarString + ".csv" - elif type == "shared memory": - self.add_code(f'saveDaphneLibResult({baseOutVarString});') - return None - else: - raise RuntimeError(f"unknown way to transfer the data: '{type}'") + self.add_code(f'writeFrame({baseOutVarString},"{TMP_PATH}/{baseOutVarString}.csv");') + return TMP_PATH + "/" + baseOutVarString + ".csv" elif dag_root.output_type == OutputType.SCALAR: # We transfer scalars back to Python by wrapping them into a 1x1 matrix. self.add_code(f'saveDaphneLibResult(as.matrix({baseOutVarString}));') diff --git a/src/runtime/local/kernels/SaveDaphneLibResult.h b/src/runtime/local/kernels/SaveDaphneLibResult.h index 8182d05ec..7e158b56f 100644 --- a/src/runtime/local/kernels/SaveDaphneLibResult.h +++ b/src/runtime/local/kernels/SaveDaphneLibResult.h @@ -19,7 +19,6 @@ #include #include -#include // **************************************************************************** // Struct for partial template specialization @@ -67,56 +66,4 @@ struct SaveDaphneLibResult> { } }; -// ---------------------------------------------------------------------------- -// Frame -// ---------------------------------------------------------------------------- - -template<> -struct SaveDaphneLibResult { - static void apply(const Frame * arg, DCTX(ctx)) { - // Increase the reference counter of the data object to be transferred - // to python, such that the data is not garbage collected by DAPHNE. - // TODO But who will free the memory in the end? - // Memory allocated with new has to be freed manually with delete[]. - // Therefore, we should call delete[] for vtcs and each element of labels array - // (and then for labels array itself) when you're done with these arrays. - // Delete Function for Both DenseMatrix and Frame should be implemented. - - arg->increaseRefCounter(); - - DaphneLibResult* daphneLibRes = ctx->getUserConfig().result_struct; - - if(!daphneLibRes) - throw std::runtime_error("saveDaphneLibRes(): daphneLibRes is nullptr"); - - std::vector vtcs_tmp; // The tmp arrays do not need to be deleted manually - std::vector labels_tmp; // They are local dynamic arrays and the destructor is called automatically - - for(size_t i = 0; i < arg->getNumCols(); i++) { - vtcs_tmp.push_back(arg->getSchema()[i]); - labels_tmp.push_back(arg->getLabels()[i]); - } - - // Create C-Type arrays for vtcs, labels and columns - int64_t* vtcs = new int64_t[arg->getNumCols()]; - char** labels = new char*[arg->getNumCols()]; - void** columns = new void*[arg->getNumCols()]; - - // Assign the Frame Information to the C-Type Arrays - for(size_t i = 0; i < arg->getNumCols(); i++) { - vtcs[i] = static_cast(vtcs_tmp[i]); - labels[i] = new char[labels_tmp[i].size() + 1]; - strcpy(labels[i], labels_tmp[i].c_str()); - - columns[i] = const_cast(reinterpret_cast(arg->getColumnRaw(i))); - } - - daphneLibRes->cols = arg->getNumCols(); - daphneLibRes->rows = arg->getNumRows(); - daphneLibRes->vtcs = vtcs; - daphneLibRes->labels = labels; - daphneLibRes->columns = columns; - } -}; - #endif //SRC_RUNTIME_LOCAL_KERNELS_SAVEDAPHNELIBRESULT_H