
Comments (8)

laughingman7743 avatar laughingman7743 commented on June 3, 2024 2

I just have released v3.1.0. 🎉
https://pypi.org/project/PyAthena/3.1.0/
https://github.com/laughingman7743/PyAthena/releases/tag/v3.1.0

from pyathena.

laughingman7743 avatar laughingman7743 commented on June 3, 2024 1

The DataFrame would be a Spark DataFrame, not a Pandas one. I am not sure of a use case that would return values; you would probably be running code that writes data out to S3.


Avinash-1394 avatar Avinash-1394 commented on June 3, 2024 1

That was mainly to check whether the import causes any issues, but I think we can skip that feedback 👍🏽


laughingman7743 avatar laughingman7743 commented on June 3, 2024

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/start_calculation_execution.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/stop_calculation_execution.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/get_calculation_execution.html
If you pass a flag that enables Spark in some way, I think it would be possible to have the query executed by Spark. Refactoring of the cursor class may be necessary, though.
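The boto3 calls linked above follow a start/poll pattern: start a calculation, then poll `get_calculation_execution` until it reaches a terminal state. A minimal sketch of that polling loop, with the `wait_for_calculation` helper name being my own and the commented usage assuming an already-created Spark session:

```python
import time

# Terminal calculation states, per the Athena GetCalculationExecution API.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELED"}


def wait_for_calculation(client, calculation_id, poll_interval=1.0):
    """Poll get_calculation_execution until the calculation finishes."""
    while True:
        response = client.get_calculation_execution(
            CalculationExecutionId=calculation_id
        )
        state = response["Status"]["State"]
        if state in TERMINAL_STATES:
            return response
        time.sleep(poll_interval)


# Typical usage against a Spark-enabled workgroup (requires AWS credentials
# and an active session; session creation is omitted here):
#
#   import boto3
#   client = boto3.client("athena")
#   started = client.start_calculation_execution(
#       SessionId=session_id,
#       CodeBlock='spark.sql("show databases").show()',
#   )
#   result = wait_for_calculation(client, started["CalculationExecutionId"])
```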


Avinash-1394 avatar Avinash-1394 commented on June 3, 2024

If you pass a flag that uses Spark in some way, I think it would be possible to implement the query to be executed by Spark

The way dbt has handled that is by treating the query as generic compiled_code and using the file extension to route it to either the query engine or the Spark engine. I just started looking into this library, but what would you say is the entry point for receiving the query? Is it the Cursor class?
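The extension-based routing described above can be sketched in a few lines. This is not dbt's actual code; `pick_engine` and the engine names are illustrative:

```python
from pathlib import Path


def pick_engine(model_path: str) -> str:
    """Route a compiled model to an engine by file extension (dbt-style)."""
    suffix = Path(model_path).suffix
    if suffix == ".sql":
        return "query"   # regular Athena SQL engine
    if suffix == ".py":
        return "spark"   # Athena for Apache Spark
    raise ValueError(f"unsupported model extension: {suffix}")
```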

Refactoring of the cursor class may be necessary, though.

Definitely. It seems we need to add calls to the new Spark API endpoints to BaseCursor and create something similar to AthenaQueryExecution, e.g. an AthenaSparkExecution. I don't think that will be enough, but those seem like the first steps.
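The proposed AthenaSparkExecution could look roughly like the following. This is a hypothetical sketch modeled on AthenaQueryExecution; the field names are my own guesses at what `get_calculation_execution` would populate:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AthenaSparkExecution:
    """Illustrative result model for a Spark calculation execution."""

    calculation_id: str
    session_id: str
    state: str
    state_change_reason: Optional[str] = None
    std_out_s3_uri: Optional[str] = None
    std_error_s3_uri: Optional[str] = None
    result_s3_uri: Optional[str] = None

    @property
    def is_terminal(self) -> bool:
        return self.state in ("COMPLETED", "FAILED", "CANCELED")
```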


laughingman7743 avatar laughingman7743 commented on June 3, 2024

I am trying to implement a cursor class that executes Spark calculations in the following branch.
#497

It looks like the PySpark code can be executed as follows.

import textwrap

from pyathena import connect

# CalcCursor is the Spark calculation cursor implemented in the #497 branch
# (import it from that branch; it is not in a released version yet).
conn = connect(work_group="spark-primary", cursor_class=CalcCursor)
with conn.cursor() as cursor:
    cursor.execute(
        textwrap.dedent(
            """
            spark.sql("create database if not exists spark_demo_database")
            """
        )
    )

Since it would be difficult to add these features to the regular cursor, I have implemented a separate cursor class. If you have any ideas, please feel free to suggest them.


Avinash-1394 avatar Avinash-1394 commented on June 3, 2024

@laughingman7743 Thank you so much for that. I have reviewed the PR.

There are a couple of additional models you could test to check whether they cause issues.

Pandas dataframe

import pandas as pd
return pd.DataFrame({"A": [1, 2, 3, 4]})

Spark dataframe

# sample rows for illustration; any list of tuples works
data = [(1,), (2,), (3,), (4,)]
return spark.createDataFrame(data, ["A"])

I think you can also import pyspark and return a PySpark DataFrame, but I haven't tested that one out.


laughingman7743 avatar laughingman7743 commented on June 3, 2024

The code for the Athena Example notebook is as follows:

Spark Dataframes:

file_name = "s3://athena-examples-us-east-1/notebooks/yellow_tripdata_2016-01.parquet"

taxi_df = (
    spark.read.format("parquet")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_name)
)

print("Read parquet file complete")

taxi1_df = taxi_df.groupBy("VendorID", "passenger_count").count()
taxi1_df.show()

var1 = taxi1_df.collect()
# %table is an Athena notebook magic that renders the collected rows
%table var1

taxi1_df.coalesce(1).write.mode("overwrite").csv("s3://aws-athena-query-results-****-us-west-2-hl3rhzkk/select_taxi")
print("Write to s3 complete")

Spark SQL:

spark.sql("create database if not exists spark_demo_database")
spark.sql("show databases").show()

spark.sql("use spark_demo_database")
taxi1_df.write.mode("overwrite").format("parquet").option(
    "path", "s3://aws-athena-query-results-****-us-west-2-hl3rhzkk/select_taxi"
).saveAsTable("select_taxi_table")
print("Create new table complete")

spark.sql("show tables").show()

spark.sql("select * from select_taxi_table").show()

spark.sql("DROP TABLE if exists select_taxi_table")
spark.sql("DROP DATABASE if exists spark_demo_database")
print("Clean resources complete")

