Click here to Skip to main content
15,118,017 members
Articles / Hosted Services / AWS
Technical Blog
Posted 7 Mar 2020

Stats

4.5K views
1 bookmarked

Using the AWS Toolkit for PyCharm to Create and Deploy a Kinesis Firehose Stream with a Lambda Transformation Function.

Rate me:
Please Sign up or sign in to vote.
0.00/5 (No votes)
7 Mar 2020CPOL13 min read
A semi-realistic example of using AWS Kinesis Firehose
In this tutorial, you create a semi-realistic example of using AWS Kinesis Firehose. You use the AWS Toolkit for Pycharm to create a Lambda transformation function that is deployed to AWS CloudFormation using a Serverless Application Model (SAM) template. You then create the Kinesis Firehose stream and attach the lambda function to the stream to transform the data.

Amazon Web Services Kinesis Firehose is a service offered by Amazon for streaming large amounts of data in near real-time. Streaming data is continuously generated data that can be originated by many sources and can be sent simultaneously and in small payloads. Logs, Internet of Things (IoT) devices, and stock market data are three obvious data stream examples. Kinesis Streams Firehose manages scaling for you transparently. Firehose allows you to load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. You can also transform the data using a Lambda function. Firehose also allows easy encryption of data and compressing the data so that data is secure and takes less space. For more information, refer to Amazon’s introduction to Kinesis Firehose.

If you prefer watching a video introduction, the following is a good Kinesis Firehose overview.

AWS Introduction to Kinesis Firehose

Other Tutorials

Although this tutorial stands alone, you might wish to view some more straight-forward tutorials on Kinesis Firehose before continuing with this tutorial. Here, we add complexity by using Pycharm and an AWS Serverless Application Model (SAM) template to deploy a Lambda function.

The following is a good video demonstration of using Kinesis Firehose by Arpan Solanki. The example project focuses on the out of the box functionality of Kinesis Firehose and will make this tutorial easier to understand.

AWS Kinesis Firehose demo by Arpan Solanki

Tasks Performed Here

In this tutorial, you add more complexity to the more straightforward demonstrations on using Kinesis Firehose. Rather than creating the Lambda function while creating the Kinesis Stream, you create a more realistic Lambda function using Pycharm. Moreover, you deploy that function using as an AWS Serverless Application. We will perform the following tasks in this tutorial.

  • Create and test a Kinesis Firehose stream.
  • Create a Lambda function that applies a transformation to the stream data.
  • Deploy the Lambda function using a Serverless Application Model (SAM) template.
  • Modify the Kinesis Firehose stream to use the Lambda data transformer.
  • Test the Kinesis Firehose stream.
  • Trace and fix an error in the Lambda function.
  • Redeploy the Lambda function.
  • Test the Kinesis Firehose stream

Sample Project Architecture

Assume we have many locations that record the ambient temperature. We need to aggregate this data from the many different locations in almost real-time. We decide to use AWS Kinesis Firehose to stream data to an S3 bucket for further back-end processing.

Data is recorded as either fahrenheit or celsius depending upon the location sending the data. But the back-end needs the data standardized as kelvin. To transform data in a Kinesis Firehose stream, we use a Lambda transform function. The following illustrates the application’s architecture.

Image 1

Tutorial application architecture

Prerequisites

This tutorial expects you to have an AWS developer account and knowledge of the AWS console. You should have PyCharm with the AWS Toolkit installed. You should also have the AWS CLI installed.

You should also have a rudimentary knowledge of S3, CloudFormation and SAM templates, Lambda functions, and of course, Python. The following links should help if you are missing prerequisites.

Kinesis Firehose

AWS Kinesis Firehose is a fully managed service.

