CAP Theorem

If you’re talking or thinking about distributed data systems these days, you are almost certain to come across some discussion of the CAP theorem.  This is one of those beautifully simplistic ideas that helps explain something extraordinarily complex.

So, what does it mean?

CAP stands for Consistency, Availability, and Partition tolerance.  The theorem simply states that any shared-data system can only achieve two of these three.

Consistency

Consistency describes how and whether a system is left in a consistent state after an operation. In a distributed data system, this usually means that once a writer has written, all readers will see that write.

A distributed data system is either strongly consistent or has some form of weak consistency.  The most well known example of strong consistency in databases is ACID (Atomicity Consistency Isolation Durability), used in most relational databases.  On the other end of the spectrum is BASE (Basically Available Soft-state Eventual consistency).

Most often, weak consistency comes in the form of eventual consistency which means the database eventually reaches a consistent state.  Weak consistency systems are usually ones where data is replicated; the latest version of something is sitting on some node in the cluster, but older versions are still out there on other nodes, but eventually all nodes will see the latest version.

If you are interested in learning more, Werner Vogels, CTO of Amazon, has two good blog posts on eventual consistency here and here.

Availability

High-Availability refers to the design and implementation of a system such that it is ensured to remain operational over some period of time.

In this context, it generally means a system that is tolerant of node failures and can also remain available during software and hardware upgrades.  This is perhaps the simplest to understand and most commonly desired property, yet can be quite difficult to achieve to any level of certainty.

Partition Tolerance

Partition tolerance refers to the ability for a system to continue to operate in the presence of a network partitions.  For example, if I have a database running on 80 nodes across 2 racks and the interconnect between the racks is lost, my database is now partitioned.  If the system is tolerant of it, then the database will still be able to perform read and write operations while partitioned.  If not, often times the cluster is completely unusable or is read-only.

Who came up with it?

In July 2000, Dr. Eric Brewer of Berkeley gave a talk, Toward Robust Distributed Systems.

In it, he talks about the many trade-offs between ACID and BASE systems.  He explains that it should be thought of not as one or the other, but rather as a continuous spectrum.  As a useful principle, he then introduced the CAP Theorem.

Why now?

Distributed data systems are increasingly becoming a hot area of research and development.  Before the Internet and the web, there were not many companies dealing with terabyte or petabyte datasets.  With the explosion of content and information from websites, blogs, and social networks, more and more businesses are now trying to store, analyze, and serve massive amounts of data.  And they need to be able to perform massive batch operations on it while also serving it up to clients in near real-time.

These companies each have their own requirements: performance, reliability, durability; ACID, BASE, or somewhere in between.

Real World Example

In November 2006, Google released a paper, BigTable: A Distributed Storage System for Structured Data describing a distributed, column-oriented database that sat on top of the distributed Google File System.

In October 2007, Amazon released their own paper, Dynamo: Amazon’s highly available Key-value Store describing a distributed key-value database designed and in-use at Amazon.

What makes these two products a great example is that they are modern designs and implementations of distributed, shared data systems but with two different philosophies regarding CAP.

BigTable is a CA system; it is strongly consistent and highly available, but can be unavailable under network partitions.  BigTable has no replication at the database level, rather replication is handled underneath by GFS.

Dynamo is an AP system; it is highly available, even under network partitions, but eventually consistent.  Data is replicated within a single cluster, so even under partitions most data is available, however one node’s latest version might not match that of another, so every reader is only guaranteed to see every write eventually.

CAP at Streamy

First and foremost, Streamy is backed by HBase, an open-source implementation of Google’s BigTable.  As such, the core of our database systems are strongly consistent and highly available.  We are not overly concerned with network partitions as our clusters are all within a single data center and connected via local gigabit switches.  Once we expand to additional data centers, we plan to employ inter-cluster replication, with each cluster located in a single DC.  Remote replication will introduce some eventual consistency into the system, but each cluster will continue to be strongly consistent.

