Friday, March 27, 2015

Cassandra anti-patterns webinar: Video and Q&A

Last week I gave a webinar on avoiding anti-patterns in Cassandra. It was good fun to prepare and deliver, and if you look through my blog most of the sections have a dedicated post.

Here is the recording:



We got a lot of questions that we didn't get to in the recording, so I'm catching up on them now. If I have missed yours, or you think of more, then ping me on Twitter: @chbatey


Q: When is DSE going to support UDTs?

DSE 4.7 will include a certified version of Cassandra 2.1 and is due sometime in the next few months.

Q: Can you alter a UDT?

Yes, see here: http://www.datastax.com/dev/blog/cql-in-2-1

Q: With denormalized data, how do you handle a store name change or staff name change?

First, make sure you actually need the update; when modelling data immutably that is often not the case. If you need to change a small number of rows I'd do it with a small script/program; for a large number of rows, use Apache Spark.

Q: I had the idea that C* 2.x has vector clocks, am I wrong?

No vector clocks in Cassandra; see http://www.datastax.com/dev/blog/why-cassandra-doesnt-need-vector-clocks

Q: Using the event source model with frequent rollups, would that not generate a 'queueing' style anti-pattern if data from the previous rollup period then gets deleted?

If you used the same partition and did range queries, yes. But I would use a partition per day (or whatever period you haven't rolled up yet), thus avoiding ever reading over the deleted data.
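For illustration, a rough sketch of a day-bucketed table using the Java driver (the keyspace, table and column names are made up, not from the webinar):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class DayBucketedEvents {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("events_keyspace")) {
            // One partition per account per day: once an old day has been rolled up
            // and deleted, reads against the current day's partition never touch it.
            session.execute("CREATE TABLE IF NOT EXISTS events_by_day ("
                    + "account_id text, day text, event_time timeuuid, payload blob, "
                    + "PRIMARY KEY ((account_id, day), event_time))");

            // A read for the current period only hits today's partition, so it never
            // scans over the tombstones left in already rolled-up partitions.
            session.execute("SELECT * FROM events_by_day WHERE account_id = ? AND day = ?",
                    "account-1", "2015-03-27");
        }
    }
}
```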

Q: How would you do the "roll ups" in the account balance calculation example?

In most cases I'd do it in the application for the first query that requires it. It doesn't matter if two threads get to it at the same time, as they can both calculate it and the write to the roll-up table is idempotent. If the roll-up calculation takes too long and you don't want to slow down a user request with it, then you can schedule it in your app or in a different process.

Q: Why would you not use counters for balance?

Cassandra counters are more for things like statistics, page views etc. You can't update them atomically and they are slower to update than a pure write.

Q: C = Quorum?

Coordinator

Q: How might you go about modeling the "versioning" of time series data so as to avoid updates? I mean where you write a measurement for a particular timestamp and then later on you need to write a new measurement for the same timestamp.

Use a TimeUUID rather than a timestamp as the clustering column. Then you can store many measurements within the same millisecond, because each TimeUUID is unique.
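As a rough sketch with the Java driver (table and column names are made up), every write gets its own TimeUUID, so a later measurement for the same logical timestamp becomes a new row rather than an overwrite:

```java
import java.util.Date;

import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;

public class MeasurementWriter {

    // Assumes a table like:
    //   CREATE TABLE measurements (sensor_id text, version timeuuid,
    //       measured_at timestamp, value double,
    //       PRIMARY KEY (sensor_id, version));
    // Each write gets a fresh TimeUUID, so a corrected reading for the same
    // measured_at becomes a new row instead of overwriting the old one.
    public void write(Session session, String sensorId, Date measuredAt, double value) {
        session.execute(
                "INSERT INTO measurements (sensor_id, version, measured_at, value) VALUES (?, ?, ?, ?)",
                sensorId, UUIDs.timeBased(), measuredAt, value);
    }
}
```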

Q: If I perform an "if not exist" write and it fails to reach enough replicas, what state can I expect the data to be in? In other words, can I expect the data to not be written to the cluster?

Assuming it got past the "if not exists" part (if it didn't, you'll get applied = false in the response), it is then like any other write: Cassandra returns how many replicas acked the write. You can't be sure the rest didn't get it, as they may just not have responded.
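For illustration, here is roughly how that surfaces in the Java driver (a sketch; the users table and its columns are made up):

```java
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.WriteType;
import com.datastax.driver.core.exceptions.WriteTimeoutException;

public class UserRegistration {

    public boolean register(Session session, String username, String email) {
        try {
            // LWT insert: the [applied] column tells us whether the row was created.
            ResultSet rs = session.execute(
                    "INSERT INTO users (username, email) VALUES (?, ?) IF NOT EXISTS",
                    username, email);
            return rs.wasApplied();
        } catch (WriteTimeoutException e) {
            if (e.getWriteType() == WriteType.CAS) {
                // Timed out in the compare ("if not exists") phase: outcome unknown,
                // but the whole LWT is safe to retry.
            } else {
                // Got past "if not exists" but not enough replicas acked the write;
                // e.getReceivedAcknowledgements() tells you how many did.
            }
            throw e;
        }
    }
}
```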

Q: I'm wondering if Cassandra could be used to implement distributed locks (Like Redis, Zookeeper)?

You can with LWTs, here are the details: http://www.datastax.com/dev/blog/consensus-on-cassandra

Q: In order to emulate a queue without falling into this anti-pattern, can I use the new DateTieredCompactionStrategy and TTLs?

Answered at the end of the recording.

Q: We have 24 tables per day, one per hour. At the end of each day we create one table for that date and drop the hourly tables. Is that an anti-pattern?

Moving tables is like moving partitions: it does avoid the anti-pattern, but it is a lot of work.

Q: Why not change the tombstone grace period to delete quickly?

You can, but then you need to keep up with repairs which may not be possible.

Q: What would the use case for using Cassandra in a queueing pattern vs. a traditional message oriented middleware?

People typically try to use Cassandra as a queue when they already have it in their infrastructure and they need to get messages from one DC to another. This is when they fall into the anti-pattern.

Q: For the queue anti-pattern, the > timeuuid clause will help on fetch; what about compaction/JVM issues, any recommendations or comments?

Nothing specifically; the best discussion of Cassandra JVM tuning for GC that I have read is here: https://issues.apache.org/jira/browse/CASSANDRA-8150

Q: There are times where data simply cannot be written simultaneously and therefore must be joined at a later time. What do you recommend for joining needs? An external tool such as Spark SQL or ?

Answered at the end of the recording.

Q: Probably one of the best Webinars. Example, were really great. Appreciate DataStax arranging for this. Thanks.

Okay okay this wasn't a question :)

Q: Will quorum reads of a partially-successful counter update get the latest info?

It depends on how many replicas the write got to and at what consistency you read. The WriteTimeoutException tells you how many replicas acked the write. If that is a QUORUM (e.g. 2 if RF = 3) then a QUORUM read will see it; otherwise you don't know.

Q: Can you point to a good read for retry, no rollback?

On failure modes: http://www.datastax.com/dev/blog/cassandra-error-handling-done-right

Q: How would I go about solving limit offset queries, without having to skip rows programmatically, for example taking a simple page 2 customer table?

Just make sure you have a clustering column and start the next LIMIT query from the last result of the previous query.
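A minimal sketch of that approach with the Java driver (the customer table and its columns are made up):

```java
import java.util.UUID;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

public class CustomerPager {

    // Assumes: CREATE TABLE customers_by_region (region text, customer_id timeuuid,
    //          name text, PRIMARY KEY (region, customer_id));
    // Rather than "page 2 = OFFSET 20", remember the last clustering value you saw
    // and start the next LIMIT query just after it.
    public ResultSet nextPage(Session session, String region, UUID lastSeen, int pageSize) {
        if (lastSeen == null) {
            return session.execute(
                    "SELECT * FROM customers_by_region WHERE region = ? LIMIT ?",
                    region, pageSize);
        }
        return session.execute(
                "SELECT * FROM customers_by_region WHERE region = ? AND customer_id > ? LIMIT ?",
                region, lastSeen, pageSize);
    }
}
```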

Q: You said Cassandra does not do a rollback. Is that true for all cases -- are there any instance where Cassandra would do a rollback?

Not as far as I know.

Q: I missed the beginning. Are UNLOGGED batches OK to use to speed up writes?

See my post on the misuse of unlogged batches: http://christopher-batey.blogspot.com/2015/02/cassandra-anti-pattern-misuse-of.html

Q: Great presentation. Regarding the secondary index question, the second one should be much faster, as it hits the primary key, yes?

Yes, so it only needs to go to a small section of the secondary index table as it knows which node the partition is on.

Q: Which is the best pattern for time series?

This depends on the type of time series and its quantity/frequency. What you basically want is partitions that don't grow too large, so in the millions of cells rather than hundreds of millions, and a TimeUUID as the clustering column.

Q: Are the batch executions started in separate threads when using the batch optimization?

They will be sent off in parallel. I don't know the threading model here, but I imagine they are split on one thread and sent async. A good question for the Cassandra devs who hang out in #cassandra on freenode.

Q: What approach can be taken with DSE, which is C* 2.0 and doesn't have UDTs?

You can just have a lot of columns! The next DSE version will include Cassandra 2.1.

Q: Using a time bucket is a way to also prevent the rows from growing too wide (I.e. many millions of columns). Any guidance for the recommended tradeoffs between wide rows with slice queries and more narrow rows and some multi-partition queries?

There is rarely a general rule for Cassandra; it is all about your data set and read/write frequency. However, in general I do my best to satisfy each read from a single partition and go out of my way to keep it to at most two. If you have a very high ingest rate and you read over long periods this can get hard, and you may need to go to more partitions.

Q: Do the same rules apply to batch loading when using SSTableLoader and/or the BulkOutputFormat with Hadoop?

I've never used the BulkOutputFormat with Hadoop. For the sstableloader command, once you have generated the SSTables it handles the importing.

Q: is BatchType.LOGGED the default for a BatchStatement?

Yes

Q: Do we have any ORM frameworks for DataStax Cassandra?

The DataStax Java and C# drivers now have an object mapper built in; there is also the less popular Spring Data.

Q: What if you have a constraint to write data to a table only if it is different (by different I mean different in all properties, which can be 5-10)?

If you want to write this at high throughput then I would resolve it at read time, as otherwise you'll be doing a read-then-write, which has a lot of race conditions and is a lot slower. If you include a TimeUUID and write all updates, you can then work it out at read time.


Q: Do tombstones get created with data inserted with a TTL and automatically deleted when expired?

Yes, it generates a tombstone once the TTL expires. For immutable time series data the new DateTieredCompactionStrategy makes deleting this data a lot more efficient.


Q: Can you explain a bit more about the denormalisation alternative to secondary indexes?

Write the same data again but with the staff ID as the partition key and the time as the clustering column. This means you can go to a single partition and do a range query. Even a secondary index with the partition key in the query is worse than this, as it has to go to the secondary index table and then do a multi-partition query on the original table keyed by customer ID.

Q: Does the removal of a secondary index cause a performance hit during the delete? Assuming you aren't using the index for any queries

I don't know about this one; I've asked around and will update once I get an answer.

Q: Question about secondary indexes vs inverted indexes...is inverted superior to secondary? Will global indexes replace inverted indexes?

By inverted I am assuming you mean manually inserting the data twice with a different primary key. This will always outperform a secondary index, as you're storing all the customer events for a staff member on one node and sequentially on disk. For global indexes we'll have to wait and see, but that is the idea. The only concern I have is that you can specialise the double write to exactly what you want (e.g. bucket up staff members or not), whereas global indexes will have to be a more general solution.

Q: Using the default token split on adding a node in 1.2.x, what issues/symptoms will I experience if I continue to use this method with low numbers of nodes?

I assume you're talking about vnodes, as without them you pick the token split yourself. The allocation of tokens with vnodes is well discussed here: https://issues.apache.org/jira/browse/CASSANDRA-7032

Tuesday, March 17, 2015

Using Gradle as a poor man's Cassandra schema management tool

I work across a desktop and two laptops so reproducible builds mean a lot to me! I often slate Gradle for being buggy and not doing the simple things well (e.g. dependency management for local development).

However it is awesome when you want a quick bit of build logic. I wanted to build the schema for a Cassandra application I am working on, to keep my various machines up to date.

So easy in an extensible system like Gradle. I already had my schema creation commands in src/main/resources/schema/tables.cql

I then added a buildscript dependency to my build.gradle:


Then I added a few imports and a couple of nifty tasks:
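The Gradle snippets themselves aren't reproduced here, but the logic of the tasks boils down to something like this plain Java sketch (contact point and keyspace handling are assumed), using the same one-command-per-line file:

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class SchemaLoader {
    // Reads src/main/resources/schema/tables.cql and executes it one CQL
    // command per line, which is essentially what the Gradle task does.
    public static void main(String[] args) throws Exception {
        List<String> lines = Files.readAllLines(
                Paths.get("src/main/resources/schema/tables.cql"), StandardCharsets.UTF_8);
        try (Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
             Session session = cluster.connect()) {
            for (String line : lines) {
                if (!line.trim().isEmpty()) {
                    session.execute(line);
                }
            }
        }
    }
}
```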


Of course this relies on one CQL command per line and isn't exactly Liquibase, but it's not bad for 10 minutes of hacking.

Lots of these hacks can lead to very ugly build scripts so be careful :)

Monday, March 16, 2015

Pushing metrics to Graphite from a Spring Boot Cassandra application

If you're going down the microservice rabbit hole using frameworks like Spring Boot and Dropwizard, it is imperative that you can monitor what is going on; part of that is pushing metrics to some kind of metrics system.

The last set of applications I built used Graphite for this purpose, and fortunately the DataStax Java driver records lots of interesting metrics using the brilliant Dropwizard Metrics library.

Here's what it takes to get the Cassandra metrics, and your own custom metrics, from a Spring Boot application into Graphite.

This article assumes you know how to use the DataStax Cassandra driver and the Dropwizard Metrics library, and that you're familiar with tools like Maven and Gradle. If you don't, go read up on those first.

First let's get the Cassandra driver and metrics libraries on our classpath; here is my example using Gradle:


I've included the Actuator from Spring Boot as well.

Assuming you have a bean that is your Cassandra Session, add a bean to expose the MetricRegistry and one to create a GraphiteReporter:
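The bean definitions aren't shown above; roughly, they look like this (a sketch assuming Spring Java config, the Dropwizard metrics-graphite module and the driver's Cluster.getMetrics(); the prefix is an example):

```java
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import com.datastax.driver.core.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MetricsConfig {

    // Expose the registry the DataStax driver is already writing its metrics into.
    @Bean
    public MetricRegistry metricRegistry(Session session) {
        return session.getCluster().getMetrics().getRegistry();
    }

    // Push everything in that registry to Graphite every 30 seconds.
    @Bean
    public GraphiteReporter graphiteReporter(MetricRegistry metricRegistry) {
        Graphite graphite = new Graphite(new InetSocketAddress("192.168.10.120", 2003));
        GraphiteReporter reporter = GraphiteReporter.forRegistry(metricRegistry)
                .prefixedWith("my-application")
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build(graphite);
        reporter.start(30, TimeUnit.SECONDS);
        return reporter;
    }
}
```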

Here I have a Graphite server running on 192.168.10.120. If you don't want to install Graphite to try this out, I have a Vagrant VM on my GitHub which launches Graphite + Grafana.

If we had the Cluster as a bean rather than the Session we'd have injected that. We've now set it up so that all the metrics the DataStax Java driver records will be published to Graphite every 30 seconds.

Now we can plot all kinds of graphs:



For instance we can plot request times, number of errors, number of requests, etc. This becomes even more powerful when you are deploying multiple versions of your application and you prefix each instance with an identifier such as its IP.

Adding our own metrics with annotations


The next step is to add more metrics, as the ones in the DataStax library aren't very fine-grained; for example, we might want to time particular queries or look at our response times.

You can do this manually, but it is easier with annotations. We can do this with the Metrics-Spring project, which integrates Spring AOP with Dropwizard Metrics.

However it is quite fiddly to get working, as we now have three libraries that want to create a MetricRegistry: Spring Boot, the Cassandra driver and Metrics-Spring.

To get everyone to use the Cassandra driver's MetricRegistry we need to create a MetricsConfigurerAdapter:
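Something like this (a sketch, assuming Metrics-Spring's MetricsConfigurerAdapter lets you supply the registry by overriding getMetricRegistry()):

```java
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.Session;
import com.ryantenney.metrics.spring.config.annotation.MetricsConfigurerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CassandraMetricsConfig extends MetricsConfigurerAdapter {

    // Injecting the Session rather than a MetricRegistry bean: Metrics-Spring owns
    // the registry, so here we hand it the one the driver is already using.
    @Autowired
    private Session session;

    @Override
    public MetricRegistry getMetricRegistry() {
        return session.getCluster().getMetrics().getRegistry();
    }
}
```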

The reason we're injecting the Session is that we can no longer register a bean for the MetricRegistry, as Metrics-Spring does this and we don't want to end up with two. To get this to work we have to remove the metricRegistry bean from the code above. The other thing we do is add the @EnableMetrics annotation to our Application class:
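And the application class, roughly (a Spring Boot 1.x style sketch; @EnableMetrics comes from Metrics-Spring):

```java
import com.ryantenney.metrics.spring.config.annotation.EnableMetrics;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan
@EnableAutoConfiguration
@EnableMetrics // switches on Metrics-Spring's annotation processing (@Timed etc.)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```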


Once all this is done we can annotate our public methods with @Timed like this:
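For example (a sketch; the DAO class and method are made up):

```java
import com.codahale.metrics.annotation.Timed;
import org.springframework.stereotype.Component;

@Component
public class CustomerEventDao {

    // Metrics-Spring wraps this method in a Dropwizard Timer; the metric name
    // defaults to the fully qualified method name.
    @Timed
    public void storeEvent(String customerId, String event) {
        // ... execute the insert using the Cassandra Session ...
    }
}
```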


Then in Graphite we can see them; their names are derived from the fully qualified method name.


So now our Spring Boot application has Cassandra metrics and our own custom application metrics all pushing to Graphite!

The whole application is on GitHub if you want the full Spring config and dependencies.

Thursday, March 12, 2015

Cassandra schema migrations made easy with Apache Spark

By far the most common question I get asked when talking about Cassandra is: once you've denormalised based on your queries, what happens if you were wrong, or a new requirement comes in that requires a new type of query?

First I always check that it is a real requirement to be able to have this new functionality on old data. If that's not the case, and often it isn't, then you can just start double/triple writing into the new table.

However if you truly need to have the new functionality on old data then Spark can come to the rescue. The first step is to still double write. We can then backfill using Spark. The awesome thing is that nearly all writes in Cassandra are idempotent, so when we backfill we don't need to worry about inserting data that was already inserted via the new write process.

Let's see an example. Suppose you were storing customer events so you know what they are up to. At first you want to query by customer and time, so you end up with the following table:


Then the requirement comes in to be able to look for events by staff member. My reaction a couple of years ago would have been something like this:


However if you have Spark workers on each of your Cassandra nodes then this is not an issue.

Assuming you want a new table keyed by staff_id and have modified your application to double write, you do the backfill with Spark. Here's the new table:


Then open up a Spark-shell (or submit a job) with the Spark-Cassandra connector on the classpath and all you'll need is something like this:
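The original snippet was a few lines of Scala typed into the shell; as a rough Java equivalent for a submitted job (assuming the connector's Java API in com.datastax.spark.connector.japi, and made-up keyspace, table and column names), the whole backfill looks something like this:

```java
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class StaffEventsBackfill {

    // Simple bean matching the columns shared by both tables (assumed names).
    public static class CustomerEvent implements Serializable {
        private String customerId;
        private java.util.UUID time;
        private String staffId;
        private String event;
        public String getCustomerId() { return customerId; }
        public void setCustomerId(String customerId) { this.customerId = customerId; }
        public java.util.UUID getTime() { return time; }
        public void setTime(java.util.UUID time) { this.time = time; }
        public String getStaffId() { return staffId; }
        public void setStaffId(String staffId) { this.staffId = staffId; }
        public String getEvent() { return event; }
        public void setEvent(String event) { this.event = event; }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("staff-events-backfill")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Spark workers co-located with Cassandra nodes read their local token
        // ranges of the original, customer-keyed table...
        JavaRDD<CustomerEvent> events = javaFunctions(sc)
                .cassandraTable("events_keyspace", "customer_events", mapRowTo(CustomerEvent.class));

        // ...and write the rows straight back out into the staff-keyed table.
        // Inserts are idempotent, so rows already double-written are harmless.
        javaFunctions(events)
                .writerBuilder("events_keyspace", "customer_events_by_staff", mapToRow(CustomerEvent.class))
                .saveToCassandra();

        sc.stop();
    }
}
```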


How can a few lines do so much! If you're in a shell you obviously don't even need to create a SparkContext. What happens here is that the Spark workers process the partitions of the customer table (the original table) on the Cassandra nodes that own them and insert the data back into Cassandra locally. Cassandra then handles the replication to the correct nodes for the staff table.

This is the least network traffic you could hope to achieve. Any solution that you write yourself in Java/Python/shell will involve pulling the data back to your application and pushing it to a new node, which will then need to replicate it for the new table.

You won't want to do this at a peak time, as it will HAMMER your Cassandra cluster; Spark is going to do this quickly. If you have a small DC just for running Spark jobs and let it asynchronously replicate to your operational DC, this is less of a concern.

Wednesday, March 11, 2015

Cassandra anti-pattern: Logged batches

I've previously blogged about other anti-patterns:
  1. Distributed joins
  2. Unlogged batches
This post is similar to the unlogged batches post but is instead about logged batches.

We'll again go through an example Java application.

The good news is that the common misuse is virtually the same as in the last article on unlogged batches, so you know what not to do. The bad news is that if you do happen to misuse them it is even worse!

Let's see why. Logged batches are used to ensure that all the statements will eventually succeed. Cassandra achieves this by first writing all the statements to a batch log. That batch log is replicated to two other nodes in case the coordinator fails. If the coordinator fails then another replica for the batch log will take over.

Now that sounds like a lot of work. So if you try to use logged batches as a performance improvement you'll be very disappointed! For a logged batch with 8 insert statements (equally distributed) in an 8 node cluster, it will look something like this:


The coordinator has to do a lot more work than any other node in the cluster. Whereas if we were to just do them as regular inserts, it would look like this:


A nice even workload.

So when would you want to use logged batches?


Short answer: consistent denormalisation. In most cases you won't want to use them, as they are a performance hit. However, for some tables where you have denormalised you can decide to make sure that both statements succeed. Let's go back to our customer event table from the previous post, but also add a customer events by staff ID table:


We could insert into both tables in a logged batch to ensure that we don't end up with events in one table and not the other. The code for this would look like this:
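A sketch of what that looks like with the Java driver (prepared statements, table and column names assumed):

```java
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;

public class CustomerEventWriter {

    private final Session session;
    private final PreparedStatement byCustomer;
    private final PreparedStatement byStaff;

    public CustomerEventWriter(Session session) {
        this.session = session;
        this.byCustomer = session.prepare(
                "INSERT INTO customer_events (customer_id, time, staff_id, event) VALUES (?, ?, ?, ?)");
        this.byStaff = session.prepare(
                "INSERT INTO customer_events_by_staff (staff_id, time, customer_id, event) VALUES (?, ?, ?, ?)");
    }

    public void store(String customerId, String staffId, String event) {
        java.util.UUID time = UUIDs.timeBased();
        // LOGGED batch: both inserts are written to the batch log first, so once
        // the batch is accepted they are both guaranteed to eventually be applied.
        BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);
        batch.add(byCustomer.bind(customerId, time, staffId, event));
        batch.add(byStaff.bind(staffId, time, customerId, event));
        session.execute(batch);
    }
}
```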


This would mean both inserts would end up in the batch log and be guaranteed to eventually succeed.

The downside is that this adds more work and complexity to our write operations. Logged batches have two opportunities to fail:
  1. When writing to the batch log
  2. When applying the actual statements
Let's forget about reads as they aren't destructive and concentrate on writes. If the first phase fails, Cassandra returns a WriteTimeoutException with a write type of BATCH_LOG. You'll need to retry this if you want your inserts to take place.

If the second phase fails you'll get a WriteTimeoutException with a write type of BATCH. This means the batch made it to the batch log, so the statements will get replayed eventually. If you definitely need to read your writes you should read at SERIAL, meaning any committed batches will be replayed first.

Conclusion


Logged batches are rarely the right choice: they add complexity if you try to read at SERIAL after a failure, and they are a performance hit. If you are going to use them, it is in the odd situation where you can't handle inconsistencies between tables. They allow you to guarantee that the updates will eventually happen; they do not, however, offer isolation, i.e. a client can see part of the batch before it is finished.