Gaurav's Blog

return rand();

Systems Design: Twitter Search


In the near future, I might be posting about some systems I am reading about. Let’s start with Twitter Search.

EarlyBird Paper

We start by reading the EarlyBird paper.

The paper starts by laying out the core design principles. Low latency and high throughput are obvious. The ability to surface real-time tweets was the requirement unique to Twitter at the time the paper was written, i.e., new tweets should be immediately searchable. Regarding this second requirement: in the past, search engines would crawl the web / index their documents periodically, and the indices were built via batch jobs using technologies such as MapReduce.

Since the time the paper was authored (Fall 2011), this has changed. Both Google and Facebook surface real-time results in their search results and feeds. But arguably a large fraction of Twitter’s core user intent is real-time content, so they have to get it right.

The core of the paper starts by going over the standard fan-out architecture for distributed search systems, with replication & caching for distributing query evaluation and then aggregating results. It then focuses specifically on what goes on in a single node while evaluating the query.

For IR newbies: An inverted index maintains something called ‘posting lists’. Consider them to be something like map<Term, vector<Document>> in C++, i.e., a map from a Term to a list of documents. If I am querying for the term beyonce, I’ll look up the posting list for this term, which gives me the list of documents containing it.

Of course, there can be millions of documents with such a popular term, so querying is usually done in two phases. In the first phase, we do a cheap evaluation on these documents. This is usually achieved by pre-computing some sort of quality score such as PageRank (which is independent of the query and searcher), and keeping the documents in the list sorted in descending order of this quality score. Then at query time, we take the top $N$ candidates from this vector.

Once we have the candidates, we do a second phase, which involves a more expensive ranking of these candidates, to return a set of ranked results according to all the signals (query, searcher and document specific features) that we care about.
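To make this concrete, here is a minimal C++ sketch of such a two-phase setup. The types and scoring functions (Posting, candidates, expensiveRank, search) are my own illustrative names, not anything from the paper, and the “expensive” ranker is stubbed out.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct Posting {
  uint64_t docId;
  double qualityScore;  // Query-independent score, e.g. a PageRank-like value.
};

// Posting lists: each vector is kept sorted by descending qualityScore.
using Index = std::unordered_map<std::string, std::vector<Posting>>;

// Phase 1: cheap candidate selection - just take the top N by quality score.
std::vector<Posting> candidates(const Index& index, const std::string& term,
                                size_t n) {
  auto it = index.find(term);
  if (it == index.end()) return {};
  const auto& list = it->second;
  return std::vector<Posting>(list.begin(),
                              list.begin() + std::min(n, list.size()));
}

// Phase 2: expensive, query- and searcher-specific ranking (stubbed out here).
double expensiveRank(const Posting& p, const std::string& query) {
  return p.qualityScore;  // A real ranker would use many more signals.
}

std::vector<Posting> search(const Index& index, const std::string& query,
                            size_t n) {
  auto cands = candidates(index, query, n);
  std::sort(cands.begin(), cands.end(),
            [&](const Posting& a, const Posting& b) {
              return expensiveRank(a, query) > expensiveRank(b, query);
            });
  return cands;
}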

EarlyBird Overview

EarlyBird is based on Lucene (a Java-based open-source search engine). Each tweet is assigned a static score on creation, and a resonance score (likes, retweets) which is updated live. At query time, the static score, the resonance score and a personalization score, which is computed from the searcher’s social graph, are used to rank the tweets.

At the time the paper was written, they state the latency between tweet creation and its searchability was ~10s. Their posting lists store documents in chronological order, and at query time these documents are traversed in reverse chronological order (most recent tweet first).

For querying, they re-use Lucene’s and, or, etc. operators. Lucene also supports positional queries (you can ask Lucene to return documents which contain terms A and B at most D positions away from each other in the document).

Index-Organization

EarlyBird seems to handle the problem of concurrent reads and writes to an index shard by splitting the shard into ‘segments’. All segments but one are read-only. The mutable segment continues to receive writes until it ‘fills up’, at which point it becomes immutable. This is analogous to the ‘memtable’ architecture of LSM trees. But I wonder if they do any sort of compaction on the segments; this is not clearly explained here.
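A rough sketch of what such a segmented shard could look like. This is my own simplification, not the paper’s actual data structures; the per-segment cap mirrors the < 2^23 tweets-per-segment limit mentioned later.

#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

struct Segment {
  std::vector<uint64_t> docs;  // Simplified stand-in for the segment's posting lists.
  static constexpr size_t kMaxDocs = 1 << 23;  // Cap in the spirit of the paper's limit.

  bool full() const { return docs.size() >= kMaxDocs; }
  void add(uint64_t docId) { docs.push_back(docId); }
};

class Shard {
 public:
  void add(uint64_t docId) {
    if (active_->full()) {
      // Freeze the active segment (it becomes read-only) and start a new one.
      frozen_.push_back(std::move(active_));
      active_ = std::make_unique<Segment>();
    }
    active_->add(docId);
  }

 private:
  std::vector<std::unique_ptr<Segment>> frozen_;  // Read-only segments.
  std::unique_ptr<Segment> active_ = std::make_unique<Segment>();
};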

Layout for Mutable (Unoptimized) Index: They then discuss the problem of how to add new tweets to posting lists. Their posting lists at the time were supposed to return reverse-chronological results, so they don’t use any sort of document score to sort the lists; the tweet timestamp is what determines the ordering.

Appending at the end of posting lists doesn’t gel well with delta-encoding schemes, since those naturally support forward traversal, while here the lists have to be traversed backwards. Prepending at the beginning of the lists using naive methods such as linked lists would be cache-unfriendly, and would require additional memory for the next pointers.

They fall back to using arrays. The posting list is an array of 32-bit integer values, where they reserve 24 bits for the document id and 8 bits for the position of the term in the document. 24 bits is sufficient because, firstly, they map global tweet ids to a local document id in that posting list, and secondly, the upper limit on how many tweets can go in a segment is < $2^{23}$. Though I might want to keep additional metadata about a document, and not just the position of the term, so this is a little too specific for tweets at the time the paper was authored.
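As a hedged illustration of that packing (the 24/8 field widths are from the paper; the helper names are mine):

#include <cstdint>

// One posting: 24 bits of (segment-local) document id, 8 bits of term position.
constexpr uint32_t kPositionBits = 8;
constexpr uint32_t kPositionMask = (1u << kPositionBits) - 1;

uint32_t packPosting(uint32_t localDocId, uint32_t position) {
  // Assumes localDocId < 2^24 and position < 2^8.
  return (localDocId << kPositionBits) | (position & kPositionMask);
}

uint32_t docIdOf(uint32_t posting) { return posting >> kPositionBits; }
uint32_t positionOf(uint32_t posting) { return posting & kPositionMask; }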

They also keep pools of pre-allocated arrays, or slices (of sizes $2^1$, $2^4$, $2^7$ and $2^{11}$), similar to how a buddy allocator works. When a posting list exhausts its allocated slice, they allocate another one which is 8x bigger, until the maximum slice size of $2^{11}$ is reached. There is some cleverness in linking these slices together: if you can get this linking to work, you do not have to do an $O(N)$ copy of your data as you outgrow your current slice.
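A tiny sketch of the growth policy, just to make the numbers concrete (the slice sizes are from the paper; the function is my own):

#include <cstdint>

// Slice sizes (in postings): 2^1, 2^4, 2^7, 2^11. Growth is 8x until the cap.
constexpr uint32_t kMaxSliceSize = 1u << 11;

uint32_t nextSliceSize(uint32_t currentSize) {
  if (currentSize == 0) return 1u << 1;                    // First slice.
  if (currentSize >= kMaxSliceSize) return kMaxSliceSize;  // Keep handing out max-size slices.
  return currentSize * 8;                                  // 2^1 -> 2^4 -> 2^7 -> 2^11.
}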

Layout for Immutable (Optimized) Index: The pool approach is obviously not always space-efficient; we can end up wasting ~50% of the space if the number of documents for a term is pathologically chosen. In the optimized index, the authors have a concept of long and short posting lists. Short lists are the same as in the unoptimized index.

The long lists comprise blocks of 256 bytes each. The first four bytes of a block hold the first posting uncompressed; the remaining bytes store, for each subsequent posting, the document id delta from the previous entry and the position of the term, in compressed form. I wonder why they don’t apply this compression to the entire list instead of using compressed blocks. My guess is that compressing the entire list would prohibit random access.
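Here is a toy sketch of the delta-encoding idea, under my own assumptions: the first entry is kept as-is and later entries store deltas. The real format packs these into 256-byte blocks with a much more compact encoding than the plain pairs used here.

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

struct Entry {
  uint32_t docId;
  uint32_t position;
};

struct Block {
  Entry first;
  std::vector<std::pair<uint32_t, uint32_t>> rest;  // (docId delta, position)
};

Block encode(const std::vector<Entry>& entries) {
  Block block{entries.front(), {}};  // Assumes entries is non-empty.
  for (size_t i = 1; i < entries.size(); ++i) {
    block.rest.push_back({entries[i].docId - entries[i - 1].docId,
                          entries[i].position});
  }
  return block;
}

std::vector<Entry> decode(const Block& block) {
  std::vector<Entry> out{block.first};
  for (const auto& [delta, pos] : block.rest) {
    out.push_back({out.back().docId + delta, pos});
  }
  return out;
}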

Concurrency: Most of the heavy lifting for consistency between readers and writers of a posting list is done by keeping a per-posting-list value of the maximum document id encountered so far (maxDoc). Declaring this value as volatile in Java introduces a memory barrier, so readers see a consistent view without giving up too much performance.
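EarlyBird is in Java, but a rough C++ analogue of the single-writer idea might look like the sketch below. This is my own sketch, not the paper’s code: I publish an entry count with release/acquire semantics, which plays the same role as the volatile maxDoc, and the storage is pre-allocated (as with the slices) so the writer never reallocates under concurrent readers.

#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>

// Single-writer, many-reader posting list.
class PostingList {
 public:
  bool add(uint32_t docId) {
    size_t n = size_.load(std::memory_order_relaxed);
    if (n >= kCapacity) return false;               // Slice/segment is full.
    postings_[n] = docId;                           // Write the entry first...
    size_.store(n + 1, std::memory_order_release);  // ...then publish it.
    return true;
  }

  // Readers iterate over the first size() entries; the acquire load pairs with
  // the writer's release store, so those entries are fully visible.
  size_t size() const { return size_.load(std::memory_order_acquire); }
  uint32_t at(size_t i) const { return postings_[i]; }

 private:
  static constexpr size_t kCapacity = 1 << 11;
  std::array<uint32_t, kCapacity> postings_{};
  std::atomic<size_t> size_{0};
};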

Overall

The paper was very easy to read. I would have liked the authors to describe how the switch from the mutable to the immutable index happens, how the index is persisted to disk, etc., apart from addressing the rigid structure of the metadata in each posting list entry (just the term position).

Omnisearch and Improvements on EarlyBird

There are a couple of new posts about improvements on top of EarlyBird.

Introducing Omnisearch

This blogpost introduces Omnisearch. As I mentioned earlier, EarlyBird is strongly tied to the tweet’s content. In mature search systems, there are several “verticals”, which the infra needs to support. This blogpost describes how they are moving to a generic infrastructure which can be used to index media, tweets, users, moments, periscopes, etc.

Omnisearch Index Formats

Here is the blogpost on this topic. It goes over what is mentioned in the paper before describing their contributions. They mostly work on the optimized index, as described earlier.

If a term occurs multiple times in a document, the document appears that many times in the old posting list format. In the new format, they keep (document, count) pairs instead of (document, position) pairs, and keep a separate table for positions. To optimize further, since most counts are 1, they actually store (document, count - 1) pairs. They report a 2% space saving and a 3% latency drop. I’m not entirely convinced why this improves both for a tweet-text-only index.

However, some indexed terms are not present in any text (for example, in user indices, where we may want a term marking verified users), and so a position does not make any sense for them. For those, a separate position table is a win, because we can skip the table entirely.
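A hedged sketch of what that layout could look like; the names and types here are mine, not Twitter’s.

#include <cstdint>
#include <unordered_map>
#include <vector>

// New-style posting: the document id plus (occurrence count - 1), since most
// terms occur exactly once and this field is then simply 0.
struct DocCount {
  uint32_t docId;
  uint32_t countMinusOne;
};

struct PostingList {
  std::vector<DocCount> postings;
  // Positions live in a separate table, keyed by document id. For fields where
  // positions are meaningless (e.g. a "verified user" term), this table is
  // simply absent/empty.
  std::unordered_map<uint32_t, std::vector<uint16_t>> positions;
};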

Super-Root

Super-Root is another layer on top of Twitter’s index servers, which exposes a single API to customers, instead of having them query individual indices themselves.

Super-Root allows them to abstract away lower-level changes, add features like quota limits, enable query optimization, and keep the clients thin. This is pretty essential when you start having a large number of customers.

Next Binary Permutation: Bitwise Hackery


Here is a puzzle that is fairly standard:

Given an array of elements, find the lexicographically next permutation of that array.

As an example, if the array is [1, 2, 2, 3], the lexicographically next permutation would be [1, 2, 3, 2], followed by [1, 3, 2, 2], and so on. There is a really neat article explaining how to solve this. If you don’t know how to do it, I encourage you to try examples with arrays of size 4 and above, and try to see the patterns.
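In C++ this is exactly what std::next_permutation does, so one quick way to play with examples is something like this:

#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
  std::vector<int> v = {1, 2, 2, 3};
  // std::next_permutation rearranges v into the lexicographically next
  // permutation, returning false once v has wrapped around to sorted order.
  do {
    for (int x : v) std::printf("%d ", x);
    std::printf("\n");
  } while (std::next_permutation(v.begin(), v.end()));
}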

A recent problem I was solving was a variant of this.

Given an unsigned integer, find the lexicographically next unsigned integer, with the same number of bits set.

It’s easy to observe that we can reduce this problem to the one above, by treating the integer as an array with just two kinds of elements: 0s and 1s.

Alright. Are we done? Not quite. The solution mentioned in the link is $O(n)$. Technically, an unsigned integer is 32 or 64 bits, so $n$ would be one of those for the purposes of this problem. It would be easy to repurpose the algorithm from the article above for our case, but I wanted to see if we can do it with as few operations as possible, because looping over the bits of a machine word is just not cool :-)

I chanced upon Sean Anderson’s amazing bitwise hacks page, and I distinctly remember having seen it 7-8 years ago, when I was in Mumbai. Regardless, if you understand his solution: awesome! It took me some time to figure out, so I wrote an arguably easier-to-comprehend solution, which is about 50% slower than his in my micro-benchmark, but still better than the naive looping solution. So here goes.

Example

Let’s pick an example: $10011100$. The next permutation would be $10100011$.

As per the article above, we find the longest increasing suffix going right-to-left (called the longest non-increasing suffix, going left-to-right, in the article). In this case, it is $11100$. Thus, the example is made of two parts: $100.11100$ (using $.$ as a separator).

The first zero before the suffix is the ‘pivot’, further breaking down the example: $10.0.11100$.

We swap the rightmost one in the suffix with this pivot (the rightmost successor in the article). So the example becomes $10.1.11000$. However, the suffix part needs to be sorted, since so far we were permuting with the prefix $10.0.$, but this is the first permutation with the prefix $10.1.$, and therefore the suffix should be its own smallest permutation (basically, it should be sorted).

Hence, we move the zeroes in the suffix to the end, resulting in $10.1.00011$. This is the correct next permutation.

Solution

nextPermutation.cpp
#include <cstdint>

uint64_t next(uint64_t n) {
  // Find the first unset bit above the trailing run of ones (the pivot).
  uint64_t pivotBit = __builtin_ctzll((n | (n - 1)) + 1);

  // Set the pivot bit.
  uint64_t step1 = n | (1ULL << pivotBit);

  // Unset the first set bit (successor).
  uint64_t step2 = step1 & (step1 - 1);

  // Extract the part after the pivot.
  uint64_t suffixMask = (1ULL << pivotBit) - 1;
  uint64_t suffix = step2 & suffixMask;

  // Zero out the suffix, and keep only its 1s, pushed to the least-significant
  // end. Guard against an empty suffix: __builtin_ctzll(0) is undefined.
  uint64_t sortedSuffix = suffix ? (suffix >> __builtin_ctzll(suffix)) : 0;
  uint64_t final = (step2 & ~suffixMask) | sortedSuffix;
  return final;
}
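As a quick sanity check, here is a small usage sketch (my addition, assuming next() from above is in the same file) that runs this post’s worked example through the function:

#include <cassert>
#include <cstdint>
#include <cstdio>

int main() {
  // 0b10011100 -> 0b10100011, matching the worked example below.
  uint64_t n = 0b10011100;
  uint64_t m = next(n);
  assert(m == 0b10100011);
  std::printf("%llx -> %llx\n", (unsigned long long)n, (unsigned long long)m);
}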

We’ll break it down. Trust me, it’s easier to grasp than the terse wisdom of Sean Anderson’s snippet.

Code-Walkthrough

// Find the first unset bit above the trailing run of ones (the pivot).
uint64_t pivotBit = __builtin_ctzll((n | (n - 1)) + 1);

To find the pivot in the example $n = 10011100$, we set all the bits in the suffix to 1 first.

$n-1$ will set the trailing zeroes to 1, i.e., $10011011$, and $n | (n-1)$ would result in a value with all original bits set, along with the trailing zeroes set to 1, i.e., $10011111$. Thus, all the bits in the suffix are now set.

Adding 1 now sets the pivot bit to 1 (and unsets the entire suffix). Using __builtin_ctzll we can then find the number of trailing zeroes, which is the same as the bit index of the pivot. See the note below for __builtin functions.

// Set the pivot bit.
uint64_t step1 = n | (1ULL << pivotBit);

We then proceed to set the pivot. Since the value of $n$ was $10011100$, $step1 = 10111100$.

// Unset the first set bit (successor).
uint64_t step2 = step1 & (step1 - 1);

Now we need to unset the successor, which we can do with a trick similar to how we found the pivot. step1 - 1 unsets the least-significant set bit (the successor) and sets all its trailing zeroes (i.e., $10111011$). step1 & (step1 - 1), i.e., $10111100 \& 10111011$, therefore zeroes out the successor bit and the trailing zeroes. Hence, $step2 = 10111000$.

// Extract the part after the pivot.
uint64_t suffixMask = (1ULL << pivotBit) - 1;
uint64_t suffix = step2 & suffixMask;

This is fairly straightforward: we build the suffix mask (all the bits corresponding to the suffix part set), and ANDing it with the number so far, $10.1.11000$, gives us the modified suffix, i.e., $11000$.

uint64_t sortedSuffix = suffix ? (suffix >> __builtin_ctzll(suffix)) : 0;
uint64_t final = (step2 & ~suffixMask) | sortedSuffix;
return final;

All we need to do now is to pull the 1s in the suffix to the least-significant end. This is done by right-shifting the suffix by the number of its trailing zeroes, so we get $00011$ (sortedSuffix in the code). If the suffix has no set bits at all, there is nothing to sort, which is why the code guards against calling __builtin_ctzll on zero. This is the ‘sorted’ suffix we mentioned earlier.

Then we OR it with the number so far, with the suffix part zeroed out (step2 & ~suffixMask), so that the unsorted suffix is replaced by this sorted suffix, i.e., $10100000 | 00011 \implies 10100011$.

I hope this helped you break down what’s going on, and served as a bridge between the naive solution and the one-liner bitwise hacks.

Please leave any comments below. I’d be super happy to discuss any alternative solutions!

Appendix: __builtin Functions

Whenever you are working with bitwise operations, gcc provides built-in functions such as __builtin_popcount (number of set bits in the argument) and __builtin_ctz (number of trailing zeroes in the argument), along with ll-suffixed variants like __builtin_ctzll for 64-bit arguments. Note that __builtin_ctz and __builtin_ctzll are undefined when the argument is 0. These functions map to fast hardware instructions, and are also concise to write in code, so I use them whenever I can.

Sharding Databases: A (Bunch of) Quick Trick(s)


One of the problems with serving databases is horizontal scalability, i.e., being able to add machines to the cluster and balance the read/write load across them.

Naive Solution

A naive way to balance traffic is to assign a part of the key-space to each machine. For an object $o$ with key $k$, the machine serving the object is $h(k) \% n$, where $h$ is a hash function such as SHA or MD5, and $n$ is the number of machines.
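A minimal sketch of this scheme, using std::hash purely as a stand-in for a real hash function such as SHA or MD5:

#include <cstddef>
#include <functional>
#include <string>

// Naive sharding: every client must agree on n, and changing n remaps almost
// every key.
size_t machineFor(const std::string& key, size_t n) {
  return std::hash<std::string>{}(key) % n;
}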

Advantages

  • A simple function to compute where each key goes.

Problems

  • $n$ is fixed, and the clients need to be informed when you update $n$.
  • If you add/remove machines, you need to move nearly all of your existing data so that the hash function maps keys correctly. If you have $K$ keys, you incur $O(K)$ transfers.

Consistent Hashing

Consistent Hashing is an optimization on the naive solution, in that it avoids the need to copy the entire dataset. Each key is projected as a point on the edge of a unit circle. Every node in the cluster is assigned a point on the edge of the same circle. Then each key is served by the node closest to it on the circle’s edge.

When a machine is added to or removed from the cluster, each machine gives up / takes over a small number of keys. This rebalance happens in such a fashion that only $O(\frac{K}{n})$ transfers happen, where $n$ is the number of machines in the cluster.
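Here is a hedged sketch of a consistent-hashing ring using an ordered map, with a single point per node; real implementations typically place many virtual nodes per machine to smooth the distribution. The class and method names are mine.

#include <cstdint>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>

class ConsistentHashRing {
 public:
  void addNode(const std::string& node) { ring_[hash(node)] = node; }
  void removeNode(const std::string& node) { ring_.erase(hash(node)); }

  // The key is served by the first node at or after its point on the circle,
  // wrapping around to the beginning if needed.
  const std::string& nodeFor(const std::string& key) const {
    if (ring_.empty()) throw std::runtime_error("no nodes");
    auto it = ring_.lower_bound(hash(key));
    if (it == ring_.end()) it = ring_.begin();
    return it->second;
  }

 private:
  static uint64_t hash(const std::string& s) {
    return std::hash<std::string>{}(s);
  }
  std::map<uint64_t, std::string> ring_;  // point on the circle -> node
};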

Advantages

  • Minimal amount of data transferred between machines. This has been proven to be optimal.

Problems

  • The clients need to know not just the number of nodes, but also the location of each node on the circle.
  • Calculating which key goes to which machine is slightly more complex, but still $O(1)$.

Note that in both of the above methods, when you are adding or removing machines, there is some amount of downtime involved. In the first case, we need to completely turn off reads and writes because the cluster is going through a complete rebalance. In the second case, we only need to turn off reads and writes for a $\frac{1}{n}$ fraction of the keyspace, as compared to the first solution.

Pinterest’s Sharding Trick

Pinterest, in its blogpost about MySQL sharding, talks about a setup where they use the key itself as a marker of which shard the key belongs to. When writing the object $o$ for the first time, we generate a key for it, in which the upper $x$ bits are reserved for the shard the object belongs to. The next time there is a request to read the object, we use those $x$ bits to find which shard should be queried.

Each shard is located on a certain machine, and the shard->machine map is kept in ZooKeeper (a good place to read & write small configs in a consistent fashion). Upon finding the shard, we look up this map to locate the machine to which the request needs to be made.
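A hedged sketch of such a key layout; I am assuming 64-bit keys with the shard id in the upper 16 bits purely for illustration, which is not necessarily Pinterest’s actual layout.

#include <cstdint>

// Illustrative layout: upper 16 bits = shard id, lower 48 bits = local id.
constexpr uint32_t kShardBits = 16;
constexpr uint32_t kLocalBits = 64 - kShardBits;

uint64_t makeKey(uint64_t shardId, uint64_t localId) {
  return (shardId << kLocalBits) | (localId & ((1ULL << kLocalBits) - 1));
}

uint64_t shardOf(uint64_t key) { return key >> kLocalBits; }

// On a read: shardOf(key) -> look up the shard->machine map (e.g. in
// ZooKeeper) -> send the request to that machine.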

When new machines are added, we simply create more shards and start using those new shard ids when generating keys for new objects. This way, new writes, and the reads corresponding to those writes, don’t hit the existing machines.

I’m going to refer to this as the “Pinterest trick”, because I read it on their blog. I’m pretty sure this is not the first time it’s been done, though.

Advantages

  • There is no copying whatsoever. Once you add new machines, they start receiving new data, provided you tell the system generating the keys about new machines.

Disadvantages

  • Reads and writes are not balanced to begin with. Newer machines will start with 0 traffic, and slowly ramp up.
  • There is an intermediary lookup involved for the shard->machine mapping.
  • The key itself is modified. This might be okay for some setups, though.

Sharding Trick Deux

Another trick that some setups apply is to have the key-space sufficiently pre-sharded to begin with. Then, as traffic increases, these shards are simply moved to other machines if their current hosts can’t serve them. For MySQL, each shard is a separate database instance. We used a similar approach when operating HBase at FB, where we expected the need to add more machines in the future.

Why are we sharding though?

A discussion with Dhruv brought up an interesting point: why are we sharding a database? Sure, we want to scale horizontally. But which resource are we running out of: CPU, disk, network?

Scaling for CPU

The tricks we discussed above scale for disk. Note that, in the case of the Pinterest trick, new shards don’t proportionately help with serving reads of existing data. For most social networks, the amount of data being created outpaces consumption, and they are bound on disk rather than CPU.

If you are bound on CPU, there are several ways to move your shards to not-so-hot machines, depending on which tradeoff you would like to make:

  • Setup Master-Slave replication, with the target machine as the slave. At some point, cut off writes to the existing master, replay from MySQL’s binlog / HBase’s WAL to cover the remaining delta.
  • Setup Master-Master replication (this is more expensive, since an eager M-M replication incurs additional latency), and at some point turn on exclusive writes to the target node. ZooKeeper or something similar can handle this switch. Writes need not be turned off.
  • In HBase, because of the distributed file system underneath, it was possible to “open the region” at one of the three machines which would have a copy of the shard (thanks to the 3x HDFS replication), without incurring the cost to copy the shard data. But this requires turning off the writes.

Comments?

I wrote a lot of this from high-level knowledge and from discussions with people who have worked on these kinds of things. I might have omitted something, or written something that is plainly incorrect. Moreover, this is an open topic, with no “right answers” that apply to all. If you have any comments about this topic, please feel free to share them in the comments section below.