Category: Advanced

Parallel Processing with Catmandu

In this blog post I’ll show a technique to scale out your data processing with Catmandu. All catmandu scripts use a single process, in a single thread. This means that if you need to process 2 times as much data , you need 2 times at much time. Running a catmandu convert command with the -v option will show you the speed of a typical conversion:

$ catmandu convert -v MARC to JSON --fix heavy_load.fix < input.marc > output.json
added       100 (55/sec)
added       200 (76/sec)
added       300 (87/sec)
added       400 (92/sec)
added       500 (90/sec)
added       600 (94/sec)
added       700 (97/sec)
added       800 (97/sec)
added       900 (96/sec)
added      1000 (97/sec)

In the example above we process an ‘input.marc’ MARC file into a ‘output.json’ JSON file with some difficult data cleaning in the ‘heave_load.fix’ Fix script. Using a single process we can reach about 97 records per second. It would take 2.8 hours to process one million records and 28 hours to process ten million records.

Can we make this any faster?

When you buy a computer they are all equipped with multiple processors. Using a single process, only one of these processors are used for calculations. One would get much ‘bang for the buck’  if all the processors could be used. One technique to do that is called ‘parallel processing’.

To check the amount of processors available on your machine use the file ‘/proc/cpuinfo’: on your Linux system:

$ cat /proc/cpuinfo | grep processor
processor   : 0
processor   : 1

The example above  shows two lines: I have two cores available to do processing on my laptop. In my library we have servers which contain  4 , 8 , 16 or more processors. This means that if we could do our calculations in a smart way then our processing could be 2, 4, 8 or 16 times as fast (in principle).

To check if your computer  is using all that calculating power, use the ‘uptime’ command:

$ uptime
11:15:21 up 622 days,  1:53,  2 users,  load average: 1.23, 1.70, 1.95

In the example above I ran did ‘uptime’ on one of our servers with 4 processors. It shows a load average of about 1.23 to 1.95. This means that in the last 15 minutes between 1 and 2 processors where being used and the other two did nothing. If the load average is less than the number of cores (4 in our case) it means: the server is waiting for input. If the load average is equal to the number of cores  it means: the server  is using all the CPU power available. If the load is bigger than the number of cores, then there is more work available than can be executed by the machine, some processes need to wait.

Now you know some Unix commands we can start using the processing power available on your machine. In my examples I’m going to use a Unix tool called ‘GNU parallel’ to run Catmandu  scripts on all the processors in my machine in the most efficient way possible. To do this you need to install GNU parallel:

sudo yum install parallel

The second ingredient we need is a way to cut our input data into many parts. For instance if we have a 4 processor machine we would like to create 4 equal chunks of data to process in parallel. There are very many ways to cut your data in to many parts. I’ll show you a trick we use in at Ghent University library with help of a MongoDB installation.

First install, MongoDB and the MongoDB catmandu plugins (these examples are taken from our CentOS documentation):

$ sudo cat > /etc/yum.repos.d/mongodb.repo <<EOF
[mongodb]
baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64
gpgcheck=0
enabled=1
name=MongoDB.org repository
EOF

$ sudo yum install -y mongodb-org mongodb-org-server mongodb-org-shell mongodb-org-mongos mongodb-org-tools
$ sudo cpanm Catmandu::Store::MongoDB

Next, we are going to store our input data in a MongoDB database with help of a Catmandu Fix script that adds some random numbers the data:

$ catmandu import MARC to MongoDB --database_name data --fix random.fix < input.marc

With the ‘random.fix’ like:


random("part.rand2","2")
random("part.rand4","4")
random("part.rand8","8")
random("part.rand16","16")
random("part.rand32","32")

The ‘random()’ Fix function will be available in Catmandu 1.003 but can also be downloaded here (install it in a directory ‘lib/Catmandu/Fix’). This will will make sure that every record in your input file contains four random numbers ‘part.rand2’, ‘part.rand4′ ,’part.rand8′,’part.rand16′,’part.rand32’. This will makes it possible to chop your data into two, four, eight, sixteen or thirty-two parts depending on the number of processors you have in your machine.

To access one chunk of your data the ‘catmandu export’ command can be used with a query. For instance, to export two equal chunks  do:

$ catmandu export MongoDB --database_name -q '{"part.rand2":0}' > part1
$ catmandu export MongoDB --database_name -q '{"part.rand2":1}' > part2

We are going to use these catmandu commands in a Bash script which makes use of GNU parallel run many conversions simultaneously.

#!/bin/bash
# file: parallel.sh
CPU=$1

if [ "${CPU}" == "" ]; then
    /usr/bin/parallel -u $0 {} <<EOF
0
1
EOF
elif [ "${CPU}" != "" ]; then
     catmandu export MongoDB --database_name data -q "{\"part.rand2\":${CPU}}" to JSON --line_delimited 1 --fix heavy_load.fix > result.${CPU}.json
fi

This example script above shows how a conversion process could run on a 2-processor machine. The lines with ‘/usr/bin/parallel’ show how GNU parallel is used to call this script with two arguments ‘0’ and ‘1’ (for the 2-processor example). In the lines with ‘catmandu export’ shows how chunks of data are read from the database and processed with the ‘heavy_load.fix’ Fix script.

If you have a 32-processor machine, you would need to provide parallel an input which contains the numbers 0,1,2 to 31 and change the query to ‘part.rand32’.

GNU parallel is a very powerfull command. It gives the opportunity to run many processes in parallel and even to spread out the load over many machines if you have a cluster. When all these machines have access to your MongoDB database then all can receive chunks of data to be processed. The only task left is to combine all results which can be as easy as a simple ‘cat’ command:

$ cat result.*.json > final_result.json
Advertisements

Matching authors against VIAF identities

At Ghent University Library we enrich catalog records with VIAF identities to enhance the search experience in the catalog. When searching for all the books about ‘Chekov’ we want to match all name variants of this author. Consult VIAF http://viaf.org/viaf/95216565/#Chekhov,_Anton_Pavlovich,_1860-1904 and you will see many of them.

  • Chekhov
  • Čehov
  • Tsjechof
  • Txékhov
  • etc

Any of the these names variants can be available in the catalog data if authority control is not in place (or not maintained). Searching any of these names should result in results for all the variants. In the past it was a labor intensive, manual job for catalogers to maintain an authority file. Using results from Linked Data Fragments research by Ruben Verborgh (iMinds) and the Catmandu-RDF tools created by Jakob Voss (GBV) and RDF-LDF by Patrick Hochstenbach, Ghent University started an experiment to automatically enrich authors with VIAF identities. In this blog post we will report on the setup and results of this experiment which will also be reported at ELAG2015.

Context

Three ingredients are needed to create a web of data:

  1. A scalable way to produce data.
  2. The infrastructure to publish data.
  3. Clients accessing the data and reusing them in new contexts.

On the production site there doesn’t seem to be any problem creating huge datasets by libraries. Any transformation of library data to linked data will quickly generate an enormous number of RDF triples. We see this in the size of public available datasets:

Also for accessing data, from a consumers perspective the “easy” part seems to be covered. Instead of thousands of APIs available and many documents formats for any dataset, SPARQL and RDF provide the programmer a single protocol and document model.

The claim of the Linked Data Fragments researchers is that on the publication side, reliable queryable access to public Linked Data datasets largely remains problematic due to the low availability percentages of public SPARQL endpoints [Ref]. This is confirmed by the 2013 study by researchers from Pontificia Universidad Católica in Chili and National University of Ireland where more than half of the public SPARQL endpoints seem to be offline 1.5 days per month. This gives an availability rate of less than 95% [Ref].

The source of this high rate of inavailability can be traced back to the service model of Linked Data where two extremes exists to publish data (see image below).

At one side, data dumps (or dereferencing of URLs) can be made available which requires a simple HTTP server and lots of processing power on the client side. At the other side, an open SPARQL endpoint can be provided which requires a lot of processing power (hence, hardware investment) on the serverside. With SPARQL endpoints, clients can demand the execution of arbitrarily complicated queries. Furthermore, since each client requests unique, highly specific queries, regular caching mechanisms are ineffective, since they can only optimized for repeated identical requests.

This situation can be compared with providing a database SQL dump to endusers or open database connection on which any possible SQL statement can be executed. To a lesser extent libraries are well aware of the different modes of operation between running OAI-PMH services and Z39.50/SRU services.

