akka router

Published: by

akka Router.

들어가기에 앞서

모 통신사 프로젝트에 있을때 batch 쪽을 맡아 일을 하고 있었다.
거기에는 통신사의 전국 대리점들의 실적과 정책들을 계산하여 손익의 결과를 도출하는 batch 작업이 있다.
평소때는 무리가 없다가 월말이 되면 정말 속터질 정도로 느렸는데, 그때 생각이 이런 날이면 수직확장이든 수평확장이든 자동 확장하여 처리대상을 평소와 같이 했으면 했었다.
(물론 spring batch의 partitioning, multi-thread로 구현 했지만 이는 또 다른 이야기 이다.)
akka를 들고 다시 그때로 간다면…

Router

akka 의 router pattern의 주된 목적은 여러 개의 routee(router가 선택할 수 있는 작업actor)들을 두어 성능 향상에 목적을 둔다.
또한 같은 message를 routee들에 동시에 발송하여 가장 빠른 응답의 경쟁 관계를 통한 것도 있다.
그리고 메시지의 상태에 따른 routee를 분기할 수 도 있다.

akka 내장 router

akka에서 제공하는 내장 router들에는 2가지가 있다.

pool router

router가 routee를 관리 한다. 관리한다는 것은 routee의 life cycle를 관리한다는 것이다.
그러므로 routee의 부모 actor는 Router이다.
따라서 구성이 간단하지만 routee가 stop된다면 다시 새로 생성하지 않는다.
resize기능을 이용하여 이를 우회 할 수 있다
상세한 routee 생명주기 제어가 필요치 않다면 이를 사용

group router

routee의 life cycle를 관리하는 actor를 만들고 이를 group actor와 통신하며
routee를 group actor에게 message를 보내어 추가,삭제 한다.
따라서 routee의 부모 actor는 router가 아니라 routee의 생성,삭제을 담당하는 actor다.

router logic

akka에서 제공하는 router 방식이 여러가지가 있다. 목록은 akka.io 에서 자세히 보고면 된다.
정리 차원에서 나름 나래비를 세워 본다.

RoundRobinRoutingLogic

  • routee에 순차적으로 message를 보낸다.
  • pool router명 : RoundRobinPool
  • group router명: RoundRobinGroup

ConsistentHashingRoutingLogic

  • 특정 hashing key에 같은 routee에게 message가 분배된다. 주로 key에 따른 message를 취합 시에 적당 한 듯
  • pool router명 : ConsistentHashingPool
  • group router명: ConsistentHashingGroup

ScatterGatherFirstCompletedRoutingLogic

  • message를 모든 routee에게 보내고 최초로 응답한 routee의 응답만을 원래 송신자에게 전달 한다. 이는 scatterGather의 경쟁 pattern부분에 적합하다.
  • pool router명 : ScatterGatherFirstCompletedPool
  • group router명: ScatterGatherFirstCompletedGroup

SmallestMailboxRoutingLogic

  • routee 중에 mailbox에 가장 적은 message가 있는 routee에게 message를 전달
  • pool router명 : SmallestMailboxPool
  • group router명: 없음

이외에도 여러가지가 있으니 akka.io를 찾아보자.

Pool router

우선 local 그러니깐 수직 확장을 해보고, 그 다음 remote원격지에 routee를 가지는 수평확장으로 만들어 보자. router의 생성은 설정에 의한 방법과 code에서 하는 방법이 있다.

local router

설정 file생성

아래는 local-pool.conf로 만든 설정 file

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  actor.deployment {
    /localRouter {
      #router 정책  
      router = round-robin-pool
      #routee 갯수
      nr-of-instances = 2
    }
  }
}

Routee 만들기

별다른 것 이 없다.
Message를 받으면 sender Actor에게 응답을 보낸다.
여기서 sender란 routee입장이 아닌 router 입장에서의 sender이다.

class MyRouteeActor extends Actor with ActorLogging {

