This episode is freely available thanks to the support of our subscribers

Subscribers get exclusive access to new and all previous subscriber-only episodes, video downloads, and 30% discount for team members. Become a Subscriber

We implement a concurrent version of the map method for arrays.

00:06 Today we're going to parse a list of GPX files and see how we can make this process faster. Without going into how the parsing actually works, we'll first parse all the files, one by one, and see how long it takes.

00:33 We have a bunch of URLs pointing to GPX files in a local directory. We parse each file to collect its number of waypoints. In the process, we can force-unwrap the Parser, which would only be nil if the given URL were invalid:

let pointCounts = urls.map { url in
    Parser(url: url)!.points.count
}

01:25 As a test for any upcoming refactoring, we add up the number of points from all GPX files. This way, we'll be able to verify that we're not breaking the parsing later on:

print(pointCounts.reduce(0, +))
// 44467

02:05 We've prepared a time function to measure how long the parsing takes. It prints the measured time to the console:

time {
    let pointCounts = urls.map { url in
        Parser(url: url)!.points.count
    }
    print(pointCounts.reduce(0, +))
}
/*
44467
1.322891076 sec
*/

Process in Parallel

02:30 With the map function, we're reading the GPX files one after another. However, reading one file doesn't depend on reading another, so we could easily process multiple files in parallel, which should improve the speed of parsing the entire collection.

02:55 But we can't process the URLs in parallel as long as we use map, so we have to write the code differently. Instead, we'll work with an empty result array of integers and — in a concurrent way — loop over the URLs, writing each result into the result array:

var result: [Int] = []

03:47 DispatchQueue.concurrentPerform — formerly known as dispatch_apply — lets us specify an amount of iterations and a block that's called with each iteration. We set the amount of iterations to be the length of the URL array, which means that the current iteration number, passed into the block, is an index into the array:

DispatchQueue.concurrentPerform(iterations: urls.count) { idx in
    // process urls[idx] into result[idx]
}

04:28 Now we can assign the result of a single iteration into the result array. This means we have to preallocate the result array so that it has the same length as the URL array:

var result = Array<Int?>(repeating: nil, count: urls.count)
DispatchQueue.concurrentPerform(iterations: urls.count) { idx in
    let url = urls[idx]
    let pointCount = Parser(url: url)!.points.count
    result[idx] = pointCount
}

06:14 The result array contains optional integers, and after the parsing, all its elements are set to a value. So, before we can reduce the array to add up the point counts, we have to force-unwrap each optional count:

print(result.map { $0! }.reduce(0, +))

06:39 This new version of the parsing takes 0.359 seconds. That's much faster, but the code isn't yet correct. It could crash, because concurrently modifying a variable isn't allowed. So, like we've done in the past, we can use a dispatch queue to safeguard the access to a variable:

var result = Array<Int?>(repeating: nil, count: urls.count)
let q = DispatchQueue(label: "sync queue")
DispatchQueue.concurrentPerform(iterations: urls.count) { idx in
    // ...
    q.sync { result[idx] = pointCount }
}

08:13 This way, we make sure that only one task at a time can access the result variable.

concurrentMap

08:44 We've improved the performance, but our code is now more verbose. The first version, using map, was much cleaner. Let's see if we can get this simplicity back by creating a concurrentMap function, which has the same interface as map.

09:37 We could write concurrentMap for any collection, as long as it can give us a count and we have a way to randomly access the elements by index. On top of the implementation below, the normal map also supports throwing functions, but we'll skip that functionality for now:

extension Array {
    func concurrentMap<B>(_ transform: @escaping (Element) -> B) -> [B] {
        var result = Array<B?>(repeating: nil, count: count)
        let q = DispatchQueue(label: "sync queue")
        DispatchQueue.concurrentPerform(iterations: count) { idx in
            let element = self[idx]
            q.sync {
                result[idx] = transform(element)
            }
        }
        return result.map { $0! }
    }
}

11:32 Now we can call concurrentMap, and we no longer have to force-unwrap the elements of result, because it's nicely hidden inside concurrentMap:

time {
    let result = urls.concurrentMap { Parser(url: $0)!.points.count }
    print(result.reduce(0, +))
}
/*
44467
1.370519089 sec
*/

12:34 It works, but we're back at the old, slow speed! We made a little mistake in concurrentMap, because we're calling the transform function inside the synchronous dispatch to the serial queue. That means all the parsing is done serially again. To fix this bug, we move the call to the transform function outside the sync block:

extension Array {
    func concurrentMap<B>(_ transform: @escaping (Element) -> B) -> [B] {
        var result = Array<B?>(repeating: nil, count: count)
        let q = DispatchQueue(label: "sync queue")
        DispatchQueue.concurrentPerform(iterations: count) { idx in
            let element = self[idx]
            let transformed = transform(element)
            q.sync {
                result[idx] = transformed
            }
        }
        return result.map { $0! }
    }
}
/*
44467
0.418863306 sec
*/

13:30 The parsing happens concurrently again, and we're back at roughly the same improved speed.

13:39 The code at the call site is much cleaner now: we're simply calling concurrentMap instead of map. This isn't necessarily a solution for everything; we don't need concurrency whenever we call map. In this case, it works nicely, because we have a good amount of expensive operations, so it pays off to perform them concurrently. But if we just want to map over an array of integers and do simple calculations, the overhead of concurrency doesn't make much sense.

Abstracting Out ThreadSafe

14:41 There's one more thing to improve. We're using a serial queue to guard access to the result variable, but we can abstract this out. In fact, we did this before in an episode about thread safety.

15:26 We create a generic class, ThreadSafe, that will hold any value and provide a thread-safe way to access that value:

final class ThreadSafe<A> {
    private var _value: A
    private let queue = DispatchQueue(label: "ThreadSafe")
    init(_ value: A) {
        self._value = value
    }

    var value: A {
        return queue.sync { _value }
    }
}

17:03 In addition to getting the value out, we also need a way to mutate the value. The most obvious way would be to define a setter that, like the getter, dispatches to the serial queue. But that's a bad idea, as a couple of viewers pointed out to us last time; doing something like x.value += 1 can cause issues, because reading the value and writing to it are operations performed separately on the private queue of the thread-safe x:

final class ThreadSafe<A> {
    // ...

    func atomically(_ transform: (inout A) -> ()) {
        queue.sync {
            transform(&self._value)
        }
    }
}

19:06 We can now get rid of the queue in concurrentMap, and instead wrap the result in a ThreadSafe instance:

extension Array {
    func concurrentMap<B>(_ transform: @escaping (Element) -> B) -> [B] {
        let result = ThreadSafe(Array<B?>(repeating: nil, count: count))
        DispatchQueue.concurrentPerform(iterations: count) { idx in
            let element = self[idx]
            let transformed = transform(element)
            result.atomically {
                $0[idx] = transformed
            }
        }
        return result.value.map { $0! }

    }
}

20:10 This seems to work well and we've cleaned up concurrentMap. It's a very small improvement, but abstracting out the thread safety means we have one less thing to worry about. And we're able to use ThreadSafe in other places too.

20:42 We could also consider a version of atomically that writes to the value asynchronously. In our case, this wouldn't work because we want to return from concurrentMap and assure the caller that the work is done. Using sync is therefore the better option. However, the downside of sync is that we have to be careful and think about deadlocks, but in this case, we're certain the code is correct.

21:27 We've made it possible to apply a minimal change at the call site — switching from map to concurrentMap — and achieve a drastic performance improvement for parsing our GPX files. We're calling it a day at this point, but in the future, we could also make concurrentMap work for Collection and add support for error throwing.