hdfs - Continuous REST consumption using Akka stream and http -


i trying build akka based system periodically(every 15 second) send rest request, filter, data cleansing , validation on received data , save hdfs.

below code wrote.

import akka.actor.actorsystem import akka.stream.actormaterializer import akka.stream.scaladsl.{flow, sink, source} import akka.http.scaladsl.http import akka.http.scaladsl.model.{httprequest, httpresponse, statuscodes} import akka.actor.props import akka.event.logging import akka.actor.actor  import scala.concurrent.{executioncontext, future} import scala.util.try import akka.http.scaladsl.client.requestbuilding._  /**   * created rabbanerjee on 4/6/2017.   */ class myactor extends actor {   val log = logging(context.system, this)    import scala.concurrent.executioncontext.implicits.global   def receive = {     case j:httpresponse => log.info("received" +j)     case k:anyref      => log.info("received unknown message"+k)   } } object stest extends app{    implicit val system = actorsystem("sys")   import system.dispatcher    implicit val materializer = actormaterializer()    val ss = system.actorof(props[myactor])   val httpclient = http().outgoingconnection(host = "rest_server.com", port = 8080)   val filtersuccess = flow[httpresponse].filter(_.status.issuccess())      val runnnn = source.tick(finiteduration(1,timeunit.seconds),finiteduration(15,timeunit.seconds),get("/"))         .via(httpclient)         .via(filtersuccess)         .to(sink.actorref(ss,oncompletemessage = "done"))    runnnn.run()  }  

the problem facing is,

even though used repeat/tick source, can see result once. it's not repetitively firing request.

i trying find grouping result of 50 such request, coz writing hadoop, cant write every request, flood hdfs multiple file.

you not consuming responses getting http call. compulsory consume entity bytes returned akka http, even if not interested in them.

more can found in docs.

in example, not using response entity, can discard bytes. see example below:

val runnnn = source.tick(finiteduration(1,timeunit.seconds),finiteduration(15,timeunit.seconds),get("/"))     .via(httpclient)     .map{resp => resp.discardentitybytes(); resp}     .via(filtersuccess)     .to(sink.actorref(ss,oncompletemessage = "done")) 

Comments

Popular posts from this blog

ios - Change Storyboard View using Seague -

commonjs - How to write a typescript definition file for a node module that exports a function? -

openid - Okta: Failed to get authorization code through API call -