
smolder's Introduction

A library for burning through electronic health record data using Apache Spark™

Smolder provides an Apache Spark™ SQL data source for loading EHR data from HL7v2 message formats. Additionally, Smolder provides helper functions that can be used on a Spark SQL DataFrame to parse HL7 message text, and to extract segments, fields, and subfields, from a message.

Project Support

Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.

Any issues discovered through the use of this project should be filed as GitHub Issues on the Repo. They will be reviewed as time permits, but there are no formal SLAs for support.

Building and Testing

This project is built using sbt and Java 8.

Start an sbt shell using the sbt command.

Note: by default, this project is built against Spark 3.2.1 and Scala 2.12.8. To build against a different Spark or Scala version, set the SPARK_VERSION and SCALA_VERSION environment variables before starting sbt.

To compile the main code:

compile

To run all Scala tests:

test

To test a specific suite:

testOnly *HL7FileFormatSuite

To create a JAR that can be run as part of an Apache Spark job or shell, run:

package

The JAR can be found under target/scala-<major-version>.

Getting Started

To load HL7 messages into an Apache Spark SQL DataFrame, simply invoke the hl7 reader:

scala> val df = spark.read.format("hl7").load("path/to/hl7/messages")
df: org.apache.spark.sql.DataFrame = [message: string, segments: array<struct<id:string,fields:array<string>>>]

The schema returned contains the message header in the message column. The message segments are nested in the segments column, which is an array. Each element of this array contains two nested fields: the string id of the segment (e.g., PID for a patient identification segment) and an array of the segment's fields.
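
Since segments is an ordinary nested array column, standard Spark SQL functions can be used to work with it. The sketch below is not Smolder-specific; explode and the column names simply follow the schema above, and it pulls out only the PID segments:

import org.apache.spark.sql.functions.{col, explode}

// Flatten the segments array into one row per segment, then keep only
// patient identification (PID) segments and their fields.
val pidDf = df
  .select(explode(col("segments")).as("segment"))
  .filter(col("segment.id") === "PID")
  .select(col("segment.fields").as("fields"))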

Parsing message text from a DataFrame

Smolder can also be used to parse raw message text. This is useful if an HL7 message feed lands in an intermediate source first (e.g., a Kafka stream). To do this, we can use Smolder's parse_hl7_message helper function. First, we start with a DataFrame containing HL7 message text:

scala> val textMessageDf = ...
textMessageDf: org.apache.spark.sql.DataFrame = [value: string]

scala> textMessageDf.show()
+--------------------+                                                          
|               value|
+--------------------+
|MSH|^~\&|||||2020...|
+--------------------+
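
One way to construct such a DataFrame is to read each raw message as a single row of text. A minimal sketch, assuming one HL7 message per file (the path below is a placeholder):

// Read each file as a single row containing the full message text.
val textMessageDf = spark.read
  .option("wholetext", "true")
  .text("path/to/raw/hl7/messages")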

Then, we can import the parse_hl7_message function from the com.databricks.labs.smolder.functions object and apply it to the column we want to parse:

scala> import com.databricks.labs.smolder.functions.parse_hl7_message
import com.databricks.labs.smolder.functions.parse_hl7_message

scala> val parsedDf = textMessageDf.select(parse_hl7_message($"value").as("message"))
parsedDf: org.apache.spark.sql.DataFrame = [message: struct<message: string, segments: array<struct<id:string,fields:array<string>>>>]

This yields the same schema as our hl7 data source.
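
Since parse_hl7_message is an ordinary column expression, the same pattern also applies to streaming DataFrames. A hedged sketch, assuming messages arrive on a Kafka topic (the topic name and bootstrap servers below are placeholders):

import com.databricks.labs.smolder.functions.parse_hl7_message

// Hypothetical Kafka source; Kafka delivers the payload as bytes,
// so cast it to a string before parsing.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "hl7-feed")
  .load()

val parsedStreamDf = kafkaDf
  .selectExpr("CAST(value AS STRING) AS value")
  .select(parse_hl7_message($"value").as("message"))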

Extracting fields from an HL7 message segment

While Smolder provides an easy-to-use schema for HL7 messages, we also provide helper functions in com.databricks.labs.smolder.functions to extract subfields of a message segment. For instance, let's say we want to get the patient's name, which is the 5th field in the patient ID (PID) segment. We can extract this with the segment_field function:

scala> import com.databricks.labs.smolder.functions.segment_field
import com.databricks.labs.smolder.functions.segment_field

scala> val nameDf = df.select(segment_field("PID", 4).alias("name"))
nameDf: org.apache.spark.sql.DataFrame = [name: string]

scala> nameDf.show()
+-------------+
|         name|
+-------------+
|Heller^Keneth|
+-------------+

If we then want to get the patient's first name, we can use the subfield function:

scala> import com.databricks.labs.smolder.functions.subfield
import com.databricks.labs.smolder.functions.subfield

scala> val firstNameDf = nameDf.select(subfield($"name", 1).alias("firstname"))
firstNameDf: org.apache.spark.sql.DataFrame = [firstname: string]

scala> firstNameDf.show()
+---------+
|firstname|
+---------+
|   Keneth|
+---------+
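
The two helpers also compose, so the extraction can be written as a single expression against the original DataFrame. A short sketch, assuming the column returned by segment_field can be passed directly to subfield (both operate on Spark SQL columns):

import com.databricks.labs.smolder.functions.{segment_field, subfield}

// Take the 5th field of the PID segment (the patient name), then its
// second component (the given name).
val firstNameDf = df.select(
  subfield(segment_field("PID", 4), 1).alias("firstname"))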

License and Contributing

Smolder is made available under an Apache 2.0 license, and we welcome contributions from the community. Please see our contributor guidance for information about how to contribute to the project. To ensure that contributions to Smolder are properly licensed, we follow the Developer Certificate of Origin (DCO) for all contributions to the project.

smolder's People

Contributors

fnothaft, pohlposition, ryandecosmo, zavoraad

smolder's Issues

segment returns empty

Segments return empty when parsing a message with the parse_hl7_message function.

Here is the simple scala code:

import com.databricks.labs.smolder.functions.parse_hl7_message
import spark.implicits._

val message = """MSH|^~&|||||20201020150800.739+0000||ADT^A03^ADT_A03|11374301|P|2.4
EVN|A03|20170820074419-0500
PID|||d40726da-9b7a-49eb-9eeb-e406708bbb60||Heller^Keneth||||||140 Pacocha Way Suite 52^^Northampton^Massachusetts^^USA
PV1||a|^^^COOLEY DICKINSON HOSPITAL INC THE|ambulatory||||49318f80-bd8b-3fc7-a096-ac43088b0c12^THE^COOLEY||||||||||||||||||||||||||||||||||||20170820074419-0500"""

val columns = Seq("value")
val data = Seq(message)

val rdd = spark.sparkContext.parallelize(data)

val dfFromRDD1 = rdd.toDF(columns: _*)

val parsedDf = dfFromRDD1.select(parse_hl7_message($"value").as("message")) // here we can see that segments return empty
(A screenshot attached to the original issue shows the resulting DataFrame with an empty segments column.)

parse_hl7_message results in error

Hi, I am trying to parse an HL7 message using the parse_hl7_message function (similar to the example in the docs)

val parsedDf = hl7Bronze.select(parse_hl7_message($"value").as("message"))

and I am getting the following error. Any hints as to what could be going wrong? Thanks a lot!

java.lang.AbstractMethodError: Method com/databricks/labs/smolder/sql/ParseHL7Message.child()Lorg/apache/spark/sql/catalyst/trees/TreeNode; is abstract
  at com.databricks.labs.smolder.sql.ParseHL7Message.child(ParseHL7Message.scala)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.children(TreeNode.scala:1127)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.children$(TreeNode.scala:1127)
  at org.apache.spark.sql.catalyst.expressions.UnaryExpression.children$lzycompute(Expression.scala:467)
  at org.apache.spark.sql.catalyst.expressions.UnaryExpression.children(Expression.scala:467)
  at org.apache.spark.sql.catalyst.trees.TreeNode.getDefaultTreePatternBits(TreeNode.scala:119)
  at org.apache.spark.sql.catalyst.trees.TreeNode.treePatternBits$lzycompute(TreeNode.scala:130)
  at org.apache.spark.sql.catalyst.trees.TreeNode.treePatternBits(TreeNode.scala:130)
  at org.apache.spark.sql.catalyst.trees.TreeNode.getDefaultTreePatternBits(TreeNode.scala:121)
  at org.apache.spark.sql.catalyst.trees.TreeNode.treePatternBits$lzycompute(TreeNode.scala:130)
  at org.apache.spark.sql.catalyst.trees.TreeNode.treePatternBits(TreeNode.scala:130)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.treePatternBits$lzycompute(QueryPlan.scala:62)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.treePatternBits(QueryPlan.scala:57)
  at org.apache.spark.sql.catalyst.trees.TreePatternBits.containsPattern(TreePatternBits.scala:32)
  at org.apache.spark.sql.catalyst.trees.TreePatternBits.containsPattern$(TreePatternBits.scala:31)
  at org.apache.spark.sql.catalyst.trees.TreeNode.containsPattern(TreeNode.scala:96)
  at org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields$.$anonfun$apply$1(UpdateFields.scala:76)
  at org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields$.$anonfun$apply$1$adapted(UpdateFields.scala:76)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:167)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsWithPruning(AnalysisHelper.scala:244)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressionsWithPruning$(AnalysisHelper.scala:242)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveExpressionsWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields$.apply(UpdateFields.scala:76)
  at org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields$.apply(UpdateFields.scala:33)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
  at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
  at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:222)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:218)
  at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:167)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:218)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:182)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:93)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:203)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:78)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:120)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:207)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:207)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:76)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:68)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
  at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3747)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:1467)

HL7 V3 Support

Current and potential future users have inquired about support for HL7 v3. This issue documents the interest in HL7 v3 support and provides tracking for future contributions toward this feature.

FHIR format support

Hello!
I would like to know if there is any plan to support ingesting the FHIR data format in the future?
I had a look at the Bunsen library (https://github.com/cerner/bunsen) provided by Cerner, but it seems there is no longer any activity on that project.
A Spark library that transforms FHIR data into a format better suited to analysis and AI would be very helpful for healthcare analytics built on FHIR.

I look forward to your answer.

Regards,
Florin

ENH: Recognize and process HL7 messages when MLLP framing bytes are included

There are cases where an HL7 message is taken and placed in a file or on a message queue with the leading 0x0b and trailing 0x1c 0x0d byte sequences (the same framing used in MLLP transmission).
It would be nice if the parser recognized the presence of these framing bytes, removed them, and parsed the message transparently (silent to the end user), rather than requiring a manual pre-processing step like the sketch below.
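
A minimal sketch of how the framing bytes could be stripped today before calling parse_hl7_message; this is a workaround, not existing Smolder functionality, and framedDf is a hypothetical DataFrame whose value column holds MLLP-framed message text:

import org.apache.spark.sql.functions.regexp_replace
import com.databricks.labs.smolder.functions.parse_hl7_message

// Remove the leading start-of-block (0x0b) and trailing end-of-block
// (0x1c 0x0d) characters, then parse as usual.
val unframedDf = framedDf.withColumn(
  "value",
  regexp_replace($"value", "^\\x0b|\\x1c\\x0d$", ""))

val parsedDf = unframedDf.select(parse_hl7_message($"value").as("message"))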

Object functions is not a member of package

I am trying to import com.databricks.labs.smolder.functions.parse_hl7_message, but I am getting an "is not a member" error when running the import in Databricks. I am using Spark 3.2 and Scala 2.12.
