Airflow support for GCP #25

Merged · 7 commits · Jan 9, 2023
104 changes: 64 additions & 40 deletions gcp/terraform/.terraform.lock.hcl

Some generated files are not rendered by default.

29 changes: 29 additions & 0 deletions gcp/terraform/README.md
@@ -50,6 +50,35 @@ Then, apply the `services` module (deploys Metaflow services to GKE)

The step above will output next steps for Metaflow end users.

## Metaflow job orchestration options
The recommended way to orchestrate Metaflow workloads on Kubernetes is via [Argo Workflows](https://docs.metaflow.org/going-to-production-with-metaflow/scheduling-metaflow-flows/scheduling-with-argo-workflows). However, Airflow is also supported as an alternative.

The template also provides `deploy_airflow` and `deploy_argo` flags as variables. These are booleans that control whether [Airflow](https://airflow.apache.org/) or [Argo Workflows](https://argoproj.github.io/argo-workflows/) is deployed in the Kubernetes cluster alongside the Metaflow services. By default, `deploy_argo` is set to __true__ and `deploy_airflow` is set to __false__.
To change these, set them in your `FILE.tfvars` file (or via any other [Terraform variable](https://www.terraform.io/language/values/variables)-passing mechanism), as in the example below.
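A minimal sketch of such a file (the name `example.tfvars` and the values shown are illustrative; pass it with `terraform apply -var-file=example.tfvars`):
```
# example.tfvars -- illustrative values; the template defaults are
# deploy_argo = true and deploy_airflow = false
deploy_argo    = true
deploy_airflow = true
```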

### Argo Workflows
Argo Workflows is installed by default on the GKE cluster as part of the `services` submodule; the `deploy_argo` [variable](./variables.tf) controls whether it is deployed. No additional configuration is needed in the `infra` module to support Argo.

After you have changed the value of `deploy_argo`, re-apply Terraform for both [infra and services](#usage).

### Airflow

**This is a quickstart template only and is not recommended for real production deployments.**

If `deploy_airflow` is set to true, the `services` module will deploy Airflow via a [Helm chart](https://airflow.apache.org/docs/helm-chart/stable/index.html) into the Kubernetes cluster (the one deployed by the `infra` module).

The Terraform template deploys Airflow configured with a `LocalExecutor` for simplicity; Metaflow works with any Airflow executor.
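If you wanted a different executor, the executor name is just one of the Helm values assembled in `services/airflow.tf` (see the `airflow_values` local later in this diff). A hypothetical tweak, not exposed as a variable by this template, might look like:
```
# Sketch only: "KubernetesExecutor" is an illustrative alternative;
# this template ships "LocalExecutor".
locals {
  airflow_values = {
    "executor"           = "KubernetesExecutor"
    "defaultAirflowTag"  = var.airflow_version
    "airflowVersion"     = var.airflow_version
    "webserverSecretKey" = var.airflow_frenet_secret
  }
}
```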

After you have changed the value of `deploy_airflow`, re-apply Terraform for both [infra and services](#usage).

#### Shipping Metaflow compiled DAGs to Airflow
Airflow expects Python files containing Airflow DAGs to be present in the [dags_folder](https://airflow.apache.org/docs/apache-airflow/2.2.0/configurations-ref.html#dags-folder). By default, this Terraform template uses the Airflow Helm chart's [default](https://airflow.apache.org/docs/helm-chart/stable/parameters-ref.html#airflow), which is `{AIRFLOW_HOME}/dags` (`/opt/airflow/dags`).
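The template does not expose a variable for changing that location. If you needed to, one possible approach (an assumption, not something this template wires up; check the Helm chart's parameter reference for the `env` value) would be an environment-variable override in the chart values, for example:
```
# Assumption: the Airflow Helm chart accepts an `env` list of name/value pairs;
# the target path below is hypothetical.
locals {
  airflow_dags_folder_override = {
    "env" = [
      { "name" = "AIRFLOW__CORE__DAGS_FOLDER", "value" = "/opt/airflow/dags/custom" }
    ]
  }
}
```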

The metaflow-tools repository also ships an [airflow_dag_upload.py](../../scripts/airflow_dag_upload.py) script that can help sync Airflow DAG files generated by Metaflow to the Airflow scheduler _deployed by this template_. Under the hood, [airflow_dag_upload.py](../../scripts/airflow_dag_upload.py) uses the `kubectl cp` command to copy files from your local machine to the Airflow scheduler's container. Example usage:
```
python airflow_dag_upload.py my-dag.py /opt/airflow/dags/my-dag.py
```

## (Advanced) Terraform state management
Terraform manages the state of GCP resources in [tfstate](https://www.terraform.io/language/state) files locally by default.

23 changes: 21 additions & 2 deletions gcp/terraform/main.tf
@@ -16,6 +16,10 @@ terraform {
source = "hashicorp/local"
version = "2.2.3"
}
helm = {
source = "hashicorp/helm"
version = "2.6.0"
}
}
}

@@ -45,12 +49,22 @@ data "google_sql_database_instance" "default" {
}

provider "kubernetes" {
host = "https://${data.google_container_cluster.default.endpoint}"
token = data.google_client_config.default.access_token
cluster_ca_certificate = base64decode(
data.google_container_cluster.default.master_auth[0].cluster_ca_certificate,
)
}
provider "helm" {
kubernetes {
host = "https://${data.google_container_cluster.default.endpoint}"
cluster_ca_certificate = base64decode(data.google_container_cluster.default.master_auth[0].cluster_ca_certificate)
token = data.google_client_config.default.access_token
# `token` is required here, and `client_certificate` / `client_key` are intentionally omitted,
# because including them results in an error like:
# `Error: unable to build kubernetes objects from release manifest: unknown`
# More notes on this issue: https://github.com/hashicorp/terraform-provider-helm/issues/513
}
}

# This will be used for invoking kubectl re: Argo installation
resource "local_file" "kubeconfig" {
@@ -81,6 +95,7 @@ module "services" {
metaflow_ui_static_service_image = local.metaflow_ui_static_service_image
metaflow_ui_backend_service_image = local.metaflow_ui_backend_service_image
metaflow_datastore_sysroot_gs = local.metaflow_datastore_sysroot_gs
airflow_logs_bucket_path = local.airflow_logs_bucket_path
metaflow_db_host = "localhost"
metaflow_db_name = "metaflow"
metaflow_db_user = "metaflow"
@@ -93,4 +108,8 @@
metaflow_workload_identity_ksa_name = local.metaflow_workload_identity_ksa_name
metadata_service_image = local.metadata_service_image
kubeconfig_path = local_file.kubeconfig.filename
deploy_airflow = var.deploy_airflow
deploy_argo = var.deploy_argo
airflow_version = local.airflow_version
airflow_frenet_secret = local.airflow_frenet_secret
}
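(For reference, the `deploy_airflow` and `deploy_argo` variables passed above are declared in `variables.tf`, which is not shown here; given the defaults described in the README, their declarations presumably have roughly this shape.)
```
# Sketch only -- names come from this diff, defaults from the README; the actual
# declarations live in gcp/terraform/variables.tf.
variable "deploy_airflow" {
  type    = bool
  default = false
}

variable "deploy_argo" {
  type    = bool
  default = true
}
```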
4 changes: 3 additions & 1 deletion gcp/terraform/output.tf
@@ -58,6 +58,8 @@ METAFLOW_SERVICE_URL=http://127.0.0.1:8080/
METAFLOW_SERVICE_INTERNAL_URL=http://metadata-service.default:8080/
[For Argo only] METAFLOW_KUBERNETES_NAMESPACE=argo
[For Argo only] METAFLOW_KUBERNETES_SERVICE_ACCOUNT=argo
[For Airflow only] METAFLOW_KUBERNETES_NAMESPACE=airflow
[For Airflow only] METAFLOW_KUBERNETES_SERVICE_ACCOUNT=airflow-deployment-scheduler
[For non-Argo only] METAFLOW_KUBERNETES_SERVICE_ACCOUNT=${local.metaflow_workload_identity_ksa_name}

Note: you can skip these:
@@ -76,7 +78,7 @@ $ kubectl port-forward -n argo deployment/argo-server 2746:2746

option 2 - this script manages the same port-forwards for you (and prevents timeouts)

$ python metaflow-tools/scripts/forward_metaflow_ports.py [--include-argo] [--include-airflow]

STEP 4: Install GCP Python SDK
$ pip install google-cloud-storage google-auth
51 changes: 51 additions & 0 deletions gcp/terraform/services/airflow.tf
@@ -0,0 +1,51 @@
resource "kubernetes_namespace" "airflow" {
count = var.deploy_airflow ? 1 : 0
metadata {
name = "airflow"
}
}

locals {
airflow_values = {
"executor" = "LocalExecutor"
"defaultAirflowTag" = var.airflow_version
"airflowVersion" = var.airflow_version
"webserverSecretKey" = var.airflow_frenet_secret
}
}
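# For illustration only: with a hypothetical airflow_version of "2.4.3" and a webserver
# secret of "example", yamlencode(local.airflow_values) renders roughly:
#   "airflowVersion": "2.4.3"
#   "defaultAirflowTag": "2.4.3"
#   "executor": "LocalExecutor"
#   "webserverSecretKey": "example"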


resource "helm_release" "airflow" {
count = var.deploy_airflow ? 1 : 0
name = "airflow-deployment"

repository = "https://airflow.apache.org"
chart = "airflow"

namespace = kubernetes_namespace.airflow[0].metadata[0].name

timeout = 1200

wait = false # Why set `wait = false`?
# See https://github.com/hashicorp/terraform-provider-helm/issues/683#issuecomment-830872443
# Short summary: if `wait` is left enabled, Airflow never ends up running its database migrations,
# which leaves the scheduler and the other containers waiting for those migrations indefinitely.

values = [
yamlencode(local.airflow_values)
]
}
# This annotation is added to the scheduler's service account so that the scheduler's pods can
# talk to Google Cloud Storage via GKE workload identity.
resource "kubernetes_annotations" "airflow_service_account_annotation" {
count = var.deploy_airflow ? 1 : 0
depends_on = [helm_release.airflow]
api_version = "v1"
kind = "ServiceAccount"
metadata {
name = "airflow-deployment-scheduler"
namespace = kubernetes_namespace.airflow[0].metadata[0].name
}
annotations = {
"iam.gke.io/gcp-service-account" = "${var.metaflow_workload_identity_gsa_name}@${var.project}.iam.gserviceaccount.com"
}
}
7 changes: 5 additions & 2 deletions gcp/terraform/services/argo.tf
@@ -1,4 +1,5 @@
resource "kubernetes_namespace" "argo" {
count = var.deploy_argo ? 1 : 0
metadata {
name = "argo"
}
@@ -16,22 +17,24 @@ locals {
# https://registry.terraform.io/providers/gavinbunney/kubectl/1.14
# The main challenge is that the Argo yaml contains multiple k8s resources, and terraform does not accept that natively.
resource "null_resource" "argo-quick-start-installation" {
count = var.deploy_argo ? 1 : 0
triggers = {
cmd = local._apply_cmd
}
provisioner "local-exec" {
interpreter = local.is_windows ? ["PowerShell"] : null
command = local.is_windows ? "$env:KUBECONFIG='${var.kubeconfig_path}'; ${local._apply_cmd}" : "KUBECONFIG=${var.kubeconfig_path} ${local._apply_cmd}"
}
}

resource "null_resource" "argo-annotate-service-account" {
count = var.deploy_argo ? 1 : 0
depends_on = [null_resource.argo-quick-start-installation]
triggers = {
cmd = local._annotate_cmd
}
provisioner "local-exec" {
interpreter = local.is_windows ? ["PowerShell"] : null
command = local.is_windows ? "$env:KUBECONFIG='${var.kubeconfig_path}'; ${local._annotate_cmd}" : "KUBECONFIG=${var.kubeconfig_path} ${local._annotate_cmd}"
}
}
11 changes: 6 additions & 5 deletions gcp/terraform/services/service_account.tf
@@ -1,7 +1,7 @@
# TODO rename to "_for_default"
resource "kubernetes_service_account" "metaflow_service_account" {
metadata {
name = var.metaflow_workload_identity_ksa_name
namespace = "default"
annotations = {
"iam.gke.io/gcp-service-account" = "${var.metaflow_workload_identity_gsa_name}@${var.project}.iam.gserviceaccount.com"
@@ -13,8 +13,9 @@ resource "google_service_account_iam_binding" "metaflow-service-account-iam" {
service_account_id = var.metaflow_workload_identity_gsa_id
role = "roles/iam.workloadIdentityUser"

members = flatten([
  "serviceAccount:${var.project}.svc.id.goog[${kubernetes_service_account.metaflow_service_account.id}]",
  var.deploy_airflow ? ["serviceAccount:${var.project}.svc.id.goog[airflow/airflow-deployment-scheduler]"] : [],
  var.deploy_argo ? ["serviceAccount:${var.project}.svc.id.goog[argo/argo]"] : [],
])
}
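# For illustration: with a hypothetical project "my-project", deploy_airflow = true and
# deploy_argo = false, flatten() collapses the nested lists above into:
#   [
#     "serviceAccount:my-project.svc.id.goog[default/<metaflow ksa name>]",
#     "serviceAccount:my-project.svc.id.goog[airflow/airflow-deployment-scheduler]",
#   ]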