Firehose sink sample app #7

Merged 2 commits on Dec 22, 2022
2 changes: 1 addition & 1 deletion README.md
@@ -69,7 +69,7 @@ This repository will include code examples and walkthroughs for the following co
Go ahead and install apache-flink since we'll need it for the rest of this exercise.

```bash
-(my-new-environment) $ pip install apache-flink==1.13.6
+(my-new-environment) $ pip install apache-flink==1.15.2
```

Additional Note: Please validate that you are using either Java 8 or Java 11 when running the examples. There are compatibility issues with later versions of Java due to the Py4j library calling out to the Kinesis Connector.
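
A quick way to confirm which JVM the examples will use:

```bash
# Should report version 1.8.x (Java 8) or 11.x (Java 11).
java -version
```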
4 changes: 2 additions & 2 deletions deploying/README.md
@@ -16,7 +16,7 @@
5. AmazonKinesisAnalyticsFullAccess

3. Navigate to the Kinesis Analytics tab and click Create Application.
-1. Select Apache Flink 1.13 as the version
+1. Select Apache Flink 1.15 as the version
2. Choose the IAM Role you have previously created
3. Use the Development Template for Application Settings to reduce costs. This will disable savepoints, reduce the granularity of metrics produced to CloudWatch, and set the application parallelism to `1`. For production workloads you will want different settings.

@@ -35,7 +35,7 @@

Click `Create group` and create a group called `kinesis.analytics.flink.run.options` with the following key value pairs:
- `python`: `GettingStarted/getting-started.py`
-- `jarfile`: `GettingStarted/lib/flink-sql-connector-kinesis_2.12-1.13.2`
+- `jarfile`: `GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar`

The `python` key tells Kinesis Data Analytics for Apache Flink how to execute the file, and that it is a python application. It is the equivalent of locally typing `python getting-started.py` to execute the application.
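
For reference, the same group in the `application_properties.json` format used by the samples in this repo (the paths mirror the key-value pairs above):

```json
[
  {
    "PropertyGroupId": "kinesis.analytics.flink.run.options",
    "PropertyMap": {
      "python": "GettingStarted/getting-started.py",
      "jarfile": "GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar"
    }
  }
]
```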

Binary file modified deploying/img/2021-03-22-15-50-59.png
2 changes: 1 addition & 1 deletion getting-started/README.md
@@ -72,7 +72,7 @@
3. Download and set up your Kinesis Connector.


-Create a folder within your GettingStarted application called `lib`, and download [the latest minor version of your Apache Flink Version flink-sql-connector jar file from this repo](https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis) into it. It's the Amazon Kinesis SQL Connector for Flink. Make sure you get the Scala 2.12 version of this connector as well. This will need to be bundled with your application on deploy, but ensure it's in your gitignore because adding jars to git is not a good practice.
+Create a folder within your GettingStarted application called `lib`, and download [the latest minor version of the flink-sql-connector jar matching your Apache Flink version from this repo](https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis) into it. It's the Amazon Kinesis SQL Connector for Flink. This will need to be bundled with your application on deploy, but ensure it's in your `.gitignore`, since committing jars to git is bad practice.

![](../img/2021-03-22-09-12-14.png)

4 changes: 1 addition & 3 deletions logging/README.md
@@ -47,7 +47,7 @@ When you're finished, the folder structure should look like:

```bash
UDF/
-lib/flink-sql-connector-kinesis_2.12-1.13.2
+lib/flink-sql-connector-kinesis-1.15.2.jar
application_properties.json
udf.py
```
@@ -68,8 +68,6 @@ UDF/
- Change `getting-started.py` to `udf.py`
(Select the group and click Edit Group)

-![](img/2021-03-23-14-49-48.png)



Click `Update`, and the application will automatically restart, picking up the new files.
8 changes: 4 additions & 4 deletions packaging/README.md
@@ -6,7 +6,7 @@ Packaging your Pyflink application for deployment onto Kinesis Data Analytics fo

1. Python files `(getting-started.py)`
2. Any required dependencies such as `boto3`.
-3. Any included jar files `(lib/flink-sql-connector-kinesis_2.12-1.13.2)`
+3. Any included jar files `(lib/flink-sql-connector-kinesis-1.15.2.jar)`

### Including Python dependencies
Before we can archive our files, we need to look at how to include python dependencies in our application.
@@ -33,7 +33,7 @@ my-pyflink-project
│ │ mymodulefile2.py
...
└───lib
-│ │ flink-sql-connector-kinesis_2.11-1.13.2.jar
+│ │ flink-sql-connector-kinesis-1.15.2.jar
│ │ ...
...
```
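
The tree above relies on dependencies simply living inside the project folder. A minimal sketch of vendoring a third-party package that way (the package name and target path are assumptions):

```bash
# Install boto3 into the project tree so it ships inside the application zip.
pip install boto3 -t my-pyflink-project/
```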
@@ -57,7 +57,7 @@ Lastly, you can also include your own dependencies via the same method--placing


### Jar dependencies
-If your application depends on a connector, be sure to include the connector jar (e.g. `flink-sql-connector-kinesis_2.11-1.13.2.jar`) in your package; under the lib folder in the tree structure shown above. Note that you don't have to name the folder lib, you just have to include it somewhere in your package and also ensure that you specify the jar dependency using the jarfile property as described below. Make sure the connector version corresponds to the appropriate Apache Flink version in your Kinesis Data Analytics application.
+If your application depends on a connector, be sure to include the connector jar (e.g. `flink-sql-connector-kinesis-1.15.2.jar`) in your package, under the lib folder in the tree structure shown above. Note that you don't have to name the folder lib, you just have to include it somewhere in your package and also ensure that you specify the jar dependency using the jarfile property as described below. Make sure the connector version corresponds to the appropriate Apache Flink version in your Kinesis Data Analytics application.

If you have multiple dependencies, you have to create a fat jar and then include it using the jarfile property as described below. This is a Flink requirement as described [here](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management/#jar-dependencies).

@@ -101,7 +101,7 @@ Next, type
adding: pyflink-examples/GettingStarted/getting-started.py (deflated 68%)
adding: pyflink-examples/GettingStarted/application_properties.json (deflated 56%)
adding: pyflink-examples/GettingStarted/lib/ (stored 0%)
-adding: pyflink-examples/GettingStarted/lib/flink-sql-connector-kinesis_2.12-1.13.2 (deflated 8%)
+adding: pyflink-examples/GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar (deflated 8%)
```

This will create a file called `GettingStarted.zip` with the getting-started.py file, a lib folder with the kinesis connector, and an unused application_properties.json--you can choose to omit this if you'd like.
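
The invocation behind that listing is roughly the following (run from the repository root; the exact flags are an assumption reconstructed from the output above):

```bash
# Recursively zip the application folder, producing GettingStarted.zip.
zip -r GettingStarted.zip pyflink-examples/GettingStarted
```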
25 changes: 25 additions & 0 deletions pyflink-examples/FirehoseSink/README.md
@@ -0,0 +1,25 @@
# Packaging Instructions for multiple connectors in Kinesis Data Analytics

If you need to use multiple connectors in your streaming application, you will need to create a fat jar, bundle it with your application and reference it in your application configuration as described [here](https://docs.aws.amazon.com/kinesisanalytics/latest/java/gs-python-createapp.html). This sample application shows how to bundle multiple connectors into a fat jar.

Prerequisites:
1. Apache Maven
2. A Kinesis Data Stream and Kinesis Data Firehose Stream
3. (If running locally) Apache Flink installed and appropriate AWS credentials to access Kinesis and Firehose streams

To get this sample application working locally (see the shell sketch after this list):
1. Run `mvn clean package` in the FirehoseSink folder
2. Ensure the resulting jar is referenced correctly in the python script
3. Ensure the `application_properties.json` parameters are configured correctly
4. Set the environment variable `IS_LOCAL=True`
5. Run the python script `python streaming-firehose-sink.py`
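
The steps above, condensed into shell form (assuming AWS credentials and both streams from the prerequisites are already in place):

```bash
cd pyflink-examples/FirehoseSink
mvn clean package        # builds target/aws-kinesis-analytics-python-apps-1.jar
IS_LOCAL=True python streaming-firehose-sink.py
```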

To get this sample application working in Kinesis Data Analytics (steps 1-3 are sketched after this list):
1. Run `mvn clean package` in the FirehoseSink folder
2. Zip the python script and fat jar generated in the previous step
3. Upload the zip to an in-region S3 bucket
4. Create the Kinesis Data Analytics application
5. Configure the application to use the zip uploaded to the S3 bucket and configure the application IAM role to be able to access both Kinesis and Firehose streams
6. Run the application
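
Steps 1-3 in shell form (the bucket name is a placeholder; zipping the jar under its `target/` path keeps it consistent with the `jarfile` property in `application_properties.json`):

```bash
cd pyflink-examples/FirehoseSink
mvn clean package
zip FirehoseSink.zip streaming-firehose-sink.py target/aws-kinesis-analytics-python-apps-1.jar
aws s3 cp FirehoseSink.zip s3://<your-bucket>/FirehoseSink.zip
```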

A sample script to produce appropriate Kinesis records, as well as detailed configuration instructions for Kinesis Data Analytics can be found [here](https://docs.aws.amazon.com/kinesisanalytics/latest/java/gs-python-createapp.html).
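
Should the linked page move, here is a minimal producer sketch (an assumption, not the AWS-documented script) that writes JSON records matching the source table schema (`ticker`, `price`, `event_time`):

```python
import datetime
import json
import random
import time

import boto3

STREAM_NAME = "ExampleInputStream"  # assumption: matches application_properties.json
REGION = "us-east-1"                # assumption: matches application_properties.json


def get_record():
    # Shape matches the source table: ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3).
    return {
        "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC"]),
        "price": round(random.random() * 100, 2),
        "event_time": datetime.datetime.utcnow().isoformat(),
    }


def main():
    kinesis = boto3.client("kinesis", region_name=REGION)
    while True:
        record = get_record()
        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(record),
            PartitionKey=record["ticker"],
        )
        time.sleep(0.1)  # throttle so the loop doesn't hammer the stream


if __name__ == "__main__":
    main()
```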
24 changes: 24 additions & 0 deletions pyflink-examples/FirehoseSink/application_properties.json
@@ -0,0 +1,24 @@
[
{
"PropertyGroupId": "kinesis.analytics.flink.run.options",
"PropertyMap": {
"python": "streaming-firehose-sink.py",
"jarfile": "target/aws-kinesis-analytics-python-apps-1.jar"
}
},
{
"PropertyGroupId": "consumer.config.0",
"PropertyMap": {
"input.stream.name": "ExampleInputStream",
"scan.stream.initpos": "LATEST",
"aws.region": "us-east-1"
}
},
{
"PropertyGroupId": "producer.config.0",
"PropertyMap": {
"output.stream.name": "ExampleOutputStream",
"aws.region": "us-east-1"
}
}
]
54 changes: 54 additions & 0 deletions pyflink-examples/FirehoseSink/pom.xml
@@ -0,0 +1,54 @@
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-kinesis-analytics-python-apps</artifactId>
<version>1</version>

<properties>
<flink.version>1.15.2</flink.version>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-kinesis-firehose</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<version>${flink.version}</version>
</dependency>

</dependencies>

<build>

<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>

</build>

</project>
127 changes: 127 additions & 0 deletions pyflink-examples/FirehoseSink/streaming-firehose-sink.py
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-

"""
streaming-firehose-sink.py
~~~~~~~~~~~~~~~~~~~
This module:
1. Creates a table environment
2. Creates a source table from a Kinesis Data Stream
3. Creates a sink table writing to a Kinesis Data Firehose Stream
4. Inserts the source table data into the sink table
"""

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
import os
import json

# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" # on kda

is_local = (
True if os.environ.get("IS_LOCAL") else False
) # set this env var in your local environment

if is_local:
# only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" # local

CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
table_env.get_config().get_configuration().set_string(
"pipeline.jars",
"file:///" + CURRENT_DIR + "/target/aws-kinesis-analytics-python-apps-1.jar",
)

def get_application_properties():
if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
contents = file.read()
properties = json.loads(contents)
return properties
else:
print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
for prop in props:
if prop["PropertyGroupId"] == property_group_id:
return prop["PropertyMap"]


def create_kinesis_table(table_name, stream_name, region, stream_initpos=None):
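    # Emit the scan.stream.initpos option only when a starting position is supplied;
    # otherwise omit it so the connector falls back to its default.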
init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else ''

return """ CREATE TABLE {0} (
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3)
)
WITH (
'connector' = 'kinesis',
'stream' = '{1}',
'aws.region' = '{2}',{3}
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
) """.format(table_name, stream_name, region, init_pos)

def create_firehose_table(table_name, stream_name, region):

return """ CREATE TABLE {0} (
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3)
)
WITH (
'connector' = 'firehose',
'delivery-stream' = '{1}',
'aws.region' = '{2}',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
) """.format(table_name, stream_name, region)


def main():
# Application Property Keys
input_property_group_key = "consumer.config.0"
producer_property_group_key = "producer.config.0"

input_stream_key = "input.stream.name"
input_region_key = "aws.region"
input_starting_position_key = "scan.stream.initpos"

output_stream_key = "output.stream.name"
output_region_key = "aws.region"

# tables
input_table_name = "ExampleInputStream"
output_table_name = "ExampleOutputStream"

# get application properties
props = get_application_properties()

input_property_map = property_map(props, input_property_group_key)
output_property_map = property_map(props, producer_property_group_key)

input_stream = input_property_map[input_stream_key]
input_region = input_property_map[input_region_key]
stream_initpos = input_property_map[input_starting_position_key]

output_stream = output_property_map[output_stream_key]
output_region = output_property_map[output_region_key]

# 2. Creates a source table from a Kinesis Data Stream
table_env.execute_sql(create_kinesis_table(input_table_name, input_stream, input_region, stream_initpos))

    # 3. Creates a sink table writing to a Kinesis Data Firehose Stream
table_env.execute_sql(create_firehose_table(output_table_name, output_stream, output_region))

# 4. Inserts the source table data into the sink table
result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, input_table_name))

if is_local:
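        # Locally, block until the INSERT job finishes; on KDA the service manages the job lifecycle.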
result.wait()

if __name__ == "__main__":
main()