Actualmente estoy trabajando en una aplicación Play con un trabajo en segundo plano que debería enviar correos periódicamente para los que quiero usar Akka. Debo agregar que soy realmente nuevo en Scala / Play / Akka.

Actualmente tengo la siguiente configuración:

// JobModule.scala
bind(classOf[MailJobScheduler]).asEagerSingleton()

Esto debería iniciar la siguiente pieza de código que funciona cada segundo

// MailJobScheduler.scala
val mailActor = actorSystem.actorOf(MailActor.props, "mail-actor")

actorSystem.scheduler.schedule(0 seconds, 1 seconds) {
    // check how many mails have to be sent and sent messages to the mailActor 
}

Es posible que cada segundo deban enviarse varios correos nuevos . Me pregunto: en caso de que envíe cada segundo 10 mensajes al mailActor, ¿será realmente solo un actor el que tendrá que hacer todo el trabajo o habrá varios actores haciendo el trabajo al mismo tiempo?

Si es un actor, ¿cómo puedo tener varios actores a los que puedo asignar el trabajo y cuántos puedo / debo tener?

0
Chris 11 ene. 2017 a las 23:24

2 respuestas

¿Qué tal si usas las transmisiones de Akka en su lugar?

import akka.Done
import akka.stream.{KillSwitch, KillSwitches, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.Future

object BatchEmailSender {
  sealed trait Msg
  case object Tick extends Msg
  case class Email(toAddress: String, body: String) extends Msg

  def apply(sendEmail: Email => Future[Done], sendInterval: FiniteDuration = 10.seconds)(implicit mat: ActorMaterializer)
    : (Email => Unit, KillSwitch) = {
    val emailQueue = scala.collection.mutable.Queue[Email]()

    val (emailCmdQueue, killSwitch) = Source.queue[Msg](0, OverflowStrategy.backpressure)
      .merge(Source.tick(0.seconds, sendInterval, Tick))
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.foreach {
        case newEmail: Email =>
          emailQueue.enqueue(newEmail)
        case Tick =>
          emailQueue.dequeueAll(_ => true).foreach { email =>
            sendEmail(email).onFailure { case e =>
              println(s"Error sending email to ${email.toAddress}: $e")
            }
          }
      })(Keep.left)
      .run()

    (emailCmdQueue.offer(_), killSwitch)
  }
}

Necesita una función sendEmail, y luego funcionaría así:

import scala.concurrent.ExecutionContext.Implicits.global // TODO: remove me

object TestApp extends App {
  import BatchEmailSender._
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  def sendEmail(email: Email): Future[Done] ={
    println(s"Sending email $email") // TODO: insert real email sender code here
    Future.successful(Done)
  }

  val (sendEmailEvery10s, killSwitch) = BatchEmailSender(sendEmail)
  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive in 10s"))
  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive in same batch"))
  Thread.sleep(11000)
  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive after another 10s"))
  Thread.sleep(11000)
  killSwitch.shutdown()
}

Puede que te haya complicado la vida, pero Akka Streams te permite hacer estas cosas sin preocuparte de qué actor hace qué, tiene contrapresión y, por lo general, es un código mucho más robusto.

Habría usado 1 actor si las transmisiones de Akka no existieran. Acumula todos los mensajes en el actor y luego envía un tick a sí mismo periódicamente.

0
Lodewijk Bogaards 12 ene. 2017 a las 01:00

Use el programador como lo está haciendo en su ejemplo, pero no veo cómo el mailActor lo ayude aquí.

actorSystem.scheduler.schedule(0 seconds, 1 seconds) {
    // just call the code the the checks for email
}

No asuma que habrá un hilo. es decir, tenga mucho cuidado de no cerrar por referencias inestables

0
andyczerwonka 12 ene. 2017 a las 01:15