{"id":6890,"date":"2014-02-27T19:09:34","date_gmt":"2014-02-27T19:09:34","guid":{"rendered":"http:\/\/mcslp.wordpress.com\/?p=10002"},"modified":"2014-02-27T19:09:34","modified_gmt":"2014-02-27T19:09:34","slug":"real-time-replication-from-mysql-to-cassandra","status":"publish","type":"post","link":"https:\/\/planet.mcb.guru\/?p=6890","title":{"rendered":"Real-Time Replication from MySQL to Cassandra"},"content":{"rendered":"<p>Earlier this month I blogged about our new Hadoop applier, I published the docs for that this week (<a href=\"http:\/\/docs.continuent.com\/tungsten-replicator-3.0\/deployment-hadoop.html\">http:\/\/docs.continuent.com\/tungsten-replicator-3.0\/deployment-hadoop.html<\/a>) as part of the Tungsten Replicator 3.0 documentation (<a href=\"http:\/\/docs.continuent.com\/tungsten-replicator-3.0\/index.html\">http:\/\/docs.continuent.com\/tungsten-replicator-3.0\/index.html<\/a>). It contains some additional interesting nuggets that will appear in future blog posts.<\/p>\n<p>The main part of that functionality that performs the actual applier for Hadoop is based around a JavaScript applier engine &#8211; there will\u00a0eventually be docs for that as part of the Batch Applier content (<a href=\"http:\/\/docs.continuent.com\/tungsten-replicator-3.0\/deployment-batchloading.html\">http:\/\/docs.continuent.com\/tungsten-replicator-3.0\/deployment-batchloading.html<\/a>). 
The core of this system is that it takes the information from the data stream of the THL and the CSV file that was written by the batch applier system, and runs the commands necessary to load it into Hadoop and perform any necessary merges.<\/p>\n<p>I wanted to see how easy it would be to take that same flexible system and bend it to another database; in my case, I chose Cassandra.<\/p>\n<p>For the record, it took me a couple of hours to have this working, and I&#8217;m guessing another hour will file down some of the rough edges.<\/p>\n<p>Cassandra is interesting as a database because it mixes a big distributed key\/value store with a close-enough-to-SQL interface in the form of CQL. And that means we can make use of CQL to help us perform the merging into the final tables in a manner not dissimilar to the method we use for loading into Vertica.<\/p>\n<p>Back to the JavaScript batch loader: the applier provides five different implementable functions (all are technically optional) that you can use at different stages of the applier process. These are:<\/p>\n<ul>\n<li>prepare() &#8211; called once when the applier goes online and can be used to create temporary directories or spaces<\/li>\n<li>begin() &#8211; called at the start of each transaction<\/li>\n<li>apply() &#8211; called at the end of the transaction once the data file has been written, but before the commit<\/li>\n<li>commit() &#8211; called after each transaction commit has taken place; this is where we can consolidate info.<\/li>\n<li>release() &#8211; called when the applier goes offline<\/li>\n<\/ul>\n<p>We can actually align these functions with a typical transaction &#8211; prepare() happens before the statements even start, begin() is the same as BEGIN, apply() happens immediately before COMMIT, and commit() happens just after. 
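<\/p>\n<p>To make that lifecycle concrete, here\u2019s a minimal sketch (in Ruby, purely illustrative; the real hooks are JavaScript functions inside the applier) of the order in which the five functions fire across a couple of transactions:<\/p>\n

```ruby
# Illustrative sketch only: simulates the order in which the batch applier
# invokes its five optional hooks for two transactions. Only the calling
# order is taken from the docs; the hook bodies here just record their names.
calls = []

hooks = {
  prepare: -> { calls << "prepare" },  # once, when the applier goes online
  begin:   -> { calls << "begin" },    # start of each transaction
  apply:   -> { calls << "apply" },    # CSV written, just before COMMIT
  commit:  -> { calls << "commit" },   # just after the commit
  release: -> { calls << "release" },  # once, when the applier goes offline
}

hooks[:prepare].call
2.times do
  hooks[:begin].call
  hooks[:apply].call
  hooks[:commit].call
end
hooks[:release].call

puts calls.join(",")
# prepare,begin,apply,commit,begin,apply,commit,release
```

\n<p>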
release() can be used to do any cleanup afterwards.<\/p>\n<p>So let\u2019s put this into practice and use it for Cassandra.<\/p>\n<p>The basic process for loading is as follows:<\/p>\n<ol>\n<li>Write a CSV file to load into Cassandra.<\/li>\n<li>Load the CSV file into a staging table within Cassandra; this is easy using the &#8216;COPY tablename FROM filename\u2019 CQL statement.<\/li>\n<li>Merge the staging table data with a live table to create a carbon copy of our MySQL table content.<\/li>\n<\/ol>\n<p>For the loading portion, what we\u2019ll do is load the CSV into a staging table, and then we\u2019ll merge the staging table and live table data together during the commit stage of our batch applier. We\u2019ll return to this in more detail.<\/p>\n<p>For the merging, we\u2019ll take the information from the staging table, which includes the sequence number and operation type, and then write the \u2018latest\u2019 version of each row into the live table. That gives us a structure like this:<\/p>\n<p><a href=\"http:\/\/mcslp.files.wordpress.com\/2014\/02\/cassandra-loader.png\"><img class=\"alignnone size-medium wp-image-10003\" alt=\"Cassandra Loader\" src=\"http:\/\/mcslp.files.wordpress.com\/2014\/02\/cassandra-loader.png?w=300&#038;h=222\" width=\"300\" height=\"222\" \/><\/a><\/p>\n<p>Tungsten Replicator is going to manage this entire process for us &#8211; all we need to do is install the replicators, plug in these custom bits, and let it run.<\/p>\n<p>As with the Hadoop applier, what we\u2019re going to do is use the batch applier to generate only insert and delete rows; UPDATE statements will be converted into a delete of the original version and an insert of the new version. 
So:<\/p>\n<pre><code>INSERT INTO sample VALUES (1,'Message')<\/code><\/pre>\n<p>is an insert\u2026<\/p>\n<pre><code>DELETE FROM sample WHERE id = 1<\/code><\/pre>\n<p>is a delete, and:<\/p>\n<pre><code>UPDATE sample SET message = 'Now you see me' WHERE id = 1<\/code><\/pre>\n<p>is actually:<\/p>\n<pre><code>DELETE FROM sample WHERE id = 1\r\nINSERT INTO sample VALUES (1,'Now you see me')<\/code><\/pre>\n<p>This gets round the problem of updates (which in big data stores are expensive; Hadoop in particular doesn\u2019t support updating existing data) by turning them into a more efficient delete and insert.<\/p>\n<p>In the CSV data itself, this is represented by prefixing every row with three fields:<\/p>\n<pre><code>optype, sequence number, unique id<\/code><\/pre>\n<p>Optype is \u2018D\u2019 for a delete and \u2018I\u2019 for an insert, and is used to identify what needs to be done. The sequence number is the unique transaction ID from the replicator THL. This number increases by one for every transaction, and this means we can always identify the \u2018latest\u2019 version of a row, which is important to us when processing the transaction into Cassandra. The unique ID is the primary key (or compound key) from the source data. We need this to ensure we update the right row. To replicate data in this way, we must have a primary key on the data. If you don\u2019t have primary keys, you are probably in a world of hurt anyway, so it shouldn\u2019t be a stretch.<\/p>\n<p>One difficulty here is that we need to cope with an idiosyncrasy of Cassandra, which is that by default, Cassandra orders fields in the \u2018tables\u2019 (really collections of key\/values) so that integers and numbers appear first in the table, and text appears last. This is an optimisation that Cassandra makes that complicates things for us, but only in a very small way. 
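<\/p>\n<p>As a quick illustration of that row format (a sketch only; the replicator generates this CSV itself), here\u2019s how a single UPDATE comes out as a delete\/insert pair, each row carrying the three prefix fields:<\/p>\n

```ruby
# Sketch only, not the replicator's code: shows how one UPDATE is rendered in
# the staging CSV as a delete of the old row plus an insert of the new one,
# each prefixed with optype, THL sequence number, and the source primary key.
# The seqno value 42 here is an arbitrary example.
require "csv"

def update_as_csv_rows(seqno, id, new_values)
  [
    ["D", seqno, id],               # drop the original version of the row
    ["I", seqno, id, *new_values],  # then insert the replacement version
  ]
end

rows = update_as_csv_rows(42, 1, ["Now you see me"])
csv = rows.map(&:to_csv).join
print csv
# D,42,1
# I,42,1,Now you see me
```

\n<p>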
For the moment, we\u2019ll handle it by assuming that we are loading only one table with a known format into Cassandra. We could handle multiple tables by using a simple IF statement in the JS and using different formats for that, or we could actually extract the info from the incoming data; I\u2019m going to skip that because it keeps us away from the cool element of actually getting the data in.<\/p>\n<p>Within Cassandra, then, we have two tables: the table we are loading data into, and the staging table that we load the CSV data into. For our sample, the live schema is \u2018sample\u2019, the live table is \u2018sample\u2019 and the staging table is \u2018staging_sample\u2019.<\/p>\n<p>The definitions for these in Cassandra are, for the sample live table:<\/p>\n<pre><code>CREATE TABLE sample (\r\n id int,\r\n message text,\r\n PRIMARY KEY (id)\r\n ) WITH\r\n bloom_filter_fp_chance=0.010000 AND\r\n caching='KEYS_ONLY' AND\r\n comment='' AND\r\n dclocal_read_repair_chance=0.000000 AND\r\n gc_grace_seconds=864000 AND\r\n index_interval=128 AND\r\n read_repair_chance=0.100000 AND\r\n replicate_on_write='true' AND\r\n populate_io_cache_on_flush='false' AND\r\n default_time_to_live=0 AND\r\n speculative_retry='99.0PERCENTILE' AND\r\n memtable_flush_period_in_ms=0 AND\r\n compaction={'class': 'SizeTieredCompactionStrategy'} AND\r\n compression={'sstable_compression': 'LZ4Compressor'};<\/code><\/pre>\n<p>And for the staging_sample table:<\/p>\n<pre><code>CREATE TABLE staging_sample (\r\n optype text,\r\n seqno int,\r\n fragno int,\r\n id int,\r\n message text,\r\n PRIMARY KEY (optype, seqno, fragno, id)\r\n ) WITH\r\n bloom_filter_fp_chance=0.010000 AND\r\n caching='KEYS_ONLY' AND\r\n comment='' AND\r\n dclocal_read_repair_chance=0.000000 AND\r\n gc_grace_seconds=864000 AND\r\n index_interval=128 AND\r\n read_repair_chance=0.100000 AND\r\n replicate_on_write='true' AND\r\n populate_io_cache_on_flush='false' AND\r\n default_time_to_live=0 AND\r\n 
speculative_retry='99.0PERCENTILE' AND\r\n memtable_flush_period_in_ms=0 AND\r\n compaction={'class': 'SizeTieredCompactionStrategy'} AND\r\n compression={'sstable_compression': 'LZ4Compressor'};<\/code><\/pre>\n<p>I\u2019ve put both tables into a \u2018sample\u2019 collection.<\/p>\n<p>Remember that idiosyncrasy I mentioned? Here it is: a bare table loading from CSV will actually order the data as:<\/p>\n<pre><code>seqno,uniqno,id,optype,message<\/code><\/pre>\n<p>This is Cassandra\u2019s way of optimising integers over text to speed up lookups, but for us it\u2019s a minor niggle. Right now, I\u2019m going to handle it by assuming we are replicating only one schema\/table and that we know what the structure of that looks like. Longer term, I want to pull it out of the metadata, but that\u2019s a refinement.<\/p>\n<p>So let\u2019s start by having a look at the basic JS loader script; it\u2019s really the component that is going to handle the core element of the work, managing the CSV files that come in from the batch engine and applying them into Cassandra. Remember, there are five functions that we can define, but for the purposes of this demonstration we\u2019re going to use only two of them: apply(), which will load the CSV file into Cassandra, and the commit() function, which will perform the steps to merge the stage data.<\/p>\n<p>The apply() function does two things: it identifies the table and schema, and then runs the command to load this data into Cassandra through the cqlsh command-line tool. 
We actually can\u2019t run CQL directly from this command line, but I wrote a quick shell script that pipes CQL from the command-line into a running cqlsh.<\/p>\n<p>The commit() function, on the other hand, is simpler, although it does a much more complicated job using another external script, this time written in Ruby.<\/p>\n<p>So this gives us a cassandra.js script for the batch applier that looks like this:<\/p>\n<pre>function apply(csvinfo)\r\n{\r\n  sqlParams = csvinfo.getSqlParameters();\r\n  csv_file = sqlParams.get(\"%%CSV_FILE%%\");\r\n  schema = csvinfo.schema;\r\n  table = csvinfo.table;\r\n  runtime.exec(\"\/opt\/continuent\/share\/applycqlsh.sh \" + schema + ' \"copy staging_' + table + \" (optype,seqno,uniqno,id,message) from '\" + csv_file + \"';\\\"\");\r\n}\r\n\r\nfunction commit()\r\n{\r\n  runtime.exec(\"\/opt\/continuent\/share\/merge.rb \" + schema);\r\n}<\/pre>\n<p>So, the apply() function is called for each event as written into the THL from the MySQL binary log, and the CSV file generated at that point contains the contents of the THL event; if it\u2019s one row, it\u2019s a one-row CSV file; if it\u2019s a statement or transaction that created 2000 rows, it\u2019s a 2000-row CSV file.<\/p>\n<p>The csvinfo object that is provided contains information about the batch file that is written, including, as you can see here, the schema and table names, and the sequence number. Note that we could, at this point, pull out table info, but we\u2019re going to concentrate on pulling a single table here just for demo purposes.<\/p>\n<p>The CQL for loading the CSV data is:<\/p>\n<pre><code>COPY staging_tablename (optype,seqno,uniqno,id,message) FROM 'FILENAME';<\/code><\/pre>\n<p>This says: copy the specific columns in this order from the file into the specified table. 
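<\/p>\n<p>Before we get to the real scripts, here\u2019s a minimal Ruby sketch (illustrative only; in-memory hashes stand in for the Cassandra tables) of the merge rule the commit stage applies: deletes first, then for each unique ID only the insert with the highest sequence number makes it into the live table:<\/p>\n

```ruby
# Sketch only (not merge.rb itself): hashes stand in for the live and staging
# tables. Deletes are applied first, then for each id the insert with the
# highest THL sequence number becomes the live row. The sample values reuse
# the id/message pair from the demo later in the post.
def merge_staging(live, staging)
  merged = live.dup
  # Phase 1: apply every delete from the staging data
  staging.each { |r| merged.delete(r[:id]) if r[:optype] == "D" }
  # Phase 2: latest insert (highest seqno) wins per id
  staging
    .select { |r| r[:optype] == "I" }
    .group_by { |r| r[:id] }
    .each do |id, rows|
      merged[id] = rows.max_by { |r| r[:seqno] }[:message]
    end
  merged
end

live = { 489 => "First Message" }
staging = [
  { optype: "D", seqno: 2, id: 489 },
  { optype: "I", seqno: 2, id: 489, message: "Updated Message" },
  { optype: "I", seqno: 3, id: 490, message: "Another row" },
]
result = merge_staging(live, staging)
# 489 now maps to "Updated Message"; 490 to "Another row"
```

\n<p>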
 As I mentioned, currently this is hard-coded into the applier JS, but it would be easy to handle more complex schemas and structures.<\/p>\n<p>The commit() function is even simpler, because it just calls a script that will do the merging for us - we\u2019ll get to that in a minute.<\/p>\n<p>So here\u2019s the script that applies an arbitrary CQL statement into Cassandra:<\/p>\n<pre>#!\/bin\/bash\r\nSCHEMA=$1;shift\r\necho \"$*\" | cqlsh -k $SCHEMA tr-cassandra2<\/pre>\n<p>Really simple, but it gets round a simple issue.<\/p>\n<p>The script that does the merge work is more complex; in other environments we might be able to do this all within SQL, but CQL is fairly limited, with no sub-queries. So we do it long-hand using Ruby. The basic sequence is quite simple, and is in three phases:<\/p>\n<ol>\n<li>Delete every row mentioned in the staging table with an optype of D, using the matching unique key.<\/li>\n<li>Insert the *last* version of an insert for each unique ID - the last version will be the latest one in the output. We can pick this out by iterating over every insert and picking the one with the highest sequence number, as generated by the THL transaction ID.<\/li>\n<li>Delete the content from the staging table because we\u2019ve finished with it. 
That empties the staging table, ready for the next set of transactions.<\/li>\n<\/ol>\n<p>That file looks like this:<\/p>\n<pre>#!\/usr\/bin\/ruby\r\n\r\nrequire 'cql'\r\n\r\nclient = Cql::Client.connect(hosts: ['192.168.1.51'])\r\nclient.use('sample')\r\n\r\nrows = client.execute(\"SELECT id FROM staging_sample where optype = 'D'\")\r\n\r\ndeleteids = Array.new()\r\n\r\nrows.each do |row|\r\n  puts \"Found ID #{row['id']} has to be deleted\"\r\n  deleteids.push(row['id'])\r\nend\r\n\r\ndeleteidlist = deleteids.join(\",\")\r\n\r\nclient.execute(\"delete from sample where id in (#{deleteidlist})\")\r\nputs(\"delete from sample where id in (#{deleteidlist})\")\r\n\r\nrows = client.execute(\"SELECT * FROM staging_sample where optype = 'I'\")\r\n\r\nupdateids = Hash.new()\r\nupdatedata = Hash.new()\r\n\r\nrows.each do |row|\r\n  id = row['id']\r\n  puts \"Found ID #{id} seq #{row['seqno']} has to be inserted\"\r\n  if updateids[id]\r\n    if updateids[id] &lt; row['seqno']\r\n      updateids[id] = row['seqno']\r\n      row.delete('seqno')\r\n      row.delete('fragno')\r\n      row.delete('optype')\r\n      updatedata[id] = row\r\n    end\r\n  else\r\n    updateids[id] = row['seqno']\r\n    row.delete('seqno')\r\n    row.delete('fragno')\r\n    row.delete('optype')\r\n    updatedata[id] = row\r\n  end\r\nend\r\n\r\nupdatedata.each do |rowid, rowdata|\r\n  puts \"Should update #{rowdata['id']} with #{rowdata['message']}\"\r\n  collist = rowdata.keys.join(',')\r\n  colcount = rowdata.keys.length\r\n  substbase = Array.new()\r\n  # (1..colcount).each {substbase.push('?')}\r\n  rowdata.values.each do |value|\r\n    if value.is_a?(String)\r\n      substbase.push(\"'\" + value.to_s + \"'\")\r\n    else\r\n      substbase.push(value)\r\n    end\r\n  end\r\n\r\n  substlist = substbase.join(',')\r\n\r\n  puts('Column list: ', collist)\r\n  puts('Subst list: ', substlist)\r\n  cqlinsert = \"insert into sample (\" + collist + \") values (\" + substlist + \")\"\r\n  puts(\"Statement: \" + cqlinsert)\r\n  client.execute(cqlinsert)\r\nend\r\n\r\nclient.execute(\"delete from staging_sample where optype in 
('D','I')\")<\/pre>\n<p>Again, currently this is hard-coded, but I could easily have got the schema\/table name from the JS batch applier - the actual code is table-agnostic and will work with any table.<\/p>\n<p>So, I\u2019ve set up two replicators (one uses cassandra.js rather than hadoop.js but works the same way) and copied applycqlsh.sh and merge.rb into \/opt\/continuent\/share.<\/p>\n<p>And we\u2019re ready to run. Let\u2019s try it:<\/p>\n<pre>mysql&gt; insert into sample values (0,'First Message');\r\nQuery OK, 1 row affected (0.01 sec)<\/pre>\n<p>We\u2019ve inserted one row. Let\u2019s go check Cassandra:<\/p>\n<pre>cqlsh:sample&gt; select * from sample;\r\n\r\n id  | message\r\n-----+---------------\r\n 489 | First Message<\/pre>\n<p>Woohoo - data from MySQL straight into Cassandra.<\/p>\n<p>Now let\u2019s try updating it:<\/p>\n<pre>mysql&gt; update sample set message = 'Updated Message' where id = 489;\r\nQuery OK, 1 row affected (0.01 sec)\r\nRows matched: 1  Changed: 1  Warnings: 0<\/pre>\n<p>And in Cassandra:<\/p>\n<pre>cqlsh:sample&gt; select * from sample;\r\n\r\n id  | message\r\n-----+-----------------\r\n 489 | Updated Message<\/pre>\n<p>Bigger woohoo. Not only am I loading data directly into Cassandra, but I can update it as well. Now I can have a stream of updates and information within MySQL replicated over to Cassandra for whatever analysis I need, without any issues.<\/p>\n<p>Cool, huh? I certainly think so (OK, but I\u2019m biased).<\/p>\n<p>Now, I haven\u2019t tested it, but this should work just as easily from Oracle; I\u2019ll be testing that and will let you know.<\/p>\n<p>Any other database destinations people would like to see for replicating into? 
If so, let me know and I\u2019ll see what I can do.<\/p>","protected":false},"excerpt":{"rendered":"<p>Earlier this month I blogged about our new Hadoop applier, I published the docs for that this week (http:\/\/docs.continuent.com\/tungsten-replicator-3.0\/deployment-hadoop.html) as part of the Tungsten Replicator 3.0 documentation (http:\/\/docs.continuent.com\/tungsten-replicator-3.0\/index.html). It contains some additional interesting nuggets that will appear in future blog &hellip; <a href=\"http:\/\/mcslp.wordpress.com\/2014\/02\/27\/real-time-replication-from-mysql-to-cassandra\/\">Continue reading <span>&rarr;<\/span><\/a><img loading=\"lazy\" decoding=\"async\" alt=\"\" border=\"0\" src=\"http:\/\/pixel.wp.com\/b.gif?host=mcslp.wordpress.com&amp;blog=164882&amp;post=10002&amp;subd=mcslp&amp;ref=&amp;feed=1\" width=\"1\" 
height=\"1\"\/><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[86,90,70,80,89,91],"tags":[42,57],"_links":{"self":[{"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=\/wp\/v2\/posts\/6890"}],"collection":[{"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=6890"}],"version-history":[{"count":4,"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=\/wp\/v2\/posts\/6890\/revisions"}],"predecessor-version":[{"id":6968,"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=\/wp\/v2\/posts\/6890\/revisions\/6968"}],"wp:attachment":[{"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=6890"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=6890"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/planet.mcb.guru\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=6890"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}