akka router 03

Published: by

akka Router.

들어가기 앞서

저번 post에 router에 대하여 pool, group기반의 생성 및 특징 test를 해보았다. 그리고 pool기반과 group기반의 router의 resize 를 알아 보았다.
이번에는 routee가 aggregator 특징을 가진 경우, 또 message의 상태에 따라 routee를 분기 해야 하는 겨우를 살펴 보자

ConsistentHashing router

router는 message를 routee에 보내 처리 한다. 이때 어떠한 key에 따라 message를 합쳐야 할 경우 기존 router로는 문제가 있다. 어떠한 message가 어떤 routee로 갈지 모르기 때문이다.
따라서 여러 message중 어떠한 key에 따라 같은 routee 가 처리 할 수 있게 해야 하는데 이를 해결하기 위해 ConsistentHashing router가 등장 한다.

ConsistenHashing router의 Routee선택 과정

ConsistentHashing router는 hashing key에 따라 같은 routee 로 가게 한다.
아래는 ConsistentHahsing router의 routee를 선택하는 과정이다.

message -> message key -> hashing code -> n개의 가상노드 -> routee

따라서 같은 routee에게 보내려면 같은 key를 가지게 하면 된다.
또한 routee에 대응되는 가상노드가 n가 있다.

그럼 우선 같은 key를 같게 하는 방법 3가지가 있는데 이를 알아 보자.

Partial Function을 이용

부분함수를 만들어 hashing key로 사용할 정보를 반환 하면 된다. ConsistenHasingPool의 소스를 보면 다음과 같다.

final case class ConsistentHashingPool(
  val nrOfInstances:               Int,
  override val resizer:            Option[Resizer]                               = None,
  val virtualNodesFactor:          Int                                           = 0,
  val hashMapping:                 ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping,
  override val supervisorStrategy: SupervisorStrategy                            = Pool.defaultSupervisorStrategy,
  override val routerDispatcher:   String                                        = Dispatchers.DefaultDispatcherId,
  override val usePoolDispatcher:  Boolean                                       = false)
  extends Pool with PoolOverrideUnsetConfig[ConsistentHashingPool]

이를 보면 routee에 대한 가상node 갯수인 virtualNodesFactor 인자와 hashMapping 항목을 볼 수 있다. resizer 인자도 있다.

여하튼 hashingMapping부분인hasingMapping:ConsistentHashingRouter.ConsistentHashMapping 이다
이를 보면 다음과 같은 Partial function이다.

type ConsistentHashMapping = PartialFunction[Any, Any]

자 이제 코딩 해보자

class HashMappingTest extends TestKit(ActorSystem("HashMappingSystem")) 
  with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll {
  
  "consistent router" must {
    "use partial function ConsistentHashMapping" in {
      
      val router = system.actorOf(
        ConsistentHashingPool(5,
            virtualNodesFactor = 3,
            hashMapping = {case msg: MyMessage[String] => msg.id}).props
            (Props(new Aggregator(3 seconds))),
            "localRouter"
      )
      
      router ! MyMessageImpl("1",Seq("msg01"))
      router ! MyMessageImpl("1",Seq("msg02"))
      
      expectMsg(MyMessageImpl("1", Seq("msg01","msg02")))
      
    }
  }
}

위의 Test 소스처럼 hashingMapping 부분에 key를 기준으로 할 Partial function 를 주면 된다.

전송 Message자체를 ConsistenHashable 유형으로 만들어 이용

두번째 방법은 Routee에 전송할 Message 유형을 ConsistentHashable 유형으로 보내는 것이다.
그러면 따로 router생성시 따로 hasingMapping부분을 선언 하지 않아도 된다.