  def receive = {
    case MyMessage(value) =>
      log.debug(s"############## $self $value")
      sender() ! MyReplyMessage(value)
  }
}

Router 만들기

testKit를 통해 test를 하며 router 생성 방법을 본자.

class LocalPoolTest extends TestKit(ActorSystem("localPool",ConfigFactory.load("local-pool"))) 
  with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll {
  
  "router by pool in local" must {
    
    import com.sslee.pool.routemessges._
    
    "using router by config" in {
      
      //router 생성 
      val router =
      	system.actorOf(FromConfig.props(Props[MyRouteeActor]),"localRouter")
      
      //message 를 보내보기
      router ! MyMessage("hellow routee")
      
      //검증 
      expectMsg(MyReplyMessage("hellow routee"))
      
      //Broadcase를 통해 전체 메시지 보내기
      router ! Broadcast(MyMessage("hellow routee"))
      
      val rs = receiveWhile() {
        case MyReplyMessage(msg) => msg
      }
      
      //검증
      rs.size must be(2)
      
    }
    
    "using router by code" in {
      
      //router 생성 
      val router = 
      	system.actorOf(RoundRobinPool(5).props(Props[MyRouteeActor]),
        	"localRouter2")
      
      //message 를 보내보기 
      router ! MyMessage("hellow routee")
      
      //검증 
      expectMsg(MyReplyMessage("hellow routee"))
      
      //Broadcast를 통해 전체에게 메시지 보내기 
      router ! Broadcast(MyMessage("hellow routee"))
      
      val rs = receiveWhile() {
        case MyReplyMessage(msg) => msg
      }
      
      //검증
      rs.size must be(5)
    }
  }
}

ActorSystem은 비데몬성으로 실행 되므로 test를 끝난 경우에도 process가 중지 되지 않는다.
따라서 test가 끝나면 hock메소드에 system를 중지하게 한 따로 만든 class이다.
ImplicitSender를 통해 sender가 암시적으로 testActor가 되도록 했다.
첫번째는 설정정보를 통한 router 생성을 이용한 것이다.
아래 router생성부분을 보면 FromConfig.props에 해당 Routee 생성의 Props를 감싼다.
이게 끝이다.
두번째는 설정 file정보를 읽지 않고 code로 구성시 아래 예제에서는 BalancingPool(routee갯수).prop로 Routee 생성의 Props를 감싸면 된다.

참고로 보낼 message에 Broadcast를 감싸서 보내면 모든 routee에 보내지게 된다.
여기서 모든 routee들은 위의 설정에 의하여 첫번째는 2개 두번재는 5가 되겠다.

만약 routee가 어떤한 이유로 stop이 된다면 routee는 router가 다시 생성 해주지 않는다.
또한 모든 routee가 모두 중지 된다면 router 또한 stop 된다.

remote pool router

원격 router를 test하는데 sbt MultiJvmPlugin으로 test는 행으나 원격지에 있는 routee를 가져 오지 못했다. 그래서 여기서는 App로 test했다.
추후 cluster로 구성하면 쉽게 될거라 생각한다. 우선 cluster부분을 생각하지 말고 해보자.
이 부분을 하려면 우선 sbt에 설정을 조금 변경 해야 한다.
이를 하지 않으면 아래와 같은 에러가 발생 한다.
error message

[error] java.lang.ClassNotFoundException: scala.Int
[error] 	at sbt.internal.inc.classpath.ClasspathFilter.loadClass(ClassLoaders.scala:74)
[error] 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[error] 	at java.lang.Class.forName0(Native Method)
[error] 	at java.lang.Class.forName(Class.java:348)
[error] 	at akka.actor.ReflectiveDynamicAccess.$anonfun$getClassFor$1(ReflectiveDynamicAccess.scala:22)
[error] 	at scala.util.Try$.apply(Try.scala:209)
[error] 	at akka.actor.ReflectiveDynamicAccess.getClassFor(ReflectiveDynamicAccess.scala:21)
[error] 	at akka.serialization.Serialization.$anonfun$bindings$3(Serialization.scala:360)
[error] 	at scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:739)