In addition to HBase, we also have a number of additional data systems that are responsible for indexing, sorting, merging, aggregating, and joining our data.  Some of these systems could be considered distributed or replicated, meaning there are multiple instances on multiple nodes and they talk to each other.  Since none are actually persistent (we do not rely on their state or their ability to save data under faults, that’s what HBase is for), we are most heavily focused on high availability and ensuring a read-your-writes consistency (a special form of eventual consistency).

Without going into more detail, it can be said that we employ both CA and AP systems here at Streamy.  The focus is not on fundamentally which is “better” but rather what the requirements are for that particular application and what the expectation is for our users.  The most important thing to avoid is a user who performs an action but then is unable to see that action immediately, which is why we often enforce a read-your-write consistency when we do need to relax our constraints.

, , , , , , , , , , , , , , , , , ,

4 Comments

HUG7: HBase User Group Wrap-Up

The biggest ever HBase User Group went off this past Friday, August 7 at StumbleUpon HQ in San Francisco.  We had over 30 attendees, ranging from new users to committers, and four presentations. Big thanks to everyone for coming out, and especially to StumbleUpon for hosting us.

Jonathan Gray (me) covered the new features, architecture, and API of the latest HBase 0.20 release, Michael Stack introduced the proposed features for 0.21, Ryan Rawson led a great practical discussion from his experiences using HBase at StumbleUpon, and finally Bradford Stephens gave an interesting talk on building an analytics framework atop Hadoop and HBase.

To read about what’s planned for the next release of HBase, 0.21, check out the HBase 0.21 Roadmap, notes from the HBase Hackathon Day One Wrap-Up, or issues in JIRA.

HBase 0.20 Introduction

-

Practical HBase at StumbleUpon

-

Hadoop Analytics - A DB for 80% of Big Data sites

-
-
-

JG

, , , , , , , ,

3 Comments

HBase Hackathon 2 Wrap-Up: Day One

The second HBase Hackathon took place this weekend at the StumbleUpon headquarters in San Francisco.  Big thanks to them for hosting us and feeding us.

The primary goal of this Hackathon was to roadmap, design, and hack on new features for the next release of HBase, 0.21.  The major targets are a Master rewrite, expanded use of Zookeeper, and cross-cluster replication.

Here are notes from Day One:

HBase Hackathon Notes

Saturday, August 8, 2009

-

· Data Integrity

o WAL

§ HADOOP-4379/HDFS-200 needs testing on large clusters

o HBfsck (HBASE-7)

§ Sequence IDs

· Change to stamps?

· Must be unique

· Fix for merges

§ Start/end keys match

· Figures out / runs merges

§ Repair references

§ Two modes

· Quick mode scans META

· Full mode reads HFiles and verifies fully

-

· Master Rewrite (HBASE-1110)

o HCD + HTD

§ Schema -> JSON

§ Stored in ZK

o JNI ZK Client – Nitay

§ Fix ZK being a black box

§ Not kicked out from GC pauses

§ Only responsible for ephemeral, other stuff still Java API

§ Monitor existence of JVM RS process

o Master is “powerless”

§ Should be written as though its ok we are offline for minutes

o Region assignment to ZK

§ Consensus to definitely do this

§ Master verify assignment state periodically?

o META information in ZK? (HBASE-1755)

§ Replacement for getClosestRowBefore

§ /hbase/TABLE/region

§ Goal: 10k regions * 10k clients

o Load balancing

§ RS publish load statistics to ZK

§ Master makes all decisions

§ Zookeeper coordination

o Get rid of heartbeats (HBASE-1502)

§ Master uses RPC?

§ Zookeeper coordination / messaging instead?

-

· Gets -> Scan

o Timestamp collisions, known issues here (HBASE-1485)

o Bloom filters?

-

· Replication (HBASE-1295)

o Sharded counters possible?

o Log shipping initially

o Separate process alongside HRS

§ Who does he talk to on the other cluster?

§ Coordination with shared ZK?

o When network partition

§ Buffer in memory, then buffer on hdfs

· How big can it get? Hours, days, years?

§ Eventually must fall back on snapshot or full table replay?

-

· Snapshot Backup Mechanism

