Streaming Finance Data with AWS Lambda

Goal:

  • Go through the process of consuming “real time” data, processing it, and landing it in storage in a way that supports querying and further analysis in real time or near real time.

Infrastructure:

This project consists of three major infrastructure elements that work in tandem:

  • A Lambda function that collects our data (DataCollector)
  • A Lambda function that transforms the data and places it into S3 (DataTransformer)
  • A serverless process that allows us to query our S3 data (DataAnalyzer)

Step 1: Data Transformer

  • Create a Kinesis Data Firehose delivery stream with a Lambda transformation function that processes each incoming record and delivers it to an S3 bucket (a boto3 sketch of this setup follows the list).
  • Outcome: finance_data folder
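
For reference, the delivery stream can be created from the AWS console or, roughly, with boto3 as in the sketch below. The stream name matches the one used later in the DataCollector; the role, bucket, and Lambda ARNs are placeholders, not values from this project.

import boto3

firehose = boto3.client("firehose", region_name="us-east-2")

# Placeholder ARNs -- substitute your own IAM role, S3 bucket, and Lambda function.
firehose.create_delivery_stream(
    DeliveryStreamName="finance-delivery-stream",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
        "BucketARN": "arn:aws:s3:::my-finance-data-bucket",
        "Prefix": "finance_data/",
        "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {
                            "ParameterName": "LambdaArn",
                            "ParameterValue": "arn:aws:lambda:us-east-2:123456789012:function:DataTransformer",
                        }
                    ],
                }
            ],
        },
    },
)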

Lambda Source Code:

import base64

def lambda_handler(event, context):
    output_records = []
    for record in event["records"]:
        # Firehose hands each record to the Lambda base64-encoded.
        # Decode it, append a newline so the records land in S3 as
        # newline-delimited JSON, then re-encode before returning.
        payload = base64.b64decode(record["data"]) + b"\n"
        output_records.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(payload).decode("utf-8")
        })

    return { "records": output_records }
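
A quick way to sanity-check the transformer locally is to feed it a hand-built Firehose transformation event, with the handler above in scope. The record below is a hypothetical example, not real stream output.

import base64
import json

# A minimal, hand-built Firehose transformation event (hypothetical data).
sample_event = {
    "records": [
        {
            "recordId": "1",
            "data": base64.b64encode(
                json.dumps({"name": "FB", "high": 210.5, "low": 209.8,
                            "ts": "2020-05-14 09:30:00-04:00"}).encode("utf-8")
            ).decode("utf-8"),
        }
    ]
}

result = lambda_handler(sample_event, None)
# Decoding the returned data should show the original JSON plus a trailing newline.
print(base64.b64decode(result["records"][0]["data"]).decode("utf-8"))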

Kinesis Data Firehose Delivery Stream Monitoring:

(screenshot: delivery stream monitoring metrics)

S3 Files:

(screenshot: objects written to the S3 bucket)

Step 2: Data Collector

  • Write another Lambda function that is triggered by a simple URL call. On trigger, it grabs stock price data and places it into the delivery stream defined in the DataTransformer.
  • Use the yfinance module to grab pricing information for each of the following stocks: 'FB', 'SHOP', 'BYND', 'NFLX', 'PINS', 'SQ', 'TTD', 'OKTA', 'SNAP', 'DDOG'. Collect one full day’s worth of HIGH and LOW prices for each company listed above on Thursday, May 14th 2020, at a one-minute interval. Note that by “full day” we mean one day of stock trading, which is not 24 hours. A quick way to preview this data locally is sketched after this list.
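
As a local check before wiring up the Lambda (a standalone snippet, not part of the deployed function), the 1-minute bars for a single ticker can be previewed with yfinance; a full trading day (09:30–16:00 ET) yields roughly 390 one-minute rows.

import yfinance as yf

# Preview one day's worth of 1-minute bars for a single ticker.
# Note: yfinance typically only serves 1-minute data for recent dates
# (roughly the last 30 days), so the window below must be adjusted
# relative to when you run this.
data = yf.download("FB", start="2020-05-14", end="2020-05-15", interval="1m")

print(data[["High", "Low"]].head())
print(len(data), "one-minute rows for the trading day")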

Lambda Source Code:

import json
import boto3
import subprocess
import sys

# yfinance is not bundled with the Lambda runtime, so install it into /tmp
# at cold start and add /tmp to the import path.
subprocess.check_call([sys.executable, "-m", "pip", "install", "--target", "/tmp", "yfinance"])
sys.path.append('/tmp')
import yfinance as yf

tickers = ['FB', 'SHOP', 'BYND', 'NFLX', 'PINS', 'SQ', 'TTD', 'OKTA', 'SNAP', 'DDOG']

def lambda_handler(event, context):
    fh = boto3.client("firehose", "us-east-2")
    for ticker in tickers:
        # One full trading day of 1-minute bars for May 14th, 2020.
        data = yf.download(ticker, start="2020-05-14", end="2020-05-15", interval="1m")
        for timestamp, row in data.iterrows():
            # Each 1-minute bar becomes one JSON record on the delivery stream.
            output = {
                'name': ticker,
                'high': row['High'],
                'low': row['Low'],
                'ts': str(timestamp),
            }
            as_jsonstr = json.dumps(output)
            fh.put_record(
                DeliveryStreamName="finance-delivery-stream",
                Record={"Data": as_jsonstr.encode('utf-8')})
    return {
        'statusCode': 200,
        'body': json.dumps(f'Done! Recorded: {as_jsonstr}')
    }
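
Once the function is deployed and exposed through a public endpoint (an API Gateway route or a Lambda Function URL), a plain GET request triggers a collection run. The URL below is a placeholder, not the project's actual endpoint.

import urllib.request

# Placeholder URL -- replace with the function's actual public endpoint.
url = "https://example-function-url.lambda-url.us-east-2.on.aws/"

with urllib.request.urlopen(url) as response:
    print(response.status, response.read().decode("utf-8"))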

AWS Lambda Function URL:

AWS Lambda Configuration Page:

(screenshot: Lambda configuration page)

Step 3: Data Analyzer

  • Configure AWS Glue, pointing it to the S3 bucket created in the DataTransformer step. This lets us interactively query the S3 files generated by the DataTransformer using AWS Athena to gain insight into our streamed data (a boto3 sketch for running such a query follows this list).
  • Outcome: results.csv file
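
With the Glue table in place, queries can also be submitted programmatically through boto3's Athena client, as in the sketch below. The database name and results bucket are placeholders; the full query in the next section can be passed as the QueryString in the same way.

import boto3

athena = boto3.client("athena", region_name="us-east-2")

# Placeholder database and output location -- use the names from your Glue catalog.
response = athena.start_query_execution(
    QueryString="SELECT name, MAX(high) AS max_high FROM finance_stream_data GROUP BY name",
    QueryExecutionContext={"Database": "finance_db"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-results-bucket/"},
)
print("Query execution id:", response["QueryExecutionId"])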

SQL Query:

For each company and each trading hour, the query below finds the maximum HIGH price and the timestamp(s) at which that high occurred:

SELECT *
FROM (
    SELECT T1.Company, T2.High_Stock_Price, T1.DateTime, T1.Hour
    FROM (
        SELECT name AS Company, high, ts AS DateTime, SUBSTRING(ts, 12, 2) AS Hour
        FROM finance_stream_data
    ) T1
    INNER JOIN (
        SELECT name, SUBSTRING(ts, 12, 2) AS Hour, MAX(high) AS High_Stock_Price
        FROM finance_stream_data
        GROUP BY name, SUBSTRING(ts, 12, 2)
    ) T2
    ON T1.Company = T2.name AND T1.high = T2.High_Stock_Price AND T1.Hour = T2.Hour
)
ORDER BY Company, Hour, DateTime

Extra Credit: Data Visualizations

  • Output: Analysis.ipynb (a minimal plotting sketch based on results.csv follows)
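
A minimal visualization sketch, assuming the Athena results were exported to results.csv with the column aliases from the query above (Athena may lower-case the headers):

import pandas as pd
import matplotlib.pyplot as plt

# Column names follow the query's aliases; adjust capitalization if Athena
# lower-cases them in the exported CSV.
df = pd.read_csv("results.csv")

fig, ax = plt.subplots(figsize=(10, 6))
for company, group in df.groupby("Company"):
    ax.plot(group["Hour"], group["High_Stock_Price"], marker="o", label=company)

ax.set_xlabel("Trading hour (2020-05-14)")
ax.set_ylabel("Hourly high price (USD)")
ax.legend()
plt.tight_layout()
plt.show()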
