
dbt-spark's Introduction

dbt logo

Unit Tests Badge

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.

dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.

dbt-spark

The dbt-spark package contains all of the code enabling dbt to work with Apache Spark and Databricks. For more information, consult the docs.

Getting started

Running locally

A docker-compose environment starts a Spark Thrift server and a Postgres database as a Hive Metastore backend. Note: dbt-spark now supports Spark 3.3.2.

The following command starts two docker containers:

docker-compose up -d

It will take a bit of time for the instance to start; you can check the logs of the two containers. If the instance doesn't start correctly, run the complete reset commands listed below and then try starting again.

Create a profile like this one:

spark_testing:
  target: local
  outputs:
    local:
      type: spark
      method: thrift
      host: 127.0.0.1
      port: 10000
      user: dbt
      schema: analytics
      connect_retries: 5
      connect_timeout: 60
      retry_all: true

Connecting to the local Spark instance:

  • The Spark UI should be available at http://localhost:4040/sqlserver/
  • The endpoint for SQL-based testing is at http://localhost:10000. It can be reached with the Hive or Spark JDBC drivers using the connection string jdbc:hive2://localhost:10000 and the default credentials dbt:dbt (see the Python sketch below).
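For a quick smoke test of the Thrift endpoint outside of dbt, something like the following sketch can be used (assuming pyhive[hive] is installed; the host, port, and user mirror the docker-compose setup above):

from pyhive import hive

# Connect to the local Thrift server started by docker-compose
conn = hive.connect(host="127.0.0.1", port=10000, username="dbt")
cursor = conn.cursor()

# A trivial query to confirm the connection works
cursor.execute("show databases")
print(cursor.fetchall())

cursor.close()
conn.close()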

Note that the Hive metastore data is persisted under ./.hive-metastore/, and the Spark-produced data under ./.spark-warehouse/. To completely reset your environment, run the following:

docker-compose down
rm -rf ./.hive-metastore/
rm -rf ./.spark-warehouse/

Reporting bugs and contributing code

  • Want to report a bug or request a feature? Let us know on Slack, or open an issue.

Code of Conduct

Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the dbt Code of Conduct.

dbt-spark's People

Contributors

aaronsteers, beckjake, brunomurino, charlottevdscheun, chenyulinx, colin-rogers-dbt, collinprather, dave-connors-3, dbeatty10, dependabot[bot], drewbanin, emmyoop, fishtownbuildbot, fokko, friendofasquid, github-actions[bot], grindheim, gshank, jczuurmond, jtcohen6, leahwicz, mcknight-42, michelleark, mikealfare, nielszeilemaker, nssalian, peterallenwebb, poidra02, ueshin, versusfacit


dbt-spark's Issues

Master not working with sources

With the latest master, I'm unable to use sources:

sources:
    - name: bibo
      tables:
        - name: orders

Gives the following error:

root@f425d97e576a:/dbtlake# dbt docs generate
Running with dbt=0.17.0-rc2
* Deprecation Warning: dbt v0.17.0 introduces a new config format for the
dbt_project.yml file. Support for the existing version 1 format will be removed
in a future release of dbt. The following packages are currently configured with
config version 1:
 - dbtlake

For upgrading instructions, consult the documentation:
  https://docs.getdbt.com/docs/guides/migration-guide/upgrading-to-0-17-0
Found 8 models, 0 tests, 0 snapshots, 0 analyses, 151 macros, 1 operation, 0 seed files, 10 sources

08:53:54 | Concurrency: 8 threads (target='dev')
08:53:54 | 
Encountered an error:
Runtime Error
  Runtime Error in model src_logistical_configuration_data (models/staging/bibo/src_logistical_configuration_data.sql)
    Error while parsing relation logistical_configuration_data_incremental_raw: 
        identifier: orders 
        schema: bibo 
        database: fokko 
    On Spark, database should not be set. Use the schema config to set a custom schema/database for this relation.

I have to dive into this, but just want to let you all know.

Saner approaches to getting metadata for Relations

Up to now, the dbt-spark plugin has leveraged a handful of metadata commands:

-- find all current databases
show databases

-- find all current tables in a database
show tables in my_db

-- determine if a relational object is a view or table
show tblproperties my_db.my_relation ('view.default.database')

The main issue with running one statement per relation is that it's very slow. This is justifiable for the get_catalog method of dbt docs generate, but not so at the start of a dbt run. Most databases have more powerful, accessible troves of metadata, often stored in an information_schema. Spark offers nothing so convenient; describe database returns only information about the database; describe table [extended] must be run for every relation.

@drewbanin ended up finding a semi-documented statement in the Spark source code that does most of what we want:

show table extended in my_db like '*'

It returns the same three columns as show tables in my_db, for all relations in my_db, with a bonus column information that packs a lot of good stuff:

Database: my_db
Table: view_model
Owner: root
Created Time: Wed Jan 29 01:58:46 UTC 2020
Last Access: Thu Jan 01 00:00:00 UTC 1970
Created By: Spark 2.4.4
Type: VIEW
View Text: select * from my_db.seed
View Default Database: default
View Query Output Columns: [id, first_name, last_name, email, gender, ip_address]
Table Properties: [transient_lastDdlTime=1580263126, view.query.out.col.3=email, view.query.out.col.0=id, view.query.out.numCols=6, view.query.out.col.4=gender, view.default.database=default, view.query.out.col.1=first_name, view.query.out.col.5=ip_address, view.query.out.col.2=last_name]
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties: [serialization.format=1]
Schema: root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)

The same command also exists in Hive. This is a "big if true" find that could immediately clean up our workarounds for relation types. We'll want to check that it's supported by all vendors/implementations of Spark before committing to this approach, so we'll benefit from folks testing in other environments.
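As a rough illustration (a sketch only, not the dbt-spark implementation; connection details are placeholders), the statement can be issued over a PyHive connection and the relation type pulled out of the information column with a simple regex:

import re
from pyhive import hive

cursor = hive.connect(host="127.0.0.1", port=10000, username="dbt").cursor()
cursor.execute("show table extended in my_db like '*'")

for database, table_name, is_temporary, information in cursor.fetchall():
    # The relation type is embedded in the free-form `information` blob
    match = re.search(r"Type: (\w+)", information)
    rel_type = match.group(1).lower() if match else "table"
    print(database, table_name, rel_type)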

Ephemeral model bug: nested CTE cannot reference previous CTE

Given a query like:

with my_first_cte as (

  with my_sub_cte as (
  
    select 1 as fun
    
  )
  
  select * from my_sub_cte
  
),

my_second_cte as (

  with my_next_sub_cte as (
  
    select * from my_first_cte
    
  )
  
  select * from my_next_sub_cte
  
)

select * from my_second_cte

Returns the following error:

ERROR processing query/statement. Error Code: 0, SQL state: org.apache.spark.sql.AnalysisException: Table or view not found: my_first_cte; line 17 pos 18

This prevents us from being able to have multiple ephemeral models in a dependency line, since the second CTE (ephemeral model) includes sub CTEs that reference the first CTE (ephemeral model).
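For reference, the same logic does run when every CTE is hoisted to the top level, since a top-level CTE may reference any CTE defined before it. Below is a local PySpark sketch of the flattened form (illustrative only; this is the shape dbt would need to generate for ephemeral models, not something it does today):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("cte-flattening").getOrCreate()

flattened = """
with my_sub_cte as (
    select 1 as fun
),
my_first_cte as (
    select * from my_sub_cte
),
my_next_sub_cte as (
    select * from my_first_cte
),
my_second_cte as (
    select * from my_next_sub_cte
)
select * from my_second_cte
"""

spark.sql(flattened).show()  # prints a single row: fun = 1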

Custom schemas: table already exists

Issues with re-running workflows when using custom schemas.

When I create a model with a custom schema configured:

-- models/clean/clean_accounts.sql
{{ config(alias='accounts', schema='clean', materialized='table') }}
select * from {{ source('incoming', 'accounts') }}

I am able to run the workflow successfully once:

> dbt run
...
Completed successfully

However, if I run the same workflow again I get an error:

> dbt run
...
Runtime Error in model clean_orders (models/clean/clean_accounts.sql)
  Database Error
    org.apache.spark.sql.AnalysisException: `dev_clean`.`accounts` already exists.;

Instead, the table should be dropped and recreated. If we repeat the same exercise without the schema='clean' configuration, everything works as expected.

Handle downtime on interactive cluster startup

This is relevant to Databricks-hosted Spark clusters. Interactive clusters take several minutes to start up and we need a way for dbt to anticipate that startup time in production.

Semi-related: Should dbt use Databricks' jobs API when running in production? It is also half as expensive (relative to compute required) as the interactive clusters API.

Materialized tables creation fails on EMR

When I attempt to have dbt run a simple job that results in a materialized Spark table using EMR, I get an error as follows:

> dbt run
Running with dbt=0.13.0
Found 1 models, 0 tests, 0 archives, 0 analyses, 100 macros, 0 operations, 0 seed files, 1 sources

16:25:55 | Concurrency: 1 threads (target='dev')
16:25:55 |
16:25:55 | 1 of 1 START table model data_lake_intgn_test.test_incremental_dt.... [RUN]
16:25:58 | 1 of 1 ERROR creating table model data_lake_intgn_test.test_incremental_dt [ERROR in 2.37s]
16:25:58 |
16:25:58 | Finished running 1 table models in 6.23s.

Completed with 1 errors:

Runtime Error in model data_lake_intgn_current|test_incremental_dt|current (models/data_lake_intgn/current/data_lake_intgn_current|test_incremental_dt|current.sql)
  java.lang.IllegalArgumentException: Can not create a Path from an empty string

Done. PASS=0 ERROR=1 SKIP=0 TOTAL=1

If I run the compiled query directly in PySpark on the EMR cluster, I get the same error message (with the following more complete stack trace):

py4j.protocol.Py4JJavaError: An error occurred while calling o58.sql.
: java.lang.IllegalArgumentException: Can not create a Path from an empty string
	at org.apache.hadoop.fs.Path.checkPathArg(Path.java:163)
	at org.apache.hadoop.fs.Path.<init>(Path.java:175)
	at org.apache.spark.sql.catalyst.catalog.CatalogUtils$.stringToURI(ExternalCatalogUtils.scala:236)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getDatabase$1$$anonfun$apply$2.apply(HiveClientImpl.scala:343)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getDatabase$1$$anonfun$apply$2.apply(HiveClientImpl.scala:339)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getDatabase$1.apply(HiveClientImpl.scala:339)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getDatabase$1.apply(HiveClientImpl.scala:345)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:275)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:213)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:212)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:258)
	at org.apache.spark.sql.hive.client.HiveClientImpl.getDatabase(HiveClientImpl.scala:338)
	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$getDatabase$1.apply(HiveExternalCatalog.scala:211)
	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$getDatabase$1.apply(HiveExternalCatalog.scala:211)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
	at org.apache.spark.sql.hive.HiveExternalCatalog.getDatabase(HiveExternalCatalog.scala:210)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getDatabase(ExternalCatalogWithListener.scala:65)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getDatabaseMetadata(SessionCatalog.scala:233)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.defaultTablePath(SessionCatalog.scala:472)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog$$anonfun$4.apply(SessionCatalog.scala:327)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog$$anonfun$4.apply(SessionCatalog.scala:327)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.validateTableLocation(SessionCatalog.scala:327)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:170)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:195)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:80)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

If I run the same query with the addition of a location statement, however, I do not get an error and the table is created successfully - e.g.:

create table [name]
using parquet 
location 's3://[bucket]/[path]' 
as SELECT * FROM [blah]

I think that the root cause of this is that Databricks does some behind-the-scenes magic with default locations / managed tables / DBFS, which doesn't work on more vanilla Spark, at least in the context of EMR. It's possible that fiddling with some Spark configs could mitigate this, but in general I'd think that specifying an s3 path for a table would be a fairly normal thing to want to do.

There are a couple approaches that occur to me for dealing with this, which could probably be combined into a default / override kind of situation:

  • Set a 'root' location in your dbt_project.yml, and have dbt format model names into it, i.e. set root to be s3://my-bucket/prod/models/ and have model_1 get automatically put into s3://my-bucket/prod/models/model_1/, model_2 automatically go into s3://my-bucket/prod/models/model_2/, and so on.
  • Set a specific table location at the table level via config, so you arbitrarily place model_1 at s3://bucket-a/some-model and model_2 at s3://bucket-b/some-model-also

Dbt docs broken in 0.16.0

With the same code I'm able to generate docs in 0.15.3 but not 0.16.0 (with dbt and dbt-spark always on matching versions):

🚀 dbt docs generate
Running with dbt=0.16.0
Found 2 models, 1 test, 0 snapshots, 0 analyses, 141 macros, 0 operations, 0 seed files, 1 source

22:34:15 | Concurrency: 3 threads (target='local')
22:34:15 |
22:34:15 | Done.
22:34:15 | Building catalog
Encountered an error:
too many values to unpack (expected 2)

The same works in 0.15.3

🚀 dbt docs generate
Running with dbt=0.15.3
Found 2 models, 1 test, 0 snapshots, 0 analyses, 139 macros, 0 operations, 0 seed files, 1 source

22:38:07 | Concurrency: 3 threads (target='local')
22:38:07 |
22:38:07 | Done.
22:38:07 | Building catalog
22:38:08 | Catalog written to <redacted>/catalog.json

Note that this is using the docker-compose environment so it should be easily reproducible

Re-add table owner + stats to catalog

Background

  • @Fokko added owner (#39) and table statistics (#41) to the auto-generated docs site by fleshing out the static method _parse_relation and passing it to get_catalog
  • Amid all the merging for 0.15.3 (#65), the eventual implementation of get_catalog called list_relations + _massage_column_for_catalog. It never called _parse_relation. The unit tests on _parse_relation have been passing, but the output of that work never made it to the actual catalog.
  • The contract of get_catalog changed in dbt v0.16.0. We fixed this in dbt-spark v0.16.1 by reimplementing some private methods (#77).

Next steps

This isn't a regression per se, since this feature never really worked in v0.15.3... but we sure meant for it to ship then!

We should update the catalog private method reimplementations to include owner and table stats. The basis for this work should be the latest implementation on core adapters.

Connecting to Spark cluster from Qubole

I'm trying to get dbt to connect up to my Spark cluster being managed by Qubole. Would love a tip on how to configure the profiles.yml for the connection. Thanks.

dbt fails cryptically on old (I assume?) versions of spark

With dbt-spark==0.15.3 and dbt==0.15.3, a user in slack reports a couple of errors:

  • no support for /* ... */ style comments.
  • when query-comment: none is set to avoid that, an error about not enough values to unpack (expected 4, got 1)
    • that's caused by the show table extended ... command only returning one column.

I don't know when spark introduced block comments (2.0 at the latest), but I'm pretty sure show table extended having 4 columns dates to spark 2.2 (around this PR). We should just detect that we're at least above that and bail out if we aren't. I don't think it's unreasonable to require 2.2 as a minimum version, but I'd be very happy to go even higher.

Edit: upon further thought, we should also detect when show table extended ... returns a number of columns that's not 4, and error about it in a better way than tuple-unpacking errors. Alternatively we could access the columns by name, and give a decent error if the column name doesn't exist.
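A sketch of that last idea (a hypothetical helper, not existing adapter code): read the columns by name and raise a readable error when one is missing, rather than letting tuple unpacking fail:

def parse_show_table_extended_row(row):
    """Read one row of `show table extended ...` output defensively.

    `row` is expected to support lookup by column name (e.g. an agate.Row),
    so older Spark versions that return fewer columns produce a clear
    message instead of a tuple-unpacking ValueError.
    """
    try:
        return row["database"], row["tableName"], row["information"]
    except KeyError as exc:
        raise RuntimeError(
            "Unexpected output from `show table extended`. dbt-spark requires "
            "Spark >= 2.2, which returns the columns database, tableName, "
            "isTemporary, and information. Missing column: %s" % exc
        )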

Check existing relation type when renaming to __dbt_backup

If a dbt model already exists as a view and we are replacing it with a table, dbt will run

alter table analytics.my_model rename to analytics.my_model__dbt_backup

which returns the following error:

Runtime Error in model fct_evs (models/marts/fct_evs.sql)
  org.apache.spark.sql.AnalysisException: Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead;

I think this is because Redshift/Postgres allow using alter table on views.

Support Databricks connections via latest JDBC or ODBC driver

What

Latest JDBC/ODBC driver information, including download links, here.

We can pick either ODBC or JDBC, as the two are meant to be at feature parity.

The latest JDBC connection string looks like:

jdbc:spark://account.cloud.databricks.com:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sqlgateway/protocolv1/endpoint-alpha12345

That differs from the older cluster-specific connection string:

jdbc:spark://dbc-01db0449-2976.cloud.databricks.com:443/dbt_jcohen;transportMode=http;httpPath=sql/protocolv1/o/0/1234-567890-clust123;ssl=1;AuthMech=3;UID=token

How

We should add a new method: odbc.

Ideally, we'd support both the sqlgateway and cluster-specific connection mechanisms via driver, and allow the user to specify either a cluster ID or sqlservice endpoint in profiles.yml.

Several other community-contributed plugins already use ODBC driver connections via pyodbc, e.g. dbt-sqlserver. I think that's more promising than trying to hack this with JayDeBeApi.

Crucially, Databricks has requested that we pass a UserAgentEntry parameter to the driver in the format <product-name>/<product-version> <comment>. E.g. we could specify something like: fishtown-analytics-dbt-spark/0.19.0 (Databricks)
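For illustration, an ODBC connection carrying that parameter might look roughly like this via pyodbc (a sketch assuming the Simba Spark ODBC driver is installed; the host, HTTP path, and token are placeholders, and the exact keyword names are driver-specific):

import pyodbc

# Placeholder values -- not a real workspace, endpoint, or token
connection_string = ";".join([
    "Driver=Simba Spark ODBC Driver",
    "Host=account.cloud.databricks.com",
    "Port=443",
    "HTTPPath=/sqlgateway/protocolv1/endpoint-alpha12345",
    "SSL=1",
    "ThriftTransport=2",  # HTTP transport
    "AuthMech=3",         # username/password, with a token as the password
    "UID=token",
    "PWD=<personal-access-token>",
    "UserAgentEntry=fishtown-analytics-dbt-spark/0.19.0 (Databricks)",
])

conn = pyodbc.connect(connection_string, autocommit=True)
print(conn.cursor().execute("select 1").fetchone())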

Why?

  • Databricks is not committed to supporting the thrift protocol long term
  • These drivers are meant to be significantly faster than older versions we tried in 2019
  • Connecting via these drivers is a prerequisite for accessing new Databricks features that are coming later this year

spark_connection_url does not contain workspace_id when connecting to Databricks

Hi

I tried connecting to Databricks via http. dbt debug was failing at the connection step. Later I checked that spark_connection_url did not contain my organization's Databricks workspace id.

When I updated spark_connection_url in my local computer's dbt-spark connector (with the help of an engineering colleague), everything worked.

Reconcile `schema`/`database` as profile + model configs

Spark namespaces are a bilevel hierarchy, and the terms schema and database are used interchangeably to refer to the topmost level.

In its core implementation, dbt expects database and schema to represent different logical constructs, whereby schemas are contained within databases. Right now, this results in some unexpected behavior and ugly output (in the literal sense) for the Spark plugin, as detailed in #74.

I think we have two options, which are functionally similar but differ in semantics:

  1. Treat Spark more like Redshift, where database really means "universe of objects I can access with the current connection." There is never a reason to set a custom database because there is only ever the one we're connected to. Error early and often if a user tries to set a custom database, or remove support for it as a profile/model config entirely.

  2. Across the board, dbt-spark should treat schema and database as interchangeable and mutually exclusive. It should accept one of (but never both) schema and database in the profile and model config. This requires subtle changes to the implementations of SparkCredentials and SparkRelation.

I'm leaning toward option 2 because it hews more closely to Spark's view of the world. In either case, these changes may be breaking for some projects.

Curious to hear input from @drewbanin @beckjake and members of the dbt+Spark community!

Transaction log has failed integrity checks

Light on details for now. A similar issue here with a good answer. This seems more broadly related to the way we drop and replace Delta tables.

Runtime Error in model dim_users (models/marts/dim_users.sql)
  java.lang.IllegalStateException: The transaction log has failed integrity checks. We recommend you contact Databricks support for assistance. To disable this check, set spark.databricks.delta.state.corruptionIsFatal to false. Failed verification of:
  Table size (bytes) - Expected: 696757759 Computed: 696757530

This error pops up when trying to drop or vacuum the existing table. I managed to circumvent it by dropping and recreating the analytics schema.

Profile configuration not appropriate for EMR/on-prem clusters

As we discussed on Slack, the profile configuration is geared very much towards a Databricks-style backend, and isn't appropriate for connecting directly to an EMR cluster (or presumably an on-premise cluster). Basically, when connecting to an EMR cluster, you're connecting directly to the thrift endpoint on the master node; the only relevant fields are host (IP address / hostname of the master node), port, and username (hadoop, at least in the case of EMR). cluster and token aren't applicable in this context. Also, the SPARK_CONNECTION_URL template isn't relevant here - you're connecting directly to a specified host / IP address. It'd be great if there was a way to provide connection information that would allow an EMR cluster to serve as a Spark backend.

I've been successfully using PyHive to connect to an EMR cluster by doing something like:

from pyhive import hive
cursor = hive.connect(host='[some IP address]', port=10001, username='hadoop').cursor()

Some possibly relevant documentation: https://aws.amazon.com/premiumsupport/knowledge-center/jdbc-connection-emr/

Improvements to CI, release workflow

CI

Thanks to the hard work in #58, we can spin up dockerized Spark for integration tests. I think all PRs (including ones from external contributors) should run unit and integration tests in Circle.

Releases

For now, I think a good release flow might look like:

  • dbt-spark maintainer cuts a new GitHub release off the master branch, tagged with a semantic version
  • Automated deployment of that versioned release to pypi (+ homebrew?) via GitHub Actions

Eventually we may find we need dev/version branches, alpha/beta/release candidates, etc.

Remove leading period from info logging

This is a low-priority cosmetic issue.

An extra leading period appears in the dbt run output:

11:19:56 | 1 of 15 START table model .dbt_jcohen.materialized................... [RUN]
11:20:02 | 1 of 15 OK created table model .dbt_jcohen.materialized.............. [OK in 6.84s]
11:20:02 | 2 of 15 START incremental model .dbt_jcohen.incremental.............. [RUN]
11:20:10 | 2 of 15 ERROR creating incremental model .dbt_jcohen.incremental..... [ERROR in 7.55s]
...

Interestingly, in dbt seed the period appears only in the START lines:

11:19:12 | 1 of 3 START seed file .dbt_jcohen.other_seed........................ [RUN]
11:19:20 | 1 of 3 OK loaded seed file dbt_jcohen.other_seed..................... [CREATE 100 in 8.06s]
11:19:20 | 2 of 3 START seed file .dbt_jcohen.other_seed_update................. [RUN]
11:19:27 | 2 of 3 OK loaded seed file dbt_jcohen.other_seed_update.............. [CREATE 200 in 6.72s]
11:19:27 | 3 of 3 START seed file .dbt_jcohen.seed.............................. [RUN]
11:19:33 | 3 of 3 OK loaded seed file dbt_jcohen.seed........................... [CREATE 5 in 5.74s]
  • This issue is new in 0.16.0. When I run 0.15.3, it doesn't appear.
  • I imagine it may be related to Spark's idiosyncratic behavior around databases/schemas.
  • Does it have something to do with the addition of a generate_database_name macro in dbt-core?

Unhandled EOFError() after 300s

For any long-running query, after 300 seconds, dbt will return with an unhandled error:

File "/usr/local/lib/python3.7/site-packages/dbt_core-0.13.0-py3.7.egg/dbt/adapters/sql/connections.py", line 61, in add_query
    cursor.execute(sql, bindings)
  File "/Users/jerco/dev/dbt-dev/dbt-spark/dbt/adapters/spark/connections.py", line 100, in execute
    return self._cursor.execute(sql, bindings)
  File "/usr/local/lib/python3.7/site-packages/PyHive-0.6.1-py3.7.egg/pyhive/hive.py", line 364, in execute
    response = self._connection.client.ExecuteStatement(req)
  File "/usr/local/lib/python3.7/site-packages/PyHive-0.6.1-py3.7.egg/TCLIService/TCLIService.py", line 280, in ExecuteStatement
    return self.recv_ExecuteStatement()
  File "/usr/local/lib/python3.7/site-packages/PyHive-0.6.1-py3.7.egg/TCLIService/TCLIService.py", line 292, in recv_ExecuteStatement
    (fname, mtype, rseqid) = iprot.readMessageBegin()
  File "/usr/local/lib/python3.7/site-packages/thrift-0.11.0-py3.7-macosx-10.14-x86_64.egg/thrift/protocol/TBinaryProtocol.py", line 148, in readMessageBegin
    name = self.trans.readAll(sz)
  File "/usr/local/lib/python3.7/site-packages/thrift-0.11.0-py3.7-macosx-10.14-x86_64.egg/thrift/transport/TTransport.py", line 66, in readAll
    raise EOFError()

This is raised by readAll() in the thrift.transport module that the Spark adapter includes here.

The query itself is not cancelled and will continue running (to failure/completion/otherwise) in Spark.

Fix is_incremental() macro

The default implementation of is_incremental() bundles the following four conditions:

relation is not none
and relation.type == 'table'
and model.config.materialized == 'incremental'
and not flags.FULL_REFRESH

The second condition is always false on Spark because relation objects don't naturally have a value for type. Instead, relation type is handled by the spark_get_relation_type macro, which infers it from

show tblproperties ... ('view.default.database')

There are a few potential fixes:

  • Reimplement is_incremental() for dbt-spark
  • Update the relation object with its type as part of the incremental materialization, since we're already calling spark_get_relation_type here
  • At the beginning of runs, cache all info on dbt relations, including relation types (similar to get_catalog, although this takes a while to run)

Links in README.md to Spark documentation are broken

Both of these links give a 404 error

[Spark profile](https://docs.getdbt.com/docs/profile-spark)
[Spark specific configs](https://docs.getdbt.com/docs/spark-configs)

They should probably be set to these instead

[Spark profile](https://docs.getdbt.com/reference/warehouse-profiles/spark-profile/)
[Spark specific configs](https://docs.getdbt.com/reference/resource-configs/spark-configs/)

More efficient catalog generation

Picks up from #49

Background

Currently, _get_one_catalog runs show tblproperties and describe extended on every single relation, one by one. As a result, dbt docs generate can take several minutes to run.

All the same information is available at a schema level by running show table extended in [schema] like '*', which dbt-spark knows as the LIST_RELATIONS_MACRO_NAME.

Challenge

The tricky part is the formatting: the result of show table extended... is structured, but strange:

Database: my_db
Table: view_model
Owner: root
Created Time: Wed Jan 29 01:58:46 UTC 2020
Last Access: Thu Jan 01 00:00:00 UTC 1970
Created By: Spark 2.4.4
Type: VIEW
View Text: select * from my_db.seed
View Default Database: default
View Query Output Columns: [id, first_name, last_name, email, gender, ip_address]
Table Properties: [transient_lastDdlTime=1580263126, view.query.out.col.3=email, view.query.out.col.0=id, view.query.out.numCols=6, view.query.out.col.4=gender, view.default.database=default, view.query.out.col.1=first_name, view.query.out.col.5=ip_address, view.query.out.col.2=last_name]
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties: [serialization.format=1]
Schema: root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)

As @aaronsteers helpfully found, we could use this regex string to parse out all the info we need from the result above:

\|-- (.*): (.*) \(nullable = (.*)\b

This will require changes to _get_columns_for_catalog, and possibly to parse_describe_extended as well.
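As a quick sketch (illustrative only, not the final _get_columns_for_catalog implementation), applying that regex to the Schema block of the information column could look like:

import re

COLUMN_PATTERN = re.compile(r"\|-- (.*): (.*) \(nullable = (.*)\b")

def parse_columns(information):
    """Extract column name, type, and nullability from the `information` blob."""
    return [
        {"column": name, "dtype": dtype, "nullable": nullable == "true"}
        for name, dtype, nullable in COLUMN_PATTERN.findall(information)
    ]

sample = """Schema: root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
"""
print(parse_columns(sample))
# [{'column': 'id', 'dtype': 'long', 'nullable': True},
#  {'column': 'first_name', 'dtype': 'string', 'nullable': True}]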

Alternatives

  • Open a PR against Apache Spark to add an information schema :)

Who will this benefit?

Faster docs generation for everybody!

Better document implications of incremental models

This plugin uses insert overwrite... to implement incremental models. We'll need to document the implications of this approach (unique_key is not supported, full partitions are overwritten) as well as any impact to the model SQL that needs to be written. It appears that the ordering of partitioning columns is significant -- can we automate this in the materialization code?

How to connect to a local spark install

Is there any way to connect using a locally installed spark instance, rather than to a remote service via http/thrift?

The code I'm trying to migrate uses the following imports to run SQL-based transforms locally using spark/hive already on the container:

from pyspark.sql import SparkSession

# conf and SPARK_LOG_LEVEL are defined elsewhere in the application code
spark = (
    SparkSession.builder.config(conf=conf)
    .master("local")
    .appName("My Spark App")
    .enableHiveSupport()
    .getOrCreate()
)
spark.sparkContext.setLogLevel(SPARK_LOG_LEVEL)
sc = spark.sparkContext

# ...

# "my_new_table" is a placeholder target name; CREATE TABLE AS SELECT requires one
df = spark.sql(f"CREATE TABLE my_new_table AS SELECT * FROM {my_source_table}")

And if not supported currently, is there any chance we could build this and/or add the feature? For CI/CD pipelines especially, it seems we would want to be able to run dbt pipelines even without access to an external cluster.

Error using spark adapter in thrift mode

Moving #20 (comment) here:

I tried to use dbt-spark with this pull request, but I get the following error:

2019-05-30 18:17:30,493 (MainThread): Encountered an error:
2019-05-30 18:17:30,493 (MainThread): not enough values to unpack (expected 3, got 1)
2019-05-30 18:17:30,532 (MainThread): Traceback (most recent call last):
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/main.py", line 79, in main
    results, succeeded = handle_and_check(args)
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/main.py", line 153, in handle_and_check
    task, res = run_from_args(parsed)
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/main.py", line 209, in run_from_args
    results = run_from_task(task, cfg, parsed)
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/main.py", line 217, in run_from_task
    result = task.run()
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/task/runnable.py", line 256, in run
    self.before_run(adapter, selected_uids)
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/task/run.py", line 85, in before_run
    self.populate_adapter_cache(adapter)
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/task/run.py", line 23, in populate_adapter_cache
    adapter.set_relations_cache(self.manifest)
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/adapters/base/impl.py", line 331, in set_relations_cache
    self._relations_cache_for_schemas(manifest)
  File "/home/paul/.local/lib/python3.6/site-packages/dbt/adapters/base/impl.py", line 313, in _relations_cache_for_schemas
    for relation in self.list_relations_without_caching(db, schema):
  File "/home/paul/dbt-spark/dbt/adapters/spark/impl.py", line 75, in list_relations_without_caching
    for _database, name, _ in results:
ValueError: not enough values to unpack (expected 3, got 1)

If I add a print(results[0]) right above that line, it seems like each row has a single field instead of 3:
<agate.Row: ('mytable')>

I couldn't get Spark to connect in http mode (i.e. without this pull request), so I'm not sure if the issue is with this pull request or something more general.

This is connecting to an EMR 5.20.0 cluster, and thrift was started with sudo /usr/lib/spark/sbin/start-thriftserver.sh --master yarn-client.

The amount of metadata returned from EMR/Glue causes dbt-spark to hang on the query

  • dbt version 0.16.1
  • Spark version 2.4.4

Hey folks,

I have encountered a problem where creating models/relations using dbt and EMR/Glue is limited: the amount of metadata stored in Glue about the relations makes the dbt run/query hang.

First, I noticed that after creating 6 relations in my target schema, every subsequent dbt run hangs and does not advance.

Looking at the Thrift logs in the master node, I noticed that the problem was the following query:

SparkExecuteStatementOperation: Running query '/* {"app": "dbt", "dbt_version": "0.16.1", "profile_name": "emr_datalake", "target_name": "dev", "connection_name": "list__dbt_spark"} */
show table extended in dbt_spark like '*'
 ' with 6adf8996-814e-4a8c-93a1-1caa85c13340

Looking at the dbt.log file, I noticed that the process stops at the step TGetResultSetMetadataResp:

2020-07-21 19:16:57.017962 (ThreadPoolExecutor-1_0): Acquiring new spark connection "list__dbt_spark".
2020-07-21 19:16:57.018235 (ThreadPoolExecutor-1_0): Re-using an available connection from the pool (formerly list_).
2020-07-21 19:16:57.019701 (ThreadPoolExecutor-1_0): NotImplemented: add_begin_query
2020-07-21 19:16:57.019795 (ThreadPoolExecutor-1_0): Using spark connection "list__dbt_spark".
2020-07-21 19:16:57.019868 (ThreadPoolExecutor-1_0): On list__dbt_spark: /* {"app": "dbt", "dbt_version": "0.16.1", "profile_name": "emr_datalake", "target_name": "dev", "connection_name": "list__dbt_spark"} */
show table extended in dbt_spark like '*'
  
2020-07-21 19:16:57.020019 (ThreadPoolExecutor-1_0): /* {"app": "dbt", "dbt_version": "0.16.1", "profile_name": "emr_datalake", "target_name": "dev", "connection_name": "list__dbt_spark"} */
show table extended in dbt_spark like '*'
  
2020-07-21 19:16:57.020118 (ThreadPoolExecutor-1_0): TExecuteStatementReq(sessionHandle=TSessionHandle(sessionId=THandleIdentifier(guid=b'iF\x80\x1fj\xcfB\xcf\x86o\xd4\xa4,\xa1\xdd/', secret=b'vR$j\x9cbGW\x8c\x07\x91\x84\xb0\xe8\xb6\x80')), statement='/* {"app": "dbt", "dbt_version": "0.16.1", "profile_name": "emr_datalake", "target_name": "dev", "connection_name": "list__dbt_spark"} */\nshow table extended in dbt_spark like \'*\'\n  ', confOverlay=None, runAsync=True, queryTimeout=0)
2020-07-21 19:16:57.164044 (ThreadPoolExecutor-1_0): TExecuteStatementResp(status=TStatus(statusCode=0, infoMessages=None, sqlState=None, errorCode=None, errorMessage=None), operationHandle=TOperationHandle(operationId=THandleIdentifier(guid=b'\xdb\xad4\x04\x91\xfdMQ\xac\xd3\tN&\xd8DC', secret=b't\x80\xfa<\x1e\xf2@\x83\xae]\t\xd69\x1bq\xea'), operationType=0, hasResultSet=True, modifiedRowCount=None))
2020-07-21 19:16:59.199866 (ThreadPoolExecutor-1_0): TGetOperationStatusResp(status=TStatus(statusCode=0, infoMessages=None, sqlState=None, errorCode=None, errorMessage=None), operationState=2, sqlState=None, errorCode=None, errorMessage=None, taskStatus=None, operationStarted=None, operationCompleted=None, hasResultSet=None, progressUpdateResponse=None)
2020-07-21 19:16:59.200094 (ThreadPoolExecutor-1_0): Poll status: 2, query complete
2020-07-21 19:16:59.200222 (ThreadPoolExecutor-1_0): SQL status: OK in 2.18 seconds
2020-07-21 19:16:59.778318 (ThreadPoolExecutor-1_0): TGetResultSetMetadataResp(status=TStatus(statusCode=0, infoMessages=None, sqlState=None, errorCode=None, errorMessage=None), schema=TTableSchema(columns=[TColumnDesc(columnName='database', typeDesc=TTypeDesc(types=[TTypeEntry(primitiveEntry=TPrimitiveTypeEntry(type=7, typeQualifiers=None), arrayEntry=None, mapEntry=None, structEntry=None, unionEntry=None, userDefinedTypeEntry=None)]), position=1, comment=''), TColumnDesc(columnName='tableName', typeDesc=TTypeDesc(types=[TTypeEntry(primitiveEntry=TPrimitiveTypeEntry(type=7, typeQualifiers=None), arrayEntry=None, mapEntry=None, structEntry=None, unionEntry=None, userDefinedTypeEntry=None)]), position=2, comment=''), TColumnDesc(columnName='isTemporary', typeDesc=TTypeDesc(types=[TTypeEntry(primitiveEntry=TPrimitiveTypeEntry(type=0, typeQualifiers=None), arrayEntry=None, mapEntry=None, structEntry=None, unionEntry=None, userDefinedTypeEntry=None)]), position=3, comment=''), TColumnDesc(columnName='information', typeDesc=TTypeDesc(types=[TTypeEntry(primitiveEntry=TPrimitiveTypeEntry(type=7, typeQualifiers=None), arrayEntry=None, mapEntry=None, structEntry=None, unionEntry=None, userDefinedTypeEntry=None)]), position=4, comment='')]))

Since I am not that familiar with the steps I should be expecting, I removed the 6 previously created relations and created 7 dummy relations, in order to rule out the "maximum of 6 relations" hypothesis.

  WITH dummy     AS (
      SELECT 1 AS col1,
             1 AS col2,
             1 AS col3,
             1 AS col4,
             1 AS col5,
             1 AS col6,
             1 AS col7,
             1 AS col8,
             1 AS col9,
             1 AS col10,
             1 AS col11,
             1 AS col12
      )
SELECT *
  FROM dummy

I created 7 of these copies:

Running with dbt=0.16.1
Found 37 models, 0 tests, 0 snapshots, 0 analyses, 143 macros, 0 operations, 0 seed files, 17 sources

21:57:25 | Concurrency: 1 threads (target='dev')
21:57:25 | 
21:57:25 | 1 of 7 START view model .dbt_spark.src_dummy_1................ [RUN]
21:57:27 | 1 of 7 OK created view model .dbt_spark.src_dummy_1........... [OK in 2.34s]
21:57:27 | 2 of 7 START view model .dbt_spark.src_dummy_2................ [RUN]
21:57:29 | 2 of 7 OK created view model .dbt_spark.src_dummy_2........... [OK in 2.20s]
21:57:29 | 3 of 7 START view model .dbt_spark.src_dummy_3................ [RUN]
21:57:31 | 3 of 7 OK created view model .dbt_spark.src_dummy_3........... [OK in 2.16s]
21:57:31 | 4 of 7 START view model .dbt_spark.src_dummy_4................ [RUN]
21:57:34 | 4 of 7 OK created view model .dbt_spark.src_dummy_4........... [OK in 2.14s]
21:57:34 | 5 of 7 START view model .dbt_spark.src_dummy_5................ [RUN]
21:57:36 | 5 of 7 OK created view model .dbt_spark.src_dummy_5........... [OK in 2.05s]
21:57:36 | 6 of 7 START view model .dbt_spark.src_dummy_6................ [RUN]
21:57:38 | 6 of 7 OK created view model .dbt_spark.src_dummy_6........... [OK in 2.25s]
21:57:38 | 7 of 7 START view model .dbt_spark.src_dummy_7................ [RUN]
21:57:40 | 7 of 7 OK created view model .dbt_spark.src_dummy_7........... [OK in 2.17s]
21:57:42 | 
21:57:42 | Finished running 7 view models in 22.47s.

Checking the Thrift logs, I noticed a step that should have been the next one had the query resumed and not hung.

2020-07-21 19:20:50.359768 (ThreadPoolExecutor-1_0): TFetchResultsResp

This call/return is a big one since it returns the full metadata about the relations in your target schema, essentially what you are asking for through show table extended in <schema_name> like '*'

In order to confirm it is the size of the metadata payload returned from Glue, I added more columns and text to a model.

  WITH dummy     AS (
      SELECT 1 AS col1,
             1 AS col2,
             1 AS col3,
             1 AS col4,
             1 AS col5,
             1 AS col6,
             1 AS col7,
             1 AS col8,
             1 AS col9,
             1 AS col10,
             1 AS col11,
             1 AS col12,
             1 AS col13,
             1 AS col14,
             1 AS col15,
             1 AS col16,
             1 AS col17,
             1 AS col18,
             1 AS col19,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col20,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col21,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col22,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col23,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col24,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col25,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col26,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col27,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col28,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col29,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col30,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col31,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col32,
             'TEST123TEST123TEST123TEST123TEST123TEST123' AS col33
      )

SELECT *
  FROM dummy

This actually executes, since the metadata about the new relations is yet to be returned (next dbt run).

Running the same set of models results in hanging because the metadata size of the new models is too big (suggesting that there is a cap on the amount of data that can be returned/parsed).

Running with dbt=0.16.1
Found 37 models, 0 tests, 0 snapshots, 0 analyses, 143 macros, 0 operations, 0 seed files, 17 sources

22:03:56 | Concurrency: 1 threads (target='dev')
22:03:56 | 
22:03:56 | 1 of 7 START view model .dbt_spark.src_dummy_1................ [RUN]

With the increase, the dbt run hangs until I kill it manually.

Support kerberos

pyhive supports kerberos, but this plugin does not allow additional (or alternative) parameters to be passed to pyhive for such authentication.
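For context, a Kerberos connection through PyHive directly looks roughly like the sketch below (assuming a valid ticket from kinit; the hostname is a placeholder and the service name is deployment-specific), so the plugin would mainly need to expose these parameters in the profile:

from pyhive import hive

# Assumes `kinit` has already been run for the current principal
conn = hive.connect(
    host="thrift-server.example.com",  # placeholder host
    port=10000,
    auth="KERBEROS",
    kerberos_service_name="hive",
)
cursor = conn.cursor()
cursor.execute("show databases")
print(cursor.fetchall())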

Isolate `sasl` as a dependency

Right now, pip install dbt-spark is not working on Windows because:

  • dbt-spark requires PyHive[hive]
  • hive extra requires sasl
  • pip install sasl is failing on Windows machines

On my personal machine:

Python 3.8.5
pip 20.2.3

Here's one of the stacktraces I've seen, though they vary by user/machine:

Building wheels for collected packages: sasl
  Building wheel for sasl (setup.py) ... error
  ERROR: Command errored out with exit status 1:
...
Complete output (27 lines):
  running bdist_wheel
  running build
  running build_py
  creating build
  creating build\lib.win-amd64-3.7
  creating build\lib.win-amd64-3.7\sasl
  copying sasl\__init__.py -> build\lib.win-amd64-3.7\sasl
  running egg_info
  writing sasl.egg-info\PKG-INFO
  writing dependency_links to sasl.egg-info\dependency_links.txt
  writing requirements to sasl.egg-info\requires.txt
  writing top-level names to sasl.egg-info\top_level.txt
  reading manifest file 'sasl.egg-info\SOURCES.txt'
  reading manifest template 'MANIFEST.in'
  writing manifest file 'sasl.egg-info\SOURCES.txt'
  copying sasl\saslwrapper.cpp -> build\lib.win-amd64-3.7\sasl
  copying sasl\saslwrapper.h -> build\lib.win-amd64-3.7\sasl
  copying sasl\saslwrapper.pyx -> build\lib.win-amd64-3.7\sasl
  running build_ext
  building 'sasl.saslwrapper' extension
  creating build\temp.win-amd64-3.7
  creating build\temp.win-amd64-3.7\Release
  creating build\temp.win-amd64-3.7\Release\sasl
  C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\BIN\x86_amd64\cl.exe /c /nologo /Ox /W3 /GL /DNDEBUG /MD -Isasl -Ic:\users\User\appdata\local\programs\python\python37\include -Ic:\users\User\appdata\local\programs\python\python37\include "-IC:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\INCLUDE" "-IC:\Program Files (x86)\Windows Kits\10\include\wdf\ucrt" "-IC:\Program Files (x86)\Windows Kits\10\include\wdf\shared" "-IC:\Program Files (x86)\Windows Kits\10\include\wdf\um" "-IC:\Program Files (x86)\Windows Kits\10\include\wdf\winrt" /EHsc /Tpsasl/saslwrapper.cpp /Fobuild\temp.win-amd64-3.7\Release\sasl/saslwrapper.obj 
  saslwrapper.cpp
  c:\users\User\appdata\local\programs\python\python37\include\pyconfig.h(59): fatal error C1083: Cannot open include file: 'io.h': No such file or directory   
  error: command 'C:\\Program Files (x86)\\Microsoft Visual Studio 14.0\\VC\\BIN\\x86_amd64\\cl.exe' failed with exit status 2
----------------------------------------
ERROR: Failed building wheel for sasl
  Running setup.py clean for sasl
Failed to build sasl
Installing collected packages: pure-sasl, thrift-sasl, sasl, PyHive, dbt-spark
    Running setup.py install for sasl ... error

Seeds error: unexpected column list in insert statement

I'm getting the error below when attempting to load a seed file. (I'm new, so this is my first time trying.)

Runtime Error in seed test_seed (data/test_seed.csv)
  Database Error
    org.apache.spark.sql.catalyst.parser.ParseException:
    mismatched input 'my_col_a' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 2, pos 43)

    == SQL ==

                insert into default.test_seed (my_col_a,  my_col_b) values
    -------------------------------------------^^^
                ('hi',' "Hi"'),('bye',' "Bye"')

Comparing this with the documentation, it looks like the column list is disallowed.

The query dbt-spark is trying to run is:

insert into default.test_seed (my_col_a,  my_col_b) values ('hi',' "Hi"'),('bye',' "Bye"')

And the corrected query would be:

insert into default.test_seed values ('hi',' "Hi"'),('bye',' "Bye"')

(Confirmed above via testing in Spark 2.4)

Or to retain the self-documenting aspect (given that columns apparently can't be declared):

insert into default.test_seed /*(my_col_a,  my_col_b)*/ values ('hi',' "Hi"'),('bye',' "Bye"')

Happy to send a PR if you know where in the code I should start. I'm new to DBT but happy to contribute if I can.

Support persist_docs for column descriptions

dbt 0.17.0 adds support for persist_docs = {'columns': true} on all core adapters.

In Apache Spark, column-level comments are only supported during table creation with a full column specification. I think that adding column comments as an alter table command may only be possible for Delta tables.
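For illustration, the create-time form that vanilla Spark does support looks like the sketch below (made-up table and comments, run against a local PySpark session; this is not the dbt implementation):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("column-comments").getOrCreate()

# Column comments are attached as part of the full column specification
spark.sql("""
    create table if not exists users_with_docs (
        id bigint comment 'surrogate key',
        email string comment 'lowercased email address'
    )
    using parquet
""")

spark.sql("describe table users_with_docs").show(truncate=False)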

Support schema/database creation

ERROR: Schema/Database creation is not supported in the Spark adapter. Please create the database "dbt" manually

Hey, that's something I hit while developing a docker environment to test dbt-spark, trying to do something simple like:

pre-hook: "create database if not exists dbt"

I'm not sure what the technical blockers are to add support for it in dbt-spark, but I wanted to open that issue to discuss/track progress

Incremental insert overwrite requires identical column ordering

Column order matters

In Spark, tables store their partition columns last. In the scenario featured in our integration test, given a seed file seed

id,first_name,last_name,email,gender,ip_address
1,Jack,Hunter,[email protected],Male,59.80.20.168
2,Kathryn,Walker,[email protected],Female,194.121.179.35
3,Gerald,Ryan,[email protected],Male,11.3.212.243
4,Bonnie,Spencer,[email protected],Female,216.32.196.175
5,Harold,Taylor,[email protected],Male,253.10.246.136

And an incremental model

{{
    config(
        materialized='incremental',
        partition_by='id',
        file_format='parquet'
    )
}}

select * from {{ ref('seed') }}

The resulting table will look like

first_name last_name email gender ip_address id
Jack Hunter [email protected] Male 59.80.20.168 1
Kathryn Walker [email protected] Female 194.121.179.35 2
Gerald Ryan [email protected] Male 11.3.212.243 3
Bonnie Spencer [email protected] Female 216.32.196.175 4
Harold Taylor [email protected] Male 253.10.246.136 5

In subsequent incremental runs, dbt would attempt to run two queries

create temporary view incremental_relation__dbt_tmp as
    
      select * from dbt_jcohen.seed;

insert overwrite table dbt_jcohen.incremental_relation
       partition (id)
       select * from incremental_relation__dbt_tmp

Since the columns in seed are in different order from the columns in incremental_relation (partitioned on id), the result would be

first_name last_name email gender ip_address id
Kathryn Walker [email protected] Female 194.121.179.35 2
Harold Taylor [email protected] Male 253.10.246.136 5
Bonnie Spencer [email protected] Female 216.32.196.175 4
Jack Hunter [email protected] Male 59.80.20.168 1
Gerald Ryan [email protected] Male 11.3.212.243 3
3 Gerald Ryan [email protected] Male
4 Bonnie Spencer [email protected] Female
5 Harold Taylor [email protected] Male
1 Jack Hunter [email protected] Male
2 Kathryn Walker [email protected] Female

Why hasn't the integration test been failing?

The equality test between seed and incremental_relation has been passing because we didn't have the right quoting character defined for Spark. " is the default quoting character in dbt-core; in Spark, " encloses a string literal, not a column name.

Therefore, a query like

-- setup



with a as (

    select * from dbt_jcohen.incremental_relation

),

b as (

    select * from dbt_jcohen.seed

),

a_minus_b as (

    select "first_name", "last_name", "email", "gender", "ip_address", "id", "# Partition Information", "# col_name", "id" from a
    
  

    except



    select "first_name", "last_name", "email", "gender", "ip_address", "id", "# Partition Information", "# col_name", "id" from b

),

b_minus_a as (

    select "first_name", "last_name", "email", "gender", "ip_address", "id", "# Partition Information", "# col_name", "id" from b
    
  

    except



    select "first_name", "last_name", "email", "gender", "ip_address", "id", "# Partition Information", "# col_name", "id" from a

),

unioned as (

    select * from a_minus_b
    union all
    select * from b_minus_a

),

final as (

    select (select count(*) from unioned) +
        (select abs(
            (select count(*) from a_minus_b) -
            (select count(*) from b_minus_a)
            ))
        as count

)

select count from final

Looks okay prima facie. There are some metadata/comment column names included, which weirdly doesn't error. I thought to run just the snippet

select "first_name", "last_name", "email", "gender", "ip_address", "id", "# Partition Information", "# col_name", "id" from a

Which returns

first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id
first_name last_name email gender ip_address id # Partition Information # col_name id

Yeah.

Solutions

  • Grab the dest table columns as an ordered comma-separated list, just like how dbt-core does it in the default (Redshift/Postgres) implementation (see the sketch after this list)
  • Override the Spark quoting character to be ` instead of " (handled in #39)
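A rough sketch of the first solution (a hypothetical helper; the column names come from the seed example above, and in a real implementation the destination columns would come from the adapter's column metadata):

def insert_overwrite_sql(target, tmp_relation, dest_columns, partition_by):
    """Build an insert-overwrite statement with an explicit, ordered column list."""
    # Partition columns go last, matching how Spark stores the destination table
    ordered = [c for c in dest_columns if c not in partition_by] + list(partition_by)
    return (
        f"insert overwrite table {target}\n"
        f"partition ({', '.join(partition_by)})\n"
        f"select {', '.join(ordered)} from {tmp_relation}"
    )

print(insert_overwrite_sql(
    target="dbt_jcohen.incremental_relation",
    tmp_relation="incremental_relation__dbt_tmp",
    dest_columns=["id", "first_name", "last_name", "email", "gender", "ip_address"],
    partition_by=["id"],
))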

spark__create_schema defined twice

[2020-03-24 09:13:47,559] {dbt_operator.py:168} INFO - Running with dbt=0.16.0
[2020-03-24 09:13:47,559] {dbt_operator.py:168} INFO - Encountered an error:
[2020-03-24 09:13:47,559] {dbt_operator.py:168} INFO - Compilation Error
[2020-03-24 09:13:47,559] {dbt_operator.py:168} INFO -   dbt found two macros named "spark__create_schema" in the project "dbt_spark".
[2020-03-24 09:13:47,559] {dbt_operator.py:168} INFO -    To fix this error, rename or remove one of the following macros:
[2020-03-24 09:13:47,559] {dbt_operator.py:168} INFO -       - macros/adapters.sql
[2020-03-24 09:13:47,559] {dbt_operator.py:168} INFO -       - macros/adapters.sql
