En akka, quiero poner los elementos en secuencia y devolver un objeto. Sé que los elementos podrían ser una fuente para ejecutar un gráfico. Pero, ¿cómo puedo poner el elemento y devolver un objeto en tiempo de ejecución?
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.Array.range
import scala.util.Success
object StreamElement {
implicit val system = ActorSystem("StreamElement")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
def main(args: Array[String]): Unit = {
val (queue, value) = Source
.queue[Int](10, OverflowStrategy.backpressure)
.map(x => {
x * x
})
.toMat(Sink.asPublisher(false))(Keep.both)
.run()
range(0, 10)
.map(x => {
queue.offer(x).onComplete {
case Success(Enqueued) => {
}
case Success(Dropped) => {}
case _ => {
println("others")
}
}
})
}
}
¿Cómo puedo obtener el valor devuelto?
2 respuestas
En realidad, desea devolver el valor int para cada elemento. Para que pueda crear el flujo, luego conectarse a la fuente y al sumidero para cada vez.
package tech.parasol.scala.akka
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.Array.range
import scala.util.Success
object StreamElement {
implicit val system = ActorSystem("StreamElement")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val flow = Flow[Int]
.buffer(16, OverflowStrategy.backpressure)
.map(x => x * x)
def main(args: Array[String]): Unit = {
range(0, 10)
.map(x => {
Source.single(x).via(flow).runWith(Sink.head)
}.map( v => println("v ===> " + v)
))
}
}
No me queda claro por qué la colección Scala no se envía a Stream como fuente en su código de muestra. Dado que ya ha compuesto una transmisión con valores materializados para capturar en una cola de origen y un sumidero de editor, puede crear una fuente de suscriptor utilizando Source.fromPublisher
para recopilar los valores deseados, como se muestra a continuación:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer() // Not needed for Akka 2.6+
val (queue, pub) = Source
.queue[Int](10, OverflowStrategy.backpressure)
.map(x => x * x)
.toMat(Sink.asPublisher(false))(Keep.both)
.run()
val fromQueue = Source(0 until 10).runForeach(queue.offer(_))
val source = Source.fromPublisher(pub)
source.runForeach(x => print(x + " "))
// Output:
// 0 1 4 9 16 25 36 49 64 81
Nuevas preguntas
scala
Scala es un lenguaje de programación de propósito general dirigido principalmente a la máquina virtual Java. Diseñado para expresar patrones de programación comunes de una manera concisa, elegante y segura de tipos, fusiona estilos de programación imperativos y funcionales. Sus características clave son: un sistema de tipo estático avanzado con inferencia de tipo; tipos de funciones; la coincidencia de patrones; parámetros implícitos y conversiones; sobrecarga del operador; interoperabilidad total con Java; concurrencia