louis-clotman / test Goto Github PK
View Code? Open in Web Editor NEWtest
test
#########################################################################################################
# Please add layer for wrangler. #
# Current Implementation uses arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python39:1 #
# as layer #
# This lambda will take an event as an input which provides the bucket name and db name to which lambda #
# is going to write to. This will execute on a daily schedule and look for files to be expired. #
#########################################################################################################
import boto3
import datetime
import pytz
import pandas as pd
import awswrangler as wr
import logging
import json
# Disable pandas' chained-assignment (SettingWithCopy) warning for the whole
# module; DataFrame slices such as warn_df/del_df are derived views below.
pd.options.mode.chained_assignment = None
"""
Logger creation with INFO as the setlevel
"""
# Root Lambda logger; AWS Lambda attaches its own CloudWatch handler.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_s3_tags(bkt):
    """
    Return the tag set attached to the given S3 bucket.

    :param bkt: bucket name to query
    :return: the "TagSet" list from get_bucket_tagging (None if absent)
    :raises: re-raises any boto3 error after logging it
    """
    try:
        tagging = boto3.client("s3").get_bucket_tagging(Bucket=bkt)
        return tagging.get("TagSet")
    except Exception as e:
        logger.error("Received exception while getting the tags from S3 : {}".format(e))
        raise e
def list_s3_files_using_paginator(bkt,now):
    """
    List every object in the bucket via the list_objects_v2 paginator.

    Each object record from S3 is augmented with:
      - DaysPassed: whole days between *now* and the object's LastModified
      - datetime:   *now* formatted as YYYYmmddHHMMSS (used as the Glue
                    partition value downstream)

    :param bkt: bucket name to scan
    :param now: timezone-aware current datetime (LastModified from boto3 is
                tz-aware, so a naive *now* would raise on subtraction)
    :return: pandas.DataFrame of all object records; empty DataFrame for an
             empty bucket
    :raises: re-raises any boto3/pagination error after logging
    """
    try:
        s3_client = boto3.client("s3")
        paginator = s3_client.get_paginator("list_objects_v2")
        iterator = paginator.paginate(Bucket=bkt, PaginationConfig={"PageSize": 500})
        date_time = now.strftime("%Y%m%d%H%M%S")
        all_files = []
        for page in iterator:
            logger.info("getting files from S3")
            # BUG FIX: pages for an empty bucket carry no "Contents" key, so the
            # original iterated over None and raised TypeError. Default to [].
            for file in page.get("Contents") or []:
                file["DaysPassed"] = (now - file["LastModified"]).days
                file["datetime"] = date_time
                all_files.append(file)
            logger.info("#" * 40)
        return pd.DataFrame(all_files)
    except Exception as e:
        logger.error("Received exception while reading list of files from s3 : {}".format(e))
        raise e
def write_df_to_s3(df,bkt,key,db,tb):
    """
    Write a pandas DataFrame to S3 as a partitioned parquet dataset and
    register/update the matching table in the Glue data catalog.

    :param df: DataFrame to persist (must contain a "datetime" column, which
               is used as the partition key)
    :param bkt: destination bucket name
    :param key: key prefix under the bucket for the dataset
    :param db: Glue database name
    :param tb: Glue table name
    :return: None
    :raises: re-raises any awswrangler/boto3 error after logging
    """
    # NOTE: credentials come from the Lambda execution role; the previous
    # commented-out explicit-session setup was removed as dead code.
    try:
        wr.s3.to_parquet(
            df=df,
            path="s3://{bucket}/{bktKey}/".format(bucket=bkt, bktKey=key),
            dataset=True,
            partition_cols=['datetime'],
            database=db,
            table=tb
        )
    except Exception as e:
        logger.error("Received exception while writing to glue catalog : {}".format(e))
        raise e
def sendNote(msg,sub,arn,bkt):
    """
    Publish a file-expiry notification to the given SNS topic.

    :param msg: message body to publish
    :param sub: message subject line
    :param arn: target SNS topic ARN
    :param bkt: bucket name, attached as a "bucket" message attribute
    :return: the SNS publish response
    :raises: re-raises any boto3 error after logging
    """
    try:
        sns = boto3.client("sns")
        attributes = {"bucket": {"DataType": "String", "StringValue": bkt}}
        return sns.publish(
            TopicArn=arn,
            Message=msg,
            Subject=sub,
            MessageStructure='text',
            MessageAttributes=attributes,
        )
    except Exception as e:
        logger.error("Received exception while sending notification : {}".format(e))
        raise e
