Skip to content

Commit

Permalink
CleanUp
Browse files Browse the repository at this point in the history
Cleanup main branch
  • Loading branch information
danielwetzel committed Jul 30, 2023
1 parent 97e778a commit 6082700
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 72 deletions.
2 changes: 0 additions & 2 deletions scripts/examples/daphnelib/data-exchange-pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
# Transfer data to DaphneLib
F = dc.from_pandas(df, verbose=True)

#print(F)

print("How DAPHNE sees the data from pandas:")
F.print().compute(verbose=True)

Expand Down
4 changes: 0 additions & 4 deletions src/api/daphnelib/DaphneLibResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,4 @@ struct DaphneLibResult {
int64_t rows;
int64_t cols;
int64_t vtc;
//Added for Frame handling
int64_t* vtcs;
char** labels;
void** columns;
};
17 changes: 17 additions & 0 deletions src/api/python/context/daphne_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def from_numpy(self, mat: np.array, shared_memory=True, verbose=False) -> Matrix
print(f"Overall Execution time: \n{end_time - start_time} seconds\n")

return Matrix(self, 'readMatrix', unnamed_params, named_params, local_data=mat)
<<<<<<< HEAD

def from_pandas(self, df: pd.DataFrame, shared_memory=True, verbose=False) -> Frame:
"""Generates a `DAGNode` representing a frame with data given by a pandas `DataFrame`.
Expand Down Expand Up @@ -275,6 +276,22 @@ def from_tensor(self, tensor: tf.Tensor, shared_memory=True, verbose=False):
return self.from_pandas(df, shared_memory, verbose) # Using the existing from_pandas method
"""

=======

def from_pandas(self, df: pd.DataFrame) -> Frame:
    """Create a `Frame` DAG node backed by the given pandas `DataFrame`.

    The data is handed over to DAPHNE through a temporary CSV file; the
    `{file_name}` placeholder in the path is substituted at code-generation
    time with the node's variable name.

    :param df: The pandas DataFrame to transfer.
    :return: A `Frame` node wrapping the local data.
    """
    # Data transfer via files: the sole positional parameter is the
    # (quoted) CSV path template consumed by DaphneDSL's readFrame.
    path_template = '"src/api/python/tmp/{file_name}.csv"'
    return Frame(self, 'readFrame', [path_template], [], local_data=df)

>>>>>>> parent of dbd6c738 (CleanUp)
def fill(self, arg, rows: int, cols: int) -> Matrix:
    """Create a `rows` x `cols` matrix node with every entry set to `arg`.

    :param arg: The value each cell of the matrix is filled with.
    :param rows: Number of rows of the resulting matrix.
    :param cols: Number of columns of the resulting matrix.
    :return: A `Matrix` node representing the DaphneDSL `fill` operation.
    """
    inputs = dict(arg=arg, rows=rows, cols=cols)
    return Matrix(self, 'fill', [], named_input_nodes=inputs)
Expand Down
19 changes: 14 additions & 5 deletions src/api/python/operator/nodes/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,35 @@

import json
import os
from typing import Union, TYPE_CHECKING, Dict, Iterable, Optional, Sequence, List
from typing import Union, TYPE_CHECKING, Dict, Iterable, Optional, Sequence

if TYPE_CHECKING:
# to avoid cyclic dependencies during runtime
from context.daphne_context import DaphneContext

class Frame(OperationNode):
_pd_dataframe: pd.DataFrame
_column_names: Optional[List[str]] = None # Add this line