따라서 아래 같은 설정을 해야 한다. 그러면 sbt위에 실행하는 jvm과 별도의 jvm에서 실행하게 된다.

fork in run := true

frontend node(JVM1)과 backend node(JVM2)을 나누어 test를 할 것이다.
응답 흐름은 다음과 같다. frontend에 있는 senderActor -> router -> backend에 있는 routee 들 -> frontend의 senderActor로 응답

routee로 사용될 actor는 위에 local에서 사용한 actor를 그대로 사용 이 actor 는 원격지에 사용 될 것이다.

SenderActor 생성

frontend 에 있는 senderActor는 그냥 응답만 확인한다. 다음과 같다.

class SenderActor(router: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case MyMessage(msg) => 
      router ! MyMessage(msg)
    case MyReplyMessage(msg) =>
      log.debug(s"############## $msg reply message from routee")
  }
}

설정 file생성

설정 file은 backend , frontend에서 각각 사용할 설정 file을 만든다.

remote-pool-frontend.conf

akka {
  loglevel = DEBUG 
  stdout-loglevel = DEBUG
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  #loggers = ["akka.event.Logging$DefaultLogger"]

  #원격지 RemoteActorRefProvider를 통해 actorRef를 사용   있다.
  actor {
    provider = "akka.remote.RemoteActorRefProvider"

    #routee가 배포될 원격지 router의 명칭(path) /user/remoteRouter이다.
    deployment {
      /remoteRouter {
        remote = "akka.tcp://backend@0.0.0.0:2551"
      }
    }
  }

  #remoting 설정으로 TCP통신으로 전송을 활성화 하고 frontnode의 host와 port설정
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "0.0.0.0"
      port = 2552
    }
  }
}
#원격지 routee의 Address에 사용되어질 주소
routee.address = "akka.tcp://backend@0.0.0.0:2551"

remote-pool-backend.conf 설정

akka {
  loglevel = DEBUG
  stdout-loglevel = DEBUG
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  #loggers = ["akka.event.Logging$DefaultLogger"]

  #원격지 RemoteActorRefProvider를 통해 actorRef를 사용   있다.
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  #remoting 설정으로 TCP통신으로 전송을 활성화 하고 backend의 host와 port설정
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "0.0.0.0"
      port = 2551
    }
  }
}

Backend node App boot

backend actorSystem 을 실행할 App boot file를 만든다.

backend에서는 actor를 직접적으로 만들지 않으므로 actorSystem 를 load 하는 것 밖에는 없다.

object RemotePoolBackendBoot extends App {
  val system = 
  	ActorSystem("backend", ConfigFactory.load("remote-pool-backend"))
}

Frontend node App boot

frontend actorSystem 을 실행할 App boot file을 만든다.

여기서는 routee에서 응답을 받을 senderActor와 route를 만든다.
senderActor가 route를 통해 message를 전송하면 route정책이 RoundRobin이므로 backend node 에 있는 routee 5개중 한개의 요청에 응답을 하고 이를 다시 frontend node에 있는 senderActor에 응답을 보낸다.
그 후 요청을 보낼 message를 Broadcast로 감싸서 보내 원격지에 있는 모든 routee에게 message가 전달 되게 한다. 물론 senderActor에게로 5개의 응답을 받을 것 이다.

object RemotePoolFrontendBoot extends App {

  import com.sslee.pool.routemessges._

  //frontend ActorSystem 를 생성 
  val system = 
  	ActorSystem("frontend", ConfigFactory.load("remote-pool-frontend"))

  //routee의 message을 요청하고 응답을 받을 senderActor
  val senderActor = system.actorOf(Props[SenderActor],"senderActor")

  //원격지에 배포될 routee의 주소들 
  val addresses = Seq(
     AddressFromURIString(system.settings.config.getString("routee.address")) 
  )