def lambda_handler(event, context):
    """
    Entry point method. This takes input from event from Payload and executes the STEP FUNCTION.

    Expected payload keys (direct event or JSON API-Gateway body):
      - innv_bkt_name: landing-zone bucket to audit
      - db_bkt / db:   bucket and Glue database receiving the audit output
      - arn:           SNS topic ARN for owner notification

    The audited bucket must carry a "data-retention" tag (days). Files with
    7 or fewer days left are written to the Glue catalog and announced to
    the owners via SNS.

    :return: dict with a success message for the audited bucket
    :raises ValueError: if the bucket has no "data-retention" tag
    :raises Exception: re-raises any downstream failure after logging
    """
    try:
        # API Gateway invocations wrap the payload in a JSON string body;
        # direct invocations pass the payload dict as the event itself.
        if event.get("requestContext") is not None:
            _input = json.loads(event.get("body"))
        else:
            _input = event
        logger.info(
            "====================================================== Below is the Event Details ====================================================== "
        )
        logger.info("Input from the Payload:{}".format(_input))
        innav_acct_bkt = _input.get("innv_bkt_name")
        logger.info("Getting Tags from the bucket:{}".format(innav_acct_bkt))
        lTag = get_s3_tags(innav_acct_bkt)
        logger.info(
            "Lambda is going to run a check for all the files in the bucket:{} and return a pandas dataframe".format(innav_acct_bkt)
        )
        now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
        df = list_s3_files_using_paginator(innav_acct_bkt, now)
        date_time = now.strftime("%Y%m%d%H%M%S")
        logger.info("Identifying the files which are going to cross the retention period.")
        # BUG FIX: the original blindly indexed [0], raising an opaque
        # IndexError when the "data-retention" tag is missing. Fail loudly
        # with a message naming the bucket instead.
        dr_values = [x.get("Value") for x in lTag if x.get("Key") == "data-retention"]
        if not dr_values:
            raise ValueError(
                "Bucket {} has no 'data-retention' tag".format(innav_acct_bkt)
            )
        dr = int(dr_values[0])
        warn_df = df[((dr - df.DaysPassed) <= 7) & ((dr - df.DaysPassed) > 0)]
        # NOTE(review): del_df (files already past retention) is computed but
        # never acted on here — presumably a placeholder for a deletion step.
        del_df = df[((dr - df.DaysPassed) <= 0)]
        db_bkt = _input.get("db_bkt")
        db = _input.get("db")
        bkt_name_mod = innav_acct_bkt.replace("-", "_")
        logger.info(
            "Number of files which has 7 or less days left to expire: {}".format(len(warn_df))
        )
        if len(warn_df) > 0:
            # BUG FIX: the S3 prefix used the misspelling "deletetion" while the
            # Glue table used "deletion", so the dataset path never matched the
            # table it was registered under; both now derive from the table name.
            tb = bkt_name_mod + "_dtls_future_deletion_tb"
            db_key = db + "/" + tb
            write_df_to_s3(warn_df, db_bkt, db_key, db, tb)
            msg = "Please Query select * from {}.{} where datetime='{}' to find the files which are up for deletion in next 7 days or less".format(db, tb, date_time)
            subject = "Please review your Innovation Platform Landing Zone Files"
            arn = _input.get("arn")
            logger.info("Sending notification to the Distribution list provided by the owner of the bucket:{}".format(innav_acct_bkt))
            sendNote(msg, subject, arn, innav_acct_bkt)
            logger.info("Files that are going to be deleted in next 7 days can be found in select * from {}.{} where datetime='{}'".format(db, tb, date_time))
        else:
            logger.info("No files which is crossing the data retention in next 7 days")
        return {"response": "Custodian Lambda executed successfully for bucket {}".format(innav_acct_bkt)}
    except Exception as e:
        logger.error(
            "Received exception while executing the lambda handler : {}".format(e)
        )
        raise e
test
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.