def __init__(self, daphne_context: "DaphneContext", operation: str,
             unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None,
             named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
             local_data: pd.DataFrame = None, brackets: bool = False) -> "Frame":
    """Initialize a Frame operation node in the DAPHNE DAG.

    :param daphne_context: The DaphneContext this node belongs to.
    :param operation: Name of the DaphneDSL operation this node represents.
    :param unnamed_input_nodes: Positional inputs of the operation.
    :param named_input_nodes: Keyword inputs of the operation.
    :param local_data: A pandas DataFrame holding Python-side data, if any.
    :param brackets: Whether the operation uses bracket (indexing) syntax.
    """
    # FIX: resolved an unresolved merge conflict. The HEAD-side assignment
    # `self._column_names = column_names` referenced a parameter that is no
    # longer part of the signature and would have raised a NameError; the
    # conflict markers themselves were a SyntaxError.
    is_python_local_data = False
    if local_data is not None:
        # Keep a reference to the Python-local data so compute() can
        # short-circuit without executing DAPHNE.
        self._pd_dataframe = local_data
        is_python_local_data = True
    else:
        self._pd_dataframe = None
    super().__init__(daphne_context, operation, unnamed_input_nodes,
                     named_input_nodes, OutputType.FRAME, is_python_local_data, brackets)


def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_input_vars: Dict[str, str]) -> str:
code_line = super().code_line(var_name, unnamed_input_vars, named_input_vars).format(file_name=var_name, TMP_PATH = TMP_PATH)

Expand All @@ -83,11 +84,19 @@ def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_inpu
)
return code_line

def compute(self, type="shared memory", isTensor=False, verbose=False) -> Union[pd.DataFrame]:
    """Execute the DAG rooted at this node and return a pandas DataFrame.

    :param type: Data-transfer mechanism, e.g. "shared memory" or "files".
    :param isTensor: Whether the result is to be treated as a tensor downstream.
    :param verbose: If True, the superclass prints timing information.
    :return: The wrapped pandas DataFrame if the data never left Python,
        otherwise the result computed by DAPHNE.
    """
    # FIX: resolved an unresolved merge conflict. Kept the parameterized
    # HEAD signature, which matches OperationNode.compute(type, isTensor,
    # verbose); the defaults keep zero-argument callers working.
    if self._is_pandas():
        # Local data never left Python; no DAPHNE execution required.
        return self._pd_dataframe
    return super().compute(type, isTensor, verbose)

def _is_pandas(self) -> bool:
    """Return True iff this node wraps Python-local pandas data."""
    has_local_df = self._pd_dataframe is not None
    return has_local_df
Expand Down
4 changes: 4 additions & 0 deletions src/api/python/operator/operation_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def compute(self, type="shared memory", isTensor=False, verbose=False):
# Still a hard copy function that creates tmp files to execute
self._script.execute()
self._script.clear(self)
<<<<<<< HEAD

if(verbose):
# Print the overall timing
Expand Down Expand Up @@ -143,6 +144,9 @@ def compute(self, type="shared memory", isTensor=False, verbose=False):
print(f"Compute Function Execution time: \n{comp_end_time - comp_start_time} seconds\n")

elif self._output_type == OutputType.FRAME and type=="files":
=======
if self._output_type == OutputType.FRAME:
>>>>>>> parent of dbd6c738 (CleanUp)
df = pd.read_csv(result)
with open(result + ".meta", "r") as f:
fmd = json.load(f)
Expand Down
10 changes: 2 additions & 8 deletions src/api/python/script_building/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,8 @@ def build_code(self, dag_root: DAGNode, type="shared memory"):
else:
raise RuntimeError(f"unknown way to transfer the data: '{type}'")
elif dag_root.output_type == OutputType.FRAME:
if type == "files":
self.add_code(f'writeFrame({baseOutVarString},"{TMP_PATH}/{baseOutVarString}.csv");')
return TMP_PATH + "/" + baseOutVarString + ".csv"
elif type == "shared memory":
self.add_code(f'saveDaphneLibResult({baseOutVarString});')
return None
else:
raise RuntimeError(f"unknown way to transfer the data: '{type}'")
self.add_code(f'writeFrame({baseOutVarString},"{TMP_PATH}/{baseOutVarString}.csv");')
return TMP_PATH + "/" + baseOutVarString + ".csv"
elif dag_root.output_type == OutputType.SCALAR:
# We transfer scalars back to Python by wrapping them into a 1x1 matrix.
self.add_code(f'saveDaphneLibResult(as.matrix({baseOutVarString}));')
Expand Down
53 changes: 0 additions & 53 deletions src/runtime/local/kernels/SaveDaphneLibResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <runtime/local/context/DaphneContext.h>
#include <runtime/local/datastructures/DenseMatrix.h>
#include <runtime/local/datastructures/DataObjectFactory.h>