Linked Data Fragment researchers provide a third way, Triple Pattern Fragments, to publish data which tries to provide the best of both worlds: access to a full dump of datasets while providing a queryable and cachable interface. For more information on the scalability of this solution I refer to the report  presented at the 5th International USEWOD Workshop.

The experiment

VIAF doesn’t provide a public SPARQL endpoint, but a complete dump of the data is available at http://viaf.org/viaf/data/. In our experiments we used the VIAF (Virtual International Authority File), which is made available under the ODC Attribution License.  From this dump we created a HDT database. HDT provides a very efficient format to compress RDF data while maintaining browser and search functionality. Using command line tools RDF/XML, Turtle and NTriples can be compressed into a HDT file with an index. This standalone file can be used to without the need of a database to query huge datasets. A VIAF conversion to HDT results in a 7 GB file and a 4 GB index.

Using the Linked Data Fragments server by Ruben Verborgh, available at https://github.com/LinkedDataFragments/Server.js, this HDT file can be published as a NodeJS application.

For a demonstration of this server visit the iMinds experimental setup at: http://data.linkeddatafragments.org/viaf

Using Triple Pattern Fragments a simple REST protocol is available to query this dataset. For instance it is possible to download the complete dataset using this query:


$ curl -H "Accept: text/turtle" http://data.linkeddatafragments.org/viaf

