第 24 章:并发和多核编程

优质
小牛编辑
129浏览
2023-12-01

在撰写此书时,CPU 架构的景观正以几十年来最快的速度发生变化。

定义并发和并行

一个并发程序需要同时处理多个互不相关的任务。考虑一下游戏服务器的例子:典型做法是将数十个组件组合起来,其中的每一个都与外部有复杂交互。可能其中某个组件负责多个用户间聊天;另外一些负责处理玩家的输入,并且将更新后的状态返回给客户端;同时还有其他程序执行物理计算。

并发程序的正确运转并不需要多核,尽管多核可以提高执行效率和响应速度。

相比之下,一个并行程序仅解决一个单独的问题。假设一个金融模型尝试计算并预测下一分钟某支股票的价格波动。如果想在某个交易所列出的所有股票上执行这个模型,例如计算一下那些股票应该买入或卖出,我们希望在五百个核上可以比仅有一个核的时候跑得更快。这表明,并行程序通常不需要通过多核来保证正确性。

另一个有效区分并行和并发的点在于他们如何与外部世界交互。由定义,并发程序连续不断的处理网络协议和数据库之类的东西。典型的并行程序可能更专注:其接收流入的数据,咀嚼一会儿(间或有点 I/O),然后将需要返回的数据流吐出来。

许多传统编程语言进一步模糊了并发和并行之间已经难以辨认的边界,这些语言强制程序员使用相同的基础设施投监这两种程序。

本章将涉及在单个操作系统进程内进行并发和并行编程。

用线程进行并发编程

作为并发编程的基础,大多数语言提供了创建多个多线程的方法。 Haskell 也不例外,尽管使用 Haskell 进行线程编程看起来和其他语言有些不同。

In Haskell, a thread is an IO action that executes independently from other threads. To create a thread, we import the Control.Concurrent module and use the forkIO function Haskell 中,线程是互相独立的 IO 动作。为创建线程,需要导入 Control.Concurrent 模块并使用其中的 forkIO 函数

ghci> :m +Control.Concurrent
ghci> :t forkIO
forkIO :: IO () -> IO ThreadId
ghci> :m +System.Directory
ghci> forkIO (writeFile "xyzzy" "seo craic nua!") >> doesFileExist "xyzzy"
True

新线程几乎立即开始执行,创建它的线程同时继续向下执行。新线程将在它的 IO 动作结束后停止执行。

线程的不确定性

GHC 的运行时组件并不按特定顺序执行多个线程。所以,上面的例子中,文件 xyzzy 的创建时间在初始线程检查其是否存在之前或之后都有可能。如果删除 xyzzy 并且再执行一次,我们可能得到完全相反的结果。

隐藏延迟

假设我们要将一个大文件压缩并写入磁盘,但是希望快速处理用户输入以使他们感觉程序是立即响应的。如果使用 forkIO 来开启一个单独的线程去写文件,这样就可以同时做这两件事。

-- file: ch24/Compressor.hs
import Control.Concurrent (forkIO)
import Control.Exception (handle)
import Control.Monad (forever)
import qualified Data.ByteString.Lazy as L
import System.Console.Readline (readline)

-- http://hackage.haskell.org/ 上的 zlib 包提供了压缩功能
import Codec.Compression.GZip (compress)

main = do
    maybeLine <- readline "Enter a file to compress> "
    case maybeLine of
      Nothing -> return ()      -- 用户输入了 EOF
      Just "" -> return ()      -- 不输入名字按 “想要退出” 处理
      Just name -> do
           handle
             (print :: (SomeException->IO ()))
             $ do
                 content <- L.readFile name
                 forkIO (compressFile name content)
                 return ()
           main
  where compressFile path = L.writeFile (path ++ ".gz") . compress

[ Forec 译注: 原著代码稍微有点瑕疵,上面是修正后的版本。 此外,在部分 GHC 中执行上述程序可能遇到 System.Console.Readline 包无法导入的情况。ReadlineGNU readline 库的 Haskell 绑定,你可以在 http://www.gnu.org/software/readline/ 获取稳定版本的 readline 库。 另一种解决方法是使用 haskeline 代替 readline ,它的文档位于 http://hackage.haskell.org/package/haskeline-0.7.3.1/docs/System-Console-Haskeline.html

import Control.Concurrent (forkIO)
import Control.Exception (handle, SomeException)
import qualified Data.ByteString.Lazy as L
import System.Console.Haskeline (runInputT, defaultSettings, getInputLine)

-- Provided by the 'zlib' package on http://hackage.haskell.org/
import Codec.Compression.GZip (compress)

main :: IO ()
main = do
  maybeLine <- runInputT defaultSettings $ getInputLine "Enter a file to compress> "
  case maybeLine of
    Nothing -> return () -- user entered EOF
    Just "" -> return () -- treat no name as "want to quit"
    Just name -> do
      handle (print :: SomeException -> IO ()) $ do
        content <- L.readFile name
        forkIO (compressFile name content)
        return ()
      main

]

因为使用了惰性的 ByteString I/O ,主线程中做仅仅是打开文件。真正读取文件内容发生在子线程中。

当用户输入的文件名并不存在时将发生异常, handle (print :: (SomeException-> IO ())) 是一个低成本的打印错误信息的方式。

线程间的简单通信

在两个线程之间共享信息最简单的方法是,让它们使用同一个变量。上面文件压缩的例子中, main 线程与子线程共享了文件名和文件内容。 Haskell 的数据默认是不可变的,所以这样共享不会有问题,两个线程都无法修改另一个线程中的文件名和文件内容。

线程经常需要和其他线程进行活跃的通信。例如, GHC 没有提供查看其他线程是否还在执行、执行完毕、或者崩溃的方法 [54] 。可是,其提供了同步变量类型, MVar ,我们可以通过它自己实现上述功能。

MVar 的行为类似一个单元素的箱子:其可以为满或空。将一些东西扔进箱子,使其填满,或者从中拿出一些东西,使其变空。

ghci> :t putMVar
putMVar :: MVar a -> a -> IO ()
ghci> :t takeMVar
takeMVar :: MVar a -> IO a

尝试将一个值放入非空的 MVar ,将会导致线程休眠直到其他线程从其中拿走一个值使其变空。类似的,如果尝试从一个空的 MVar 取出一个值,线程也将休眠,直到其他线程向其中放入一个值。

-- file: ch24/MVarExample.hs
import Control.Concurrent

communicate = do
  m <- newEmptyMVar
  forkIO $ do
    v <- takeMVar m
    putStrLn ("received " ++ show v)
  putStrLn "sending"
  putMVar m "wake up!"

newEmptyMVar 函数的作用从其名字一目了然。要创建一个初始状态非空的 MVar ,需要使用 newMVar

ghci> :t newEmptyMVar
newEmptyMVar :: IO (MVar a)
ghci> :t newMVar
newMVar :: a -> IO (MVar a)

ghci 运行一下上面例子。

ghci> :load MVarExample
[1 of 1] Compiling Main             ( MVarExample.hs, interpreted )
Ok, modules loaded: Main.
ghci> communicate
sending
rece

如果有使用传统编程语言编写并发程序的经验,你会想到 MVar 有助于实现两个熟悉的效果。

  • 从一个线程向另一个线程发送消息,例如:一个提醒。
  • 对线程间共享的可变数据提供互斥。在数据没有被任何线程使用时,将其放入 MVar ,某线程需要读取或改变它时,将其临时从中取出。

主线程等待其他线程

GHC 的运行时系统对主线程的控制与其他线程不同。主线程结束时,运行时系统认为整个程序已经跑完了。其他没有执行完毕的线程,会被强制终止。

所以,如果线程执行时间非常长,且必须不被杀死,必须对主线程做特殊安排,以使得主线程在其他线程完成前都不退出。让我们来开发一个小库实现这一点。

