akka Stream 두번째

Published: by

akka Stream.

akka-http 에서의 akka-stream

akk-http 는 akka-stream을 이용한다.
이는 pub-scribe 의 수요,공급에 대한 비동기 메시지 반응으로 reactive stream을 사용할 수 있다는 것이다. 매우 큰 요청이나 응답일지라도 OOM 걱정없이 지속적인 전달이 가능 하다.
앞전 post에서 이야기한 akka-stream을 그대로 이용하고 여기서 중요한 것은 route에서 directive 지시자 안에서 어떻게 Source를 얻고 Sink로 내보내는지를 보면 된다.
akka-http 에는 String이나 ByteString등에 기본 Marshaller가 있다. 또는 spray-json을 이용한 사용자 정의 Marshaller를 적용할 수 도 있다.
akka.http.scaladsl.model.HttpEntity 로부터 dataBytes를 이용하여 Source[ByteString,_]으로 Source를 얻어 기존에 akka-stream으로 했던 stream component를 이용하는 방법과, (구현하지 않아도 되지만) HttpEntity로 부터 custom Unmarshaller를 통해 Source[Event,_]형태로 source를 바로 얻을 수 도 있다.
akka-http에서의 akka-stream부분의 사용에 초점을 두는 것 이므로 akka-http는 추후 akka-http post에서 보자.

Http를 통한 Stream받기

post로 응답 처리

akka-http에 관한 부분은 생략하고 route부분의 Directive에서 어떻게 stream을 얻는지를 보자

def postRoute = pathPrefix("logs" / Segment) { logId =>
  pathEndOrSingleSlash { 
    post {
      entity(as[HttpEntity]) { entity =>
        onComplete(
          entity.dataBytes // Source[ByteString,_]
            .via(flow)
            .toMat(getSinkLogFile(logId))(Keep.right)
            .run()
        ) {
          case Success(IOResult(count, Success(Done))) =>
            complete((StatusCodes.OK, LogReceipt(logId, count)))
          case Success(IOResult(count, Failure(e))) =>
            complete((
              StatusCodes.BadRequest,
              ParseError(logId, e.getMessage)
            ))
          case Failure(e) =>
            complete((
              StatusCodes.BadRequest,
              ParseError(logId, e.getMessage)
            ))
        }
      }
    }
  }
}

위의 코드는 HttpEntity로 부턴 Source[ByteString,_ ] 를 얻는 예이다.
이 후 flow는 앞전에 보았던 BidiFlow의 조합으로 stream을 구성한다.

val inFlow: Flow[ByteString, Event, NotUsed] = JsonFraming.objectScanner(maxLine)
  .map(b => b.decodeString("UTF-8"))
  .map(s => s.parseJson.convertTo[Event])

val inFlowLinear: Flow[ByteString, Event, NotUsed] =   
  Framing.delimiter(ByteString("\n"), maxLine)
    .map(b => b.decodeString("UTF-8"))
    .map(s => s.parseJson.convertTo[Event])

val outFlow: Flow[Event, ByteString, NotUsed] = 
  Flow[Event].map(e => ByteString(e.toJson.compactPrint))

val bidiFlow: BidiFlow[ByteString, Event, Event, ByteString, NotUsed] =   
  BidiFlow.fromFlows(inFlow, outFlow)

// 흐름 turning 형은 Event -> Event
val flow: Flow[ByteString, ByteString, NotUsed] = bidiFlow.join(Flow[Event])

def getSinkLogFile(id: String): Sink[ByteString, Future[IOResult]] = 
FileIO.toPath(logFile(id), Set(CREATE,WRITE,APPEND))

일단 client로 부터의 Source 추출만 얻는다면 이후 는 다른 것이 없다.
위의 예제는 client로 부터 넘어온 ByteString 을 Event로 변환 후 다시 ByteString으로 file에 기록 하는 것이다.

응답 내보내기

응답은 server의 file을 읽어 client에 json으로 응답하는 부분이다.
여기서도 기존의 stream Source[ByteString,Futre[IOResult ] ]는 기존과 같다. 이를 HttpEntity에게 넘기면 된다.

def getRoute = pathPrefix("logs" / Segment) { logId =>
  pathEndOrSingleSlash {
    get {
      if(Files.exists(logFile(logId))) {
        val src: Source[ByteString, Future[IOResult]] = logFileSource(logId)
        complete(
          HttpEntity(ContentTypes.`application/json`,src)    
        )
      }
      else {
        complete(StatusCodes.NotFound)
      }
    }
  }
}

cumstom marshaller를 이용하기

서버에서 응답 처리