o Flush, stop compactions

o Local copy first

o Should be per table

o Utilize distcp

o Can be used to initialize a slave replicated cluster

-

· New RPC

o Punted to 0.22

-

· Unit test speedup (HBASE-1556)

o HTableInterface, HRSInterface, HRInterface, etc

o Assigned to: Stack

-

· New UI

o Punt with redirect

o JG building something that taps into ZK and can publish JSON/XML/etc

§ Used for nagios plugin also being built

-

· Behind API Upload Process

o Patch exists, need to test HBASE-48

o Does not currently support multi-family or importing into table with data

-

· Cascading

-

· Distributed log splitting (HBASE-1364)

o Stripped down version of MR?

o Use ZK to map each HLog to which regions it contains edits for?

o We want to open any regions during a recovery that do not have edits immediately

o JD leading on the design

-

· Intra-row Scanning (HBASE-1537)

o We need it, but it’s hard :(

-

Thanks again to everyone at StumbleUpon and to all the committers and contributors for the good ideas and the good times!

JG

1 Comment

HOWTO: Determine the size of a Java Object or Class

Finding out the size (memory/heap usage) of Java Objects and Classes can be somewhat tricky as there is no built-in sizeof()-like functionality.  Luckily most people don’t have to face this problem.  But when you do, it can be difficult and in the end is almost always a best-guess approximation.

The reason that I started to look in to this subject was to build a memory-aware, LRU eviction cache for the open source database project HBase.  You can see the final result of that effort here.

When sizing a Class you can take two approaches.  One is to use the methods totalMemory and freeMemory from the Java Runtime class. The other strategy is to analyze the internal structure of your Class.  The first approach has the benefit that it gives you the actual memory footprint of your object rather than the expected one, but has the downside of relying on the garbage collector to do it’s thing.  So for this to work you often have to make repeated calls to the GC and sleep in between to give it time to process and minimize heap usage.  As one can see, this is not a solution suitable to be used in real time system, but can be a solid solution that you go back to when everything else fails, or just to verify your findings.

The second way of sizing your Class is to dissect it into it’s fundamental building blocks.  Let’s start by introducing the notion of a reference, known in other languages as a pointer.  A reference, or REF for short, is 4 bytes on a 32 bit system and 8 bytes on a 64 bit system.  Also note, the total size of an Object will always be word-aligned (4 byte aligned on 32bit, 8 byte aligned on 64bit).

Every instantiated Object in Java has a fixed overhead of 2 REFs.  To that, you add the contributions from all the non-static variables/fields contained within the Object.  For the most basic primitives, these sizes are:

byte = 1 byte, int = 4 bytes, long = 8 bytes

For example, let’s define and size the Class PrimitiveContainer

public class PrimitiveContainer {
  public int x = 1;
  public long y = 2;
}

The calculation for this would be:

overhead + sizeof(int) + sizeof(long) = (2 * REF) + 4 + 8 = 28 bytes on 64-bit, after word-alignment we get 32 bytes

Another example that uses Objects instead of primitives:

public class ObjectContainer {
  public Integer x = 1;
  public Long y = 2;
}

Since this Object contains references to other Objects (not primitives), we will calculate and include both the size of the referenced Objects (Integer and Long) as well as the references to those Objects (in ObjectContainer).  Both Integer and Long will have the fixed Object overhead of 2 * REF in addition to either an int or long primitive in each.  So:

sizeof(Integer) = (2 * REF) + 4 = 20
sizeof(Long) = (2 * REF) + 8 = 24

The total calculation would be:

overhead + reference to Integer + sizeof(Integer) + reference to Long + sizeof(Long) = (2 * REF) + (REF + 20) + (REF + 24) = 80 bytes

As you can see that there is a significant increase in memory usage between using primitives when compared to their Object counterparts, in this case 2.5X larger.

Arrays in Java, though underneath are really Objects, are unlike other complex Classes (Lists, Maps, etc) because they can be used with primtives directly.  Through experimentation and instrumentation of the JVM, the fixed overhead of any array is 3 * REF.

So with this in mind you can treat it as a regular object that aligns with it’s daughter objects.

For example if you have:

byte [] bs = {1,2,3,4};

on a 32 bit system, you get a total size of : 3 * REF + 4 = 3 * 4 + 4 = 16 bytes
on a 64 bit system, you get a total size of:  3 * REF + 4 = 3 * 8 + 4 = 28 which aligns to 32 bytes (twice the memory usage in this case)

One more thing that is worth mentioning is the cost for static and final variables. Usually static variables are not taken into consideration when sizing you object, since this is only done once and not for each instantiated Object.  Final variables should almost always be included, but of course it depends on your use case and the reason that you are sizing your objects.

, , , , , , ,

9 Comments

Streamy @ Hadoop Summit: HBase Goes Realtime

The Hadoop Summit was a great success this year and I had a ton of fun giving the presentation on HBase in front of a standing-room-only crowd. Videos from the conference are now available online from Yahoo here.

Jean-Daniel Cryans and I presented on the (any day now) HBase 0.20.0 release.

Check out the slides and video from HBase Goes Realtime below…


HBase Goes Realtime

, , , , ,

No Comments

Streamy at the Hadoop Summit

Along with Jean-Daniel Cryans, I will be giving a presentation at the upcoming Hadoop Summit titled “HBase Goes Realtime”.  We will be discussing the major architectural changes that have led to significant increases in performance across the board in the upcoming 0.20 version of HBase.  In addition, there are a number of other improvements to the API and fault-tolerance that will be discussed.

The Hadoop Summit is taking place on June 10th from 8AM to 9PM at the Santa Clara Marriott in Santa Clara, CA, USA.  You can order tickets for $100 here.

On June 9th, HBase developers will be meeting at 3PM in San Francisco.  If you are interested in attending, drop me a line.  We’ll then be taking the party bus down to the Marriott at 7PM for ScaleCamp, presented by our friends at Scale Unlimited.  You can order FREE tickets here.

Finally… A big shout out to Priya Narasimhan, one of my favorite professors from CMU that I reconnected with after learning she is also presenting at the Hadoop Summit.  Looking forward to collaborating with her and CMU soon!

, , , , , , ,

1 Comment

Debugging FastCGI and Lighttpd with spawn-fcgi and fcgi-debug

The web server of choice here at Streamy is Lighttpd. Written in C, it’s designed for performance, efficiency, security and concurrency. In addition to being one of the best performing http daemons for serving static content, it’s integration with FastCGI makes it simple to turn applications in almost any language into a CGI script with little or no code modification.  The communication between your script’s process and lighttpd is done with unix sockets or via tcp, making it easy to load-balance and have scripts that run on remote nodes.

Lighttpd makes it very easy to launch your cgi processes automatically without having to worry much about the interface.  However, debugging them like this can be arduous, if even possible.  When you want to get serious about developing fastcgi apps, you’ll need to run them yourself using spawn-fcgi and can then debug them using fcgi-debug.  spawn-fcgi is a way to separate the launching of the web server from the launching of the cgi proccess(es).  fcgi-debug is a nifty program that sits in between the web server and your script that allows you to see everything that’s going on.

A HUGE thanks to stbuehler who not only develops and maintains both of these applications but also spent time helping me debug a nasty issue we were having at Streamy.  IRC (and for developers, Freenode) is still one of the most powerful social networks out there! :)

