Skip to content

Commit

Permalink
airbyte-workers: add support for kubernetes pod annotations (#10753)
Browse files Browse the repository at this point in the history
This PR adds the possibility to define pod annotations to the pods created by the workers.
Pod annotations can be required in different situations, such as configuring which IP pool to use when using some network plugins.

The original PR was here: #9874 we decided to split it into 3 different PRs.
  • Loading branch information
tbcdns authored Apr 5, 2022
1 parent 399469e commit 5528d7d
Show file tree
Hide file tree
Showing 14 changed files with 231 additions and 77 deletions.
31 changes: 26 additions & 5 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -293,23 +292,45 @@ public interface Configs {

/**
* Define one or more Job pod node selectors. Each kv-pair is separated by a `,`.
* Used for the sync job and as fallback in case job specific (spec, check, discover) node selectors are not defined.
*/
Optional<Map<String, String>> getJobKubeNodeSelectors();
Map<String, String> getJobKubeNodeSelectors();

/**
* Define node selectors for Spec job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getSpecJobKubeNodeSelectors();
Map<String, String> getSpecJobKubeNodeSelectors();

/**
* Define node selectors for Check job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getCheckJobKubeNodeSelectors();
Map<String, String> getCheckJobKubeNodeSelectors();

/**
* Define node selectors for Discover job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getDiscoverJobKubeNodeSelectors();
Map<String, String> getDiscoverJobKubeNodeSelectors();

/**
* Define one or more Job pod annotations. Each kv-pair is separated by a `,`.
* Used for the sync job and as fallback in case job specific (spec, check, discover) annotations are not defined.
*/
Map<String, String> getJobKubeAnnotations();

/**
* Define annotations for Spec job pods specifically. Each kv-pair is separated by a `,`.
*/
Map<String, String> getSpecJobKubeAnnotations();

/**
* Define annotations for Check job pods specifically. Each kv-pair is separated by a `,`.
*/
Map<String, String> getCheckJobKubeAnnotations();

/**
* Define annotations for Discover job pods specifically. Each kv-pair is separated by a `,`.
*/
Map<String, String> getDiscoverJobKubeAnnotations();

/**
* Define the Job pod connector image pull policy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class EnvConfigs implements Configs {
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY";
public static final String JOB_KUBE_TOLERATIONS = "JOB_KUBE_TOLERATIONS";
public static final String JOB_KUBE_NODE_SELECTORS = "JOB_KUBE_NODE_SELECTORS";
public static final String JOB_KUBE_ANNOTATIONS = "JOB_KUBE_ANNOTATIONS";
public static final String JOB_KUBE_SOCAT_IMAGE = "JOB_KUBE_SOCAT_IMAGE";
public static final String JOB_KUBE_BUSYBOX_IMAGE = "JOB_KUBE_BUSYBOX_IMAGE";
public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE";
Expand Down Expand Up @@ -127,6 +128,9 @@ public class EnvConfigs implements Configs {
public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS";
public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS";
public static final String DISCOVER_JOB_KUBE_NODE_SELECTORS = "DISCOVER_JOB_KUBE_NODE_SELECTORS";
public static final String SPEC_JOB_KUBE_ANNOTATIONS = "SPEC_JOB_KUBE_ANNOTATIONS";
public static final String CHECK_JOB_KUBE_ANNOTATIONS = "CHECK_JOB_KUBE_ANNOTATIONS";
public static final String DISCOVER_JOB_KUBE_ANNOTATIONS = "DISCOVER_JOB_KUBE_ANNOTATIONS";

private static final String REPLICATION_ORCHESTRATOR_CPU_REQUEST = "REPLICATION_ORCHESTRATOR_CPU_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_CPU_LIMIT = "REPLICATION_ORCHESTRATOR_CPU_LIMIT";
Expand Down Expand Up @@ -484,8 +488,8 @@ private TolerationPOJO parseToleration(final String tolerationStr) {
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, ""));
public Map<String, String> getJobKubeNodeSelectors() {
return splitKVPairsFromEnvString(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, ""));
}

/**
Expand All @@ -494,8 +498,8 @@ public Optional<Map<String, String>> getJobKubeNodeSelectors() {
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getSpecJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_NODE_SELECTORS, ""));
public Map<String, String> getSpecJobKubeNodeSelectors() {
return splitKVPairsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
Expand All @@ -504,8 +508,8 @@ public Optional<Map<String, String>> getSpecJobKubeNodeSelectors() {
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getCheckJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_NODE_SELECTORS, ""));
public Map<String, String> getCheckJobKubeNodeSelectors() {
return splitKVPairsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
Expand All @@ -514,28 +518,76 @@ public Optional<Map<String, String>> getCheckJobKubeNodeSelectors() {
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getDiscoverJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_NODE_SELECTORS, ""));
public Map<String, String> getDiscoverJobKubeNodeSelectors() {
return splitKVPairsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
* Parse string containing node selectors into a map. Each kv-pair is separated by a `,`
* Returns a map of annotations from its own environment variable. The value of the env is a string
* that represents one or more annotations. Each kv-pair is separated by a `,`
* <p>
* For example:- The following represents two node selectors
* For example:- The following represents two annotations
* <p>
* airbyte=server,type=preemptive
*
* @param envString string that represents one or more node selector labels.
* @return map containing kv pairs of annotations
*/
@Override
public Map<String, String> getJobKubeAnnotations() {
return splitKVPairsFromEnvString(getEnvOrDefault(JOB_KUBE_ANNOTATIONS, ""));
}

/**
* Returns a map of node selectors for Spec job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
private Optional<Map<String, String>> getNodeSelectorsFromEnvString(final String envString) {
final Map<String, String> selectors = Splitter.on(",")
.splitToStream(envString)
@Override
public Map<String, String> getSpecJobKubeAnnotations() {
return splitKVPairsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_ANNOTATIONS, ""));
}

/**
* Returns a map of node selectors for Check job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Map<String, String> getCheckJobKubeAnnotations() {
return splitKVPairsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_ANNOTATIONS, ""));
}

/**
* Returns a map of node selectors for Discover job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Map<String, String> getDiscoverJobKubeAnnotations() {
return splitKVPairsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_ANNOTATIONS, ""));
}

/**
* Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','. The
* key and the value are separated by '='.
* <p>
* For example:- The following represents two map entries
* </p>
* key1=value1,key2=value2
*
* @param input string
* @return map containing kv pairs
*/
public Map<String, String> splitKVPairsFromEnvString(String input) {
if (input == null) {
input = "";
}
final Map<String, String> map = Splitter.on(",")
.splitToStream(input)
.filter(s -> !Strings.isNullOrEmpty(s) && s.contains("="))
.map(s -> s.split("="))
.collect(Collectors.toMap(s -> s[0], s -> s[1]));

return selectors.isEmpty() ? Optional.empty() : Optional.of(selectors);
.collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim()));
return map.isEmpty() ? null : map;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,76 +186,103 @@ void testworkerKubeTolerations() {
new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));
}

