aws-samples / emr-serverless-samples

Example code for running Spark and Hive jobs on EMR Serverless.
Home Page: https://aws.amazon.com/emr/serverless/
License: MIT No Attribution
We have a job that uses the MSSQL JDBC driver. I am currently supplying the config below as part of the Spark properties, but I get the following error.
"--conf spark.archives=/artifacts/pyspark/pyspark_ge.tar.gz#environment --conf spark.submit.pyFiles=/package-1.0.0-py3.8.egg --jars /jar/spark-mssql-connector_2.12-1.1.0.jar --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
Error:
": java.lang.ClassNotFoundException: com.microsoft.sqlserver.jdbc.SQLServerDriver"
Hi,
I am trying to execute the first two command lines listed in the README file:
docker build --output . .; aws s3 cp pyspark_ge.tar.gz s3://${S3_BUCKET}/artifacts/pyspark/
I noticed that the file pyspark_ge.tar.gz was not written locally. I had to run a container from the image built by the previous command and then use the docker cp command to extract the file.
I was wondering if it was just my problem. If not, I volunteer to update the documentation with the two additional commands I had to run.
Thank you very much.
CloudWatch dashboard variables are now GA and have full CDK support.
It would be great to use a dashboard variable for the application ID, with the stack parameter made optional so that it only sets the variable's default value.
This way a user can continue using the same stack + dashboard to monitor multiple EMR Serverless Applications.
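A hedged sketch of what this could look like with recent aws-cdk-lib versions (construct IDs and labels are placeholders, and the exact DashboardVariable options may need adjusting):

from aws_cdk import Stack, CfnParameter
from aws_cdk import aws_cloudwatch as cloudwatch
from constructs import Construct

class EmrServerlessDashboardStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Optional stack parameter; it only seeds the dashboard variable's default.
        app_id = CfnParameter(
            self, "ApplicationID",
            type="String",
            default="",
            description="Default EMR Serverless application ID",
        )

        cloudwatch.Dashboard(
            self, "Dashboard",
            variables=[
                cloudwatch.DashboardVariable(
                    id="ApplicationId",
                    type=cloudwatch.VariableType.PROPERTY,
                    value="ApplicationId",
                    input_type=cloudwatch.VariableInputType.INPUT,
                    label="EMR Serverless application",
                    default_value=cloudwatch.DefaultValue.value(app_id.value_as_string),
                )
            ],
        )

One dashboard could then monitor any application by changing the variable value, without redeploying the stack.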
How are you able to install the emr_serverless package on MWAA 2.2.2 using the official constraints file?
The emr_serverless operator depends on boto3>=1.23.9
While MWAA 2.2.2 has the following constraints:
boto3==1.18.65
boto==2.49.0
botocore==1.21.65
Hello,
We are using the Dockerfile below to generate the virtualenv that we later provide to our EMR Serverless 7.1 application.
FROM --platform=linux/amd64 public.ecr.aws/amazonlinux/amazonlinux:2023-minimal AS base
RUN dnf install -y gcc python3 python3-devel
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN python3 -m pip install --upgrade pip && \
    python3 -m pip install \
        venv-pack==0.2.0 \
        pytz==2022.7.1 \
        boto3==1.33.13 \
        pandas==1.3.5 \
        python-dateutil==2.8.2
RUN mkdir /output && venv-pack -o /output/pyspark_ge.tar.gz
FROM scratch AS export
COPY --from=base /output/pyspark_ge.tar.gz /
Within the Spark application we have a part that calls ['aws', 's3', 'mv'] via check_call from the subprocess module.
In that case it seems the virtualenv is not used; the global Python (3.9), which ships without dateutil, is used instead.
Of course, one could rewrite the application to launch the child process with the currently running interpreter, but I also expected to be able to tell the EMR Serverless application "in general" to use my virtualenv, not just when running my PySpark application. Is that possible, or is this behavior expected? A sketch of that rewrite is shown below.
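As far as I know this behavior is expected: PYSPARK_PYTHON and spark.archives only select the interpreter for the PySpark driver and executors, while subprocess children resolve executables through the inherited PATH. A hedged sketch of launching a Python child inside the same unpacked venv (the dateutil check is just an example payload):

import subprocess
import sys

# sys.executable is the interpreter currently running the job, i.e. the
# unpacked venv's python when PYSPARK_PYTHON points at ./environment/bin/python.
# Launching children through it keeps them inside the same environment.
subprocess.check_call(
    [sys.executable, "-c", "import dateutil; print(dateutil.__version__)"]
)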
Passing a jar dependency with spark-submit, as shown in the PySpark examples, is not working:
"sparkSubmitParameters": "--jars=s3://<data-integration-artifacts>/spark-credentials-provider/idealo-spark-credentials-provider-1.3.0.jar",
I also noticed that the jar doesn't end up on "spark.driver.extraClassPath" or "spark.executor.extraClassPath", even though it is downloaded from S3 into a tmp folder:
Files s3://<data-integration-artifacts>/spark-credentials-provider/idealo-spark-credentials-provider-1.3.0.jar from /tmp/spark-112c49ee-7811-43bf-82ee-587a2d188f19/idealo-spark-credentials-provider-1.3.0.jar to /home/hadoop/./idealo-spark-credentials-provider-1.3.0.jar
I tried the above with both EMR releases 7.0.0 and 6.14.0.
Copying the jar to /usr/share/aws/emr/emrfs/auxlib/ with a custom image built via Docker worked.
Is this a known issue, and is there any solution that doesn't require Docker?
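A hedged workaround sketch: set the jar through the spark.jars property instead of the --jars= form and, if the class still isn't found, pin the downloaded copy onto both classpaths explicitly. The /home/hadoop location is taken from the log line above; all other names are placeholders:

import boto3

client = boto3.client("emr-serverless")

response = client.start_job_run(
    applicationId="<application-id>",
    executionRoleArn="<job-role-arn>",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<bucket>/code/main.py",
            "sparkSubmitParameters": (
                "--conf spark.jars=s3://<bucket>/jars/credentials-provider.jar "
                # The log shows remote jars are copied into /home/hadoop, so the
                # extraClassPath entries point at that local copy (an assumption
                # based on the observed download location, not documented behavior).
                "--conf spark.driver.extraClassPath=/home/hadoop/credentials-provider.jar "
                "--conf spark.executor.extraClassPath=/home/hadoop/credentials-provider.jar"
            ),
        }
    },
)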
Could the pom.xml file mentioned in the README for the PySpark dependencies example please be provided?
Hello, I am trying to increase the default countdown parameter in the EmrServerlessStartJobOperator, but I cannot do that because the waiter is not receiving it:
self.hook.waiter(
    get_state_callable=self.hook.conn.get_job_run,
    get_state_args={
        "applicationId": self.application_id,
        "jobRunId": response["jobRunId"],
    },
    parse_response=["jobRun", "state"],
    desired_state=EmrServerlessJobSensor.SUCCESS_STATES,
    failure_states=EmrServerlessJobSensor.FAILURE_STATES,
    object_type="job",
    action="run",
    # Note: no countdown argument is passed, so the waiter always runs with
    # its default timeout.
)
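A hedged sketch of the change I have in mind: the waiter helper in these samples accepts a countdown keyword (defaulting to 25 * 60 seconds), so the operator could expose and forward it. waiter_countdown below is a hypothetical operator attribute, not something in the released code:

self.hook.waiter(
    get_state_callable=self.hook.conn.get_job_run,
    get_state_args={
        "applicationId": self.application_id,
        "jobRunId": response["jobRunId"],
    },
    parse_response=["jobRun", "state"],
    desired_state=EmrServerlessJobSensor.SUCCESS_STATES,
    failure_states=EmrServerlessJobSensor.FAILURE_STATES,
    object_type="job",
    action="run",
    # Forward a user-configurable timeout instead of relying on the default.
    countdown=self.waiter_countdown,
)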
I am using the Hive example as a template, but with JSON data instead. With the setup below, I receive an error every time. Is there a different setup I should be using? Also, is there a good working example of Hive on EMR Serverless with JSON data?
Having an additional Hive example based on JSON in this repository would be helpful, since the existing one uses CSV-formatted data.
Given an S3 bucket that contains files with a format such as
{"Id":"123","Name":"my-name","Type":"some-type"}
and the initialization script here
and the query script here
When I run a job in EMR serverless using these inputs
Then I receive an error message stating
Job failed, please check complete logs in configured logging destination. ExitCode: 2. Last few exceptions: Caused by: java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found Caused by: java.lang.RuntimeException: Map operator initialization failed ], TaskAttempt 2 failed, info=[Error: Error while running task ( failure ) : attempt_1665435121822_0001_1_00_000000_2:java.lang.RuntimeException: java.lang.RuntimeException: Map operator initialization failed Caused by: java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found Caused by: java.lang.RuntimeException: Map operator initialization failed...
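A hedged pointer: org.apache.hive.hcatalog.data.JsonSerDe ships in the hive-hcatalog-core jar, which does not appear to be on the Hive classpath by default here. One untested sketch of a workaround is to stage that jar on S3 and add it to the session classpath via hive.aux.jars.path (all IDs and paths are placeholders):

import boto3

client = boto3.client("emr-serverless")

response = client.start_job_run(
    applicationId="<application-id>",
    executionRoleArn="<job-role-arn>",
    jobDriver={
        "hive": {
            "initQueryFile": "s3://<bucket>/hive/create_table.sql",
            "query": "s3://<bucket>/hive/query.sql",
            # hive.aux.jars.path puts extra jars on the Hive session classpath,
            # so the JsonSerDe class is resolvable when the map operator starts.
            "parameters": "--hiveconf hive.aux.jars.path=s3://<bucket>/jars/hive-hcatalog-core.jar",
        }
    },
)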
AWS has announced the AWS EMR CLI.
I have tried it, and the CLI works great; it really simplifies submitting jobs.
However, could you tell us how to enable the Glue Hive metastore when submitting a job via the CLI or via boto3?
I have looked at the documentation and I don't see an argument for enabling the Glue Data Catalog option in boto3.
Here is a sample of how we are submitting jobs with the EMR CLI:
emr run --entry-point entrypoint.py \
    --application-id --job-role <arn> \
    --s3-code-uri s3:///emr_scripts/ \
    --spark-submit-opts "--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer" \
    --build \
    --wait
I created a GitHub issue: awslabs/amazon-emr-cli#18
If you can kindly get back to us on the issue, that would be great 😃
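As far as I can tell there is no dedicated CLI or boto3 argument for this; the Glue Data Catalog is switched on through a Spark configuration property. A hedged boto3 sketch (the factory-class setting is the documented mechanism for Spark on EMR; IDs and paths are placeholders):

import boto3

client = boto3.client("emr-serverless")

response = client.start_job_run(
    applicationId="<application-id>",
    executionRoleArn="<job-role-arn>",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<bucket>/emr_scripts/entrypoint.py",
            # Point the Hive metastore client at the Glue Data Catalog.
            "sparkSubmitParameters": (
                "--conf spark.hadoop.hive.metastore.client.factory.class="
                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            ),
        }
    },
)

With the EMR CLI, the same --conf should be passable through --spark-submit-opts.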
Hi,
I have run the example from
https://github.com/aws-samples/emr-serverless-samples/tree/main/examples/pyspark/dependencies
aws emr-serverless start-job-run \
    --application-id $APPLICATION_ID \
    --execution-role-arn $JOB_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/ge_profile.py",
            "entryPointArguments": ["s3://'${S3_BUCKET}'/tmp/ge-profile"],
            "sparkSubmitParameters": "--conf spark.archives=s3://'${S3_BUCKET}'/artifacts/pyspark/pyspark_ge.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${S3_BUCKET}'/logs/"
            }
        }
    }'
And keep getting
Traceback (most recent call last):
  File "/tmp/spark-adc7d82f-ca5d-41ca-8d69-a89f144589a0/ge_profile.py", line 4, in <module>
    import great_expectations as ge
ModuleNotFoundError: No module named 'great_expectations'
I can confirm that the module is included in pyspark_ge.tar.gz
Thanks for the help
Eric
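A hedged debugging step: since the import fails before any Spark code runs, it is worth confirming which interpreter actually runs the driver. If sys.executable is not ./environment/bin/python, the driverEnv settings did not take effect. Temporary lines for the top of ge_profile.py:

import sys

# Confirm the driver really runs inside the unpacked venv and inspect where
# it searches for packages; remove once the issue is diagnosed.
print("executable:", sys.executable)
print("prefix:", sys.prefix)
print("path:", sys.path)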
I want to execute an HQL file with parameters.
My local shell script adds "-hivevar date_param=string:$date_param"; for example:
"""
#!/bin/bash
date_param=date "+%Y%m%d"
aws emr-serverless start-job-run
--application-id XXXXX
--execution-role-arn arn:aws:iam::xxxxx:role/xxxxxxxx
--job-driver '{
"hive": {
"initQueryFile": "s3://mm-emr/emr-serverless-hive/sql/createTable.sql",
"query": "s3://mm-emr/emr-serverless-hive/sql/insert3.sql",
"parameters": "--hiveconf hive.exec.scratchdir=s3://mm-emr/emr-serverless-hive/hive/scratch --hiveconf hive.metastore.warehouse.dir=s3://aiways-emr/emr-serverless-hive/hive/warehouse
-hivevar date_param=string:$date_param"
}
}'
"""
I uploaded the SQL files to the S3 path and ran the application.
I get an error: An error occurred (ValidationException) when calling the StartJobRun operation: Flag '-hivevar' is not supported
What should I do?
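A hedged guess: the validation may only be rejecting the single-dash spelling, since Hive itself accepts the double-dash form --hivevar. An untested boto3 sketch passing the variable that way (the date value is computed by the caller here just for illustration):

import boto3

client = boto3.client("emr-serverless")

date_param = "20240101"  # e.g. computed by the calling shell script

response = client.start_job_run(
    applicationId="XXXXX",
    executionRoleArn="arn:aws:iam::xxxxx:role/xxxxxxxx",
    jobDriver={
        "hive": {
            "initQueryFile": "s3://mm-emr/emr-serverless-hive/sql/createTable.sql",
            "query": "s3://mm-emr/emr-serverless-hive/sql/insert3.sql",
            "parameters": (
                "--hiveconf hive.exec.scratchdir=s3://mm-emr/emr-serverless-hive/hive/scratch "
                "--hiveconf hive.metastore.warehouse.dir=s3://aiways-emr/emr-serverless-hive/hive/warehouse "
                # Double-dash spelling; referenced in the .sql file as ${date_param}.
                f"--hivevar date_param={date_param}"
            ),
        }
    },
)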
I just noticed that the file 'setup.py' in the 'airflow' directory still has version 0.0.2 (https://github.com/aws-samples/emr-serverless-samples/blob/v0.0.4-preview/airflow/setup.py#L5), while the current version should be 0.0.4.
You may also want to look into updating the version numbers of the 'boto3' and 'botocore' dependencies.
Franco
EmrServerlessStartJobOperator doesn't currently support providing a job name or tags.
Hi all,
I'm trying to use the latest release of the serverless plugin on MWAA with Airflow version 2.2.2: https://github.com/aws-samples/emr-serverless-samples/releases/tag/v1.0.1
The install is in conflict with the airflow v2.2.2 constraints file found here: https://raw.githubusercontent.com/apache/airflow/constraints-2.2.2/constraints-3.7.txt
Requirements.txt contents:
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.2/constraints-3.7.txt"
emr_serverless @ https://github.com/aws-samples/emr-serverless-samples/releases/download/v1.0.1/mwaa_plugin.zip
Run:
pip3 install -r requirements.txt
Output:
The conflict is caused by:
emr-serverless 1.0.1 depends on boto3>=1.23.9 and ~=1.23
The user requested (constraint) boto3==1.18.65
It's me again.
I noticed that in the Airflow operators for EMR Serverless you have this line (https://github.com/aws-samples/emr-serverless-samples/blob/main/airflow/emr_serverless/operators/emr.py#L239):
template_fields: Sequence[str] = "application_id"
The problem is that Python treats the string 'application_id' itself as a sequence of characters ('a', 'p', 'p', 'l', ...), and therefore when Airflow/MWAA tries to use that operator it fails with the error message:
AttributeError: 'EmrServerlessDeleteApplicationOperator' object has no attribute 'a'
I found a good explanation in this SO question: https://stackoverflow.com/questions/56845602/python-airflow-error-attributeerror-xsensor-object-has-no-attribute-l
Based on that I changed that line to:
template_fields: Sequence[str] = ("application_id",)
and after that change, the operator 'EmrServerlessDeleteApplicationOperator' works.
Franco
Hi Team,
While trying to test the EMR Serverless operator using the GitHub link in the requirements file, I am getting the following error:
"Broken DAG: [/usr/local/airflow/dags/EMR_Serverless.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/emr_serverless/operators/emr.py", line 22, in
from emr_serverless.hooks.emr import EmrServerlessHook
File "/usr/local/lib/python3.7/site-packages/emr_serverless/hooks/emr.py", line 23, in
from airflow.compat.functools import cached_property
ModuleNotFoundError: No module named 'airflow.compat' "
Versions used in Docker container:
It would be great if someone could suggest workarounds. A sketch of one possible patch is below.
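A hedged workaround sketch: airflow.compat.functools only exists in newer Airflow releases, so the import at the top of emr_serverless/hooks/emr.py could be made defensive. On Python 3.8+ functools provides cached_property; for 3.7 the cached-property backport package is assumed to be installed:

try:
    from airflow.compat.functools import cached_property
except ModuleNotFoundError:
    try:
        from functools import cached_property  # Python 3.8+
    except ImportError:
        from cached_property import cached_property  # Python 3.7 backport package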
Thanks
Hi, I'd like the config argument of the EmrServerlessCreateApplicationOperator to be a templated field so that I can use Jinja templates in it. Would it be possible to incorporate this into the next release? I have tried overriding the operator and it works just fine:
class CustomEmrServerlessCreateApplicationOperator(EmrServerlessCreateApplicationOperator):
    template_fields: Sequence[str] = ("config",)
Thanks!
We have a use case where we would like to start job runs in EMR Serverless where the job name is derived from the output from a previous task.
Since the job name is passed via the 'config' argument to the EmrServerlessStartJobOperator, this request is to add 'config' to the list of 'template_fields' in that operator (https://github.com/aws-samples/emr-serverless-samples/blob/main/airflow/emr_serverless/operators/emr.py#L144-L149), if at all possible.
Thanks in advance,
Franco Venturi
configuration_overrides is treated as a required argument by EmrServerlessStartJobOperator, even though the StartJobRun API itself does not require it.
Error message when trying to start a job without it:
E airflow.exceptions.AirflowException: Argument ['configuration_overrides'] is required
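A hedged stopgap until the operator is fixed: default the argument to an empty dict so the required-argument check is satisfied (an assumption that the check only tests for presence, and that the API accepts an empty overrides object):

class LenientEmrServerlessStartJobOperator(EmrServerlessStartJobOperator):
    """Sketch: make configuration_overrides optional."""

    def __init__(self, *args, configuration_overrides=None, **kwargs):
        super().__init__(
            *args,
            # Fall back to an empty overrides object when none is given.
            configuration_overrides=configuration_overrides or {},
            **kwargs,
        )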
As detailed here and here, we should be able to install and use a Python venv:
--conf spark.archives=s3://DOC-EXAMPLE-BUCKET/EXAMPLE-PREFIX/pyspark_venv.tar.gz#environment
--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python
but that doesn't seem to work. The application fails with this error:
Unpacking an archive s3://DOC-EXAMPLE-BUCKET/EXAMPLE-PREFIX/pyspark_venv.tar.gz#environment from /tmp/spark-02908b0e-9b64-469d-xxx-xxxxxxxx/pyspark_venv.tar.gz to /home/hadoop/./environment
Exception in thread "main" java.io.IOException: Cannot run program "./environment/bin/python": error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:105)
at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1003)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1092)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1101)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 14 more
22/07/20 05:36:18 INFO ShutdownHookManager: Shutdown hook called
22/07/20 05:36:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-02908b0e-9b64-469d-b094-edee291a2426
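A hedged check that has caught this class of failure before: the error suggests bin/python is not at the root of the unpacked archive. venv-pack writes the venv's contents at the archive root, whereas tarring the venv directory itself nests everything one level down, so ./environment/bin/python ends up at ./environment/<venv-name>/bin/python instead. A quick local inspection with the standard library:

import tarfile

# List the archive and confirm the interpreter sits at the root, where
# ./environment/bin/python will resolve after Spark unpacks the archive.
with tarfile.open("pyspark_venv.tar.gz") as tar:
    names = tar.getnames()

print("bin/python at root:", any(n.lstrip("./") == "bin/python" for n in names))
print("first entries:", names[:5])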
This repository contains a great example of using a more recent python interpreter on EMR serverless.
Using that example I am able to use a custom Python 3.11 venv with preinstalled modules. This works fine for spark-submit jobs. In interactive mode, namely EMR Studio, I can also use my custom venv. However, the deployed version of Jupyter, in particular ipython, has compatibility issues with newer versions of Python. Some commands fail with:
An error was encountered:
required field "type_ignores" missing from Module
Traceback (most recent call last):
  File "/tmp/6833554925722006797", line 226, in execute
    code = compile(mod, '<stdin>', 'exec')
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: required field "type_ignores" missing from Module
I see two possible solutions to this: the installed ipython appears to come with the emr-serverless/spark/emr-7.0.0 image; if so, can it be upgraded with a custom image? But maybe I am missing something: is there another way, or do I misinterpret the stack trace?
It is possible to develop locally of course, however the data/computation should happen in AWS.
Hi! I am testing the operator, and I found that Airflow marks a task as SUCCESS even though the EMR job state is FAILED.
I think the problem is in the EmrServerlessHook, in this condition:
if state in failure_states:
    raise AirflowException(
        f"{object_type.title()} reached failure state {state}."
    )
This check runs only once, before the polling loop; it should instead be re-evaluated inside the while loop, after each poll. A sketch of the fix follows.
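A hedged sketch of the suggested control flow, with names following the samples' waiter helper (this is an illustration, not the shipped code):

import time

from airflow.exceptions import AirflowException

def get_state(response, keys):
    """Walk the API response down the given key path, e.g. ["jobRun", "state"]."""
    for key in keys:
        response = response[key]
    return response

def waiter(get_state_callable, get_state_args, parse_response,
           desired_state, failure_states, object_type, action,
           countdown=25 * 60, check_interval_seconds=60):
    while True:
        state = get_state(get_state_callable(**get_state_args), parse_response)
        if state in desired_state:
            break
        # Re-check failure states on every poll, not just before the loop,
        # so a job that fails mid-run raises instead of being marked SUCCESS.
        if state in failure_states:
            raise AirflowException(f"{object_type.title()} reached failure state {state}.")
        if countdown < check_interval_seconds:
            raise RuntimeError(f"{object_type.title()} still not {action} after timeout.")
        countdown -= check_interval_seconds
        time.sleep(check_interval_seconds)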
Thanks!