Create Stream

  • Log in to the AWS Console and select Services and then Kinesis.
  • Click Get Started if it is your first time visiting Kinesis.
  • Click Create delivery stream in the Firehose panel.

    Image 2

    Create delivery stream option on Amazon Kinesis dashboard (if no defined streams)
  • Name the delivery stream temperatureStream.
  • Accept the default values for the remaining settings.
  • Click Next.

    Image 3

    Create delivery stream – first step

    A data producer is any application that sends data records to Kinesis Firehose. By selecting Direct PUT or other sources, you are allowing producers to write records directly to the stream.

  • Accept the default setting of Disabled for Transform source records with AWS Lambda and Convert record format.
  • Click Next.

    Image 4

    Create delivery stream – second step

    The Transform source records with AWS Lambda allows you to define a Lambda function. Later in this tutorial, you will change this setting and define a Lambda function. For now, leave it disabled.

  • Select Amazon S3 as the Destination.
  • Under the S3 destination, click Create new.
  • Name the S3 bucket with a reasonable name (remember all names must be globally unique in S3). Here, I use the name temperaturebucket123 as the bucket name and select the appropriate Region.

    Image 5

    Create S3 bucket for stream
  • Click Next.
  • Accept the defaults and scroll to the Permissions section.
  • Click Create new or choose to associate an IAM role to the stream.

    Image 6

    Create new or choose IAM role for stream
  • Create a role named temperature_stream_role (we return to this role in a moment) by accepting the defaults.
  • Click Allow.
  • Click Next after returned to the stream creation.

    Image 7

    Create Role
  • Review the delivery stream and click Create delivery stream to create the stream.

    Image 8

    Select newly created role by clicking temperature_stream_role
  • You should be taken to the list of streams and the Status of temperatureStream should be …Creating.

    Image 9

    Delivery stream console after created.
  • After the stream’s status is Active, click on temperatureStream to be taken to the stream’s configuration page.
  • Click on the IAM role to return to the role settings in IAM.
  • Now, we are being very lazy…you would not do this in production, but delete the attached policy and attach the AWSLambdaFullAccess, AmazonS3FullAccess, and AmazonKinesisFirehoseFullAccess roles.
    Here we are granting the role too much access. In reality, you should grant the minimal access needed in a production setting.

    Image 10

    For simplicity (not for production use), delete policy and add the following three policies to role

Test Stream

For a simple stream such as what you just developed, AWS provides an easy means of testing your data. Let’s test your data before continuing development.

  • If not on the stream configuration screen, select the stream on the Kinesis dashboard to navigate to the stream’s configuration screen.
  • Expand the Test with demo data section.
  • Click the Start sending demo data button.
  • Wait about a minute and click the Stop sending demo data button.

    Image 11

    Test data option on stream summary on AWS console
  • From the Amazon S3 destination section, click on the bucket name to navigate to the S3 bucket. Be certain to wait five minutes to give the data time to stream to the S3 bucket.

    If you tire of waiting five minutes, return to the stream’s configuration and change the buffer time to a smaller interval than 300 seconds.

    Image 12

    The Buffer interval allows configuring the time frame for buffering data.

    Image 13

    S3 bucket link on stream summary on AWS console
  • Click on the sub-folders until taken to the data file. If you do not see the top level folder, then wait five minutes and refresh the page. Remember, the data is buffered.

    Image 14

    S3 Bucket top level folder after test data written
  • Open the file and you should see the test records written to the file.

    Image 15

    Test data written to S3 bucket by Kinesis Firehose
  • Navigate to the top level folder and delete the test data. Be certain you delete the top level folder and not the bucket itself.

    Image 16

    Delete test data by deleting top level folder
  • Open a command-line terminal on your computer and enter the following aws firehose put-record commands.
    > aws firehose put-record --delivery-stream-name temperatureStream --record='Data="99.55F"'
    > aws firehose put-record --delivery-stream-name temperatureStream --record='Data="33.22C"'
    > aws firehose put-record --delivery-stream-name temperatureStream --record='Data="57.99f"'

    You should see something similar to the following in your command-line terminal.

    Image 17

    AWS firehose put-record commands in command-line terminal

    For details on the put-record command, refer to the AWS reference page on the command (AWS CLI Command Reference: put-record).

  • Return to the AWS Console and navigate to the S3 bucket and note the data was written to the bucket. Remember to allow the records time to process by waiting five minutes.

    Image 18

  • Rather than sending a simple string, modify the commands to send Json. Note that you escape the double-quotes.
    > aws firehose put-record --delivery-stream-name temperatureStream 
                              --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
  • Return to the AWS Console and you should see a file in the S3 bucket with data formatted as follows. Do not forget to give the record time to stream before checking the S3 bucket.
    Python
    {"station":"A1","temp":"57.99f"}{"station":"A1","temp":"57.99f"}

