akka Stream 네번째

Published: by

akka stream.

Buffer

Broadcast에서는 여러 출력포트 들에 대한 소비자들이 있다. 이 소비자들 중 어느 한 쪽에서 배압(back pressure)이 들어 오는 경우 이를 적용한다. 이는 가장 느린 소비자가 stream을 소비하는 속도에 따라 broadcast의 속도가 제한 된다.
따라서 속도가 제한 되지 않게 하기 위하여 노드사이에 버퍼를 두어 이를 해결할 수 있다. 버퍼는 자체 공간을 다 소비할 때까지 배압의 영향이 graph앞의 전달을 막아 준다.

Broadcast 에서의 back-pressure

Broadcast의 stage에 처리 성능은 여러 출력포트의 소비자들 중 가장 성능이 느린 쪽에 맞추어 진다.
이유는 처리 속도가 느린 쪽으로 부터 back-pressure가 들어 오기 때문이다.
따라서 처리 성능이 빠른쪽이 느려지지 않게 하기 위해서는 느른쪽에 buffer를 두면 된다. 하지만 여기서 주의 할 점이 있다. 버퍼를 두어도 결과는 같기 때문이다. 이유는 같은 stage에 있는 것들은 동기적이기 때문에 비동기 스테이지를 추가해야한다. 이것은 Akka가 두 개의 별개의 액터를 사용하여 두 단계를 실행하도록 명령하는 방법이다.

buffer not async boundary

아래의 코드는 2개의 소비자의 처리 속도가 다른 경우 전체가 느려지는 예이다.

//비싼처리 작업이 있는 Sink
val slowSink = Sink.foreach[Int] { i =>
  Thread.sleep(3000L)
  println(s"slow Sink $i")
}

//buffer를 두어 차이를 없앤다.
val buffer = Flow[Int].buffer(10, OverflowStrategy.dropHead)

//처리성능이 무지 빠른 Sink
val fastSink = Sink.foreach[Int](i => println(s"fast Sinke $i"))

val sink = Sink.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val bcast = builder.add(Broadcast[Int](2))

    //bcast ~> buffer ~> slowSink
    bcast ~> slowSink
    bcast ~> fastSink

    SinkShape(bcast.in)
  }
)

val result = Source(1 to 5).to(sink).run()

functor 위의 코드를 실행 해보면 3초마다 slowSink와 fastSink가 처리 됨을 알 수 있다.
분명 buffer를 slowSink에 두었는 데도 말이다. 이는 stage가 빠른 sink나 느린 sink 나 같은 boundary stage가 묶여 있기 때문이다. 따라서 느린쪽에 boundary를 분리 하면 결과가 다르게 나온다.

buffer async boundary

functor
위의 그림은 akka-io에 있는 async에 대한 이미지 이다.
아래코드에서 처럼 async를 두어 boundary를 기존 stage에서 분리 시킨다.

//비싼처리 작업이 있는 Sink
val slowSink = Sink.foreach[Int] { i =>
  Thread.sleep(3000L)
  println(s"slow Sink $i")
}

//buffer를 두어 차이를 없앤다. 이때 async를 두어 
//(buffer ~> slowSink)부분의 boundary를 기존 statage에서 따로 분리 시킨다.
val buffer = Flow[Int].buffer(10, OverflowStrategy.dropHead).async

//처리성능이 무지 빠른 Sink
val fastSink = Sink.foreach[Int](i => println(s"fast Sinke $i"))

val sink = Sink.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val bcast = builder.add(Broadcast[Int](2))

    //bcast ~> buffer ~> slowSink
    bcast ~> slowSink
    bcast ~> fastSink

    SinkShape(bcast.in)
  }
)

val result = Source(1 to 5).to(sink).run()

functor 위의 결과를 보면 fastSink 쪽은 먼저처리가 끝남을 볼 수 있다.

buffer limit async boundary

앞전에서의 코드는 기본 buffer size가 16으로 block size 8KB 가 설정이 되어있어 실행시 fastSink가 실행시 slowSink쪽에서의 back-pressure를 가하지 않았다.
아래는 buffer flow의 buffer 크기를 1로 했을때 어떤 결과가 나오는지를 보여 준다.

//비싼처리 작업이 있는 Sink
val slowSink = Sink.foreach[Int] { i =>
  Thread.sleep(3000L)
  println(s"slow Sink $i")
}

//buffer를 두어 차이를 없앤다. 이때 async를 두어 
//(buffer ~> slowSink)부분의 boundary를 기존 statage에서 따로 분리 시킨다.
//하지만 buffer size를 1로 주므로써 배압(back-pressure)이 가해지게 된다.
val buffer = Flow[Int]
   .buffer(10, OverflowStrategy.dropHead)
   .async
   .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

//처리성능이 무지 빠른 Sink
val fastSink = Sink.foreach[Int](i => println(s"fast Sinke $i"))

val sink = Sink.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val bcast = builder.add(Broadcast[Int](2))

    //bcast ~> buffer ~> slowSink
    bcast ~> slowSink
    bcast ~> fastSink

    SinkShape(bcast.in)
  }
)

val result = Source(1 to 5).to(sink).run()

functor

이거말고도 akka-stream에 대해 더 많은 내용이 있다 시간이 되는데로 좀더 살펴봐야 겠다.