Let’s imagine you have a compiled binary FastCGI program called myscript and you want to debug it.  At first, a simple lighttpd configuration for it might look like this:

fastcgi.server ( "/myscript" => ( "myscript" => (
  "socket" => "/home/user/myscript.sock",
  "bin-path" => "/home/user/myscript",
  "min-procs" => 1,
  "max-procs" => 1
)))

Full configuration information for mod_fastcgi is available here

The first step towards debugging is to change from Lighttpd spawning the process to using spawn-fcgi.  There are more detailed instructions available here and here, but in short what you will do is modify your lighty config to just point to the socket.  Remove bin-path and all process/load settings, otherwise you will run into conflicts.

fastcgi.server ( "/myscript" => ( "myscript" => (
  "socket" => "/home/user/myscript.sock"
)))

If we were not interested in debugging the interface/communications between lighttpd and fastcgi, we would spawn the script like this:

spawn-fcgi -s /home/user/myscript.sock -C 0 -F 1 -n -- /home/user/myscript

Adding fcgi-debug in the middle is as easy as calling it first rather than the script:

spawn-fcgi -s /home/user/myscript.sock -C 0 -F 1 -n -- fcgi-debug /home/user/myscript

You should now see lots of debug information directly to stdout.

You should also ensure that you have fastcgi debug turned on in your lighttpd logs. This is done with the setting:

server.errorlog = "/home/user/lighttpd_error.log"
fastcgi.debug = 1
If using fcgi-debug, you are required to use spawn-fcgi 1.6.2 rather than the one packaged with your lighttpd installation. spawn-fcgi 1.4.22 will run but will not show any debug output.

, , , , , , , , , , ,

No Comments

HBase 101: Row key design for paging (LIMIT, OFFSET) queries

Paging is a very common use-case for web sites and many other applications.  In relational databases, this is easily implemented with LIMIT and OFFSET, or by selecting the row number in the query and adding conditionals based on it’s value.  HBase 0.19.x, on the other hand, does not provide any queries or filters that support paging directly. After a quick example using SQL, I will show how to implement the same functionality in HBase.

Let’s assume that we have a large number of users.  Each user has performed a number of actions.  Each action has a unique identifier, a timestamp, and a name.

This is how you might get the third page of an individual users’ actions using SQL:

SELECT id, name, stamp FROM actions WHERE userid = 1
ORDER BY stamp DESC LIMIT 10 OFFSET 20;

This utilizes secondary indexes on both userid and stamp, meaning to accomplish this query you need at least three indexes on this table as id is the primary key.  Though a simple query to write, you will run into problems as the actions table grows to millions of rows and beyond.  Insertions would look like:

INSERT INTO actions (id, userid, name, stamp) VALUES (newid(), 1, 'Joe User', epoch());

HBase has no real indexes.  Rows are stored in sorted order, and columns in a family are sorted.  For more information, read the HBase Architecture page on the HBase Wiki.

Very conscious of the primary queries we will run on user-actions, we will design an HBase table to support paging queries on per-user, time-ordered lists of actions.

We will use the Java Client API for HBase, specifically the HTable class.  What we are looking for are two methods:

public static List<Action> getUserActions(int userid, int offset, int limit)
public static void putUserAction(Action action)

Please note, I am using a custom object, Action, for simplicity.  It is a client-side holder for the four action fields (id, userid, name, stamp).

There are a number of ways to store your data in HBase that will allow the getUserActions query, but in this case we will go with a very tall table design (lots of rows with few columns in them) rather than wide (lots of columns in each row).  Specifically, the difference here would be whether you have a row-per-action or a row-per-user.  We will do a row-per-action, but will be designing our row key (the primary key) to be a composite key to allow for grouping and sorting of actions, rather than just the action id.  This means we will not have random-access to an action by it’s id, so rather than defining this as the actions table (which might also exist if you needed actionid random access) we will define it as the useractions table, and we will only store a single column in a single family, content:name.

The row key that we will use in our HBase useractions table is:

<userid><reverse_order_stamp><actionid>

It’s very important that each of these fields is fixed-length and binary so that the lexicographical/ascending byte-ordering of HBase will properly sort our rows.

The userid field will be a 4 byte, big endian integer.  reverse_order_stamp is an 8 byte, big endian long with a value of (Long.MAX_VALUE - epoch).  This is so the most recent stamp is at the top rather than the bottom.  actionid is another 4 byte, big endian integer.  Thankfully, HBase provides helpful utilties in the org.apache.hadoop.hbase.util.Bytes class to deal with this (unfortunately it lacked some key features in 0.19, so the code below makes use of the Bytes class available in 0.20/TRUNK).  Before we get into HBase code, let’s define the helper methods makeActionRow and readActionRow to deal with the composite key:

public static byte [] makeActionRow(int userid, long stamp, int actionid)
throws Exception {
  byte [] useridBytes = Bytes.toBytes(userid);
  byte [] stampBytes = Bytes.toBytes(stamp);
  byte [] actionidBytes = Bytes.toBytes(actionid);
  return Bytes.add(useridBytes, stampBytes, actionidBytes);
}
 
public static Action readActionRow(byte [] row)
throws Exception {
  // Bytes.toInt(byte [] buf, int offset, int length)
  int userid = Bytes.toInt(row,0,4);
  long stamp = Long.MAX_VALUE - Bytes.toLong(row,4,8);
  int actionid = Bytes.toInt(row,12,4);
  return new Action(userid,stamp,actionid);
}

Now that we can deal with the composite keys, insertion is very straightforward:

public static void putUserAction(Action action) throws Exception {
  // Get the fields from the Action object
  int userid = action.getUserID();
  long stamp = Long.MAX_VALUE - action.getStamp();
  int actionid = action.getID();
  String name = action.getName();
 
  // Build the composite row, column, and value
  byte [] row = makeActionRow(userid,stamp,actionid);
  byte [] column = Bytes.toBytes("content:name");
  byte [] value = Bytes.toBytes(name);
 
  // Insert to HBase
  HTable ht = new HTable("useractions");
  BatchUpdate bu = new BatchUpdate(row);
  bu.put(column,value)
  ht.commit(bu);
}

We just serialize the fields into the composite row, and write the single column to HBase in a BatchUpdate.  Reading will deal with unserializing the fields and Scanners.  In addition to matching for the content:name column, we will also specify a startRow and stopRow so that the Scanner only returns results from the user we are looking at.  This way we do not have to worry about jumping to the next user in our code, the Scanner will just stop.

public static List<Action> getUserActions(int userid, int offset, int limit)
throws Exception {
  // Initialize counter and List to return
  int count = 0;
  List<Action> actions = new ArrayList<Action>(limit);
 
  // Initialize startRow, stopRow, and columns to match
  byte [] startRow = makeActionRow(userid,0,0);
  byte [] stopRow = makeActionRow(userid,Long.MAX_VALUE,Integer.MAX_VALUE);
  byte [][] columns = {Bytes.toBytes("content:name")};
 
  // Open Scanner
  HTable ht = new HTable("useractions");
  Scanner s = ht.getScanner(columns,startRow,stopRow);
  RowResult res = null;
 
  // Iterate over Scanner
  while((res = s.next()) != null) {
    // Check if past offset
    if(++count <= offset) continue;
 
    // Get data from RowResult
    byte [] row = res.getRow();
    byte [] value = res.get(columns[0]).getValue();
 
    // Build Action
    Action action = readActionRow(row);
    String name = Bytes.toString(value);
    action.setName(name);
    actions.add(action);
 
    // Check limit
    if(count == offset + limit) break;
  }
  // Cleanup and return
  s.close();
  return actions;
}

The storage of your data must be tied to how you need to query it.  Without a sophisticated query engine or indexing capabilities, you must design to take advantage of sorted rows and columns, potentially designing a table per query type.  Denormalization is okay!

In my next posts, I will show more interesting ways to use HBase for persisted dictionary/keyval/Object storage and directly address secondary indexing with HBase.

Disclaimer: The code in these examples is designed to illustrate the practical use of HBase.  While the design is sound, the code itself may not optimized for performance.

, , , , , , , , , , , , , , , ,

No Comments

Web Scale

Development at Streamy is always done with the mindset of “will it scale” in the back of our minds.

Generally speaking, scalability deals with the ability for a software system to handle increasing load when given additional resources.  Increased load could mean more concurrency, a larger data set, or increased complexity.  Additional resources refers to hardware and either scaling vertically (upgrading a single node) or scaling horizontally (adding additional nodes).  While vertical scale is important in terms of squeezing all the performance you can out of each node, and rapidly dropping hardware prices means even cheap nodes are powerful, the key to achieving true scalability is the ability to horizontally scale, or distribute.