class HashMappingTest extends TestKit(ActorSystem("HashMappingSystem")) 
  with WordSpecLike with MustMatchers with 
  	ImplicitSender with StopSystemAfterAll {
  "consistent router" must {
	"use ConsistentHashable " in {
      
      case class MyMessageHashingKey[String](id: String, values: Seq[String]) 
        extends MyMessage[String] with ConsistentHashable {
        override def consistentHashKey: Any = id
      }
      
      val router = system.actorOf(
        ConsistentHashingPool(5,virtualNodesFactor = 3).props(
          Props(new Aggregator(3 seconds))),"localRouter2")
        
      router ! MyMessageHashingKey[String]("1",Vector("msg01"))
      router ! MyMessageHashingKey[String]("1",Vector("msg02"))
      
      expectMsg(MyMessageImpl("1",Vector("msg01","msg02")))
    }
  }
}

ConsistentHashable의 추상 method인 consistentHashkey에 key로 적용할 field를 넣으면 되겠다.

router 호출시 지정

이는 아에 router에게 message전송시 지정하는 방법이다. 따라서 key를 직접 넣어 호출 한다.
따라서 호출자가 ConsistentHshingRouter라는 것을 알아야 하고 어떤 key를 사용해야 할지 알아야 하므로 coupling이 증가 한다.
코드는 다음과 같이

val router = system.actorOf(
  ConsistentHashingPool(5,virtualNodesFactor = 3).props(
    Props(new Aggregator(3 seconds))),"localRouter3")

  router ! ConsistentHashableEnvelope( 
    message =  MyMessageImpl("01",Vector("msg01")), hashKey = "01")

  router ! ConsistentHashableEnvelope(
    message = MyMessageImpl("01", Vector("msg02")), hashKey = "01")

Message상태에 따른 Routee 분기

이는 message의 상태 결과에 따라 routee를 분기 해야 할때다
기존의 router들은 작업의 처리리 부부은 하지 않고 route기능만을 했다. OOP 원칙의 단일책임 원칙에 맞는 일이니깐
그런데 Message의 처리결과의 상태에 따른 routee분기라면 처리부분과 route기능을 하게 하는 것도 나쁘지 않다. 그럼 router를 구현 해야 하나? 그러지 않다 분기에 따른 router여럿을 두고 Actor를 만들어서 만든 일반 Actor가 일을 처리하고 결과를 보고 판단하여 분기 대상에 맞는 router에게 message를 전송 하면 된다. 이때 상태 Actor pattern를 이용하면 된다. 아시겠지만 becode/unbecome를 이용

StateActor 만들기

class StateRouteActor(rightActorRef: ActorRef, leftActorRef: ActorRef) 
	extends Actor with ActorLogging {

  def receive = receiveLeft

  def receiveRight: Receive = {

    case RouteStateRight => 
      log.debug("already state right")

    case RouteStateLeft =>
      log.debug("change status RouteStateOn to RouteStateOff")
      context.become(receiveLeft)

    case MyMessage(msg) =>
      rightActorRef ! msg
  }

  def receiveLeft: Receive = {

    case RouteStateRight =>
      log.debug("change status RouteStateOff to RouteStateOn")
      context.become(receiveRight)

    case RouteStateLeft =>
      log.debug("already state left")

    case MyMessage(msg) => 
      leftActorRef ! msg
  }
}

위의 예제에서는 분기할 router를 2개의 router를 생성자 parameter로 받는다.
그리고 context.become를 이용하여 actor의 상태를 변경하여 분기 처리 한다.

test 하기

class StateRouterTest extends TestKit(ActorSystem("StateRouterSystem"))
  with WordSpecLike with MustMatchers
  	with ImplicitSender with StopSystemAfterAll {

  "StateRouter" must {
    "state router seperate Task work and Router" in {

      val rightProbe = TestProbe()
      val leftProbe = TestProbe()

      val router = 
      	system.actorOf(Props(
        	new StateRouteActor(rightProbe.ref, leftProbe.ref)))

      router ! MyMessage("is LeftMessage")
      leftProbe expectMsg "is LeftMessage"
      rightProbe.expectNoMessage(2 seconds)

      router ! RouteStateRight
      router ! MyMessage("is RightMessage")
      rightProbe expectMsg "is RightMessage"
      rightProbe.expectNoMessage(2 seconds)
    }
  }
}

route post1,2,3 으로 route 관련 post를 마친다.