Giter Club home page Giter Club logo

akkaagentsexample's Introduction

Crash into akka

Small documentation with my first steps into the akka framework that's made possible use the Actor Model paradigm in the Scala Language

Options, options and more options

I need to made a multi-threading application in wich thoose threads have to send messages between them. I went trough multiple options like do it all my self using plain sockets and Threads, but the cost of maintenance of a development like that could be very high. After that I thought about a RPC aproach and I was almost decided, but i found that the akka framework has a great compatibility with the project Apache Kafka, wich i use as a message broker in that application, so i finally decided to use akka as a long term bet of using it with Apache Kafka Streams into a Future.

With this selection I'm aware that the cost of a framework like akka is higher than the cost of a RPC framework like Fineagle(Looks like a cool project made by Twitter).

What is akka?

So if you check the akka project webpage, you can see a really fancy definition of the framework, but to keep it simple, it's just a framework that made really easy for developers to develop distributed applications using the Actor model

Time to get dirty

I think that the best way to understand what is akka and how it works it doing a simple example, so let's develop and application in which we have 2 kinds of Actors, Producers and Consumers. The idea is that, the Producers will send a message through a socket that will be listening a Consumer Actor. Once the Consumer Actor recieves the data, it will print the message.

So with this example, we are going to learn how to send message to actor using akka and how to write and read from sockets,

         Producer    Consumer
            * --------> *

Let's start building the producer

The producer Actor that we are building it's an easy one. The Actor will do the following steps:

  1. Actor check if can stablish a connection to a socket
  2. Actor sends data trough the socket
  3. Actor close the connection
  4. Actor destroys itself

For this puspose, we are going to use Scala companion objects.

First of all we have to create the following Boilerplate in a file that I'm gonna name SocketProducer.scala

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, Props}
import akka.io.{IO, Tcp}
import akka.util.ByteString

import scala.collection.mutable

object SocketProducer {
  //Needed to create an Actor with a given configuration
  def props(host: String, port: Int, messages: mutable.MutableList[String]): Props =
    Props(new SocketProducer(host,port,messages))

}

class SocketProducer(host: String, port: Int, messages: mutable.MutableList[String]) extends Actor with ActorLogging{

  import SocketProducer._

  // function that allows us to do some logic before the Actor is up
  override def preStart(): Unit = {
    log.info("Starting socket PRODUCER actor with following config {}:{}",host,port)
  }

  // function that allows us to do some logic once the Actor is down
  override def postStop(): Unit = {
    log.info("Stopped socket PRODUCER actor")
  }

}

Now we need to connect the Actor to the socket, for that purppose we have to communicate our Actor with the IO Actor that manages the communication with the Socket and we also have to implement the receive method of our Actor, because the IO Actor will send use message with information about the socket.

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, Props}
import akka.io.{IO, Tcp}
import akka.util.ByteString

import scala.collection.mutable

object SocketProducer {

  //Needed to create an Actor with a given configuration
  def props(host: String, port: Int, messages: mutable.MutableList[String]): Props =
    Props(new SocketProducer(host,port,messages))

}

class SocketProducer(host: String, port: Int, messages: mutable.MutableList[String]) extends Actor with ActorLogging{

  import SocketProducer._

  import akka.io.Tcp._
  import context.system

  // Actor that manages the low level communication with the socket
  // this Actor can send messages to out SocketProducer Actor, because
  // a reference to the SocketProducer Actor is implicit passed
  // when we invoke IO(Tcp)
  IO(Tcp) ! Connect(new InetSocketAddress(host,port))

  // function that allows us to do some logic before the Actor is up
  override def preStart(): Unit = {
    log.info("Starting socket PRODUCER actor with following config {}:{}",host,port)
  }

  // function that allows us to do some logic once the Actor is down
  override def postStop(): Unit = {
    log.info("Stopped socket PRODUCER actor")
  }

  override def receive = {
    case CommandFailed(_: Connect) =>
      context stop self

    //Needed to establish a connection
    case c @ Connected(remote, local) =>

      //opens connection
      val connection = sender()
      //establish connection
      connection ! Register(self)
      //sends messages
      messages.foreach(message => {
        log.info("Trying to send message: "+message)
        connection ! Write(ByteString(message))
      })
      //close connection
      connection ! Close

      //this is some black magic that will be explained later
      //but the idea it's that it manages the socket status
      context become {
        case data: ByteString =>
          connection ! Write(data)
        case CommandFailed(w: Write) =>
          // O/S buffer was full
          log.warning("write failed")
        case Received(data) =>
        case "close" =>
          connection ! Close
        case _: ConnectionClosed =>
          log.warning("connection closed")
          //kills the Actor
          context stop self
      }

    //if the Actor receives a message that it doesn't understand, it sends a warning through the logger
    case x @ _ => log.warning("Something else is up. ---> " + x.toString)

  }
}