-- file: ch24/NiceFork.hs
import Control.Concurrent
import Control.Exception (Exception, try)
import qualified Data.Map as M

data ThreadStatus = Running
                  | Finished         -- 正常退出
                  | Threw Exception  -- 被未捕获的异常终结
                    deriving (Eq, Show)

-- | 创建一个新线程管理器
newManager :: IO ThreadManager

-- | 创建一个被管理的线程
forkManaged :: ThreadManager -> IO () -> IO ThreadId

-- | 立即返回一个被管理线程的状态
getStatus :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus)

-- | 阻塞,直到某个特定的被管理线程终结
waitFor :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus)

-- | 阻塞,直到所有被管理线程终结
waitAll :: ThreadManager -> IO ()

[Forec 译注:需要对代码做一些改动。在新版本 Control.Exception 中,Exception 的 kind 是 * -> *,需要提供一个具体型别作为参数。 可以将代码中的两处 Exception 替换为 SomeException。]

我们使用一个常见的方法来实现 ThreadManager 的类型抽象:将其包裹进一个 newtype ,并防止使用者直接创建这个类型的值。在模块的导出声明中,我们列出了一个创建线程管理器的 IO 动作,但是并不直接导出类型构造器。

-- file: ch24/NiceFork.hs
module NiceFork
    (
      ThreadManager
    , newManager
    , forkManaged
    , getStatus
    , waitFor
    , waitAll
    ) where

ThreadManager 的实现中维护了一个线程 ID 到线程状态的 map 。我们将此作为线程 map 。

-- file: ch24/NiceFork.hs
newtype ThreadManager =
    Mgr (MVar (M.Map ThreadId (MVar ThreadStatus)))
    deriving (Eq)

newManager = Mgr `fmap` newMVar M.empty

此处使用了两层 MVar 。首先将 Map 保存在 MVar 中。这将允许通过使用新版本替换来“改变” map 中的值。同样确保了每个使用这个 Map 的线程可以看到一致的内容。

对每个被管理的线程,都维护一个对应的 MVar 。这种 MVar 从空状态开始,表示这个线程正在执行。当线程被杀死或者发生未处理异常导致退出时,我们将此类信息写入这个 MVar

为了创建一个线程并观察它的状态,必须做一点簿记。

-- file: ch24/NiceFork.hs
forkManaged (Mgr mgr) body =
    modifyMVar mgr $ \m -> do
      state <- newEmptyMVar
      tid <- forkIO $ do
        result <- try body
        putMVar state (either Threw (const Finished) result)
      return (M.insert tid state m, tid)

[Forec 译注: 上面这段代码中有一些对读者而言可能相对生疏的函数,在此稍作解释:try 的型别声明是 Exception e => IO a -> IO (Either e a) ,它执行一个 IO 操作,若执行过程中发生异常则返回 Left e,否则返回 Righteither 的型别声明是 (a -> c) -> (b -> c) -> Either a b -> c,如果 try 返回的是 Left 类型,either 会用 Threw 将异常值包裹,否则无论 Right 中包含的值是什么,都返回 Finished 的状态。 关于 modifyMVar ,请看下一节的介绍。它的返回值是一个 tuple,这个 tuple 的第一个元素将被放回到 mgr 中,而第二个元素会作为返回值。 ]

安全的修改 MVar

forkManaged 中使用的 modifyMVar 函数很实用:它将 takeMVarputMVar 安全的组合在一起。

ghci> :t modifyMVar
modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b

其从一个 MVar 中取出一个值,并传入一个函数。这个函数生成一个新的值,且返回一个结果。如果函数抛出一个异常, modifyMVar 会将初始值重新放回 MVar ,否则其会写入新值。它还会返回另一个返回值。

使用 modifyMVar 而非手动使用 takeMVarputMVar 管理 MVar , 可以避免两类并发场景下的问题。

  • 忘记将一个值放回 MVar 。有的线程会一直等待 MVar 中被放回一个值,如果一致没有等到,就将导致死锁。
  • 没有考虑可能出现的异常,扰乱了某端代码的控制流。这可能导致一个本应执行的 putMVar 没有执行,进而导致死锁。

因为这些美妙的安全特性,尽可能的使用 modifyMVar 是明智的选择。

安全资源管理:一个相对简单的好主意。

modifyMVar 遵循的模式适用很多场景。下面是这些模式:

  1. 获得一份资源。
  2. 将资源传入一个将处理它函数。
  3. 始终释放资源,即使函数抛出异常。如果发生异常,重新抛出异常,以便使其被程序捕获。

除了安全性,这个方法还有其他好处:可以是代码更简短且容易理解。正如前面的 forkManagedHakell 的简洁语法和匿名函数使得这种风格的代码看起来一点都不刺眼。

下面是 modifyMVar 的定义,从中可以了解这个模式的细节:

-- file: ch24/ModifyMVar.hs
import Control.Concurrent (MVar, putMVar, takeMVar)
import Control.Exception (block, catch, throw, unblock)
import Prelude hiding (catch) -- use Control.Exception's version

modifyMVar :: MVar a -> (a -> IO (a,b)) -> IO b
modifyMVar m io =
  block $ do
    a <- takeMVar m
    (b,r) <- unblock (io a) `catch` \e ->
             putMVar m a >> throw e
    putMVar m b
    return r

这种模式很容易用于你的特定需求,无论是处理网络连接,数据库句柄,或者被 C 库函数管理的数据。

