Giter Club home page Giter Club logo

kamon-akka's Introduction

This reporsitory has been moved.

Since March 2020 all the Kamon instrumentation and reporting modules were moved to Kamon's main repository at https://github.com/kamon-io/kamon. Please check out the main repository for the latest sources, reporting issues or start contributing. You can also stop by our Gitter Channel.

kamon-akka's People

Contributors

agarbutt avatar chamakits avatar dpsoft avatar ivantopo avatar kubukoz avatar lustefaniak avatar mladens avatar pjan avatar pjfanning avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

kamon-akka's Issues

Slow Response time when using AspectJ Weaver Agent

I have noticed that whenever i am running my application with the AspectJ Weaver jar the performance is really slow. And this is a must for kamon akka to run.
What should i do to fix the performance?

NullPointerException when instrumenting ActorSystem specifying a defaultExecutionContext

When creating our ActorSystem we are providing a defaultExecutionContext value.

Then when InstrumentNewExecutorServiceOnAkka25.around is called the dispatcherName property appears to be null and blows up the accept matcher with a NullPointerException.

stack trace
Exception in thread "main" java.lang.NullPointerException
	at java.base/java.util.regex.Matcher.getTextLength(Matcher.java:1770)
	at java.base/java.util.regex.Matcher.reset(Matcher.java:416)
	at java.base/java.util.regex.Matcher.<init>(Matcher.java:253)
	at java.base/java.util.regex.Pattern.matcher(Pattern.java:1133)
	at kamon.util.Filter$Glob.accept(Filter.scala:197)
	at kamon.util.Filter$IncludeExclude.$anonfun$includes$1(Filter.scala:140)
	at kamon.util.Filter$IncludeExclude.$anonfun$includes$1$adapted(Filter.scala:140)
	at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
	at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
	at scala.collection.immutable.List.exists(List.scala:89)
	at kamon.util.Filter$IncludeExclude.includes(Filter.scala:140)
	at kamon.util.Filter$IncludeExclude.accept(Filter.scala:137)
	at kamon.instrumentation.akka.instrumentations.akka_25.InstrumentNewExecutorServiceOnAkka25$.around(DispatcherInstrumentation.scala:130)
	at akka.dispatch.DefaultExecutorServiceConfigurator$$anon$4.createExecutorService(AbstractDispatcher.scala)
	at akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.executor$lzycompute(Dispatcher.scala:43)
	at akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.executor(Dispatcher.scala:43)
	at akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:217)
	at akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:217)
	at akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:42)
	at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:126)
	at akka.dispatch.MessageDispatcher.attach(AbstractDispatcher.scala:140)
	at akka.actor.dungeon.Dispatch.start(Dispatch.scala:116)
	at akka.actor.dungeon.Dispatch.start$(Dispatch.scala:114)
	at akka.actor.ActorCell.start(ActorCell.scala:447)
	at akka.actor.LocalActorRef.start(ActorRef.scala:347)
	at akka.actor.LocalActorRefProvider.init(ActorRefProvider.scala:674)
	at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:912)
	at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:908)
	at akka.actor.ActorSystemImpl._start(ActorSystem.scala:908)
	at akka.actor.ActorSystemImpl.start(ActorSystem.scala:930)
	at akka.actor.ActorSystem$.apply(ActorSystem.scala:258)
	at akka.actor.ActorSystem$.apply(ActorSystem.scala:302)
	at com.digitalasset.canton.Environment.<init>(Environment.scala:112)
       ...

If we don't provide a defaultExecutionContext value to the actor system then it'll choose the default so name in this block is "akka.actor.default-dispatcher".

Versions:

Component Version
kamon-bundle 2.0.5
akka 2.5.23

Pending Work not affected by actor termination.

