akka Stream 세번째

Published: by

akka stream.

GraphDSL

GraphDSL은 하나의 입출력로 부터 여러 입력, 출력들과의 분기, 조합등을 통한 여러 유형의 일종의 stream 다이어그램이 표현할 수 있는 것들을 쉽게 구성할 수 있게 지원해준다.

BroadcastGraph

하나의 입출력이 아닌 하나의 입력 포트가 있고 이를 여러게의 출력 포트로 broadcase하는 Graph가 있다. Shape는 Graph를 입력 포트, 출력 포트의 갯수에 따른 모양을 정의한다. 이때 입력포트를 Inlet(인렛), 출력포트를 Outlet(아웃렛)이라 한다.
그리고 Source, Sink, Flow은 모두 모양(Shape)가 정의된 Graph이다. 즉 아래와 같다.

Flow[-In, +Out, +Mat] == Graph[FlowShape[In,Out],Mat]
Source[+Out,+Mat] = Graph[SourceShape[Out],Mat]
Sink[-Int,+Mat] = Graph[SinkShape[Int], Mat]

BroadcastGraph는 GraphDSL.builder를 통해 얻을 수 있으며, GraphDSL.builder는 add method를 통해 Shape를 반환하여 Graph의 입력을 표현하게 된다.
아래의 이미지는 Broadcast를 이미지화 한 것이다. 1개의 Inlet를 가지고 여러개의 Outlet을 만들어 낸다. stream01-mergeGraph 이제 예제 코드를 보자 이는 Akka in action 에 나온 예제 이다.
event를 Inlet으로 받아 bracast를 통행 Event으로 Outlet한 후 Event의 상태에 따라 여러게의 Flow를 거처 상태에 따른 분기 file로 Json byteString format log를 남기는 예제 이다.

implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
  
//Shape는 Graph의 입출력 갯수를 정의 한다.
type FlowLike = Graph[FlowShape[Event, ByteString], NotUsed]

def processState(logId: String): FlowLike = {
  val jsFlow: Flow[Event, ByteString, NotUsed] = 
    Flow[Event].map(e => ByteString(e.toJson.compactPrint))

  //fromGraph(FlowShape)
  Flow.fromGraph(
    GraphDSL.create(){ implicit builder => //GraphDSL.builder
      import GraphDSL.Implicits._

      val bcast: UniformFanOutShape[Event, Event] =
        builder.add(Broadcast[Event](5)) // Broadcase 열린 출력 5개

      val js: FlowShape[Event, ByteString] = builder.add(jsFlow)

      val ok = Flow[Event].filter(e => e.state == Ok)
      val warning = Flow[Event].filter(e => e.state == Warning)
      val error = Flow[Event].filter(e => e.state == Error)
      val critical = Flow[Event].filter(e => e.state == Critical)

      bcast ~> js.in
      bcast ~> ok ~> jsFlow ~> logFileSink(logId,Ok)
      bcast ~> warning ~> jsFlow ~> logFileSink(logId, Warning)
      bcast ~> error ~> jsFlow ~> logFileSink(logId, Error)
      bcast ~> critical ~> jsFlow ~> logFileSink(logId, Critical)
      
      //broadcast의 Inlet과 Flow.out 으로부터 Outlet를 조합한 FlowShape를 반환 한다.
      //Flow.fromGraph 이므로  결과로 1개의  입력포트와 1개의 출력포트를 가지는 Flow를 
      //만들어야 한다. 따라서 FlowShape를 이용하여, broadcase의 Inlet을 입력포트로, 다른 
      //FlowShape로 출력포트를 만든다.
      val result:FlowShape[Event, ByteString] = FlowShape(bcast.in, js.out)
      result
    })
}

val src = 
  FileIO.fromPath(Paths.get("/Users/sslee/temp/stream-example01.json"))
    .via(JsonFraming.objectScanner(1024 * 1024))
    .map(b => b.decodeString("UTF-8"))
    .map(s => s.parseJson.convertTo[Event])

def logFileSink(logId: String,state: State): Sink[ByteString,
  Future[IOResult]] = 
    FileIO.toPath(logFile(logId,state), Set(CREATE,WRITE,APPEND))

def logFile(logId: String,state: State): Path = 
  Paths.get(s"/Users/sslee/temp/$logId-${State.norm(state)}")
    

def logFileSink: Sink[ByteString, Future[IOResult]] = 
  FileIO.toPath(Paths.get("/Users/sslee/work/temp/bidiFlowJson.log"))

val resultFn:Future[IOResult] = 
  src.via(processState("10")).toMat(logFileSink)(Keep.right).run()
resultFn.onComplete {
  case Success(count) =>
    println(count)
  case Failure(e) => 
    println(e)
}

MergeGraph

BroadcastGraph와 반대로 여려 Inlet을 병합하여 하나의 Outlet으로 병합한다.
아래의 그림은 MergeGraph를 나타 낸다.
stream01-mergeGraph 위에 Broadcast예제에서 실행시 생성되는 ok, warning, error, critical 상태에따라 만들어진 file들 중 ok를 제외한 나머지를 GraphDSL의 builder를 통해 Soure를 만든 예이다.

def mergeNotOk(logId: String): Source[ByteString, NotUsed] = {
    
  val warning = logFileSource(logId, Warning)
  val error = logFileSource(logId, Error)
  val critical = logFileSource(logId, Critical)

  //
  Source.fromGraph(
    GraphDSL.create(){ implicit builder =>
      import GraphDSL.Implicits._

      val warningShape: SourceShape[ByteString] = builder.add(warning)
      val errorShape: SourceShape[ByteString] = builder.add(error)
      val criticalShape: SourceShape[ByteString] = builder.add(critical)

      //3개의 Inlet을 가지는 Merge를 만든다.
      val merge: UniformFanInShape[ByteString, ByteString] =
        builder.add(Merge[ByteString](3))

      warningShape ~> merge
      errorShape ~> merge
      criticalShape ~> merge

      //Source.fromGraph 이므로 SourceShape를 이용하여 
      //Merge의 outlet으로 출력포트 1개를 가지는 SourceShape를 만든다.
      SourceShape(merge.out)
  })
}

def logFileSource(logId: String, state: State): 
  Source[ByteString, Future[IOResult]] = 
  FileIO.fromPath(
   Paths.get("/Users/sslee/temp", s"$logId-${State.norm(state)}"))
  .via(JsonFraming.objectScanner(1024 * 1024))

그외의 GraphStage

'akka-io'에 보면 더많은 미리 정의된 GraphStage 들이 있다. 여기에 GraphStage와 사용법이 있으니 봐야 겠다.