Giter Club home page Giter Club logo

Comments (5)

lofifnc avatar lofifnc commented on September 20, 2024

Hi,

You are using scala which is not yet officially supported, but shouldn't pose any problems here as far I can see, since you're using the java api of Flink. You're indeed right that I should provide a way here to provide Typeinformation. I will try to include this in the next release.

But I think your problem is right here:
createTimedTestStreamWith(Tuple2.of(null, null))

I use the first element later to extract TypeInformation. I think your test would work fine if the first element in your TestStream / Input would contain actual data.

You could change the logic to provide the input like this:

var testStream :EventTimeSourceBuilder[Tuple2[String, Report]] = null

val it = ReportList.iterator()
var report:Tuple2[String,Report] = null
report = it.next()

testStream = createTimedTestStreamWith(Tuple2.of(report.f0, report.f1))

while(it.hasNext)
  {
    report = it.next()
    testStream = testStream.emit(Tuple2.of(report.f0, report.f1))

  }

This way the first element contains actual data.

And since your'e not providing any timestamps but do Windowing do a .closeAndFlush(); instead of a simple close() this will instruct Flinkspector to emit a max watermark at the end of the test stream.
val testStream1 = testStream.closeAndFlush();

I have not tested any of this. Please provide some Feedback if this resolves your problem.

Cheers,
Alex

from flink-spector.

cslotterback avatar cslotterback commented on September 20, 2024

Hello,

I have found somewhat a solution that works:

var builder: EventTimeSourceBuilder[Tuple2[String, Report]] = null
val reports: List[Tuple2[String, Report]] = ...

if (reports.size < 1) {
      assert(false)
}

builder = createTimedTestStreamWith(reports.head)
for (report <- reports.drop(1)) {
  builder.emit(report, InWindow.to( ... ))
}
val inputStream = builder.close()

The only issue I see is that the first record isn't inserted into a window, so I am not sure where the first record in the list ends up. Maybe #createTimedTestStreamWith could be updated so you can pass a time for the first record, or like previously suggested, just passing the type with the stream constructor, and emiting all the records like normal.

from flink-spector.

rohanbolar avatar rohanbolar commented on September 20, 2024

We got flink-spector working with scala driver by adding following in the sbt

"junit" % "junit" % "4.11" % Test,
crossPaths := false,

Implementation is described as above by chris.

class Consumer extends DataStreamTestBase with AssertionsForJUnit{
@test def testStream() = {
var builder: EventTimeSourceBuilder[Tuple2[String, Report]] = null
val reports: List[Tuple2[String, Report]] = ...

if (reports.size < 1) {
assert(false)
}

builder = createTimedTestStreamWith(reports.head)
for (report <- reports.drop(1)) {
builder.emit(report, InWindow.to( ... ))
}
val inputStream = builder.close()

}

}

you can run the test with "sbt test"

from flink-spector.

lofifnc avatar lofifnc commented on September 20, 2024

@cslotterback You can already to this by using the underlying EventTimeInputBuilder.

    public static <T> EventTimeInputBuilder<T> startWith(T record, long timeStamp) 

    public static <T> EventTimeInputBuilder<T> startWith(T record, Moment moment) 

But I'm going to expose this in the high level api in the next release.

@rohanbolar If I'm not mistaken according to the inputs your still using the java api of flink?
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, SingleOutputStreamOperator}

from flink-spector.

lofifnc avatar lofifnc commented on September 20, 2024

This issue has been bundled into #57

from flink-spector.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.