Distributed systems are becoming more and more mainstream as the web has flourished.  Search engines index billions of web pages and social networks support millions of concurrent users.  Content continues to evolve from being static, to dynamic, to the current emphasis on personalization and customization.  The adoption of AJAX and COMET have further increased requirements for concurrency.  And all of this must be highly-available and low-latency (sub-second).

This is what I call Web Scale.

So what exactly is it?

It’s a new set of technical requirements borne out of Web 2.0, and the move towards distributed systems and cloud computing as the solution.  It encompasses all the different aspects of today’s web applications: the data, the storage, the caches, the web servers, the communications, the realtime queries, the batch queries, and everything in between.  It marks a departure from the vertical scaling of relational databases and web servers as the solution to scale towards a new world of horizontal scalability: distributed hash tables, consistent hashing, column-orientation, horizontal partitioning, eventual consistency, elastic computing, and every other buzzword you can think of.

Who has solved it?

Google. They deserve a great deal of credit for being the first to really achieve web scale.  Long before web sites really considered their architecture as part of their competitive advantage, Google embraced the notion and invented their own solutions.  They subsequently published a number of papers describing their efforts:  The Google File System, MapReduce, and BigTable.

Amazon.  The enormous amount of work that was done in order to achieve scalability for the world’s premier e-commerce site is obvious; one need only look at their extensive and fast-growing elastic storage and computing services like S3 and EC2.  An e-commerce site becoming a service provider?  There’s only so much cost-savings a good architecture can give an e-commerce site, so it only makes sense that they try to profit from their proprietary systems.  The CTO of Amazon, Werner Vogels, has an excellent blog AllThingsDistributed.  Read through some of his posts and you can quickly see that Amazon is a company that understands scaling, distribution, and the right way to go about design and engineering in today’s Web Scale world.

Who struggles with it?

Facebook.   Today’s prime example for what happens when you don’t build for scale early on: the only short-term solution to rapid growth is to throw money at the problem.  Utilizing both horizontal and vertical scale, Facebook has enormous clusters of MySQL and Memcached to deal with storing user data and serving user queries.  As reported nearly a year ago, they already had close to 2,000 MySQL boxes and 1,000 Memcached boxes in addition to their 10,000 web servers.

They have been playing catch-up ever since, slowly developing (and open-sourcing in some cases) distributed systems to deal with their enormous scale.  Though almost always plagued by the lack of community and direct support from Facebook engineers, they have some very interesting projects including Cassandra and Hive.  More recently, they seem to finally have solved their photo storage cost issues with Haystack, saving them from needing to buy an additional $2M+ server every month just to keep up.  The difference in architectures is well described in Niall Kennedy’s post.

Twitter.  Considering how well known the fail whale is, it’s clear to most that Twitter has been plagued by slow responses and downtime, even to this day.  Though extremely simple in its requirements, the personalized views, emphasis on search, and massive use of the API by developers creates huge amounts of load for the microblogging (or is it nanoblogging?) service.

FriendFeed.  A company led by ex-Googlers, and to my knowledge without major technical issues, seems to be going down a path of scaling that seems clunky and backwards.  Generally speaking, scaling a database means letting go of some of the traditional restrictions, first things like normalization and secondary indexes, and then more significantly by relaxing ACID-compliance or adding eventual consistency.  As outlined in a blog post by one of their founders, Bret Taylor, their attempt at a schema-less storage system atop MySQL seems to be a good idea and good effort gone wrong.

When what you’re after is schema-less storage, and the need for partitioning/distribution, why would you base it on a system completely tied to schemas and full-blown transactions?  You’re bringing with you all the things you don’t want, at the expense of performance and flexibility, because they “trust” MySQL and are already familiar with it.  Good reasons, no doubt, but the whole thing appears misguided.  In any case, I bet that it works and performance is acceptable.  But it’s not always about finding a solution that works.  Flexibility, simplicity, and of course, additional scalability, are also important and something so confusing to do something so simple just ask for you to not want to touch it once it works :)