Ok, we have the Actor, but we need to test if it's working as we tought. First of all, we have to launch the Actor from somewhere in our code. In this case I'm gonna invoke the Actor from a Main scala object.

import actors.SocketProducer
import akka.actor.ActorSystem


import scala.collection.mutable

object Main {

  def main(args: Array[String]): Unit = {

    //Creates and Actor system in wich will reside/live our actor
    val actorSystem = ActorSystem.create("MyActorSystem")

    //data that we want to send with our actor
    val list = mutable.MutableList("1","2","Three","0100").map(_+"\n")
    //I append a \n to each element in the list for visibility

    //creates and runs the actor
    val actor = actorSystem.actorOf(SocketProducer.props("localhost",9000,list))


  }

}

That code is enough to launch our akka Actor, but we need a somebody listen in localhost:9000 to check if everything is alright. Before creating the consumer Actor we can check it using the netcat terminal utiity/program in a terminal as follows:

  nc -lk 9000

So if we launch our scala code once netcat is running, netcat should print the following output:

1
2
Three
0100

Everything is working! nice!

Building the Consumer

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, Props}
import akka.io.{IO, Tcp}
import akka.util.ByteString

import scala.collection.mutable

object SocketConsumer {

  //Needed to create an Actor with a given configuration
  def props(port: Int): Props = Props(new SocketConsumer(port))

}

class SocketConsumer(port: Int) extends Actor with ActorLogging{

  import SocketConsumer._

  import akka.io.Tcp._
  import context.system

  // Actor that manages the low level communication with the socket
  // this Actor can send messages to out SocketProducer Actor, because
  // a reference to the SocketProducer Actor is implicit passed
  // when we invoke IO(Tcp)
  IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", port))

  // function that allows us to do some logic before the Actor is up
  override def preStart(): Unit = {
    log.info("Starting socket CONSUMER actor in port {}",port)
  }

  // function that allows us to do some logic once the Actor is down
  override def postStop(): Unit = {
    log.info("Stopped socket CONSUMER actor")
  }

  override def receive = {

    case b @ Bound(localAddress) =>
      log.info("CONSUMER bound to: "+b)

    case CommandFailed(_: Bind) => context stop self

    case c @ Connected(remote, local) =>
      val connection = sender()
      connection ! Register(self)

    case Received(message) => log.info("Received: \n"+ message.decodeString("UTF8"))
    case PeerClosed     => context stop self

    //if the Actor receives a message that it doesn't understand, it sends a warning through the logger
    case x @ _ => log.warning("Something else is up. ---> " + x.toString)

  }
}

Now we have to add the consumer to our Main scala object:

import actors.{SocketConsumer, SocketProducer}
import akka.actor.ActorSystem

import scala.collection.mutable

object Main {

  def main(args: Array[String]): Unit = {

    //Creates and Actor system in wich will reside/live our actor
    val actorSystem = ActorSystem.create("MyActorSystem")

    //data that we want to send with our actor
    val list = mutable.MutableList("1","2","Three","0100").map(_+"\n")

    //creates and runs the actor
    val actorConsumer = actorSystem.actorOf(SocketConsumer.props(9000))
    val actorProducer = actorSystem.actorOf(SocketProducer.props("localhost",9000,list))

  }

}

Finally if whe launch the program, the output should be:

[INFO] [09/05/2017 13:31:06.639] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/$a] Starting socket CONSUMER actor in port 9000
[INFO] [09/05/2017 13:31:06.639] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/$b] Starting socket PRODUCER actor with following config localhost:9000
[INFO] [09/05/2017 13:31:06.652] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/$a] CONSUMER bound to: Bound(/127.0.0.1:9000)
[INFO] [09/05/2017 13:31:06.657] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$b] Trying to send message: 1

[INFO] [09/05/2017 13:31:06.666] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$b] Trying to send message: 2

[INFO] [09/05/2017 13:31:06.666] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$b] Trying to send message: Three

[INFO] [09/05/2017 13:31:06.666] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$b] Trying to send message: 0100

[INFO] [09/05/2017 13:31:06.669] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$a] Received:
1
2
Three
0100

[INFO] [09/05/2017 13:31:06.672] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/$a] Stopped socket CONSUMER actor
[WARN] [09/05/2017 13:31:06.673] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/$b] connection closed
[INFO] [09/05/2017 13:31:06.673] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/$b] Stopped socket PRODUCER actor

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.