In an Actor Group Pending Work is incremented whenever a message is enqueued in the mailbox (.../ActorMonitor.scala#L252) but is only decremented once the message is processed (.../ActorMonitor.scala#L268).
In case an Actor of the Actor Group terminates and still has messages in the mailbox/stash it does not decrement (.../ActorMonitor.scala#L272).
Therefore Pending Work is in that case constantly increasing and not reflecting the real number of messages still pending.

The same issue seems to be in Routers.

aspectJ not finding akka modules for logging

Error:

[info] Running the application with Aspectj Weaver
[NamedWeavingURLClassLoader] error can't determine superclass of missing type org.codehaus.janino.ScriptEvaluator
when weaving type ch.qos.logback.core.boolex.JaninoEventEvaluatorBase
when weaving classes
when weaving
 [Xlint:cantFindType]
[NamedWeavingURLClassLoader] error can't determine superclass of missing type org.codehaus.janino.ScriptEvaluator
when weaving type ch.qos.logback.core.boolex.JaninoEventEvaluatorBase
when weaving classes
when weaving
 [Xlint:cantFindType]
[NamedWeavingURLClassLoader] error can't determine whether missing type org.codehaus.janino.ScriptEvaluator is an instance of akka.pattern.AskableActorRef$
when weaving type ch.qos.logback.core.boolex.JaninoEventEvaluatorBase
when weaving classes
when weaving
 [Xlint:cantFindType]

build.sbt:

scalaVersion := "2.12.2"
libraryDependencies ++= Seq(
  guice,
  ws,
  ehcache,
  "com.amazonaws" % "aws-java-sdk-bom" % "1.11.307",
  "com.amazonaws" % "aws-java-sdk-s3" % "1.11.307",
  "io.kamon" %% "kamon-core" % "1.1.0",
  "io.kamon" %% "kamon-logback" % "1.0.0",
  "io.kamon" %% "kamon-akka-2.5" % "1.0.0",
  "io.kamon" %% "kamon-prometheus" % "1.0.0"
)

plugins.sbt:

addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.3")

resolvers += Resolver.bintrayIvyRepo("kamon-io", "sbt-plugins")
addSbtPlugin("io.kamon" % "sbt-aspectj-runner-play-2.6" % "1.1.0")

application.conf:

kamon {
  track-unmatched-entities = yes

  util.filters {

    "akka.tracked-actor" {
      includes = ["**"]
    }

    "akka.tracked-dispatcher" {
      includes = [ "**" ]
    }
  }
}

sbt.version = 0.13.15

I notice that I can hit the Prometheus exporter just fine on 9095, and I do see mailbox metrics, but nothing about specific actors nor the dispatcher.

Thanks in advance!

Unexpected behavior with Kamon.IO and Akka Cluster Aware Routers

Akka messages can be successfully sent (and delivered) to router that configured as allow-local-routees = off when kamon used in project.
Messages that are sent to router with configuration allow-local-routees = on just disappear.
I prepared a small example that makes it easy to reproduce the problem.

Publish all artifacts from the latest release

Hello guys,

Looking at maven central I see that different modules have been published at different release numbers but not all, not always.
For instance, readme file says the latest stable is 0.6.6, while

If you take kamon-autoweave-example for instance, there is no possible version number that crosses all needed dependencies for scala 2.12

It would be risky (if possible at all) to use mixed version numbers so could you release all missing modules for a single version, 1.0.0-RC1 for instance, for the different scala versions you intend to support? (particularly interested in 2.12)

Cheers,
Oswaldo

Added support for Akka "affinity-pool-executor" dispatcher executor

Hey,
Currently, it seems that the only metrics being sent from Kamon are for dispatchers configured as either fork-join-pools or thread-pool-executors.
In my case, I have affinity-pool-executor (you can read about it here) configured and I don't get the metrics for it (while I do get the metrics from thread-pool-executor I configured).

I am using Scala 2.11.12 with Akka 2.5.13.

Let me know if I need to supply any other information

Use gauges where appropriate

There are a couple of histogram metrics being returned from this module that would be more appropriate as gauges.

  • executor.queue-size
  • executor.threads.active
  • executor.threads.total
  • akka.group.pending-messages
  • akka.group.members
  • akka.system.active-actors

For these metrics I am interested in what the value is at a point in time, not the distribution of those values

akka 2.5

It looks like kamon-akka-2.4 works with akka 2.5.0. Are there any plans to issue a specific kamon-akka-2.5 or is this unnecessary?
One proviso is that akka-http 10.0.5 is needed for akka 2.5.0. So it might be worth updating the kamon-akka-http dependency on akka-http to 10.0.5.
https://github.com/pjfanning/kamon-akka-group demonstrates a simple sample that uses akka 2.5.0 and kamon-akka-2.4 together.

missing type io.aeron.Aeron error.

[Xlint:cantFindType]
[AppClassLoader@18b4aac2] error can't determine superclass of missing type io.aeron.Aeron
when weaving type akka.remote.artery.ArteryTransport
when weaving classes
when weaving
[Xlint:cantFindType]
[AppClassLoader@18b4aac2] error can't determine whether missing type io.aeron.Aeron is an instance of akka.pattern.AskableActorRef$
when weaving type akka.remote.artery.ArteryTransport
when weaving classes
when weaving

Kamon trace-token propagation not reliable

The trace-token propagation seems to fail when using akka-streams with mapAsync/futures. Instead of the correct trace-token it will start using random, old trace-tokens. This will only happen when processing the events in the stream fast enough.

I have made a simple test program that shows the problem (complete project here).

object Main extends App with LazyLogging {
  Kamon.start()

  implicit val system: ActorSystem = ActorSystem("kamon-akka")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val stream =
    Source(1 to 100)
      // Adding throttle will make the trace-token work again
      //.throttle(30, 1.second, 1, ThrottleMode.Shaping)
      .via(traceToken)
      .via(asyncFlow)
      .runWith(sink)

  stream.onComplete {
    case Success(Done) => logger.debug(s"Stream completed OK")
    case Failure(e) => logger.debug(s"Stream failed with exc: $e")
  }

  def traceToken = Flow[Int].map { i =>
    // Start a new trace context and set a custom trace token
    Tracer.setCurrentContext(Kamon.tracer.newContext("kamon-context", Some(s"token-$i")))
    checkTraceToken("Init", i)
  }

  def asyncFlow = Flow[Int].mapAsync(10) { i =>
    Future {
      checkTraceToken("Async", i)
    }
  }

  def sink: Sink[Int, Future[Done]] = Sink.foreach { i =>
    checkTraceToken("Sink", i)
    Tracer.currentContext.finish()
    Tracer.clearCurrentContext
  }

  def checkTraceToken(text: String, i: Int): Int = {
    // Read the trace token and compare it to the correct value
    val token = Tracer.currentContext.token
    if (token != s"token-$i")
      logger.error(s"*** $text *** '$i' doesn't match '$token' !!")
    i
  }
}

This code should not log any errors since the Int in the passing event should always match the trace-token.

This is the output I get when I run the code.

$ sbt run
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384m; support was removed in 8.0
[warn] Executing in batch mode.
[warn]   For better performance, hit [ENTER] to switch to interactive mode, or
[warn]   consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading global plugins from /Users/jjacobsson/.sbt/0.13/plugins/project
[info] Loading global plugins from /Users/jjacobsson/.sbt/0.13/plugins
[info] Loading project definition from /Users/jjacobsson/git/testing/kamon-akka/project
[info] Set current project to kamon-akka (in build file:/Users/jjacobsson/git/testing/kamon-akka/)
[info] Compiling 1 Scala source to /Users/jjacobsson/git/testing/kamon-akka/target/scala-2.11/classes...
[info] Running Main
[error] objc[34383]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/bin/java (0x10e6314c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10e6f94e0). One of the two will be used. Which one is undefined.
[info] 12:31:12.901 [main] INFO kamon.Kamon$Instance - Initializing Kamon...
[info] 12:31:13.284 [main] INFO kamon.Kamon$Instance - Kamon-autoweave has been successfully loaded.
[info] 12:31:13.285 [main] INFO kamon.Kamon$Instance - The AspectJ load time weaving agent is now attached to the JVM (you don't need to use -javaagent).
[info] 12:31:13.286 [main] INFO kamon.Kamon$Instance - This offers extra flexibility but obviously any classes loaded before attachment will not be woven.
[info] 12:31:16.123 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '2' doesn't match 'token-4' !!
[info] 12:31:16.124 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '3' doesn't match 'token-5' !!
[info] 12:31:16.124 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '4' doesn't match 'token-6' !!
[info] 12:31:16.124 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '5' doesn't match 'token-7' !!
[info] 12:31:16.125 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '6' doesn't match 'token-8' !!
[info] 12:31:16.125 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '7' doesn't match 'token-9' !!
[info] 12:31:16.126 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '8' doesn't match 'token-11' !!
[info] 12:31:16.127 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '9' doesn't match 'token-12' !!
[info] 12:31:16.127 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '10' doesn't match 'token-13' !!
[info] 12:31:16.127 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '11' doesn't match 'token-14' !!
[info] 12:31:16.130 [kamon-akka-akka.actor.default-dispatcher-2] ERROR Main$ - *** Sink *** '13' doesn't match '' !!
[info] 12:31:16.131 [kamon-akka-akka.actor.default-dispatcher-2] ERROR Main$ - *** Sink *** '14' doesn't match 'token-22' !!
...
[info] 12:31:16.169 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '92' doesn't match 'token-100' !!
[info] 12:31:16.169 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '93' doesn't match '' !!
[info] 12:31:16.169 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '94' doesn't match '' !!
[info] 12:31:16.169 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '95' doesn't match '' !!
[info] 12:31:16.169 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '96' doesn't match '' !!
[info] 12:31:16.169 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '97' doesn't match '' !!
[info] 12:31:16.170 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '98' doesn't match '' !!
[info] 12:31:16.170 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '99' doesn't match '' !!
[info] 12:31:16.170 [kamon-akka-akka.actor.default-dispatcher-6] ERROR Main$ - *** Sink *** '100' doesn't match '' !!
[info] 12:31:16.171 [ForkJoinPool-1-worker-15] DEBUG Main$ - Stream completed OK

This is the dependencies that I am using. I have tried to play around with different ones as well but that doesn't seem to make any difference.

libraryDependencies ++= Seq(
  "ch.qos.logback"             % "logback-core"         % "1.1.7",
  "ch.qos.logback"             % "logback-classic"      % "1.1.7",
  "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2",
  "com.typesafe.scala-logging" %% "scala-logging"       % "3.1.0",

  "com.typesafe.akka"   %% "akka-stream"      % "2.5.8",
  "com.typesafe.akka"   %% "akka-slf4j"       % "2.5.8",
  "io.kamon"            %% "kamon-core"       % "0.6.7",
  "io.kamon"            %% "kamon-akka-2.5"   % "0.6.8",
  "io.kamon"            %% "kamon-scala"      % "0.6.7",
  "io.kamon"            %% "kamon-autoweave"  % "0.6.5"
)

and the AspectJ plugin

addSbtPlugin("io.kamon" % "sbt-aspectj-runner" % "1.0.4")

memory leak when using PinnedDispatcher

When using a PinnedDispatcher the instrumentation starts accumulating a bunch of objects on the heap. It seems like if a PinnedDispatcher is shutdown there is some part of the instrumentation that keeps a reference to it and then if the same dispatcher is used again (looked up by name) then a new set of instruments are created. If you create/shutdown often enough you will see the leak.

This is a naive piece of code that can reproduce the issue:

import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import kamon.Kamon
import kamon.kamino.KaminoReporter
import kamon.prometheus.PrometheusReporter

import scala.concurrent.Future

object Hello extends App {
  Kamon.addReporter(new PrometheusReporter())
  Kamon.addReporter(new KaminoReporter())

  val system = ActorSystem("test")

  while(true) {
    for(dispatcherID <- 1 to 5) {
      val folk = system.actorOf(Props[ActorWithDispatcher].withDispatcher(s"custom-dispatcher-${dispatcherID}"))
      folk ! "start"
    }

    Thread.sleep(2000)

  }

}

class ActorWithDispatcher extends Actor {
  override def receive: Receive = {
    case "start" =>
      implicit val ec = context.dispatcher
        for (_ <- 1 to 2000) {
          Future {
            Thread.sleep(1)
          }
        }
  }
}

With this configuration:

custom-dispatcher-1 {
  type = "PinnedDispatcher"
  executor = "thread-pool-executor"
  throughput = 100

  thread-pool-executor {

  }
}

custom-dispatcher-2 {
  type = "PinnedDispatcher"
  executor = "thread-pool-executor"
  throughput = 100

  thread-pool-executor {

  }
}

custom-dispatcher-3 {
  type = "PinnedDispatcher"
  executor = "thread-pool-executor"
  throughput = 100

  thread-pool-executor {

  }
}

custom-dispatcher-4 {
  type = "PinnedDispatcher"
  executor = "thread-pool-executor"
  throughput = 100

  thread-pool-executor {

  }
}

custom-dispatcher-5 {
  type = "PinnedDispatcher"
  executor = "thread-pool-executor"
  throughput = 100

  thread-pool-executor {

  }
}

kamon.util.filters {
  "akka.tracked-dispatcher" {
    includes = ["**"]
  }
}

kamon.metric.tick-interval = 1 second
kamon.environment.service = "thread-pool-tests"

Kamon - Akka - Java - InstrumentedEnvelope not serializable

Hi,
we have a problem while creating persistent mailbox.

While akka.kamon.instrumentation.InstrumentedEnvelope is not serializable and we can't persist EnvelopeContext so we persist just envelope - but after unpersisting envelope we have NullPointerException in (AkkaMonitor.scala(58) -> Tracer.withContext(envelopeContext.context)
because EnvelopeContext is null.

Is there a way to persist envelope with envelopeContext, or apply new context while unpersisting envelope with message?

Here is the exception:

java.lang.NullPointerException: null
 at akka.kamon.instrumentation.ActorMonitors$$anon$1.processMessage(ActorMonitor.scala:58)
 at akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorInstrumentation.scala:44)
 at akka.actor.ActorCell.invoke(ActorCell.scala:488)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
 at akka.dispatch.Mailbox.run(Mailbox.scala:224)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

instrument AkkaForkJoinPool

I've been using kamon-akka to get metrics from an application and have found that the existing ForkJoinPool metrics don't provide much detail about the load being handled by the ForkJoinPools. The queued-task-count rarely goes above zero. I've also been outputting the queued-submission-count from the ForkJoinPool and it takes a massive amount of load to make this go above zero.
I'm wondering if it would be useful to use instrument AkkaForkJoinPool to try to get counts the execute calls.
I'm not sure how but it would be useful if the AkkaForkJoinPool metrics were tied back to the dispatcher that wraps the AkkaForkJoinPool.

Fails to deserialise remote message for akka version 2.6.1

I am getting following errors when trying to send message to the remote actor with Akka version 2.6.1

2019-12-19T15:49:34.548Z WARN :slf4j  (Slf4jLogger.scala 89) - 
Failed to deserialize message from [unknown] with serializer id [17] and manifest [d]. 
akka.protobufv3.internal.InvalidProtocolBufferException: 
Protocol message contained an invalid tag (zero).

Discarding entity bytes creates on akka http leaks memory if kamon-akka is enabled

This is on akka 2.4.18, scala 2.11

If a (short) response is discarded using HttpEntity.discardBytes, it seems that the resulting ActorGraphInterpreter actor for the discard flow keeps running, even though the flow itself has ended.

This does NOT occur under plain akka, only if the kamon-akka module is loaded.

Reproducer:

import com.typesafe.config.ConfigFactory
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import java.net.ServerSocket
import scala.io.BufferedSource
import java.io.PrintStream
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import scala.concurrent.Future
import scala.concurrent.Await
import org.scalatest.WordSpec
import java.util.Date
import _root_.kamon.Kamon

class MemoryLeakSpec extends WordSpec  {
  Kamon.start()
  
  val config = ConfigFactory.parseString("""
akka {
  loglevel = "WARNING"
}
akka.http {
  client {
    idle-timeout = 2 seconds 
  }
  host-connection-pool {
    max-connections = 4
    client {
      idle-timeout = 2 seconds
    }
  }
}
""")
  implicit val system = ActorSystem("test", config)
  implicit val materializer = ActorMaterializer()
  import system.dispatcher
  val http = Http(system)
  
  "a failing server" should {

    val route =
        get {
          complete(StatusCodes.GatewayTimeout, """lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe
            lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe
            lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe
            lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe
            lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe
            lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe
            lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe
            lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe
            lqgqiflahf ;s jf;oirsagn;os f;osng;kwat4w;oitjtqe""")
        }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 9999)
    Await.result(bindingFuture, 10.seconds)

    "not leak all the memory" in {
      var i = 0
      println ("GO!")
    
      def loop(): Unit = {
        while (true) {
          i += 1
          if (i % 100 == 0) println("" + new Date() + " Making request " + i)
          Await.result(
          http.singleRequest(HttpRequest(uri = "http://localhost:9999")).map { resp =>
            resp.discardEntityBytes()
            resp
          }, 60.seconds)
        }        
      }
    
      for (t <- 0 until 15) {
        new Thread {
          override def run = loop()
        }.start()
      }
    
      loop()
    }
  }
}

When running the above spec on a -Xmx64M VM, it'll slowly increase memory in the first ~30 minutes or so, after which a heap dump shows the memory usage described above.

When actors are tracked, 2 ActorMonitors are created and they create separate ActorRefs

For tracked actors, there appears to be 2 ActorRefs created for the same systemactorOf request.

ActorInstrumentation.scala has

  @After("actorCellCreation(cell, system, ref, parent)")
  def afterCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): Unit = {
    cell.asInstanceOf[ActorInstrumentationAware].setActorInstrumentation(
      ActorMonitor.createActorMonitor(cell, system, ref, parent))
  }

  @After("repointableActorRefCreation(cell, system, ref, parent)")
  def afterRepointableActorRefCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): Unit = {
    cell.asInstanceOf[ActorInstrumentationAware].setActorInstrumentation(
      ActorMonitor.createActorMonitor(cell, system, ref, parent))
  }

