Skip to content

Commit

Permalink
adding all dependencies, code refactor, and adding merge with user-pr…
Browse files Browse the repository at this point in the history
…ovided helm value
  • Loading branch information
youngjeong46 committed Jun 12, 2023
1 parent 0090ed6 commit 676a2ea
Showing 1 changed file with 162 additions and 131 deletions.
293 changes: 162 additions & 131 deletions lib/addons/apache-airflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { Values } from "../../spi";
import { setPath, createNamespace, createServiceAccount } from "../../utils";
import { IFileSystem } from "aws-cdk-lib/aws-efs";

import merge from "ts-deepmerge";

/**
* User provided options for the Helm Chart
*/
Expand Down Expand Up @@ -96,153 +98,112 @@ export class ApacheAirflowAddOn extends HelmAddOn {

deploy(clusterInfo: ClusterInfo): Promise<Construct> {
const cluster = clusterInfo.cluster;
const albAddOnCheck = clusterInfo.getScheduledAddOn(AwsLoadBalancerControllerAddOn.name);
const enableAlb = this.options.enableAlb;
const cert = this.options.certificateResourceName;
const loggingIsEnabled = this.options.enableLogging;
const loggingBucketResourceName = this.options.s3Bucket;
const efsIsEnabled = this.options.enableEfs;
const efsResourceName = this.options.efsFileSystem;
const namespace = this.options.namespace;

// Create Namespace
const ns = createNamespace(this.options.namespace!, cluster, true, true);
const ns = createNamespace(namespace!, cluster, true, true);

let values: Values = populateValues(clusterInfo, ns, this.options);
// Setting basic custom values for Kubernetes
let values: Values = {
config: {
"kubernetes": {
"namespace": this.options.namespace!
},
"kubernetes_executor": {
"namespace": this.options.namespace!
}
},
"securityContext": {
"fsGroup": 66534
},
"executor": "KubernetesExecutor"
};

// If Load Balancing is enabled
if (enableAlb){
values = setUpLoadBalancer(clusterInfo, values, albAddOnCheck, cert);
} else {
assert(!cert, 'Cert option is supported only if ALB is enabled.');
}

// If Logging with S3 is enabled
if (loggingIsEnabled){
const bucket = clusterInfo.getRequiredResource<IBucket>(loggingBucketResourceName!);
values = setUpLogging(clusterInfo, values, ns, namespace!, bucket);
}

// If EFS is enabled for persistent storage
let pvcResource: KubernetesManifest;
if (efsIsEnabled){
[values, pvcResource] = setUpEFS(clusterInfo, values, ns, namespace!, efsResourceName!);
}

// Merge values with user-provided one
values = merge(values, this.props.values ?? {});

// Apply Helm Chart
const chart = this.addHelmChart(clusterInfo, values, false, false);

// Add PVC dependency to the Chart in case of EFS generating the resource
if (efsIsEnabled){
chart.node.addDependency(pvcResource!);
}

return Promise.resolve(chart);
}
}