[Forec 译注:blockunblock 在很久以前就已经被弃置了。最新 base 包中 modifyMVar 的实现如下:

modifyMVar :: MVar a -> (a -> IO (a,b)) -> IO b
modifyMVar m io =
  mask $ \restore -> do
  a      <- takeMVar m
  (a',b) <- restore (io a >>= evaluate) `onException` putMVar m a
  putMVar m a'
  return b

]

查看线程状态

我们编写的 getStatus 函数用于获取某个线程的当前状态。若某线程已经不被管理(或者未被管理),它返回 Nothing

-- file: ch24/NiceFork.hs
getStatus (Mgr mgr) tid =
  modifyMVar mgr $ \m ->
    case M.lookup tid m of
      Nothing -> return (m, Nothing)
      Just st -> tryTakeMVar st >>= \mst -> case mst of
                   Nothing -> return (m, Just Running)
                   Just sth -> return (M.delete tid m, Just sth)

若线程仍在运行,它返回 Just Running 。 否则,它指出将线程为何被终止,并停止管理这个线程。

tryTakeMVar 函数发现 MVar 为空,它将立即返回 Nothing 而非阻塞等待。

ghci> :t tryTakeMVar
tryTakeMVar :: MVar a -> IO (Maybe a)

否则,它将从 MVar 取到一个值。

waitFor 函数的行为较简单,其会阻塞等待给定线程终止,而非立即返回。

-- file: ch24/NiceFork.hs
waitFor (Mgr mgr) tid = do
  maybeDone <- modifyMVar mgr $ \m ->
    return $ case M.updateLookupWithKey (\_ _ -> Nothing) tid m of
      (Nothing, _) -> (m, Nothing)
      (done, m') -> (m', done)
  case maybeDone of
    Nothing -> return Nothing
    Just st -> Just `fmap` takeMVar st

首先读取保存线程状态的 MVar ,若其存在。 Map 类型的 updateLookupWithKey 函数很有用:它将查找某个值与更新或移除组合起来。

ghci> :m +Data.Map
ghci> :t updateLookupWithKey
updateLookupWithKey :: (Ord k) =>
                       (k -> a -> Maybe a) -> k -> Map k a -> (Maybe a, Map k a)

在此处,我们希望若保存线程状态的 MVar 存在,则将其从 Map 中移除,这样线线程管理器将不在管理这个线程。若从其中取到了值,则从中取出线程的退出状态,并将其返回。

我们的最后一个实用函数简单的等待所有当前被管理的线程完成,且忽略他们的退出状态。

-- file: ch24/NiceFork.hs
waitAll (Mgr mgr) = modifyMVar mgr elems >>= mapM_ takeMVar
    where elems m = return (M.empty, M.elems m)

[Forec 译注:注意 waitAll 函数其实是有缺陷的,它仅仅能够等待在执行 waitAll 之前创建的所有线程。如果在等待期间存在某个线程异步启动, waitAll 是无法获知其状态的。

至此,这个简单的 ThreadManager 基本可以运行了,你可以在 GHCI 中通过如下方式检测一下:

Prelude> :l NiceFork.hs
[1 of 1] Compiling NiceFork         ( NiceFork.hs, interpreted )
Ok, modules loaded: NiceFork.
*NiceFork> let calc = do { calc; return ()}
*NiceFork> manager <- newManager
*NiceFork> tid <- forkManaged manager calc
*NiceFork> ans <- getStatus manager tid
*NiceFork> :m +Data.Maybe
*NiceFork Data.Maybe> fromJust ans
Threw stack overflow

我们通过一个反复调用自身的 calc 函数构造栈溢出,线程管理器成功地返回了这一结果。

]

编写更紧凑的代码

我们在上面定义的 waitFor 函数有点不完善,因为或多或少执行了重复的模式分析:在 modifyMVar 内部的回调函数,以及处理其返回值时。

当然,我们可以用一个函数消除这种重复。这是 Control.Monad 模块中的 join 函数。

ghci> :m +Control.Monad
ghci> :t join
join :: (Monad m) => m (m a) -> m a

这是个有趣的主意:可以创建一个 monadic 函数或纯代码中的 action ,然后一直带着它直到最终某处有个 monad 可以使用它。一旦我们了解这种写法适用的场景,就可以更灵活的编写代码。

-- file: ch24/NiceFork.hs
waitFor2 (Mgr mgr) tid =
  join . modifyMVar mgr $ \m ->
    return $ case M.updateLookupWithKey (\_ _ -> Nothing) tid m of
      (Nothing, _) -> (m, return Nothing)
      (Just st, m') -> (m', Just `fmap` takeMVar st)

使用频道通信

对于线程间的一次性通信, MVar 已经足够好了。另一个类型, Chan 提供了单向通信频道。此处有一个使用它的简单例子。

-- file: ch24/Chan.hs
import Control.Concurrent
import Control.Concurrent.Chan

chanExample = do
  ch <- newChan
  forkIO $ do
    writeChan ch "hello world"
    writeChan ch "now i quit"
  readChan ch >>= print
  readChan ch >>= print

若一个 Chan 未空, readChan 将一直阻塞,直到读到一个值。 writeChan 函数从不阻塞:它会立即将一个值写入 Chan

注意事项

MVar 和 Chan 是非严格的

正如大多数 Haskell 容器类型, MVarChar 都是非严格的:从不对其内容求值。我们提到它,并非因为这是一个问题,而是因为这通常是一个盲点:人们倾向于假设这些类型是严格的,这大概是因为它们被用在 IO monad 中。

正如其他容器类型,误认为 MVarChan 是严格的会导致空间和性能的泄漏。考虑一下这个很可能发生的情况:

我们分离一个线程以在另一个核上执行一些开销较大的计算

-- file: ch24/Expensive.hs
import Control.Concurrent

notQuiteRight = do
  mv <- newEmptyMVar
  forkIO $ expensiveComputation_stricter mv
  someOtherActivity
  result <- takeMVar mv
  print result

它看上去做了一些事情并将结果存入 MVar

-- file: ch24/Expensive.hs
expensiveComputation mv = do
  let a = "this is "
      b = "not really "
      c = "all that expensive"
  putMVar mv (a ++ b ++ c)

当我们在父线程中从 MVar 获取结果并尝试用它做些事情时,我们的线程开始疯狂的计算,因为我们从未强制指定在其他线程中的计算真正发生。

照旧,一旦我们知道了有个潜在问题,解决方案就很简单:未分离的线程添加严格性,以确保计算确实发生。这个严格性最好加在一个位置,以避免我们忘记添加过它。

-- file: ch24/ModifyMVarStrict.hs
{-# LANGUAGE BangPatterns #-}

import Control.Concurrent (MVar, putMVar, takeMVar)
import Control.Exception (block, catch, throw, unblock)
import Prelude hiding (catch) -- 使用 Control.Exception's 中的 catch 而非 Prelude 中的。

modifyMVar_strict :: MVar a -> (a -> IO a) -> IO ()
modifyMVar_strict m io = block $ do
  a <- takeMVar m
  !b <- unblock (io a) `catch` \e ->
        putMVar m a >> throw e
  putMVar m b

[Forec 译注:blockunblock 不被建议使用,更好的方式是使用 mask 。此外,上面的模式匹配仅仅将表达式求值为 WHNF(弱首范式),关于 弱首范式的内容将在本章后半部分讨论。你暂时可以简单地将其理解为 “剥去表达式的一层”,例如 1 + 2 将被求值为 3,而 "ab" ++ "bc" 则仅仅被求值为 ('a': ("b" ++ "bc"))。所以这里代码的 “严格” 是有缺陷的。 一个可行但不太符合工程应用的做法是使用 Control.DeepSeq 中的 rnf 方法,该方法将表达式求值为范式。建议将原著代码修改如下:

-- file: ch24/ModifyMVarStrict.hs
{-# LANGUAGE BangPatterns #-}

import Control.Concurrent (MVar, putMVar, takeMVar)
import Control.Exception (catch, throw, mask)
import Prelude hiding (catch)

modifyMVar_strict :: MVar a -> (a -> IO a) -> IO ()
modifyMVar_strict m io = mask $ \restore -> do
  a <- takeMVar m
  b <- restore (io a) `catch` \e ->
        putMVar m a >> throw e
rnf b `seq` putMVar m b

]

Note

查看 Hackage 始终是值得的。

Hackage 包数据库,你将发现一个库,strict-concurrency ,它提供了严格版本的 MVarChan 类型

上面代码中的 ! 模式用起来很简单,但是并不总是足以确保我们的数据已经被求值。更完整的方法,请查看下面的段落“从求值中分离算法”。

Chan 是无边界的

因为 writeChan 总是立即成功,所以在使用 Chan 时有潜在风险。若对某个 Chan 的写入多于其读取, Chan 将用不检查的方法增长:对未读消息的读取将远远落后于其增长。

共享状态的并发仍不容易

尽管 Haskell 拥有与其他语言不同的基础设施用于线程间共享数据,它仍需克服相同的基本问题:编写正确的并发程序极端困难。真的,一些其他语言中的并发编程陷阱也会在 Haskell 中出现。其中为人熟知的两个是死锁和饥饿。

死锁

死锁的情况下,两个或多个线程永远卡在争抢共享资源的访问权上。制造多线程程序死锁的一个经典方法是不按顺序加锁。这种类型的 bug 很常见,它有个名字:锁顺序倒置。 Haskell 没有提供锁, 但 MVar 类型可能会有顺序倒置问题。这有一个简单例子:

-- file: ch24/LockHierarchy.hs
import Control.Concurrent

nestedModification outer inner = do
    modifyMVar_ outer $ \x -> do
        yield -- 强制当前线程让出 CPU
        modifyMVar_ inner $ \y -> return (y + 1)
        return (x + 1)
    putStrLn "done"

main = do
    a <- newMVar 1
    b <- newMVar 2
    forkIO $ nestedModification a b
    forkIO $ nestedModification b a

在 ghci 中运行这段程序,它通常会(但不总是)不打印任何信息,表明两个线程已经卡住了。

容易看出 nestedModification 函数的问题。在第一个线程中,我们先取出 MVar a ,接着取出 b 。在第二个线程中,先取出 b 然后取出 a ,若第一个线程成功取出了 a 然后要取出 b ,这是两个线程都会阻塞:每个线程都尝试获取一个 MVar ,而这个 MVar 已经被另一个线程取空了,所以二者都不能完成整个流程。

无论何种语言,通常解决倒序问题的方法是申请资源时一直遵循一致的顺序。因为这需要人工遵循编码规范,在实践中很容易遗忘。

更麻烦的是,这种倒序问题在实际代码中很难被发现。获取 MVar 的动作经常跨越不同文件中的不同函数,这使得通过观察源码检查时更加棘手。更糟糕的是,这类问题通常是间歇性的,这使得它们难于重现,更不要说隔离和修复了。

饥饿

并发软件通常可能会导致饥饿问题,某个线程霸占了共享资源,阻止其他线程使用。很容易想象这是如何发生的:一个线程调用 modifyMVar 执行一个 100 毫秒的代码段,稍后另外一个线程对同一个 MVar 调用 modifyMVar 执行一个 1 毫秒的代码段。第二个线程在第一个线程完成前将无法执行。

MVar 类型的非严格性质使会导致或恶化饥饿的问题。若我们将一个求值开销很大的 thunk 写入一个 MVar ,在一个看上去开销较小的线程中取出并求值,这个线程的执行开销马上会变大。所以我们在 “MVar 和 Chan 是非严格的” 一章中特地给出了一些建议。

没希望了吗?

幸运的是,我们已经提及的并发 API 并不是故事的全部。最近加入 Haskell 中的一个设施,软件事务内存,使用起来更加容易和安全。我们将在第 28 章,软件事务内存中介绍。

练习

  1. Chan 类型是使用 MVar 实现的。使用 MVar 来开发一个有边界的 Chan 库。
  2. 你开发的 newBoundedChanfunction 接受一个 Int 参数,限制单独 BoundedChan 中的未读消息数量。
  3. 达到限制是, 调用 writeBoundedChanfunction 要被阻塞,知道某个读取者使用 readBoundedChan 函数消费掉队列中的一个值。
  4. 尽管我们已经提到过 Hackage 库中的 strict-concurrency 包,试着自己开发一个,作为内置 MVar 类型的包装。按照经典的 Haskell 实践,使你的库类型安全,让用户不会混淆严格和非严格的 MVar

在 GHC 中使用多核

默认情况下, GHC 生成的程序只使用一个核,甚至在编写并发代码时也是如此。要使用多核,我们必须明确指定。当生成可执行程序时,要在链接阶段指定这一点。

  • “non-threaded” 运行时库在一个操作系统线程中运行所有 Haskell 线程。这个运行时在创建线程和通过 MVar 传递数据时很高效。
  • “threaded” 库使用多个操作系统线程运行 Haskell 线程。它在创建线程和使用 MVar 时具有更高的开销。

若我们在向编译器传递 -threadedoption 参数,它将使用 threaded 运行时库链接我们的程序。在编译库和源码文件时无需指定 -threaded ,只是在最终生成可执行文件时需要指定。

即使为程序指定了 threaded 运行时,默认情况下它仍将只使用一个核运行。必须明确告诉运行时使用多少个核。

运行时选项

运行程序时可以向 GHC 的运行时系统传递命令行参数。在将控制权交给我们的代码前,运行时扫描程序的参数,看是否有命令行选项 +RTS 。其后跟随的所有选项都被运行时解释,直到特殊的选项 -RTS ,这些选项都是提供给运行时系统的,不为我们的程序。运行时会对我们的代码隐藏所有这些选项。当我们使用 System.Environment 模块的 getArgsfunction 来获得我们的命令行参数是,我们不会在其中获得运行时选项。

threaded 运行时接受参数 -N [55] 。 其接受一个参数,指定了 GHC 的运行时系统将使用的核数。这个选项对输入很挑剔: -N 和参数之间必须没有空格。 -N4 可被接受, -N 4 则不被接受。

找出 Haskell 可以使用多少核

GHC.Conc 模块输出一个变量, numCapabilities ,它会告诉我们运行时系统被 -NRTS 选项指定了多少核。

-- file: ch24/NumCapabilities.hs
import GHC.Conc (numCapabilities)
import System.Environment (getArgs)

main = do
    args <- getArgs
    putStrLn $ "command line arguments: " ++ show args
    putStrLn $ "number of cores: " ++ show numCapabilitie

若编译上面的程序,我们可以看到运行时系统的选项对于程序来说是不可见的,但是它可以看其运行在多少核上。

$ ghc -c NumCapabilities.hs
$ ghc -threaded -o NumCapabilities NumCapabilities.o $ ./NumCapabilities +RTS -N4 -RTS foo
command line arguments: ["foo"]
number of cores: 4

选择正确的运行时

选择正确的运行时需要花点心思。 threaded 运行时可以使用多核,但是也有相应的代价:线程间共享数据的成本比 non-threaded 运行时更大。

更关键的是,GHC 6.8.3 版本使用的还是单线程的垃圾收集器:它在执行时会暂停其他所有线程,然后在单个核上执行垃圾收集工作。这限制了我们在使用多核的时候希望看到的性能改进 [56]

很多真实世界中的并发程序中,一个单独的线程多数时间实在等待一个网络请求或响应。这些情况下,若以一个单独的 Haskell 程序为数万并发客户端提供服务,使用低开销的 non-threaded 运行时很可能是合适的。例如,与其用 4 个核跑 threaded 运行时的单个服务器程序,可能同时跑 4 个 non-threaded 运行时的相同服务器程序性能更好。

我们的目的并不是阻止你使用 threaded 运行时。相对于 non-threaded 运行时它并没有特别大的开销:相对于其他编程语言,线程依旧惊人的轻量。我们仅是希望说明 threaded 运行时并不是在所有场景都是最佳选择。

Haskell 中的并行编程

现在让我们来关注一下并行编程。对很多计算密集型问题,可以通过分解问题,并在多个核上求值来更快的计算出结果。多核计算机已经普及,甚至在最新的笔记本上都有,但是很少有程序可以利用这一优势。

大部分原因是因为传统观念认为并行编程非常困难。在一门典型的编程语言中,我们将用处理并发程序相同的库和设施处理并发程序。这是我们的注意力集中在处理一些熟悉的问题比如死锁、竞争条件、饥饿和陡峭的复杂性。

但是我们可以确定,使用 Haskell 的并发特性开发并行代码时,有许多更简单的方法。在一个普通的 Haskell 函数上稍加变化,就可以并行求值。

范式和首范式

seq 函数将一个表达式求值为首范式(简称 HNF)。seq 一旦到达最外层的构造函数(也就是 “首部”)就会停止,这与范式不同(NF), 被称作范式的表达式必然是被完全求值的,而非仅仅 “剥离” 掉最外层的构造函数。

你可能会经常听到 Haskell 程序员提到弱首范式(WHNF)。对一般数据来说,弱首范式和首范式相同。它们仅仅在功能上有些许区别, 这里我们就不过多关注了。

[ Forec 译注:读者只需要记住范式和弱首范式这两个概念的区别,HNF 几乎可以忽略。以下两个链接可以帮助更好的理解, 这部分内容与并行编程关系不是非常紧密,因此不在此处过多叙述: * Haskell Wiki: https://en.wikibooks.org/wiki/Haskell/Laziness * StackOverflow: http://stackoverflow.com/questions/6872898/haskell-what-is-weak-head-normal-form ]

排序

这是一个使用分治算法实现的 Haskell 排序函数:

-- file: ch24/Sorting.hs
sort :: (Ord a) => [a] -> [a]
sort (x:xs) = lesser ++ x:greater
  where lesser  = sort [y | y <- xs, y <  x]
        greater = sort [y | y <- xs, y >= x]
sort _ = []

sort 函数实现了著名的快速排序算法,很多 Haskell 程序员将其视作经典: 在早期的 Haskell 教程中,用一行代码实现的 sort 经常作为示例向读者展示 Haskell 强大的表达能力。 这里我们将代码切分为几行以方便比较串行和并行版本。

下面是对 sort 工作流程的简要介绍:

  1. 从列表中取出一个元素,这个元素被称为 “轴心”(或哨兵)。每个元素都要和轴心比较。上面的代码简单地通过模式匹配选取列表的第一个元素作为轴心;
  2. 使用原始列表中除轴心外的其它元素构造一个子列表,子列表中元素的值全部小于轴心,并递归地处理子列表;
  3. 与 2 类似,构造另一个子列表,但子列表中元素的值均大于或等于轴心的值,递归处理这个子列表;
  4. 将 2、3 两步中排序后的子列表通过轴心进行连接。

将代码变换为并行版本

并行版本的排序函数相对要复杂一些:

-- file: ch24/Sorting.hs
module Sorting where

import Control.Parallel (par, pseq)

parSort :: (Ord a) => [a] -> [a]
parSort (x:xs)    = force greater `par` (force lesser `pseq` (lesser ++ x:greater))
  where lesser  = parSort [y | y <- xs, y <  x]
        greater = parSort [y | y <- xs, y >= x]
parSort _         = []

不过我们并没有改变代码的结构。parSort 仅仅多使用了三个函数: parpseqforce

par 函数由 Control.Parallel 模块提供。它与 seq 目的类似:将左侧参数求值为弱首范式并返回右侧参数。 par 的名字很好地阐述了它的功能:par 能够在其它运算执行的同时并行地对其左侧参数求值。

pseq 也和 seq 类似:它在返回右侧表达式之前将左侧表达式求值为弱首范式。这二者之间的区别很微妙, 但对于并行编程而言非常重要:编译器不保证 seq 的求值顺序,即如果编译器认为首先对右侧参数求值能够提高性能,则 它将先计算右侧参数。对于单核执行的程序来说,这种灵活性很有必要,但对多核代码而言不够健壮。 相比之下,编译器能够保证 pseq 左侧参数的求值过程早于右侧参数。

以上修改将对一些我们没有提到的方面产生巨大的影响,比如说:

  • 使用多少个核心
  • 线程间如何通信
  • 如何将工作分配给多个可用核心
  • 明确哪些数据将在线程间共享,哪些属于线程私有
  • 如何确定所有任务均已完成

明确在并行中执行什么

并行的 Haskell 代码之所以能够表现出更优秀的性能,是因为计算过程中有大量重复、独立、可并行计算的工作。 非严格求值会阻碍并行程序的执行,因此我们会在并行代码中使用 force 函数。下面通过一个错误的例子解释 force 函数的功能:

-- file: ch24/Sorting.hs
sillySort (x:xs) = greater `par` (lesser `pseq` (lesser ++ x:greater))
  where lesser   = sillySort [y | y <- xs, y <  x]
        greater  = sillySort [y | y <- xs, y >= x]
sillySort _        = []

注意,我们在 parpseq 两处用普通求值取代了 force lesserforce greater

回忆一下,对弱首范式的求值会在 “看到” 表达式的外部构造器时停止。在错误的例子中,我们将每个有序的子列表求值为 WHNF。 因为最外层的构造器仅仅是一个列表构造器,上面的代码实际上仅仅强制对每个排序子列表的第一个元素做了求值, 每个排序子列表剩余的元素仍未被完全求值。换句话说,上面代码在并行部分执行时几乎没有做任何有效计算, sillySort 的执行过程和完全顺序的代码没什么差别。

[Forec 译注:考虑此前译注给出的例子,"ab" ++ "bc" 的弱首范式仅仅是 'a' : ("b" ++ "bc")。 以待排序列表 [3, 2, 5, 1, 3] 为例,sillySort 试图在 par 包装的并行操作中计算出排序好的两个子列表。 假设轴心元素为 3,则小于轴心的待排序子列表应为 [2, 1],而 par 中的并行操作在计算出第一个满足条件的 元素 2 后,得到 lesser = (2: _),已经遇到了最外层的列表构造器,因此停止计算。 真正求出完整待排序子列表是在后续顺序操作 ++ 时,因为需要列表中所有元素,这时程序才开始计算。

这里给出一个译者认为更容易理解弱首范式的例子:在 GHCI 中执行如下指令。

> let list = [1..10]
> let whnfList = map (+1) list
> :sprint whnfList
whnfList = _
> length whnfList
10
> :sprint whnfList
whnfList = [_, _, _, _, _, _, _, _, _, _]

可以看出,length 操作仅需要 whnfList 中元素的数量,并没有对其中元素进行更深层次的求值。

]

我们使用 force 函数,在构造函数返回前遍历整个列表以避免这种情况出现。

-- file: ch24/Sorting.hs
force :: [a] -> ()
force xs = go xs `pseq` ()
  where go (_:xs) = go xs
        go [] = 1

注意,我们并不在乎列表中具体有什么,而是仅仅把列表遍历一遍,遍历之后再调用 pseq 。 因为我们会在 par 或者 pseq 的左侧使用 force,所以返回值无所谓。

当然,很多情况下我们会需要对列表中的个别元素强制求值。下面会介绍一个基于类型类的解决方案。

par 提供什么保证?

实际上,par 函数并不保证会将表达式并行求值,它只在对表达式并行求值有意义的时候才这么做。 在并行编程中,这种行为比保证并行执行更有效。它允许运行时系统遇到 par 时智能调度。

举个例子,运行时系统可能发现表达式过于简单,并行求值带来的性能提升远低于并行操作本身的额外开销。 或者,运行时系统发现所有的计算核心均正在工作,而启动一个新的并行运算仅仅会增加待运行线程的数量。

这个潜规则影响了我们如何编写并行代码。假设系统性能不会因为线程间争夺核心资源下降, 考虑到 par 在运行时可以智能调度,我们就可以将它应用到任何想应用的地方。

运行并测试性能

sortparSortparSort2 保存到 Sorting.hs 中并封装为 Sorting 模块。 我们创建一个驱动程序以计算这些排序函数的性能:

-- file: ch24/SortMain.hs

module Main where

import Data.Time.Clock (diffUTCTime, getCurrentTime)
import System.Environment (getArgs)
import System.Random (StdGen, getStdGen, randoms)

import Sorting

-- testFunction = sort
-- testFunction = seqSort
testFunction = parSort
-- testFunction = parSort2 2

randomInts :: Int -> StdGen -> [Int]
randomInts k g = let result = take k (randoms g)
         in force result `seq` result

main = do
  args <- getArgs
  let count | null args = 500000
            | otherwise = read (head args)
  input <- randomInts count `fmap` getStdGen
  putStrLn $ "We have " ++ show (length input) ++ " elements to sort."
  start <- getCurrentTime
  let sorted = testFunction input
  putStrLn $ "Sorted all " ++ show (length sorted) ++ " elements."
  end <- getCurrentTime
  putStrLn $ show (end `diffUTCTime` start) ++ " elapsed."

简单起见,我们使用 testFunction 变量选择用于基准测试的排序函数。

上面的程序接受一个可选的命令行参数,用于指定待排序随机数组的长度。

非严格求值是性能测量和分析中要注意的 “雷区”。下面是驱动程序中特别要避免的一些潜在问题:

  • 非严格求值会使测量单一行为变为测量多个行为。 Haskell 默认的随机数产生器(PRNG)很慢,而且 random 函数仅仅在需要的时候才产生下一个随机数。我们在记录开始时间之前对输入列表的每个元素进行了强制求值,并且打印了列表的长度:这保证了程序在计算之前就已经生成了全部的随机数。如果我们忽略了这一步,则并行计算的过程会包含随机数的生成,进而导致测量出的时间变为生成随机数和数据排序所用时间之和,而非数据排序本身的耗时。
  • 隐含的数据依赖。 在产生随机数列表时,只打印列表的长度并不会对列表完全求值。length 函数只会遍历列表的结构而非列表内的每个元素。因此,在排序操作执行前,列表中并没有生成好的随机数。这一行为可能严重拖慢性能。每个随机数的产生都取决于列表中前一个随机数的值,但并行的数据排序已经将列表元素分散到了不同的处理器内核中。如果排序前没有对输入的随机数列表完全求值,那么运行时就会遭遇可怕的 “乒乓” 效应:计算会在核心之间不停跳跃,导致性能的迅速下降。尝试删除 main 函数中应用在随机数列表上的 force :你会发现并行代码比顺序执行的程序慢了三倍。[Forec 译注:原著作者似乎忘记在代码中使用 force。可以在运算开始前加入 force input。]
  • 让我们错误地认为代码执行了有意义的工作。 为了保证数据排序的执行,我们在记录结束时间之前将结果列表的长度打印到了屏幕上。如果没有 putStrLn 强制要求列表长度,排序压根就不会执行。

在构建程序时开启优化和 GHC 的运行时线程支持:

$ ghc -threaded -O2 --make SortMain
[1 of 2] Compiling Sorting          ( Sorting.hs, Sorting.o )
[2 of 2] Compiling Main             ( SortMain.hs, SortMain.o )
Linking SortMain ...

在程序运行时告知 GHC 的运行时系统使用多少核心。首先测试最原始的 sort 函数,看看基础性能如何:

$ ./Sorting +RTS -N1 -RTS 700000
We have 700000 elements to sort.
Sorted all 700000 elements.
3.178941s elapsed.

启用两个核心并不会提升顺序执行代码的性能:

$ ./Sorting +RTS -N2 -RTS 700000
We have 700000 elements to sort.
Sorted all 700000 elements.
3.259869s elapsed.

如果重新编译,测试 parSort 的性能,就会发现结果还不如顺序代码:

$ ./Sorting +RTS -N1 -RTS 700000
We have 700000 elements to sort.
Sorted all 700000 elements.
3.915818s elapsed.
$ ./Sorting +RTS -N2 -RTS 700000
We have 700000 elements to sort.
Sorted all 700000 elements.
4.029781s elapsed.

性能上没有任何提升。这看起来是以下两个原因之一造成的:要么是 par 本身的开销过大, 要么是我们滥用了 par 。为了鉴别究竟哪个才是罪魁祸首, 我们编写了一个类似 parSort 的函数 seqSort,它使用 pseq 代替 par

seqSort :: (Ord a) => [a] -> [a]
seqSort (x:xs) = lesser `pseq` (greater `pseq` (lesser ++ x:greater))
  where lesser  = seqSort [y | y <- xs, y <  x]
        greater = seqSort [y | y <- xs, y >= x]
seqSort _ = []

我们还删去了 parSort 中对 force 的调用。所以将 seqSortsort 比较就可以观察到只应用 pseq 的性能:

$ ./Sorting +RTS -N1 -RTS 700000
We have 700000 elements to sort.
Sorted all 700000 elements.
3.848295s elapsed.

上面的运行结果说明 parpseq 耗时相似。我们该如何改进性能呢?

性能调优

parSort 中,调用 par 的次数是待排序数据数量的两倍。尽管 par 的开销很小,但它毕竟也不是 “免费” 的。 当递归调用 parSort 时,我们最终将 par 应用到了单个元素上。在这种细微的粒度下,使用 par 的成本远超其带来的增益。 为了减少这种影响,我们在待排序数据数量小于某个阈值时采用非并行排序。

-- file: ch24/Sorting.hs
parSort2 :: (Ord a) => Int -> [a] -> [a]
parSort2 d list@(x:xs)
  | d <= 0     = sort list
  | otherwise  = force greater `par` (force lesser `pseq` (lesser ++ x:greater))
    where lesser      = parSort2 d' [y | y <- xs, y <  x]
          greater     = parSort2 d' [y | y <- xs, y >= x]
          d' = d - 1
parSort2 _ _              = []

parSort2 在到达可控深度时停止递归以及创建新的并行计算。如果待处理数据的规模已知,程序就能在剩余工作数量足够少的时刻停止 并行计算并切换到非并行代码。

$ ./Sorting +RTS -N2 -RTS 700000
We have 700000 elements to sort.
Sorted all 700000 elements.
2.947872s elapsed.

在双核系统中,这一改进将运行速度提升了大约 25%。性能的提升并不高,但想想我们对代码所做的改变:几句注解而已就让速度提升了四分之一。

上面的排序函数难以获得良好的并行性能。它执行了大量内存分配,导致垃圾回收器频繁运行。-sstderr 这一 RTS 选项能够将垃圾回收统计信息打印到屏幕,反馈信息显示程序大约有 40% 的时间用于回收垃圾。 由于 GHC 6.8 中的垃圾回收器会停止所有线程并运行在单个内核上,这一点也就成为了程序的瓶颈。

par 应用在内存分配不频繁的代码中可能会带来明显的性能改进。我们已经看到,相比单核运算, 上面的基准测试在双核系统上能够取得 1.8 倍加速。在这本书编写时,GHC 正在开发一个并行的垃圾回收器, 届时在多核系统上执行大量内存分配的代码也能获得令人满意的性能。

[Forec 译注:翻译此处时 GHC 的最新版本为 8.0.2,已支持多核系统的垃圾回收。实际测试表明, 即使支持多核 GC,并行编程的主要瓶颈在很多时候还是垃圾回收。]

练习

  1. 决定什么时候从 parSort2 切换到 sort 不是一件容易的事。我们上面实现的方法是根据递归的深度选择,另一种方法是根据待排序子列表的长度决定。重写 parSort2 使其在待排序子列表长度小于某个数的时候切换到 sort
  2. 测试基于列表长度切换的 parSort2 的性能,并于基于递归深度的版本比较。哪一个性能更好?

并行策略和 Map Reduce

在编程社区中,Google 用于并行处理海量数据的 MapReduce 框架是受函数式编程启发的著名软件系统之一。

我们可以用 Haskell 轻松构建一个简单而实用的 “山寨版” MapReduce。我们将把重点放在 Web 服务器日志文件的处理上,这些文件通常规模较大并且包含丰富的信息 [57]

下面这个例子是 Apache Web 服务器记录的一条访问日志。这条日志原本是一行,这里为了阅读方便将其切分成多行。

201.49.94.87 - - [08/Jun/2008:07:04:20 -0500] "GET / HTTP/1.1"
200 2097 "http://en.wikipedia.org/wiki/Mercurial_(software)"
"Mozilla/5.0 (Windows; U; Windows XP 5.1; en-GB; rv:1.8.1.12)
Gecko/20080201 Firefox/2.0.0.12" 0 hgbook.red-bean.com

虽然直接处理这些日志文件的实现非常简单,但我们显然不能满足于此。如果换个思路,考虑解决一类问题而非一个问题,我们就可能得到适用范围更广的代码。

在开发并行程序时,无论使用何种底层语言,我们都总会遇到一些 “坏钱币” 问题:

[Forec 译注:坏钱币,原文为 “Bad penny”,意味着一个不好的东西总会一次接一次地出现。]

  • 算法很快就会被分区和通信的细节所掩盖,从而导致代码变得难以理解和修改;
  • 对 “粒度” 的选择将变得困难 —— 我们将无法轻易地决定应该分配多少工作给每个核心。如果粒度太小了,那么核心将会在簿记工作上花费大量时间,导致并行程序跑得比串行程序还要慢;但如果粒度太大的话,某些核心又会因为糟糕的负载平衡而被闲置。

将算法和求值分离

并行 Haskell 代码不存在传统语言中通信部分代码产生的混乱,取而代之的,是 parpseq 对应的繁杂注释。举个例子,下面这个函数的操作方式与 map 类似,但会以并行方式将每个元素求值为弱首范式(WHNF)。[Forec 译注:弱首范式的概念在上文的译注中已经介绍过。]

-- file: ch24/ParMap.hs
import Control.Parallel (par)

parallelMap :: (a -> b) -> [a] -> [b]
parallelMap f (x:xs) = let r = f x
                       in r `par` r : parallelMap f xs
parallelMap _ _      = []

类型 b 可以是一个列表,或者是其它容易求值为弱首范式的类型。我们希望得到万精油式的解法,而不必为列表或者其他任何特殊类型编写定制的 parallelMap

要解决这个问题,首先需要考虑如何强制求值。下面这个函数将列表中的每个元素强制求值为弱首范式:

-- file: ch24/ParMap.hs
forceList :: [a] -> ()
forceList (x:xs) = x `pseq` forceList xs
forceList _      = ()

上面的函数并不对列表做任何计算(实际上,该函数的类型签名就已经说明它无法做任何计算,因为它并不知晓列表中的具体元素)。这个函数的唯一目的是确保列表被求值为首范式,它仅能作为 seqpar 函数的第一个参数,例如:

-- file: ch24/ParMap.hs
stricterMap :: (a -> b) -> [a] -> [b]
stricterMap f xs = forceList xs `seq` map f xs

上面的 stricterMap 仅仅使列表中的元素被求值为弱首范式。我们可以添加一个函数作为参数,从而强制每个元素被求值至更深的层次:

-- file: ch24/ParMap.hs
forceListAndElts :: (a -> ()) -> [a] -> ()
forceListAndElts forceElt (x:xs) =
    forceElt x `seq` forceListAndElts forceElt xs
forceListAndElts _        _      = ()

模块 Control.Parallel.Strategies 将这个想法浓缩为一个库,它引入了 “求值策略”(Evaluation Strategy) 这个概念:

-- file: ch24/Strat.hs
type Done = ()
type Strategy a = a -> Done

一个求值策略不会执行任何计算操作,它仅仅保证求值的程度。最简单的策略是 r0,也就是什么都不做:

-- file: ch24/Strat.hs
r0 :: Strategy a
r0 _ = ()

下面是 rwhnf ,它会将参数求值为弱首范式:

-- file: ch24/Strat.hs
rwhnf :: Strategy a
rwhnf x = x `seq` ()

这个模块还提供了一个类型类 NFData,它提供了方法 rnf ,该方法能够将一个值求值为范式:

-- file: ch24/Strat.hs
class NFData a where
  rnf :: Strategy a
  rnf = rwhnf

[Forec 译注: NFData 类型类已经被迁移至 Control.DeepSeqrnf 现在已经不再是策略,取而代之的是 rdeepseq 。]

对于基本的类型(如 Int ),弱首范式和范式完全相同,这也是 NFData 类型类将 rwhnf 用作 rnf 默认实现的原因。 Control.Parallel.Strategies 模块也为许多常见类型提供了 NFData 的实例。

-- file: ch24/Strat.hs
instance NFData Char
instance NFData Int

instance NFData a => NFData (Maybe a) where
    rnf Nothing  = ()
    rnf (Just x) = rnf x

{- ... and so on ... -}

通过这些例子,你应该已经大致清楚如何为自定义类型编写 NFData 实例了。你的 rnf 应当能够处理每个构造器,并且将 rnf 应用到构造器的每个字段上。

将算法和策略分离

我们可以通过这些基本策略构造更多复杂策略。 Control.Parallel.Strategies 提供了很多更复杂的策略。比如说, parList 会将一个求值策略并行地应用到列表的每个元素上:

-- file: ch24/Strat.hs
parList :: Strategy a -> Strategy [a]
parList strat []     = ()
parList strat (x:xs) = strat x `par` (parList strat xs)

该模块用 parList 定义了并行的 map 函数:

-- file: ch24/Strat.hs
parMap :: Strategy b -> (a -> b) -> [a] -> [b]
parMap strat f xs = map f xs `using` parList strat

这正是代码的有趣之处。在 using 的左边是普通形式的 map ,而右侧是求值策略。组合器 using 告诉我们如何对一个值使用策略,从而使代码和具体的求值方式分离:

-- file: ch24/Strat.hs
using :: a -> Strategy a -> a
using x s = s x `seq` x

Control.Parallel.Strategies 还包含了许多其它函数,它们可以更精确地求值。例如,parZipWith 通过求值策略并行地应用 zipWith

-- file: ch24/Strat.hs
vectorSum' :: (NFData a, Num a) => [a] -> [a] -> [a]
vectorSum' = parZipWith rnf (+)

[Forec 译注:函数 parZipWith 似乎并不存在,也许这里是原作自己实现的某个函数。]

编写简单的 MapReduce 定义

只需要考虑一下 mapReduce 函数需要做什么,就能推导出它的类型。首先,map 部件是必须的,因此有类型 a -> b ;其次需要 reduce ,也就是 fold 的同义词,这里我们使用更普适的类型 [b] -> c 。这个类型同时兼容左/右 fold,因此我们可以根据数据和处理过程灵活地选择。

把这些类型整合到一起,就得到了以下这个完整的类型:

-- file: ch24/MapReduce.hs
simpleMapReduce
    :: (a -> b)      -- map function
    -> ([b] -> c)    -- reduce function
    -> [a]           -- list to map over
    -> c

代码非常简单:

simpleMapReduce mapFunc reduceFunc = reduceFunc . map mapFunc

MapReduce 和策略

上面定义的 simpleMapReduce 实在太简单了。我们希望能够指定一些并行执行的工作,让它变得更加实用。这一目标可以通过分别在 map 和 reduce 阶段传入策略实现。

-- file: ch24/MapReduce.hs
mapReduce
    :: Strategy b    -- evaluation strategy for mapping
    -> (a -> b)      -- map function
    -> Strategy c    -- evaluation strategy for reduction
    -> ([b] -> c)    -- reduce function
    -> [a]           -- list to map over
    -> c

修改后的类型和函数体大小都有所增长。

-- file: ch24/MapReduce.hs
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
    mapResult `pseq` reduceResult
  where mapResult    = parMap mapStrat mapFunc input
        reduceResult = reduceFunc mapResult `using` reduceStrat

适当调整工作量

要取得更好的性能,必须保证 par 中每个应用所做的工作量远大于维护并行计算所需额外数据的开销。例如处理一个巨大的文件时,若按行切割,那么真正有意义的工作将远小于切割、合并的额外开销。

我们将在后面的章节开发一种处理大块文件的方式。这些大块会由什么组成?由于 Web 服务器日志文件应该只包含 ASCII 文本,我们将会看到 ByteString 的出色性能:这种类型极其高效,并且以流方式传输时仅消耗非常少的内存。

-- file: ch24/LineChunks.hs
module LineChunks
    (
      chunkedReadWith
    ) where

import Control.Exception (bracket, finally)
import Control.Monad (forM, liftM)
import Control.Parallel.Strategies (NFData, rnf)
import Data.Int (Int64)
import qualified Data.ByteString.Lazy.Char8 as LB
import GHC.Conc (numCapabilities)
import System.IO

data ChunkSpec = CS {
      chunkOffset :: !Int64
    , chunkLength :: !Int64
    } deriving (Eq, Show)

withChunks :: (NFData a) =>
              (FilePath -> IO [ChunkSpec])
           -> ([LB.ByteString] -> a)
           -> FilePath
           -> IO a
withChunks chunkFunc process path = do
  (chunks, handles) <- chunkedRead chunkFunc path
  let r = process chunks
  (rnf r `seq` return r) `finally` mapM_ hClose handles

chunkedReadWith :: (NFData a) =>
                   ([LB.ByteString] -> a) -> FilePath -> IO a
chunkedReadWith func path =
    withChunks (lineChunks (numCapabilities * 4)) func path

这个程序会以并行的方式处理各个块,并谨慎地利用惰性 I/O 的优势以便确保程序可以安全地传输这些块。

减轻惰性 I/O 的风险

惰性 I/O 带来了一些众所周知的风险,以下是一些我们需要避免的地方:

  • 如果不强制要求程序从文件中拉取数据并计算,那么惰性 I/O 的特性会隐式地延长文件句柄打开的时间。操作系统会限制同时打开的文件数量,这个限制的数量较小且固定。如果这个风险不解决,可能会导致程序中使用到文件句柄的其他部分发生饥饿现象。
  • 如果程序没有显式地关闭文件句柄,那么文件句柄将由垃圾回收器负责自动关闭,但等到 GC 注意到这个泄漏的文件句柄可能需要很长的时间。这与上面的饥饿风险同理。
  • 显式关闭句柄能够避免饥饿,但如果关闭得太早,惰性计算可能还需要从这个被关闭的句柄中读取更多数据,这将导致该计算的失败。

除了这些常见的风险外,我们无法通过单个文件句柄为多个线程提供数据。一个文件句柄只有一个 “寻址指针”,它指明了当前应当读取的位置。如果我们想要读取多个块,那么每一块都需要从文件中的不同位置读取,而单个句柄显然是无法满足这种要求的。

把上面这些思考都整合到一起,我们就得到了以下的惰性 I/O 解决方案:

-- file: ch24/LineChunks.hs
chunkedRead :: (FilePath -> IO [ChunkSpec])
            -> FilePath
            -> IO ([LB.ByteString], [Handle])
chunkedRead chunkFunc path = do
  chunks <- chunkFunc path
  liftM unzip . forM chunks $ \spec -> do
    h <- openFile path ReadMode
    hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
    chunk <- LB.take (chunkLength spec) `liftM` LB.hGetContents h
    return (chunk, h)

上面的代码通过显式关闭文件句柄来避免饥饿问题。程序会为读取同一个文件的多个线程分别提供独立的句柄,从而允许这些线程同时读取该文件的不同块。

我们要解决的最后一个问题,就是如何防止句柄在计算结束之前被关闭。程序使用 rnf 保证所有的处理过程在 withChunks 函数返回前完成,从而可以显式关闭文件句柄(可以确保这些句柄不会再被读取)。如果你必须在程序中使用惰性 I/O,那么最好用这种方式构建一道 “防火墙”,从而避免在意想不到的位置发生错误。

高效寻找行对齐的块

由于服务器的日志文件是面向行的,所以需要寻找一种高效的方式,保证在文件切分为大块后,每块均在行的边界上结束。我们不想通过扫描一个块的所有数据来确定边界,因为每一块数据可能达到几十兆字节。

无论是固定块的大小还是固定块的数量,我们的方法都能奏效,这里选择后者。首先寻求一个大块结尾的大致位置,之后向前扫描直到换行符,然后在换行符后开始寻找下一个块,并重复该过程。

-- file: ch24/LineChunks.hs
lineChunks :: Int -> FilePath -> IO [ChunkSpec]
lineChunks numChunks path = do
  bracket (openFile path ReadMode) hClose $ \h -> do
    totalSize <- fromIntegral `liftM` hFileSize h
    let chunkSize = totalSize `div` fromIntegral numChunks
        findChunks offset = do
          let newOffset = offset + chunkSize
          hSeek h AbsoluteSeek (fromIntegral newOffset)
          let findNewline off = do
                eof <- hIsEOF h
                if eof
                  then return [CS offset (totalSize - offset)]
                  else do
                    bytes <- LB.hGet h 4096
                    case LB.elemIndex '\n' bytes of
                      Just n -> do
                        chunks@(c:_) <- findChunks (off + n + 1)
                        let coff = chunkOffset c
                        return (CS offset (coff - offset):chunks)
                      Nothing -> findNewline (off + LB.length bytes)
          findNewline newOffset
    findChunks 0

最后一块的大小会比前面的块稍小一些,但这种差异在实际应用中无关紧要。

计算行数

下面这个简单的例子展示了如何使用我们构建的 “脚手架”。

-- file: ch24/LineCount.hs
module Main where

import Control.Monad (forM_)
import Data.Int (Int64)
import qualified Data.ByteString.Lazy.Char8 as LB
import System.Environment (getArgs)

import LineChunks (chunkedReadWith)
import MapReduce (mapReduce, rnf)

lineCount :: [LB.ByteString] -> Int64
lineCount = mapReduce rnf (LB.count '\n')
                      rnf sum

main :: IO ()
main = do
  args <- getArgs
  forM_ args $ \path -> do
    numLines <- chunkedReadWith lineCount path
    putStrLn $ path ++ ": " ++ show numLines

若使用 ghc -O2 --make-threaded 命令编译此程序,它在运行过一次后性能会更好(文件系统已缓存数据)。在双核笔记本上,处理 248MB (110 万行)的数据只需要 0.361 秒(加上 +RTS -N2 参数),单核需要 0.576 秒。

找到最受欢迎的 URL

下面这个例子将计算每个 URL 被访问的次数,它来自 [Google08] (Google 讨论 MapReduce 的原始论文)。在 map 阶段,我们为每个数据块创建从 URL 映射到访问次数的 Map 。在 reduce 阶段,合并这些映射。

[Forec 译注:该链接需科学上网后访问。]

-- file: ch24/CommonURLs.hs
module Main where

import Control.Parallel.Strategies (NFData(..), rwhnf)
import Control.Monad (forM_)
import Data.List (foldl', sortBy)
import qualified Data.ByteString.Lazy.Char8 as L
import qualified Data.ByteString.Char8 as S
import qualified Data.Map as M
import Text.Regex.PCRE.Light (compile, match)

import System.Environment (getArgs)
import LineChunks (chunkedReadWith)
import MapReduce (mapReduce)

countURLs :: [L.ByteString] -> M.Map S.ByteString Int
countURLs = mapReduce rwhnf (foldl' augment M.empty . L.lines)
                      rwhnf (M.unionsWith (+))
  where augment map line =
            case match (compile pattern []) (strict line) [] of
              Just (_:url:_) -> M.insertWith' (+) url 1 map
              _ -> map
        strict  = S.concat . L.toChunks
        pattern = S.pack "\"(?:GET|POST|HEAD) ([^ ]+) HTTP/"

为了从日志文件的每一行中提取出 URL,我们使用了在 chapter_17 中开发的 PCRE 正则表达式库。

驱动程序打印了十条最受欢迎的 URL。与统计行数的例子类似,这个程序在双核上的速度是单核的 1.8 倍,处理 110 万条日志只需要 1.7 秒。

总结

给定一个适合 MapReduce 模型的问题,MapReduce 编程模型可以帮助我们使用 Haskell 以极小的工作量编写 “临时” 但高效的并行程序。此外,这个想法可以轻松扩展到其它数据源,比如文件集以及网络上的数据资源。

在许多情况下,为了跟上计算核心的处理速度,数据流的速度必须足够快,这一点将成为性能瓶颈。例如,若我们尝试将上述某个示例程序应用于一个没有缓存在主存的文件,或者是一个通过高带宽存储阵列传输的文件,那么大部分时间都将浪费在等待磁盘 I/O 上,而无法利用多核的优势。

[54]在稍后将展示,GHC 的线程异常轻量。如果运行时提供检查每个线程状态的方法,每个线程的开销将增加,哪怕永远不会用到这些信息。
[55]non-threaded 运行时不接受这个选项,会用一条错误信息拒绝它。
[56]此书撰写时,垃圾收集器已经开始重新编写以利用多核,但是我们不确定它在未来的效果。
[57]这个想法是 Tim Bray 提出的。