In the sample architecture, note that you need to convert the temperature data to kelvin. To accomplish this transformation, you create a Lambda transform function for the Kinesis Firehose stream.

Lambda Function

Recall when creating the stream, you were provided the option of transforming the data.

Image 19

Transform source records option

Although you left this feature disabled, the requirements dictate that you need to modify temperature readings from fahrenheit or celsius to kelvin. Kinesis firehose provides an easy way to transform data using a Lambda function. If you referred to any of the linked tutorials above, then you know that you can create and edit the Lambda function directly in the AWS console.

Here, you develop the Lambda function in a local development environment, debug the function, and then deploy the function to AWS. Here, you develop a Python Lambda function locally and deploy it to AWS using a CloudFormation SAM template.

PyCharm

Hopefully, you have installed PyCharm and the AWS Toolkit. If not, do so now. Refer to the prerequisites above for information on installing both.

  • Start PyCharm.
  • Create a new AWS Serverless Application named kelvinTempConversion.

    Image 20

    Creating a new AWS SAM Project
  • Click No if the following Create Project popup appears.

    Image 21

    Select No to this dialog to create a project with new resources
  • Open the template.yaml folder and notice the generated SAM template.
  • Modify the timeout from 3 to 60 seconds (Kinesis Firehose requires a 60 second timeout).

    Image 22

    SAM template generated by PyCharm
  • Right click the hello_world folder and select Refactor | Rename to rename the folder to kelvinConversion.
  • After reviewing the changes to be made, click the Do Refactor button.

    Image 23

    Refactoring Hello World to kelvinConversion
  • Change all instances of HelloWorld with KelvinConversion in template.yaml.
  • Modify the function timeout (Globals:Function:Timeout:) to 60 seconds, the minimum for Kinesis Firehose.
  • Remove the Events section and the KelvinConversionApi section. These two sections are for building a public rest API. As we are developing a transformation function for our stream, neither is needed.
  • After modifying all instances of the hello world text, template.yaml should appear similar to the following:
    Python
    AWSTemplateFormatVersion: '2010-09-09'
    Transform: AWS::Serverless-2016-10-31
    Description: >
      AWS
    
      Sample SAM Template for AWS
    
    Globals:
      Function:
        Timeout: 60
    
    Resources:
      KelvinConversionFunction:
        Type: AWS::Serverless::Function
        Properties:
          CodeUri: kelvinConversion/
          Handler: app.lambda_handler
          Runtime: python3.8
    
    Outputs:
      KelvinConversionFunction:
        Description: "Kelvin Conversion Lambda Function ARN"
        Value: !GetAtt KelvinConversionFunction.Arn
      KelvinConversionFunctionIamRole:
        Description: "Implicit IAM Role created for Kelvin Conversion function"
        Value: !GetAtt KelvinConversionFunctionRole.Arn
  • From the upper right drop down, select Edit Configurations.
  • Modify the template to reflect the new folder.
  • Click Ok.

    Image 24

    Runtime configuration
  • Select the dropdown item and click the green arrow to run the application.
    Python
    /usr/local/bin/sam local invoke 
    --template /Users/jamesabrannan/PycharmProjects/
    kelvinTempConversion/.aws-sam/build/template.yaml 
    --event "/private/var/folders/xr/j9kyhs2n3gqcc0n1mct4g3lr0000gp/T/
    [Local] KelvinConversionFunction-event.json" KelvinConversionFunction
    Invoking app.lambda_handler (python3.8)
    
    Fetching lambci/lambda:python3.8 Docker container image......
    Mounting /Users/jamesabrannan/PycharmProjects/kelvinTempConversion/
    .aws-sam/build/KelvinConversionFunction as /var/task:ro,delegated inside runtime container
    START RequestId: 1ffa20fa-486e-1827-e987-e92f16101778 Version: $LATEST
    END RequestId: 1ffa20fa-486e-1827-e987-e92f16101778
    REPORT RequestId: 1ffa20fa-486e-1827-e987-e92f16101778	
    Init Duration: 531.94 ms	Duration: 14.75 ms	
    Billed Duration: 100 ms	Memory Size: 128 MB	Max Memory Used: 24 MB	
    
    {"statusCode":200,"body":"{\"message\": \"hello world\"}"}
  • Now that you are assured the project is configured correctly and executes locally, open app.py and replace the sample code with the following. Note that the line using the index string function is in error. This error is by design and you will fix it later in the tutorial.
    Python
    import base64
    import json
    from decimal import Decimal
    
    def lambda_handler(event, context):
        output = []
    
        for record in event['records'] :
            print(record['recordId'])
            payload = base64.b64decode(record['data']).decode('utf-8')
            print(payload)
            reading = json.loads(payload)
            print(reading)
            temp = reading['temp']
            print(temp)
            # note: this is in error, if celsius this causes error
            # this is fixed later in tutorial
            isfarenheit = bool(temp.upper().index('F') > 0)
            kelvin = 0
    
            if isfarenheit:
                print(float(temp.upper().strip('F')))
                kelvin = (float(temp.upper().strip('F')) + 459.67) * 5.0 / 9.0
            else:
                kelvin = float(temp.upper().strip('C')) + 273.15
    
            print("{:.2f}".format(kelvin))
    
            reading['temp'] = str("{:.2f}".format(kelvin))
    
            print(reading)
    
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(json.dumps(reading).encode('UTF-8'))
            }
            output.append(output_record)
    
        print('Processed {} records.'.format(len(event['records'])))
    
        return {'records': output}