/**
* populateValues populates the appropriate values used to customize the Helm chart
* @param helmOptions User provided values to customize the chart
* Helper function to set up Load Balancer
*/
function populateValues(clusterInfo: ClusterInfo, ns: KubernetesManifest, helmOptions: AirflowAddOnProps): Values {
let values = helmOptions.values ?? {};

const albAddOnCheck = clusterInfo.getScheduledAddOn(AwsLoadBalancerControllerAddOn.name);
const cert = helmOptions.certificateResourceName;
// const dbConfig = helmOptions.dbConfig;

// Kubernetes settings
setPath(values, "config.kubernetes.namespace", helmOptions.namespace!);
setPath(values, "config.kubernetes_executor.namespace", helmOptions.namespace!);

// Security Context
setPath(values, "securityContext.fsGroup", 66534);
setPath(values, "executor", "KubernetesExecutor");

// Load Balancer
if (helmOptions.enableAlb){

// Check to ensure AWS Load Balancer Controller AddOn is provided in the list of Addons
assert(albAddOnCheck, `Missing a dependency: ${AwsLoadBalancerControllerAddOn.name}. Please add it to your list of addons.`);
const presetAnnotations: any = {
'alb.ingress.kubernetes.io/group.name': 'airflow',
'alb.ingress.kubernetes.io/scheme': 'internet-facing',
'alb.ingress.kubernetes.io/target-type': 'ip',
'alb.ingress.kubernetes.io/listen-ports': '[{"HTTP": 80}]',
'alb.ingress.kubernetes.io/healthcheck-path': '/health',
};

// Set helm custom value for certificates, if provided
if (helmOptions.certificateResourceName){
presetAnnotations['alb.ingress.kubernetes.io/listen-ports'] = '[{"HTTP": 80},{"HTTPS":443}]';
const certificate = clusterInfo.getResource<ICertificate>(helmOptions.certificateResourceName);
presetAnnotations['alb.ingress.kubernetes.io/certificate-arn'] = certificate?.certificateArn;
}

setPath(values, "ingress.web", {
"enabled": "true",
"annotations": presetAnnotations,
"pathType": "Prefix",
"ingressClassName": "alb",
});

// Configuring Ingress for Airflow Web Ui hence the service type is changed to NodePort
setPath(values, "webserver.service", {
type: "NodePort",
ports: [{
name: "airflow-ui",
port: "{{ .Values.ports.airflowUI }}"
}]
});

} else {
assert(!cert, 'Cert option is supported only if ALB is enabled.');
}

// If Logging with S3 is enabled

if (helmOptions.enableLogging){
const bucket = clusterInfo.getRequiredResource<IBucket>(helmOptions.s3Bucket!);
values = setUpLogging(clusterInfo, values, ns, helmOptions, bucket);
}

// If EFS is enabled for persistent storage
if (helmOptions.enableEfs){
const efsAddOnCheck = clusterInfo.getScheduledAddOn(EfsCsiDriverAddOn.name);
assert(efsAddOnCheck, `Missing a dependency: ${EfsCsiDriverAddOn.name}. Please add it to your list of addons.`);
const efs = clusterInfo.getRequiredResource<IFileSystem>(helmOptions.efsFileSystem!);
assert(efs, "Please provide the name of EFS File System.");

// Need to create a storage class and pvc for the EFS
const efsResources = new KubernetesManifest(clusterInfo.cluster, 'apache-airflow-efs-sc', {
cluster: clusterInfo.cluster,
manifest: [{
apiVersion: "storage.k8s.io/v1",
kind: "StorageClass",
metadata: { name: AIRFLOWSC },
provisioner: "efs.csi.aws.com",
parameters: {
provisioningMode: "efs-ap",
fileSystemId: `${efs.fileSystemId}`,
directoryPerms: "700",
gidRangeStart: "1000",
gidRangeEnd: "2000",
}
},
{
apiVersion: "v1",
kind: "PersistentVolumeClaim",
metadata: {
name: AIRFLOWPVC,
namespace: `${helmOptions.namespace!}`
},
spec: {
accessModes: ["ReadWriteMany"],
storageClassName: AIRFLOWSC,
resources: {
requests: {
storage: '10Gi'
}
}
}
}],
overwrite: true,
});

efsResources.node.addDependency(ns);

// Set helm custom values for persistent storage of DAGs
setPath(values, "dags.persistence", {
enabled: true,
size: "10Gi",
storageClassName: AIRFLOWSC,
accessMode: "ReadWriteMany",
existingClaim: AIRFLOWPVC
});
}

// TODO: Using RDS as a Database
// if (helmOptions.enableRds){
// values = setUpDatabase(values, dbConfig)
// } else {
// assert(!dbConfig, 'DB Configuration is supported only if RDS is enabled.');
// }

return values;
function setUpLoadBalancer(clusterInfo: ClusterInfo, values: Values, albAddOnCheck: Promise<Construct> | undefined, cert: string | undefined ): Values {
// Check to ensure AWS Load Balancer Controller AddOn is provided in the list of Addons
assert(albAddOnCheck, `Missing a dependency: ${AwsLoadBalancerControllerAddOn.name}. Please add it to your list of addons.`);
const presetAnnotations: any = {
'alb.ingress.kubernetes.io/group.name': 'airflow',
'alb.ingress.kubernetes.io/scheme': 'internet-facing',
'alb.ingress.kubernetes.io/target-type': 'ip',
'alb.ingress.kubernetes.io/listen-ports': '[{"HTTP": 80}]',
'alb.ingress.kubernetes.io/healthcheck-path': '/health',
};

// Set helm custom value for certificates, if provided
if (cert){
presetAnnotations['alb.ingress.kubernetes.io/listen-ports'] = '[{"HTTP": 80},{"HTTPS":443}]';
const certificate = clusterInfo.getResource<ICertificate>(cert);
presetAnnotations['alb.ingress.kubernetes.io/certificate-arn'] = certificate?.certificateArn;
}

setPath(values, "ingress.web", {
"enabled": "true",
"annotations": presetAnnotations,
"pathType": "Prefix",
"ingressClassName": "alb",
});

// Configuring Ingress for Airflow Web Ui hence the service type is changed to NodePort
setPath(values, "webserver.service", {
type: "NodePort",
ports: [{
name: "airflow-ui",
port: "{{ .Values.ports.airflowUI }}"
}]
});

return values;
}

/**
* Helper function to set up Logging with S3 Bucket
*/
function setUpLogging(clusterInfo: ClusterInfo, values: Values, ns: KubernetesManifest, helmOptions: AirflowAddOnProps, bucket: IBucket): Values {
function setUpLogging(clusterInfo: ClusterInfo, values: Values, ns: KubernetesManifest, namespace: string, bucket: IBucket): Values {

// Assert check to ensure you provide an S3 Bucket
assert(bucket, "Please provide the name of S3 bucket for Logging.");
Expand Down Expand Up @@ -271,7 +232,7 @@ function setUpLogging(clusterInfo: ClusterInfo, values: Values, ns: KubernetesMa

// Set up IRSA
const airflowLoggingPolicyDocument = PolicyDocument.fromJson(AirflowLoggingPolicy);
const sa = createServiceAccount(clusterInfo.cluster, 'airflow-s3-logging-sa', helmOptions.namespace!, airflowLoggingPolicyDocument);
const sa = createServiceAccount(clusterInfo.cluster, 'airflow-s3-logging-sa', namespace, airflowLoggingPolicyDocument);
sa.node.addDependency(ns);

// Helm custom value set up for S3 logging set up
Expand Down Expand Up @@ -308,4 +269,74 @@ function setUpLogging(clusterInfo: ClusterInfo, values: Values, ns: KubernetesMa
});

return values;
}

/**
*
*/
function setUpEFS(clusterInfo: ClusterInfo, values: Values, ns: KubernetesManifest, namespace: string, efsResourceName: string): [Values, KubernetesManifest] {
// Check
const efsAddOnCheck = clusterInfo.getScheduledAddOn(EfsCsiDriverAddOn.name);
assert(efsAddOnCheck, `Missing a dependency: ${EfsCsiDriverAddOn.name}. Please add it to your list of addons.`);
const efs = clusterInfo.getRequiredResource<IFileSystem>(efsResourceName);
assert(efs, "Please provide the name of EFS File System.");

// Need to create a storage class and pvc for the EFS
const scResource = new KubernetesManifest(clusterInfo.cluster, 'apache-airflow-efs-sc', {
cluster: clusterInfo.cluster,
manifest: [{
apiVersion: "storage.k8s.io/v1",
kind: "StorageClass",
metadata: { name: AIRFLOWSC },
provisioner: "efs.csi.aws.com",
parameters: {
provisioningMode: "efs-ap",
fileSystemId: `${efs.fileSystemId}`,
directoryPerms: "700",
gidRangeStart: "1000",
gidRangeEnd: "2000",
}
}], overwrite: true,
});

const pvcResource = new KubernetesManifest(clusterInfo.cluster, 'apache-airflow-efs-pvc',{
cluster: clusterInfo.cluster,
manifest: [{
apiVersion: "v1",
kind: "PersistentVolumeClaim",
metadata: {
name: AIRFLOWPVC,
namespace: `${namespace}`
},
spec: {
accessModes: ["ReadWriteMany"],
storageClassName: AIRFLOWSC,
resources: {
requests: {
storage: '10Gi'
}
}
}
}], overwrite: true,
});

// SC depends on the EFS addon
if(efsAddOnCheck) {
efsAddOnCheck.then(construct => scResource.node.addDependency(construct));
}

// PVC depends on SC and NS
pvcResource.node.addDependency(scResource);
pvcResource.node.addDependency(ns);

// Set helm custom values for persistent storage of DAGs
setPath(values, "dags.persistence", {
enabled: true,
size: "10Gi",
storageClassName: AIRFLOWSC,
accessMode: "ReadWriteMany",
existingClaim: AIRFLOWPVC
});

return [values, pvcResource];
}

0 comments on commit 676a2ea

Please sign in to comment.