@Test
void testSplitKVPairsFromEnvString() {
String input = "key1=value1,key2=value2";
Map<String, String> map = config.splitKVPairsFromEnvString(input);
assertNotNull(map);
assertEquals(2, map.size());
assertEquals(map, Map.of("key1", "value1", "key2", "value2"));

input = "key=k,,;$%&^#";
map = config.splitKVPairsFromEnvString(input);
assertNotNull(map);
assertEquals(map, Map.of("key", "k"));

input = null;
map = config.splitKVPairsFromEnvString(input);
assertNull(map);

input = " key1= value1, key2 = value2";
map = config.splitKVPairsFromEnvString(input);
assertNotNull(map);
assertEquals(map, Map.of("key1", "value1", "key2", "value2"));

input = "key1:value1,key2:value2";
map = config.splitKVPairsFromEnvString(input);
assertNull(map);
}

@Test
void testJobKubeNodeSelectors() {
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, null);
assertFalse(config.getJobKubeNodeSelectors().isPresent());
assertNull(config.getJobKubeNodeSelectors());

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, ",,,");
assertFalse(config.getJobKubeNodeSelectors().isPresent());
assertNull(config.getJobKubeNodeSelectors());

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("key", "k"));
assertEquals(config.getJobKubeNodeSelectors(), Map.of("key", "k"));

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "one=two");
assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("one", "two"));
assertEquals(config.getJobKubeNodeSelectors(), Map.of("one", "two"));

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
assertEquals(config.getJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testSpecKubeNodeSelectors() {
envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, null);
assertFalse(config.getSpecJobKubeNodeSelectors().isPresent());
assertNull(config.getSpecJobKubeNodeSelectors());

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, ",,,");
assertFalse(config.getSpecJobKubeNodeSelectors().isPresent());
assertNull(config.getSpecJobKubeNodeSelectors());

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("key", "k"));
assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("key", "k"));

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "one=two");
assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("one", "two"));
assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("one", "two"));

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testCheckKubeNodeSelectors() {
envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, null);
assertFalse(config.getCheckJobKubeNodeSelectors().isPresent());
assertNull(config.getCheckJobKubeNodeSelectors());

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, ",,,");
assertFalse(config.getCheckJobKubeNodeSelectors().isPresent());
assertNull(config.getCheckJobKubeNodeSelectors());

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("key", "k"));
assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("key", "k"));

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "one=two");
assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("one", "two"));
assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("one", "two"));

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testDiscoverKubeNodeSelectors() {
envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, null);
assertFalse(config.getDiscoverJobKubeNodeSelectors().isPresent());
assertNull(config.getDiscoverJobKubeNodeSelectors());

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, ",,,");
assertFalse(config.getDiscoverJobKubeNodeSelectors().isPresent());
assertNull(config.getDiscoverJobKubeNodeSelectors());

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("key", "k"));
assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("key", "k"));

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "one=two");
assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("one", "two"));
assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("one", "two"));

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
Expand Down
Loading

0 comments on commit 5528d7d

Please sign in to comment.