If we only want the triples concerning Chekhov (http://viaf.org/viaf/95216565) we can provide a query parameter:


$ curl -H "Accept: text/turtle" http://data.linkeddatafragments.org/viaf?subject=http://viaf.org/viaf/95216565

Likewise, using the predicate and object query any combination of triples can be requested from the server.


$ curl -H "Accept: text/turtle" http://data.linkeddatafragments.org/viaf?object="Chekhov"

The memory requirements of this server are small enough to run a copy of the VIAF database on a MacBook Air laptop with 8GB RAM.

Using specialised Triple Pattern Fragments clients, SPARQL queries can be executed against this server. For the Catmandu project we created a Perl client RDF::LDF which is integrated into Catmandu-RDF.

To request all triples from the endpoint use:


$ catmandu convert RDF --url http://data.linkeddatafragments.org/viaf --sparql 'SELECT * {?s ?p ?o}'

Or, only those Triples that are about “Chekhov”:


$ catmandu convert RDF --url http://data.linkeddatafragments.org/viaf --sparql 'SELECT * {?s ?p "Chekhov"}'

In the Ghent University experiment a more direct approach was taken to match authors to VIAF. First, as input a MARC dump from the catalog is being streamed into a Perl program using a Catmandu iterator. Then, we extract the 100 and 700 fields which contain $a (name) and $d (date) subfields. These two fields are combined in a search query, as if we would search:


Chekhov, Anton Pavlovich, 1860-1904

If there is exactly one hit in our local VIAF copy, then the result is reported. A complete script to process MARC files this way is available at a GitHub gist. To run the program against a MARC dump execute the import_viaf.pl command:


$ ./import_viaf.pl --type USMARC file.mrc
000000089-2 7001  L $$aEdwards, Everett Eugene,$$d1900- http://viaf.org/viaf/110156902
000000122-8 1001  L $$aClelland, Marjorie Bolton,$$d1912-   http://viaf.org/viaf/24253418
000000124-4 7001  L $$aSchein, Edgar H.
000000124-4 7001  L $$aKilbridge, Maurice D.,$$d1920-   http://viaf.org/viaf/29125668
000000124-4 7001  L $$aWiseman, Frederick.
000000221-6 1001  L $$aMiller, Wilhelm,$$d1869- http://viaf.org/viaf/104464511
000000256-9 1001  L $$aHazlett, Thomas C.,$$d1928-  http://viaf.org/viaf/65541341

[edit: 2017-05-18 an updated version of the code is available as a Git project https://github.com/LibreCat/MARC2RDF ]

All the authors in the MARC dump will be exported. If there is exactly one single match against VIAF it will be added to the author field. We ran this command for one night in a single thread against 338.426 authors containing a date and found 135.257 exact matches in VIAF (=40%).

In a quite recent follow up of our experiments, we investigated how LDF clients can be used in a federated setup. When combining in the LDF algorithm the triples result from many LDF servers, one SPARQL query can be run over many machines. These results are demonstrated at the iMinds demo site where a single SPARQL query can be executed over the combined VIAF and DBPedia datasets. A Perl implementation of this federated search is available in the latest version of RDF-LDF at GitHub.

We strongly believe in the success of this setup and the scalability of this solution as demonstrated by Ruben Verborgh at the USEWOD Workshop. Using Linked Data Fragments a range of solutions are available to publish data on the web. From simple data dumps to a full SPARQL endpoint any service level can be provided given the resources available. For more than a half year DBPedia has been running an LDF server with 99.9994% availability on a 8 CPU , 15 GB RAM Amazon server with 4.5 million requests. Scaling out, services such has the LOD Laundromat cleans 650.000 datasets and provides access to them using a single fat LDF server (256 GB RAM).

For more information on the Federated searches with  Linked Data Fragments  visit the blog post of Ruben Verborgh at: http://ruben.verborgh.org/blog/2015/06/09/federated-sparql-queries-in-your-browser/

Create a fixer [Part 2]

By Patrick Hochstenbach

This is part two of a two-part overview of extending the Catmandu Fix language. In Part 1, we showed how a Catmandu Fix is just a simple Perl object that has a ‘fix’ instance method. The only argument passed to the method is the Perl hash to transform; the return value should be the transformed/fixed Perl hash.

Using Moo a Fix can be written as:

package Catmandu::Fix::do_nothing;

use Moo;

sub fix {
    my ($self,$data) = @_;
    # … your fixes on $data…
    $data;
}

1;

Most of the cases the Fixes you create shouldn’t be any more complicated than this.

Things change when you want to use the Fix path language. For instance, when you need to evaluate deeply nested Perl hashes with paths like:

/foo/bar/1/*/test.$append

In these cases Fixes should extend the Catmandu::Fix::Base class and implement an emit function. In this post we shall create a ‘md5sum’ fix using emit functions.

Lets start easy by creating a skeleton fix that does nothing:

package Catmandu::Fix::nothing;

use Catmandu::Sane; 
use Moo;

with 'Catmandu::Fix::Base';

around BUILDARGS => sub {
    my ($orig, $class) = @_;
    $orig->($class);
};

sub emit {
    my ($self, $fixer) = @_;
    'undef';
}

1;

We first declare ‘nothing‘ as a subclass of Catmandu::Fix::Base using Moo roles (the ‘with’ keyword). With around BUILDARGS we define how a new instance of the ‘nothing’ Fix can be created. This is also the place where you can add any required arguments or options for this fix command (more on this later).

The emit subroutine contains the logic of the transformation and should return a string. This string contains Perl code to transform input data. Catmandu will compile the string to speed up the processing of fixes. In the example above we return undef that is translated into a null-operation after compilation.

When Catmandu compiles the Perl emit code above it will generate something like this:

sub {
    my $__0 = $_[0];
    eval {
        __FIX__1: {undef};
        1;
    } or do {
        my $__1 = $@;
        die $__1.Data::Dumper->Dump([$__0], [qw(data)]);
    };
    return $__0;
};

Noticed how the ‘undef’ of the emit code gets inserted next to the __FIX__1 directive? Catmandu will call this code with the input data as only argument. This maybe looks quite complicated, but it was created by Nicolas Steenlant to provide an easy (as in fast) and efficient (as in very fast) way to generate a lot of Perl code that can walk a deeply nested Perl hash.

Lets add some more code to explain the emit function a bit better. We will add hardcoded a string to add a ‘foo’ => ‘bar’ name-value pair in the input data:

sub emit {
    my ($self, $fixer) = @_;
    my $data = $fixer->var;
    "${data}->{foo} = 'bar'";
}

The attribute $fixer->var contains a reference to the input data we need to process. That is: $fixer->var contains the name of the variable that contains the input data. Internally in Catmandu this code gets compiled into:

sub {
    my $__0 = $_[0];
    eval {
        __FIX__1: { $__0->{foo} = 'bar'}};
        1;
    } or do {
        my $__1 = $@;
        die $__1.Data::Dumper->Dump([$__0], [qw(data)]);
    };
    return $__0;
};

We see that value of $fixer->var is the string ‘$__0’. When this subroutine is called with a Perl hash as input, then a new field ‘foo’ with value ‘bar’ is added to the input hash.

In the next examples we will skip the compiled code. Only try to remember that ‘emit’ returns a string that gets compiled into Perl code and that Catmandu::Fix has a lot of helper methods to generate a lot of Perl code to transform deeply nested hashes.

Lets try to do something with a deeply nested path. This is a pattern you will see in many of our Catmandu Fixes. In the code below we create a MD5 hash sum in a deeply nested Perl hash. We would like to say something like:

md5sum('my.deeply.nested.field');

or

md5sum('my.deeply.nested.*.field');

in case ‘nested’ was an array of hashes.

In pseudo Perl code this could be coded as:

$data->{my}->{deeply}->{nested}->{field} = md5sum($data->{my}->{deeply}->{nested}->{field});

and

for (@{$data->{my}->{deeply}->{nested}}) {
    $_->{field} = md5sum($_->{field});
}

In the first case you would end up with creating a lot of leaf nodes if ‘my.deeply.nested.field’ didn’t exist in the hash. In the second case you will end up with a lot of nested for loops for every array in the path.

Catmandu can help you to walk this path by breaking this operation into two steps:

  1. Walk the deeply nested hash until you hit my.deeply.nested (or my.deeply.nested.* in the second case).
  2. Set the ‘field’ value to a md5sum of itself.

In emit code this will look like:

01: sub emit {
02:    my ($self, $fixer) = @_;
03:    my $path = $fixer->split_path($self->path);
04:    my $key = pop @$path;
05:
06:    $fixer->emit_walk_path($fixer->var, $path, sub {
07:     my $var = shift;
08:     $fixer->emit_get_key($var, $key, sub {
09:     my $var = shift;
10:    "if (is_string(${var})) {" .
11:        "${var} = Digest::MD5::md5_hex(${var});" .
12:    "}";
13:    });
14:    });
15:}

n line 03 we read the path and split it into parts: my , deeply , nested , * , field.

In line 04 we create two things: 1) the path we want to walk [my, deeply, nested, *] and 2) the key ‘field’ we need to change into a md5sum.

In line 06 we start walking the deeply nested hash starting from $fixer->var which is the input data. For every end node in the path a callback function is called. This is the anonymous subroutine on line 06, which gets one argument: the current node.

In line 08 we change the current node by asking for the ‘field’ key and changing the value with a callback function.

All these $fixer->emits return strings that get concatenated into a very large string (depending on the size of your path) at the end of the emit_walk_path function. The resulting string will be compiled into Perl code as stated above.

Our new fixer can work on any type of path. We can fix:

md5sum("my.deeply.nested.field");

In this case emit_walk_path walks to ‘my.deeply.nested’ and changes ‘field’ into its MD5 value.

We can fix:

md5sum("my.deeply.nested.*.field");

In the case emit_walk_path loops into the ‘my.deeply.nested’ array of hashes and changes ‘field’ into its MD5 value.

There are other emit helper functions you can use.

Use:

$fixer->emit_get_key($var, $key, sub {});

to change the value of a variable that gets passed on by emit_walk_path.

Use:

$fixer->emit_delete_key($var, $key);

to delete a key from the variable that gets passed on by emit_walk_path

Use:

$fixer->emit_create_path($var, [ 'still','deeper','path'] , sub {});

to create a new path and possibly set its value.

Below we provide a complete example to generate the MD5 fix that can serve as a template for your fixes:

package Catmandu::Fix::md5sum;

use Catmandu::Sane;
use Digest::MD5;
use Moo;

with 'Catmandu::Fix::Base';

has path => (is => 'ro', required => 1);

around BUILDARGS => sub {
    my ($orig, $class, $path) = @_;
    $orig->($class, path => $path);
};

sub emit {
    my ($self, $fixer) = @_;
    my $path = $fixer->split_path($self->path);
    my $key = pop @$path;
    $fixer->emit_walk_path($fixer->var, $path, sub {
        my $var = shift;
        $fixer->emit_get_key($var, $key, sub {
            my $var = shift;
            "if (is_string(${var})) {" .
                "${var} = Digest::MD5::md5_hex(${var});" .
            "}";
        });
    });
}

1;