  //router생성 
  val router = system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses)
      .props(MyRouteeActor.props(senderActor)),"myRouter")

  //원격지에 있는  routee중 1개의 routee가 요청을 받아 처리 
  router ! MyMessage("hellow Routee!")
  println("send message to all routees")
  
  //모든 routee 가 요청을 처리 하고 응답을 ActorSender에게 보낸다.
  router ! Broadcast(MyMessage("hellow Routee!"))
}

console 실행

2개를 을 실행해서 각각에서 sbt 로 하나는 frontend 쪽을 run, 다른 한쪽은 backend쪽을 run한다.
아래의 그림은 실행 과 결과를 보여 준다.
backend jvm에서 실행
bakendjvm1

frontend jvm에서 실행
bakendjvm1

backend의 routee중 1개의 routee가 요청을 처리하고 응답하는 결과
bakendjvm1

frontend의 actorSender가 routee응답을 받은 결과
bakendjvm1

group router

group router는 pool router 처럼 routee의 관리를 router 하는 것이 아니라고 했다.
따라서 routee를 관리하는 actor를 직접 만들어야 한다.
이때 좀 귀찮은 것들을 생각 해야 한다 가령 routee 가 중지시 routee의 부모 actor는 router가 아니므로 router는 routee가 중지 된 사실을 모른다. 이후에 routee에 대한 resize도 이야기 할 것인데 routee 에 대한 size up, down시 pool router 는 설정으로 가능 하지만, group router는 resize후 router 와 message를 주고 받으며 routee에 대한 정보를 router에 갱신 시켜야 한다.

local group router

아래 예제를 진행 해보며 살펴 보자.

routee 관리자 Actor

routee를 관리할 Actor 를 생성하자.
actor 생성시 life cycle에 따른 hock method중 preStart(이는 actor생성시 호출됨)에 routee를 만들어 놓는다.
그리고 routee 들을 감시한다. 이렇게 만들어 놓으면 router가 routee의 routee path를 통해 이 routee들을 사용 한다.

RouteeCreateActor는 다음과 같다.

class RouteeCreateActor(nrActors: Int) extends Actor with ActorLogging {

  override def preStart() {

    log.debug(s"######RouteeCreateActor call preStarting... and create routees")
	
    super.preStart()

    (0 until nrActors).map { n => 
      val routee = context.actorOf(Props[MyRouteeActor],s"myRoutee-$n")
      context watch routee
    }
  }

  def receive = {
    //routee가 stop 되었을 경우 
    case Terminated(actorRef) => {
      log.debug(s"##### recieve message Terminated routee $actorRef")
      val actor = context.actorOf(Props[MyRouteeActor],actorRef.path.name)
      context.watch(actor)
    }
  }
}

위의 코드를 보면 preStart method에 super.preStart()를 호출하는 것이 좋다.
Actor에 preStart(): Unit = () 이렇게 되어 있지만, preRestart등은 모든 자식 actor을 감시에서 해지하고 중지하는 부분이 들어 있다. 따라서 life cycle 관련 hock등을 override시 받드시 super의 hock를 호출 하는 것이 좋다. 이부분에 대해서는 다음에 post하기로…
어찌 되었든 preStart 에서 roter가 사용할 reoutee들을 생성하고 감시 대상자로 등록 한다.
receive method에서는 routee 가 종료 되었을때 응답을 받는 부분으로 종료된 routee의 actor path로 새로운 routee를 생성해서 감시 대상자로 등록 한다.

설정file 생성

local-group.conf file

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  #loggers = ["akka.event.Logging$DefaultLogger"]
  loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  
  actor.deployment {
    /localRouter {
      router = round-robin-group
      routees.paths = [
        "/user/createActor/myRoutee-0",
        "/user/createActor/myRoutee-1"
      ]
    }
  }
}

router의 이름 path는 /user/localRouter 이다.
router 정책은 roundrobin 이고, routees의 path를 직접 입력 했다.

