Comments (4)
Thanks for the feature request @mkuthan. Will look at this during our preparation of 0.14.
We should probably also change the BigQuery IO to expose handleErrors with the raw WriteResult.
from scio.
One more experiment with an even more generic API that allows encapsulating all steps of the custom IO as a single Scala friendly transform.
With custom output like this, I could provide a composite transform that converts domain objects into underlying storage format (for example JSON bytes) and then save bytes in the database using the Beam IO connector.
In the job test I will be able to use the domain object T because the whole transform will be replaced by the stub. As a bonus, transformFn has full access to the POutput to handle errors.
val self: SCollection[T] ...

def betterSaveAsCustomOutput[O <: POutput](name: String)(transformFn: SCollection[T] => O): ClosedTap[Nothing] = {
  if (self.context.isTest) {
    // In a JobTest the whole composite transform is replaced by the registered stub.
    TestDataManager.getOutput(self.context.testId.get)(CustomIO[T](name))(self)
  } else {
    self.applyInternal(
      name,
      new PTransform[PCollection[T], O]() {
        override def expand(input: PCollection[T]): O =
          transformFn(self.context.wrap(input))
      }
    )
  }
  ClosedTap[Nothing](EmptyTap)
}
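To make the test-time swap concrete, here is a minimal, library-free sketch of the same idea using plain Scala collections. It does not use Scio or Beam at all: `CompositeOutputSketch`, `saveAsComposite`, `capturedInTest`, `storedBytes`, and the `Order` type are hypothetical names introduced only for illustration, and the `isTest` flag stands in for `self.context.isTest`.

```scala
import scala.collection.mutable

// Library-free model of the pattern; none of these names are Scio or Beam APIs.
object CompositeOutputSketch {
  final case class Order(id: Int, item: String)

  // Hypothetical stand-in for the storage layer: it only ever sees bytes.
  val storedBytes: mutable.ArrayBuffer[Array[Byte]] =
    mutable.ArrayBuffer.empty[Array[Byte]]

  // Hypothetical stand-in for the test registry: captures domain objects by output name.
  val capturedInTest: mutable.Map[String, Seq[Any]] = mutable.Map.empty

  // In test mode transformFn is skipped entirely and the domain objects are captured;
  // otherwise transformFn runs and performs the encode + write steps.
  def saveAsComposite[T](name: String, isTest: Boolean)(data: Seq[T])(
      transformFn: Seq[T] => Unit
  ): Unit =
    if (isTest) capturedInTest.update(name, data)
    else transformFn(data)

  // Composite transform: domain object -> JSON bytes -> storage.
  def writeOrders(orders: Seq[Order]): Unit =
    orders
      .map(o => s"""{"id":${o.id},"item":"${o.item}"}""".getBytes("UTF-8"))
      .foreach(bytes => storedBytes += bytes)
}
```

Calling `saveAsComposite("orders", isTest = true)(orders)(writeOrders)` leaves `storedBytes` untouched, so a test can assert on `Order` values without knowing the byte encoding.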
For the reading part I could do the same. Provide the composite transform that reads bytes from the database using Beam IO connector, deserializes JSON, and returns domain objects. Everything as a single transform, easy to test at the job level. In the test I only need to prepare domain objects as input. The bytes representation from the Beam IO connector is fully encapsulated in the composite transform.
val self: ScioContext = ...

def betterCustomInput[T, I >: PBegin <: PInput](name: String)(transformFn: I => SCollection[T]): SCollection[T] =
  self.requireNotClosed {
    if (self.isTest) {
      // In a JobTest the domain objects come straight from the registered test input.
      TestDataManager.getInput(self.testId.get)(CustomIO[T](name)).toSCollection(self)
    } else {
      self.applyTransform(
        name,
        new PTransform[I, PCollection[T]]() {
          override def expand(input: I): PCollection[T] =
            transformFn(input).internal
        }
      )
    }
  }
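The read side can be modelled the same way. The sketch below is again library-free and purely illustrative: `CompositeInputSketch`, `compositeInput`, `testInputs`, `readOrders`, and `Order` are hypothetical names, and the regex is a toy decoder standing in for real JSON deserialization.

```scala
// Library-free model of the read side; none of these names are Scio or Beam APIs.
object CompositeInputSketch {
  final case class Order(id: Int, item: String)

  // Hypothetical stand-in for registered test inputs, keyed by input name.
  val testInputs: Map[String, Seq[Any]] =
    Map("orders" -> Seq(Order(1, "book")))

  // In test mode the read function is never evaluated (it is passed by name);
  // the domain objects come directly from the registered test input.
  def compositeInput[T](name: String, isTest: Boolean)(readFn: => Seq[T]): Seq[T] =
    if (isTest) testInputs(name).map(_.asInstanceOf[T])
    else readFn

  // Toy decoder for the exact shape {"id":<digits>,"item":"<text>"}.
  private val OrderJson = """\{"id":(\d+),"item":"([^"]*)"\}""".r

  // Composite read: bytes from storage -> JSON text -> domain objects.
  def readOrders(stored: Seq[Array[Byte]]): Seq[Order] =
    stored.map(bytes => new String(bytes, "UTF-8")).collect {
      case OrderJson(id, item) => Order(id.toInt, item)
    }
}
```

Because `readFn` is by-name, the test path never touches storage, which mirrors how the stubbed input replaces the whole composite transform.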
Alternatively, I could put my composite transform into a plain Beam PTransform and use the existing customInput or saveAsCustomOutput. But I would prefer to use the Scio API in my code :)
Wonderful, thanks!
You could also look at the (ugly) workaround I applied in my playground project:
- https://github.com/mkuthan/stream-processing/blob/main/stream-processing-infrastructure/src/main/scala/com/spotify/scio/BetterScioContext.scala
- https://github.com/mkuthan/stream-processing/blob/main/stream-processing-infrastructure/src/main/scala/com/spotify/scio/values/BetterSCollection.scala
BigQuery IO aligned to my vision of the infrastructure layer:
- https://github.com/mkuthan/stream-processing/blob/main/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/bigquery/syntax/BigQuerySCollectionSyntax.scala
- https://github.com/mkuthan/stream-processing/blob/main/stream-processing-infrastructure/src/main/scala/org/mkuthan/streamprocessing/infrastructure/bigquery/syntax/BigQueryScioContextSyntax.scala
How it plays with JobTest: inputs and outputs are defined with domain types instead of bytes from Pubsub or TableRows from BigQuery. TestDataManager fully overrides the IO connectors from the infrastructure layer.
- https://github.com/mkuthan/stream-processing/blob/main/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/streaming/TollStreamingJobTest.scala
- https://github.com/mkuthan/stream-processing/blob/main/toll-application/src/test/scala/org/mkuthan/streamprocessing/toll/application/batch/TollBatchJobTest.scala
@mkuthan just a status update on this issue.
We've opted for another strategy that avoids passing a pipeline transform function to the IO. We preferred to expose the possible SCollection write result as a side output. This way, we can keep a 'flat' pipeline definition.
In 0.14, the testing framework will mock those as empty, but we plan to let users set custom values in the future.
I hope this setup fits your needs. Let us know otherwise.
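As a rough, library-free illustration of the "side output" idea (this is not Scio's actual API; `SideOutputSketch`, `WriteOutcome`, `write`, and `Row` are hypothetical names, and the rejection rule is invented for the example), the write step can return its failures as an ordinary collection that the caller keeps processing in the same flat pipeline:

```scala
// Library-free model of exposing a write result as a side output.
object SideOutputSketch {
  final case class Row(id: Int, payload: String)

  // Hypothetical write result: failed rows surface as a plain collection.
  final case class WriteOutcome(failed: Seq[Row])

  def write(rows: Seq[Row], isTest: Boolean): WriteOutcome =
    if (isTest) WriteOutcome(Seq.empty) // mocked as empty in tests, as described above
    else WriteOutcome(rows.filter(_.payload.isEmpty)) // toy rule: empty payloads are rejected
}
```

The caller then handles errors without nesting a function inside the IO, for example `write(rows, isTest = false).failed.map(_.id)` to collect the rejected ids.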