
flink-spector's People

Contributors

aegeaner, afwlehmann, davefrederick, englhardt, facboy, ivanvergiliev, kovrus, lofifnc, mbode, ouertani


flink-spector's Issues

Low performance when using EventTimeInputBuilder

I'm currently using flink-spector to read a "large" (about 10,000 values) dataset from a CSV file and use the EventTimeInputBuilder to create a data stream from it. However, the performance drops significantly the larger the input gets: my application needs about 30 minutes to process that data (even though the algorithm basically just calculates a moving average), while cutting the input down to 5,000 values drops the runtime to 5 minutes.

I used a profiler to dig into the problem, and it seems that InputUtil.produceWatermarks is quite time-consuming.

I'm using the plain InputBuilder now and assigning timestamps and watermarks myself, which drops the runtime to a couple of seconds. Just wanted to report this.
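For reference, a minimal sketch of that workaround (MyRecord and its getTimestamp() accessor are hypothetical stand-ins for the actual record type):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.flinkspector.core.input.Input;
import org.flinkspector.datastream.input.InputBuilder;

// Sketch: build the input with the plain InputBuilder and assign event time in
// the pipeline instead of relying on EventTimeInputBuilder.
Input<MyRecord> input = InputBuilder.startWith(record1).emit(record2);

DataStream<MyRecord> stream = createTestStream(input)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyRecord>() {
            @Override
            public long extractAscendingTimestamp(MyRecord r) {
                return r.getTimestamp(); // records are assumed sorted by time
            }
        });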

unresolved dependencies scala 2.12

I am using Flink 1.8.0 and Scala 2.12. I added the dependency to my sbt build, but I'm getting:

[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: org.flinkspector#flinkspector-datastream_2.12;0.9.4: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::

Release 0.8.5?

Could 0.8.5 be released? There are some useful fixes since 0.8.4.

Support for Flink 1.2

Hello, I was trying to use flink-spector together with Flink 1.2, but the test hangs indefinitely while running. I switched back to 1.1.4 and everything is OK.

Is this because of a lack of 1.2 support? Are you planning to support it?

Thank you

Make high-level API less restrictive

As requested in issues #54 and #52, the high-level API should expose more options to the end user.

Options that should be available:

  • Stream Time characteristic
  • First record with timestamp or span
  • Providing TypeInformation

Global Windows don't emit data on time

I have a GlobalWindow with a custom trigger (I leave windows open for a few seconds after I have enough data to close the window).

When I emit data into my data stream, the Flink execution environment appears to halt after the test data is exhausted but before my GlobalWindow is triggered.

I tried changing my trigger to wait zero seconds once the window is full, but that just made my test racy: sometimes the global window triggers and calls apply (so the test passes), and sometimes the environment appears to halt first.

Is there a way to leave the execution environment running for a few seconds after all of my data is emitted? Or is there a good way to test this? So far my only solution has been to stop using flink-spector, switch to env.fromCollection() in Flink, and pass a custom iterator that hangs (Thread.sleep(10_000)) before delivering the last value, which then goes untested. That gives the window a chance to trigger and I always get the correct results (huzzah), but it's both hacky and stops me from leveraging flink-spector.

Any advice here is greatly appreciated. Thanks.
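One possible workaround, sketched here as an untested idea rather than a confirmed fix: make the trigger also fire on the final watermark (Long.MAX_VALUE) that arrives once the test input is exhausted, so the window is evaluated before the environment shuts down.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Sketch: register an event-time timer at Long.MAX_VALUE so the final
// watermark fires the window; the application's own firing logic would be
// layered on top of this.
public class FireOnFinalWatermarkTrigger extends Trigger<Object, GlobalWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        // reached only when the final watermark arrives
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(Long.MAX_VALUE);
    }
}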

AssertionError for executeTest() always saying "timeout", even if not relevant

In DataStreamTestBase.java, the catch statement in executeTest() always prints "Test terminated due to timeout!" when the test has been stopped, and then appends the AssertionError's message. This doesn't seem to be correct usage: the comments around the definition of testEnv.hasBeenStopped() say its use is for "e.g. timeout", implying the test can be stopped for more than just timeouts.

--

Instead of

catch (AssertionError var2) {
    if (this.testEnv.hasBeenStopped().booleanValue()) {
        throw new AssertionError("Test terminated due timeout!" + var2.getMessage());
    } else {
        throw var2;
    }
}

Should the catch statement just be the following?

catch (AssertionError var2) {
    if (this.testEnv.hasBeenStopped().booleanValue()) {
        throw new AssertionError(var2.getMessage());
    } else {
        throw var2;
    }
}

Tests don't work when running mvn test on CircleCI

Hello,

I've encountered a problem. On my machine, when I create tests and run them with mvn test, all of them pass. But when I build on CircleCI, the project compiles and all dependencies are resolved, yet all the tests fail, and instead of any results I get:

testClientSexCounter(FlinkOperationsTest)  Time elapsed: 2.138 sec  <<< FAILURE!
java.lang.AssertionError: 
Expected: output ( contains all <List((F,1), (M,1), (F,2))>) 
     but: was <[]>

Is there a known issue with running these tests on services like CircleCI?

Best Regards

Sefirq

Upgrade to Flink 1.0.0

Are there any plans to upgrade to Flink 1.0.0? I noticed there is no branch for it yet on GitHub.

Cheers,

Konstantin

Configurable TimeCharacteristic for test environment

I'm trying to test some CEP with flink-spector, but I can't make it work.
My test is shown below. It passes although it should not (the expected record should not match the actual one), so I'm wondering whether the CEP processing is applied correctly.
The code is in Scala, but I'm using the Java DataStream classes.

@Test
  def testWithFlinkspectorAndJavaStreams(): Unit = {
    val event1 = Event("xxx", "", "message")
    val event2 = Event("xxx", "", "web portal")
    val event3 = Event("xxx", "", "message")
    val event4 = Event("xxx", "", "message")
    val event5 = Event("xxx", "", "call")

    val input = createTestStreamWith(event1).emit(event2).emit(event3).emit(event4).emit(event5).close()

    val pattern = Pattern.begin("event0").subtype(classOf[Event]).where(new EventActionCondition("message")).next("event1").where(new EventActionCondition("call"))

    val patternStream = CEP.pattern(input, pattern)
    val alertStream: DataStream[String] = patternStream.select(new AlertPatternSelectFunction())

    val output: ExpectedRecords[String] = ExpectedRecords.create[String]("Got one !")
    assertStream(alertStream, output)
  }

With the util classes:

class EventActionCondition(action: String) extends IterativeCondition[Event] {
  override def filter(value: Event, ctx: IterativeCondition.Context[Event]): Boolean = {
    value.action == action
  }
}
class AlertPatternSelectFunction extends PatternSelectFunction[Event, String] {
  override def select(pattern: util.Map[String, util.List[Event]]): String = {
    "got it !"
  }
}
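If the root cause is the time characteristic of the test environment (as the issue title suggests), one thing worth trying, sketched here as an assumption only, is to set it explicitly on the testEnv field that DataStreamTestBase initializes, before building the stream:

import org.apache.flink.streaming.api.TimeCharacteristic;

// Sketch: force event time on the test environment before creating the stream.
testEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);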

Random message ordering

Hi,

I have a RichCoFlatMapFunction with flatMap1 and flatMap2 functions for each kind of messages.

When I specify the order of the original messages in the stream, the order in which they arrive at the RichCoFlatMapFunction is arbitrary.

I'm creating the stream like this:
DataStream<Message> stream = createTestStreamWith(message1).emit(message2).close();

Doing assertion:
assertStream(job.processStream(stream), matcher);

processStream looks like:

DataStream<IN1> stream1 = ...
DataStream<IN2> stream2 = ...

return stream1.connect(stream2).keyBy("key1", "key2")
        .flatMap(new MyRichCoFlatMapFunction());

Is there a way to define a strict order for the messages?
Currently they can arrive at MyRichCoFlatMapFunction as "message1, message2" or "message2, message1", triggering flatMap1 and flatMap2 in arbitrary order, which is not good for unit testing.

Tests flicker when running in parallel (when using Managed State)

I am testing an operator that works on a Keyed stream and uses Managed State (e.g. ValueState, MapState). The tests pass as expected when running them one by one - i.e. separate JUnit runs. However, when running them in parallel, they flicker - i.e. they sometimes pass but sometimes fail.

The two tests I am running are nearly identical; the only difference is that they have different JUnit test class names. The inputs to both tests were kept constant, in my case the same JSON document.

When I debugged the runs of both tests (run in parallel), I noticed that:

  1. First test passes
  2. When the second test starts, its initial state (a ValueState member) is initialized with the last input from the first test (this happens in the open() method of the class derived from CoProcessFunction). This is unexpected, and the test subsequently produces an incorrect result stream and fails.

As I understand it, each DataStreamTestBase gets its own distinct test environment. However, when using managed state there seems to be some sharing going on. Could there be some external temporary file or socket/port reuse? That's just an initial guess.

Lastly, I resorted to running the tests in separate JVMs. This seems to fix the problem, but I still need to confirm it (I think I saw it fail once, but I need to double-check).

(p.s. - I'll update this with a code example later today)

Tests don't fail if the output stream is empty

For some reason, exceptions thrown by the HamcrestVerifier seem to cause the JVM to crash if the output list is empty. The same exception is propagated up the chain correctly if the output contains any element.

Flink-spector throws "please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation)" error

Hi there. I am having an issue with running the following test case:

  1. I have a file containing Avro JSON messages.
  2. I would like to create a DataStream of Tuple2<key, AvroObject> from it.
  3. I run a window.reduce operation and validate the aggregate.
  4. I looked into the source code of flink-spector; there is no hook available to specify the TypeInformation.

Any help would be greatly appreciated.

package com.xyz.ips

import java.io.{InputStream, InputStreamReader}
import java.util.concurrent.TimeUnit

import com.xyz.ips.mediation.Report
import com.xyz.ips.test.utii.TestDataReader
import com.xyz.ips.transformation.{Counter, EMMFlatMap}
import com.typesafe.config.ConfigException.Null
import abc.efg.EReport
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, SingleOutputStreamOperator}
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.windowing.time.Time
import org.flinkspector.core.quantify.{MatchTuples, OutputMatcher}
import org.flinkspector.datastream.{DataStreamTestBase, DataStreamTestEnvironment}
import org.flinkspector.datastream.input.time.InWindow
import org.hamcrest.CoreMatchers._
import org.junit.Test
import org.scalatest.junit.AssertionsForJUnit

import scala.io.Source
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericRecord}
import org.apache.avro.io.{DatumReader, Decoder, DecoderFactory, JsonDecoder}
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecord}
import org.apache.commons.io.IOUtils
import org.apache.avro.io.Decoder
import org.apache.avro.io.JsonDecoder
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.flinkspector.datastream.input.EventTimeSourceBuilder

import scala.collection.mutable.ArrayBuffer

class ReportConsumerTest extends DataStreamTestBase with AssertionsForJUnit {

private var schema: Schema = null

/**
* Reader that deserializes byte array into a record.
*/
private var datumReader: DatumReader[GenericRecord] = null

/**
* Input stream to read message from.
*/
private var inputStream: InputStream = null

/**
* Avro decoder that decodes binary data.
*/
private var decoder: Decoder = null

/**
* Record to deserialize byte array to.
*/
private var record: GenericRecord = null

val serDataType: TypeInformation[Tuple2[String, Report]] = createTypeInformation[Tuple2[String, Report]]

@Test def testCounter() = {

val testPayload = this.getClass().getClassLoader().getResourceAsStream("samples/simplecounter-2.txt")
val ReportList: java.util.List[Tuple2[String,Report]] = TestDataReader.parseAvro(testPayload);



setParallelism(2)

var testStream :EventTimeSourceBuilder[Tuple2[String, Report]] = null

 testStream = createTimedTestStreamWith(Tuple2.of(null, null))
val serDataType: TypeInformation[Tuple2[String, Report]] = createTypeInformation[Tuple2[String, Report]]


val it = ReportList.iterator()

var report:Tuple2[String,Report] = null

while (it.hasNext) {
  report = it.next()
  testStream = testStream.emit(Tuple2.of(report.f0, report.f1))
}



val testStream1 = testStream.close();

/*
System.out.println("test****************")

val matcher :OutputMatcher[Tuple2[String, EReport]]  =
//name the values in your tuple with keys:
  new MatchTuples[Tuple2[String, EReport]]("value", "name")
    //add an assertion using a value and hamcrest matchers
    .assertThat("name", either(is("fritz")).or(is("peter")))
    //express how many matchers must return true for your test to pass:
    .anyOfThem()
    //define how many records need to fulfill the
    .onEachRecord()

assertStream(window(testStream1), matcher);

*/

}

def window(stream: DataStream[Tuple2[String,Report]]):DataStream[Tuple2[String,EReport]] = {

val hourlyAggregate: DataStream[Tuple2[String, EReport]] = stream
  .flatMap(new EMMFlatMap(null))
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .reduce(new Counter())
return hourlyAggregate;

}

}

I'm getting the following issue: the line val testStream1 = testStream.close(); is throwing this error:

[error] Test com.xyz.ips.ReportConsumerTest.testCounter failed: java.lang.RuntimeException: Could not create TypeInformation for type class org.apache.flink.streaming.runtime.streamrecord.StreamRecord; please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation), took 1.131 sec
[error] at org.flinkspector.datastream.DataStreamTestEnvironment.fromCollectionWithTimestamp(DataStreamTestEnvironment.java:188)
[error] at org.flinkspector.datastream.DataStreamTestEnvironment.fromInput(DataStreamTestEnvironment.java:142)
[error] at org.flinkspector.datastream.input.EventTimeSourceBuilder.close(EventTimeSourceBuilder.java:62)
[error] at com.xyz.ips.ReportConsumerTest.testCounter(ReportConsumerTest.scala:92)
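A possible workaround, sketched in Java under the assumption that the parsed records can be handed to the environment directly: bypass the builder and pass an explicit TypeInformation (the getTimestamp() accessor on Report is hypothetical).

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

// Sketch: supply the TypeInformation explicitly instead of letting the
// EventTimeSourceBuilder try to derive it.
TypeInformation<Tuple2<String, Report>> typeInfo =
        TypeInformation.of(new TypeHint<Tuple2<String, Report>>() {});

DataStream<Tuple2<String, Report>> stream = testEnv
        .fromCollection(reportList, typeInfo)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Report>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<String, Report> t) {
                return t.f1.getTimestamp(); // hypothetical accessor on Report
            }
        });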

ExpectedRecords cannot have type Collection

When using List<T> with ExpectedRecords<T>, a compile error occurs: "Incompatible equality constraint: List<T> and T".

Example:

List<Integer> integerList = new ArrayList<>();
integerList.add(1);
integerList.add(2);
integerList.add(3);

ExpectedRecords<List<Integer>> expectedIntegers = ExpectedRecords.create(integerList);

Potential solution

I think I see why this is happening. In org.flinkspector.core.collection:

public static <T> ExpectedRecords<T> create(Collection<T> records) {
    ExpectedRecords<T> output = new ExpectedRecords<>();
    return output.expectAll(records);
}

That is, when you pass in a collection like the one above, rather than being accepted as one single record, it gets interpreted as a set of multiple records. I understand why you would want this and agree it should be kept, but I'm hoping there's a way we can also accept a Collection as just one individual record.

I'm more than happy to look into this and make a PR! I don't have a ton of experience with generics, though - do any ideas immediately pop out?

@lofifnc
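One possible shape for such an addition, as a rough sketch only (the factory name createFromRecord is hypothetical; it assumes the existing expect(T) method):

// Sketch: accept the argument as one single expected record, sidestepping
// the Collection overload above.
public static <T> ExpectedRecords<T> createFromRecord(T record) {
    ExpectedRecords<T> output = new ExpectedRecords<>();
    return output.expect(record);
}

// Hypothetical usage: the List is now treated as one record.
ExpectedRecords<List<Integer>> expected = ExpectedRecords.createFromRecord(integerList);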

assertStream always passes

I am attempting to write a unit test for my DataStream, but it constantly passes. Here is my code:

        testEnv = DataStreamTestEnvironment.createTestEnvironment(1);
        createTestStream(Arrays.asList(eventStreamPayload));
        DataStream<JSONObject> dataStream = createTestStream(Arrays.asList(eventStreamPayload)).flatMap(new myFlatMap());
        TestSink<JSONObject> outputFormat = createTestSink(hasItems(new JSONObject(eventStreamPayload2)));
        dataStream.addSink(outputFormat);
        JSONObject jsonObject = new JSONObject(eventStreamPayload2);
        assertStream(dataStream, hasItems(jsonObject));

Now, in the above case, eventStreamPayload and eventStreamPayload2 are different, and myFlatMap() modifies eventStreamPayload as well. Yet the TestSink and assertStream always pass. What am I missing?
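For comparison, here is a sketch of the structure flink-spector expects, assuming the test class extends DataStreamTestBase so that the registered verifiers run when the base class executes the job after the test method returns. Note also that org.json.JSONObject does not override equals(), so Hamcrest equality matchers on JSONObject values may not behave as intended.

import static org.hamcrest.CoreMatchers.hasItems;

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.flinkspector.datastream.DataStreamTestBase;
import org.json.JSONObject;
import org.junit.Test;

// Sketch: rely on the testEnv created by DataStreamTestBase instead of
// replacing it inside the test. The payload strings are hypothetical.
public class MyFlatMapTest extends DataStreamTestBase {

    @Test
    public void flatMapProducesExpectedOutput() {
        DataStream<JSONObject> stream =
                createTestStream(Arrays.asList(new JSONObject(eventStreamPayload)))
                        .flatMap(new MyFlatMap());

        // verified when DataStreamTestBase runs the job after this method returns
        assertStream(stream, hasItems(new JSONObject(eventStreamPayload2)));
    }
}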

"Task was cancelled" exception when testing CoProcessFunction

Does flink spector work for testing CoProcessFunction operators? When attempting to test it, I keep getting a

java.util.concurrent.CancellationException: Task was cancelled.

Notably, the exception seems to originate in the getNextMessage() method of the io.flinkspector.core.runtime.OutputSubscriber class. The method appears to throw an InterruptedException, which then causes it to return an empty byte array, which in turn causes the job to end.

    public byte[] getNextMessage() {
        try {
            return queue.poll(TIMEOUT, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            //Thread interrupted stop waiting and signal it to the subscriber
            return new byte[0];
        }
    }

This then causes the OutputHandler to finish...

        if (bytes.length == 0) {
            //the subscriber has been cancelled from outside quietly finish the process
            return Action.FINISH;
        }

My CoProcessFunction is (intentionally) very simple:

public class DummyJoinFunction extends CoProcessFunction<MyType, MyOtherType, MyType> {

     @Override
     public void processElement1(MyType value, Context ctx, Collector<MyType> out) {
          // just pass the input to the collector
          out.collect(value);
     }

     @Override
     public void processElement2(MyOtherType value, Context ctx, Collector<MyType> out) {
          // do nothing here
     }
}

And my test class:

public class TestDummyJoinFunction extends DataStreamTestBase {
      
       @Test
       public void testTheFunction() {
             Input<MyType> myTypeInput = InputBuilder.startWith(new MyType(1)).emit(new MyType(2));
             KeyedStream<MyType, Integer> myInputStream = this.createTestStream(myTypeInput).keyBy(new MyKeySelector());

             Input<MyOtherType> myOtherTypeInput = InputBuilder.startWith(new MyOtherType(1)).emit(new MyOtherType(2));
             KeyedStream<MyOtherType, Integer> myOtherInputStream = this.createTestStream(myOtherTypeInput).keyBy(new MyOtherKeySelector());

             DataStream<MyType> outputStream = myInputStream.connect(myOtherInputStream).process(new DummyJoinFunction());

             assertStream(outputStream, new MyOutputMatcher());
       }
}

setParallelism changes test outcome

I have a test case that passes when I run it with setParallelism(1) but fails with, e.g., setParallelism(4).

The test case looks like this:

(The test case was shown as a screenshot, which is not reproduced here.)

None of the tests depend on any data source or stream other than the input stream that the test supplies.

So I'm a bit confounded as to why parallelism might cause a different test outcome. This is Flink 1.7.2 on JDK 8.

I added some println statements at the last output node and it prints the correct values; only the test sink fails to receive the events.

TestBaseUtils.startCluster is deprecated as of Flink 1.7.0

Since the newest Flink 1.7.0 release, the method TestBaseUtils.startCluster is not available anymore, resulting in a NoSuchMethodError when running tests with flink-spector.

Apart from adapting this to work with Flink 1.7.0 in general, it is worth thinking about shading the Flink artifacts that come with flink-spector and only using the shaded versions internally (avoiding, however, the too-general org.apache.flink.shaded namespace, because it might further conflict with additional third-party libraries).

Run pipeline without flushing all the windows

I have a test similar to this one:

    @Test
    public void should_not_fire_window_yet() {
        DataStream<String> testStream = createTimedTestStreamWith("message1")
                .emit("message2")
                .close()
                .timeWindowAll(Time.days(1))
                .reduce((a, b) -> a + b);

        assertStream(testStream, emptyIterableOf(String.class));
    }

This test fails because it outputs the message "message1message2", even though the window is set to one day and normally should not fire until 24 hours have passed.

Adding some breakpoints seems to indicate that a watermark with timestamp Long.MAX_VALUE is being sent, which causes the window to fire.

Is there any way to alter this behavior? I'd like to have tests that don't automatically fire the window at the end.

Update wiki

The wiki needs to be updated according to the new naming scheme.

Consider splitting out JUnit 4 functionality into separate module and documenting

We are in the process of migrating our tests to JUnit 5. Some of the classes here are strongly tied to JUnit 4's execution model, for example DataStreamTestBase.

Proposal:

  • Split out the JUnit 4-specific runners into a separate module
  • Add some documentation around using the library without inheriting from DataStreamTestBase

A dependency on JUnit 4 is still OK, because the JUnit Platform/Jupiter pieces reside in an entirely different, non-conflicting package namespace. A quick peek into DataStreamTestBase suggests that some parts are mostly separate already, but some documentation and guidance around alternative Runner methods would be very helpful for applying the same model to JUnit 5.

Tests fail under Flink 1.1

The stable version of Flink is now 1.1; however, running tests that worked with the previous stable version of Flink now results in:

java.lang.NoSuchMethodError: org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(Lorg/apache/flink/configuration/Configuration;Lakka/actor/ActorSystem;Ljava/lang/String;Lscala/Option;Lscala/Option;ZLjava/lang/Class;)Lakka/actor/ActorRef;
    at org.apache.flink.test.util.ForkableFlinkMiniCluster.startTaskManager(ForkableFlinkMiniCluster.scala:127)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:312)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:269)
    at org.apache.flink.test.util.ForkableFlinkMiniCluster.start(ForkableFlinkMiniCluster.scala:223)
    at org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:146)
    at org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:122)
    at org.flinkspector.datastream.DataStreamTestEnvironment.createTestEnvironment(DataStreamTestEnvironment.java:72)
    at org.flinkspector.datastream.DataStreamTestBase.initialize(DataStreamTestBase.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

java.lang.NullPointerException
    at org.flinkspector.datastream.DataStreamTestBase.executeTest(DataStreamTestBase.java:247)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)


Unmaintained?

Is this project unmaintained now? It's been over 18 months since the last commit.

Do before/after refer to processing time?

Hi,

I have POJOs that describe sensor values (id, timestamp, value), so I use a custom event-time extractor and key by id with one-minute windows:

        DataStream<SensorValue> values =
                createTimedTestStreamWith(new SensorValue(1L, 2.0, 0L))
                        .emit(new SensorValue(1L, 12.0, 60001L), after(5, seconds))
                        .emit(new SensorValue(1L, 16.0, 60002L), after(5, seconds))
                        .emit(new SensorValue(1L, 10.0, 1L), after(2, minutes))
                        .close();

        DataStream<SensorValueAggregate> aggregates = values
                .assignTimestampsAndWatermarks(new SensorTimestampsAndWatermarks())
                .keyBy("id")
                .timeWindow(Time.minutes(1))
                .apply(new SensorValueAggregate(), new AggregateSensorValues(), new AggregateSensorValues());

Does after(5, seconds) refer to processing time? I expected to miss the last sensor value due to the one-minute windows. Does after(5, seconds) have no effect because I have overridden the timestamp extractor?


UPDATE

I was able to achieve the element dropping using a custom punctuated watermark assigner that emits a watermark after each element. But it would also be useful to be able to define timing behavior on processing time.
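A sketch of that punctuated assigner, assuming SensorValue exposes its event timestamp via a getTimestamp() accessor:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: emit a watermark right after every element so windows close as soon
// as event time advances past their end, not only at the final watermark.
public class PerElementWatermarks implements AssignerWithPunctuatedWatermarks<SensorValue> {

    @Override
    public long extractTimestamp(SensorValue element, long previousElementTimestamp) {
        return element.getTimestamp(); // hypothetical accessor
    }

    @Override
    public Watermark checkAndGetNextWatermark(SensorValue lastElement, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }
}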

NPE in test

The build hangs on a flink-spector unit test. After some time it shows a NullPointerException inside flink-spector code. It is not clear what causes it.

17:18:36,808 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
17:18:36,840 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter configured, no metrics will be exposed/reported.
17:18:38,718 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:18:38,885 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-7bcc77e5-111e-4808-8fdd-6cc199bc47c3
17:18:38,928 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:41743 - max concurrent requests: 50 - max backlog: 1000
17:18:39,233 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/archive_1
17:18:39,255 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/jobmanager_1.
17:18:39,258 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@73c3f03f @ akka://flink/user/jobmanager_1
17:18:39,278 INFO org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration - Messages have a max timeout of 1000000 ms
17:18:39,328 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file directory '/tmp': total 9 GB, usable 4 GB (44.44% usable)
17:18:39,332 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID Some(11ce4b11-eff3-4964-9cb7-99e5a62dfd4e).
17:18:39,393 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=11ce4b11-eff3-4964-9cb7-99e5a62dfd4e
17:18:39,486 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
17:18:39,610 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 144 MB for network buffer pool (number of memory segments: 4612, bytes per segment: 32768).
17:18:39,639 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-734530174] - leader session 11ce4b11-eff3-4964-9cb7-99e5a62dfd4e
17:18:39,686 WARN org.apache.flink.runtime.query.QueryableStateUtils - Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. Please put the corresponding jar from the opt to the lib folder.
17:18:39,686 WARN org.apache.flink.runtime.query.QueryableStateUtils - Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. Please put the corresponding jar from the opt to the lib folder.
17:18:39,692 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Starting the network environment and its components.
17:18:39,694 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting managed memory to 80 MB, memory will be allocated lazily.
17:18:39,713 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-c0057d8c-e00b-48b4-b8e6-d04e0325babc for spill files.
17:18:39,729 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-d6c66807-0c28-450e-b9b1-5ac6c3ffb0aa
17:18:39,784 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-bf8a0fe4-1d1a-43fb-857c-9d14010da32e
17:18:39,906 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager_1#954964096.
17:18:39,910 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: 2a476f7f300008d39542925dbc4e71c0 @ localhost (dataPort=-1)
17:18:39,911 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 1 task slot(s).
17:18:39,917 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 155/348/1446 MB, NON HEAP: 29/30/-1 MB (used/committed/max)]
17:18:39,940 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
17:18:39,960 INFO org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 91cb3a85518a3daeec6ba199a25b1384. Current number of registered hosts is 1. Current number of alive task slots is 1.
17:18:39,969 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - TaskManager 2a476f7f300008d39542925dbc4e71c0 has started.
17:18:39,971 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
17:18:39,990 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:41743. Starting BLOB cache.
17:18:39,996 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Created BLOB cache storage directory /tmp/blobStore-594aa983-44f2-4809-b20c-2d1f04654653
17:18:40,002 INFO org.apache.flink.runtime.blob.TransientBlobCache - Created BLOB cache storage directory /tmp/blobStore-665c9ef1-bf7e-45b0-a862-b0703d0b753a
17:18:41,893 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Disconnect from JobManager null.
17:18:41,910 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received SubmitJobAndWait(JobGraph(jobId: 27f30ea34456e47491c489bb258c77b0)) but there is no connection to a JobManager yet.
17:18:41,911 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received job Flink Streaming Job (27f30ea34456e47491c489bb258c77b0).
17:18:41,923 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-734530174].
17:18:41,930 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-734530174] with leader session id 11ce4b11-eff3-4964-9cb7-99e5a62dfd4e.
17:18:41,930 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Flink Streaming Job (27f30ea34456e47491c489bb258c77b0) and wait for progress
17:18:41,943 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Upload jar files to job manager akka://flink/user/jobmanager_1.
java.lang.NullPointerException
at io.flinkspector.core.runtime.OutputHandler.processMessage(OutputHandler.java:156)
at io.flinkspector.core.runtime.OutputHandler.call(OutputHandler.java:100)
at io.flinkspector.core.runtime.OutputHandler.call(OutputHandler.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

IllegalStateException at the end of CEP test

I get the exception below at the end of my test, although the test completes successfully.
Why is that?

java.lang.IllegalStateException: cannot enqueue after timer shutdown
at akka.actor.LightArrayRevolverScheduler.scheduleOnce(LightArrayRevolverScheduler.scala:137)
at akka.actor.Scheduler$class.scheduleOnce(Scheduler.scala:126)
at akka.actor.LightArrayRevolverScheduler.scheduleOnce(LightArrayRevolverScheduler.scala:37)
at akka.pattern.PromiseActorRef$.apply(AskSupport.scala:602)
at akka.pattern.GracefulStopSupport$class.gracefulStop(GracefulStopSupport.scala:48)
at akka.pattern.package$.gracefulStop(package.scala:41)
at akka.pattern.Patterns$.gracefulStop(Patterns.scala:229)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$4$$anonfun$apply$1.apply(FlinkMiniCluster.scala:432)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$4$$anonfun$apply$1.apply(FlinkMiniCluster.scala:432)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$4.apply(FlinkMiniCluster.scala:432)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$4.apply(FlinkMiniCluster.scala:432)
at scala.Option.map(Option.scala:146)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.shutdown(FlinkMiniCluster.scala:431)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:410)
at org.apache.flink.test.util.TestBaseUtils.stopCluster(TestBaseUtils.java:208)
at io.flinkspector.core.runtime.Runner.shutdownLocalCluster(Runner.java:134)
at io.flinkspector.core.runtime.Runner.cleanUp(Runner.java:176)
at io.flinkspector.core.runtime.Runner.executeTest(Runner.java:219)
at io.flinkspector.datastream.DataStreamTestEnvironment.executeTest(DataStreamTestEnvironment.java:79)
at io.flinkspector.datastream.DataStreamTestBase.executeTest(DataStreamTestBase.java:304)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

My test

@Test
  def testWithFlinkspectorAndJavaStreams(): Unit = {
    val event1 = Event("xxx", "", "message")
    val event2 = Event("xxx", "", "web portal")
    val event3 = Event("xxx", "", "message")
    val event4 = Event("xxx", "", "message")
    val event5 = Event("xxx", "", "call")

    val input = createTimedTestStreamWith(event1).emit(event2).emit(event3).emit(event4).emit(event5).close()

    val pattern = Pattern.begin("event0").subtype(classOf[Event]).where(new EventActionCondition("message")).next("event1").where(new EventActionCondition("call"))

    val patternStream = CEP.pattern(input, pattern)
    val alertStream: DataStream[StreamAlert] = patternStream.select(new AlertPatternSelectFunction())

    val output: ExpectedRecords[StreamAlert] = ExpectedRecords.create(StreamAlert(2, "message", "call", "xxx", ""))

    assertStream(alertStream, output)
  }

And the util classes

class EventActionCondition(action: String) extends IterativeCondition[Event] {
  override def filter(value: Event, ctx: IterativeCondition.Context[Event]): Boolean = {
    value.action == action
  }
}
class AlertPatternSelectFunction extends PatternSelectFunction[Event, StreamAlert] {
  override def select(pattern: util.Map[String, util.List[Event]]): StreamAlert = {
    val event0 = pattern.get("event0").get(0)
    val event1 = pattern.get("event1").get(0)
    StreamAlert(pattern.size(), event0.action, event1.action, event0.user, event0.time)
  }
}
case class StreamAlert(nbOfEvents: Int, firstAction: String, lastAction: String, user: String, lastActionTime: String)

Flinkspector 0.9.1 references package/classes not published by Flink

Part of flinkspector 0.9.1 was adding support for Flink 1.7.x.
The use of the TestingCluster class was introduced due to the removal of Flink's TestBaseUtils.startCluster (#85 and de979a4).

However, it appears that TestingCluster and the org.apache.flink.runtime.testingUtils package are not distributed in the Flink repositories; they live in the test directory of flink-runtime (https://github.com/apache/flink/tree/release-1.7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils).

This results in a runtime error in io.flinkspector.dataset.DataSetTestEnvironment claiming that the org.apache.flink.runtime.testingUtils package does not exist.

A pointer to the proper usage of the 0.9.1 version would be appreciated in case I'm wrong.

Type mismatch when asserting CEP stream with ExpectedRecords

I have the test below. The alertStream under test receives one element of type StreamAlert. To assert this, I'm creating an ExpectedRecords object with one element, which should match the element in the stream. However, the test fails with the following assertion error:

Expected: output ( contains all <List(StreamAlert(2,message,call,,))>) 
     but: was <[StreamAlert(2,message,call,xxx,), StreamAlert(2,message,call,xxx,)]>

Maybe I'm not using the ExpectedRecords matcher correctly.
Any help would be appreciated.

My test function

@Test
  def testWithFlinkspectorAndJavaStreamsAndExpectedRecords(): Unit = {
    val event1 = Event("xxx", "", "message")
    val event2 = Event("xxx", "", "web portal")
    val event3 = Event("xxx", "", "message")
    val event4 = Event("xxx", "", "message")
    val event5 = Event("xxx", "", "call")

    val input = createTimedTestStreamWith(event1).emit(event2).emit(event3).emit(event4).emit(event5).close()

    val pattern = Pattern.begin("event0").subtype(classOf[Event]).where(new EventActionCondition("message")).next("event1").where(new EventActionCondition("call"))

    val patternStream = CEP.pattern(input, pattern)
    val alertStream: DataStream[StreamAlert] = patternStream.select(new AlertPatternSelectFunction())

    val output: ExpectedRecords[StreamAlert] = ExpectedRecords.create(StreamAlert(2, "message", "call", "", ""))

    assertStream(alertStream, output)
  }

And the util classes:

class EventActionCondition(action: String) extends IterativeCondition[Event] {
  override def filter(value: Event, ctx: IterativeCondition.Context[Event]): Boolean = {
    value.action == action
  }
}
class AlertPatternSelectFunction extends PatternSelectFunction[Event, StreamAlert] {
  override def select(pattern: util.Map[String, util.List[Event]]): StreamAlert = {
    val event0 = pattern.get("event0").get(0)
    val event1 = pattern.get("event1").get(0)
    StreamAlert(pattern.size(), event0.action, event1.action, event0.user, event0.time)
  }
}
case class StreamAlert(nbOfEvents: Int, firstAction: String, lastAction: String, user: String, lastActionTime: String)

Find a way to manipulate ProcessingTime

I have a GlobalWindow with a simple class TimedTrigger extends Trigger<Value, GlobalWindow>.
I register a processing-time timer:
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timer);

But it never gets triggered (unless I set the timer value to an unrealistically low 2 milliseconds). I have tried increasing the timeout value, with no effect.