// ****************************************************************************
// Struct for partial template specialization
Expand Down Expand Up @@ -67,56 +66,4 @@ struct SaveDaphneLibResult<DenseMatrix<VT>> {
}
};

// ----------------------------------------------------------------------------
// Frame
// ----------------------------------------------------------------------------

// Frame specialization: exposes a DAPHNE Frame to Python (DaphneLib) by
// filling the shared DaphneLibResult struct with raw, C-compatible views of
// the frame's schema, labels, and column data.
template<>
struct SaveDaphneLibResult<Frame> {
    static void apply(const Frame * arg, DCTX(ctx)) {
        // Increase the reference counter of the data object to be transferred
        // to python, such that the data is not garbage collected by DAPHNE.
        // TODO But who will free the memory in the end?
        // Memory allocated with new has to be freed manually with delete[].
        // Therefore, we should call delete[] for vtcs and each element of labels array
        // (and then for labels array itself) when you're done with these arrays.
        // Delete Function for Both DenseMatrix and Frame should be implemented.
        // NOTE(review): as written, the vtcs/labels/columns arrays allocated
        // below are never freed anywhere visible here — this leaks unless the
        // Python side (or a later kernel) takes ownership. Confirm ownership.

        arg->increaseRefCounter();

        // Destination struct shared with the Python bindings.
        DaphneLibResult* daphneLibRes = ctx->getUserConfig().result_struct;

        if(!daphneLibRes)
            throw std::runtime_error("saveDaphneLibRes(): daphneLibRes is nullptr");

        std::vector<ValueTypeCode> vtcs_tmp; // The tmp arrays do not need to be deleted manually
        std::vector<std::string> labels_tmp; // They are local dynamic arrays and the destructor is called automatically

        // Collect per-column value-type codes and labels from the frame.
        for(size_t i = 0; i < arg->getNumCols(); i++) {
            vtcs_tmp.push_back(arg->getSchema()[i]);
            labels_tmp.push_back(arg->getLabels()[i]);
        }

        // Create C-Type arrays for vtcs, labels and columns
        int64_t* vtcs = new int64_t[arg->getNumCols()];
        char** labels = new char*[arg->getNumCols()];
        void** columns = new void*[arg->getNumCols()];

        // Assign the Frame Information to the C-Type Arrays
        for(size_t i = 0; i < arg->getNumCols(); i++) {
            vtcs[i] = static_cast<int64_t>(vtcs_tmp[i]);
            // Copy each label into its own NUL-terminated C string.
            labels[i] = new char[labels_tmp[i].size() + 1];
            strcpy(labels[i], labels_tmp[i].c_str());

            // Raw, unowned pointer into the frame's column storage; the
            // refCounter bump above keeps the backing data alive.
            columns[i] = const_cast<void*>(reinterpret_cast<const void*>(arg->getColumnRaw(i)));
        }

        // Publish sizes and the three parallel arrays to the result struct.
        daphneLibRes->cols = arg->getNumCols();
        daphneLibRes->rows = arg->getNumRows();
        daphneLibRes->vtcs = vtcs;
        daphneLibRes->labels = labels;
        daphneLibRes->columns = columns;
    }
};

#endif //SRC_RUNTIME_LOCAL_KERNELS_SAVEDAPHNELIBRESULT_H

0 comments on commit 6082700

Please sign in to comment.