
prefect-databricks's Introduction

Note

Active development of this project has moved within PrefectHQ/prefect. The code can be found here and documentation here. Please open issues and PRs against PrefectHQ/prefect instead of this repository.

prefect-databricks



Visit the full docs here to see additional examples and the API reference.

The prefect-databricks collection makes it easy to coordinate Databricks jobs with other tools in your data stack using Prefect. Check out the examples below to get started!

Getting Started

Integrate with Prefect flows

Using Prefect with Databricks allows you to define and orchestrate complex data workflows that take advantage of the scalability and performance of Databricks.

This can be especially useful for data-intensive tasks such as ETL (extract, transform, load) pipelines, machine learning training and inference, and real-time data processing.

Below is an example of how you can incorporate Databricks notebooks within your Prefect flows.

Be sure to install prefect-databricks and save a credentials block to run the examples below!

If you don't have an existing notebook ready on Databricks, you can copy the following and name it example.ipynb. This notebook accepts a name parameter from the flow and simply prints a message.

name = dbutils.widgets.get("name")
message = f"Don't worry {name}, I got your request! Welcome to prefect-databricks!"
print(message)

Here, the flow launches a new cluster to run example.ipynb and waits for the completion of the notebook run. Replace the placeholders and run.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)


@flow
def jobs_runs_submit_flow(block_name: str, notebook_path: str, **base_parameters):
    databricks_credentials = DatabricksCredentials.load(block_name)

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task"
    )

    run = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings]
    )

    return run


jobs_runs_submit_flow(
    block_name="BLOCK-NAME-PLACEHOLDER",
    notebook_path="/Users/<EMAIL_ADDRESS_PLACEHOLDER>/example.ipynb",
    name="Marvin",
)

Upon execution, the notebook run should output:

Don't worry Marvin, I got your request! Welcome to prefect-databricks!

!!! info "Input dictionaries in the place of models"

Instead of using the built-in models, you may also input a valid dictionary.

For example, the following are equivalent:

```python
auto_scale=AutoScale(min_workers=1, max_workers=2)
```

```python
auto_scale={"min_workers": 1, "max_workers": 2}
```

If you have an existing Databricks job, you can run it using jobs_runs_submit_by_id_and_wait_for_completion:

from prefect import flow

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
def existing_job_submit(databricks_credentials_block_name: str, job_id):
    # load the saved credentials block by the name passed to the flow
    databricks_credentials = DatabricksCredentials.load(
        name=databricks_credentials_block_name
    )

    run = jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials, job_id=job_id
    )

    return run

existing_job_submit(databricks_credentials_block_name="db-creds", job_id="YOUR-JOB-NAME")

Resources

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Note, the tasks within this collection were created by a code generator using the service's OpenAPI spec.

The service's REST API documentation can be found here.

Installation

Install prefect-databricks with pip:

pip install prefect-databricks

Requires an installation of Python 3.7+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

Saving Credentials to Block

To use the load method on Blocks, you must already have a block document saved through code or saved through the UI.

Below is a walkthrough on saving block documents through code; simply create a short script, replacing the placeholders.

  1. Head over to Databricks.
  2. Login to your Databricks account and select a workspace.
  3. On the top right side of the nav bar, click on your account name -> User Settings.
  4. Click Access tokens -> Generate new token -> Generate and copy the token.
  5. Note down your Databricks instance from the browser URL, formatted like https://<DATABRICKS-INSTANCE>.cloud.databricks.com/
  6. Create a short script, replacing the placeholders.
from prefect_databricks import DatabricksCredentials

credentials = DatabricksCredentials(
    databricks_instance="DATABRICKS-INSTANCE-PLACEHOLDER",
    token="TOKEN-PLACEHOLDER",
)

# save the block document under a name you can load later
credentials.save("BLOCK_NAME-PLACEHOLDER")

Congrats! You can now easily load the saved block, which holds your credentials:

from prefect_databricks import DatabricksCredentials

DatabricksCredentials.load("BLOCK_NAME-PLACEHOLDER")

!!! info "Registering blocks"

Register blocks in this module to
[view and edit them](https://orion-docs.prefect.io/ui/blocks/)
on Prefect Cloud:

```bash
prefect block register -m prefect_databricks
```

Feedback

If you encounter any bugs while using prefect-databricks, feel free to open an issue in the prefect-databricks repository.

If you have any questions or issues while using prefect-databricks, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-databricks for updates too!

Contributing

If you'd like to help contribute to fix an issue or add a feature to prefect-databricks, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
pip install -e ".[dev]"
  4. Make desired changes
  5. Add tests
  6. Insert an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
pre-commit install
  8. git commit, git push, and create a pull request

prefect-databricks's People

Contributors

ahuang11, avishniakov, chrisguidry, dependabot[bot], desertaxle, discdiver, edmondop, jsopkin, micklaw, prefect-collection-synchronizer[bot], serinamarie, urimandujano, zerodarkzone, zzstoatzz


prefect-databricks's Issues

Running jobs_runs_submit_and_wait_for_completion without run_name

If run_name defaults to None, it will result in:

prefect.exceptions.ParameterTypeError: Flow run received invalid parameters:
 - run_name: none is not an allowed value

in jobs_runs_submit_and_wait_for_completion
    jobs_runs_state, jobs_runs_metadata = await jobs_runs_wait_for_completion(
ValueError: too many values to unpack (expected 2)

Prefect validates the input, and because the run_name argument isn't typed as optional here, the flow crashes:
https://github.com/PrefectHQ/prefect-databricks/blob/main/prefect_databricks/flows.py#L399-L400

The type annotations in the base flow need to be updated as well.

Originally reported from:
https://prefect-community.slack.com/archives/CL09KU1K7/p1664168414041899

`NewCluster` has `node_type_id` as mandatory

Hi,

In the current implementation of NewCluster, it is not possible to pass only instance_pool_id, because node_type_id is a mandatory field. If we pass both, cluster creation fails on the Databricks side, since instance_pool_id and node_type_id are mutually exclusive cluster settings.

I can contribute a fix to the package myself if I'm given the necessary permissions (I'm just not sure how contributing works for prefect-databricks and whether the maintainers allow it).
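
As a rough illustration of the constraint described above, here is a minimal sketch of a model that accepts either setting but rejects both. `ClusterSpec` is a hypothetical stand-in written with the standard library; it is not the library's `NewCluster`, and whether Databricks also rejects a request with neither field is not stated in this issue, so the sketch only checks the "both" case.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClusterSpec:
    """Hypothetical stand-in for a cluster model (not prefect-databricks' NewCluster)."""

    spark_version: str
    node_type_id: Optional[str] = None
    instance_pool_id: Optional[str] = None

    def __post_init__(self) -> None:
        # The issue reports that Databricks rejects requests carrying both settings.
        if self.node_type_id is not None and self.instance_pool_id is not None:
            raise ValueError(
                "node_type_id and instance_pool_id are mutually exclusive"
            )


# Passing only the instance pool is accepted by this sketch.
pool_cluster = ClusterSpec(spark_version="10.4.x-scala2.12", instance_pool_id="ABC")
```

With both fields optional, the "pool only" case from the first traceback would validate, while the conflicting case fails early on the client side instead of at cluster creation.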

Errors

>>> NewCluster(instance_pool_id="ABC", spark_version="10.4.x-scala2.12")
---------------------------------------------------------------------------
ValidationError                           Traceback (most recent call last)
Cell In [37], line 1
----> 1 NewCluster(instance_pool_id="ABC", spark_version="10.4.x-scala2.12")

File ~/Projects/GitHub/Libs/mls-mlops/.venv/lib/python3.8/site-packages/pydantic/main.py:342, in pydantic.main.BaseModel.__init__()

ValidationError: 1 validation error for NewCluster
node_type_id
  field required (type=value_error.missing)
>>> NewCluster(node_type_id="m5d.xlarge", instance_pool_id="ABC", spark_version="10.4.x-scala2.12")
Malformed Request from Databricks

job_submission_handler no longer supported by recent versions of Prefect (2.14?)

The job_submission_handler introduced with #89 is no longer working with recent versions of Prefect due to:

/usr/local/lib/python3.9/site-packages/prefect/utilities/callables.py:296: UserWarning: Callable 
job_submission_handler was excluded from schema since JSON schema has no equivalent type.

This is likely because job_submission_handler is Optional[Callable]. It was working with Prefect 2.12.x. The error first appeared after upgrading my Prefect Server to 2.14.10. It seems that now Prefect flow's parameters must be JSON-encodable, and Callable is not.
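
To illustrate the general point about JSON and callables, here is a small standard-library sketch. It only shows that Python's `json` module cannot encode a function; it does not reproduce Prefect's parameter schema generation, and `handler` is a hypothetical callback standing in for a job_submission_handler.

```python
import json


def handler(run_id: int) -> None:
    """Hypothetical callback, standing in for a job_submission_handler."""


def is_json_encodable(value) -> bool:
    """Return True if the standard json module can encode the value."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


plain_params_ok = is_json_encodable({"run_name": "prefect-job"})
callable_param_ok = is_json_encodable({"job_submission_handler": handler})
print(plain_params_ok, callable_param_ok)
```

Plain values such as strings and dictionaries encode without trouble, while a parameter holding a function does not, which is consistent with the warning quoted above.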

Expectation / Proposal

Is this a regression introduced by Prefect in a recent release, or is it the desired behaviour?

Traceback / Example

`socket.gaierror: [Errno -2] Name or service not known` error

Summary

I went through the getting started example here but I receive a socket.gaierror: [Errno -2] Name or service not known error.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_list


@flow
def example_execute_endpoint_flow():
    databricks_credentials = DatabricksCredentials.load("databricks")
    jobs = jobs_list(
        databricks_credentials,
        limit=5
    )
    return jobs

example_execute_endpoint_flow()

I checked that it's not because of problems with the credentials by

  1. Using the Databricks API endpoint directly with the same instance and token which worked.
  2. I checked if it's due to the blocks by using something like
DatabricksCredentials(databricks_instance=MY_INSTANCE, token=MY_TOKEN)

This resulted in the same error
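
A `socket.gaierror` means the hostname could not be resolved, so one thing worth checking is the exact string stored in databricks_instance. The sketch below assumes the value should be a bare hostname without a scheme or trailing slash; that expectation is an assumption here, not something this page confirms. `bare_hostname` and `resolves` are hypothetical helper names.

```python
import socket
from urllib.parse import urlparse


def bare_hostname(value: str) -> str:
    """Strip any scheme, path, or trailing slash and return just the host."""
    value = value.strip()
    # urlparse only fills in the host when a scheme or a leading // is present.
    parsed = urlparse(value if "://" in value else f"//{value}")
    return parsed.hostname or ""


def resolves(host: str) -> bool:
    """Return True if DNS can resolve the host (performs a network lookup)."""
    try:
        socket.getaddrinfo(host, 443)
    except socket.gaierror:
        return False
    return True


# Hypothetical instance URL, for illustration only.
host = bare_hostname("https://dbc-123.cloud.databricks.com/")
```

Calling `resolves(host)` from the same machine that runs the flow shows whether the failure is in name resolution itself or in how the value is passed along.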

Error

08:49:44.624 | INFO    | prefect.engine - Created flow run 'spry-malkoha' for flow 'example-execute-endpoint-flow'
08:49:45.145 | INFO    | Flow run 'spry-malkoha' - Created task run 'jobs_list-8bffe913-0' for task 'jobs_list'
08:49:45.149 | INFO    | Flow run 'spry-malkoha' - Executing 'jobs_list-8bffe913-0' immediately...
08:49:45.378 | ERROR   | Task run 'jobs_list-8bffe913-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/usr/local/lib/python3.8/dist-packages/httpcore/backends/asyncio.py", line 109, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_core/_sockets.py", line 189, in connect_tcp
    gai_res = await getaddrinfo(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_core/_sockets.py", line 496, in getaddrinfo
    gai_res = await get_asynclib().getaddrinfo(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 1754, in getaddrinfo
    result = await get_running_loop().getaddrinfo(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 825, in getaddrinfo
    return await self.run_in_executor(
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/lib/python3.8/socket.py", line 918, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

I can add the full stack trace but it is quite long.

Versions

prefect             2.3.1
prefect-databricks  0.1.1

`ClusterAttributes` gone from `models.jobs`

In 0.1.2, models.jobs is generated from jobs.yaml, whereas previously it was generated from jobs-2.1-aws.yaml. The current main branch has no file named jobs.yaml, only the earlier jobs-2.1-aws.yaml.

Issue

Our code base relies on ClusterAttributes, which is still in the schema (ClusterAttributes link).
Was this change of the schema used for generation intended, and where can we find the new schema file?

I observed that DockerImage is also gone from models.jobs.

Background

We use a specific setting from it: docker_image. If this change was intended, how should we pass it now?

Databricks flow that runs an existing job and waits for it to return

This is more of a feature request . . .

The utility functions of this repo allow you to create, run, and wait for result for a custom databricks job (the definition of which you pass in to the function), but there isn't a simple way to run and wait for result for an existing databricks job by job id. This would be very helpful.

Error when running a multitask job

Expectation / Proposal

The jobs_runs_submit_by_id_and_wait_for_completion flow fails when running a multi task databricks job.
I think it has to do with some changes in the databricks jobs API 2.1.

Traceback / Example

jobs_runs_submit_by_id_and_wait_for_completion
    task_run_output = await task_run_output_future.result()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1655, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect_databricks/jobs.py", line 1238, in jobs_runs_get_output
    contents = _unpack_contents(response, responses)
  File "/usr/local/lib/python3.9/site-packages/prefect_databricks/rest.py", line 157, in _unpack_contents
    raise httpx.HTTPStatusError(
httpx.HTTPStatusError: A job run with multiple tasks was provided.JSON response: {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'Retrieving the output of runs with multiple tasks is not supported. Please retrieve the output of each individual task run instead.'}
10:27:05 PM
prefect.flow_runs
Finished in state Failed("Flow run encountered an exception. HTTPStatusError: A job run with multiple tasks was provided.JSON response: {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'Retrieving the output of runs with multiple tasks is not supported. Please retrieve the output of each individual task run instead.'}")
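
The error message suggests fetching the output of each task run individually. As a sketch of the first step, the helper below collects per-task run IDs from run metadata. It assumes the metadata is a dictionary with a `tasks` list whose entries each carry a `run_id`; that response shape is an assumption for illustration and should be checked against the Jobs API reference. `task_run_ids` is a hypothetical helper, not part of prefect-databricks.

```python
from typing import Any, Dict, List


def task_run_ids(run_metadata: Dict[str, Any]) -> List[int]:
    """Collect the run_id of every task listed in a multi-task run's metadata."""
    return [task["run_id"] for task in run_metadata.get("tasks", [])]


# Made-up metadata for illustration; real responses contain many more fields.
example_metadata = {
    "run_id": 1,
    "tasks": [
        {"task_key": "extract", "run_id": 11},
        {"task_key": "load", "run_id": 12},
    ],
}
ids = task_run_ids(example_metadata)
print(ids)
```

Each collected ID could then be passed to the output endpoint separately instead of the parent run ID.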

Add collection sync workflow using cruft

Add cruft to repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified.
Using the commit of the prefect-collection-template closest to the generation date of this repo
is a good default.

ACL not serializable for `jobs_runs_submit_and_wait_for_completion`

Hello,

I was trying to set proper ACL for RunSubmit using jobs_runs_submit_and_wait_for_completion as follows:

import os
from asyncio import run

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AccessControlList,
    AccessControlRequestForGroup,
    AccessControlRequestForUser,
    CanManage,
    CanView,
    NewCluster,
    RunSubmitTaskSettings,
    SparkPythonTask,
)

owners = ["[email protected]"]
acl = [
    AccessControlRequestForUser(
        permission_level=CanManage.canmanage,
        user_name=user,
    )
    for user in owners
]
acl.append(
    AccessControlRequestForGroup(group_name="users", permission_level=CanView.canview)
)

run(
    jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=DatabricksCredentials(
            databricks_instance=os.environ["DATABRICKS_INSTANCE"],
            token=os.environ["DATABRICKS_TOKEN"],
        ),
        tasks=[
            RunSubmitTaskSettings(
                new_cluster=NewCluster(
                    spark_version="10.4.x-scala2.12",
                    num_workers=1,
                    node_type_id="m5d.large",
                ),
                spark_python_task=SparkPythonTask(python_file="s3://foo/bar.py"),
                task_key="foo",
            )
        ],
        access_control_list=acl,
    )
)

Running it results in the following:

12:51:21.752 | INFO    | prefect.engine - Created flow run 'space-otter' for flow 'Submit jobs runs and wait for completion'
12:51:22.371 | INFO    | Flow run 'space-otter' - Created task run 'jobs_runs_submit-c00eee75-0' for task 'jobs_runs_submit'
12:51:22.372 | INFO    | Flow run 'space-otter' - Submitted task run 'jobs_runs_submit-c00eee75-0' for execution.
12:51:22.547 | ERROR   | Task run 'jobs_runs_submit-c00eee75-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1215, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/jobs.py", line 1634, in jobs_runs_submit
    response = await execute_endpoint.fn(
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/rest.py", line 132, in execute_endpoint
    response = await getattr(client, http_method)(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1520, in request
    request = self.build_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 360, in build_request
    return Request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_models.py", line 339, in __init__
    headers, stream = encode_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 211, in encode_request
    return encode_json(json)
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 174, in encode_json
    body = json_dumps(json).encode("utf-8")
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type CanManage is not JSON serializable
12:51:22.610 | ERROR   | Task run 'jobs_runs_submit-c00eee75-0' - Finished in state Failed('Task run encountered an exception: TypeError: Object of type CanManage is not JSON serializable\n')
12:51:22.610 | ERROR   | Flow run 'space-otter' - Encountered exception during execution:
Traceback (most recent call last):
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 603, in orchestrate_flow_run
    result = await flow_call()
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/flows.py", line 265, in jobs_runs_submit_and_wait_for_completion
    multi_task_jobs_runs = await multi_task_jobs_runs_future.result()
  File "/***/.venv/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/***/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1215, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/jobs.py", line 1634, in jobs_runs_submit
    response = await execute_endpoint.fn(
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/rest.py", line 132, in execute_endpoint
    response = await getattr(client, http_method)(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1520, in request
    request = self.build_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 360, in build_request
    return Request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_models.py", line 339, in __init__
    headers, stream = encode_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 211, in encode_request
    return encode_json(json)
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 174, in encode_json
    body = json_dumps(json).encode("utf-8")
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type CanManage is not JSON serializable
12:51:22.670 | ERROR   | Flow run 'space-otter' - Finished in state Failed('Flow run encountered an exception. TypeError: Object of type CanManage is not JSON serializable\n')
Traceback (most recent call last):
  File "tmp/issue.py", line 25, in <module>
    run(
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/***/.venv/lib/python3.8/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/***/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 603, in orchestrate_flow_run
    result = await flow_call()
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/flows.py", line 265, in jobs_runs_submit_and_wait_for_completion
    multi_task_jobs_runs = await multi_task_jobs_runs_future.result()
  File "/***/.venv/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/***/.venv/lib/python3.8/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/***/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1215, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/jobs.py", line 1634, in jobs_runs_submit
    response = await execute_endpoint.fn(
  File "/***/.venv/lib/python3.8/site-packages/prefect_databricks/rest.py", line 132, in execute_endpoint
    response = await getattr(client, http_method)(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1520, in request
    request = self.build_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_client.py", line 360, in build_request
    return Request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_models.py", line 339, in __init__
    headers, stream = encode_request(
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 211, in encode_request
    return encode_json(json)
  File "/***/.venv/lib/python3.8/site-packages/httpx/_content.py", line 174, in encode_json
    body = json_dumps(json).encode("utf-8")
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/Cellar/python@3.8/3.8.15/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type CanManage is not JSON serializable

Once I remove the ACL parameter, the request goes through to Databricks normally.

I tried to solve the serialization issue by modifying the permission level objects like this:

class IsOwner(str, Enum):
    """
    Permission that represents ownership of the job.
    """

    isowner = "IS_OWNER"

After that, the request with the ACL reaches Databricks, but results in httpx.HTTPStatusError: The request was malformed. See JSON response for error details.JSON response: {'error_code': 'BAD_REQUEST', 'message': 'Principal name not defined'}
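
For reference, the difference between a plain Enum and a str-mixin Enum under the standard `json` encoder can be shown in isolation. The class names below are hypothetical and only mirror the IsOwner definition quoted in this issue; the sketch says nothing about the separate 'Principal name not defined' error.

```python
import json
from enum import Enum


class PlainOwner(Enum):
    """Plain Enum: the json module has no encoding rule for it."""

    isowner = "IS_OWNER"


class StrOwner(str, Enum):
    """str mixin: members are real strings, so json encodes their value."""

    isowner = "IS_OWNER"


try:
    json.dumps({"permission_level": PlainOwner.isowner})
    plain_encodes = True
except TypeError:
    plain_encodes = False

encoded = json.dumps({"permission_level": StrOwner.isowner})
print(plain_encodes, encoded)
```

This is why adding the `str` base class gets the request past httpx's JSON encoding step.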

On top of that I tried different definitions of ACLs, like:

acl = [
    AccessControlRequest.parse_obj(
        AccessControlRequestForUser(
            permission_level=CanManage.canmanage,
            user_name=user,
        )
    )
    for user in owners
]
acl.append(
    AccessControlRequest.parse_obj(
        AccessControlRequestForGroup(
            group_name="users", permission_level=CanView.canview
        )
    )
)

Also tried keeping only users as viewers, with the same error.

Any advice would be helpful, since this is a critical feature: the job is normally executed on behalf of an automation user.

Handling job creation when using jobs_runs_submit_by_id_and_wait_for_completion

At the moment, when using jobs_runs_submit_by_id_and_wait_for_completion, the details of the Databricks job are not available in the parent flow or task until the job completes and they are returned. However, for a long-running job, one might want to store the Databricks job URL as an artifact, for example:

https://github.com/PrefectHQ/prefect-databricks/blob/main/prefect_databricks/flows.py#L462

Expectation / Proposal

Traceback / Example

Installing prefect-databricks with poetry yields installation of every version of botocore

Not sure if this is on purpose, but installing this package with poetry results in every version of botocore being installed.

Command: poetry add prefect-databricks

Related:

Workaround:

  1. Install in virtual environment using pip: poetry run pip install prefect-databricks
  2. Then add as a dependency: poetry add prefect-databricks

This completes nearly instantly. A long-term solution is probably to pin the version of something related to boto3, moto, etc.

jobs_runs_submit_and_wait_for_completion raises an error on import

I have a flow which wraps the call for jobs_runs_submit_and_wait_for_completion:

@flow
def my_flow(databricks_credentials, job_task_settings):
    """Oversimplified"""
    task_run = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        tasks=[job_task_settings]
    )

    return task_run

But I receive the following error:

    raise SkipField(f'Callable {field.name} was excluded from schema since JSON schema has no equivalent type.')
E   pydantic.v1.schema.SkipField: Callable job_submission_handler was excluded from schema since JSON schema has no equivalent type.

Full stack trace:

 During handling of the above exception, another exception occurred:
test_run_notebook.py:6: in <module>
    from ...
../../../.../tasks/databricks/run_notebook.py:10: in <module>
    from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
PATH/lib/python3.9/site-packages/prefect_databricks/flows.py:61: in <module>
    async def jobs_runs_submit_and_wait_for_completion(
PATH/lib/python3.9/site-packages/prefect/flows.py:1382: in flow
    Flow(
PATH/lib/python3.9/site-packages/prefect/context.py:186: in __register_init__
    original_init(__self__, *args, **kwargs)
PATH/lib/python3.9/site-packages/prefect/flows.py:307: in __init__
    self.parameters = parameter_schema(self.fn)
PATH/lib/python3.9/site-packages/prefect/utilities/callables.py:340: in parameter_schema
    create_schema(
PATH/lib/python3.9/site-packages/prefect/utilities/callables.py:296: in create_v1_schema
    return model.schema(by_alias=True)
PATH/lib/python3.9/site-packages/pydantic/v1/main.py:664: in schema
    s = model_schema(cls, by_alias=by_alias, ref_template=ref_template)
PATH/lib/python3.9/site-packages/pydantic/v1/schema.py:188: in model_schema
    m_schema, m_definitions, nested_models = model_process_schema(
PATH/lib/python3.9/site-packages/pydantic/v1/schema.py:582: in model_process_schema
    m_schema, m_definitions, nested_models = model_type_schema(
PATH/lib/python3.9/site-packages/pydantic/v1/schema.py:632: in model_type_schema
    warnings.warn(skip.message, UserWarning)
E   UserWarning: Callable job_submission_handler was excluded from schema since JSON schema has no equivalent type.

I am using poetry for dependency management.

My pyproject.toml (simplified):

python = ">=3.8,<4"
prefect = ">=2.14.14"
pydantic = ">=2"
prefect-databricks = ">=0.2.3"

Python version 3.9.6
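
The last frame of the trace is a `warnings.warn(..., UserWarning)` call, which suggests (this is an assumption, not something the report confirms) that the test run promotes warnings to errors, for example through a `-W error` style filter. The sketch below uses only the standard library to show how such a warning turns into an exception and how a narrowly scoped ignore filter lets the same call pass. `emit_schema_warning` is a hypothetical stand-in for the import that triggers the message; it is not part of prefect-databricks.

```python
import warnings

# Message text copied from the report above.
MESSAGE = (
    "Callable job_submission_handler was excluded from schema "
    "since JSON schema has no equivalent type."
)


def emit_schema_warning():
    """Hypothetical stand-in for the import that emits the warning."""
    warnings.warn(MESSAGE, UserWarning)


def call_with_warnings_as_errors():
    """With an 'error' filter active, the warning is raised as an exception."""
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        try:
            emit_schema_warning()
        except UserWarning as exc:
            return f"raised: {exc}"
    return "ok"


def call_with_targeted_ignore():
    """A later, more specific 'ignore' filter takes precedence over 'error'."""
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        warnings.filterwarnings(
            "ignore",
            message=r"Callable .* was excluded from schema",
            category=UserWarning,
        )
        emit_schema_warning()
    return "ok"
```

If the assumption holds, wrapping the real import in a `warnings.catch_warnings()` block with the same `filterwarnings` call would be one way to keep strict warning settings for the rest of the test suite.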

jobs_runs_wait_for_completion doesn't retry correctly

Expectation / Proposal

I may be misunderstanding something, but I have a custom flow that calls jobs_runs_wait_for_completion under the hood. I am calling my custom flow with additional options like this:

custom_flow.with_options(retries=1)

However, when the underlying Databricks job fails, I see it acknowledged by Prefect but I don't see the Databricks job ever actually rerun.

Am I misunderstanding something basic about how to use jobs_runs_wait_for_completion in combination with with_options(retries=) to get automatic retries of the called Databricks job?

Traceback / Example

17:37:34.392 | INFO    | Flow run 'amorphous-sawfly' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
17:37:36.726 | INFO    | Flow run 'encouraging-bustard' - Finished in state Completed()
17:37:36.860 | ERROR   | Flow run 'amorphous-sawfly' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/nchammas/Library/Caches/pypoetry/virtualenvs/ozawa-prefect-LtscNPhu-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/nchammas/Library/Caches/pypoetry/virtualenvs/ozawa-prefect-LtscNPhu-py3.9/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/nchammas/Library/Caches/pypoetry/virtualenvs/ozawa-prefect-LtscNPhu-py3.9/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/nchammas/dev/spx/ozawa/prefect2/ozawa_prefect/flows/databricks/__init__.py", line 66, in run_job_or_fail
    raise Exception(
Exception: {'result_state': 'FAILED', 'run_page_url': 'https://...cloud.databricks.com/?o=xxx#job/126796463608738/run/28379095', 'notebook_output': {}, 'error': 'ArithmeticException: / by zero'}
17:37:37.000 | ERROR   | Flow run 'amorphous-sawfly' - Finished in state Failed("Flow run encountered an exception. Exception: {'result_state': 'FAILED', 'run_page_url': 'https://...cloud.databricks.com/?o=xxx#job/126796463608738/run/28379095', 'notebook_output': {}, 'error': 'ArithmeticException: / by zero'}")
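
One possible reading of the log above is that the retry re-runs the body of the custom flow, but that body only waits on a run that has already failed, so no new Databricks run is submitted. The sketch below illustrates that reading in plain Python, without Prefect or Databricks. Every name in it (`FakeDatabricks`, `run_with_retries`, `wait_only`, `submit_and_wait`) is hypothetical, and the retry loop is a simplification of what a `retries=` option does, not Prefect's implementation.

```python
import itertools


class FakeDatabricks:
    """Hypothetical in-memory stand-in for a job-run service."""

    def __init__(self, outcomes):
        self._outcomes = iter(outcomes)  # result of each successive submission
        self._ids = itertools.count(1)
        self.runs = {}

    def submit(self):
        """Start a new run and record its (predetermined) terminal state."""
        run_id = next(self._ids)
        self.runs[run_id] = next(self._outcomes)
        return run_id

    def wait(self, run_id):
        """Return the terminal state; waiting again never changes it."""
        return self.runs[run_id]


def run_with_retries(fn, retries):
    """Re-invoke fn up to `retries` extra times after a failure."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except RuntimeError:
            if attempt == retries:
                raise


def wait_only(client, run_id):
    """Fail if the existing run did not succeed; never submits anything."""
    state = client.wait(run_id)
    if state != "SUCCESS":
        raise RuntimeError(state)
    return state


def submit_and_wait(client):
    """Submit a fresh run on every invocation, then wait for it."""
    return wait_only(client, client.submit())


# Retrying a wait on the same run id: the failed run is simply observed again.
first_client = FakeDatabricks(["FAILED", "SUCCESS"])
existing_run = first_client.submit()
try:
    run_with_retries(lambda: wait_only(first_client, existing_run), retries=1)
    wait_only_result = "SUCCESS"
except RuntimeError as exc:
    wait_only_result = f"still {exc}"

# Retrying submit-and-wait: the second attempt starts a second run.
second_client = FakeDatabricks(["FAILED", "SUCCESS"])
resubmit_result = run_with_retries(lambda: submit_and_wait(second_client), retries=1)
```

Under this reading, the submission step would need to sit inside the retried function for a retry to start a new Databricks run.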