Local Testing

To test the record, you need to use an event template. There are event types you can choose, depending upon how the Lambda function is to be used.

  • From Event Templates, select Kinesis Firehose.

    Image 25

    Select Kinesis Firehose template to generate test data.
  • Create the sample record {“station”:”A1″,”temp”:”99.33F”} and base64 encode the record. A good site to encode and decode is the base64encode.org website.

    Image 26

    Encoding a simple Json record to Base64
  • Replace the data string generated when you selected the Kinesis Firehose Event Template and replace it with the base64 encoded string.

    Image 27

    Modify data value with the newly encoded value
  • Run the application locally and you should see the returned record.

    Image 28

    Console output from running application locally.
  • Copy the data string and decode the record from base64.

    Image 29

    Decode result from Base64 to string
  • Validate the converted kelvin measurement is correct.
    Note, you only tested fahrenheit. This is by design to illustrate debugging in the AWS Console. You fix this error later in this tutorial.

    Image 30

Deploying Serverless Application

  • Right click on template.yaml and select Deploy Serverless Application from the popup menu.

    Image 31

    Right click on template.yaml and select Deploy Serverless Application
  • Select Create Stack and name the stack kelvinTemperatureConversionStack.
  • Select or create an S3 Bucket.
  • Click Deploy.

    Image 32

  • If you receive a credentials error, then you need to configure the AWS Toolkit correctly.
  • At the extreme lower right of the window, click the message telling you the issue.

    Image 33

    Error if AWS Toolkit credentials are not configured correctly

    Image 34

    Profile settings configured for AWS Toolkit
  • After fixing credentials (if applicable), then try again. A dialog window should appear informing you of the deployment progress.
  • Notice that the window is using CLI Sam commands to deploy the function to AWS.

    Image 35

    Deploying application output

    Image 36

    Deploying application output

Verifying

After deploying, you should verify the function was deployed correctly.

Lambda Function

  • Navigate to the AWS Lambda service and you should see the newly created Lambda function.

    Image 37

    Created Lambda function in AWS console

S3 Bucket

  • Navigate to the S3 buckets and you should see the newly created bucket used for deploying the Lambda function.

    Image 38

    Created S3 bucket for deploying serverless application

AWS CloudFormation

  • Navigate to CloudFormation and you should see the created stack.

    Image 39

    Cloudformation summary in AWS console
  • Return to the Kinesis Firehose stream to add the Lambda function to the stream.

Modifying Firehose Stream

  • Navigate to the temperatureStream configuration page.
  • Click Edit.
  • Enable source record transformation in the Transform source records with AWS Lambda section.

    Image 40

  • Select the Lambda function created and deployed by PyCharm.
  • Click Save.

