En akka, quiero poner los elementos en secuencia y devolver un objeto. Sé que los elementos podrían ser una fuente para ejecutar un gráfico. Pero, ¿cómo puedo poner el elemento y devolver un objeto en tiempo de ejecución?


import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.Array.range
import scala.util.Success

object StreamElement {

  implicit val system = ActorSystem("StreamElement")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def main(args: Array[String]): Unit = {
    val (queue, value) = Source
    .queue[Int](10, OverflowStrategy.backpressure)
    .map(x => {
      x * x
    })
    .toMat(Sink.asPublisher(false))(Keep.both)
    .run()

    range(0, 10)
      .map(x => {
        queue.offer(x).onComplete {
          case Success(Enqueued) => {
          }

          case Success(Dropped) => {}
          case _ => {
             println("others")
          }
        }
      })
    }
}

¿Cómo puedo obtener el valor devuelto?

2
Tomia Wong 7 feb. 2020 a las 01:01

2 respuestas

La mejor respuesta

En realidad, desea devolver el valor int para cada elemento. Para que pueda crear el flujo, luego conectarse a la fuente y al sumidero para cada vez.


package tech.parasol.scala.akka

import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.Array.range
import scala.util.Success

object StreamElement {

  implicit val system = ActorSystem("StreamElement")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val flow = Flow[Int]
    .buffer(16, OverflowStrategy.backpressure)
    .map(x => x * x)

  def main(args: Array[String]): Unit = {
    range(0, 10)
      .map(x => {
        Source.single(x).via(flow).runWith(Sink.head)
      }.map( v => println("v ===> " + v)
      ))
  }

}


2
YouXiang-Wang 7 feb. 2020 a las 02:52

No me queda claro por qué la colección Scala no se envía a Stream como fuente en su código de muestra. Dado que ya ha compuesto una transmisión con valores materializados para capturar en una cola de origen y un sumidero de editor, puede crear una fuente de suscriptor utilizando Source.fromPublisher para recopilar los valores deseados, como se muestra a continuación:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()  // Not needed for Akka 2.6+

val (queue, pub) = Source
  .queue[Int](10, OverflowStrategy.backpressure)
  .map(x =>  x * x)
  .toMat(Sink.asPublisher(false))(Keep.both)
  .run()

val fromQueue = Source(0 until 10).runForeach(queue.offer(_))

val source = Source.fromPublisher(pub)

source.runForeach(x => print(x + " "))
// Output:
// 0 1 4 9 16 25 36 49 64 81 
1
Leo C 8 feb. 2020 a las 03:37