위의 서버에서 응답처리의 예제에서 entity(as[HttpEntity ]) { entity => 를 보면 entity를 이용하여 Source[ByteString ]를 얻은 후 이후 stream를 이용했다.
하지만 as[Source[Event,_ ] ]를 하여 바로 원하는 type으로 변환된 stream를 얻을 수 있다.
as 는 암시자로 Unmarshaller[T ] 이 필요하다. 따러서 Unmarshaller를 만들어야 한다.

def createEventUnMarshaller(maxLine: Int, maxJsonObject: Int)  = {
  new Unmarshaller[HttpEntity, Source[Event, _]]  {
    def apply(entity: HttpEntity)(implicit es: ExecutionContext, mat: Materializer): Future[Source[Event, _]] = {
      val future = entity.contentType match {
        case ContentTypes.`application/json` =>
		  Future.successful{
			JsonFraming.objectScanner(maxJsonObject)
  			  .map(b => b.decodeString("UTF-8"))
			  .map(s => s.parseJson.convertTo[Event])
		  }
		case other =>
            Future.failed(new UnsupportedContentTypeException(supported)) 

        future.map(flow => entity.dataBytes.via(flow))(es)
	}
  }.forContentTypes(ContentTypes.`application/json`) 
}

Unmarshaller는 trait로 apply 생성 method를 구현하면 된다. 위의 예는 HttpEntity로 부터 contentType에 따라 Future[Source[Event,_ ] ]으로 반환 한다.
forContentTypes로 contentType이 application/json인 것만 변환한다.

route에서는 as[HttpEntity ]가 아닌 as[Source[Event,_ ] ]로 할 수 가 있다.(물론 암시가 scope에 있어야 한다.)

//위에서 만든 unmarshaller 암시를 선언 
implicit val eventUnmarshaller: Unmarshaller[HttpEntity, Source[Event,_]] = 
    LogEntityMarshaller.createEventUnMarshaller(maxLine, maxJsObject)
def postRoute = pathPrefix("logs" / Segment) { logId =>
  pathEndOrSingleSlash {
	post {
      //as는 Unmarshaller 암시자가 필요
      entity(as[Source[Event,_]]) { esrc =>
        onComplete(
          esrc.via(eventToStringFlow)
            .toMat(fileSink(logId))(Keep.right)
            .run()    
        ) {
          case Success(IOResult(count, Success(Done))) => 
            complete((StatusCodes.OK, LogReceipt(logId, count)))
          case Success(IOResult(count, Failure(e))) => 
            complete((
              StatusCodes.BadRequest,
              ParseError(logId, e.getMessage)
            ))
          case Failure(e) => 
            complete((
              StatusCodes.BadRequest,
              ParseError(logId, e.getMessage)
            ))
        }
      }
    }
  }
}

응답 내보내기

akka.http.scaladsl.marshalling.Marshal.toResponseFor를 이용여 Source를 넘기면 된다.
이때 toResponseFor metho는 ToResponseMarshaller[A ] 암시자가 필요하다.
toResponseFor method는 다음과 같다.

def toResponseFor(request: HttpRequest)(implicit m: ToResponseMarshaller[A],
	ec: ExecutionContext): Future[HttpResponse]

이제 Marshaller를 만드는 부분이다.

val jsContent = ContentTypes.`application/json`
def createEventMarshaller(maxJsonObject: Int): 
  ToEntityMarshaller[Source[ByteString,_]] = {

  val jsMarshaller: Marshaller[Source[ByteString,_], HttpEntity.Chunked] = 
    Marshaller.withFixedContentType(jsContent) {
      source: Source[ByteString, _] => HttpEntity(jsContent, source)
    }
}

위에 첫번째 예제에서의 응답보낼때 HttpEntity를 이용하여 Source를 넘긴 부분과 같다.
HttpEntity(ContentTypes.application/json,src)
이제 route부분을 보자

//위에서 만든 marshaller 암시를 선언 
implicit val eventMarshaller: ToEntityMarshaller[Source[ByteString, _]] = 
    LogEntityMarshaller.createEventMarshaller(maxJsObject)
    
def getRoute = pathPrefix("logs" / Segment) { logId =>
  pathEndOrSingleSlash {
    get {
      extractRequest{ req => 
        if(Files.exists(logFile(logId))) {
          val src = logFileSource(logId)
          //Marshal.toResponseFor 는 ToResponseMarshaller 암시자 를 필요로 한다.
          complete(Marshal(src).toResponseFor(req))
        } else {
          complete(StatusCodes.NotFound)
        }
      }
    }
  }
}

JsonEntityStreamingSupport 이용

위의 예는 custom Unmarshaller를 만드는 예를 보기 위한 것이다. 이를 통행 HttpEntity로 부터 Source[Event,_ ]를 얻었지만, akka.http.scaladsl.common.EntityStreamingSupport 객체를 이용하여 암시로 JsonEntityStreamingSupport를 선언하면 as[Source[Event,_ ] ]를 asSourceOf[Event ] { Source[event,NotUsed] => … 로 바로 사용 할 수가 있다.

import akka.http.scaladsl.common.EntityStreamingSupport._
import akka.http.javadsl.common.JsonEntityStreamingSupport

implicit val entityStreamingSupport:JsonEntityStreamingSupport =
 EntityStreamingSupport.json
  
def postRoute = pathPrefix("logs" / Segment) { logId =>
  pathEndOrSingleSlash {
    post {
      //asSourceOf[T]는 JsonEntityStreamingSupport 암시자가 필요
      entity(asSourceOf[Event]) { esrc =>
        onComplete(
          esrc.via(eventToStringFlow)
            .toMat(fileSink(logId))(Keep.right)
            .run()    
        ){
          case Success(IOResult(count, Success(Done))) => 
            complete((StatusCodes.OK, LogReceipt(logId, count)))
          case Success(IOResult(count, Failure(e))) => 
            complete((
              StatusCodes.BadRequest,
              ParseError(logId, e.getMessage)
            ))
          case Failure(e) => 
            complete((
              StatusCodes.BadRequest,
              ParseError(logId, e.getMessage)
            ))
        }
      }
    }
  }
}

BroadcastGraph

하나의 입출력이 아닌 하나의 입력 포트가 있고 이를 여러게의 출력 포트로 broadcase하는 Graph가 있다. Shape는 Graph를 입력 포트, 출력 포트의 갯수에 따른 모양을 정의한다. 이때 입력포트를 Inlet(인렛), 출력포트를 Outlet(아웃렛)이라 한다.
그리고 Source, Sink, Flow은 모두 모양(Shape)가 정의된 Graph이다. 즉 아래와 같다.

Flow[-In, +Out, +Mat] == Graph[FlowShape[In,Out],Mat]
Source[+Out,+Mat] = Graph[SourceShape[Out],Mat]
Sink[-Int,+Mat] = Graph[SinkShape[Int], Mat]

BroadcastGraph는 GraphDSL.builder를 통해 얻을 수 있으며, GraphDSL.builder는 add method를 통해 Shape를 반환하여 Graph의 입력을 표현하게 된다.
아래의 이미지는 Broadcast를 이미지화 한 것이다. 1개의 Inlet를 가지고 여러개의 Outlet을 만들어 낸다. stream01 이제 예제 코드를 보자 이는 Akka in action 에 나온 예제 이다.
event를 Inlet으로 받아 bracast를 통행 Event으로 Outlet한 후 Event의 상태에 따라 여러게의 Flow를 거처 상태에 따른 분기 file로 Json byteString format log를 남기는 예제 이다.

implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
  
//Shape는 Graph의 입출력 갯수를 정의 한다.
type FlowLike = Graph[FlowShape[Event, ByteString], NotUsed]

def processState(logId: String): FlowLike = {
  val jsFlow: Flow[Event, ByteString, NotUsed] = 
    Flow[Event].map(e => ByteString(e.toJson.compactPrint))

  //fromGraph(FlowShape)
  Flow.fromGraph(
    GraphDSL.create(){ implicit builder => //GraphDSL.builder
      import GraphDSL.Implicits._

      val bcast: UniformFanOutShape[Event, Event] =
        builder.add(Broadcast[Event](5)) // Broadcase 열린 출력 5개

      val js: FlowShape[Event, ByteString] = builder.add(jsFlow)

      val ok = Flow[Event].filter(e => e.state == Ok)
      val warning = Flow[Event].filter(e => e.state == Warning)
      val error = Flow[Event].filter(e => e.state == Error)
      val critical = Flow[Event].filter(e => e.state == Critical)

      bcast ~> js.in
      bcast ~> ok ~> jsFlow ~> logFileSink(logId,Ok)
      bcast ~> warning ~> jsFlow ~> logFileSink(logId, Warning)
      bcast ~> error ~> jsFlow ~> logFileSink(logId, Error)
      bcast ~> critical ~> jsFlow ~> logFileSink(logId, Critical)
      
      //broadcast의 Inlet과 Flow.out 으로부터 Outlet를 조합한 FlowShape를 반환 한다.
      //Flow.fromGraph 이므로  결과로 1개의  입력포트와 1개의 출력포트를 가지는 Flow를 
      //만들어야 한다. 따라서 FlowShape를 이용하여, broadcase의 Inlet을 입력포트로, 다른 
      //FlowShape로 출력포트를 만든다.
      val result:FlowShape[Event, ByteString] = FlowShape(bcast.in, js.out)
      result
    })
}

val src = 
  FileIO.fromPath(Paths.get("/Users/sslee/temp/stream-example01.json"))
    .via(JsonFraming.objectScanner(1024 * 1024))
    .map(b => b.decodeString("UTF-8"))
    .map(s => s.parseJson.convertTo[Event])

def logFileSink(logId: String,state: State): Sink[ByteString,
  Future[IOResult]] = 
    FileIO.toPath(logFile(logId,state), Set(CREATE,WRITE,APPEND))

def logFile(logId: String,state: State): Path = 
  Paths.get(s"/Users/sslee/temp/$logId-${State.norm(state)}")
    

def logFileSink: Sink[ByteString, Future[IOResult]] = 
  FileIO.toPath(Paths.get("/Users/sslee/work/temp/bidiFlowJson.log"))

val resultFn:Future[IOResult] = 
  src.via(processState("10")).toMat(logFileSink)(Keep.right).run()
resultFn.onComplete {
  case Success(count) =>
    println(count)
  case Failure(e) => 
    println(e)
}

MergeGraph

BroadcastGraph와 반대로 여려 Inlet을 병합하여 하나의 Outlet으로 병합한다.