This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project
tl;dr We use dask to build a parallel Python list.
Introduction#
This is the seventh in a sequence of posts constructing an out-of-core nd-array using NumPy and dask.
Today we take a break from ND-Arrays and show how task scheduling can attack
other collections like the simple list of Python objects.
Unstructured Data#
Often, before we have an ndarray or a table/DataFrame, we have unstructured
data like log files. In this case tuned subsets of the language (e.g.
numpy/pandas) aren't sufficient; we need the full Python language.
My usual approach to the inconveniently large dump of log files is to use
Python streaming
iterators
along with multiprocessing or IPython
Parallel on a single
large machine. I often write/speak about this workflow when discussing
toolz.
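To make this workflow concrete, here is a minimal single-core sketch of the streaming style described above, using only toolz; the log lines are made-up stand-ins for real data.

```python
# Count word occurrences in a stream of log lines using toolz alone.
# This is the single-machine workflow that dask.bag parallelizes.
from toolz import concat, frequencies

lines = ["GET /index.html HTTP/1.0",
         "GET /about.html HTTP/1.0",
         "POST /index.html HTTP/1.0"]

# Split each line and count words in one streaming pass
counts = frequencies(concat(line.split() for line in lines))

assert counts['GET'] == 2
assert counts['/index.html'] == 2
```

Because `concat` and `frequencies` consume iterators lazily, this pattern streams through files far larger than memory.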
This workflow grows complex for most users when you introduce many processes.
To resolve this we build our normal tricks into a new dask.Bag collection.
Bag#
In the same way that dask.array mimics NumPy operations (e.g. matrix
multiply, slicing), dask.bag mimics functional operations like map,
filter, reduce found in the standard library as well as many of the
streaming functions found in toolz.
Dask array = NumPy + threads
Dask bag = Python/Toolz + processes
Example#
Here's the obligatory wordcount example:
>>> from dask.bag import Bag
>>> b = Bag.from_filenames('data/*.txt')
>>> def stem(word):
...     """ Stem word to primitive form """
...     return word.lower().rstrip(",.!:;'-\"").lstrip("'\"")
>>> dict(b.map(str.split).concat().map(stem).frequencies())
{...}
We use all of our cores and stream through memory on each core. We use
multiprocessing but could get fancier with some work.
Design#
As before, a Bag is just a dict holding tasks, along with a little metadata.
>>> d = {('x', 0): (range, 5),
... ('x', 1): (range, 5),
... ('x', 2): (range, 5)}
>>> from dask.bag import Bag
>>> b = Bag(d, 'x', npartitions=3)
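The task format is the usual dask convention: a tuple whose first element is callable, with arguments that may themselves be keys in the dict. A tiny recursive interpreter, a sketch rather than dask's actual scheduler, shows how such a dict executes:

```python
# A simplified sketch of task-graph execution, not dask's real scheduler.
# Each value is a task: (function, *arguments). Arguments that are keys
# in the graph are computed recursively first.
d = {('x', 0): (range, 5),
     ('x', 1): (range, 5),
     ('x', 2): (range, 5)}

def get(dsk, key):
    task = dsk[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, args = task[0], task[1:]
        args = [get(dsk, a) if (isinstance(a, tuple) and a in dsk) else a
                for a in args]
        return func(*args)
    return task

assert list(get(d, ('x', 1))) == [0, 1, 2, 3, 4]
```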
In this way we break up one collection
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
into three independent pieces
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
When we abstractly operate on the large collection…
>>> b2 = b.map(lambda x: x * 10)
… we generate new tasks to operate on each of the components.
>>> b2.dask
{('x', 0): (range, 5),
 ('x', 1): (range, 5),
 ('x', 2): (range, 5),
 ('bag-1', 0): (map, lambda x: x * 10, ('x', 0)),
 ('bag-1', 1): (map, lambda x: x * 10, ('x', 1)),
 ('bag-1', 2): (map, lambda x: x * 10, ('x', 2))}
And when we ask for concrete results (the call to list) we spin up a
scheduler to execute the resulting dependency graph of tasks.
>>> list(b2)
[0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 0, 10, 20, 30, 40]
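Generating those new tasks is just a loop over partitions. The helper below (`bag_map` is a hypothetical name, not dask's internals) sketches how a `map` call might extend a graph:

```python
# Sketch: map over a bag by adding one new task per partition.
# 'bag_map' and the key names are hypothetical, for illustration only.
def bag_map(dsk, name, npartitions, func, new_name):
    dsk = dict(dsk)  # don't mutate the input graph
    for i in range(npartitions):
        # each new task applies func lazily to the matching old partition
        dsk[(new_name, i)] = (map, func, (name, i))
    return dsk

d = {('x', i): (range, 5) for i in range(3)}
d2 = bag_map(d, 'x', 3, lambda x: x * 10, 'bag-1')

assert ('bag-1', 0) in d2
assert len(d2) == 6
```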
More complex operations yield more complex dasks. Beware, dask code is pretty Lispy. Fortunately these dasks are internal; users don’t interact with them.
>>> iseven = lambda x: x % 2 == 0
>>> b.filter(iseven).count().dask
{'bag-3': (sum, [('bag-2', 1), ('bag-2', 2), ('bag-2', 0)]),
('bag-2', 0): (count,
(filter, iseven, (range, 5))),
('bag-2', 1): (count,
(filter, iseven, (range, 5))),
('bag-2', 2): (count,
(filter, iseven, (range, 5)))}
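The graph above follows a split-apply-combine pattern: count the evens in each partition independently, then sum the per-partition counts. The same logic, written by hand with toolz:

```python
# Count evens per partition, then combine -- the pattern encoded in the
# filter/count graph above, spelled out with plain toolz and builtins.
from toolz import count

iseven = lambda x: x % 2 == 0
partitions = [range(5), range(5), range(5)]

per_partition = [count(filter(iseven, p)) for p in partitions]

assert per_partition == [3, 3, 3]
assert sum(per_partition) == 9
```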
The current interface for Bag has the following operations:
all      frequencies     min
any      join            product
count    map             std
filter   map_partitions  sum
fold     max             topk
foldby   mean            var
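Several of these mirror toolz directly. For example, foldby corresponds to toolz.reduceby, which groups and reduces in a single streaming pass:

```python
# toolz.reduceby: group elements by a key function and reduce each group
# with a binary operator, in one pass -- the model for Bag.foldby.
from toolz import reduceby

iseven = lambda x: x % 2 == 0
totals = reduceby(iseven, lambda acc, x: acc + x, [1, 2, 3, 4, 5], 0)

assert totals == {False: 9, True: 6}
```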
Manipulations of bags create task dependency graphs. We eventually execute these graphs in parallel.
Execution#
We repurpose the threaded scheduler we used for arrays, swapping threads for
multiprocessing to provide parallelism even on pure Python code. We're
careful to avoid unnecessary data transfer: none of the operations listed above
requires significant communication. Notably, we don't have any concept of
shuffling or scatter/gather.
We use dill to
serialize functions properly and to collect/report errors, two issues
that plague naive use of
multiprocessing in Python.
>>> list(b.map(lambda x: x * 10)) # This works!
[0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 0, 10, 20, 30, 40]
>>> list(b.map(lambda x: x / 0)) # This errs gracefully!
ZeroDivisionError: Exception in remote Process
integer division or modulo by zero
Traceback:
...
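The serialization problem is easy to demonstrate: the standard pickle module, which multiprocessing relies on, refuses lambdas, which is exactly the case dill handles by serializing functions by value.

```python
# Standard pickle (used by multiprocessing) cannot serialize lambdas,
# which is why naive multiprocessing fails on code like b.map(lambda ...).
# dill, by contrast, serializes such functions by value.
import pickle

f = lambda x: x * 10
try:
    pickle.dumps(f)
    picklable = True
except Exception:
    picklable = False

assert picklable is False
```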
These tricks remove the need for user expertise.
Productive Sweet Spot#
I think that there is a productive sweet spot in the following configuration:
Pure Python functions
Streaming/lazy data
Multiprocessing
A single large machine or a few machines in an informal cluster
This setup is common and capable of handling terabyte-scale workflows. In my brief experience people rarely take this route: they use single-threaded, in-memory Python until it breaks, and then seek out Big Data infrastructure like Hadoop/Spark at relatively high productivity overhead.
Your workstation can scale bigger than you think.
Example#
Here is about a gigabyte of network flow data, recording which computers made connections to which other computers on the UC-Berkeley campus in 1996.
846890339:661920 846890339:755475 846890340:197141 168.237.7.10:1163 83.153.38.208:80 2 8 4294967295 4294967295 846615753 176 2462 39 GET 21068906053917068819..html HTTP/1.0
846890340:989181 846890341:2147 846890341:2268 13.35.251.117:1269 207.83.232.163:80 10 0 842099997 4294967295 4294967295 64 1 38 GET 20271810743860818265..gif HTTP/1.0
846890341:80714 846890341:90331 846890341:90451 13.35.251.117:1270 207.83.232.163:80 10 0 842099995 4294967295 4294967295 64 1 38 GET 38127854093537985420..gif HTTP/1.0
This is actually relatively clean. Many of the fields are space delimited (though not all), and I've already compiled and run the decade-old C code needed to decompress it from its original format.
Let's use Bag and regular expressions to parse this.
In [1]: from dask.bag import Bag
In [2]: b = Bag.from_filenames('UCB-home-IP*.log')
In [3]: import re
In [4]: pattern = """
...: (?P<request_time>\d+:\d+)
...: (?P<response_start>\d+:\d+)
...: (?P<response_end>\d+:\d+)
...: (?P<client_ip>\d+\.\d+\.\d+\.\d+):(?P<client_port>\d+)
...: (?P<server_ip>\d+\.\d+\.\d+\.\d+):(?P<server_port>\d+)
...: (?P<client_headers>\d+)
...: (?P<server_headers>\d+)
...: (?P<if_modified_since>\d+)
...: (?P<response_header_length>\d+)
...: (?P<response_data_length>\d+)
...: (?P<request_url_length>\d+)
...: (?P<expires>\d+)
...: (?P<last_modified>\d+)
...: (?P<method>\w+)
...: (?P<domain>\d+..)\.(?P<extension>\w*)(?P<rest_of_url>\S*)
...: (?P<protocol>.*)""".strip().replace('\n', '\s+')
In [5]: prog = re.compile(pattern)
In [6]: records = b.map(prog.match).map(lambda m: m.groupdict())
This returns instantly. We only compute results when necessary. We trigger
computation by calling list.
In [7]: list(records.take(1))
Out[7]:
[{'client_headers': '2',
'client_ip': '168.237.7.10',
'client_port': '1163',
'domain': '21068906053917068819.',
'expires': '2462',
'extension': 'html',
'if_modified_since': '4294967295',
'last_modified': '39',
'method': 'GET',
'protocol': 'HTTP/1.0',
'request_time': '846890339:661920',
'request_url_length': '176',
'response_data_length': '846615753',
'response_end': '846890340:197141',
'response_header_length': '4294967295',
'response_start': '846890339:755475',
'rest_of_url': '',
'server_headers': '8',
'server_ip': '83.153.38.208',
'server_port': '80'}]
Because the bag operates lazily, this small result also returns immediately.
To demonstrate depth we find the ten client/server pairs with the most connections.
In [8]: counts = records.pluck(['client_ip', 'server_ip']).frequencies()
In [9]: %time list(counts.topk(10, key=lambda x: x[1]))
CPU times: user 11.2 s, sys: 1.15 s, total: 12.3 s
Wall time: 50.4 s
Out[9]:
[(('247.193.34.56', '243.182.247.102'), 35353),
(('172.219.28.251', '47.61.128.1'), 22333),
(('240.97.200.0', '108.146.202.184'), 17492),
(('229.112.177.58', '47.61.128.1'), 12993),
(('146.214.34.69', '119.153.78.6'), 12554),
(('17.32.139.174', '179.135.20.36'), 10166),
(('97.166.76.88', '65.81.49.125'), 8155),
(('55.156.159.21', '157.229.248.255'), 7533),
(('55.156.159.21', '124.77.75.86'), 7506),
(('55.156.159.21', '97.5.181.76'), 7501)]
Comparison with Spark#
First, it is silly and unfair to compare with PySpark running locally. PySpark offers much more in a distributed context.
In [1]: import pyspark
In [2]: sc = pyspark.SparkContext('local')
In [3]: from glob import glob
In [4]: filenames = sorted(glob('UCB-home-*.log'))
In [5]: rdd = sc.parallelize(filenames, numSlices=4)
In [6]: import re
In [7]: pattern = ...
In [8]: prog = re.compile(pattern)
In [9]: lines = rdd.flatMap(lambda fn: list(open(fn)))
In [10]: records = lines.map(lambda line: prog.match(line).groupdict())
In [11]: ips = records.map(lambda rec: (rec['client_ip'], rec['server_ip']))
In [12]: from toolz import topk
In [13]: %time dict(topk(10, ips.countByValue().items(), key=1))
CPU times: user 1.32 s, sys: 52.2 ms, total: 1.37 s
Wall time: 1min 21s
Out[13]:
{('146.214.34.69', '119.153.78.6'): 12554,
('17.32.139.174', '179.135.20.36'): 10166,
('172.219.28.251', '47.61.128.1'): 22333,
('229.112.177.58', '47.61.128.1'): 12993,
('240.97.200.0', '108.146.202.184'): 17492,
('247.193.34.56', '243.182.247.102'): 35353,
('55.156.159.21', '124.77.75.86'): 7506,
('55.156.159.21', '157.229.248.255'): 7533,
('55.156.159.21', '97.5.181.76'): 7501,
('97.166.76.88', '65.81.49.125'): 8155}
So, given a compute-bound mostly embarrassingly parallel task (regexes are comparatively expensive) on a single machine they are comparable.
Reasons you would want to use Spark
You want to use many machines and interact with HDFS
Shuffling operations
Reasons you would want to use dask.bag
Trivial installation
No mucking about with JVM heap sizes or config files
Nicer error reporting. Spark's error reporting includes the typical giant Java stack trace that comes with any JVM solution.
Easier/simpler for Python programmers to hack on. The implementation is 350 lines including comments.
Again, this is really just a toy experiment to show that the dask model isn’t just about arrays. I absolutely do not want to throw Dask in the ring with Spark.
Conclusion#
However I do want to stress the importance of single-machine parallelism. Dask.bag targets this application well and leverages common hardware in a simple way that is both natural and accessible to most Python programmers.
A skilled developer could extend this to work in a distributed memory context. The logic to create the task dependency graphs is separate from the scheduler.
Special thanks to Erik Welch for finely crafting the dask optimization passes that keep the data flowing smoothly.