Both of these seem to create ActorMonitors.

createRegularActorMonitor cell=CellInfo(Entity(actor-group-metrics-spec/user/tracked-actor,akka-actor,Map()),false,false,true) 

java.lang.Exception: tracked-actor
	at akka.kamon.instrumentation.ActorMonitor$.createRegularActorMonitor(ActorMonitor.scala:42)
	at akka.kamon.instrumentation.ActorMonitor$.createActorMonitor(ActorMonitor.scala:30)
	at akka.kamon.instrumentation.ActorCellInstrumentation.afterRepointableActorRefCreation(ActorInstrumentation.scala:37)
	at akka.actor.UnstartedCell.<init>(RepointableActorRef.scala:195)
	at akka.actor.RepointableActorRef.initialize(RepointableActorRef.scala:75)
	at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:764)
	at akka.actor.dungeon.Children$class.makeChild(Children.scala:273)
	at akka.actor.dungeon.Children$class.attachChild(Children.scala:46)
	at akka.actor.ActorCell.attachChild(ActorCell.scala:374)
	at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:729)
	at kamon.akka.ActorGroupMetricsSpec$ActorGroupMetricsFixtures$class.createTestActor(ActorGroupMetricsSpec.scala:62)
	at kamon.akka.ActorGroupMetricsSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anon$1.createTestActor(ActorGroupMetricsSpec.scala:34)
	at kamon.akka.ActorGroupMetricsSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anon$1.<init>(ActorGroupMetricsSpec.scala:38)
	at kamon.akka.ActorGroupMetricsSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ActorGroupMetricsSpec.scala:34)
	at kamon.akka.ActorGroupMetricsSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ActorGroupMetricsSpec.scala:34)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
	at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
	at kamon.testkit.BaseKamonSpec.withFixture(BaseKamonSpec.scala:28)
	at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:1075)
	at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
	at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
	at kamon.testkit.BaseKamonSpec.runTest(BaseKamonSpec.scala:28)
	at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
	at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1147)
	at kamon.testkit.BaseKamonSpec.runTests(BaseKamonSpec.scala:28)
	at org.scalatest.Suite$class.run(Suite.scala:1147)
	at kamon.testkit.BaseKamonSpec.org$scalatest$WordSpecLike$$super$run(BaseKamonSpec.scala:28)
	at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
	at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1192)
	at kamon.testkit.BaseKamonSpec.org$scalatest$BeforeAndAfterAll$$super$run(BaseKamonSpec.scala:28)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
	at kamon.testkit.BaseKamonSpec.run(BaseKamonSpec.scala:28)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
	at org.scalatest.tools.Runner$.main(Runner.scala:827)
	at org.scalatest.tools.Runner.main(Runner.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at scala.tools.eclipse.scalatest.launching.ScalaTestLauncher$.main(ScalaTestLauncher.scala:20)
	at scala.tools.eclipse.scalatest.launching.ScalaTestLauncher.main(ScalaTestLauncher.scala)
createRegularActorMonitor cell=CellInfo(Entity(actor-group-metrics-spec/user/tracked-actor,akka-actor,Map()),false,false,true,List(tracked-group)) 

java.lang.Exception: tracked-actor
	at akka.kamon.instrumentation.ActorMonitor$.createRegularActorMonitor(ActorMonitor.scala:42)
	at akka.kamon.instrumentation.ActorMonitor$.createActorMonitor(ActorMonitor.scala:30)
	at akka.kamon.instrumentation.ActorCellInstrumentation.afterCreation(ActorInstrumentation.scala:31)
	at akka.actor.ActorCell.<init>(ActorCell.scala:402)
	at akka.actor.RepointableActorRef.newCell(RepointableActorRef.scala:119)
	at akka.actor.RepointableActorRef.point_aroundBody0(RepointableActorRef.scala:93)
	at akka.actor.RepointableActorRef$AjcClosure1.run(RepointableActorRef.scala:1)
	at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
	at akka.kamon.instrumentation.TraceContextIntoRepointableActorRefMixin$$anonfun$afterRepointableActorRefCreation$1.apply(ActorSystemMessageInstrumentation.scala:77)
	at kamon.trace.Tracer$.withContext(TracerModule.scala:58)
	at akka.kamon.instrumentation.TraceContextIntoRepointableActorRefMixin.afterRepointableActorRefCreation(ActorSystemMessageInstrumentation.scala:76)
	at akka.actor.RepointableActorRef.point(RepointableActorRef.scala:90)
	at akka.actor.ActorCell.handleSupervise(ActorCell.scala:625)
	at akka.actor.ActorCell.supervise(ActorCell.scala:617)
	at akka.actor.ActorCell.invokeAll$1_aroundBody2(ActorCell.scala:468)
	at akka.actor.ActorCell$AjcClosure3.run(ActorCell.scala:1)
	at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
	at akka.kamon.instrumentation.ActorSystemMessageInstrumentation$$anonfun$aroundSystemMessageInvoke$1.apply(ActorSystemMessageInstrumentation.scala:34)
	at kamon.trace.Tracer$.withContext(TracerModule.scala:58)
	at akka.kamon.instrumentation.ActorSystemMessageInstrumentation.aroundSystemMessageInvoke(ActorSystemMessageInstrumentation.scala:34)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:1)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.run(Mailbox.scala:223)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Dispatcher reports pool size of 0

I created a minimal example using akka 2.5.6 and the latest kamon versions. While executing sbt run, the reporting works as expected, but I see pool sizes of 0, e.g.

[info] [INFO] [11/21/2017 16:34:29.652] [kamon-akka.actor.default-dispatcher-2] [akka://kamon/user/kamon-log-reporter]
[info] +-------------------------------------------------------------------------------------------------------------------------+
[info] |  Fork-Join-Pool                                                                                                         |
[info] |                                                                                                                         |
[info] |  Dispatcher: kamon/akka.actor.default-dispatcher                                                                        |
[info] |                                                                                                                         |
[info] |  Paralellism: 8                                                                                                         |
[info] |                                                                                                                         |
[info] |                 Pool Size       Active Threads     Running Threads     Queue Task Count     Queued Submission Count     |
[info] |      Min           0                 0                   0                   0                        0                 |
[info] |      Avg           0.0               0.0                 0.0                 0.0                      0.0               |
[info] |      Max           0                 0                   0                   0                        0                 |
[info] |                                                                                                                         |
[info] +-------------------------------------------------------------------------------------------------------------------------+

Does that make sense? How can a pool have size 0? Shouldn't I see at least 2 threads, like it's defined here?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.