akk-http 는 akka-stream을 이용한다.
이는 pub-scribe 의 수요,공급에 대한 비동기 메시지 반응으로 reactive stream을 사용할 수 있다는 것이다. 매우 큰 요청이나 응답일지라도 OOM 걱정없이 지속적인 전달이 가능 하다.
앞전 post에서 이야기한 akka-stream을 그대로 이용하고 여기서 중요한 것은 route에서 directive 지시자 안에서 어떻게 Source를 얻고 Sink로 내보내는지를 보면 된다.
akka-http 에는 String이나 ByteString등에 기본 Marshaller가 있다. 또는 spray-json을 이용한 사용자 정의 Marshaller를 적용할 수 도 있다.
akka.http.scaladsl.model.HttpEntity 로부터 dataBytes를 이용하여 Source[ByteString,_]으로 Source를 얻어 기존에 akka-stream으로 했던 stream component를 이용하는 방법과, (구현하지 않아도 되지만) HttpEntity로 부터 custom Unmarshaller를 통해 Source[Event,_]형태로 source를 바로 얻을 수 도 있다.
akka-http에서의 akka-stream부분의 사용에 초점을 두는 것 이므로 akka-http는 추후 akka-http post에서 보자.
Http를 통한 Stream받기
post로 응답 처리
akka-http에 관한 부분은 생략하고 route부분의 Directive에서 어떻게 stream을 얻는지를 보자
위의 코드는 HttpEntity로 부턴 Source[ByteString,_ ] 를 얻는 예이다.
이 후 flow는 앞전에 보았던 BidiFlow의 조합으로 stream을 구성한다.
일단 client로 부터의 Source 추출만 얻는다면 이후 는 다른 것이 없다.
위의 예제는 client로 부터 넘어온 ByteString 을 Event로 변환 후 다시 ByteString으로 file에 기록 하는 것이다.
응답 내보내기
응답은 server의 file을 읽어 client에 json으로 응답하는 부분이다.
여기서도 기존의 stream Source[ByteString,Futre[IOResult ] ]는 기존과 같다. 이를 HttpEntity에게 넘기면 된다.
cumstom marshaller를 이용하기
서버에서 응답 처리
위의 서버에서 응답처리의 예제에서 entity(as[HttpEntity ]) { entity => 를 보면 entity를 이용하여 Source[ByteString ]를 얻은 후 이후 stream를 이용했다.
하지만 as[Source[Event,_ ] ]를 하여 바로 원하는 type으로 변환된 stream를 얻을 수 있다.
as 는 암시자로 Unmarshaller[T ] 이 필요하다. 따러서 Unmarshaller를 만들어야 한다.
Unmarshaller는 trait로 apply 생성 method를 구현하면 된다. 위의 예는 HttpEntity로 부터 contentType에 따라 Future[Source[Event,_ ] ]으로 반환 한다.
forContentTypes로 contentType이 application/json인 것만 변환한다.
route에서는 as[HttpEntity ]가 아닌 as[Source[Event,_ ] ]로 할 수 가 있다.(물론 암시가 scope에 있어야 한다.)
응답 내보내기
akka.http.scaladsl.marshalling.Marshal.toResponseFor를 이용여 Source를 넘기면 된다.
이때 toResponseFor metho는 ToResponseMarshaller[A ] 암시자가 필요하다.
toResponseFor method는 다음과 같다.
이제 Marshaller를 만드는 부분이다.
위에 첫번째 예제에서의 응답보낼때 HttpEntity를 이용하여 Source를 넘긴 부분과 같다.
HttpEntity(ContentTypes.application/json,src)
이제 route부분을 보자
JsonEntityStreamingSupport 이용
위의 예는 custom Unmarshaller를 만드는 예를 보기 위한 것이다. 이를 통행 HttpEntity로 부터 Source[Event,_ ]를 얻었지만, akka.http.scaladsl.common.EntityStreamingSupport 객체를 이용하여 암시로 JsonEntityStreamingSupport를 선언하면 as[Source[Event,_ ] ]를 asSourceOf[Event ] { Source[event,NotUsed] => … 로 바로 사용 할 수가 있다.
BroadcastGraph
하나의 입출력이 아닌 하나의 입력 포트가 있고 이를 여러게의 출력 포트로 broadcase하는 Graph가 있다.
Shape는 Graph를 입력 포트, 출력 포트의 갯수에 따른 모양을 정의한다. 이때 입력포트를 Inlet(인렛), 출력포트를 Outlet(아웃렛)이라 한다.
그리고 Source, Sink, Flow은 모두 모양(Shape)가 정의된 Graph이다. 즉 아래와 같다.
BroadcastGraph는 GraphDSL.builder를 통해 얻을 수 있으며, GraphDSL.builder는 add method를 통해 Shape를 반환하여 Graph의 입력을 표현하게 된다.
아래의 이미지는 Broadcast를 이미지화 한 것이다. 1개의 Inlet를 가지고 여러개의 Outlet을 만들어 낸다.
이제 예제 코드를 보자 이는 Akka in action 에 나온 예제 이다.
event를 Inlet으로 받아 bracast를 통행 Event으로 Outlet한 후 Event의 상태에 따라 여러게의 Flow를 거처 상태에 따른 분기 file로 Json byteString format log를 남기는 예제 이다.
MergeGraph
BroadcastGraph와 반대로 여려 Inlet을 병합하여 하나의 Outlet으로 병합한다.