akka router 02

Published: by

akka Router.

들어가기 앞서

저번 post에 router에 대하여 pool, group기반의 생성 및 특징 test를 해보았다.
이번 post에는 router의 resize에 대해서 알아 보자

Resize

일을 처리할 task가 처리할 data의 량에 따라 자동적으로 늘어나고 줄어 들면 일정한 처리 속도를 유지 할 수 있다.
resizer의 판단은 router가 routee에 message를 할당하기 전에 resize를 판단한다.
pool기반의 resize는 resizer라는 설정으로 routee의 size를 조절 할 수 있다. akka router의 resizer를 살펴 보자.

Pool에서의 resize

설정 file

...생략
actor.deployment {
/localRouter {
  router = round-robin-pool
  nr-of-instances = 3

  #resize의 판단은 routee에 message를 할당하기 전에 판단한다.
  #resize는 동기적으로 실행 되지 않는다.
  resizer {

    #resizer 기능을 킨다.
    enabled = on

    #routee의 최소 갯수
    lower-bound = 1 

    #routee의 최대 갯수       
    upper-bound = 100 

    #resiz의 발생 기준
    #모든 actor의 mailbox에 message가 1 있을 경우
    #0 일경우 mailbox에 message 갯수와 상관 없이 모든 routee가 메시지를 처리중  경우
    pressure-threshold = 1 

    #size up 변동 크기 갯수 
    #현재 routee의 갯수 * 0.65  = result의 소숫점 첫째자리 올림
    rampup-rate = 0.65

    #언제 routee의 갯수를 줄일 것인가?
    #message를 처리하는 routee의 비율이 0.3 미만일때 resize down
    #현재 전체 routee 갯수 * 0.3  대한 미만  경우
    backoff-threshold = 0.3

    #size down 변동 크기 갯수
    #rampup-rate  개산방식은 동일 하다. 
    backoff-rate = 0.1

    #크기 조정  다음 재조정까지 최소  개의 메시지를 받아야 하는지 설정
    #이는 message가 도착할 때마다 크기가 꼐속 늘거나 줄어드는 것을 방지 하기 위함
    messages-per-resize = 1
  }
}

위의 설정을 보면 최초 routee는 3개이다.
이는 처음에 routee 3개이다 처음 router가 message를 1개 받아서 routee에 주기 전에 resize를 판단한다. backoff-threadhold를 보면 값은 0.3 * 3 = 0.9 올림한다고 했으니 1 이 것의 의미는 현재 message를 처리하고 있는 routee수의 비율이 1개 미만이면 bacloff-rate 비율만큼 줄이겠다는 의미로 현재는 1개미만 이다. 따라서 1번째 message를 줄때 routee가 2개 된다.

그리고 나머지 코드를 보고 resize 속성에 따른 결과를 보며 결과를 분석 해보자.

Routee Actor

class ResizeRouteeActor extends Actor with ActorLogging {
  def receive = {
    case MyMessage(value) =>
      //현재 처리하는 routee를 보기 위해 
      log.debug(s"##### $self $value")
	  
      //기간을 주어 mailbox에 mail을 쌓이게 한다.
	  Thread.sleep(3000L)
      sender ! MyReplyMessage(value)
  }
}

아래와 같이 (여기서는 그냥 App로 )test할 App file를 만들고 결과를 보자

실행 및 결과 분석

object ResizerApp extends App {
  
  import com.sslee.pool.routemessges._
  
  val system = ActorSystem("resizeActorSystem", ConfigFactory.load("local-pool-resizer"))
  
  val router =
  	system.actorOf(FromConfig.props(Props[ResizeRouteeActor]),"localRouter")
  val senderActor = system.actorOf(Props(new SenderActor(router)))
  
  //모든 routeee mailbox 에 message가 들어 가도록 한다.
  senderActor ! MyMessage(s"message 0")
  senderActor ! MyMessage(s"message 1")
  senderActor ! MyMessage(s"message 2")
  
  //messages-per-resize = 1 로 설정 했으므로 
  // 이번 message들 때는 판단을 유보 한다. 
  // 이번 message들은 모두 2개의  routee mail box로 간다.,
  // 그 후 부터는 message들은 다시 resize 판단의 기준다.
  (0 to 8) foreach { i =>
    senderActor ! MyMessage(s"overload message $i")
  }
  
  //resize는 동기적으로 일어나지 않기 때문에  reoutee를 추가하기전에 routee가 먼저 일을 끝낼 수 있다.
  //따라서 시간을 두어 모든 message가 현재 2개의 routee mail box로 모두 들어가지 않게 한다.
  //설정에 router가 routee message를 주기 전에 
  //현재 모든 routee들의 mail box가 message1개 있을 경우에 size up이 발생한다. 
  Thread.sleep(1000L)
  (0 to 8) foreach { i =>
    senderActor ! MyMessage(s"overload message $i$i")
  }
  
  println("send end")
  
}

결과는 아래와 같이 알아보기 쉽게 편집 했음

$b#1295478387] message 1
$a#264400209]  message 0
$d#2046181299] overload message 22
$e#435088530] overload message 33
$a#264400209] message 2
$b#1295478387] overload message 0
$d#2046181299] overload message 66

...중략 

시작하자 마자 backoff-threadhold 때문에 3개에서 2로 시작됨을 알수 있다. 왜냐면 roundrobin 방색이므로 routee가 3개로 시작했다면 message0,1,2 를 a,b,c 3개가 되어야 하는데 위에 결과를 보면 2개의 routee가 모든 메시지를 처리 함을 볼 수 있다.
또한 overload message $i$i 는 (messages-per-resize = 1)의 설정에 따라 overload message $i 가 지난 후 resize이 되어 rampup-rate = 0.65 즉 2 * 0.65 = 1.3 => 2(올림) 이되어 $d#2046181299] overload message 22 와 $e#435088530] overload message 33 을 볼 수 있다.

group router resize

group는 router가 routee를 관리하는 것이 아닌 관리 Actor가 따로 있다 했다.
따라서 routee의 resize를 routee 관리 Actor가 하고 그에 따른 routee를 router에 알려야 한다.
설정 file은 기존에 했던 group router와 같다.

routee 관리 Actor

이 곳에서 해야 할 것들이 있다.
우선 reoutee 를 생성하고, routee의 resize요청이 오면 routee를 생성,중지를 해하고 이를 router에게 알려야 한다. 또한 routee가 중지시 router가 중지를 요청 한 것인지 판단하여 새로 생성 할지 말지를 판단 해야 한다.

우선 만들기에 앞서 다음과 같은 Message유형에 대하여 알아야 한다. 이를 통해 router와 message를 주고 받으며 위에 기술한 일들을 할 수 있기 때문이다.

GetRoutees

이 Message 유형은 akka.routing.GetRoutees 이며 이 message를 router에게 전송하면 router가 routee 에게 이 message를 전송하는 것이 아니라 직접 처리 한다. 처리의 내용은 자신의 알고 있는 routee들의 정보를 akka.routing.Routees 형태의 Message로 응답한다.

AddRoutee(routee: Routee)

이 Message 유형은 akka.routing.AddRoutee 이며 이 message또한 router에게 전송하면 router가 routee에게 이 message전송하지 않고 직접 처리 한다.
이는 router에 routee를 추가 한다.

RemoveRoutee(routee: Routee)

이 Message 유형은 akka.routing.AddRoutee 이며 이 message또한 router에게 전송하면 router가 routee에게 이 message전송하지 않고 직접 처리 한다.
이는 router에 routee를 삭제 한다.

Routee 구현체

위의 2개의 Message Addroutee, RemoveMessage를 보면 인자로 Routee를 받는다.
따라서 이 구현체를 만들어야 한다. 다행히 ActorRef와 ActorSelection를 통해 얻을 수 있다.

ActorRefRoutee(ref: ActorRef)

이를 만들어 AddRoutee를 전송하면 Router가 routee의 부모가 된다. 따라서 routee관리 Actor가 부모가 되지 않기 때문에 무제가 될 수 있다.

ActorSelectionReoutee(selection: ActorSelection)

이는 actorRef가 아닌 ActorSelection을 인자로 받는다.
이는 ActorRefRoutee와 다르게 routee의 부모가 routee관리 Actor가 된다.

이제 routee 관리 Actor를 만들어 보자

class ResizeCreateActor(nrActors: Int,props: Props, router: ActorRef) 
	extends Actor with ActorLogging  {

  //routee 갯수
  var nrRoutees = nrActors
  var nrChildInstance = 0


  override def preStart() {
    super.preStart()
    (0 until nrRoutees).map(nr => createRoutee())
  }

  //routee를 생성 한다.
  def createRoutee() {

    nrChildInstance += 1
    val child = context.actorOf(props, s"myRoutee-$nrChildInstance")


    //routee 생성 
    //router 에게 알리고 router에 생성된 routee를 추가 하라고 이야기 한다.
    val selection = context.actorSelection(child.path)
	router ! AddRoutee(ActorSelectionRoutee(selection))
    context.watch(child)
  }

  def receive = {

    //resize 요구가 왔을 경우
    case Resizing(size) => {
      //줄이라는 요구시
      if(size < nrRoutees) {
        context.children.take(nrRoutees - size).foreach {routee => 
          val selection = context.actorSelection(routee.path)
          //router에게 삭제된 routee를 알린다.
          router ! RemoveRoutee(ActorSelectionRoutee(selection))
        }

        //router 에게 현재 routee가 가지고 있는 routee목록을 요청하여 
        //creator 가 가지고 있는 routee을 Terminated 시킨다. 
        //이는  GetRoutess의 응답으로 Routees를 받아 처리 한다.
        router ! GetRoutees
      }
      //else if(size == nrRoutees) 
      	log.debug("request resize size equals now state size")
      else {
        (nrRoutees until size).map(n => createRoutee())
      }

      nrRoutees = size
    }

    //router에게 GetRoutees를 요청하면 응답 받는 경우 이다
    case routees: Routees => {
      import collection.JavaConverters._
      //응답유형은 Routees 의 하위 구현체 인데, 
      //위에서 ActorSelectionRoutee를 사용하기 로 했으므로 
      //ActorSelectionRoutee가 case가 실행 될 것 이다.
      var activeRoutees = routees.getRoutees.asScala.map {
        case actor: ActorSelectionRoutee => actor.selection.pathString
      }

      /*
       아래는 다음 2가지 일을 한다.
       1.resize를 통한 router에서 제거된 routee를 creator에서도 제거
       2.routee가 어떠한 이유로 stop된경우 router가 고의로 중지 한 것이 아니면 
         다시 생성 하고 router에 생성을 알린다
       */
      for(child <- context.children) {
        val idx = activeRoutees.indexOf(child.path.toStringWithoutAddress)
        if(idx >= 0)
          //creator 에서 새로 생성할 routee 목록에서 제거 
          activeRoutees.remove(idx) 
        else 
          // router에 없는 routee는 creator에서 제거
          child ! PoisonPill
      }
      activeRoutees.foreach { terminated => 
        val name  = terminated.substring(terminated.lastIndexOf("/")+1)
        val newChild = context.actorOf(props,name)
        context.watch(newChild)
      }
    }

    //creator의 child(routee)가 중지 신호가 왔을 경우 router가
    //고의로 중지한 것인지 알아본다. 
    case Terminated(child) =>
      router ! GetRoutees
  }
}

receive의 partial function을 보면 줄여야 된다고 판단 된 경우 관리 Actor의 자식 Actor들 여기서는 routee들이 될 것이다. 줄일 대상을 Router에게 RemoveRouee를 통해 전달 한다.
그후 GetRoutees 요청 하고 응답으로 Routees를 받아 case routees: Routees부분이 실행 된다.
여기서 자신의 자식 routee을 Router 가 들고 있는 목록과 비교하여 Router에 없다면 자신에게서 제거 한다. 이는 아까 resize 요청에서 줄이라고 판단된 결과에 대한 응답이다.
또한 routee가 중지되어 Terminated를 받는 경우 router에는 있고 관리 Actor에게는 없는 routee 의 path를 찾아 새로 생성하고 감독한다. 이 경우는 router의 고의적 중지가 아닌 routee가 중지된 경우가 해당 한다.

test 헤보기

class GroupResizeTest extends TestKit(ActorSystem("GroupResizeSystem"))
    with WordSpecLike with MustMatchers 
    	with ImplicitSender with StopSystemAfterAll {

  "resize by group in local system" must {
    "resize test" in {


      //path는 넣지 않는다. routeeCreatActor 가 routee를 만들 것 이므로 
      //따로 여기서 path를 넣으면 GetRoutees에 대한  응답으로 같은 routee 가 
      //여러번 담겨져 온다.
      val router =
      	system.actorOf(RoundRobinGroup(List.empty[String]).props,
      	"groupRouter")
      
      val resizeCreator = 
      	system.actorOf(Props(new ResizeCreateActor(2,
        	Props(new MyRouteeActor()),router)),"resizeCreateActor")

      // routee creator가 routee를 생성할 시간을 준다.
      Thread.sleep(1000L)

      router ! MyMessage("hellow routee!")
      expectMsg(MyReplyMessage("hellow routee!"))

      resizeCreator ! Resizing(4)
      Thread.sleep(1000L)
      router ! GetRoutees

      val rs = receiveWhile() {
        case routees: Routees =>
          routees.getRoutees.size()
      }

      rs.head must be(4)

      resizeCreator ! Resizing(3)
      Thread.sleep(1000L)
      router ! GetRoutees

      val rs2 = receiveWhile() {
        case routees: Routees =>
          routees.getRoutees.size()
      }

      rs2.head must be(3)
    }
  }

}

위의 실행시 주의점은 router 생성시 routee에 대한 adress정보를 넘기면 안된다. 그러면 GetRoutees 응답으로 Routees 에 같은 routee에 대한 정보가 중복으로 들어 가게 된다.
뭐 위의 Testkit에 대한 설명은 따로 안해도 될 듯 하니 생략 한다.
다음 post에는 routee가 aggregator 특징을 가진 경우, 또 message의 상태에 따라 routee를 분기 해야 하는 겨우를 살펴 보자