    def update_named_entity(self, resource_type, id, metadata):
+        """
+        Updates the metadata associated with a named entity. A named entity is a resource, e.g. a workflow,
+        task, or launch plan, specified by {project, domain, name} across all versions of the resource.
+
+        :param int resource_type: Enum value from flytekit.models.identifier.ResourceType
+        :param flytekit.models.admin.named_entity.NamedEntityIdentifier id: identifier for the named entity to update
+        :param flytekit.models.admin.named_entity.NamedEntityIdentifierMetadata metadata: metadata to associate with the named entity
+        """
+        super(SynchronousFlyteClient, self).update_named_entity(
+            _common_pb2.NamedEntityUpdateRequest(
+                resource_type=resource_type,
+                id=id.to_flyte_idl(),
+                metadata=metadata.to_flyte_idl(),
+            )
+        )
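For orientation, a hedged sketch of calling this method from user code (the endpoint is
hypothetical, the model import paths follow the docstring above, and the NamedEntityIdentifier
constructor arguments are assumed from the {project, domain, name} description):

    from flytekit.clients.friendly import SynchronousFlyteClient
    from flytekit.models import identifier as _identifier_model
    from flytekit.models.admin import named_entity as _named_entity

    def update_workflow_metadata(project, domain, name, metadata):
        # metadata: a NamedEntityIdentifierMetadata instance built by the caller
        client = SynchronousFlyteClient("flyteadmin.example.com:81")  # hypothetical endpoint
        client.update_named_entity(
            _identifier_model.ResourceType.WORKFLOW,
            _named_entity.NamedEntityIdentifier(project, domain, name),
            metadata,
        )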
    def __init__(self, *args, **kwargs):
        super(SdkLaunchPlan, self).__init__(*args, **kwargs)
+        # Set all the attributes we expect this class to have
+        self._id = None
+        # The interface is not set explicitly unless fetched in an engine context
@@ -237,26 +238,6 @@
Source code for flytekit.common.launch_plan
        sdk_lp._interface = lp_wf.interface
        return sdk_lp
-
-    @_exception_scopes.system_entry_point
-    def register(self, project, domain, name, version):
- """
- :param Text project:
- :param Text domain:
- :param Text name:
- :param Text version:
- """
- self.validate()
-        id_to_register = _identifier.Identifier(
-            _identifier_model.ResourceType.LAUNCH_PLAN,
-            project,
-            domain,
-            name,
-            version
-        )
-        _engine_loader.get_engine().get_launch_plan(self).register(id_to_register)
-        self._id = id_to_register
-        return _six.text_type(self.id)
-
    @property
    def id(self):
        """
@@ -383,12 +364,43 @@
Source code for flytekit.common.launch_plan
    @_exception_scopes.system_entry_point
    def __call__(self, *args, **input_map):
- raise_user_exceptions.FlyteAssertion(
- "TODO: Implement adding of remote launch plans to workflows. Current workaround is to add remote "
- "workflows directly."
- )
+ """
+ :param list[T] args: Do not specify. Kwargs only are supported for this function.
+ :param dict[Text,T] input_map: Map of inputs. Can be statically defined or OutputReference links.
+ :rtype: flytekit.common.nodes.SdkNode
+ """
+        if len(args) > 0:
+            raise _user_exceptions.FlyteAssertion(
+                "When adding a launch plan as a node in a workflow, all inputs must be specified with kwargs only. "
+                "We detected {} positional args.".format(len(args))
+            )
+
+ # Take the default values from the launch plan
+        default_inputs = {
+            k: v.sdk_default
+            for k, v in _six.iteritems(self.default_inputs.parameters) if not v.required
+        }
+ default_inputs.update(input_map)
+
+        bindings, upstream_nodes = self.interface.create_bindings_for_inputs(default_inputs)
+
+        return _nodes.SdkNode(
+            id=None,
+            metadata=_workflow_models.NodeMetadata("", _datetime.timedelta(), _literal_models.RetryStrategy(0)),
+            bindings=sorted(bindings, key=lambda b: b.var),
+            upstream_nodes=upstream_nodes,
+            sdk_launch_plan=self
+        )
+
+    def __repr__(self):
+        """
+        :rtype: Text
+        """
+        return "SdkLaunchPlan(ID: {} Interface: {} WF ID: {})".format(self.id, self.interface, self.workflow_id)
+# The difference between this and the SdkLaunchPlan class is that this runnable class is supposed to only be used for
+# launch plans loaded alongside the current Python interpreter.
        if role:
            auth = _launch_plan_models.Auth(assumable_iam_role=role)
+ # The constructor for SdkLaunchPlan sets the id to None anyways so we don't bother passing in an ID. The ID
+ # should be set in one of three places,
+ # 1) When the object is registered (in the code above)
+ # 2) By the dynamic task code after this runnable object has already been __call__'ed. The SdkNode produced
+ # maintains a link to this object and will set the ID according to the configuration variables present.
+        # 3) When SdkLaunchPlan.fetch() is run
+        super(SdkRunnableLaunchPlan, self).__init__(
- _identifier.Identifier(
- _identifier_model.ResourceType.WORKFLOW,
- _internal_config.PROJECT.get(),
- _internal_config.DOMAIN.get(),
- _uuid.uuid4().hex,
- _internal_config.VERSION.get()
- ),
+            None,
+            _launch_plan_models.LaunchPlanMetadata(
+                schedule=schedule or _schedule_model.Schedule(''),
+                notifications=notifications or []
@@ -461,6 +473,26 @@
"""returnself._sdk_workflow.id
-    @_exception_scopes.system_entry_point
-    def __call__(self, *args, **input_map):
+    def __repr__(self):
        """
- :param list[T] args: Do not specify. Kwargs only are supported for this function.
- :param dict[Text,T] input_map: Map of inputs. Can be statically defined or OutputReference links.
- :rtype: flytekit.common.nodes.SdkNode
+        :rtype: Text
        """
-        if len(args) > 0:
-            raise _user_exceptions.FlyteAssertion(
-                "When adding a launchplan as a node in a workflow, all inputs must be specified with kwargs only. We "
-                "detected {} positional args.".format(self, len(args))
-            )
-
- # Take the default values from the launch plan
-        default_inputs = {
-            k: v.sdk_default
-            for k, v in _six.iteritems(self.default_inputs.parameters) if not v.required
-        }
- default_inputs.update(input_map)
-
-        bindings, upstream_nodes = self.interface.create_bindings_for_inputs(default_inputs)
-
- # TODO: Remove DEADBEEF
-        return _nodes.SdkNode(
-            id=None,
-            metadata=_workflow_models.NodeMetadata("DEADBEEF", _datetime.timedelta(), _literal_models.RetryStrategy(0)),
-            bindings=sorted(bindings, key=lambda b: b.var),
-            upstream_nodes=upstream_nodes,
-            sdk_launch_plan=self
-        )
    def auto_assign_name(self):
+ """
+ This function is a bit of trickster Python code that goes hand in hand with the _InstanceTracker metaclass
+ defined above. Thanks @matthewphsmith for this bit of ingenuity.
+
+ For instance, if a user has code that looks like this:
+
+ from some.other.module import wf
+ my_launch_plan = wf.create_launch_plan()
+
+ @dynamic_task
+ def sample_task(wf_params):
+ yield my_launch_plan()
+
+ This code means that we should have a launch plan with a name ending in "my_launch_plan", since that is the
+ name of the variable that the created launch plan gets assigned to. That is also the name that the launch plan
+ would be registered with.
+
+    However, when the create_launch_plan() function runs, the Python interpreter has no idea where the created
+    object will be assigned. It has no idea that the output of the create_launch_plan call is to be paired up
+ with a variable named "my_launch_plan". This function basically does this after the fact. Leveraging the
+ _instantiated_in field provided by the _InstanceTracker class above, this code will re-import the
+    module (i.e. the Python file) that the object is in. Since it's already loaded, it's just retrieved from memory.
+ It then scans all objects in the module, and when an object match is found, it knows it's found the right
+ variable name.
+
+ Just to drive the point home, this function is mostly needed for Launch Plans. Assuming that user code has:
+
+ @python_task
+        def some_task():
+
+ When Flytekit calls the module loader and loads the task, the name of the task is the name of the function
+ itself. It's known at time of creation. In contrast, when
+
+        xyz = SomeWorkflow.create_launch_plan()
+
+    is called, the name of the launch plan isn't known until after creation; it's not "SomeWorkflow", it's "xyz".
+ """
+ _logging.debug("Running name auto assign")
+        m = _importlib.import_module(self.instantiated_in)
+
+        for k in dir(m):
+            if getattr(m, k) == self:
+                self._platform_valid_name = _utils.fqdn(m.__name__, k, entity_type=self.resource_type)
+                _logging.debug("Auto-assigning name to {}".format(self._platform_valid_name))
+                return
+
+ _logging.error("Could not auto-assign name")
+        raise _system_exceptions.FlyteSystemException("Error looking for object while auto-assigning name.")
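The heart of this trick, as a self-contained sketch (find_variable_name is a hypothetical
stand-in for the flytekit internals above, minus the fqdn-based naming):

    import importlib

    def find_variable_name(obj, module_name):
        # Re-importing an already-loaded module is just a lookup in sys.modules,
        # not a second execution of the file.
        m = importlib.import_module(module_name)
        # Scan the module's attributes; the attribute that compares equal to the
        # object is the variable name it was assigned to.
        for k in dir(m):
            if getattr(m, k) == obj:
                return k
        raise RuntimeError("Error looking for object while auto-assigning name.")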
class SdkGenericSparkTask(_base_tasks.SdkTask):
+ """
+ This class includes the additional logic for building a task that executes as a Spark Job.
+
+ """
+    def __init__(
+ self,
+ task_type,
+ discovery_version,
+ retries,
+ interruptible,
+ task_inputs,
+ deprecated,
+ discoverable,
+ timeout,
+ spark_type,
+ main_class,
+ main_application_file,
+ spark_conf,
+ hadoop_conf,
+ environment,
+ ):
+ """
+ :param Text task_type: string describing the task type
+ :param Text discovery_version: string describing the version for task discovery purposes
+ :param int retries: Number of retries to attempt
+        :param bool interruptible: Whether or not the task is interruptible
+        :param task_inputs: [optional] callable invoked with this task to add its input variables
+ :param Text deprecated:
+ :param bool discoverable:
+ :param datetime.timedelta timeout:
+ :param Text spark_type: Type of Spark Job: Scala/Java
+ :param Text main_class: Main class to execute for Scala/Java jobs
+ :param Text main_application_file: Main application file
+ :param dict[Text,Text] spark_conf:
+ :param dict[Text,Text] hadoop_conf:
+ :param dict[Text,Text] environment: [optional] environment variables to set when executing this task.
+ """
+
+        spark_job = _task_models.SparkJob(
+ spark_conf=spark_conf,
+ hadoop_conf=hadoop_conf,
+ spark_type=spark_type,
+ application_file=main_application_file,
+ main_class=main_class,
+ executor_path=_sys.executable,
+ ).to_flyte_idl()
+
+        super(SdkGenericSparkTask, self).__init__(
+ task_type,
+ _task_models.TaskMetadata(
+ discoverable,
+ _task_models.RuntimeMetadata(
+ _task_models.RuntimeMetadata.RuntimeType.FLYTE_SDK,
+ __version__,
+ 'spark'
+ ),
+ timeout,
+ _literal_models.RetryStrategy(retries),
+ interruptible,
+ discovery_version,
+ deprecated
+ ),
+ _interface.TypedInterface({},{}),
+ _MessageToDict(spark_job),
+ )
+
+ # Add Inputs
+        if task_inputs is not None:
+            task_inputs(self)
+
+        # Set the container definition after the inputs have been updated.
+        self._container = self._get_container_definition(
+            environment=environment
+        )
+
+    def _validate_inputs(self, inputs):
+ """
+ :param dict[Text, flytekit.models.interface.Variable] inputs: Input variables to validate
+ :raises: flytekit.common.exceptions.user.FlyteValidationException
+ """
+        for k, v in _six.iteritems(inputs):
+            sdk_type = _helpers.get_sdk_type_from_literal_type(v.type)
+            if sdk_type not in input_types_supported:
+                raise _user_exceptions.FlyteValidationException(
+                    "Input Type '{}' not supported. Only Primitives are supported for Scala/Java Spark.".format(sdk_type)
+                )
+        super(SdkGenericSparkTask, self)._validate_inputs(inputs)
+
+
    @_exception_scopes.system_entry_point
+    def add_inputs(self, inputs):
+ """
+ Adds the inputs to this task. This can be called multiple times, but it will fail if an input with a given
+ name is added more than once, a name collides with an output, or if the name doesn't exist as an arg name in
+ the wrapped function.
+ :param dict[Text, flytekit.models.interface.Variable] inputs: names and variables
+ """
+        self._validate_inputs(inputs)
+        self.interface.inputs.update(inputs)
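A hedged construction example for this class (all values, including the jar path and class
name, are placeholders; real code may pass spark_type as a flytekit.sdk.spark_types enum
rather than the plain string shown here):

    import datetime

    scala_spark_task = SdkGenericSparkTask(
        task_type="spark",
        discovery_version="1",
        retries=2,
        interruptible=False,
        task_inputs=None,
        deprecated="",
        discoverable=True,
        timeout=datetime.timedelta(minutes=30),
        spark_type="SCALA",
        main_class="com.example.MySparkJob",
        main_application_file="local:///opt/spark/jars/my-job.jar",
        spark_conf={"spark.executor.instances": "2"},
        hadoop_conf={},
        environment={},
    )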
            for name, variable in _six.iteritems(self.interface.outputs)
        }
+        # Because users declare both inputs and outputs in their function signatures, merge them together
+        # before calling user code
        inputs_dict.update(outputs_dict)
        yielded_sub_tasks = [sub_task for sub_task in
                             super(SdkDynamicTask, self)._execute_user_code(context, inputs_dict) or []]
@@ -308,10 +315,28 @@
Source code for flytekit.common.tasks.sdk_dynamic
        visited_nodes = set()
        generated_ids = {}
        effective_failure_ratio = self._allowed_failure_ratio or 0.0
+
+ # TODO: This function needs to be cleaned up.
+        # The reason we chain these two together is that we allow users to not have to explicitly "yield" the
+        # node. As long as the subtask/lp/subwf has an output that's referenced, it'll get picked up.
        for sub_task_node in _itertools.chain(yielded_sub_tasks, upstream_nodes):
            if sub_task_node in visited_nodes:
                continue
            visited_nodes.add(sub_task_node)
+            executable = sub_task_node.executable_sdk_object
+
+            # If the executable object that we're dealing with is registerable (i.e. SdkRunnableLaunchPlan, SdkWorkflow,
+            # SdkTask, or SdkRunnableTask), then it should have the ability to give itself a name. After assigning
+ # itself the name, also make sure the id is properly set according to current config values.
+            if isinstance(executable, _registerable.RegisterableEntity):
+                executable.auto_assign_name()
+                executable._id = _identifier.Identifier(
+                    executable.resource_type,
+                    _internal_config.TASK_PROJECT.get() or _internal_config.PROJECT.get(),
+                    _internal_config.TASK_DOMAIN.get() or _internal_config.DOMAIN.get(),
+                    executable.platform_valid_name,
+                    _internal_config.TASK_VERSION.get() or _internal_config.VERSION.get()
+                )

            # Generate an id that's unique in the document (if the same task is used multiple times with
            # different resources, executable_sdk_object.id will be the same but generated node_ids should not
@@ -323,44 +348,60 @@
Source code for flytekit.common.tasks.sdk_dynamic
                new_count = generated_ids[safe_task_id] = 0
            unique_node_id = _dnsify("{}-{}".format(safe_task_id, new_count))
-            # If the task can run as an array job, group its instances together. Otherwise, keep each invocation as a
-            # separate node.
-            if SdkDynamicTask._can_run_as_array(sub_task_node.executable_sdk_object.type):
-                if sub_task_node.executable_sdk_object in array_job_index:
-                    array_job, node = array_job_index[sub_task_node.executable_sdk_object]
-                    array_job.size += 1
-                    array_job.min_successes = int(math.ceil((1 - effective_failure_ratio) * array_job.size))
-                else:
-                    array_job = self._create_array_job(inputs_prefix=unique_node_id)
-                    node = sub_task_node.assign_id_and_return(unique_node_id)
-                    array_job_index[sub_task_node.executable_sdk_object] = (array_job, node)
-
-                node_index = _six.text_type(array_job.size - 1)
+            # Handling cases where the yielded nodes are launch plan or subworkflow nodes
+            if isinstance(sub_task_node.executable_sdk_object, (_launch_plan.SdkLaunchPlan, _workflow.SdkWorkflow)):
+                node = sub_task_node.assign_id_and_return(unique_node_id)
+                nodes.append(node)
                for k, node_output in _six.iteritems(sub_task_node.outputs):
                    if not node_output.sdk_node.id:
                        node_output.sdk_node.assign_id_and_return(node.id)
-                node_output.var = "[{}].{}".format(node_index, node_output.var)

-            # Upload inputs to working directory under /array_job.input_ref/<index>/inputs.pb
-            input_path = _os.path.join(node.id, node_index, _constants.INPUT_FILE_NAME)
-            generated_files[input_path] = _literal_models.LiteralMap(
-                literals={binding.var: binding.binding.to_literal_model()
-                          for binding in sub_task_node.inputs})
+                # Upload inputs to working directory under /array_job.input_ref/inputs.pb
+                input_path = _os.path.join(node.id, _constants.INPUT_FILE_NAME)
+                generated_files[input_path] = _literal_models.LiteralMap(
+                    literals={binding.var: binding.binding.to_literal_model()
+                              for binding in sub_task_node.inputs})
+
+            # Handling tasks
+            else:
-            node = sub_task_node.assign_id_and_return(unique_node_id)
+                # If the task can run as an array job, group its instances together. Otherwise, keep each
+                # invocation as a separate node.
+                if SdkDynamicTask._can_run_as_array(sub_task_node.executable_sdk_object.type):
+                    if sub_task_node.executable_sdk_object in array_job_index:
+                        array_job, node = array_job_index[sub_task_node.executable_sdk_object]
+                        array_job.size += 1
+                        array_job.min_successes = int(math.ceil((1 - effective_failure_ratio) * array_job.size))
+                    else:
+                        array_job = self._create_array_job(inputs_prefix=unique_node_id)
+                        node = sub_task_node.assign_id_and_return(unique_node_id)
+                        array_job_index[sub_task_node.executable_sdk_object] = (array_job, node)
+
+                    node_index = _six.text_type(array_job.size - 1)
+                    for k, node_output in _six.iteritems(sub_task_node.outputs):
+                        if not node_output.sdk_node.id:
+                            node_output.sdk_node.assign_id_and_return(node.id)
+                        node_output.var = "[{}].{}".format(node_index, node_output.var)
+
+                    # Upload inputs to working directory under /array_job.input_ref/<index>/inputs.pb
+                    input_path = _os.path.join(node.id, node_index, _constants.INPUT_FILE_NAME)
+                    generated_files[input_path] = _literal_models.LiteralMap(
+                        literals={binding.var: binding.binding.to_literal_model()
+                                  for binding in sub_task_node.inputs})
+                else:
+                    node = sub_task_node.assign_id_and_return(unique_node_id)
-            tasks.append(sub_task_node.executable_sdk_object)
-            nodes.append(node)
+                    tasks.append(sub_task_node.executable_sdk_object)
+                    nodes.append(node)

-            for k, node_output in _six.iteritems(sub_task_node.outputs):
-                if not node_output.sdk_node.id:
-                    node_output.sdk_node.assign_id_and_return(node.id)
+                    for k, node_output in _six.iteritems(sub_task_node.outputs):
+                        if not node_output.sdk_node.id:
+                            node_output.sdk_node.assign_id_and_return(node.id)

-            # Upload inputs to working directory under /array_job.input_ref/inputs.pb
-            input_path = _os.path.join(node.id, _constants.INPUT_FILE_NAME)
-            generated_files[input_path] = _literal_models.LiteralMap(
-                literals={binding.var: binding.binding.to_literal_model()
-                          for binding in sub_task_node.inputs})
+                    # Upload inputs to working directory under /array_job.input_ref/inputs.pb
+                    input_path = _os.path.join(node.id, _constants.INPUT_FILE_NAME)
+                    generated_files[input_path] = _literal_models.LiteralMap(
+                        literals={binding.var: binding.binding.to_literal_model()
+                                  for binding in sub_task_node.inputs})

        # assign custom field to the ArrayJob properties computed.
        for task, (array_job, _) in _six.iteritems(array_job_index):
diff --git a/_modules/flytekit/common/tasks/sdk_runnable.html b/_modules/flytekit/common/tasks/sdk_runnable.html
index 3984c1d8a0..e79caf9aa6 100644
--- a/_modules/flytekit/common/tasks/sdk_runnable.html
+++ b/_modules/flytekit/common/tasks/sdk_runnable.html
@@ -8,7 +8,7 @@
- flytekit.common.tasks.sdk_runnable — Flyte 0.0.1 documentation
+ flytekit.common.tasks.sdk_runnable — Flyte 0.3.0 documentation
@@ -650,8 +650,8 @@
Source code for flytekit.common.tasks.sdk_runnable
        for k, v in _six.iteritems(inputs):
            if not self._is_argname_in_function_definition(k):
                raise _user_exceptions.FlyteValidationException(
-                    "The input '{}' was not specified in the task function. Therefore, this input cannot be "
-                    "provided to the task.".format(v)
+                    "The input named '{}' was not specified in the task function. Therefore, this input cannot be "
+                    "provided to the task.".format(k)
                )
            if _type_helpers.get_sdk_type_from_literal_type(v.type) in type(self)._banned_inputs:
                raise _user_exceptions.FlyteValidationException(
diff --git a/_modules/flytekit/common/tasks/sidecar_task.html b/_modules/flytekit/common/tasks/sidecar_task.html
index 3f749e3404..e4a66684e6 100644
--- a/_modules/flytekit/common/tasks/sidecar_task.html
+++ b/_modules/flytekit/common/tasks/sidecar_task.html
@@ -8,7 +8,7 @@
- flytekit.common.tasks.sidecar_task — Flyte 0.0.1 documentation
+ flytekit.common.tasks.sidecar_task — Flyte 0.3.0 documentation
diff --git a/_modules/flytekit/common/tasks/spark_task.html b/_modules/flytekit/common/tasks/spark_task.html
index 517a94d63c..3530218b22 100644
--- a/_modules/flytekit/common/tasks/spark_task.html
+++ b/_modules/flytekit/common/tasks/spark_task.html
@@ -8,7 +8,7 @@
- flytekit.common.tasks.spark_task — Flyte 0.0.1 documentation
+ flytekit.common.tasks.spark_task — Flyte 0.3.0 documentation
@@ -219,6 +219,7 @@
def build_sdk_workflow_from_metaclass(metaclass, queuing_budget=None, cls=None):
    """
    :param T metaclass:
    :param cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by
        users extending the base Flyte programming model. If set, it must be a subclass of SdkWorkflow.
+    :param datetime.timedelta queuing_budget: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
    :rtype: SdkWorkflow
    """
    inputs, outputs, nodes = _discover_workflow_components(metaclass)
+    metadata = _workflow_models.WorkflowMetadata(queuing_budget) if queuing_budget else None
    return (cls or SdkWorkflow)(
        inputs=[i for i in sorted(inputs, key=lambda x: x.name)],
        outputs=[o for o in sorted(outputs, key=lambda x: x.name)],
-        nodes=[n for n in sorted(nodes, key=lambda x: x.id)]
+        nodes=[n for n in sorted(nodes, key=lambda x: x.id)],
+        metadata=metadata
    )
class DynamicTask(ReturnOutputsTask):
+    def __init__(self, *args, **kwargs):
+        self._has_workflow_node = False
+        super(DynamicTask, self).__init__(*args, **kwargs)
+
+    def _transform_for_user_output(self, outputs):
+        if self.has_workflow_node:
+            # If a workflow node has been detected, then we skip any transformation.
+            # This is to support the early termination behavior of the unit test engine when it comes to dynamic
+            # tasks that produce launch plan or subworkflow nodes.
+            # See the warning message in the code below for additional information.
+            return outputs
+        return super(DynamicTask, self)._transform_for_user_output(outputs)
+
    def _execute_user_code(self, inputs):
        """
        :param flytekit.models.literals.LiteralMap inputs:
@@ -318,6 +331,15 @@
Source code for flytekit.engines.unit.engine
        tasks_map = {task.id: task for task in futures.tasks}
        for future_node in futures.nodes:
+            if future_node.workflow_node is not None:
+ # TODO: implement proper unit testing for launchplan and subworkflow nodes somehow
+ _logging.warning("A workflow node has been detected in the output of the dynamic task. The "
+ "Flytekit unit test engine is incomplete for dynamic tasks that return launch "
+ "plans or subworkflows. The generated dynamic job spec will be returned but "
+ "they will not be run.")
+ # For now, just return the output of the parent task
+                self._has_workflow_node = True
+                return results
            task = tasks_map[future_node.task_node.reference_id]
            if task.type == _sdk_constants.SdkTaskType.CONTAINER_ARRAY_TASK:
                sub_task_output = DynamicTask.execute_array_task(future_node.id, task, results)
@@ -346,6 +368,13 @@
class WorkflowMetadata(_common.FlyteIdlEntity):
-    def __init__(self):
+    def __init__(self, queuing_budget=None):
        """
        Metadata for the workflow.
+
+        :param datetime.timedelta queuing_budget: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
        """
-        pass
+        self._queuing_budget = queuing_budget
+
+    @property
+    def queuing_budget(self):
+        """
+        :rtype: datetime.timedelta
+        """
+        return self._queuing_budget
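A minimal usage sketch, grounded in the constructor and property above:

    import datetime

    metadata = WorkflowMetadata(queuing_budget=datetime.timedelta(minutes=15))
    assert metadata.queuing_budget == datetime.timedelta(minutes=15)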
class SparkJob(_common.FlyteIdlEntity):
-    def __init__(self, application_file, spark_conf, hadoop_conf, executor_path):
+    def __init__(self, spark_type, application_file, main_class, spark_conf, hadoop_conf, executor_path):
        """
        This defines a SparkJob target. It will execute the appropriate SparkJob.
@@ -711,10 +712,28 @@
Source code for flytekit.models.task
        :param dict[Text, Text] hadoop_conf: A definition of key-value pairs for hadoop config for the job.
        """
        self._application_file = application_file
+        self._spark_type = spark_type
+        self._main_class = main_class
        self._executor_path = executor_path
        self._spark_conf = spark_conf
        self._hadoop_conf = hadoop_conf
+ @property
+    def main_class(self):
+        """
+        The main class to execute
+        :rtype: Text
+        """
+        return self._main_class
+
+    @property
+    def spark_type(self):
+        """
+        Spark Job Type
+        :rtype: Text
+        """
+        return self._spark_type
+
    @property
    def application_file(self):
        """
@@ -751,8 +770,22 @@
def workflow_class(_workflow_metaclass=None, cls=None, queuing_budget=None):
    """
    This is a decorator for wrapping class definitions into workflows.
@@ -220,11 +220,12 @@
Source code for flytekit.sdk.workflow
:param cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of :py:class:`flytekit.common.workflow.SdkWorkflow`.
+    :param datetime.timedelta queuing_budget: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
    :rtype: flytekit.common.workflow.SdkWorkflow
    """
    def wrapper(metaclass):
-        wf = _common_workflow.build_sdk_workflow_from_metaclass(metaclass, cls=cls)
+        wf = _common_workflow.build_sdk_workflow_from_metaclass(metaclass, cls=cls, queuing_budget=queuing_budget)
        return wf

    if _workflow_metaclass is not None:
@@ -232,7 +233,7 @@
def workflow(nodes, inputs=None, outputs=None, cls=None, queuing_budget=None):
    """
    This function provides a user-friendly interface for authoring workflows.
@@ -265,12 +266,14 @@
Source code for flytekit.sdk.workflow
:param T cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of :py:class:`flytekit.common.workflow.SdkWorkflow`.
+    :param datetime.timedelta queuing_budget: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
    :rtype: flytekit.common.workflow.SdkWorkflow
    """
    wf = (cls or _common_workflow.SdkWorkflow)(
        inputs=[v.rename_and_return_reference(k) for k, v in sorted(_six.iteritems(inputs or {}))],
        outputs=[v.rename_and_return_reference(k) for k, v in sorted(_six.iteritems(outputs or {}))],
-        nodes=[v.assign_id_and_return(k) for k, v in sorted(_six.iteritems(nodes))]
+        nodes=[v.assign_id_and_return(k) for k, v in sorted(_six.iteritems(nodes))],
+        metadata=_common_workflow._workflow_models.WorkflowMetadata(queuing_budget=queuing_budget) if queuing_budget else None)
    return wf
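A hedged end-to-end sketch of the new queuing_budget parameter on the decorator (the workflow
body is illustrative; Input and Types follow the flytekit.sdk conventions of this release):

    import datetime
    from flytekit.sdk.workflow import workflow_class, Input
    from flytekit.sdk.types import Types

    @workflow_class(queuing_budget=datetime.timedelta(minutes=10))
    class MyWorkflow(object):
        # This workflow may wait in the execution queue for at most 10 minutes.
        size = Input(Types.Integer)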
diff --git a/_modules/random.html b/_modules/random.html
index d4a8313bb5..06a00eba78 100644
--- a/_modules/random.html
+++ b/_modules/random.html
@@ -8,7 +8,7 @@
- random — Flyte 0.0.1 documentation
+ random — Flyte 0.3.0 documentation
diff --git a/_sources/administrator/install/getting_started.rst.txt b/_sources/administrator/install/getting_started.rst.txt
index fa56932bdc..e7c3d17fce 100644
--- a/_sources/administrator/install/getting_started.rst.txt
+++ b/_sources/administrator/install/getting_started.rst.txt
@@ -41,8 +41,19 @@ For local deployments, this endpoint is typically http://localhost:30081/console
(for Minikube deployment, you need to run ``minikube tunnel`` and use the ip that Minikube tunnel outputs)
WARNING:
- The sandbox deployment is not well suited for production use.
- Most importantly, Flyte needs access to an object store, and a PostgreSQL database.
- In the sandbox deployment, the object store and PostgreSQL database are each installed as a single kubernetes pod.
- These pods are sufficient for testing and playground purposes, but they not designed to handle production load.
- Read on to learn how to configure Flyte for production.
+ - The sandbox deployment is not well suited for production use.
+ - Most importantly, Flyte needs access to an object store, and a PostgreSQL database.
+ - In the sandbox deployment, the object store and PostgreSQL database are each installed as a single kubernetes pod.
+ - These pods are sufficient for testing and playground purposes, but they are not designed to handle production load.
+ - Read on to learn how to configure Flyte for production.
+
+SPECIAL NOTE FOR MINIKUBE:
+ - Minikube runs in a Virtual Machine on your host.
+ - If you try to access the Flyte console on localhost, it will not work, because the Virtual Machine has a different IP address.
+ - Flyte runs within Kubernetes (Minikube), so to access FlyteConsole you cannot just use https://localhost:30081/console; you need to use the ``IP address`` of the Minikube VM instead of ``localhost``.
+ - Refer to https://kubernetes.io/docs/tutorials/hello-minikube/ to understand how to access an application running inside Minikube.
+ - Likewise, to register workflows and tasks, or to use the CLI to query the Flyte service, you have to use the IP address.
+ - If you are building an image locally and want to execute it on the Minikube-hosted Flyte environment, push the image to the docker registry running on the Minikube VM.
+ - Another alternative is to change the docker host, to build the docker image on the Minikube-hosted docker daemon. https://minikube.sigs.k8s.io/docs/handbook/pushing/ provides more
+   detailed information about this process. As a TL;DR, Flyte can only run images that are accessible to Kubernetes. To make an image accessible, you could either push it to a remote registry or to
+   a registry that is available to Kubernetes. In the case of Minikube, this registry is the one that is running on the VM.
diff --git a/_sources/flytekit/flytekit.common.tasks.rst.txt b/_sources/flytekit/flytekit.common.tasks.rst.txt
index 9b256c644b..48e80d5486 100644
--- a/_sources/flytekit/flytekit.common.tasks.rst.txt
+++ b/_sources/flytekit/flytekit.common.tasks.rst.txt
@@ -12,6 +12,14 @@ flytekit.common.tasks.executions module
:undoc-members:
:show-inheritance:
+flytekit.common.tasks.generic\_spark\_task module
+-------------------------------------------------
+
+.. automodule:: flytekit.common.tasks.generic_spark_task
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
flytekit.common.tasks.hive\_task module
---------------------------------------
diff --git a/_sources/flytekit/flytekit.models.rst.txt b/_sources/flytekit/flytekit.models.rst.txt
index 4ec22807e3..3fab151540 100644
--- a/_sources/flytekit/flytekit.models.rst.txt
+++ b/_sources/flytekit/flytekit.models.rst.txt
@@ -76,6 +76,14 @@ flytekit.models.literals module
:undoc-members:
:show-inheritance:
+flytekit.models.named\_entity module
+------------------------------------
+
+.. automodule:: flytekit.models.named_entity
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
flytekit.models.node\_execution module
--------------------------------------
diff --git a/_sources/flytekit/flytekit.sdk.rst.txt b/_sources/flytekit/flytekit.sdk.rst.txt
index 5a48c3a2be..776cafbaf2 100644
--- a/_sources/flytekit/flytekit.sdk.rst.txt
+++ b/_sources/flytekit/flytekit.sdk.rst.txt
@@ -12,6 +12,14 @@ flytekit.sdk.exceptions module
:undoc-members:
:show-inheritance:
+flytekit.sdk.spark\_types module
+--------------------------------
+
+.. automodule:: flytekit.sdk.spark_types
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
flytekit.sdk.tasks module
-------------------------
diff --git a/_sources/introduction/docs_overview.rst.txt b/_sources/introduction/docs_overview.rst.txt
index b128dbc42a..770047a6a2 100644
--- a/_sources/introduction/docs_overview.rst.txt
+++ b/_sources/introduction/docs_overview.rst.txt
@@ -12,7 +12,6 @@ personas.
User: I want to write Flyte Workflows
=====================================
A user refers to anyone who wants to:
-
- Explore how Flyte works and try it out before installing, operating, or
using it
- Use a hosted Flyte deployment available at her organization
@@ -23,11 +22,10 @@ A user refers to anyone who wants to:
Administrator: I want to manage a Flyte installation at my company
==================================================================
-An administrator is someone who wants to deploy, manage, and scale a Flyte installation for
-his or her organization. The administrator is not interested in altering or modifying any code,
-only using the system off the shelf, and configuring the various available knobs and settings. This
-section also talks about the typical installation structure and scalability
-primitives available in the system.
+An administrator is someone who wants to:
+ - deploy, manage, and scale a Flyte installation for their organization
+ - use the system off the shelf, without altering or modifying any code, while configuring the various available knobs and settings
+
+This section also talks about the typical installation structure and scalability primitives available in the system.
:ref:`Jump to Administrator Docs `
diff --git a/_sources/user/getting_started/create_first.rst.txt b/_sources/user/getting_started/create_first.rst.txt
index 8b504ad7ff..e6acbc7107 100644
--- a/_sources/user/getting_started/create_first.rst.txt
+++ b/_sources/user/getting_started/create_first.rst.txt
@@ -33,7 +33,7 @@ Lets create a new project called ``myflyteproject``. Use the project creation en
Writing a Task
*****************
-The most basic Flyte primitive is a "task". Flyte Tasks are units of work that can be composed in a workflow. The simplest way to write a Flyte task is using the FlyteSDK.
+The most basic Flyte primitive is a "task". Flyte Tasks are units of work that can be composed in a workflow. The simplest way to write a Flyte task is using the Flyte Python SDK - flytekit.
Start by creating a new file ::
@@ -109,7 +109,7 @@ Flyte fulfills tasks using docker images. You'll need to build a docker image fr
If you have the flyte sandbox installed on your local machine, the image will be accessible to your Flyte system. If you're running a remote Flyte instance, you'll need to upload this image to a remote registry such as Dockerhub, Amazon ECR, or Google Container Registry, so that it can be used by the Flyte system.
-To upload to a remote registry, use ::
+To upload to a remote registry (or even local registry), use ::
DOCKER_REGISTRY_USERNAME={username} DOCKER_REGISTRY_PASSWORD={pass} REGISTRY=docker.io make docker_build
diff --git a/_sources/user/tasktypes/presto.rst.txt b/_sources/user/tasktypes/presto.rst.txt
index 85f4934436..c193f46627 100644
--- a/_sources/user/tasktypes/presto.rst.txt
+++ b/_sources/user/tasktypes/presto.rst.txt
@@ -23,6 +23,7 @@ saving it to an external table, and performing cleanup.
If a user wanted to run a Presto query like:
.. code-block:: sql
+
SELECT *
FROM foo
WHERE bar = 123
diff --git a/_static/documentation_options.js b/_static/documentation_options.js
index 4f9bc45140..f3296e0a63 100644
--- a/_static/documentation_options.js
+++ b/_static/documentation_options.js
@@ -1,6 +1,6 @@
var DOCUMENTATION_OPTIONS = {
URL_ROOT: document.getElementById("documentation_options").getAttribute('data-url_root'),
- VERSION: '0.0.1',
+ VERSION: '0.3.0',
LANGUAGE: 'None',
COLLAPSE_INDEX: false,
FILE_SUFFIX: '.html',
diff --git a/administrator/architecture.html b/administrator/architecture.html
index ad6bd35d61..d336980271 100644
--- a/administrator/architecture.html
+++ b/administrator/architecture.html
@@ -8,7 +8,7 @@
- Architecture — Flyte 0.0.1 documentation
+ Architecture — Flyte 0.3.0 documentation
diff --git a/administrator/index.html b/administrator/index.html
index 922f5d74f1..e9a62b17ad 100644
--- a/administrator/index.html
+++ b/administrator/index.html
@@ -8,7 +8,7 @@
- Administrator Docs — Flyte 0.0.1 documentation
+ Administrator Docs — Flyte 0.3.0 documentation
diff --git a/administrator/install/authentication.html b/administrator/install/authentication.html
index 4746f7bfd2..e5fdab08c0 100644
--- a/administrator/install/authentication.html
+++ b/administrator/install/authentication.html
@@ -8,7 +8,7 @@
- Authentication — Flyte 0.0.1 documentation
+ Authentication — Flyte 0.3.0 documentation
diff --git a/administrator/install/configure/admin.html b/administrator/install/configure/admin.html
index 6ab70c0fab..9a9688d08d 100644
--- a/administrator/install/configure/admin.html
+++ b/administrator/install/configure/admin.html
@@ -8,7 +8,7 @@
- FlyteAdmin Configuration — Flyte 0.0.1 documentation
+ FlyteAdmin Configuration — Flyte 0.3.0 documentation
diff --git a/administrator/install/configure/common.html b/administrator/install/configure/common.html
index d88588605b..eb218ba3ab 100644
--- a/administrator/install/configure/common.html
+++ b/administrator/install/configure/common.html
@@ -8,7 +8,7 @@
- Common configuration across all backend components — Flyte 0.0.1 documentation
+ Common configuration across all backend components — Flyte 0.3.0 documentation
diff --git a/administrator/install/configure/index.html b/administrator/install/configure/index.html
index ff1aa02176..20561591cb 100644
--- a/administrator/install/configure/index.html
+++ b/administrator/install/configure/index.html
@@ -8,7 +8,7 @@
- Configure Flyte backend — Flyte 0.0.1 documentation
+ Configure Flyte backend — Flyte 0.3.0 documentation
diff --git a/administrator/install/configure/plugins.html b/administrator/install/configure/plugins.html
index d4ebdb435a..20b38387b9 100644
--- a/administrator/install/configure/plugins.html
+++ b/administrator/install/configure/plugins.html
@@ -8,7 +8,7 @@
- Plugin Configuration — Flyte 0.0.1 documentation
+ Plugin Configuration — Flyte 0.3.0 documentation
diff --git a/administrator/install/configure/propeller.html b/administrator/install/configure/propeller.html
index 643be15b80..407ffe14ca 100644
--- a/administrator/install/configure/propeller.html
+++ b/administrator/install/configure/propeller.html
@@ -8,7 +8,7 @@
- Propeller Configuration — Flyte 0.0.1 documentation
+ Propeller Configuration — Flyte 0.3.0 documentation
diff --git a/administrator/install/getting_started.html b/administrator/install/getting_started.html
index beaf162cef..a11a17fbfc 100644
--- a/administrator/install/getting_started.html
+++ b/administrator/install/getting_started.html
@@ -8,7 +8,7 @@
- Getting Started — Flyte 0.0.1 documentation
+ Getting Started — Flyte 0.3.0 documentation
-flytekit.interfaces.random.random = <random.Random object at 0x2e4c4b8>
+flytekit.interfaces.random.random = <random.Random object at 0x1e9a568>
An instance of the global random number generator used by flytekit. Flytekit maintains its own random instance
to ensure that calls to random.seed(…) do not affect the pseudo-random behavior of flytekit. This random should be
used by flytekit components in all cases where random.random would have been used. Components who want additional
diff --git a/flytekit/flytekit.interfaces.stats.html b/flytekit/flytekit.interfaces.stats.html
index 6340a70f9c..d2c569ad6f 100644
--- a/flytekit/flytekit.interfaces.stats.html
+++ b/flytekit/flytekit.interfaces.stats.html
@@ -8,7 +8,7 @@
-
Create a generic spark task. This task will connect to a Spark cluster, configure the environment,
+and then execute the mainClass code as the Spark driver program.