Testing Using CLI

  • Open a command-line window and send several records to the stream. Be certain to escape the double-quotes, with the exception of the double quotes surrounding the data record.
    Python
    > aws firehose put-record --delivery-stream-name temperatureStream 
                                 --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
    > aws firehose put-record --delivery-stream-name temperatureStream 
                                 --record='Data="{\"station\":\"A1\",\"temp\":\"89.90F\"}"'
    > aws firehose put-record --delivery-stream-name temperatureStream 
                                 --record='Data="{\"station\":\"A1\",\"temp\":\"22.20C\"}"'
    > aws firehose put-record --delivery-stream-name temperatureStream 
                                 --record='Data="{\"station\":\"A1\",\"temp\":\"12.76C\"}"'
  • After waiting five minutes, navigate to the S3 bucket and you should see a new folder entitled processing-failed.

    Image 41

    Processing-failed folder when Kinesis Firehose fails
  • Navigate down the processing-failed folder hierarchy and open the failure records.

    Image 42

    Errors written to S3 Bucket
  • The error messages are not very informative. But at least they tell you the Lambda function processing caused the error.
  • Navigate to the stream and select Amazon S3 Logs.
  • The log message is also not very informative.

    Image 43

  • Navigate to the Lambda function details.

    Image 44

  • Select the LogStream from the most recent invocation of the Lambda function.
  • The detailed log records the exact cause of the error, the index function. Unlike some languages such as Java, the Python index function returns an error if the string is not found.

    Image 45

    Image 46

Fixing Error

  • Return to the PyCharm project to fix the error and redeploy the Lambda function to AWS.
    You might notice that you can edit a function directly in the AWS Console. DO NOT EDIT! Remember, you deployed this application using SAM in CloudFormation. The correct process is to fix the function and then redeploy it using SAM.

    Image 47

    Python implementation in the AWS Console

    Image 48

    Data replaced with celsius value after encoding
  • Modify the function to use find rather than the index function.
    Python
    isfarenheit = bool(temp.upper().find('F') > 0)

    Image 49

    Lambda function results in error due to the index function
  • Run the application locally using a celsius value. As before, encode and decode and test the converted value.

    Image 50

    Lambda function successfully ran with celsius data
  • After testing, right click on template.yaml and redeploy the serverless application.
  • Accept the Update Stack defaults.

    Image 51

    Update Stack option in Deploy Serverless Application
  • After clicking Deploy, a popup window informs you of the deployment progress.

    Image 52

    Redeploying SAM application to AWS
  • Navigate to the Lambda function details in the AWS Console and you should see the corrected source code.

    Image 53

    Transformation function reflects changes made in PyCharm
  • From your command-line, send several records to the stream.
    Python
    > aws firehose put-record --delivery-stream-name temperatureStream 
                                 --record='Data="{\"station\":\"A1\",\"temp\":\"12.76C\"}"'
    > aws firehose put-record --delivery-stream-name temperatureStream 
                                 --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
    > aws firehose put-record --delivery-stream-name temperatureStream 
                                 --record='Data="{\"station\":\"A1\",\"temp\":\"89.90F\"}"'
    > aws firehose put-record --delivery-stream-name temperatureStream 
                                 --record='Data="{\"station\":\"A1\",\"temp\":\"22.20C\"}"'
  • Navigate to the S3 bucket and you should see the transformed records.

    Image 54

    Data streamed to S3 bucket

Summary

In this tutorial, you created a Kinesis FIrehose stream and created a Lambda transformation function. You configured the stream manually and used SAM to deploy the Lambda function. An obvious next step would be to add the creation of the Kinesis Firehose and associated bucket to the Cloudformation template in your PysCharm project. This tutorial was sparse on explanation, so refer to the many linked resources to understand the technologies demonstrated here better. However, this tutorial was intended to provide a variation on the numerous more straightforward Kinesis Firehose tutorials available.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

James A. Brannan
Software Developer (Senior) Brannan Technical Solutions LLC
United States United States
I have worked in IT for over twenty years and truly enjoy development. Architecture and writing is fun as is instructing others. My primary interests are Amazon Web Services, JEE/Spring Stack, SOA, and writing. I have a Masters of Science in Computer Science from Hood College in Frederick, Maryland.

Comments and Discussions

 
-- There are no messages in this forum --