marshmallow-pyspark

Build Status codecov.io Apache 2.0 licensed

Marshmallow is a popular package for data serialization and validation. In marshmallow you define data schemas that describe how input data should be marshalled and validated. Similarly, PySpark comes with its own schema definitions for processing data frames. This package lets you use marshmallow schemas, and their powerful validation capabilities, in PySpark applications. This is particularly useful in ETL data pipelines, where data consistency and quality are important.

Install

The package can be installed using pip:

$ pip install marshmallow-pyspark

Usage
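
The examples in this README assume an active SparkSession bound to the name spark. As a minimal sketch (not part of this package), a local session could be created as follows:

from pyspark.sql import SparkSession

# Minimal local SparkSession used by the examples below.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("marshmallow-pyspark-examples") \
    .getOrCreate()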

Data schemas are defined the same way as you would with marshmallow. A quick example is shown below:

from marshmallow_pyspark import Schema
from marshmallow import fields

# Create data schema.
class AlbumSchema(Schema):
    title = fields.Str()
    release_date = fields.Date()

# Input data frame to validate.
df = spark.createDataFrame([
    {"title": "valid_1", "release_date": "2020-1-10"},
    {"title": "valid_2", "release_date": "2020-1-11"},
    {"title": "invalid_1", "release_date": "2020-31-11"},
    {"title": "invalid_2", "release_date": "2020-1-51"},
])

# Get data frames with valid rows and error prone rows 
# from input data frame by validating using the schema.
valid_df, errors_df = AlbumSchema().validate_df(df)

# Output of valid data frame
valid_df.show()
#    +-------+------------+
#    |  title|release_date|
#    +-------+------------+
#    |valid_1|  2020-01-10|
#    |valid_2|  2020-01-11|
#    +-------+------------+

# Output of errors data frame
errors_df.show()
#    +--------------------+
#    |             _errors|
#    +--------------------+
#    |{"row": {"release...|
#    |{"row": {"release...|
#    +--------------------+
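
Each entry in the errors data frame is a JSON string containing the original row values and the corresponding validation errors. As a small sketch (not from the library's documentation), the payload can be decoded on the driver like this:

import json

# Decode the JSON payload stored in the default "_errors" column.
for row in errors_df.collect():
    payload = json.loads(row["_errors"])
    print(payload["row"], payload["errors"])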

More Options

On top of marshmallow supported options, the Schema class comes with two additional initialization arguments:

  • error_column_name: name of the column to store validation errors. Default value is _errors.

  • split_errors: whether to return rows with validation errors as a separate data frame from the valid rows. When set to False, the error rows are returned together with the valid rows in a single data frame; their field values are set to null, and for convenience the original values can be found in the row attribute of the error JSON. Default value is True.

An example is shown below:

from marshmallow import EXCLUDE

schema = AlbumSchema(
    error_column_name="custom_errors",     # Use 'custom_errors' as name for errors column
    split_errors=False,                     # Don't split the input data frame into valid and errors
    unknown=EXCLUDE                         # Marshmallow option to exclude fields not present in schema
)

# Input data frame to validate.
df = spark.createDataFrame([
    {"title": "valid_1", "release_date": "2020-1-10", "garbage": "wdacfa"},
    {"title": "valid_2", "release_date": "2020-1-11", "garbage": "5wacfa"},
    {"title": "invalid_1", "release_date": "2020-31-11", "garbage": "3aqf"},
    {"title": "invalid_2", "release_date": "2020-1-51", "garbage": "vda"},
])

valid_df, errors_df = schema.validate_df(df)

# Output of valid data frame. Contains rows with errors as
# the option 'split_errors' was set to False.
valid_df.show()
#    +-------+------------+--------------------+
#    |  title|release_date|       custom_errors|
#    +-------+------------+--------------------+
#    |valid_1|  2020-01-10|                    |
#    |valid_2|  2020-01-11|                    |
#    |       |            |{"row": {"release...|
#    |       |            |{"row": {"release...|
#    +-------+------------+--------------------+

# The errors data frame will be set to None
assert errors_df is None        # True

Lastly, on top of passing marshmallow-specific options to the schema, you can also pass them to the validate_df method. These options are forwarded to marshmallow's load method:

schema = AlbumSchema(
    error_column_name="custom_errors",     # Use 'custom_errors' as name for errors column
    split_errors=False,                     # Don't split the input data frame into valid and errors
)

valid_df, errors_df = schema.validate_df(df, unknown=EXCLUDE)

Duplicates

Marshmallow-pyspark comes with the ability to validate one or more schema fields for duplicate values. This is achieved by adding the field names to the UNIQUE attribute of the schema as shown:

class AlbumSchema(Schema):
    # Unique valued field "title" in the schema
    UNIQUE = ["title"]

    title = fields.Str()
    release_date = fields.Date()

# Input data frame to validate.
df = spark.createDataFrame([
        {"title": "title_1", "release_date": "2020-1-10"},
        {"title": "title_2", "release_date": "2020-1-11"},
        {"title": "title_2", "release_date": "2020-3-11"},  # duplicate title
        {"title": "title_3", "release_date": "2020-1-51"},
    ])

# Validate data frame
valid_df, errors_df = AlbumSchema().validate_df(df)
    
# List of valid rows
valid_rows = [row.asDict(recursive=True) for row in valid_df.collect()]
#
#   [
#        {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},
#        {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)}
#   ]
#

# Rows with errors
error_rows = [row.asDict(recursive=True) for row in errors_df.collect()]
# 
#   [
#        {'_errors': '{"row": {"release_date": "2020-3-11", "title": "title_2", "__count__title": 2}, '
#                    '"errors": ["duplicate row"]}'},
#        {'_errors': '{"row": {"release_date": "2020-1-51", "title": "title_3", "__count__title": 1}, '
#                    '"errors": {"release_date": ["Not a valid date."]}}'}
#    ]
#

The technique used to drop duplicates while keeping the first occurrence is discussed in this link. If there are multiple unique fields in the schema, simply add them all to UNIQUE, e.g. UNIQUE = ["title", "release_date"]. You can even enforce uniqueness on a combination of fields by grouping them in a list:

class AlbumSchema(Schema):
    # Combined values of "title" and "release_date" should be unique
    UNIQUE = [["title", "release_date"]]

    title = fields.Str()
    release_date = fields.Date()

# Input data frame to validate.
df = spark.createDataFrame([
        {"title": "title_1", "release_date": "2020-1-10"},
        {"title": "title_2", "release_date": "2020-1-11"},
        {"title": "title_2", "release_date": "2020-3-11"},
        {"title": "title_3", "release_date": "2020-1-21"},
        {"title": "title_3", "release_date": "2020-1-21"},
        {"title": "title_4", "release_date": "2020-1-51"},
    ])

# Validate data frame
valid_df, errors_df = AlbumSchema().validate_df(df)
    
# List of valid rows
valid_rows = [row.asDict(recursive=True) for row in valid_df.collect()]
#
#   [
#        {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},
#        {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)},
#        {'title': 'title_3', 'release_date': datetime.date(2020, 1, 21)}
#   ]
#

# Rows with errors
error_rows = [row.asDict(recursive=True) for row in errors_df.collect()]
# 
#   [
#        {'_errors': '{"row": {"release_date": "2020-1-21", "title": "title_3", '
#                    '"__count__title": 2, "__count__release_date": 2}, '
#                    '"errors": ["duplicate row"]}'},
#        {'_errors': '{"row": {"release_date": "2020-1-51", "title": "title_4", '
#                    '"__count__title": 1, "__count__release_date": 1}, '
#                    '"errors": {"release_date": ["Not a valid date."]}}'},
#        {'_errors': '{"row": {"release_date": "2020-3-11", "title": "title_2", '
#                    '"__count__title": 2, "__count__release_date": 1}, '
#                    '"errors": ["duplicate row"]}'}
#    ]
#

WARNING: Duplicate checks require a data shuffle per unique field. Having a large number of unique fields will affect Spark job performance. By default UNIQUE is set to an empty list, which disables duplicate checks.
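
As a minimal sketch of the default behaviour described above (assuming UNIQUE is a plain class attribute, as in the earlier examples), a schema that never sets UNIQUE performs no duplicate checks and therefore adds no extra shuffles:

from marshmallow_pyspark import Schema
from marshmallow import fields

class PlainAlbumSchema(Schema):
    # UNIQUE is left at its default (an empty list), so no duplicate
    # checks, and hence no per-field shuffles, are performed.
    title = fields.Str()
    release_date = fields.Date()

assert PlainAlbumSchema.UNIQUE == []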

Fields

Marshmallow comes with a variety of fields that can be used to define schemas. Internally, marshmallow-pyspark converts these fields into PySpark SQL data types. The following table lists the supported marshmallow fields and their equivalent Spark SQL data types:

Marshmallow     PySpark
Raw             user specified
String          StringType
DateTime        TimestampType
Date            DateType
Boolean         BooleanType
Integer         IntegerType
Float           FloatType
Number          DoubleType
List            ArrayType
Dict            MapType
Nested          StructType

By default, the StringType data type is used for marshmallow fields not listed in the above table. The spark_schema property of your schema can be used to inspect the converted Spark SQL schema:

# Gets the spark schema for the Album schema
AlbumSchema().spark_schema
# StructType(List(StructField(title,StringType,true),StructField(release_date,DateType,true),StructField(_errors,StringType,true)))
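
Since spark_schema is a regular PySpark StructType, its fields can also be inspected programmatically. A small sketch, assuming the AlbumSchema defined earlier:

# Print each column name with its converted Spark SQL data type,
# including the automatically added error column.
for field in AlbumSchema().spark_schema.fields:
    print(field.name, field.dataType)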

Custom Fields

marshmallow-pyspark comes with support for an additional Raw field. The Raw field does not perform any formatting and requires the user to specify the Spark data type associated with the field. See the following example:

from marshmallow_pyspark import Schema
from marshmallow_pyspark.fields import Raw
from marshmallow import fields
from pyspark.sql.types import DateType
from datetime import date


class AlbumSchema(Schema):
    title = fields.Str()
    # Takes python datetime.date objects and treats them as pyspark DateType
    release_date = Raw(spark_type=DateType())

# Input data frame to validate.
df = spark.createDataFrame([
        {"title": "title_1", "release_date": date(2020, 1, 10)},
        {"title": "title_2", "release_date": date(2020, 1, 11)},
        {"title": "title_3", "release_date": date(2020, 3, 10)},
    ])

# Validate data frame
valid_df, errors_df = AlbumSchema().validate_df(df)
    
# List of valid rows
valid_rows = [row.asDict(recursive=True) for row in valid_df.collect()]
#
#   [
#        {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},
#        {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)},
#        {'title': 'title_3', 'release_date': datetime.date(2020, 3, 10)}
#   ]
#

# Rows with errors
error_rows = [row.asDict(recursive=True) for row in errors_df.collect()]
# 
#   []
#

It is also possible to add support for custom marshmallow fields, or for fields missing from the above table. To do so, you need to create a converter for the field using the ConverterABC interface:

from marshmallow_pyspark import ConverterABC
from pyspark.sql.types import StringType


class EmailConverter(ConverterABC):
    """
    Converter to convert marshmallow's Email field to a pyspark
    SQL data type.
    """

    def convert(self, ma_field):
        return StringType()

The ma_field argument in the convert method is provided to handle nested fields. For an example, check out NestedConverter. The final step is to add the converter to the CONVERTER_MAP attribute of your schema:

from marshmallow_pyspark import Schema
from marshmallow import fields


class User(Schema):
    name = fields.String(required=True)
    email = fields.Email(required=True)

# Adding email converter to schema.
User.CONVERTER_MAP[fields.Email] = EmailConverter

# You can now use your schema to validate the input data frame.
valid_df, errors_df = User().validate_df(input_df)

Development

To hack on marshmallow-pyspark locally, run:

$ pip install -e .[dev]			# to install all dependencies
$ pytest --cov-config .coveragerc --cov=./			# to get coverage report
$ pylint marshmallow_pyspark			# to check code quality with PyLint

Optionally you can use make to perform development tasks.

License

The source code is licensed under the Apache License, Version 2.0.

Contributions

Pull requests are always welcome! :)

Issues

Nested "many=True" field throws an exception

Hello. Given the following schemas with a nested "list" field:

from marshmallow_pyspark import Schema
from marshmallow import fields, EXCLUDE

# Create data schemas.
class SingerSchema(Schema):
    name = fields.Str(required=True, allow_none=False)
    alias = fields.Str(required=False)

class AlbumSchema(Schema):
    title = fields.Str()
    release_date = fields.Date()
    singers = fields.Nested(SingerSchema, many=True, unknown=EXCLUDE)

df = spark.createDataFrame([
    {"title": "Song Title", "release_date": "2020-1-10", "singers": [{"name":  "Phil Collins", "alias": "P C"}]},
])

df.show(1, False)
# Output
# +------------+--------------------------------------+----------+
# |release_date|singers                               |title     |
# +------------+--------------------------------------+----------+
# |2020-1-10   |[{name -> Phil Collins, alias -> P C}]|Song Title|
# +------------+--------------------------------------+----------+

When I try to validate like this

valid_df, errors_df = AlbumSchema().validate_df(df)

errors_df.show(10, False)

It throws this PySpark exception:

Stack trace
Py4JJavaError: An error occurred while calling o2874.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 331.0 failed 4 times, most recent failure: Lost task 26.3 in stage 331.0 (TID 28610) (172.36.18.59 executor 15): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 1 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$$nestedInanonfun$makeFromJava$16$1.applyOrElse(EvaluatePython.scala:186)
	at org.apache.spark.sql.execution.python.EvaluatePython$.nullSafeConvert(EvaluatePython.scala:211)
	at org.apache.spark.sql.execution.python.EvaluatePython$.$anonfun$makeFromJava$16(EvaluatePython.scala:180)
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$$nestedInanonfun$makeFromJava$16$1.applyOrElse(EvaluatePython.scala:193)
	at org.apache.spark.sql.execution.python.EvaluatePython$.nullSafeConvert(EvaluatePython.scala:211)
	at org.apache.spark.sql.execution.python.EvaluatePython$.$anonfun$makeFromJava$16(EvaluatePython.scala:180)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$7(BatchEvalPythonExec.scala:88)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:132)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:477)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:430)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3733)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2762)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2762)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2969)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:302)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:339)
	at sun.reflect.GeneratedMethodAccessor147.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 1 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$$nestedInanonfun$makeFromJava$16$1.applyOrElse(EvaluatePython.scala:186)
	at org.apache.spark.sql.execution.python.EvaluatePython$.nullSafeConvert(EvaluatePython.scala:211)
	at org.apache.spark.sql.execution.python.EvaluatePython$.$anonfun$makeFromJava$16(EvaluatePython.scala:180)
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$$nestedInanonfun$makeFromJava$16$1.applyOrElse(EvaluatePython.scala:193)
	at org.apache.spark.sql.execution.python.EvaluatePython$.nullSafeConvert(EvaluatePython.scala:211)
	at org.apache.spark.sql.execution.python.EvaluatePython$.$anonfun$makeFromJava$16(EvaluatePython.scala:180)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$7(BatchEvalPythonExec.scala:88)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:132)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

This only seems to happen when the Nested field has many=True; with many=False the exception does not occur.

TypeError: Object of type datetime is not JSON serializable

When an invalid row contains a datetime field, JSON serialization fails at runtime.

import datetime

import marshmallow_pyspark

schema = marshmallow_pyspark.Schema.from_dict({
    "time": datetime.datetime,
    "repro": float,
})()

_, errors = schema.validate_df(spark.createDataFrame([
    {"time": datetime.datetime.now(), "repro": "invalid"},
]))

errors.show()
# TypeError: Object of type datetime is not JSON serializable
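
For reference, this mirrors plain Python behaviour: the standard json module cannot serialize datetime objects unless a fallback encoder is supplied. A minimal illustration, independent of this library:

import datetime
import json

now = datetime.datetime.now()

try:
    json.dumps({"time": now})                    # raises TypeError
except TypeError as exc:
    print(exc)                                   # Object of type datetime is not JSON serializable

print(json.dumps({"time": now}, default=str))    # serializes via str() instead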

Marshmallow Date field does not align with the pyspark DateType field

Hello ketgo,
thank you for the great library. I am using it in a project and I ran into the following issue:
the mapping from the documentation page is not working, because fields.Date() expects the PySpark schema type StringType rather than DateType.
The following example ends with TypeError: Object of type date is not JSON serializable,
even though df.schema and AlbumSchema().spark_schema are the same.

Could it be changed so that marshmallow-pyspark validates the schema with DateType and the original field functionality is tied to something like DateStringType?

Thank you

P.

import datetime
from marshmallow_pyspark import Schema
from marshmallow import fields

# Create data schema.
class AlbumSchema(Schema):
    title = fields.Str()
    release_date = fields.Date()

# Input data frame to validate.
df = spark.createDataFrame([
    {"title": "valid_1", "release_date": datetime.date(2020, 1, 10)},
    {"title": "valid_2", "release_date": datetime.date(2020, 1, 11)},
    {"title": "invalid_1", "release_date": datetime.date(2020, 1, 11)},
    {"title": "invalid_2", "release_date": datetime.date(2020, 1, 21)},
])

# Get data frames with valid rows and error prone rows 
# from input data frame by validating using the schema.
valid_df, errors_df = AlbumSchema().validate_df(df)

# Output of valid data frame
valid_df.show()
#    +-------+------------+
#    |  title|release_date|
#    +-------+------------+
#    |valid_1|  2020-01-10|
#    |valid_2|  2020-01-11|
#    +-------+------------+

# Output of errors data frame
errors_df.show()
#    +--------------------+
#    |             _errors|
#    +--------------------+
#    |{"row": {"release...|
#    |{"row": {"release...|
#    +--------------------+

print(df.schema)
print(AlbumSchema().spark_schema)

Unable to import module marshmallow_pyspark on azure synapse analytics

After installing marshmallow-pyspark on Azure Synapse Analytics, when I try to import the module I receive a module-not-found error.

installation:
%pip install marshmallow-pyspark
from marshmallow_pyspark import Schema

error:
PythonException: An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 603, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 449, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 251, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 71, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'marshmallow_pyspark'

Is it possible that some dependencies need to be updated and that causes this error?

Thank you!
