在Haskell中执行嵌套循环有两种明显的“惯用”方法:使用列表monad或使用forM\uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu。我设置了一个基准来确定这些是否编译为紧密循环:
import Control.Monad.Loop
import Control.Monad.Primitive
import Control.Monad
import Control.Monad.IO.Class
import qualified Data.Vector.Unboxed.Mutable as MV
import qualified Data.Vector.Unboxed as V
times = 100000
side = 100
-- Using `forM_` to replace traditional fors
test_a mvec =
forM_ [0..times-1] $ \ n -> do
forM_ [0..side-1] $ \ y -> do
forM_ [0..side-1] $ \ x -> do
MV.write mvec (y*side+x) 1
-- Using the list monad to replace traditional forms
test_b mvec = sequence_ $ do
n <- [0..times-1]
y <- [0..side-1]
x <- [0..side-1]
return $ MV.write mvec (y*side+x) 1
main = do
let vec = V.generate (side*side) (const 0)
mvec <- V.unsafeThaw vec :: IO (MV.MVector (PrimState IO) Int)
-- test_a mvec
-- test_b mvec
vec' <- V.unsafeFreeze mvec :: IO (V.Vector Int)
print $ V.sum vec'
此测试创建一个100x100向量,使用嵌套循环向每个索引写入1,并重复该100k次。用ghc-O2测试编译。hs-o测试(ghc版本7.8.4),结果是:
表单
版本为3.853s,列表单子为10.460s。为了提供参考,我还用JavaScript编写了此测试:
var side = 100;
var times = 100000;
var vec = [];
for (var i=0; i<side*side; ++i)
vec.push(0);
for (var n=0; n<times; ++n)
for (var y=0; y<side; ++y)
for (var x=0; x<side; ++x)
vec[x+y*side] = 1;
var s = 0;
for (var i=0; i<side*side; ++i)
s += vec[i];
console.log(s);
这个等价的JavaScript程序需要1s才能完成,而不是Haskell的unbox向量,这是不寻常的,这表明Haskell不是在常量空间中运行循环,而是执行分配。然后我发现了一个库,它声称提供类型保证的紧循环<代码>控制。单子。循环
:
-- Using `for` from Control.Monad.Loop
test_c mvec = exec_ $ do
n <- for 0 (< times) (+ 1)
x <- for 0 (< side) (+ 1)
y <- for 0 (< side) (+ 1)
liftIO (MV.write mvec (y*side+x) 1)
在1s内运行。这个库使用得不多,也远不是惯用的,那么,获得快速恒定空间二维计算的惯用方法是什么呢?(注意,这不是REPA的情况,因为我想在网格上执行任意IO操作。)
匿名用户
根据我的经验forM_[0... n-1]可以很好地执行,但不幸的是它不可靠。只需将INLINE
pragma添加到test_a
并使用-O2
即可使其运行得更快(对我来说是4秒到1秒),但手动内联(复制粘贴)会再次减慢速度。
一个更可靠的函数是统计中的
-- | Simple for loop. Counts from /start/ to /end/-1.
for :: Monad m => Int -> Int -> (Int -> m ()) -> m ()
for n0 !n f = loop n0
where
loop i | i == n = return ()
| otherwise = f i >> loop (i+1)
{-# INLINE for #-}
在列表中使用它类似于表单:
test_d :: MV.IOVector Int -> IO ()
test_d mv =
for 0 times $ \_ ->
for 0 side $ \i ->
for 0 side $ \j ->
MV.unsafeWrite mv (i*side + j) 1
但性能可靠(对我来说是0.85秒),没有分配列表的风险。
用GHC编写紧凑的变异代码有时会很棘手。我将写一些不同的东西,可能是以一种更为杂乱无章和tl的方式;医生比我更喜欢。
对于初学者,我们应该在任何情况下使用GHC 7.10,否则forM\uCode>和list monad解决方案都不会融合。
此外,我还更换了MV。使用MV写入。unsafeWrite,部分原因是它更快,但更重要的是它减少了最终核心中的一些混乱。从现在起,运行时统计信息将引用带有unsafeWrite的代码。
即使使用GHC 7.10,我们也应该首先注意到所有那些[0... time-1]
和[0... side-1]
表达式,因为如果我们不采取必要的步骤,它们每次都会破坏性能。问题是它们是恒定的范围,并且-ffull-lazity
(在-O
上默认启用)将它们浮动到顶级。这会阻止列表融合,并且遍历Int#
范围比遍历装箱的Int
-s列表更便宜,所以这是一个非常糟糕的优化。
让我们看看未更改的代码(除了使用unsafeWrite)的一些以秒为单位的运行时<代码>ghc-O2-fllvm使用,我使用RTS-s进行计时。
test_a: 1.6
test_b: 6.2
test_c: 0.6
对于GHC Core查看,我使用了ghc-O2-ddump-simpl-d抑制-所有-dno-抑制-类型签名
。
在test_a
的情况下,[0...99]
范围被取消:
main4 :: [Int]
main4 = eftInt 0 99 -- means "enumFromTo" for Int.
尽管最外层的[0...9999]
循环被融合到尾部递归助手中:
letrec {
a3_s7xL :: Int# -> State# RealWorld -> (# State# RealWorld, () #)
a3_s7xL =
\ (x_X5zl :: Int#) (s1_X4QY :: State# RealWorld) ->
case a2_s7xF 0 s1_X4QY of _ { (# ipv2_a4NA, ipv3_a4NB #) ->
case x_X5zl of wild_X1S {
__DEFAULT -> a3_s7xL (+# wild_X1S 1) ipv2_a4NA;
99999 -> (# ipv2_a4NA, () #)
}
}; }
在test_b
的情况下,同样只有[0...99]
被取消。然而,test_b
要慢得多,因为它必须构建和排序实际的[IO()]
列表。至少GHC足够明智,可以只为两个内部循环构建单个[IO()]
,然后对其执行排序10000
次。
let {
lvl7_s4M5 :: [IO ()]
lvl7_s4M5 = -- omitted
letrec {
a2_s7Av :: Int# -> State# RealWorld -> (# State# RealWorld, () #)
a2_s7Av =
\ (x_a5xi :: Int#) (eta_B1 :: State# RealWorld) ->
letrec {
a3_s7Au
:: [IO ()] -> State# RealWorld -> (# State# RealWorld, () #)
a3_s7Au =
\ (ds_a4Nu :: [IO ()]) (eta1_X1c :: State# RealWorld) ->
case ds_a4Nu of _ {
[] ->
case x_a5xi of wild1_X1y {
__DEFAULT -> a2_s7Av (+# wild1_X1y 1) eta1_X1c;
99999 -> (# eta1_X1c, () #)
};
: y_a4Nz ys_a4NA ->
case (y_a4Nz `cast` ...) eta1_X1c
of _ { (# ipv2_a4Nf, ipv3_a4Ng #) ->
a3_s7Au ys_a4NA ipv2_a4Nf
}
}; } in
a3_s7Au lvl7_s4M5 eta_B1; } in
-- omitted
我们该如何补救?我们可以用{-\OPTIONS\u GHC-fno-fulllaziness{-}
来解决这个问题。这对我们的情况确实有很大帮助:
test_a: 0.5
test_b: 0.48
test_c: 0.5
或者,我们可以摆弄INLINE
杂注。显然,在let浮动完成后内联函数可以保持良好的性能。我发现GHC即使没有杂注也会内联我们的测试函数,但显式杂注仅在let浮动后才会导致它内联。例如,这会在没有-fno-full-lazity
的情况下产生良好的性能:
test_a mvec =
forM_ [0..times-1] $ \ n ->
forM_ [0..side-1] $ \ y ->
forM_ [0..side-1] $ \ x ->
MV.unsafeWrite mvec (y*side+x) 1
{-# INLINE test_a #-}
但过早内联会导致性能不佳:
test_a mvec =
forM_ [0..times-1] $ \ n ->
forM_ [0..side-1] $ \ y ->
forM_ [0..side-1] $ \ x ->
MV.unsafeWrite mvec (y*side+x) 1
{-# INLINE [~2] test_a #-} -- "inline before the first phase please"
这种内联解决方案的问题是,面对GHC的浮动攻击,它相当脆弱。例如,手动内联不能保持性能。以下代码速度很慢,因为与内联代码类似,它给GHC一个浮动的机会:
main = do
let vec = V.generate (side*side) (const 0)
mvec <- V.unsafeThaw vec :: IO (MV.MVector (PrimState IO) Int)
forM_ [0..times-1] $ \ n ->
forM_ [0..side-1] $ \ y ->
forM_ [0..side-1] $ \ x ->
MV.unsafeWrite mvec (y*side+x) 1
那我们该怎么办呢?
首先,我认为对于那些想编写高性能代码并对自己的工作有很好了解的人来说,使用fno-fulllaziness是一个完全可行的、甚至更好的选择。例如,它用于无序容器中。有了它,我们对共享有了更精确的控制,并且我们总是可以手动进行浮出或内联操作。
对于更常规的代码,我相信使用控件没有什么错。单子。循环或提供该功能的任何其他软件包。许多Haskell用户对依赖小型“边缘”库并不谨慎。我们还可以在所需的通用性中重新实现
。例如,以下解决方案的性能与其他解决方案一样好:
for :: Monad m => a -> (a -> Bool) -> (a -> a) -> (a -> m ()) -> m ()
for init while step body = go init where
go !i | while i = body i >> go (step i)
go i = return ()
{-# INLINE for #-}
我起初对堆分配上的RTS-s数据感到非常困惑。test_a
与-fno-full-lazness
一起分配,并且test_c
没有完全的惰性,这些分配随着次数
迭代的数量线性扩展,但是test_b
具有完全的惰性,仅为向量分配:
-- with -fno-full-laziness, no INLINE pragmas
test_a: 242,521,008 bytes
test_b: 121,008 bytes
test_c: 121,008 bytes -- but 240,120,984 with full laziness!
此外,在这种情况下,test\u c的
INLINE
pragmas也没有任何帮助。
我花了一些时间试图在相关程序的核心中找到堆分配的迹象,但没有成功,直到我突然意识到:GHC堆栈帧在堆上,包括主线程的帧,而进行堆分配的函数基本上是在最多三个堆栈帧中运行三次嵌套循环。RTS-s注册的堆分配只是堆栈帧的不断弹出和推送。
这在以下代码的核心中非常明显:
{-# OPTIONS_GHC -fno-full-laziness #-}
-- ...
test_a mvec =
forM_ [0..times-1] $ \ n ->
forM_ [0..side-1] $ \ y ->
forM_ [0..side-1] $ \ x ->
MV.unsafeWrite mvec (y*side+x) 1
main = do
let vec = V.generate (side*side) (const 0)
mvec <- V.unsafeThaw vec :: IO (MV.MVector (PrimState IO) Int)
test_a mvec
这就是我在这里的荣耀。请随意跳过。
main1 :: State# RealWorld -> (# State# RealWorld, () #)
main1 =
\ (s_a5HK :: State# RealWorld) ->
case divInt# 9223372036854775807 8 of ww4_a5vr { __DEFAULT ->
-- start of vector creation ----------------------
case tagToEnum# (># 10000 ww4_a5vr) of _ {
False ->
case newByteArray# 80000 (s_a5HK `cast` ...)
of _ { (# ipv_a5fv, ipv1_a5fw #) ->
letrec {
$s$wa_s8jS
:: Int#
-> Int#
-> State# (PrimState IO)
-> (# State# (PrimState IO), Int #)
$s$wa_s8jS =
\ (sc_s8jO :: Int#)
(sc1_s8jP :: Int#)
(sc2_s8jR :: State# (PrimState IO)) ->
case tagToEnum# (<# sc1_s8jP 10000) of _ {
False -> (# sc2_s8jR, I# sc_s8jO #);
True ->
case writeIntArray# ipv1_a5fw sc_s8jO 0 (sc2_s8jR `cast` ...)
of s'#_a5Gn { __DEFAULT ->
$s$wa_s8jS (+# sc_s8jO 1) (+# sc1_s8jP 1) (s'#_a5Gn `cast` ...)
}
}; } in
case $s$wa_s8jS 0 0 (ipv_a5fv `cast` ...)
-- end of vector creation -------------------
of _ { (# ipv6_a4Hv, ipv7_a4Hw #) ->
letrec {
a2_s7MJ :: Int# -> State# RealWorld -> (# State# RealWorld, () #)
a2_s7MJ =
\ (x_a5Ho :: Int#) (eta_B1 :: State# RealWorld) ->
letrec {
a3_s7ME :: Int# -> State# RealWorld -> (# State# RealWorld, () #)
a3_s7ME =
\ (x1_X5Id :: Int#) (eta1_XR :: State# RealWorld) ->
case ipv7_a4Hw of _ { I# dt4_a5x6 ->
case writeIntArray#
(ipv1_a5fw `cast` ...) (*# x1_X5Id 100) 1 (eta1_XR `cast` ...)
of s'#_a5Gn { __DEFAULT ->
letrec {
a4_s7Mz :: Int# -> State# RealWorld -> (# State# RealWorld, () #)
a4_s7Mz =
\ (x2_X5J8 :: Int#) (eta2_X1U :: State# RealWorld) ->
case writeIntArray#
(ipv1_a5fw `cast` ...)
(+# (*# x1_X5Id 100) x2_X5J8)
1
(eta2_X1U `cast` ...)
of s'#1_X5Hf { __DEFAULT ->
case x2_X5J8 of wild_X2o {
__DEFAULT -> a4_s7Mz (+# wild_X2o 1) (s'#1_X5Hf `cast` ...);
99 -> (# s'#1_X5Hf `cast` ..., () #)
}
}; } in
case a4_s7Mz 1 (s'#_a5Gn `cast` ...)
of _ { (# ipv2_a4QH, ipv3_a4QI #) ->
case x1_X5Id of wild_X1e {
__DEFAULT -> a3_s7ME (+# wild_X1e 1) ipv2_a4QH;
99 -> (# ipv2_a4QH, () #)
}
}
}
}; } in
case a3_s7ME 0 eta_B1 of _ { (# ipv2_a4QH, ipv3_a4QI #) ->
case x_a5Ho of wild_X1a {
__DEFAULT -> a2_s7MJ (+# wild_X1a 1) ipv2_a4QH;
99999 -> (# ipv2_a4QH, () #)
}
}; } in
a2_s7MJ 0 (ipv6_a4Hv `cast` ...)
}
};
True ->
case error
(unpackAppendCString#
"Primitive.basicUnsafeNew: length to large: "#
(case $wshowSignedInt 0 10000 ([])
of _ { (# ww5_a5wm, ww6_a5wn #) ->
: ww5_a5wm ww6_a5wn
}))
of wild_00 {
}
}
}
main :: IO ()
main = main1 `cast` ...
main2 :: State# RealWorld -> (# State# RealWorld, () #)
main2 = runMainIO1 (main1 `cast` ...)
main :: IO ()
main = main2 `cast` ...
我们还可以通过以下方式很好地演示帧的分配。让我们更改
test_a
:
test_a mvec =
forM_ [0..times-1] $ \ n ->
forM_ [0..side-1] $ \ y ->
forM_ [0..side-50] $ \ x -> -- change here
MV.unsafeWrite mvec (y*side+x) 1
现在堆分配保持完全相同,因为最里面的循环是尾部递归的,并且使用单个帧。通过以下更改,堆分配减半(至124921008字节),因为我们推送和弹出的帧数减半:
test_a mvec =
forM_ [0..times-1] $ \ n ->
forM_ [0..side-50] $ \ y -> -- change here
forM_ [0..side-1] $ \ x ->
MV.unsafeWrite mvec (y*side+x) 1
而是编译为在单个堆栈框架内使用嵌套case构造的代码,并遍历索引以查看哪个索引应该递增。以下内容请参见核心:
{-# LANGUAGE BangPatterns #-} -- later I'll talk about this
{-# OPTIONS_GHC -fno-full-laziness #-}
main = do
let vec = V.generate (side*side) (const 0)
!mvec <- V.unsafeThaw vec :: IO (MV.MVector (PrimState IO) Int)
test_c mvec
瞧
main1 :: State# RealWorld -> (# State# RealWorld, () #)
main1 =
\ (s_a5Iw :: State# RealWorld) ->
case divInt# 9223372036854775807 8 of ww4_a5vT { __DEFAULT ->
-- start of vector creation ----------------------
case tagToEnum# (># 10000 ww4_a5vT) of _ {
False ->
case newByteArray# 80000 (s_a5Iw `cast` ...)
of _ { (# ipv_a5g3, ipv1_a5g4 #) ->
letrec {
$s$wa_s8ji
:: Int#
-> Int#
-> State# (PrimState IO)
-> (# State# (PrimState IO), Int #)
$s$wa_s8ji =
\ (sc_s8je :: Int#)
(sc1_s8jf :: Int#)
(sc2_s8jh :: State# (PrimState IO)) ->
case tagToEnum# (<# sc1_s8jf 10000) of _ {
False -> (# sc2_s8jh, I# sc_s8je #);
True ->
case writeIntArray# ipv1_a5g4 sc_s8je 0 (sc2_s8jh `cast` ...)
of s'#_a5GP { __DEFAULT ->
$s$wa_s8ji (+# sc_s8je 1) (+# sc1_s8jf 1) (s'#_a5GP `cast` ...)
}
}; } in
case $s$wa_s8ji 0 0 (ipv_a5g3 `cast` ...)
of _ { (# ipv6_a4MX, ipv7_a4MY #) ->
case ipv7_a4MY of _ { I# dt4_a5xy ->
-- end of vector creation
letrec {
a2_s7Q6 :: Int# -> State# RealWorld -> (# State# RealWorld, () #)
a2_s7Q6 =
\ (x_a5HT :: Int#) (eta_B1 :: State# RealWorld) ->
letrec {
a3_s7Q5 :: Int# -> State# RealWorld -> (# State# RealWorld, () #)
a3_s7Q5 =
\ (x1_X5J9 :: Int#) (eta1_XP :: State# RealWorld) ->
letrec {
a4_s7MZ :: Int# -> State# RealWorld -> (# State# RealWorld, () #)
a4_s7MZ =
\ (x2_X5Jl :: Int#) (s1_X4Xb :: State# RealWorld) ->
case writeIntArray#
(ipv1_a5g4 `cast` ...)
(+# (*# x1_X5J9 100) x2_X5Jl)
1
(s1_X4Xb `cast` ...)
of s'#_a5GP { __DEFAULT ->
-- the interesting part! ------------------
case x2_X5Jl of wild_X1y {
__DEFAULT -> a4_s7MZ (+# wild_X1y 1) (s'#_a5GP `cast` ...);
99 ->
case x1_X5J9 of wild1_X1o {
__DEFAULT -> a3_s7Q5 (+# wild1_X1o 1) (s'#_a5GP `cast` ...);
99 ->
case x_a5HT of wild2_X1c {
__DEFAULT -> a2_s7Q6 (+# wild2_X1c 1) (s'#_a5GP `cast` ...);
99999 -> (# s'#_a5GP `cast` ..., () #)
}
}
}
}; } in
a4_s7MZ 0 eta1_XP; } in
a3_s7Q5 0 eta_B1; } in
a2_s7Q6 0 (ipv6_a4MX `cast` ...)
}
}
};
True ->
case error
(unpackAppendCString#
"Primitive.basicUnsafeNew: length to large: "#
(case $wshowSignedInt 0 10000 ([])
of _ { (# ww5_a5wO, ww6_a5wP #) ->
: ww5_a5wO ww6_a5wP
}))
of wild_00 {
}
}
}
main :: IO ()
main = main1 `cast` ...
main2 :: State# RealWorld -> (# State# RealWorld, () #)
main2 = runMainIO1 (main1 `cast` ...)
main :: IO ()
main = main2 `cast` ...
我不得不承认,我基本上不知道为什么有些代码避免创建堆栈帧,而有些则没有。我怀疑从“内部”向外内联会有所帮助,并且快速检查告诉我
Control. Monad. Loop
使用CPS编码,这在这里可能是相关的,尽管Monad. Loop
解决方案对让浮动很敏感,并且我无法在Core的短时间通知下确定为什么test_c
与让浮动无法在单个堆栈帧中运行。
现在,在单堆栈帧中运行的性能优势很小。我们已经看到,test\u b只比test\u a快一点点。我在答案中加入了这一迂回,因为我觉得它很有启发性。
所谓的状态黑客使得GHC在嵌入IO和ST操作时具有攻击性。我想我应该在这里提到它,因为除了让浮动之外,这是另一件会彻底破坏性能的事情。
状态黑客通过优化启用,可能会逐渐减慢程序的运行速度。里德·巴顿的一个简单例子:
import Control.Monad
import Debug.Trace
expensive :: String -> String
expensive x = trace "$$$" x
main :: IO ()
main = do
str <- fmap expensive getLine
replicateM_ 3 $ print str
使用GHC-7.10.2,这会打印
"$$$"
一次而不进行优化,但使用-O2
打印三次。似乎使用GHC-7.10,我们无法使用-fno-state-hack
摆脱这种行为(这是Reid Barton链接票证的主题)。
严格的一元绑定可靠地解决了此问题:
main :: IO ()
main = do
!str <- fmap expensive getLine
replicateM_ 3 $ print str
我认为在IO和ST中进行严格绑定是一个好习惯。我有一些经验(虽然不是确定的,但我远不是GHC专家),如果我们使用的不是完全惰性,那么特别需要严格绑定。显然,完全的懒惰可以帮助消除由状态黑客引起的内联所带来的一些工作重复;没有完全的惰性,省略了对
的严格绑定!mvec公司
问题内容: 我正在使用multiprocessor.Pool()模块来加速“令人尴尬的并行”循环。其实我有一个嵌套的循环,现在用multiprocessor.Pool加快内循环。例如,在不并行化循环的情况下,我的代码如下: 使用并行化: 我的主要问题是这是否正确,我应该在循环内包含multiprocessing.Pool(),或者是否应该在循环外创建池,即: 另外,我不知道我是否应该包括线“poo
在学习Java 8 streams和lambas时,我尝试用streams替换以下嵌套for循环: 循环迭代“ProvidedService”对象的列表,对于每个对象,循环遍历“Desk”对象的列表属性,并将“Id”字段提取到列表中。 我使用streams生成了以下代码: 这是正确/最佳的方法吗?或者有没有一种方法可以在没有第二个嵌套流的情况下实现这一点?
问题内容: 嵌套的RAL居? 假设我正在尝试做Spark中的“嵌套循环”。就像普通语言一样,假设我在内部循环中有一个例程,该例程以Pi Average Spark示例 的方式估算Pi(请参见估算Pi) 我可以在Spark中嵌套并行化调用吗?我正在尝试,但还没有解决。乐于张贴错误和代码,但我想我要问一个更概念性的问题,即这是否是Spark中的正确方法。 我已经可以并行化一个Spark实例/ Pi估计
问题内容: 在学习Java 8流和lambas时,我尝试用流替换以下嵌套的for循环: 该循环迭代“ ProvidedService”对象的列表,并针对每个对象循环访问“ Desk”对象的list属性,并将“ Id”字段提取到列表中。 我想出了以下使用stream的代码: 这是正确/最佳的方式吗?还是有没有第二个嵌套流的方法来执行此操作? 问题答案: 我可能会这样写:
我正在考虑创建一个包含一对多和多对多元素的模式。由于我在这方面有点新手,我搜索了一些资源,发现了两种类型的示例;1.将下层对象的内容放入上层,然后进行处理。第二种类型是创建外键,您认为哪一种更正确? 1. 这两个选项中,哪一个是正确且有效的? 另外,这是我想要创建的数据库模式。。你觉得我该怎么办。。请用java展示我不认识kotlin的例子
问题内容: 打破Javascript中的嵌套循环的最佳方法是什么? 问题答案: 就像Perl一样 如EMCA-262第12.12节所定义。[MDN文件] 与C不同,这些标签只能用于和,而Javascript没有。