Tagged: parallel procesing

Parallel Processing with Catmandu

In this blog post I’ll show a technique to scale out your data processing with Catmandu. All catmandu scripts use a single process, in a single thread. This means that if you need to process 2 times as much data , you need 2 times at much time. Running a catmandu convert command with the -v option will show you the speed of a typical conversion:

$ catmandu convert -v MARC to JSON --fix heavy_load.fix < input.marc > output.json
added       100 (55/sec)
added       200 (76/sec)
added       300 (87/sec)
added       400 (92/sec)
added       500 (90/sec)
added       600 (94/sec)
added       700 (97/sec)
added       800 (97/sec)
added       900 (96/sec)
added      1000 (97/sec)

In the example above we process an ‘input.marc’ MARC file into a ‘output.json’ JSON file with some difficult data cleaning in the ‘heave_load.fix’ Fix script. Using a single process we can reach about 97 records per second. It would take 2.8 hours to process one million records and 28 hours to process ten million records.

Can we make this any faster?

When you buy a computer they are all equipped with multiple processors. Using a single process, only one of these processors are used for calculations. One would get much ‘bang for the buck’  if all the processors could be used. One technique to do that is called ‘parallel processing’.

To check the amount of processors available on your machine use the file ‘/proc/cpuinfo’: on your Linux system:

$ cat /proc/cpuinfo | grep processor
processor   : 0
processor   : 1

The example above  shows two lines: I have two cores available to do processing on my laptop. In my library we have servers which contain  4 , 8 , 16 or more processors. This means that if we could do our calculations in a smart way then our processing could be 2, 4, 8 or 16 times as fast (in principle).

To check if your computer  is using all that calculating power, use the ‘uptime’ command:

$ uptime
11:15:21 up 622 days,  1:53,  2 users,  load average: 1.23, 1.70, 1.95

In the example above I ran did ‘uptime’ on one of our servers with 4 processors. It shows a load average of about 1.23 to 1.95. This means that in the last 15 minutes between 1 and 2 processors where being used and the other two did nothing. If the load average is less than the number of cores (4 in our case) it means: the server is waiting for input. If the load average is equal to the number of cores  it means: the server  is using all the CPU power available. If the load is bigger than the number of cores, then there is more work available than can be executed by the machine, some processes need to wait.

Now you know some Unix commands we can start using the processing power available on your machine. In my examples I’m going to use a Unix tool called ‘GNU parallel’ to run Catmandu  scripts on all the processors in my machine in the most efficient way possible. To do this you need to install GNU parallel:

sudo yum install parallel

The second ingredient we need is a way to cut our input data into many parts. For instance if we have a 4 processor machine we would like to create 4 equal chunks of data to process in parallel. There are very many ways to cut your data in to many parts. I’ll show you a trick we use in at Ghent University library with help of a MongoDB installation.

First install, MongoDB and the MongoDB catmandu plugins (these examples are taken from our CentOS documentation):

$ sudo cat > /etc/yum.repos.d/mongodb.repo <<EOF
[mongodb]
baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64
gpgcheck=0
enabled=1
name=MongoDB.org repository
EOF

$ sudo yum install -y mongodb-org mongodb-org-server mongodb-org-shell mongodb-org-mongos mongodb-org-tools
$ sudo cpanm Catmandu::Store::MongoDB

Next, we are going to store our input data in a MongoDB database with help of a Catmandu Fix script that adds some random numbers the data:

$ catmandu import MARC to MongoDB --database_name data --fix random.fix < input.marc

With the ‘random.fix’ like:


random("part.rand2","2")
random("part.rand4","4")
random("part.rand8","8")
random("part.rand16","16")
random("part.rand32","32")

The ‘random()’ Fix function will be available in Catmandu 1.003 but can also be downloaded here (install it in a directory ‘lib/Catmandu/Fix’). This will will make sure that every record in your input file contains four random numbers ‘part.rand2’, ‘part.rand4′ ,’part.rand8′,’part.rand16′,’part.rand32’. This will makes it possible to chop your data into two, four, eight, sixteen or thirty-two parts depending on the number of processors you have in your machine.

To access one chunk of your data the ‘catmandu export’ command can be used with a query. For instance, to export two equal chunks  do:

$ catmandu export MongoDB --database_name -q '{"part.rand2":0}' > part1
$ catmandu export MongoDB --database_name -q '{"part.rand2":1}' > part2

We are going to use these catmandu commands in a Bash script which makes use of GNU parallel run many conversions simultaneously.

#!/bin/bash
# file: parallel.sh
CPU=$1

if [ "${CPU}" == "" ]; then
    /usr/bin/parallel -u $0 {} <<EOF
0
1
EOF
elif [ "${CPU}" != "" ]; then
     catmandu export MongoDB --database_name data -q "{\"part.rand2\":${CPU}}" to JSON --line_delimited 1 --fix heavy_load.fix > result.${CPU}.json
fi

This example script above shows how a conversion process could run on a 2-processor machine. The lines with ‘/usr/bin/parallel’ show how GNU parallel is used to call this script with two arguments ‘0’ and ‘1’ (for the 2-processor example). In the lines with ‘catmandu export’ shows how chunks of data are read from the database and processed with the ‘heavy_load.fix’ Fix script.

If you have a 32-processor machine, you would need to provide parallel an input which contains the numbers 0,1,2 to 31 and change the query to ‘part.rand32’.

GNU parallel is a very powerfull command. It gives the opportunity to run many processes in parallel and even to spread out the load over many machines if you have a cluster. When all these machines have access to your MongoDB database then all can receive chunks of data to be processed. The only task left is to combine all results which can be as easy as a simple ‘cat’ command:

$ cat result.*.json > final_result.json