Note:  This is not to say that these companies are “doing it wrong”.  I point out these examples because they are cases of costs gone out of control, continued performance and uptime issues, approaches I personally would not recommend, etc.  MySQL + Memcached is certainly part of the Web Scale tool box and in a great deal of use cases is satisfactory.  For more information on relational databases and how they compare to something like HBase, check out my presentation on Hadoop and HBase vs RDBMS.

So, how does Streamy solve it?

Stay tuned!  This will be the topic of an upcoming series of posts over the next few weeks.

, , , , , , , , , , , , ,

6 Comments

HBase Hackathon Wrap-up

HBase contributors came together last weekend for the first ever HBase Hackathon here at Streamy HQ in Manhattan Beach, California.  In attendance were most of the HBase committers, guys from Sun and StumbleUpon… nearly 20 developers in total.  There are photos posted on the Meetup Page (for members only).  If anyone else who attended has pictures please post links in the comments!

We spent a great deal of time discussing the new features set for the 0.20 release of HBase.  You can follow all the issues slated for 0.20 here.  It’s been a few days and so much has already come out of the weekend so I thought I’d post a quick follow-up to share some of the cool stuff being worked on now.

Cascading Support for HBase
Chris Wensel, of Concurrent Inc., has successfully implemented the first version of HBase adapters for Cascading.  Streamy devs are really looking forward to refactoring some of their MapReduce jobs for Cascading!  We will report back soon.

HBase New File Format
We have hit a performance wall in HBase with the Hadoop MapFile format.  It was never intended for a random-access read pattern which is really the primary purpose of HBase.  Based on the hard work by guys over at Yahoo on the TFile binary file format, work is well under way on a new HBase-specific file format, currently being called HFile.  Michael Stack of Powerset and Ryan Rawson of StumbleUpon are leading the effort.

The emphasis is on speed and efficiency.  By switching to a block-based segmenting/indexing of the file, we can have predictable memory usage and an ideal abstraction for caching.  Once in memory, we can use something like Java’s NIO ByteBuffers to allow high numbers of concurrent scanners with minimal memory copying.  Remember, even random-access reads require scanning.  The new format also supports meta blocks for additional indexes, bloom filters, meta data and anything else we want to add in the future.

Cell Caching
Streamy’s own Erik Holstad is wrapping up testing, benchmarking and optimizing the new Cell Cache.  We’re seeing a 5-10X improvement in random-access speed when serving out of the cache.  A big part of this feature is implementing a memory-aware LRU in Java.  Since Java will not tell you the size of an Object in memory, we have had to hack our way around through profiling and some tools we’ve built to determine sizes.  More on this in a later post.

Zookeeper Integration
Jean-Daniel Cryans and Nitay Joffe have made leaps and bounds with Zookeper integration into HBase.  Initially designed to remove the single point of failure, discussions at the Hackathon opened the door to future improvements such as configuration management and even to eventually distribute master functionality and eliminate the HMaster all together.  They have already committed 5 issues to 0.20 trunk.  You can follow their progress here.

Datanode Network I/O Improvements
Andrew Purtell at Trend Micro is working on a Hadoop issue that creates a big headache for the users of HBase.  We keep a large numbers of files open at a time and since it is currently implemented using a thread-per-connection model, we end up with thousands of idle threads and having to keep increasing the total number of receivers.  I’m not sure where this currently stands.

My Random Contributions
In addition to voicing my opinion and contributing to design and decision making, I’m currently working on a number of small issues:  a binary key range splitting algorithm, the ability to run more than one mapper per region or to specify start and stop rows for MR jobs sourcing from HBase, and a number of benchmarking tools to evaluate all the new stuff.

All in all, it was a terrific weekend.  The weather was absolutely perfect and it was as friendly and smart a group as I could have imagined.  Thanks to everyone who came, especially those who made the trip down from Norcal and JD who came from Canada to defrost for a bit in the Cali sun.  Summer is coming soon, stay tuned for the next beach city hackathon :)

, , , , , , , , , , , ,

No Comments