testKit 만들기

routee를 관리할 createActor를 생성하고 , createActor가 생성한 routee들을 사용할 router를 설정 정보로 통해 생성 한다.

class GroupTest extends TestKit(ActorSystem("GroupRouterSystem",ConfigFactory.load("local-group"))) 
  with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll {

  "Group local router" must {
    "router with creater " in {

      import com.sslee.group._
      import routemessages._

      val createActor = system.actorOf(Props(new RouteeCreateActor(3)),"createActor")

      /* code로 router를 생성시 
      val paths = List(
        "/user/createActor/myRoutee-0",
        "/user/createActor/myRoutee-1"
      )
      
      val router = system.actorOf(RoundRobinGroup(paths).props(),"groupRouter")
      */

      val router = system.actorOf(FromConfig.props,"localRouter")

      router ! Broadcast(PoisonPill)
      Thread.sleep(1000L)
      router ! MyMessage("hellow routee!")

      expectMsg(MyReplyMessage("hellow routee!"))

      router ! Broadcast(PoisonPill)
      Thread.sleep(1000L)
      router ! Broadcast(MyMessage("hellow routee!"))

      val rs = receiveWhile() { case MyReplyMessage(msg) => msg }
      rs.size must be(2)
    }
  }

}

remote group router

시나리오는 위의 원격 pool-router하고 같다. frontend의 senderActor가 router를 통해 원격지(backend node)에 있는 routee에 message를 보내고 routee는 요청을 처리하고 다시 senderActor에게 응답을 하는 구조 이다.
설정 file들만 추가하고 router 생성시 remote를 통한다는 부분만 수정하면 된다.

frontend node 설정 file

remote-group-frontend.conf

akka {
  loglevel = DEBUG 
  stdout-loglevel = DEBUG
  #loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = ["akka.event.Logging$DefaultLogger"]

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    
    #routee가 배포될 원격지 
    deployment {
      /remoteRouter {
        router = round-robin-group
        routees.paths = [
          "akka.tcp://backend@0.0.0.0:2551/user/createActor/myRoutee-0",
          "akka.tcp://backend@0.0.0.0:2551/user/createActor/myRoutee-1"]
      }
    }
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "0.0.0.0"
      port = 2552
    }
  }
}

위의 설명한 것에 새로운 것이 없다. 다만 pool방식에서 없었던 routee.paths가 있다. 이를 통해 원격지(backend)에서 createActor가 생성한 routee들을 frontend에서 사용 한다.

backend node 설정

akka {
  loglevel = DEBUG
  stdout-loglevel = DEBUG
  #loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = ["akka.event.Logging$DefaultLogger"]

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "0.0.0.0"
      port = 2551
    }
  }
}

별로 특별한 것이 없다.

Backend node App boot생성

object RemoteGroupBackendBoot extends App {

  val system = 
  	ActorSystem("backend",ConfigFactory.load("remote-group-backend"))
  val createActor = system.actorOf(
  	Props(new RouteeCreateActor(2)),"createActor")
}

backend node에서는 backend에서 실행될 ActorSystem 를 load하고 routee를 관리할 createActor를 생성하여 routee을 준비 시킨다.

Frontend node App boot생성

object RemoteGroupFrontendBoot extends App {

  val config = ConfigFactory.load("remote-group-frontend")
  val system = ActorSystem("frontend",config)

  val router = system.actorOf(FromConfig.props,"remoteRouter")
  val senderActorRef = 
  	system.actorOf(SenderActor.props(router),SenderActor.name)

  senderActorRef ! MyMessage("hellow remote routee!")
}

senderActor가 원격지의 createActor가 만든 routee를 사용하는 router를 통해 message를 보내고 응답을 받는다.

실행과 결과는 따로 올리지 않겠다.

route post 부분이 길어져서 router의 resize와 aggregator역할을 하는 routee일때, 그리고 message 상태에른 routee분기를 다음 post에 이어서 posting하 겠다.