
Comparing MySQL to Vertica Replication under MemCloud, AWS and Bare Metal

Back in December, I did a detailed analysis of getting data into Vertica from MySQL using Tungsten Replicator, all within the Kodiak MemCloud.

I got some good numbers towards the end – 1.9 million rows/minute into Vertica. I did this using a standard replicator deployment, plus some tweaks to the Vertica environment. In particular:

  • Integer hash for a partition for both the staging and base tables
  • Some tweaks to the queries to ensure that we used the partitions in the most efficient manner
  • Optimized the batching within the applier to hit the right numbers for the transaction counts

That last one is a bit of a cheat, because in a real-world situation it’s much harder to identify those transaction sizes and row counts, but for testing, we’re trying to get the best performance!

Next, I wanted to set up some bare metal and AWS servers with an equivalent configuration, repeat the same tests on them, and see what comparable performance I could get.

How I Load Masses of Data

Before I dip into that, however, I thought it would be worth explaining how I generate the data in the first place. With big data testing (mainly when trying to simulate the data that ultimately gets written into your analytics target), the primary concern is reproducing the quantity as well as the variety of the data.

It’s application dependent, but for some analytics tasks the volume of inserts is quite high and the volume of updates/deletes relatively low. So I’ve written a test script that generates up to a million rows of data, split into around 65% inserts, 25% updates and 10% deletes.

I can tweak that of course, but I’ve found it gives a good spread of data. I can also configure whether that happens in one transaction or each row is a transaction of its own. That all gets dumped into an SQL file. A separate wrapper script and tool then load that information into MySQL, either using redirection within the MySQL command line tool or through a different lightweight C++ client I wrote.

The data itself is light, two columns, an auto-incrementing integer ID and a random string. I’m checking for row inserts here, not data sizes.
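The generator itself isn't published here, so the following is only a minimal Java sketch of the same idea. It assumes a hypothetical table `msg` with hypothetical columns `id` (AUTO_INCREMENT) and `data`, starting empty so inserted rows get IDs 1 upwards; the real script and C++ loader may work quite differently. It produces the 65/25/10 insert/update/delete split and can optionally wrap everything in a single transaction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Sketch of a load generator: roughly 65% inserts, 25% updates, 10% deletes.
// Assumes a hypothetical table `msg (id INT AUTO_INCREMENT, data VARCHAR)` that
// starts empty, so the inserted rows receive IDs 1..inserts.
final class LoadGenerator {
    private static final String ALPHABET =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

    // Random alphanumeric payload for the string column.
    static String randomString(Random rnd, int len) {
        StringBuilder sb = new StringBuilder(len);
        for (int i = 0; i < len; i++) {
            sb.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    static List<String> generate(String table, int total, boolean singleTransaction, long seed) {
        if (total < 0) throw new IllegalArgumentException("total must be >= 0");
        Random rnd = new Random(seed);
        int inserts = (int) Math.round(total * 0.65);
        int updates = (int) Math.round(total * 0.25);
        int deletes = total - inserts - updates;

        List<String> sql = new ArrayList<>(total + 2);
        if (singleTransaction) sql.add("START TRANSACTION;");

        // Inserts: passing 0 for the AUTO_INCREMENT column lets MySQL assign the ID
        // (unless the NO_AUTO_VALUE_ON_ZERO SQL mode is enabled).
        for (int i = 0; i < inserts; i++) {
            sql.add("INSERT INTO " + table + " VALUES (0, '" + randomString(rnd, 20) + "');");
        }
        // Updates: rewrite the payload of randomly chosen existing rows.
        for (int i = 0; i < updates; i++) {
            int id = 1 + rnd.nextInt(inserts);
            sql.add("UPDATE " + table + " SET data = '" + randomString(rnd, 20)
                    + "' WHERE id = " + id + ";");
        }
        // Deletes: pick distinct IDs so that every DELETE removes exactly one row.
        List<Integer> ids = new ArrayList<>(inserts);
        for (int id = 1; id <= inserts; id++) ids.add(id);
        Collections.shuffle(ids, rnd);
        for (int i = 0; i < deletes; i++) {
            sql.add("DELETE FROM " + table + " WHERE id = " + ids.get(i) + ";");
        }

        if (singleTransaction) sql.add("COMMIT;");
        return sql;
    }
}
```

Writing the returned list out with `java.nio.file.Files.write(path, statements)` gives an SQL file that can be redirected into the `mysql` command-line client. Passing `singleTransaction = false` leaves each statement to autocommit as its own transaction.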

So, to summarise:

  • Up to 1 million rows (although this is configurable)
  • Single or multiple transactions
  • Single schema/table or numerous schemas/tables
  • Concurrent, multi-threaded inserts

The fundamental result here is that I can predict the number of transactions and rows. That is really important when you are trying to measure rows per time period as a replication benchmark, because I can also start and stop replication on transaction-count boundaries to get precise performance figures.

For the main testing that I use for the performance results, what I do is run a multi-threaded, simultaneous insert into 20 schemas/tables and repeat it 40 times with a transaction/row count size of 10,000. That results in 8,000,000 rows of data, first being inserted/updated/deleted into MySQL, then extracted, replicated, and applied to (in this case) Vertica.

For the testing, I then use the replicator’s controls for starting and stopping at specific sequence numbers, and time the interval between those two points.

This gives me stats within about 2 seconds of the true result, but over a period of 15-20 minutes, that’s tolerable.
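The arithmetic behind these measurements is simple enough to sketch. The hypothetical helper below (illustrative only, not part of the replicator) derives the expected row count from the load parameters (20 schemas × 40 repetitions × 10,000 rows = 8,000,000 rows), converts a row count and an elapsed time into rows per minute, and estimates how much a fixed timing uncertainty such as 2 seconds skews the result. The 15-minute run in `main` is a made-up example, not a measured result.

```java
// Helper arithmetic for the benchmark; names are illustrative only.
final class ApplyRate {
    // Expected rows for a run: schemas x repetitions x rows per transaction.
    static long expectedRows(int schemas, int repetitions, int rowsPerTransaction) {
        return (long) schemas * repetitions * rowsPerTransaction;
    }

    // Rows per minute from a row count and the measured elapsed time in seconds.
    static double rowsPerMinute(long rows, double elapsedSeconds) {
        if (elapsedSeconds <= 0) throw new IllegalArgumentException("elapsed time must be positive");
        return rows * 60.0 / elapsedSeconds;
    }

    // Relative error caused by a fixed uncertainty in the start/stop timestamps.
    static double relativeError(double uncertaintySeconds, double elapsedSeconds) {
        return uncertaintySeconds / elapsedSeconds;
    }

    public static void main(String[] args) {
        long rows = expectedRows(20, 40, 10_000);   // 8,000,000 rows
        double elapsed = 15 * 60;                    // a hypothetical 15-minute run
        System.out.printf("rows=%d rate=%.0f rows/min error=%.2f%%%n",
                rows, rowsPerMinute(rows, elapsed), 100 * relativeError(2, elapsed));
    }
}
```

For a 15-minute run, a 2-second uncertainty works out to roughly a 0.2% error in the rate, which is why it is tolerable over runs of this length.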

It also means I can do testing in two ways:

  • Start the load test into MySQL and test for completion into Vertica

or

  • Load the data into THL, and test just the target applier (from network transfer to target DB)

For the real-world performance figures, I use the full end-to-end (MySQL insert and target apply) testing.

Test Environments

I tested three separate environments (the original MemCloud-hosted servers, some bare metal hosts and AWS EC2 hosts):

               MemCloud     Bare Metal   AWS
Cores          4            12           16
Threads        4            12           16
RAM (GB)       64           192          122
Disk           SSD          SSD          SSD
Networking     10 Gbit/s    10 Gbit/s    25 Gbit/s

It’s always difficult to perfectly match the environments across virtual and bare metal, particularly in AWS, but I did my best.

Results

I could go into all sorts of detailed results here, but I think it’s better to simply look at the final numbers because that is what really matters:

Rows per minute
MemCloud       1,900,000
Bare Metal       678,222
AWS              492,893

Now what’s interesting here is that MemCloud is significantly faster, even though it has fewer CPUs and less RAM. It’s perhaps even more surprising that MemCloud is nearly 3.9 times faster than AWS, even on I/O-optimized hosts (I/O is probably the limiting factor in Vertica applies).


Even against fairly hefty bare metal hosts, MemCloud is almost 3x faster!
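As a quick check of these comparisons, dividing the MemCloud rate by each of the other single-source rates in the results gives about 2.8x for bare metal and about 3.85x for AWS. The trivial sketch below just performs that division on the published figures.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Speed-up of MemCloud relative to the other platforms, using the single-source
// rows/minute figures from the results.
final class SpeedUp {
    static double ratio(double fast, double slow) {
        return fast / slow;
    }

    public static void main(String[] args) {
        double memcloud = 1_900_000;
        Map<String, Double> others = new LinkedHashMap<>();
        others.put("Bare Metal", 678_222.0);
        others.put("AWS", 492_893.0);
        for (Map.Entry<String, Double> e : others.entrySet()) {
            System.out.printf("MemCloud vs %s: %.2fx%n", e.getKey(), ratio(memcloud, e.getValue()));
        }
    }
}
```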

I’ve checked in with the engineers about the bare metal results, which seem striking, especially considering these are really beefy hosts, but it may simply be that the SSD interface and I/O become the limiting factor. When the replicator writes data into Vertica, a few things happen: we write THL to disk, write CSV to disk, read the CSV from disk into a staging table, and then merge the base and staging tables, which involves shuffling a lot of blocks around in memory (and ultimately on disk). It may simply be that MemCloud’s high-memory environment allows for much faster performance all round.

I also looked at the performance as I increased the number of MySQL sources feeding into the systems. Note that these sources write into separate schemas, rather than into the single, unified schema/table within Vertica.

Sources                  1           1           2            3            4            5
Target schemas           20          40          40           60           80           100
Rows written             8,000,000   8,000,000   16,000,000   24,000,000   32,000,000   40,000,000
MemCloud (rows/min)      1,900,057   1,972,000   3,617,042    5,531,460    7,353,982    9,056,410
Bare Metal (rows/min)    678,222     635,753     1,051,790    1,874,454    2,309,055    3,168,275
AWS (rows/min)           492,893     402,047     615,856      -            -            -

What is significant here is that with MemCloud I saw a much more linear ramp-up in performance, which I didn’t see to the same degree on bare metal or AWS. In fact, with AWS I couldn’t even remotely achieve the same levels, and by the time I got to three simultaneous sources I was getting such wildly random results between executions that I gave up testing. From experience, I suspect this is due to the networking and IOPS environment, even on a storage-optimized host.
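One simple way to put a number on "more linear" is scaling efficiency: the rate with n sources divided by n times the single-source rate, so 1.0 would be perfectly linear. This metric is an illustrative choice, not one used in the original tests. Applied to the figures above, it gives roughly 95-97% for MemCloud, roughly 78-93% for bare metal, and about 62% for AWS at two sources. The sketch below is just that arithmetic.

```java
// Scaling efficiency: measured rate with n sources divided by n times the
// single-source rate (1.0 would be perfectly linear scaling).
final class ScalingEfficiency {
    static double efficiency(double rateWithN, int n, double singleSourceRate) {
        return rateWithN / (n * singleSourceRate);
    }

    public static void main(String[] args) {
        int[] sources = {2, 3, 4, 5};
        double[] memcloud = {3_617_042, 5_531_460, 7_353_982, 9_056_410};
        double[] bareMetal = {1_051_790, 1_874_454, 2_309_055, 3_168_275};
        for (int i = 0; i < sources.length; i++) {
            System.out.printf("%d sources: MemCloud %.0f%%, Bare Metal %.0f%%%n", sources[i],
                    100 * efficiency(memcloud[i], sources[i], 1_900_057),
                    100 * efficiency(bareMetal[i], sources[i], 678_222));
        }
        // AWS only has a two-source figure before the results became too erratic.
        System.out.printf("2 sources: AWS %.0f%%%n", 100 * efficiency(615_856, 2, 492_893));
    }
}
```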

The graph version shows the differences more clearly:

[Graph: rows per minute by number of MySQL sources for MemCloud, bare metal and AWS]

Bottom line, MemCloud seems really quick, and the statement I made in the original testing still seems to be valid:

The whole thing went so quick I thought it hadn’t executed at all!

MariaDB to Hadoop in Spanish

Nicolas Tobias has written an awesome guide to setting up replication from MariaDB to Hadoop/HDFS using Tungsten Replicator, in Spanish! He’s planning more of these so if you like what you see, please let him know!

Easter week, and here I am with new battles to tell.
There I was at work, thinking about how I was going to spend the calm that comes with those holiday days when we can freely choose to work, and I thought: wouldn’t it be good to finish that synchronization between the MariaDB and Hive servers?

I had already looked for some information on this back in January, and even had a PoC set up on some VMs, which I switched back on, but everything had rotted: it wouldn’t start, it didn’t work, I couldn’t even remember how I had done it, and the shell history was gibberish. I decided that if I redid everything from scratch, I could write it down in a playbook and, what’s more, learn it and automate it to the point of being able to deploy it automatically with Ansible.

via De MariaDB a HDFS: Usando Continuent Tungsten. Parte 1 (From MariaDB to HDFS: Using Continuent Tungsten, Part 1) | run.levelcin.co

Analytical Replication Performance from MySQL to Vertica on MemCloud

I’ve recently been trying to improve the performance of the Vertica replicator, particularly for the new single schema replication. We’ve done a lot in the new Tungsten Replicator 5.3.0 release to improve (and ultimately support) the new single schema model.

As part of that, I’ve also been personally looking at Kodiak MemCloud as a deployment platform. The people at Kodiak have been really helpful (disclaimer: I’ve worked with some of them in the past). MemCloud is a high-performance cloud platform based on hardware with high-speed (and high-volume) RAM, SSD and fast Ethernet connections. This means that even without any adjustment or tuning you’ve got a fast platform to work on.

However, if you are willing to put in some extra time, you can tune things further. Once you have a super-quick environment, you find you can tweak the settings a little more because you have more options available. Ultimately, you can then make use of that faster environment to stretch things a little further. And that’s exactly what I did when trying to determine how quickly I could get data into Vertica from MySQL.

In fact, the first time I ran my high-load test suite on MemCloud infrastructure, replicating data from MySQL into Vertica, I made this comment to my friend at Kodiak:

The whole thing went so quick I thought it hadn’t executed at all!

In general, there are two things you want to test when using replication to move data from a transactional environment into an analytical one:

  • Latency of moving the data
  • Apply rate for moving the data

The two are subtly different. The first measures how long it takes to get the data from the source to the target. The second measures how much data you can move in a set period of time.

Depending on your deployment and application, either or both can be critical. For example, if you are using analytics to perform real-time analysis and charging on your data, the first one is the most important, because you want the information up to date as quickly as possible. If you are performing log analysis or looking at longer-term trends, the second is probably more important: you may not worry about being a few seconds behind, but you want many thousands of transactions to be transferred. I concentrated on the latter rather than the former, because latency in the batch applier is something you can control by setting the batch interval.

So what did I test?

At a basic level, I was replicating data from MySQL directly into Vertica. That is, extracting data from the MySQL binary log and writing it into a table within a three-node HPE Vertica cluster. Each node runs in MemCloud with 64GB of RAM and 2TB of SSD disk space. I deliberately made no configuration changes to Vertica at this point.

The first thing I did was set up a basic replication pipeline between the two. Replication into Vertica works by batch-loading data through CSV files into Vertica staging tables and then ‘materialising’ the changes into the carbon copy tables. Because it’s done in batches, the latency is effectively governed by the batch apply settings, which were configured for 10,000 rows or 5 seconds.

To generate the load, I’ve written a script that generates 100,000 rows of random data, then updates about 70% of those rows at random and deletes the other 30%. So each schema generates about 200,000 rows of changes for each load (100,000 inserts, plus 70,000 updates and 30,000 deletes). This is designed to test the specific batch replication scenario, and ultimately it runs across multiple schemas (with the same structure). I use this approach specifically because I can match the load against the replication stream to get replication (rather than transaction) statistics. I need to be able to monitor the apply rate into Vertica from MySQL accurately.

The first time I ran the process of just generating the data and inserting it into MySQL, the command returned almost immediately. I seriously thought it had failed, because I couldn’t believe I’d just inserted 200,000 rows into MySQL that quickly. Over on the Vertica side, I was monitoring the apply through the trepctl perf command, which provides live output of the process. For a second I saw a blip as the data was replicated and then applied. It was so quick that I thought a single row (or even a failed transaction) had caused the blip.
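The "commit after 10,000 rows or 5 seconds, whichever comes first" behaviour can be modelled with a small policy object. This is a hypothetical sketch for illustration, not the replicator's own implementation or configuration. It only checks the limits when rows arrive, whereas a real applier would also need a timer so that a quiet period still flushes a partial batch.

```java
// Hypothetical model of a block-commit policy: commit when either the row
// limit or the time limit is reached, whichever happens first.
final class BlockCommitPolicy {
    private final int maxRows;
    private final long maxIntervalMillis;
    private int pendingRows = 0;
    private long batchStartMillis = 0;

    BlockCommitPolicy(int maxRows, long maxIntervalMillis) {
        this.maxRows = maxRows;
        this.maxIntervalMillis = maxIntervalMillis;
    }

    // Records `rows` arriving at `nowMillis`; returns true if the batch should be committed.
    boolean add(int rows, long nowMillis) {
        if (pendingRows == 0) {
            batchStartMillis = nowMillis;   // the first rows of a new batch start the clock
        }
        pendingRows += rows;
        return pendingRows >= maxRows || nowMillis - batchStartMillis >= maxIntervalMillis;
    }

    // Resets the batch once the CSV load and merge have been committed.
    void committed() {
        pendingRows = 0;
    }

    int pendingRows() {
        return pendingRows;
    }
}
```

`new BlockCommitPolicy(10_000, 5_000)` models the settings described above. Raising both limits (for example to 100,000 rows and 10 seconds) produces fewer, larger batches, trading latency for apply rate.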

The first time I ran the tests, I got some good results with 20 simultaneous schemas:

  • 460,000 rows/minute from a single MySQL source into Vertica. 

Then I doubled up the source MySQL servers, so two servers, 40 simultaneous schemas, and ultimately writing in about 8 million rows:

  • 986,000 rows/minute into Vertica across 40 schemas from 2 sources

In both cases, the latency was between 3-7 seconds for each batch write (remember we are handling 10,000 rows or 5s per batch). We are also doing this across *different* schemas at this stage. These are not bad figures.

I did some further tweaks, this time, reconfiguring the batch writes to do larger blocks and larger intervals. This increases the potential latency (because there will be bigger gaps between writes into Vertica), but increases the overall row-apply performance. Now we are handling 100,000 rows or 10s intervals. The result? A small bump for a single source server:

  • 710,000 rows/minute into Vertica across 20 schemas from 1 source

Latency has increased though, with us topping out at around 11.5s for the write when performing the very big blocks. Remember this is single-source, and so I know that the potential is there to basically double that with a second MySQL source since the scaling seems almost linear.

Now I wanted to move on to test a specific scenario I added into the applier, which is the ability to replicate from multiple source schemas into a single target schema. Each source is identical, and to ensure that the ‘materialise’ step works correctly, and that we can still analyse the data, a filter is inserted into the replication that adds the source schema to each row.

The sample data inserts look like this:

insert into msg values (0,"RWSAjXaQEtCf8nf5xhQqbeta");
insert into msg values (0,"4kSmbikgaeJfoZ6gLnkNbeta");
insert into msg values (0,"YSG4yeG1RI6oDW0ohG6xbeta");

With the filter, what gets inserted is:

0,"RWSAjXaQEtCf8nf5xhQqbeta","sales1"
0,"4kSmbikgaeJfoZ6gLnkNbeta","sales1"

and so on, where ‘sales1’ is the source schema, added as an extra column to each row.
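Conceptually, the filter just appends one value to every row before the row is written out for the batch load. The sketch below shows that transformation producing a CSV line like the ones above. The class and method names and the quoting rules (strings double-quoted, embedded quotes doubled) are assumptions for illustration; the replicator's actual filter and CSV writer may behave differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of an "add source schema" transformation followed by CSV output.
final class AddDbName {
    // Returns a copy of the row with the source schema appended as an extra column.
    static List<Object> addDbName(List<Object> row, String dbname) {
        List<Object> out = new ArrayList<>(row);
        out.add(dbname);
        return out;
    }

    // Formats one value: numbers as-is, strings double-quoted with embedded quotes doubled.
    static String csvValue(Object v) {
        if (v == null) return "";
        if (v instanceof Number) return v.toString();
        return "\"" + v.toString().replace("\"", "\"\"") + "\"";
    }

    static String toCsv(List<Object> row) {
        return row.stream().map(AddDbName::csvValue).collect(Collectors.joining(","));
    }
}
```

For example, `AddDbName.toCsv(AddDbName.addDbName(List.of(0, "RWSAjXaQEtCf8nf5xhQqbeta"), "sales1"))` returns `0,"RWSAjXaQEtCf8nf5xhQqbeta","sales1"`.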

This introduces two things we need to handle on the Vertica side:

  1. We now have to merge taking into account the source schema (since the ID column of the data could be the same across multiple schemas). For example, whereas before we did DELETE WHERE ID IN (xxxx), now we have to do DELETE WHERE ID IN (xxxx) AND dbname = 'sales1'.
  2. It increases the contention ratio on the Vertica table because we now effectively write into only one table. This increases the locks and the extents processed by Vertica.
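The change to the merge in point 1 can be illustrated by building the DELETE statement for a batch of IDs. This is a hypothetical helper, not the applier's actual query (which, as described later, uses a subquery against the staging table); it simply shows where the extra `dbname` predicate goes, and it escapes the schema name as an SQL string literal.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: DELETE for a batch of IDs, optionally restricted to one source schema.
final class DeleteBuilder {
    static String deleteSql(String table, List<Long> ids, String dbname) {
        if (ids.isEmpty()) throw new IllegalArgumentException("no IDs to delete");
        String idList = ids.stream().map(String::valueOf).collect(Collectors.joining(", "));
        StringBuilder sql = new StringBuilder("DELETE FROM ").append(table)
                .append(" WHERE id IN (").append(idList).append(")");
        if (dbname != null) {
            // Single quotes are doubled so the schema name is a valid SQL string literal.
            sql.append(" AND dbname = '").append(dbname.replace("'", "''")).append("'");
        }
        return sql.toString();
    }
}
```

Without the schema predicate, rows with ID 1 from every source schema would be deleted together, which is exactly the problem the extra column solves.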

The effect of this change is that the overall apply rate slows down slightly due to the increased contention on a single table. Same tests, 20 schemas from one MySQL source database and we get the following:

  • 760,000 rows/minute into Vertica with a single target table

This is actually not as bad as I was expecting when you consider that we are modifying every row of incoming data, and are no longer able to multi-thread the apply.

I then tried increasing that using the two sources and 40 schemas into the same single table. Initially, the performance was no longer linear, and I failed to get any improvement beyond about 10% above that 760K/min figure.

Now it was time to tune other things. First of all, I changed some of the queries run on the Vertica side, tweaking the selection and query parameters for the DELETE operations. For batch loading, what we do is DELETE and then INSERT, or, in some cases, DELETE and UPDATE if you’ve configured it that way. Tweaking the subquery being used increased the performance a little.

Changing the projections used also increased the performance of the single schema apply. But the biggest gains, perhaps unsurprisingly, came from changing the way the tables were defined in the first place, using partitions in the table definition. To do this, I modified the original filter that was adding the schema name so that it added a hash of the schema name instead: an integer generated in Java from the string. Then I created the staging and base tables in Vertica using the integer hash as the partition key, and modified the queries to ensure that the partitions would be used effectively.
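For readers unfamiliar with the batch apply, the DELETE-then-INSERT materialisation can be sketched as a pair of statements driven from a staging table. The staging columns used here (`op` for the operation code and `seq` for ordering within the batch) are hypothetical stand-ins, not the replicator's real staging-table layout, and the "latest change wins" rule is an assumption about how several changes to the same row in one batch are collapsed.

```java
import java.util.List;

// Hypothetical sketch of the materialise step for one base/staging table pair.
// Staging rows are assumed to carry an operation code `op` ('I', 'U' or 'D')
// and a sequence column `seq` giving their order within the batch.
final class MaterialiseSql {
    static List<String> statements(String base, String staging, String columns) {
        // 1. Remove every base row touched by the batch (updates and deletes alike).
        String delete = "DELETE FROM " + base + " WHERE id IN (SELECT id FROM " + staging + ")";
        // 2. Re-insert the final image of each row whose latest change is an insert or update.
        String insert = "INSERT INTO " + base + " (" + columns + ") SELECT " + columns
                + " FROM " + staging + " s WHERE s.op IN ('I', 'U')"
                + " AND s.seq = (SELECT MAX(seq) FROM " + staging + " s2 WHERE s2.id = s.id)";
        return List.of(delete, insert);
    }
}
```

The subqueries against the staging table are the kind of query that the tuning described here targets: restricting them (for example, by partition) limits how much of the base table each batch has to touch.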
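The partitioning change can be sketched as follows, assuming the integer hash is Java's `String.hashCode()` (the text doesn't say exactly which hash the filter uses). `hashCode()` is deterministic across JVMs but not guaranteed unique, so two schema names could in principle share a partition. The table and column names (`dbhash` and the rest) are hypothetical; `PARTITION BY` is Vertica's clause for declaring a table's partition expression.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of hash-based partitioning by source schema; names are hypothetical.
final class HashPartitioning {
    // Integer partition key for a schema name (assumes String.hashCode():
    // deterministic across JVMs, but collisions between names are possible).
    static int dbHash(String schema) {
        return schema.hashCode();
    }

    // Vertica-style DDL using the hash column as the partition expression.
    static String createTableDdl(String table) {
        return "CREATE TABLE " + table
                + " (id INT NOT NULL, data VARCHAR(64), dbhash INT NOT NULL)"
                + " PARTITION BY dbhash";
    }

    // Filtering on the integer partition column lets the database skip other partitions.
    static String deleteSql(String table, List<Long> ids, String schema) {
        String idList = ids.stream().map(String::valueOf).collect(Collectors.joining(", "));
        return "DELETE FROM " + table + " WHERE dbhash = " + dbHash(schema)
                + " AND id IN (" + idList + ")";
    }
}
```

Comparing a single integer column, rather than a string, is also cheaper per row, which fits with the gains reported here.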

The result was that the 760k/min rate was now scalable. I couldn’t get any faster when writing into a single schema, but the rate remains relatively constant whether I am replicating five schemas into a single one, or 40 schemas from two or three sources into the same. In fact, it does ultimately start to dip slightly as you add more source schemas.

Even better, the changes I’d made to the queries and the overall Vertica batch applier also improved the speed of the standard (i.e. multi-schema) applier. I also added partitioning on the ID field to improve the general apply rate. After testing for a couple of days, the average rate was:

  • 1,900,000 rows/minute from a single MySQL source into Vertica.

This was also scalable. Five MySQL sources elicited a rate of 8.8 million rows/minute into Vertica, making the applier rate linear with a 1% penalty for each additional MySQL source. The latency stayed the same, hovering at around 11s most of the time, as before. Occasionally you’d get a spike because Vertica was having trouble keeping up, but

Essentially, we are replicating data from MySQL into Vertica for analytics at a rate I simply called ‘outstandingly staggering’. I still do.

The new single schema applier, database name filter (rowadddbname) and performance improvements are all incorporated into the new Tungsten Replicator.

Keynote and Session at Percona Live Dublin 2017

On Sunday I will travel over to Dublin for Percona Live 2017.

I have two sessions: a keynote on Wednesday morning, where I’ll be talking about all the fun new stuff we have planned at Continuent and some new directions we’re working on.

I also have a more detailed session on our new appliers for Kafka, Elasticsearch and Cassandra on Tuesday morning.


If you haven’t already booked to come along, feel free to use the discount code SeeMeSpeakPLE17 which will get you 15% off!

Upcoming Webinar, 19th July, What is New in Tungsten Replicator 5.2 and Tungsten Clustering 5.2?

Continuent Tungsten 5.2 is just around the corner. This is one of our most exciting Tungsten product releases for some time!

In this webinar we’re going to have a look at a host of new features in the new release, including:
  • Three new Replication Applier Targets (Kafka, Cassandra and Elasticsearch)
  • New improvements to our core command-line tools trepctl and thl
  • New foundations for our filtering services
  • Improvements to the compatibility between replication and clustering

This webinar is going to be a packed session, and we’ll show all the exciting stuff, with more in-depth follow-up sessions in the coming weeks.

You’ll also learn about some more exciting changes coming in the upcoming Tungsten releases (5.2.1 and 5.3), and our major Tungsten 6.0 release due out by the end of the year.

So come and join us to get the low-down on everything related to Tungsten Replicator 5.2 and Tungsten Clustering 5.2 on Wednesday, July 19, 2017, 9:00 AM – 9:30 AM PDT at https://attendee.gotowebinar.com/register/4108437731342545667

New Continuent Webinar Wednesdays and Training Tuesdays

We are just starting to get into the swing of setting up our new training and webinar schedule.
Initially, there will be one Webinar session (typically on a Wednesday) and one training session (on a Tuesday) every week from now. We’ll be covering a variety of different topics at each.
Typically our webinars will be about products and features, comparisons to other products, mixed in with product news (new releases, new features) and individual sessions based on what is going on at Continuent and the market in general.
Our training, by comparison, is going to be a hands-on, step-by-step sequence covering all of the different areas of our product. So we’ll cover everything from the basics of how the products work, how to deploy them, typical functionality (switching, start/stop, etc), and troubleshooting.
All of the sessions are going to be recorded and we’ll produce a suitable archive page so that you can go and view the past sessions. Need a refresher on re-provisioning a node in your cluster? There’s going to be a video for it and documentation to back it up.
Our first webinar is actually next Thursday (the Wednesday rule wouldn’t be a good one without an exception) and is all about MySQL Multi-Site/Multi-Master Done Right:
In this webinar, we discuss what makes Tungsten Clustering better than other alternatives (AWS RDS, Galera, MySQL InnoDB Cluster, and XtraDB Cluster), especially for geographically distributed multi-site deployments, both for disaster recovery and multi-site/multi-master needs.
If you want to attend, please go ahead and register using this link: http://go.continuent.com/n0JI04Q03EAV0RD0i000Vo4
Keep your eyes peeled for the other upcoming sessions. More details soon.

Extending the Tungsten Replicator Core JS Filter Functionality

Tungsten Replicator has a really cool feature in that we can filter data as it goes past on the wire.
The replicator itself is written entirely in Java, and writing filters for it in Java is not as straightforward as it looks, which is why the better option is to use the JavaScript mechanism and write filters in JavaScript instead. I’ll save the details of how you can write filters to process and massage data for another time, but right now I wanted to find a good way of improving that JavaScript environment.
There are filters, for example, where I want to be able to load JSON option and configuration files, or write out JSON versions of information, and plenty more.
Mozilla’s Rhino JS environment is what is used to provide the internal JS environment for running filters. The way this is supported is that rather than creating a Rhino JS environment that can do whatever it wants, instead, we create a JS instance specifically for executing the required functions within the filter. One of these instances is created for each filter that is configured in the system (and each batch instance too).
The reason is that, for each filter, we want every transaction event that appears in the THL log to be passed through the JS instance, where the filter() function in the JS filter is called with a single argument: the event data.
The limitation of this model is that we don’t get the full Rhino environment, because we execute the JS function directly, so certain top-level items and functions like load() or require(), or utilities like JSON.stringify(), are not available. We could provide them by changing the way we do the configuration, but that could start to get messy quickly, while also complicating the security aspects of how we execute these components.
There are some messy ways in which we could get round this, but in the end, because I also wanted to add some general functionality into the filters system shared across all JS instances, I chose instead to just load a set of utility functions, written in JavaScript, into the JS instance for the filter. The wonderful thing about JS is that we can write all of the functions in JS, even for classes, methods and functions that aren’t provided elsewhere.
So I chose the path of least resistance: loading and executing a core JS file before loading and executing the main filter JS. We can place into that core file all of the utility functions we want to be available to all of the filters.
So, to enable this the first thing we do is update the core Java code when we load the filter JS to load our core utility JS first. That occurs in replicator/src/java/com/continuent/tungsten/replicator/filter/JavaScriptFilter.java, within the prepare() function which is where we instantiate the JS environment based on the code.
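// Requires: java.io.BufferedReader, java.io.FileReader, java.io.IOException,
// org.mozilla.javascript.EvaluatorException and org.mozilla.javascript.Script
String coreutilssrc = properties.getString("replicator.filter.coreutils");

// Import the standard JS utility script first, so that its functions are
// already defined in the scope when the filter script itself is loaded
try (BufferedReader inbase = new BufferedReader(new FileReader(coreutilssrc)))
{
    // Read and compile the core script functions, using the core file's
    // own path as the source name so that errors report the right file
    Script coreScript = jsContext.compileReader(inbase, coreutilssrc, 1, null);

    // Execute it in the filter's scope to define the utility functions
    coreScript.exec(jsContext, scope);
}
catch (IOException e)
{
    throw new ReplicatorException("Unable to read core utility library file: "
            + coreutilssrc, e);
}
catch (EvaluatorException e)
{
    throw new ReplicatorException(e);
}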
This is really straightforward: we obtain the path to the core utilities script from the configuration file (we’ll look at how we define that later), then compile it within the jsContext object, where our JavaScript is executed, and run it in the filter’s scope. We add some sensible error checking, but otherwise this is simple.
It’s important to note that this is designed to load that core file *before* the main filter file just in case we want to use anything in there.
Next, we can add that configuration line to the default configuration by creating a suitable ‘template’ file for tpm, replicator/samples/conf/filters/default/coreutils.tpl. I’ve put it into the filters section because it only applies to filter environments.
The content is simple; it’s just the line with the location of our core utility script:
# Defines the core utility script location
replicator.filter.coreutils=${replicator.home.dir}/support/filters-javascript/coreutils.js

And lastly, we need the script itself, replicator/support/filters-javascript/coreutils.js:
// Core utility JavaScript and functions for use in filters
//
// Author: MC Brown (9af05337@opayq.com)


// Simulate the load() function to load and execute additional external JS scripts.
// The source is eval()'d inside this function, so anything it defines is local
// to this call unless the loaded file exports it explicitly.

function load(filename) {
    var file = new java.io.BufferedReader(new java.io.FileReader(new java.io.File(filename)));
    var sb = "";
    var line;

    try {
        while ((line = file.readLine()) != null) {
            sb = sb + line + java.lang.System.getProperty("line.separator");
        }
    } finally {
        file.close();
    }

    eval(sb);
}

// Read a file and evaluate it as JSON, returning the resulting object.
// This uses eval(), so only use it on trusted files.

function readJSONFile(path) {
    var file = new java.io.BufferedReader(new java.io.FileReader(new java.io.File(path)));
    var sb = "";
    var line;

    try {
        while ((line = file.readLine()) != null) {
            sb = sb + line + java.lang.System.getProperty("line.separator");
        }
    } finally {
        file.close();
    }

    // Wrap in parentheses so that an object literal is parsed as an expression
    var jsonval = eval("(" + sb + ")");

    return jsonval;
}

// Object for converting JSON strings into JS objects and back again

JSON = {
    parse: function(sJSON) { return eval('(' + sJSON + ')'); },
    stringify: (function () {
      var toString = Object.prototype.toString;
      var isArray = Array.isArray || function (a) { return toString.call(a) === '[object Array]'; };
      // Escape sequences for characters that cannot appear raw inside a JSON string
      var escMap = {'"': '\\"', '\\': '\\\\', '\b': '\\b', '\f': '\\f', '\n': '\\n', '\r': '\\r', '\t': '\\t'};
      var escFunc = function (m) { return escMap[m] || '\\u' + (m.charCodeAt(0) + 0x10000).toString(16).substr(1); };
      var escRE = /[\\"\u0000-\u001F\u2028\u2029]/g;
      return function stringify(value) {
        if (value == null) {
          return 'null';
        } else if (typeof value === 'number') {
          return isFinite(value) ? value.toString() : 'null';
        } else if (typeof value === 'boolean') {
          return value.toString();
        } else if (typeof value === 'object') {
          if (typeof value.toJSON === 'function') {
            return stringify(value.toJSON());
          } else if (isArray(value)) {
            var res = '[';
            for (var i = 0; i < value.length; i++)
              res += (i ? ', ' : '') + stringify(value[i]);
            return res + ']';
          } else if (toString.call(value) === '[object Object]') {
            var tmp = [];
            for (var k in value) {
              if (value.hasOwnProperty(k))
                tmp.push(stringify(k) + ': ' + stringify(value[k]));
            }
            return '{' + tmp.join(', ') + '}';
          }
        }
        // Strings (and anything else) are quoted, with special characters escaped
        return '"' + value.toString().replace(escRE, escFunc) + '"';
      };
    })()
  };

For the purposes of validating my process, there are three functions:
  • load() – which loads an external JS file and executes it, so that we can load other JS scripts and libraries.
  • readJSONFile() – which loads a JSON file and returns its contents as a JavaScript object.
  • JSON object – which provides two methods: JSON.parse(), which parses a JSON string into a JS object, and JSON.stringify(), which turns a JS object back into JSON.
Putting all of this together gives you a replicator where we now have some useful functions to make writing JavaScript filters easier. I’ve pushed all of this up into my fork of the Tungsten Replicator code here: https://github.com/mcmcslp/tungsten-replicator/tree/jsfilter-enhance
Now, one final note. Because of the way load() works, in terms of running an eval() on the code to import it, it does mean that there is one final step to make functions useful. To explain what I mean, let’s say you’ve written a new JS filter using the above version of the replicator.
In your filter you include the line:
load("/opt/continuent/share/myreallyusefulfunctions.js");
Within that file, you define a function called runme():
function runme()
{
     logger.info("I'm a bit of text");
}

Now within myreallyusefulfunctions.js I can call that function fine:
runme();
But from within the JS filter, runme() will raise an unknown function error. The reason is that the source file was eval()’d inside the load() function, so its definitions end up in the wrong context.
We can fix that within myreallyusefulfunctions.js by exporting the name explicitly:
if (runme.name) this[runme.name] = runme;
This assigns runme() as a property of this (the global object, because load() is called as a plain function), so it becomes visible to the filter. Put that line at the end of the myreallyusefulfunctions.js script and everything is fine.
I’m lazy, and I haven’t written a convenient function for it, but I will in a future blog.
Now we’ve got this far, let’s start building some useful JS functions and functionality to make it all work nicely…

Extending the Tungsten Replicator Core JS Filter Functionality

Tungsten Replicator has a really cool feature in that we can filter data as it goes past on the wire.

The replicator itself is written entirely in Java and writing filters for it is not as straightforward as it looks, which is why the much better feature is just to use the JavaScript mechanism and write filters using that tool instead. I’ll save the details for how you can write filters to process and massage data for another time, but right now I wanted to find a good way of improving that JavaScript environment.

There are filters, for example, where I want to be able to load JSON option and configuration files, or write out JSON versions of information, and plenty more.

Mozilla’s Rhino JS environment is what is used to provide the internal JS environment for running filters. The way this is supported is that rather than creating a Rhino JS environment that can do whatever it wants, instead, we create a JS instance specifically for executing the required functions within the filter. One of these instances is created for each filter that is configured in the system (and each batch instance too).

The reason we do this is because for each filter, we want each transaction event that appears in the THL log to get executed through the JS instance where the filter() function in the JS filter is executed with a single argument, the event data.

The limitation of this model is that we dont get the full Rhino environment because we execute the JS function directly, so certain top level items and functions like load() or require(), or utilities like JSON.stringify() are not available. We could do that by changing the way we do the configuration, but that could start to get messy quickly, while also complicating the security aspects of how we execute these components.

There are some messy ways in which we could get round this, but in the end, because I also wanted to add some general functionality into the filters system shared across all JS instances, I chose instead to just load a set of utility functions, written in JavaScript, into the JS instance for the filter. The wonderful thing about JS is that we can write all of the functions in JS, even for classes, methods and functions that aren’t provided elsewhere.

So I chose the path of least resistance: load and execute a core JS file before loading and executing the main filter JS. We can then place into that core file all of the utility functions we want to be available to every filter.

To enable this, the first thing we do is update the core Java code that loads the filter JS so that it loads our core utility JS first. That code is in replicator/src/java/com/continuent/tungsten/replicator/filter/JavaScriptFilter.java, within the prepare() function, which is where we instantiate the JS environment based on the filter code.

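// Path to the core utility script, defined through the coreutils.tpl template
String coreutilssrc = properties.getString("replicator.filter.coreutils");

// Import the standard JS utility script first, so that its functions are
// available to the main filter script compiled afterwards
try (BufferedReader inbase = new BufferedReader(new FileReader(coreutilssrc)))
 {
 // Read and compile the core script functions; name the source after the
 // core utility file so that compile errors report the right script
 script = jsContext.compileReader(inbase, coreutilssrc, 1, null);

 // Execute it in the filter's scope, defining the utility functions there
 script.exec(jsContext, scope);
 }
catch (IOException e)
 {
 throw new ReplicatorException("Core utility library file not found: "
 + coreutilssrc, e);
 }
catch (EvaluatorException e)
 {
 throw new ReplicatorException(e);
 }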

This is straightforward: we obtain the path to the core utilities script from the configuration file (we’ll look at how we define that later), then compile and execute it within the jsContext object, where our JavaScript is being executed. We add some sensible error checking, but otherwise it’s simple.

It’s important to note that this loads the core file *before* the main filter file, so that the filter can use anything defined in it.
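The effect of that ordering can be sketched outside the replicator by evaluating two source strings into one global scope. This is an analogy rather than the replicator’s code: indirect eval stands in for Rhino’s compileReader()/exec(), and `coreSrc`, `filterSrc` and `greet()` are hypothetical.

```javascript
// Hypothetical stand-ins for coreutils.js and a filter script that calls
// one of its helpers while the filter script itself is being loaded.
var coreSrc = "function greet(name) { return 'hello ' + name; }";
var filterSrc = "var startupMessage = greet('filter');";

// Evaluate each source in the global scope, in the order given.
// Indirect eval ((0, eval)(...)) always runs in the global scope.
function loadInOrder(sources) {
  for (var i = 0; i < sources.length; i++) {
    (0, eval)(sources[i]);
  }
}

// Core first: greet() already exists when the filter source runs.
loadInOrder([coreSrc, filterSrc]);
```

Reversing the order makes the filter’s load-time call fail with a ReferenceError, because the helper is not yet defined when the filter source runs. Calls made later, inside filter(), are less sensitive, since by then both scripts have been loaded.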

Next, we add that configuration property to the default configuration by creating a suitable ‘template’ file for tpm: replicator/samples/conf/filters/default/coreutils.tpl. I’ve put it into the filters section because it only applies to filter environments.

The content is simple, it’s the line with the location of our core utility script:

# Defines the core utility script location
replicator.filter.coreutils=${replicator.home.dir}/support/filters-javascript/coreutils.js

And lastly, we need the script itself, replicator/support/filters-javascript/coreutils.js :

// Core utility JavaScript and functions for use in filters
//
// Author: MC Brown (9af05337@opayq.com)


// Simulate the load() function to load additional external JS scripts.
// The eval() runs inside this function, so anything the loaded file
// defines is local to load() unless the file exports it explicitly.

function load(filename) {
    var file = new java.io.BufferedReader(new java.io.FileReader(new java.io.File(filename)));

    var sb = "";
    var line;
    try {
        while((line = file.readLine()) != null)
            {
                sb = sb + line + java.lang.System.getProperty("line.separator");
            }
    } finally {
        file.close();
    }

    eval(sb);
}

// Read a file and evaluate it as JSON, returning the resulting JS object.
// eval() trusts the file contents, so only read files you control.

function readJSONFile(path)
{
    var file = new java.io.BufferedReader(new java.io.FileReader(new java.io.File(path)));

    var sb = "";
    var line;
    try {
        while((line = file.readLine()) != null)
            {
                sb = sb + line + java.lang.System.getProperty("line.separator");
            }
    } finally {
        file.close();
    }

    var jsonval = eval("(" + sb + ")");

    return jsonval;
}

// Class for reconstituting objects into JSON

JSON = {
    // Parses by eval(), so only pass trusted input
    parse: function(sJSON) { return eval('(' + sJSON + ')'); },
    stringify: (function () {
      var toString = Object.prototype.toString;
      var isArray = Array.isArray || function (a) { return toString.call(a) === '[object Array]'; };
      var escMap = {'"': '\\"', '\\': '\\\\', '\b': '\\b', '\f': '\\f', '\n': '\\n', '\r': '\\r', '\t': '\\t'};
      // Characters that must be escaped inside a JSON string
      var escRe = /[\\"\b\f\n\r\t]/g;
      return function stringify(value) {
        if (value == null) {
          return 'null';
        } else if (typeof value === 'number') {
          return isFinite(value) ? value.toString() : 'null';
        } else if (typeof value === 'boolean') {
          return value.toString();
        } else if (typeof value === 'object') {
          if (typeof value.toJSON === 'function') {
            return stringify(value.toJSON());
          } else if (isArray(value)) {
            var res = '[';
            for (var i = 0; i < value.length; i++)
              res += (i ? ', ' : '') + stringify(value[i]);
            return res + ']';
          } else if (toString.call(value) === '[object Object]') {
            var tmp = [];
            for (var k in value) {
              if (value.hasOwnProperty(k))
                tmp.push(stringify(k) + ': ' + stringify(value[k]));
            }
            return '{' + tmp.join(', ') + '}';
          }
        }
        // Everything else (including strings) becomes an escaped JSON string;
        // String() also converts wrapped Java strings into JS strings
        return '"' + String(value).replace(escRe, function (c) { return escMap[c]; }) + '"';
      };
    })()
  };

For the purposes of validating my process, there are three functions:

  • load() – which loads an external JS file and executes it, so that we can load other JS scripts and libraries.
  • readJSONFile() – which reads a JSON file and returns the result as a JavaScript object.
  • JSON class – which does two things: it provides a JSON.parse() method for parsing JSON strings into JS objects, and a JSON.stringify() method for turning a JS object back into JSON.
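As an example of the kind of option handling readJSONFile() makes possible, a filter could overlay settings loaded from a file on top of built-in defaults. This is a sketch under stated assumptions: `mergeOptions()` and the option names are hypothetical, the merge is shallow, and an object literal stands in for the value readJSONFile() would return so that the code runs outside the replicator.

```javascript
// Hypothetical helper: overlay options loaded from a JSON file on top of
// defaults, so a filter still works when the file omits some settings.
// The merge is shallow: nested objects are replaced, not merged.
function mergeOptions(defaults, loaded) {
  var result = {};
  var key;
  for (key in defaults) {
    if (defaults.hasOwnProperty(key)) result[key] = defaults[key];
  }
  for (key in loaded) {
    if (loaded.hasOwnProperty(key)) result[key] = loaded[key];
  }
  return result;
}

// In a filter this object would come from readJSONFile(path); a literal
// stands in for it here so the sketch runs outside the replicator.
var loadedOptions = { batchSize: 500, skipTables: ["audit_log"] };

var options = mergeOptions(
  { batchSize: 100, skipTables: [], logChanges: false },
  loadedOptions
);
```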

Putting all of this together gives you a replicator with some useful functions that make writing JavaScript filters easier. I’ve pushed all of this up into my fork of the Tungsten Replicator code here: https://github.com/mcmcslp/tungsten-replicator/tree/jsfilter-enhance

Now, one final note. Because load() works by running eval() on the code it imports, there is one final step needed to make the loaded functions usable. To explain what I mean, let’s say you’ve written a new JS filter using the above version of the replicator.

In your filter you include the line:

load("/opt/continuent/share/myreallyusefulfunctions.js");

Within that file, you define a function called runme():

function runme()
{
     logger.info("I'm a bit of text");
}

Within myreallyusefulfunctions.js itself, I can call that function without any problem:

runme();

But from within the JS filter, calling runme() will raise an unknown function error. The reason is that load() runs eval() on the source file inside the load() function, so the functions it defines exist only within that function’s scope.

We can fix that within myreallyusefulfunctions.js by exporting the name explicitly:

if (runme.name) this[runme.name] = runme;

This assigns runme() as a property of the top-level scope (which is what this refers to inside load()), so the filter can see it. We put that line at the end of the myreallyusefulfunctions.js script and everything is fine.

I’m lazy, and I haven’t written a convenience function for this yet, but I will in a future blog.
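In the meantime, one possible shape for such a helper is sketched below. `exportFunctions()` and `loadSource()` are hypothetical names, not part of coreutils.js: `loadSource()` mimics the way load() runs eval() inside a function, and the helper does the same job as the `this[runme.name] = runme` line for any number of functions at once. It assumes `globalThis` is available, falling back to the top-level `this` otherwise.

```javascript
// Resolve the global object (globalThis where available, else top-level this).
var globalScope = (typeof globalThis !== "undefined") ? globalThis : this;

// Hypothetical helper: publish each named function on the global object so
// it survives being eval()'d inside a function such as load().
function exportFunctions() {
  for (var i = 0; i < arguments.length; i++) {
    var fn = arguments[i];
    if (typeof fn === "function" && fn.name) {
      globalScope[fn.name] = fn;
    }
  }
}

// Stand-in for load(): a direct eval inside a function keeps declarations
// local to that function, just like the load() in coreutils.js.
function loadSource(src) {
  eval(src);
}

// Only runme() is exported; helperOnly() stays local to loadSource().
loadSource(
  "function runme() { return 'ran'; }" +
  "function helperOnly() { return 'hidden'; }" +
  "exportFunctions(runme);"
);
```

The loaded file would call exportFunctions() once at its end, listing whichever functions it wants the filter to see.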

Now that we’ve got this far, let’s start building some useful JS functions and functionality to make it all work nicely…


Tungsten Replicator 3.0 is Cloudera Enterprise 5 Certified

One of the key platforms I’ve been testing on for the MySQL to Hadoop replication has been Cloudera, largely driven by customer requirements, but it’s also one of the easiest ways to get started with Hadoop.


I’m even more pleased to announce that Tungsten Replicator 3.0 is certified for use on the new Cloudera Enterprise 5 platform. That means you can replicate your data from MySQL to Cloudera 5 and be sure that it will work without causing problems or difficulties in the Hadoop loading and materialisation.

Cloudera is a great product, and we’re very happy to be working so effectively with the new Cloudera Enterprise 5. Cloudera certainly makes the core operation of managing and monitoring your Hadoop cluster so much easier, while still providing core functionality from the Hadoop family like Hive, HBase and Impala.

What I’m really interested in is the support for Spark, which will allow much easier live-querying and access to data.  That should make some data processing and live data views much easier to build and query further down the line.

