He escrito una biblioteca llamada amqp-worker que proporciona una función llamada {{X0 }} que sondea una cola de mensajes (como RabbitMQ) para mensajes, llamando a un controlador cuando se encuentra un mensaje. Luego vuelve a las encuestas.

Está perdiendo memoria. Lo he perfilado y el gráfico dice que PAP (aplicación de función parcial) es el culpable. ¿Dónde está la fuga en mi código? ¿Cómo puedo evitar las fugas al hacer un bucle en IO con forever?

enter image description here

Aquí hay algunas funciones relevantes. La fuente completa está aquí.

Programa de ejemplo. Esto gotea

main :: IO ()
main = do
  -- connect
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")

  -- initialize the queues
  Worker.initQueue conn queue
  Worker.initQueue conn results

  -- publish a message
  Worker.publish conn queue (TestMessage "hello world")

  -- create a worker, the program loops here
  Worker.worker def conn queue onError (onMessage conn)

obrera

worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m ()
worker opts conn queue onError action =
  forever $ do
    eres <- consumeNext (pollDelay opts) conn queue
    case eres of
      Error (ParseError reason bd) ->
        onError (MessageParseError bd reason)

      Parsed msg ->
        catch
          (action msg)
          (onError . OtherException (body msg))
    liftBase $ threadDelay (loopDelay opts)

consumNext

consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg)
consumeNext pd conn queue =
    poll pd $ consume conn queue

encuesta

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a
poll us action = do
    ma <- action
    case ma of
      Just a -> return a
      Nothing -> do
        liftBase $ threadDelay us
        poll us action
16
Sean Clark Hess 23 dic. 2016 a las 21:57

3 respuestas

La mejor respuesta

Aquí hay un ejemplo muy simple que demuestra su problema:

main :: IO ()
main = worker

{-# NOINLINE worker #-}
worker :: (Monad m) => m ()
worker =
  let loop = poll >> loop
  in loop

poll :: (Monad m) => m a
poll = return () >> poll
If you remove the `NOINLINE`, or specialize `m` to
`IO` (while compiling with `-O`), the leak goes away.

Escribí un blog detallado publicar sobre por qué exactamente este código pierde memoria. El resumen rápido es, como Reid señala en su respuesta, que el código crea y recuerda una cadena de aplicaciones parciales de >> s.

También presenté un ghc ticket sobre esto.

15
Roman Cheplyaka 10 ene. 2017 a las 20:53

Quizás un ejemplo más fácil de entender es este

main :: IO ()
main = let c = count 0
       in c >> c

{-# NOINLINE count #-}
count :: Monad m => Int -> m ()
count 1000000 = return ()
count n = return () >> count (n+1)

La evaluación de f >> g para acciones IO produce algún tipo de cierre que tiene referencias a f y g (es básicamente la composición de f y g como funciones en fichas de estado). count 0 devuelve un thunk c que evaluará una gran estructura de cierres de la forma return () >> return () >> return () >> .... Cuando ejecutamos c construimos esta estructura, y dado que tenemos que ejecutar c por segunda vez, toda la estructura aún está activa. Por lo tanto, este programa pierde memoria (independientemente de los indicadores de optimización).

Cuando count se especializa en IO y las optimizaciones están habilitadas, GHC tiene una variedad de trucos disponibles para evitar construir esta estructura de datos; pero todos confían en saber que la mónada es IO.

Volviendo al count :: Monad m => Int -> m () original, podemos intentar evitar construir esta gran estructura cambiando la última línea a

count n = return () >>= (\_ -> count (n+1))

Ahora la llamada recursiva está oculta dentro de una lambda, por lo que c es solo una pequeña estructura return () >>= (\_ -> BODY). Esto realmente evita la pérdida de espacio al compilar sin optimizaciones. Sin embargo, cuando las optimizaciones están habilitadas, GHC flota count (n+1) del cuerpo del lambda (ya que no depende del argumento) produciendo

count n = return () >>= (let body = count (n+1) in \_ -> body)

Y ahora c es una estructura grande de nuevo ...

4
Reid Barton 7 ene. 2017 a las 15:17

La pérdida de memoria estaba en poll. Utilizando monad-loops, cambié la definición a la siguiente: Parece que untilJust hace lo mismo que mi recursión, pero soluciona la fuga.

¿Alguien puede comentar por qué mi definición anterior de poll estaba perdiendo memoria?

{-# LANGUAGE FlexibleContexts #-}

module Network.AMQP.Worker.Poll where

import Control.Concurrent (threadDelay)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Base (liftBase)
import Control.Monad.Loops (untilJust)

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a
poll us action = untilJust $ do
    ma <- action
    case ma of
      Just a -> return $ Just a
      Nothing -> do
        liftBase $ threadDelay us
        return Nothing
3
Sean Clark Hess 3 ene. 2017 a las 18:52