Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Apache Airflow addon] Dependency added for EFS #719

Merged
merged 3 commits into from
Jun 13, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 164 additions & 131 deletions lib/addons/apache-airflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { Values } from "../../spi";
import { setPath, createNamespace, createServiceAccount } from "../../utils";
import { IFileSystem } from "aws-cdk-lib/aws-efs";

import merge from "ts-deepmerge";

/**
* User provided options for the Helm Chart
*/
Expand Down Expand Up @@ -62,8 +64,8 @@ export interface AirflowAddOnProps extends HelmAddOnUserProps {

const AIRFLOW = 'airflow';
const RELEASE = 'blueprints-addon-apache-airflow';
const AIRFLOWSC = 'apache-airflow-sc'
const AIRFLOWPVC = 'efs-apache-airflow-pvc'
const AIRFLOWSC = 'apache-airflow-sc';
const AIRFLOWPVC = 'efs-apache-airflow-pvc';

/**
* Default props to be used when creating the Helm chart
Expand Down Expand Up @@ -96,151 +98,112 @@ export class ApacheAirflowAddOn extends HelmAddOn {

deploy(clusterInfo: ClusterInfo): Promise<Construct> {
const cluster = clusterInfo.cluster;
const albAddOnCheck = clusterInfo.getScheduledAddOn(AwsLoadBalancerControllerAddOn.name);
const enableAlb = this.options.enableAlb;
const cert = this.options.certificateResourceName;
const loggingIsEnabled = this.options.enableLogging;
const loggingBucketResourceName = this.options.s3Bucket;
const efsIsEnabled = this.options.enableEfs;
const efsResourceName = this.options.efsFileSystem;
const namespace = this.options.namespace;

// Create Namespace
const ns = createNamespace(this.options.namespace!, cluster, true, true);
const ns = createNamespace(namespace!, cluster, true, true);

let values: Values = populateValues(clusterInfo, ns, this.options);
// Setting basic custom values for Kubernetes
let values: Values = {
config: {
"kubernetes": {
"namespace": this.options.namespace!
},
"kubernetes_executor": {
"namespace": this.options.namespace!
}
},
"securityContext": {
"fsGroup": 66534
},
"executor": "KubernetesExecutor"
};

// If Load Balancing is enabled
if (enableAlb){
values = setUpLoadBalancer(clusterInfo, values, albAddOnCheck, cert);
} else {
assert(!cert, 'Cert option is supported only if ALB is enabled.');
}

// If Logging with S3 is enabled
if (loggingIsEnabled){
const bucket = clusterInfo.getRequiredResource<IBucket>(loggingBucketResourceName!);
values = setUpLogging(clusterInfo, values, ns, namespace!, bucket);
}

// If EFS is enabled for persistent storage
let pvcResource: KubernetesManifest;
if (efsIsEnabled){
[values, pvcResource] = setUpEFS(clusterInfo, values, ns, namespace!, efsResourceName!);
}

// Merge values with user-provided one
values = merge(values, this.props.values ?? {});

// Apply Helm Chart
const chart = this.addHelmChart(clusterInfo, values, false, false);

// Add PVC dependency to the Chart in case of EFS generating the resource
if (efsIsEnabled){
chart.node.addDependency(pvcResource!);
}

return Promise.resolve(chart);
}
}

/**
* populateValues populates the appropriate values used to customize the Helm chart
* @param helmOptions User provided values to customize the chart
* Helper function to set up Load Balancer
*/
function populateValues(clusterInfo: ClusterInfo, ns: KubernetesManifest, helmOptions: AirflowAddOnProps): Values {
let values = helmOptions.values ?? {};

const albAddOnCheck = clusterInfo.getScheduledAddOn(AwsLoadBalancerControllerAddOn.name);
const cert = helmOptions.certificateResourceName;
// const dbConfig = helmOptions.dbConfig;

// Kubernetes settings
setPath(values, "config.kubernetes.namespace", helmOptions.namespace!);
setPath(values, "config.kubernetes_executor.namespace", helmOptions.namespace!);

// Security Context
setPath(values, "securityContext.fsGroup", 66534);
setPath(values, "executor", "KubernetesExecutor");

// Load Balancer
if (helmOptions.enableAlb){

// Check to ensure AWS Load Balancer Controller AddOn is provided in the list of Addons
assert(albAddOnCheck, `Missing a dependency: ${AwsLoadBalancerControllerAddOn.name}. Please add it to your list of addons.`);
const presetAnnotations: any = {
'alb.ingress.kubernetes.io/group.name': 'airflow',
'alb.ingress.kubernetes.io/scheme': 'internet-facing',
'alb.ingress.kubernetes.io/target-type': 'ip',
'alb.ingress.kubernetes.io/listen-ports': '[{"HTTP": 80}]',
'alb.ingress.kubernetes.io/healthcheck-path': '/health',
};

// Set helm custom value for certificates, if provided
if (helmOptions.certificateResourceName){
presetAnnotations['alb.ingress.kubernetes.io/listen-ports'] = '[{"HTTP": 80},{"HTTPS":443}]';
const certificate = clusterInfo.getResource<ICertificate>(helmOptions.certificateResourceName);
presetAnnotations['alb.ingress.kubernetes.io/certificate-arn'] = certificate?.certificateArn;
}

setPath(values, "ingress.web", {
"enabled": "true",
"annotations": presetAnnotations,
"pathType": "Prefix",
"ingressClassName": "alb",
});

// Configuring Ingress for Airflow Web Ui hence the service type is changed to NodePort
setPath(values, "webserver.service", {
type: "NodePort",
ports: [{
name: "airflow-ui",
port: "{{ .Values.ports.airflowUI }}"
}]
});

} else {
assert(!cert, 'Cert option is supported only if ALB is enabled.');
}

// If Logging with S3 is enabled

if (helmOptions.enableLogging){
const bucket = clusterInfo.getRequiredResource<IBucket>(helmOptions.s3Bucket!);
values = setUpLogging(clusterInfo, values, ns, helmOptions, bucket);
}

// If EFS is enabled for persistent storage
if (helmOptions.enableEfs){
const efsAddOnCheck = clusterInfo.getScheduledAddOn(EfsCsiDriverAddOn.name);
assert(efsAddOnCheck, `Missing a dependency: ${EfsCsiDriverAddOn.name}. Please add it to your list of addons.`);
const efs = clusterInfo.getRequiredResource<IFileSystem>(helmOptions.efsFileSystem!);
assert(efs, "Please provide the name of EFS File System.");

// Need to create a storage class and pvc for the EFS
const efsResources = new KubernetesManifest(clusterInfo.cluster, 'apache-airflow-efs-sc', {
cluster: clusterInfo.cluster,
manifest: [{
apiVersion: "storage.k8s.io/v1",
kind: "StorageClass",
metadata: { name: AIRFLOWSC },
provisioner: "efs.csi.aws.com",
parameters: {
provisioningMode: "efs-ap",
fileSystemId: `${efs.fileSystemId}`,
directoryPerms: "700",
gidRangeStart: "1000",
gidRangeEnd: "2000",
}
},
{
apiVersion: "v1",
kind: "PersistentVolumeClaim",
metadata: {
name: AIRFLOWPVC,
namespace: `${helmOptions.namespace!}`
},
spec: {
accessModes: ["ReadWriteMany"],
storageClassName: AIRFLOWSC,
resources: {
requests: {
storage: '10Gi'
}
}
}
}],
overwrite: true,
});

// Set helm custom values for persistent storage of DAGs
setPath(values, "dags.persistence", {
enabled: true,
size: "10Gi",
storageClassName: AIRFLOWSC,
accessMode: "ReadWriteMany",
existingClaim: AIRFLOWPVC
});
}

// TODO: Using RDS as a Database
// if (helmOptions.enableRds){
// values = setUpDatabase(values, dbConfig)
// } else {
// assert(!dbConfig, 'DB Configuration is supported only if RDS is enabled.');
// }

return values;
function setUpLoadBalancer(clusterInfo: ClusterInfo, values: Values, albAddOnCheck: Promise<Construct> | undefined, cert: string | undefined ): Values {
// Check to ensure AWS Load Balancer Controller AddOn is provided in the list of Addons
assert(albAddOnCheck, `Missing a dependency: ${AwsLoadBalancerControllerAddOn.name}. Please add it to your list of addons.`);
const presetAnnotations: any = {
'alb.ingress.kubernetes.io/group.name': 'airflow',
'alb.ingress.kubernetes.io/scheme': 'internet-facing',
'alb.ingress.kubernetes.io/target-type': 'ip',
'alb.ingress.kubernetes.io/listen-ports': '[{"HTTP": 80}]',
'alb.ingress.kubernetes.io/healthcheck-path': '/health',
};

// Set helm custom value for certificates, if provided
if (cert){
presetAnnotations['alb.ingress.kubernetes.io/listen-ports'] = '[{"HTTP": 80},{"HTTPS":443}]';
const certificate = clusterInfo.getResource<ICertificate>(cert);
presetAnnotations['alb.ingress.kubernetes.io/certificate-arn'] = certificate?.certificateArn;
}

setPath(values, "ingress.web", {
"enabled": "true",
"annotations": presetAnnotations,
"pathType": "Prefix",
"ingressClassName": "alb",
});

// Configuring Ingress for Airflow Web Ui hence the service type is changed to NodePort
setPath(values, "webserver.service", {
type: "NodePort",
ports: [{
name: "airflow-ui",
port: "{{ .Values.ports.airflowUI }}"
}]
});

return values;
}

/**
* Helper function to set up Logging with S3 Bucket
*/
function setUpLogging(clusterInfo: ClusterInfo, values: Values, ns: KubernetesManifest, helmOptions: AirflowAddOnProps, bucket: IBucket): Values {
function setUpLogging(clusterInfo: ClusterInfo, values: Values, ns: KubernetesManifest, namespace: string, bucket: IBucket): Values {

// Assert check to ensure you provide an S3 Bucket
assert(bucket, "Please provide the name of S3 bucket for Logging.");
Expand Down Expand Up @@ -269,7 +232,7 @@ function setUpLogging(clusterInfo: ClusterInfo, values: Values, ns: KubernetesMa

// Set up IRSA
const airflowLoggingPolicyDocument = PolicyDocument.fromJson(AirflowLoggingPolicy);
const sa = createServiceAccount(clusterInfo.cluster, 'airflow-s3-logging-sa', helmOptions.namespace!, airflowLoggingPolicyDocument);
const sa = createServiceAccount(clusterInfo.cluster, 'airflow-s3-logging-sa', namespace, airflowLoggingPolicyDocument);
sa.node.addDependency(ns);

// Helm custom value set up for S3 logging set up
Expand Down Expand Up @@ -306,4 +269,74 @@ function setUpLogging(clusterInfo: ClusterInfo, values: Values, ns: KubernetesMa
});

return values;
}

/**
*
*/
function setUpEFS(clusterInfo: ClusterInfo, values: Values, ns: KubernetesManifest, namespace: string, efsResourceName: string): [Values, KubernetesManifest] {
// Check
const efsAddOnCheck = clusterInfo.getScheduledAddOn(EfsCsiDriverAddOn.name);
assert(efsAddOnCheck, `Missing a dependency: ${EfsCsiDriverAddOn.name}. Please add it to your list of addons.`);
const efs = clusterInfo.getRequiredResource<IFileSystem>(efsResourceName);
assert(efs, "Please provide the name of EFS File System.");

// Need to create a storage class and pvc for the EFS
const scResource = new KubernetesManifest(clusterInfo.cluster, 'apache-airflow-efs-sc', {
cluster: clusterInfo.cluster,
manifest: [{
apiVersion: "storage.k8s.io/v1",
kind: "StorageClass",
metadata: { name: AIRFLOWSC },
provisioner: "efs.csi.aws.com",
parameters: {
provisioningMode: "efs-ap",
fileSystemId: `${efs.fileSystemId}`,
directoryPerms: "700",
gidRangeStart: "1000",
gidRangeEnd: "2000",
}
}], overwrite: true,
});

const pvcResource = new KubernetesManifest(clusterInfo.cluster, 'apache-airflow-efs-pvc',{
cluster: clusterInfo.cluster,
manifest: [{
apiVersion: "v1",
kind: "PersistentVolumeClaim",
metadata: {
name: AIRFLOWPVC,
namespace: `${namespace}`
},
spec: {
accessModes: ["ReadWriteMany"],
storageClassName: AIRFLOWSC,
resources: {
requests: {
storage: '10Gi'
}
}
}
}], overwrite: true,
});

// SC depends on the EFS addon
if(efsAddOnCheck) {
efsAddOnCheck.then(construct => scResource.node.addDependency(construct));
}

// PVC depends on SC and NS
pvcResource.node.addDependency(scResource);
pvcResource.node.addDependency(ns);

// Set helm custom values for persistent storage of DAGs
setPath(values, "dags.persistence", {
enabled: true,
size: "10Gi",
storageClassName: AIRFLOWSC,
accessMode: "ReadWriteMany",
existingClaim: AIRFLOWPVC
});

return [values, pvcResource];
}