© Stefania Loredana Nita and Marius Mihailescu 2019
Stefania Loredana Nita and Marius Mihailescu, Haskell Quick Syntax Reference, https://doi.org/10.1007/978-1-4842-4507-1_19

19. Haskell Pipes

Stefania Loredana Nita and Marius Mihailescu
Bucharest, Romania

Streaming programs in Haskell ideally offer three features: effects, streaming, and composability. With conventional approaches, you can choose just two of these features. In this chapter, you will learn how to use pipes to achieve all three.

If you give up effects, you get lists that are pure and lazy (we will talk about laziness in Haskell in Chapter 21). You can transform them by applying composable functions in constant space, but you cannot interleave effects. If you give up streaming, you get mapM (which maps every element of a structure to a monadic action, evaluates the actions from left to right, and collects the results), forM (mapM with its arguments flipped), and a version of ListT that does not work properly. These give you effects and composability, but the result is returned only after the whole list has been processed and loaded into memory. Lastly, if you give up composability, you can write dependent reads, writes, and transforms, but they won’t be separate or modular.

One way to get all three features is to use pipes, provided by the pipes library. This library provides abstractions such as Producer, Consumer, Pipe, and a correct version of ListT, which can be combined in any way because they share the same base type.

Specifically, pipes forces you to decompose a streaming computation into stages that can be combined. This approach is useful because the stages can be reused behind a common interface, and premade stages can be connected while still running in constant memory.
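The difference between giving up streaming and keeping all three features can be sketched as follows. This is only an illustration: loadItem is a hypothetical effectful step invented for the comparison, and the example assumes the pipes package is already installed.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical effectful step: announce the element, then square it.
loadItem :: Int -> IO Int
loadItem n = do
    putStrLn ("loading " ++ show n)
    return (n * n)

-- Effects and composability without streaming: mapM runs every action
-- and collects the whole result list before anything is printed.
allAtOnce :: IO ()
allAtOnce = do
    squares <- mapM loadItem [1 .. 3]
    mapM_ print squares

-- Effects, streaming, and composability with pipes: each element is
-- loaded and printed before the next one is requested.
streamed :: IO ()
streamed = runEffect $ each [1 .. 3] >-> P.mapM loadItem >-> P.print

main :: IO ()
main = do
    allAtOnce
    putStrLn "--"
    streamed
```

With allAtOnce, the three "loading" lines appear before any square is printed. With streamed, each "loading" line is immediately followed by its square.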

To decouple data, there are two commands: yield, which sends output data, and await, which receives input data. The following are the monad transformers and the contexts in which they can be used:
  • Producer is used only with yield and models streaming sources.

  • Consumer is used only with await and models streaming sinks.

  • Pipe can be used with both of them and models stream transformations.

  • Effect cannot be used with either of them and models nonstreaming components.
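As a minimal sketch of these four contexts, the following module defines one value of each type. The names numbers, doubler, printer, and pipeline are invented for this illustration, and the (>->) operator used to connect them is one of the tools described next.

```haskell
import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Producer: only yields. Emits the numbers 1 to 3.
numbers :: Monad m => Producer Int m ()
numbers = mapM_ yield [1, 2, 3]

-- Pipe: awaits and yields. Doubles every value passing through.
doubler :: Monad m => Pipe Int Int m r
doubler = forever $ do
    x <- await
    yield (x * 2)

-- Consumer: only awaits. Prints everything it receives.
printer :: Consumer Int IO r
printer = forever $ do
    x <- await
    lift (print x)

-- Effect: neither awaits nor yields, because every input and output
-- has been connected.
pipeline :: Effect IO ()
pipeline = numbers >-> doubler >-> printer

-- The same Producer and Pipe also work in a pure base monad.
doubled :: [Int]
doubled = P.toList (numbers >-> doubler)   -- [2,4,6]

main :: IO ()
main = runEffect pipeline   -- prints 2, 4, and 6, one per line
```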

These components are combined using the following tools:
  • for works with yield.

  • (>~) works with await.

  • (>->) works with both of them.

  • (>>=) works with returned values.
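The four tools can be tried out on small, pure streams. The following sketch uses P.toList and the Identity monad so that every result is an ordinary value; the names duplicated, sumOfTwo, fed, piped, and sequenced are made up for this example.

```haskell
import Data.Functor.Identity (runIdentity)
import Pipes
import qualified Pipes.Prelude as P

-- for: replace every yield of the Producer with the loop body.
duplicated :: [Int]
duplicated = P.toList (for (each [1, 2, 3]) (\x -> yield x >> yield x))
-- [1,1,2,2,3,3]

-- A Consumer that awaits two values and returns their sum.
sumOfTwo :: Monad m => Consumer Int m Int
sumOfTwo = do
    a <- await
    b <- await
    return (a + b)

-- (>~): replace every await of the Consumer with the action on the left.
fed :: Int
fed = runIdentity (runEffect (return 21 >~ sumOfTwo))
-- 42

-- (>->): connect the yields upstream to the awaits downstream.
piped :: [Int]
piped = P.toList (each [1 .. 10] >-> P.filter even >-> P.map (* 10))
-- [20,40,60,80,100]

-- (>>=), here through do notation: run one Producer after another.
sequenced :: [Int]
sequenced = P.toList $ do
    each [1, 2]
    each [3]
-- [1,2,3]

main :: IO ()
main = do
    mapM_ print [duplicated, piped, sequenced]
    print fed
```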

When these monad transformers are combined, their types change to focus on the inputs and outputs that have been combined. When all inputs and outputs have been handled (i.e., they have been connected), you obtain an Effect. To stream, the last obtained Effect will be run.

The pipes package is not installed by default, so you need to install it using the following command at a terminal (if you get any error/warning message, see Chapter 26):
cabal install pipes
Let’s see a simple example:
Prelude> import Pipes
Prelude Pipes> import qualified Pipes.Prelude as PP
Prelude Pipes PP> runEffect $ PP.stdinLn >-> PP.takeWhile (/= "exit") >-> PP.stdoutLn
this is [--hit Enter key]
this is
a simple example [--hit Enter key]
a simple example
of using [--hit Enter key]
of using
the pipes library [--hit Enter key]
the pipes library
exit
Prelude Pipes PP>

Here, the first step is to import Pipes (to use runEffect and (>->)) and Pipes.Prelude (to use stdinLn, takeWhile, and stdoutLn). The stdinLn action (a Producer) reads lines from standard input and yields each one as a String. Its output becomes the input of takeWhile (a Pipe), which lets values through as long as a predicate is satisfied (in this case, as long as the text you enter is different from exit). The stdoutLn action (a Consumer) writes every string it receives to standard output, followed by a newline. To connect these actions, you use (>->), and their result is an Effect. Finally, runEffect runs this Effect, converting it to an action in the base monad.
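If you want to try the same pipeline without typing at the prompt, you can replace the stdin source with a fixed list. The sketch below does that with each; typedLines is a hypothetical stand-in for the keyboard input, and Pipes.Prelude is imported as P here.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical input lines standing in for what the user would type.
typedLines :: [String]
typedLines = ["this is", "a simple example", "exit", "never reached"]

-- The lines that get past takeWhile, collected as a pure list.
echoed :: [String]
echoed = P.toList (each typedLines >-> P.takeWhile (/= "exit"))
-- ["this is","a simple example"]

-- The same pipeline, writing to standard output instead.
main :: IO ()
main = runEffect $ each typedLines >-> P.takeWhile (/= "exit") >-> P.stdoutLn
```

Because takeWhile terminates at the first line equal to exit, the line after it is never requested from the source.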

Next, let’s see how the stdinLn action is defined in [1].
import Control.Monad (unless)
import Pipes
import System.IO (isEOF)
stdinLn :: Producer String IO ()
stdinLn = do
    eof <- lift isEOF        -- 'lift' an 'IO' action from the base monad
    unless eof $ do
        str <- lift getLine
        yield str            -- 'yield' the 'String'
        stdinLn              -- Loop

Here, yield suspends the current Producer: it emits a value and keeps the Producer suspended until that value is consumed. If nobody consumes the value, yield never returns.
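The suspension caused by yield can be observed with a small experiment. In this sketch, counter is a hypothetical Producer that prints a message before and after every yield, and P.take 2 stops asking for values after the second one.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical Producer that reports when it yields and when it resumes.
counter :: Int -> Producer Int IO ()
counter n = do
    lift (putStrLn ("about to yield " ++ show n))
    yield n
    -- This line runs only if downstream asks for another value.
    lift (putStrLn ("resumed after " ++ show n))
    counter (n + 1)

main :: IO ()
main = runEffect $ counter 1 >-> P.take 2 >-> P.print
```

Running main prints "resumed after 1" but never "resumed after 2": once P.take 2 has passed on two values, it finishes without awaiting again, so the second yield does not return.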

A good example of using pipes is the communication between a client and a server, provided in [3]. First, the type of data that can be exchanged is defined:
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)
data Command = FirstMessage
           | DoNothing
           | DoSomething Int
           deriving (Show,Generic)
instance Binary Command
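The empty Binary instance relies on the Generic instance to derive the serialization code. A quick way to check it is to encode a value and decode it again using Data.Binary directly. The sketch below repeats the Command type so that it is self-contained, and it derives Eq as well, which is an addition made only for the comparison.

```haskell
{-# LANGUAGE DeriveGeneric #-}
import Data.Binary (Binary, decode, encode)
import GHC.Generics (Generic)

-- Local copy of the Command type; Eq is added only for this check.
data Command = FirstMessage
             | DoNothing
             | DoSomething Int
             deriving (Show, Eq, Generic)

instance Binary Command

-- Serialize to a lazy ByteString and parse the bytes back.
roundTrip :: Command -> Command
roundTrip = decode . encode

main :: IO ()
main = do
    let commands = [FirstMessage, DoNothing, DoSomething 7]
    print (map roundTrip commands == commands)   -- True
```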
Next, you can see the way in which the server should handle the communication. To write the Server module, the pipes-binary and pipes-network packages need to be installed. Open a terminal and type the following:
cabal install pipes-binary
cabal install pipes-network
Server looks like this:
module Server where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command
pureHandler c = c  -- answers with the same command that was received
-- impure handler, to be used with PipesPrelude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
  liftIO $ putStrLn $ "received message = " ++ (show c)
  return $ C.DoSomething 0
  -- whatever incoming command 'c' from the client, answer DoSomething 0
main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
  \(connectionSocket, remoteAddress) -> do
                 putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
                 _ <- runEffect $ do
                   let bytesReceiver = PNT.fromSocket connectionSocket pageSize
                   let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
                   commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
                   -- if we want to use the pureHandler
                   --commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
                 return ()
Finally, the client acts like this:
module Client where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command
pureHandler c = c  -- answers with the same command received from the server
-- impure handler, to be used with PipesPrelude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
  liftIO $ putStrLn $ "Received: " ++ (show c)
  return C.DoNothing  -- whatever is received from server, answer DoNothing
main :: IO ()
main = PNT.connect "127.0.0.1" "23456" $
  \(connectionSocket, remoteAddress) -> do
    putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
    sendFirstMessage connectionSocket
    _ <- runEffect $ do
      let bytesReceiver = PNT.fromSocket connectionSocket pageSize
      let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
      commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
    return ()
sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
  _ <- runEffect $ do
    let encodedProducer = PipesBinary.encode C.FirstMessage
    encodedProducer >-> PNT.toSocket s
  return ()
In this example from [3], the client requests a connection through FirstMessage. The server accepts the connection through DoSomething 0, and then the client notices the connection is opened and sends DoNothing. After the connection is initiated, the communication is defined through cycles of DoSomething 0 and DoNothing. To compile, use ghc, as shown here:
ghc Command.hs
ghc -main-is Client Client.hs
ghc -main-is Server Server.hs
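To see the decoding stage of the server at work without opening a socket, you can run the same kind of pipeline in memory. The following is only a sketch under some assumptions: it repeats the Command type locally (with Eq added for testing), serverReply is a hypothetical pure version of the server's handler, and an in-memory stream of encoded bytes takes the place of PNT.fromSocket.

```haskell
{-# LANGUAGE DeriveGeneric #-}
import Data.Binary (Binary)
import Data.Functor (void)
import GHC.Generics (Generic)
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude

-- Local copy of the Command type; Eq is added only for testing.
data Command = FirstMessage
             | DoNothing
             | DoSomething Int
             deriving (Show, Eq, Generic)

instance Binary Command

-- Hypothetical pure version of the server's handler: whatever command
-- arrives, the answer is DoSomething 0.
serverReply :: Command -> Command
serverReply _ = DoSomething 0

-- Encode the client's commands to bytes, decode them again as the server
-- does, and compute the replies. No network is involved.
simulate :: [Command] -> [Command]
simulate cmds = PipesPrelude.toList $
    void (PP.parsed PipesBinary.decode bytes) >-> PipesPrelude.map serverReply
  where
    -- In-memory replacement for the bytes read from the socket.
    bytes = for (each cmds) PipesBinary.encode

main :: IO ()
main = print (simulate [FirstMessage, DoNothing, DoNothing])
```

Here void discards the decoding error that parsed returns when the byte stream runs out, which is how the in-memory stream ends.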

Summary

In this chapter, you learned the following:
  • You saw that effects, streaming, and composability are great features, but they usually cannot be used all at once.

  • You saw that, luckily, there is a library that lets a program combine all three, namely, pipes.

  • You saw a more complex example of using pipes.

References
