test's Issues

data-re

    #########################################################################################################
    # Please add a Lambda layer for awswrangler.
    # The current implementation uses arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python39:1
    # as the layer.
    # This Lambda takes an event as input that provides the bucket name and database name the Lambda
    # writes to. It runs on a daily schedule and looks for files that are about to expire.
    #########################################################################################################
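    #
    # Example event (the values below are hypothetical placeholders; the keys match what
    # lambda_handler reads from the payload):
    # {
    #     "innv_bkt_name": "example-landing-zone-bucket",
    #     "db_bkt": "example-audit-results-bucket",
    #     "db": "retention_audit_db",
    #     "arn": "arn:aws:sns:us-east-1:123456789012:retention-alerts"
    # }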
    
    
    import boto3
    import datetime
    import pytz
    import pandas as pd
    import awswrangler as wr
    import logging
    import json
    # Silence pandas chained-assignment warnings
    pd.options.mode.chained_assignment = None
    """
    Logger configured with the log level set to INFO.
    """
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    def get_s3_tags(bkt):
        """
        This function gets the tags set on the given S3 bucket.
        """
        try:
            cl=boto3.client("s3")
            response = cl.get_bucket_tagging(
                Bucket=bkt
            )
            return response.get("TagSet")
        except Exception as e:
            logger.error("Received exception while getting the tags from S3 : {}".format(e))
            raise e
        
    def list_s3_files_using_paginator(bkt,now):
        """
        This function lists all the files in the S3 bucket using a paginator and returns a pandas DataFrame with the age of each object in days.
        """
        try:
            s3_client = boto3.client("s3")
            bucket_name = bkt
            paginator = s3_client.get_paginator("list_objects_v2")
            iterator = paginator.paginate(Bucket=bucket_name, PaginationConfig={"PageSize": 500})
            date_time = now.strftime("%Y%m%d%H%M%S")
            all_files=[]
            for page in iterator:
                logger.info("Getting a page of files from S3")
                files = page.get("Contents", [])  # empty buckets/pages have no "Contents" key
                for file in files:
                    file["DaysPassed"] = (now - file["LastModified"]).days
                    file["datetime"] = date_time
                    all_files.append(file)
            return pd.DataFrame(all_files)
        except Exception as e:
            logger.error("Received exception while reading list of files from s3 : {}".format(e))
            raise e
        
    def write_df_to_s3(df,bkt,key,db,tb):
        """
        This function writes a pandas DataFrame to S3 as a partitioned Parquet dataset and registers it in the Glue catalog.
        """
        try:
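            # With dataset=True and partition_cols=["datetime"], awswrangler writes the data
            # under s3://<bucket>/<key>/datetime=<value>/ and creates or updates the table in
            # the given Glue database.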
            wr.s3.to_parquet(
                df = df,
                path = "s3://{bucket}/{bktKey}/".format(bucket=bkt,bktKey=key),
                dataset = True,
                partition_cols=['datetime'],
                database = db,
                table = tb
            )
        except Exception as e:
            logger.error("Received exception while writing to glue catalog : {}".format(e))
            raise e  
    
    def sendNote(msg,sub,arn,bkt):
        """
        This function sends notification to the distribution list about the expiry of the files.
        """
        try:
            cl = boto3.client("sns")
            #cl = client("sns")
            dict_ = {}
            dict_["bucket"] = {"DataType":"String","StringValue":bkt}
            response = cl.publish(
                    TopicArn=arn,
                    Message=msg,
                    Subject=sub,
                    MessageStructure='text',
                    MessageAttributes=dict_
                )
            return response
        except Exception as e:
            logger.error("Received exception while sending notification : {}".format(e))
            raise e  
        
    def lambda_handler(event, context):
        """
        Entry point method. This takes input from event from Payload and executes the STEP FUNCTION.
        """
        try:
            if event.get("requestContext") is not None:
                _input = json.loads(event.get("body"))
            else:
                _input = event
            logger.info(
                "====================================================== Event details ====================================================== "
            )
            logger.info("Input from the payload: {}".format(_input))
            
            innav_acct_bkt = _input.get("innv_bkt_name")
            logger.info("Getting Tags from the bucket:{}".format(innav_acct_bkt))
            lTag = get_s3_tags(innav_acct_bkt)
            
            logger.info(
                "Checking all the files in the bucket {} and building a pandas DataFrame".format(innav_acct_bkt)
            )
            
            now=datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
            df = list_s3_files_using_paginator(innav_acct_bkt,now)
            date_time = now.strftime("%Y%m%d%H%M%S")
            
            logger.info(
                "Identifying the files that are about to cross the retention period."
            )
            
            # Retention period in days, taken from the bucket's "data-retention" tag
            dr = int([x.get("Value") for x in lTag if x.get("Key") == "data-retention"][0])
            # Files that will expire within the next 7 days
            warn_df = df[((dr - df.DaysPassed) <= 7) & ((dr - df.DaysPassed) > 0)]
            # Files that have already crossed the retention period
            del_df = df[(dr - df.DaysPassed) <= 0]
            db_bkt = _input.get("db_bkt")
            db = _input.get("db")
            bkt_name_mod=innav_acct_bkt.replace("-","_")
            
            logger.info(
                "Number of files with 7 or fewer days left before expiry: {}".format(len(warn_df))
            )
            
            if len(warn_df) > 0:
                db_key = db + "/" + bkt_name_mod + "_dtls_future_deletion_tb"
                tb = bkt_name_mod + "_dtls_future_deletion_tb"
                write_df_to_s3(warn_df, db_bkt, db_key, db, tb)
                msg = "Please run select * from {}.{} where datetime='{}' to find the files which are up for deletion in the next 7 days or less".format(db, tb, date_time)
                subject = "Please review your Innovation Platform Landing Zone Files"
                arn = _input.get("arn")
                logger.info("Sending a notification to the distribution list provided by the owner of the bucket: {}".format(innav_acct_bkt))
                sendNote(msg, subject, arn, innav_acct_bkt)
                logger.info("Files that are going to be deleted in the next 7 days can be found with select * from {}.{} where datetime='{}'".format(db, tb, date_time))
            else:
                logger.info("No files are crossing the data retention period in the next 7 days")
            
            return {"response": "Custodian Lambda executed successfully for bucket {}".format(innav_acct_bkt)}
        except Exception as e:
            logger.error(
                "Received exception while executing the lambda handler : {}".format(e)
            )
            raise e
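
A minimal sketch for invoking the handler locally, assuming the payload keys read by lambda_handler; the bucket names, database, and SNS topic ARN are hypothetical placeholders:

    if __name__ == "__main__":
        # Hypothetical test event: the keys match what lambda_handler reads from the
        # payload, the values are placeholders and must point at real resources to run.
        test_event = {
            "innv_bkt_name": "example-landing-zone-bucket",
            "db_bkt": "example-audit-results-bucket",
            "db": "retention_audit_db",
            "arn": "arn:aws:sns:us-east-1:123456789012:retention-alerts"
        }
        print(lambda_handler(test_event, None))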
