argo-workflow-tools is a set of tools intended to ease the use of Argo Workflows for data science and data engineering workflows.
argo-workflow-tools is published to the Python Package Index (PyPI) under the name argo-workflow-tools. To install it, run:
pip install argo-workflow-tools
Argo Submitter is an easy-to-use Argo client that lets data scientists execute and control Argo Workflows from code and interactive notebooks.
The simplest way to submit a new workflow is to run a workflow from a template:
ARGO_CLIENT = 'http://localhost:2746'
client = ArgoClient(ARGO_CLIENT, options=ArgoOptions(client_side_validation=False, namespace='argo'))
result = client.submit('test-workflow', params={'message':'hello world'})
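Submitting from a template ultimately goes through the Argo Server. As a hedged sketch (not necessarily how ArgoClient is implemented), the Argo Server REST endpoint `POST /api/v1/workflows/{namespace}/submit` takes a body that names the template and passes parameters as `name=value` strings. The helper `build_submit_request` is hypothetical: it only builds the URL and JSON body and sends nothing.

```python
import json
from typing import Dict, Tuple


def build_submit_request(
    server: str, namespace: str, template: str, params: Dict[str, str]
) -> Tuple[str, str]:
    """Build the URL and JSON body for submitting a WorkflowTemplate.

    Hypothetical helper: it prepares a request for the Argo Server endpoint
    POST /api/v1/workflows/{namespace}/submit but does not send it.
    """
    url = f"{server.rstrip('/')}/api/v1/workflows/{namespace}/submit"
    body = {
        "namespace": namespace,
        "resourceKind": "WorkflowTemplate",
        "resourceName": template,
        # Argo expects submit parameters as "name=value" strings
        "submitOptions": {
            "parameters": [f"{key}={value}" for key, value in params.items()]
        },
    }
    return url, json.dumps(body)


url, body = build_submit_request(
    "http://localhost:2746", "argo", "test-workflow", {"message": "hello world"}
)
print(url)  # http://localhost:2746/api/v1/workflows/argo/submit
```

Seeing the raw request can help when debugging connectivity or namespace issues independently of the client library.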
You can wait for the workflow to complete by setting the wait=True parameter, or by calling wait_for_completion():
result = client.submit('test-workflow', params={'message':'hello world'}, wait=True)
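Waiting for completion amounts to polling the workflow's phase until it stops changing. The sketch below is a hypothetical stand-in, not the library's wait_for_completion: `get_phase` represents whatever call fetches `status.phase` from the Argo Server, and the poll interval and timeout are illustrative defaults.

```python
import time
from typing import Callable

# Argo workflow phases after which the workflow makes no further progress
TERMINAL_PHASES = {"Succeeded", "Failed", "Error"}


def wait_for_phase(
    get_phase: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> str:
    """Poll get_phase() until the workflow reaches a terminal phase.

    Hypothetical sketch: get_phase stands in for a status request.
    Raises TimeoutError if the workflow is still active at the deadline.
    """
    deadline = time.monotonic() + timeout
    while True:
        phase = get_phase()
        if phase in TERMINAL_PHASES:
            return phase
        if time.monotonic() >= deadline:
            raise TimeoutError(f"workflow still {phase!r} after {timeout}s")
        time.sleep(poll_interval)


# Usage with a fake status source that finishes on the third poll
phases = iter(["Pending", "Running", "Succeeded"])
final_phase = wait_for_phase(lambda: next(phases), poll_interval=0.01)
print(final_phase)  # Succeeded
```

A timeout keeps a notebook cell from blocking forever on a workflow that hangs in Running.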
You can pass parameters through the params dictionary:
result = client.submit('test-workflow', params={'message':'hello world'}, wait=True)
You can also send objects as parameters; they are automatically serialized to JSON.
ARGO_CLIENT = 'http://localhost:2746'
client = ArgoClient(ARGO_CLIENT, options=ArgoOptions(client_side_validation=False, namespace='argo'))
result = client.submit('test-workflow',
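Argo parameters are always strings, so objects have to be encoded before submission and decoded inside the workflow. A minimal sketch of that round trip, assuming strings are passed through unchanged and every other value is encoded with json.dumps; `serialize_params` is a hypothetical name, not part of argo-workflow-tools.

```python
import json
from typing import Any, Dict


def serialize_params(params: Dict[str, Any]) -> Dict[str, str]:
    """Turn arbitrary parameter values into the strings Argo expects.

    Assumption: strings are sent as-is, everything else is JSON-encoded.
    """
    return {
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in params.items()
    }


params = serialize_params({"message": "hello world", "config": {"epochs": 3, "lr": 0.1}})
print(params["config"])  # {"epochs": 3, "lr": 0.1}

# Inside the workflow, a task decodes the JSON string back into an object
config = json.loads(params["config"])
```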
If you have a custom workflow manifest, you can run it using create:
result = client.create(workflow_manifest, wait=True)
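For reference, here is a minimal Argo Workflow manifest written as a Python dict, modeled on the standard Argo "arguments-parameters" example. It assumes create accepts the manifest as a dict; the image and names are placeholders.

```python
# A minimal Argo Workflow manifest: one container template that echoes
# the workflow-level "message" parameter.
workflow_manifest = {
    "apiVersion": "argoproj.io/v1alpha1",
    "kind": "Workflow",
    # generateName lets Argo append a random suffix on each submission
    "metadata": {"generateName": "hello-world-"},
    "spec": {
        "entrypoint": "main",
        "arguments": {"parameters": [{"name": "message", "value": "hello world"}]},
        "templates": [
            {
                "name": "main",
                # The entrypoint receives the workflow arguments as inputs
                "inputs": {"parameters": [{"name": "message"}]},
                "container": {
                    "image": "alpine:3.19",
                    "command": ["echo"],
                    "args": ["{{inputs.parameters.message}}"],
                },
            }
        ],
    },
}
```

With a client configured as above, this would be submitted as `client.create(workflow_manifest, wait=True)`.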
You can check the status of a workflow through its status field.
You can fetch output parameters and artifacts through the output field, and reach artifacts through the s3 path property.
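The status and outputs exposed by the client come from the workflow's status object on the Argo Server. As a hedged sketch of where that data lives in the raw JSON, the hypothetical helper below reads output parameters from `status["nodes"]`; the sample status is made up and trimmed to the relevant fields.

```python
from typing import Any, Dict


def node_output_parameters(status: Dict[str, Any], display_name: str) -> Dict[str, str]:
    """Collect output parameters of the node with the given display name.

    Hypothetical helper over raw Argo status JSON, where status["nodes"]
    maps node IDs to node records that may carry an "outputs" section.
    """
    for node in status.get("nodes", {}).values():
        if node.get("displayName") == display_name:
            params = node.get("outputs", {}).get("parameters", [])
            return {p["name"]: p.get("value") for p in params}
    raise KeyError(display_name)


# Illustrative status fragment (made-up values)
status = {
    "phase": "Succeeded",
    "nodes": {
        "hello-world-abc12": {
            "displayName": "hello-world-abc12",
            "phase": "Succeeded",
            "outputs": {
                "parameters": [{"name": "result", "value": "hello world"}],
                "artifacts": [{"name": "report", "s3": {"key": "hello-world-abc12/report.tgz"}}],
            },
        }
    },
}

print(status["phase"])  # Succeeded
print(node_output_parameters(status, "hello-world-abc12"))  # {'result': 'hello world'}

# S3-backed artifacts carry their object key under "s3"
node = status["nodes"]["hello-world-abc12"]
print([a["s3"]["key"] for a in node["outputs"]["artifacts"]])  # ['hello-world-abc12/report.tgz']
```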
You can cancel a running workflow through the cancel method.
You can suspend, resume, or cancel your workflow at any time:
result = client.submit('test-workflow', params={'message':'hello world'}, wait=False)
You can retry a failed workflow through the retry method.
Fargo is a library for authoring Argo Workflows in a Pythonic, friendly way. The main goals of Fargo are:
- Make Argo Workflows accessible by leveraging a Pythonic style of DAG authoring
- Allow seamless local runs for debugging or testing, while maintaining the same codebase for running DAGs at scale
The Pythonic DSL is an opinionated subset of Argo Workflows authoring: it favors simplicity, readability and "pythonic flow" over leveraging the entire capability set Argo Workflows brings. Workflows are built from two kinds of functions:
- task - an atomic unit of Python code
- DAG - workflow code that calls tasks and other DAGs
#### Hello World
def say_hello(name):
    message = f"hello {name}"
    return message
To run this simple task in Argo, all we need to do is decorate our code with a task decorator:
def say_hello(name: str):
    message = f"hello {name}"
    return message
workflow = Workflow(name="hello-world", entrypoint=say_hello, arguments={"name": "Brian"})
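Because the task body is ordinary Python, its logic can be checked locally before anything is submitted, in line with the local-run goal described earlier. A minimal sketch with the decorator left out so it runs on the standard library alone; `test_say_hello` is a hypothetical pytest-style test.

```python
def say_hello(name: str) -> str:
    # Same body as the task above, run as a plain function
    message = f"hello {name}"
    return message


def test_say_hello() -> None:
    # Uses the same argument the Workflow passes in
    assert say_hello("Brian") == "hello Brian"


test_say_hello()
print(say_hello("Brian"))  # hello Brian
```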
DAG functions define a workflow by calling other tasks or nested DAGs. Task dependencies are declared implicitly, by analyzing the inputs and outputs of each task.
When writing DAG functions, keep them as simple as possible and call only DAG or task functions.
def multiply_task(x: int):
    return x * 2

def sum_task(x: int, y: int):
    return x + y

def diamond(num: int):
    a = multiply_task(num)
    b = multiply_task(a)
    c = multiply_task(num)
    return sum_task(b, c)
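Run locally as plain functions (decorators omitted), the diamond returns 18 for num=3. The `dependencies` dictionary is a hand-written illustration of the edges that the data flow implies (b reads a; the sum reads b and c); it is not produced by Fargo, and the step names are hypothetical. The standard library's graphlib then yields an order that respects those edges.

```python
from graphlib import TopologicalSorter


def multiply_task(x: int) -> int:
    return x * 2


def sum_task(x: int, y: int) -> int:
    return x + y


def diamond(num: int) -> int:
    a = multiply_task(num)  # depends only on the input
    b = multiply_task(a)    # reads a, so it must run after a
    c = multiply_task(num)  # independent of a and b
    return sum_task(b, c)   # reads b and c


print(diamond(3))  # 18: (3 * 2) * 2 + 3 * 2

# Edges implied by the data flow: each step maps to the steps it reads from
dependencies = {"a": set(), "b": {"a"}, "c": set(), "sum": {"b", "c"}}
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Note that c has no edge to a or b, which is why an engine can run it alongside them.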
If a task does not return a value, you can set an explicit dependency by passing the wait_for argument to the next task:
def print_task():
def diamond():
    a = print_task()
    b = print_task(wait_for=a)
    c = print_task(wait_for=a)
    print_task(wait_for=[c, b])
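The wait_for edges in this example form a diamond without passing any data. A sketch using graphlib shows which steps could run in parallel, assuming the engine starts every step as soon as all of its dependencies have finished (as Argo DAG tasks do, subject to parallelism limits). The step names are hypothetical labels for the four print_task calls.

```python
from graphlib import TopologicalSorter

# Explicit wait_for edges: b and c wait for a,
# and the final print_task waits for both b and c.
dependencies = {"a": set(), "b": {"a"}, "c": {"a"}, "final": {"b", "c"}}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)                 # mark the whole wave as finished

print(waves)  # [['a'], ['b', 'c'], ['final']]
```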
We support map-reduce workflows through for ... in loops (list comprehensions). The iterable input must be a parameter, an output of a previous task, or a sequence object.
Currently, only one level of nesting is supported. If you need nested loops, extract the inner loop into a function and decorate it with a DAG decorator as well.
def generate_list(partitions: int, partition_size: int):
    items = []
    for i in range(partitions):
        items.append(list(range(1, partition_size)))
    return items

def sum_task(items: list[int]):
    return sum(items)

def map_reduce(partitions, partition_size):
    partition_list = generate_list(partitions, partition_size)
    partition_sums = [sum_task(partition) for partition in partition_list]
    return sum_task(partition_sums)
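Outside of Argo, the fan-out/fan-in shape of map_reduce can be mimicked with concurrent.futures: each partition is summed in its own worker, then the partial sums are reduced by one more sum_task. This is a local analogy only, not how Fargo executes the loop. Note that range(1, partition_size) yields partition_size - 1 items, so each partition of size 4 is [1, 2, 3].

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def generate_list(partitions: int, partition_size: int) -> List[List[int]]:
    items = []
    for _ in range(partitions):
        items.append(list(range(1, partition_size)))
    return items


def sum_task(items: List[int]) -> int:
    return sum(items)


def map_reduce(partitions: int, partition_size: int) -> int:
    partition_list = generate_list(partitions, partition_size)
    # Map: one sum_task per partition, run concurrently like a fan-out
    with ThreadPoolExecutor() as pool:
        partition_sums = list(pool.map(sum_task, partition_list))
    # Reduce: a single sum_task over the partial sums
    return sum_task(partition_sums)


print(map_reduce(3, 4))  # 18: three partitions of [1, 2, 3]
```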
We support conditional task execution using the with Condition() syntax:
def say_hello(name: str):
    message = f"hello {name}"
    return message

def say_goodbye(name: str):
    message = f"goodbye {name}"
    return message

def command_hello(name, command):
    with Condition().equals(command, "hello"):
    with Condition().equals(command, "goodbye"):
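The bodies of the two Condition blocks are not shown above. Assuming each block calls the matching task, the branching behaves like the plain if statements in this local sketch; in Argo itself, a step whose `when` condition evaluates to false is marked Skipped rather than run. The return values here are an illustration, not Fargo's semantics.

```python
from typing import Optional


def say_hello(name: str) -> str:
    return f"hello {name}"


def say_goodbye(name: str) -> str:
    return f"goodbye {name}"


def command_hello(name: str, command: str) -> Optional[str]:
    # Local stand-in for the two Condition blocks: at most one branch runs
    if command == "hello":
        return say_hello(name)
    if command == "goodbye":
        return say_goodbye(name)
    return None  # neither condition matched, so no task runs


print(command_hello("Brian", "goodbye"))  # goodbye Brian
```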
Have any feedback? Want to implement an extension or a new capability? Want to help us make Argo better and easier to use? Every contribution to Argo Workflow Tools is greatly appreciated.