diff --git a/README.md b/README.md
index 5737a17..4d8e90e 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ This repository will include code examples and walkthroughs for the following co
 Go ahead and install apache-flink since we'll need it for the rest of this exercise.
 
 ```bash
- (my-new-environment) $ pip install apache-flink==1.13.6
+ (my-new-environment) $ pip install apache-flink==1.15.2
 ```
 
 Additional Note: Please validate that you are using either Java 8 or Java 11 when running the examples. There are compatibility issues with later versions of Java due to the Py4j library calling out to the Kinesis Connector.
diff --git a/deploying/README.md b/deploying/README.md
index 0487ccc..4b1cc5b 100644
--- a/deploying/README.md
+++ b/deploying/README.md
@@ -16,7 +16,7 @@
       5. AmazonKinesisAnalyticsFullAccess
 3. Navigate to the Kinesis Analytics tab and click Create Application.
-   1. Select Apache Flink 1.13 as the version
+   1. Select Apache Flink 1.15 as the version
    2. Choose the IAM Role you have previously created
    3. Use the Development Template for Application Settings to reduce costs. This will disable savepoints, reduce the granularity of metrics produced to CloudWatch, and set the application parallelism to `1`. For production workloads you will want different settings.
@@ -35,7 +35,7 @@
 Click `Create group` and create a group called `kinesis.analytics.flink.run.options` with the following key value pairs:
 
 - `python`: `GettingStarted/getting-started.py`
-- `jarfile`: `GettingStarted/lib/flink-sql-connector-kinesis_2.12-1.13.2`
+- `jarfile`: `GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar`
 
 The `python` key tells Kinesis Data Analytics for Apache Flink how to execute the file, and that it is a python application. It is the equivalent of locally typing `python getting-started.py` to execute the application.
diff --git a/deploying/img/2021-03-22-15-50-59.png b/deploying/img/2021-03-22-15-50-59.png
index 39de893..64935d3 100644
Binary files a/deploying/img/2021-03-22-15-50-59.png and b/deploying/img/2021-03-22-15-50-59.png differ
diff --git a/getting-started/README.md b/getting-started/README.md
index bc13c8c..6a2681e 100644
--- a/getting-started/README.md
+++ b/getting-started/README.md
@@ -72,7 +72,7 @@
 3. Download and set up your Kinesis Connector.
 
-   Create a folder within your GettingStarted application called `lib`, and download [the latest minor version of your Apache Flink Version flink-sql-connector jar file from this repo](https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis) into it. It's the Amazon Kinesis SQL Connector for Flink. Make sure you get the Scala 2.12 version of this connector as well. This will need to be bundled with your application on deploy, but ensure it's in your gitignore because adding jars to git is not a good practice.
+   Create a folder within your GettingStarted application called `lib`, and download [the latest minor version of your Apache Flink Version flink-sql-connector jar file from this repo](https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis) into it. It's the Amazon Kinesis SQL Connector for Flink. This will need to be bundled with your application on deploy, but ensure it's in your gitignore because adding jars to git is not a good practice.
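+
+   For example, one way to fetch the connector jar from the command line (assuming the 1.15.2 artifact and its standard Maven Central path) is:
+
+   ```bash
+   mkdir -p GettingStarted/lib
+   curl -L -o GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar \
+     https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kinesis/1.15.2/flink-sql-connector-kinesis-1.15.2.jar
+   ```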
 
    ![](../img/2021-03-22-09-12-14.png)
diff --git a/logging/README.md b/logging/README.md
index a33f462..72c91f1 100644
--- a/logging/README.md
+++ b/logging/README.md
@@ -47,7 +47,7 @@ When you're finished, the folder structure should look like:
 
 ```bash
 UDF/
-    lib/flink-sql-connector-kinesis_2.12-1.13.2
+    lib/flink-sql-connector-kinesis-1.15.2.jar
     application_properties.json
     udf.py
 ```
@@ -68,8 +68,6 @@ UDF/
 
 - Change `getting-started.py` to `udf.py` (Select the group and click Edit Group)
 
-![](img/2021-03-23-14-49-48.png)
-
 Click `Update`, and the application will automatically restart picking up the new files.
diff --git a/packaging/README.md b/packaging/README.md
index a451121..b404995 100644
--- a/packaging/README.md
+++ b/packaging/README.md
@@ -6,7 +6,7 @@ Packaging your Pyflink application for deployment onto Kinesis Data Analytics fo
 
 1. Python files `(getting-started.py)`
 2. Any required dependencies such as `boto3`.
-3. Any included jar files `(lib/flink-sql-connector-kinesis_2.12-1.13.2)`
+3. Any included jar files `(lib/flink-sql-connector-kinesis-1.15.2.jar)`
 
 ### Including Python dependencies
 
 Before we can archive our files, we need to look at how to include python dependencies in our application.
@@ -33,7 +33,7 @@ my-pyflink-project
 │   │   mymodulefile2.py
 ...
 └───lib
-│   │   flink-sql-connector-kinesis_2.11-1.13.2.jar
+│   │   flink-sql-connector-kinesis-1.15.2.jar
 │   │   ...
 ...
 ```
@@ -57,7 +57,7 @@ Lastly, you can also include your own dependencies via the same method--placing
 
 ### Jar dependencies
 
-If your application depends on a connector, be sure to include the connector jar (e.g. `flink-sql-connector-kinesis_2.11-1.13.2.jar`) in your package; under the lib folder in the tree structure shown above. Note that you don't have to name the folder lib, you just have to include it somewhere in your package and also ensure that you specify the jar dependency using the jarfile property as described below. Make sure the connector version corresponds to the appropriate Apache Flink version in your Kinesis Data Analytics application.
+If your application depends on a connector, be sure to include the connector jar (e.g. `flink-sql-connector-kinesis-1.15.2.jar`) in your package; under the lib folder in the tree structure shown above. Note that you don't have to name the folder lib, you just have to include it somewhere in your package and also ensure that you specify the jar dependency using the jarfile property as described below. Make sure the connector version corresponds to the appropriate Apache Flink version in your Kinesis Data Analytics application.
 
 If you have multiple dependencies, you have to create a fat jar and then include it using the jarfile property as described below. This is a Flink requirement as described [here](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management/#jar-dependencies).
@@ -101,7 +101,7 @@ Next, type
 adding: pyflink-examples/GettingStarted/getting-started.py (deflated 68%)
 adding: pyflink-examples/GettingStarted/application_properties.json (deflated 56%)
 adding: pyflink-examples/GettingStarted/lib/ (stored 0%)
-adding: pyflink-examples/GettingStarted/lib/flink-sql-connector-kinesis_2.12-1.13.2 (deflated 8%)
+adding: pyflink-examples/GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar (deflated 8%)
 ```
 
 This will create a file called `GettingStarted.zip` with the getting-started.py file, a lib folder with the kinesis connector, and an unused application_properties.json--you can choose to omit this if you'd like.
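+
+For reference, a zip invocation that produces output like the above would look something along these lines (run from the repository root; your paths may differ):
+
+```bash
+zip -r GettingStarted.zip pyflink-examples/GettingStarted/
+```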
diff --git a/pyflink-examples/FirehoseSink/README.md b/pyflink-examples/FirehoseSink/README.md
new file mode 100644
index 0000000..c033a99
--- /dev/null
+++ b/pyflink-examples/FirehoseSink/README.md
@@ -0,0 +1,25 @@
+# Packaging Instructions for multiple connectors in Kinesis Data Analytics
+
+If you need to use multiple connectors in your streaming application, you will need to create a fat jar, bundle it with your application, and reference it in your application configuration as described [here](https://docs.aws.amazon.com/kinesisanalytics/latest/java/gs-python-createapp.html). This sample application shows how to bundle multiple connectors into a fat jar.
+
+Prerequisites:
+1. Apache Maven
+2. A Kinesis Data Stream and a Kinesis Data Firehose delivery stream
+3. (If running locally) Apache Flink installed and appropriate AWS credentials to access the Kinesis and Firehose streams
+
+To get this sample application working locally (a condensed sketch of these commands follows at the end of this page):
+1. Run `mvn clean package` in the FirehoseSink folder
+2. Ensure the resulting jar is referenced correctly in the Python script
+3. Ensure the `application_properties.json` parameters are configured correctly
+4. Set the environment variable `IS_LOCAL=True`
+5. Run the Python script: `python streaming-firehose-sink.py`
+
+To get this sample application working in Kinesis Data Analytics:
+1. Run `mvn clean package` in the FirehoseSink folder
+2. Zip the Python script and the fat jar generated in the previous step
+3. Upload the zip to an in-region S3 bucket
+4. Create the Kinesis Data Analytics application
+5. Configure the application to use the zip uploaded to the S3 bucket, and configure the application IAM role to be able to access both the Kinesis and Firehose streams
+6. Run the application
+
+A sample script to produce appropriate Kinesis records, as well as detailed configuration instructions for Kinesis Data Analytics, can be found [here](https://docs.aws.amazon.com/kinesisanalytics/latest/java/gs-python-createapp.html).
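+
+As a rough sketch (using the default names from this example; the zip file name is arbitrary and paths may differ in your setup), the local run and the packaging step look something like:
+
+```bash
+# Build the fat jar containing both the Kinesis and Firehose connectors
+mvn clean package
+
+# Run locally against the streams configured in application_properties.json
+export IS_LOCAL=True
+python streaming-firehose-sink.py
+
+# Package for Kinesis Data Analytics
+zip FirehoseSink.zip streaming-firehose-sink.py target/aws-kinesis-analytics-python-apps-1.jar
+```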
\ No newline at end of file
diff --git a/pyflink-examples/FirehoseSink/application_properties.json b/pyflink-examples/FirehoseSink/application_properties.json
new file mode 100644
index 0000000..dfdbaf1
--- /dev/null
+++ b/pyflink-examples/FirehoseSink/application_properties.json
@@ -0,0 +1,24 @@
+[
+  {
+    "PropertyGroupId": "kinesis.analytics.flink.run.options",
+    "PropertyMap": {
+      "python": "streaming-firehose-sink.py",
+      "jarfile": "target/aws-kinesis-analytics-python-apps-1.jar"
+    }
+  },
+  {
+    "PropertyGroupId": "consumer.config.0",
+    "PropertyMap": {
+      "input.stream.name": "ExampleInputStream",
+      "scan.stream.initpos": "LATEST",
+      "aws.region": "us-east-1"
+    }
+  },
+  {
+    "PropertyGroupId": "producer.config.0",
+    "PropertyMap": {
+      "output.stream.name": "ExampleOutputStream",
+      "aws.region": "us-east-1"
+    }
+  }
+]
\ No newline at end of file
diff --git a/pyflink-examples/FirehoseSink/pom.xml b/pyflink-examples/FirehoseSink/pom.xml
new file mode 100644
index 0000000..c31c4ba
--- /dev/null
+++ b/pyflink-examples/FirehoseSink/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.amazonaws</groupId>
+    <artifactId>aws-kinesis-analytics-python-apps</artifactId>
+    <version>1</version>
+
+    <properties>
+        <flink.version>1.15.2</flink.version>
+    </properties>
+
+    <dependencies>
+        <!-- Kinesis Data Firehose sink connector -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Kinesis Data Streams source connector -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kinesis</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Bundles both connectors into a single fat jar on `mvn package` -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.4.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/pyflink-examples/FirehoseSink/streaming-firehose-sink.py b/pyflink-examples/FirehoseSink/streaming-firehose-sink.py
new file mode 100644
index 0000000..7f1e330
--- /dev/null
+++ b/pyflink-examples/FirehoseSink/streaming-firehose-sink.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+
+"""
+streaming-firehose-sink.py
+~~~~~~~~~~~~~~~~~~~
+This module:
+    1. Creates a table environment
+    2. Creates a source table from a Kinesis Data Stream
+    3. Creates a sink table writing to a Kinesis Data Firehose Stream
+    4. Inserts the source table data into the sink table
+"""
+
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+import os
+import json
+
+# 1. Creates a Table Environment
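+# The StreamTableEnvironment is the entry point for the PyFlink Table API; the source
+# and sink tables below are registered against it and wired together with SQL.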
+env_settings = EnvironmentSettings.in_streaming_mode()
+table_env = StreamTableEnvironment.create(environment_settings=env_settings)
+
+APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda
+
+is_local = (
+    True if os.environ.get("IS_LOCAL") else False
+)  # set this env var in your local environment
+
+if is_local:
+    # only for local: point at the local properties file and pass in your jars delimited by a semicolon (;)
+    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local
+
+    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
+    table_env.get_config().get_configuration().set_string(
+        "pipeline.jars",
+        "file:///" + CURRENT_DIR + "/target/aws-kinesis-analytics-python-apps-1.jar",
+    )
+
+
+def get_application_properties():
+    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
+        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
+            contents = file.read()
+            properties = json.loads(contents)
+            return properties
+    else:
+        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))
+
+
+def property_map(props, property_group_id):
+    for prop in props:
+        if prop["PropertyGroupId"] == property_group_id:
+            return prop["PropertyMap"]
+
+
+def create_kinesis_table(table_name, stream_name, region, stream_initpos=None):
+    # the initial stream position is optional; when present it is injected into the WITH clause
+    init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else ''
+
+    return """ CREATE TABLE {0} (
+                ticker VARCHAR(6),
+                price DOUBLE,
+                event_time TIMESTAMP(3)
+              )
+              WITH (
+                'connector' = 'kinesis',
+                'stream' = '{1}',
+                'aws.region' = '{2}',{3}
+                'format' = 'json',
+                'json.timestamp-format.standard' = 'ISO-8601'
+              ) """.format(table_name, stream_name, region, init_pos)
+
+
+def create_firehose_table(table_name, stream_name, region):
+    return """ CREATE TABLE {0} (
+                ticker VARCHAR(6),
+                price DOUBLE,
+                event_time TIMESTAMP(3)
+              )
+              WITH (
+                'connector' = 'firehose',
+                'delivery-stream' = '{1}',
+                'aws.region' = '{2}',
+                'format' = 'json',
+                'json.timestamp-format.standard' = 'ISO-8601'
+              ) """.format(table_name, stream_name, region)
+
+
+def main():
+    # Application Property Keys
+    input_property_group_key = "consumer.config.0"
+    producer_property_group_key = "producer.config.0"
+
+    input_stream_key = "input.stream.name"
+    input_region_key = "aws.region"
+    input_starting_position_key = "scan.stream.initpos"
+
+    output_stream_key = "output.stream.name"
+    output_region_key = "aws.region"
+
+    # tables
+    input_table_name = "ExampleInputStream"
+    output_table_name = "ExampleOutputStream"
+
+    # get application properties
+    props = get_application_properties()
+
+    input_property_map = property_map(props, input_property_group_key)
+    output_property_map = property_map(props, producer_property_group_key)
+
+    input_stream = input_property_map[input_stream_key]
+    input_region = input_property_map[input_region_key]
+    stream_initpos = input_property_map[input_starting_position_key]
+
+    output_stream = output_property_map[output_stream_key]
+    output_region = output_property_map[output_region_key]
+
+    # 2. Creates a source table from a Kinesis Data Stream
+    table_env.execute_sql(create_kinesis_table(input_table_name, input_stream, input_region, stream_initpos))
+
+    # 3. Creates a sink table writing to a Kinesis Data Firehose Stream
+    table_env.execute_sql(create_firehose_table(output_table_name, output_stream, output_region))
+
+    # 4. Inserts the source table data into the sink table
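+    # execute_sql submits the INSERT job and returns a TableResult; when running locally
+    # we block on it below so the script does not exit while the streaming job is running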
+    result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, input_table_name))
+
+    if is_local:
+        result.wait()
+
+
+if __name__ == "__main__":
+    main()
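+
+# Example input record for the source table above (hypothetical values). With AWS CLI v2,
+# one such record could be sent to the input stream with something like:
+#   aws kinesis put-record --stream-name ExampleInputStream --partition-key AMZN \
+#     --cli-binary-format raw-in-base64-out \
+#     --data '{"ticker": "AMZN", "price": 101.51, "event_time": "2023-01-01T12:00:00"}'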