Comments (5)
Hi,
You are using scala which is not yet officially supported, but shouldn't pose any problems here as far I can see, since you're using the java api of Flink. You're indeed right that I should provide a way here to provide Typeinformation. I will try to include this in the next release.
But I think your problem is right here:
createTimedTestStreamWith(Tuple2.of(null, null))
I use the first element later to extract TypeInformation. I think your test would work fine if the first element in your TestStream / Input would contain actual data.
You could change the logic to provide the input like this:
var testStream :EventTimeSourceBuilder[Tuple2[String, Report]] = null
val it = ReportList.iterator()
var report:Tuple2[String,Report] = null
report = it.next()
testStream = createTimedTestStreamWith(Tuple2.of(report.f0, report.f1))
while(it.hasNext)
{
report = it.next()
testStream = testStream.emit(Tuple2.of(report.f0, report.f1))
}
This way the first element contains actual data.
And since your'e not providing any timestamps but do Windowing do a .closeAndFlush();
instead of a simple close()
this will instruct Flinkspector to emit a max watermark at the end of the test stream.
val testStream1 = testStream.closeAndFlush();
I have not tested any of this. Please provide some Feedback if this resolves your problem.
Cheers,
Alex
from flink-spector.
Hello,
I have found somewhat a solution that works:
var builder: EventTimeSourceBuilder[Tuple2[String, Report]] = null
val reports: List[Tuple2[String, Report]] = ...
if (reports.size < 1) {
assert(false)
}
builder = createTimedTestStreamWith(reports.head)
for (report <- reports.drop(1)) {
builder.emit(report, InWindow.to( ... ))
}
val inputStream = builder.close()
The only issue I see is that the first record isn't inserted into a window, so I am not sure where the first record in the list ends up. Maybe #createTimedTestStreamWith
could be updated so you can pass a time for the first record, or like previously suggested, just passing the type with the stream constructor, and emiting all the records like normal.
from flink-spector.
We got flink-spector working with scala driver by adding following in the sbt
"junit" % "junit" % "4.11" % Test,
crossPaths := false,
Implementation is described as above by chris.
class Consumer extends DataStreamTestBase with AssertionsForJUnit{
@test def testStream() = {
var builder: EventTimeSourceBuilder[Tuple2[String, Report]] = null
val reports: List[Tuple2[String, Report]] = ...
if (reports.size < 1) {
assert(false)
}
builder = createTimedTestStreamWith(reports.head)
for (report <- reports.drop(1)) {
builder.emit(report, InWindow.to( ... ))
}
val inputStream = builder.close()
}
}
you can run the test with "sbt test"
from flink-spector.
@cslotterback You can already to this by using the underlying EventTimeInputBuilder
.
public static <T> EventTimeInputBuilder<T> startWith(T record, long timeStamp)
public static <T> EventTimeInputBuilder<T> startWith(T record, Moment moment)
But I'm going to expose this in the high level api in the next release.
@rohanbolar If I'm not mistaken according to the inputs your still using the java api of flink?
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, SingleOutputStreamOperator}
from flink-spector.
This issue has been bundled into #57
from flink-spector.
Related Issues (20)
- Low performance when using EventTimeInputBuilder HOT 1
- Tests don't work while making mvn test on CircleCI HOT 2
- Convert Message Headers to Long
- Global Windows don't emit data on time HOT 2
- Publish versions to OSS Repository Hosting HOT 2
- "Task was cancelled" exception when testing CoProcessFunction HOT 6
- Tests flicker when running in parallel (when using Managed State) HOT 4
- Publish 0.8.2 to Maven Central HOT 5
- NPE in test HOT 6
- Consider splitting out JUnit 4 functionality into separate module and documenting HOT 2
- assertStream always passes HOT 6
- There is no DataStreamTestBase.emit variant that accepts an arbitrary timestamp HOT 1
- Run pipeline without flushing all the windows HOT 2
- TestBaseUtils.startCluster is deprecated as of Flink 1.7.0 HOT 5
- Release 0.8.5? HOT 3
- Flinkspector 0.9.1 references package/classes not published by Flink HOT 4
- Is there any plan to support flink 1.8? HOT 6
- setParallelism changes test outcome HOT 6
- unresolved dependencies scala 2.12
- unmaintained? HOT 2
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from flink-spector.