Saturday, 16 June 2007

Implementing Tag Cloud - The Nasty Way (Part 2)

In the previous part of this article I've started the discussion of the tag cloud construction problem. Let's make a little recollection.

We found out that a relational persistence model is not adequate enough for this purpose due to the huge computational overhead it creates. Then we came to the conclusion that tag cloud should not be calculated each time it is requested. Instead it could be kept between requests as a hash table persisted in the binary stream. Unfortunately this simple solution led us to another problem - we need to adjust the hash table each time someone changes tags of any entity in the system. The problem was escalated further after it had become obvious that we need to keep counters for all tags in the system (not just a few hundred tags from the tag cloud). If we had focused on the cloud tags only we would lose the possibility of adding new tags which became popular to the cloud as well as removing ones which lost their popularity. We would have no idea which of two tags is more popular without usage information about at least one of them.

In this part I will present an algorithm effective enough to cope with the issues mentioned above.

The picture below describes the physical architecture I will use to demonstrate the concept (click on the picture to enlarge).


The architecture presented consists of two clusters: web cluster and data storage cluster. The web cluster is built in the following way:
  1. the shared-nothing approach is used: each server of the cluster is completely independent of its neighbors, can operate even if it is the only server in the cluster and it has the standard predefined configuration and the applications installed;
  2. servers in the web cluster are capable of communicating with each other by means of group communication software;
  3. all logic are performed by web servers and mostly driven by incoming web requests – there are no separate application servers in the system;
  4. there is a load balancer distributing web requests among web servers based on some simple rule (for example randomly with uniform distribution or in a round-robin fashion).
The organization of data storage cluster is not important until it is reliable and provides consistent data to the web cluster. But we have to define the informational schema to store all kind of data we got so far:
  1. all tags together with their usage counters,
  2. the binary representation of the tag cloud hash table,
  3. some additional information which will be required by the algorithm itself.
We will suppose that our data storage is a relational database (or a cluster of relational databases) as it is the most common approach. Please take a look at the picture below.


As you can see there are three tables in the schema:

1. "tag" – contains the list of all tags stored in the system. The table can have millions of rows especially if you have a multilingual site. The table has three columns:
  • id – an integer representing the identifier of the tag in the system,
  • name – the tag itself,
  • count – the total number of tag usages.
2. "tag_list_version" – contains all versions of the tag list which is represented by the “tag” table. Each time the “tag” table is changed a new row should be added to the “tag_list_version” table indicating that a new version of tag list was created. The table has four columns:
  • version – an integer representing an increasing counter for tag list changes,
  • tag_cloud – a binary field which contains a binary representation of the tag cloud hash table which corresponds to the version of the tag list,
  • created – date and time of the version creation.
3. "tag_increment" – contains all increments ever applied to the tag list. Each new version of the tag list is formed from the previous one by applying several tag list increments. A tag list increment (hereinafter increment) is an algorithm specific data structure which is similar to the tag cloud. It is also a hash table of <tag, usage counter> pairs with the only difference – counters can be negative. The negative value means that during the period when the increment was collected a concrete tag was removed more times than added. I will tell more about increments below. Here is the columns:
  • id – an integer representing the identifier of the increment,
  • increment – a binary field which contains a binary representation of increment hast table,
  • created – date and time of the increment persistence,
  • version – the version of the tag list which was formed by applying a set of increments (including this one) to the preceding version in a single step.
Okay, we covered the minimum required to start the algorithm description Let’s move on and pick up some logical reasoning.

Reason 1. Why don’t you give me a KISS?!

According to the point 3 of the web cluster description given above – web servers are the only type of servers which perform any actions with tags except permanent storage. These actions are usually a direct response to appropriate user actions such as adding or removing tags of an entity. It looks like we can make each web server to update the tag cloud each time a tags operation is requested. Isn’t it a good and simple option? For example the user updates a set of tags on a photo - the server performs the direct user request and … immediately updates the tag cloud as well. Everything is excellent – tag cloud is always up-to-date plus no full recalculation is ever required. Aren’t we done? Unfortunately no. There is one significant drawback – performance. Do you remember that we have millions of users registered in our system? The tags operations are frequent. They are performed not only when users work with tags directly modifying them but also when they add and delete entities. It doesn’t sound reasonable to add a relatively slow procedure to frequent operations. The tag cloud update involves a few time-consuming activities:
  1. Network transportation. The tag cloud isn’t small – it has entire tags plus usage counters plus the table of hash values and indexes.
  2. Database updates. The tag list must be updated. The tag cloud is updated if necessary.
  3. Synchronization. Multiple servers compete for the possibility to update the same data structures – it makes the overall system throughput slower.
Conclusion: the solution is possible but it is far from being optimal under the serious load.

Reason 2. Is laziness infectious?

Let’s try to improve the previous case. What if we wouldn’t update the tag cloud immediately in the response to any user action but collect the updates for some time and apply all of them at once later. It will definitely lighten the load on the system but fail the constant up-to-date state of the tag cloud. Do we really need the tag cloud to be up-to-date all the time?

As I wrote in the previous part of this article the tag cloud changes slowly. It will hardly ever be changed with each request. And even in case if the tag cloud should have been changed but it didn’t the user is not able to detect this fact. The user neither has an idea about what the other users do nor has the possibility to influence the data strongly enough himself. The only way to betray the fact of delayed tag cloud updates is to provide both tags and tags counters to the user. So we just discovered one of the reasons why popular web applications seldom does that :). Tags of the tag cloud are usually presented arranged relatively to each other by popularity – there are no absolute values.

Okay, taking into account all said above - delaying the tag cloud update sounds like a good idea. How can we implement it? We need to accumulate changes and store them somehow till the tag cloud update is performed. As you might suspect it is the primary responsibility of that new structure called tag list increment. The increment is a simple hash table holding all tags either added or removed between two tag cloud updates. A counter which corresponds to each tag in the table is initially set to zero. It is incremented each time the tag is added and decremented each time the tag is removed. Each web server has its own increment and keeps it in the memory. As you can see we still have to count tags but do it with almost no overhead using efficient in-memory structure.

The last opened question is when to start the procedure of tag cloud update. It is obvious that the more different tags we have in the increment the longer the update procedure will be. The procedure consists of merging the increment into the “tag” table and subsequent adjusting of the tag cloud based on tags usage information obtained during merging. In other words if we collected 5,000 different tags in the increment we have to perform 5,000 INSERT/UPDATE statements. This operation seems to be long and (this is the most important!) it is expensive from the database point of view.

But the situation is not bad, right? Can we just monitor increments and force the tag cloud update to start as soon as the increment has some reasonable amount of tags (for example 100)? Unfortunately we can’t. We cannot predict the simultaneous behavior of users at any point of time in the future. The tags increment on a concrete web server can grow to the size of 100 tags in 10 minutes or in 10 milliseconds depending on the current load and its nature. The process is entirely driven by users.

We have another option – we could spoonfeed the data storage sending statements in turn and making pauses after each one. If we don’t use a single transaction for the whole process we can significantly decrease the load on the database increasing the overall update time. Unfortunately this trick doesn’t help us either because we have a web cluster and there might be a lot of web servers (for example www.myspaces.com installation has more than 1000 web servers). If each server of the cluster spoonfeeds the database with its own increment for a relatively long period of time they will interfere with each other and bring the database to its knees.
Anyway, it looks like we got two options. Which one do you prefer?
  1. A lot of servers sending a bunch of 5000 SQL statements each from time to time.
  2. A lot of servers sending 100 SQL statements each per unit of time for a longer period of time with much higher probability of interfering.
To telling the truth both of them don’t look pretty. Let’s try something else.

Conclusion: we got rid of additional overhead per each tags operation. But we still have a problem – increments are big and merging is expensive. The things become worse if we increase the amount of servers in the web cluster.

Reason 3. Divide et impera!

Don’t worry – it is the final one :). Let’s take a look at the conclusion of the previous reason. We decided that increments can become big quickly and there is not much we can do about that. But a big increment is not a problem itself. The problem is in its merging. Every server has to merge its own tag increment as soon as it overgrew a certain threshold. It would be much better if only one server will be responsible for merging. It would solve the problem of competition and we could spoonfeed the database as long as we need. So what’s the problem? Let’s just select a single web server and make others send their increments to this one instead of merging. The chosen server should collect a few increments and commit all of them into the database at the same time. Of course the chosen server has its own tags increment as well and it will send it to itself as others do. What does the improvement we made give us?

Benefits:
  1. There is no competition for the database resources.
  2. The total amount of merges became lower as now both tags and increments are aggregated.
  3. Merging is more efficient as increments usually have a lot of tags in common. If we merge all increments into a single one before committing to the database we will save a lot of database operations.
Drawbacks:
  1. The tag cloud update is delayed more. But it is still not a problem (as a matter of fact it is just a question of the configuration – we can manipulate all thresholds to get the optimal variant).
  2. The update procedure became longer as we have more tags to merge (tags which are different in collected tag increments). And this is also okay. The merging can take longer now and we can spoonfeed (see the first point of the benefits).
  3. We created a single point of failure. If the chosen server goes down – we are in a serious trouble. The tag collection is stopped at all and we lose increments until the server is back online.
So here it is! All that’s left is to fix the “single point of failure” drawback. I’d like to admit that we don’t need the precise values of tags counters to build the tag cloud if we have a lot of users. That’s why we could sometimes afford to lose an entire increment. And that’s why we don’t speak about single points of failure discussing reason 2 – any web server can and will go down from time to time losing the tags from its current increment which is not persisted yet.

But a failure of the centralized collection is much more serious. To prevent that we have to add some redundancy. It would be good to have two or three servers selected for increments collection (let’s name those servers "collectors"). Using group communication system we can create a separate communication group especially for collectors. Each collector must register itself in this group at startup. All web servers (also called as "emitters") must send increments to the group instead of direct communication with servers. It means that all collectors in the group will get all increments. Now it is much harder to lose an increment in case of failure as any collector has a copy of it.

Let's decide how the group of collectors should actually work. It is obvious that:
  1. Only one of the collectors can merge at any point of time in order not to start the resources competition again.
  2. Any increment must be committed only once. Other copies must be discarded as soon as the increment is committed to prevent duplication.
  3. All collectors must share (or have access to) the full information about merge operations done so far. Due to the possibility of failures and subsequent restarts collectors won't be in sync from the point of view of increments they had, have or will have.
    • Let's look at the example. Consider an increment sent by an emitter and delivered to two out of three collectors. Because of network malfunction the increment is delayed and has not reached the third server yet. Meantime one of the two servers has met the threshold and started merging. The network delay is big enough and the merge was finished before the increment has been finally delivered to the last server. As a result the increment is obsolete on arrival and must be discarded at once.
All these points are addressed in the final algorithm below. Please pay special attention to them. The correct implementation is crucial for the stability of the system.


THE ALGORITHM

state STARTED:

event - Spontaneously_started:

communication_groups.enter("collectors");
tag_list_version = database.tag_list.get_last_version();
increment_identifiers =
database.tag_list.get_increment_identifiers(tag_list_version);
processed_increment_identifiers.clear();
processed_increment_identifiers.add(increment_identifiers);
become(PROCESSING);

state PROCESSING:

event - Receiving_tag_increment:

increment = network.receive();
if(!processed_increment_identifiers.contains(increment.id))
then
pending_increments.add(increment);
if(pending_increments.count() > threshold)
then
increments_list = pending_increments.remove_first_n(threshold);
call asynchronously Merge_increments(increments_list);
become(PROCESSING);

procedure - Merge_increments(increments_list):

lock = lock_service.acquire_application_wide_lock("collectors");
last_version = database.tag_list.get_last_version();
if(tag_list_version == last_version)
then
tag_list_version = tag_list_version + 1;
transaction = database.begin_transaction();
database.tag_list.insert_new_version(tag_list_version);
database.tag_list.insert_increment_identifiers(
tag_list_version, increments_list
);
single_increment = merge_all_in_one(increments_list);
single_increment =
database.tag_list.merge_with(single_increment);
tag_cloud = database.tag_list.get_cloud(tag_list_version - 1);
tag_cloud.merge_with(single_increment);
database.tag_list.insert_tag_cloud(
tag_list_version, tag_cloud
);
transaction.commit();
communication_groups.
send_notification_to("emitters", "tag list is updated");
else
versions = database.tag_list.
get_all_versions_greater(tag_list_version);
foreach(version in versions)
do
identifiers =
database.tag_list.get_increment_identifiers(version);
pending_increments.remove_by_identifiers_if_any(identifiers);
processed_increment_identifiers.add(identifiers);
tag_list_version = database.tag_list.get_last_version();
lock.release();

I suppose it could be named as pseudo code :). Tell me if I am wrong and the code is not self-explanatory or even unreadable. I hope after the long discussion we had you are more than capable of understanding the main ideas behind this algorithm. I will not scan it row by row (at least for now) and let you think it over.

You are welcome to ask questions and any comments are highly appreciated as usual.

Talk to you soon. Happy thinking.

Friday, 15 June 2007

Implementing Tag Cloud - The Nasty Way (Part 1)

I am going to write about Tag Cloud. I am pretty sure all of you know what it is. I can't say that the majority of Web2.0 applications really need it. But if you think this feature will make your application better - it is definitely necessary to know a little bit about its implementation.
Yes, I know that I am cheating here. The way of implementation described in this post is probably not the best one. Firstly it is for big guys only. Secondly it can be significantly improved.
Don't try this method unless you have a lot of time and millions of users :). You've been warned!

Let's start from a few simple questions. Why to discuss Tag Cloud at all? It seems like a common feature and it shouldn't be tricky to implement. But have you ever think about how the most noble applications (such as www.flickr.com - yep, I'm a worshiper of this product) construct Tag Cloud and keep it up-to-date taking into account the huge amount of data moving forth and back?

Here are just two problems you could unfold trying to find the answer to this question (in fact there is much more to discuss but it could take a lot of time - more then I have):

1. Relational database.

Yes, relational database is a problem in case of a normalized data model usage (anyone wants a piece of me now? :)). Why?
  • Let assume you have 10,000,000 users. They form the first table - "user".
  • Let further assume each user upload at least 1,000 photos (not a big deal in the age of digital photography). So we have 10,000,000,000 photos - cool isn't it? And it is the second table - "photo".
  • And finally let's add some tags - I usually have from 3 to 6 tags per item. If anyone had been like me we would have about 45,000,000,000 tags in average. These are the third and the forth tables - "tag" and "photo_tags" relationship respectively.
It seems we could end up with quite long tables (especially the one for a relationship). And now we need the final step of this story. Here it is:

In order to extract the information we would need to make ... JOINS! Could anyone say how many records the Cartesian product of all four tables will have? ;)

2. The system restart.

Okay. There are a lot of relational algebra gurus to hire and you might have bought a database engine capable of performing JOINs of tables with infinite number of rows in a finite amount of time. So far so good.

Let's assume your system was up and running until you decided to upgrade the database schema (for example to reach the 13th normal form :)). You planned a downtime, announced the date to your users and stopped the system. Upgrade was done quickly. You started the application again and ... your application now need to calculate the tag cloud! I can even make things "easier" for you - your system has to calculate a tag cloud for everyone from those 10,000,000 people to build personal tag clouds! According to the data model we saw above the calculation like this requires an intensive rows counting in the relationship table(s). And taking into account the number of rows it seems like your database requires additional super ability now - it should be able to count rows as quickly as it JOINs tables ;).

For those of you who are suspicious of my words I suggest to make a little experiment - take your favorite database engine, generate a table with at least 100,000,000 records, perform the following statement: "SELECT COUNT(*) FROM Table;" and measure the time of execution. I promise you will be surprised. Also I suggest to play with your data a little. Try to add some conditions to the statement above, add indexes and check the same statements again, measure the size of indexes, time of their construction and defragmentation etc. This should give you good feeling of the problem.

(By the way do you know my favorite joke from Chuck Norris Facts? - "Chuck Norris counted to infinity - twice.")

Let's move on now and try to think about alternative ways of tags cloud manipulation and persistence.

Firstly there is no need in relational data storage. It doesn't mean you should throw away that thing you payed so much money for - just leave it for more adequate data. A standard hash table with tags and their respective counters of usage persisted as a binary stream will do the trick. It is pretty much enough to represent the entire tag cloud (as well as a personal tag cloud). Of course you shouldn't keep all tags which exist in the system in a single hash table. The tag cloud consists of the most popular tags only (approximately 200 tags in our system) and it is adjusted all the time to be up-to-date.

You can still use the database by storing hash tables as BLOBs (which is probably the easiest way as you don't need an additional type of data storage).

Secondly there is no need to recalculate the tag cloud (as well as personal tag clouds). It can be created once and adjusted all the time according to changes in tags applied to all entities in the system. Moreover these adjustments can be infrequent. Tag cloud is usually rather static. Just look at any tag cloud of the popular photo sharing applications and you can notice that words like "art", "cool", "wedding" etc. are always somewhere near the top. Unfortunately you still have to have all tags and counters of their usage somewhere in the database (perhaps in the separate table) and update those counters all the time to give "new" tags a chance to enter the tag cloud. But it is nothing comparing to the process of full tag cloud recalculation.

The personal tag clouds can be treated similarly. The same representation of a tag cloud as a persisted hash table works here as well. Although the situation is much simpler as you deal with lower number of tags (a vocabulary of an ordinary person is about 5000 words - the real number of words used as tags will be considerably lower). As a result you can have all tags of a person in a single hash table - not just the most popular ones as in the case of the global tag cloud . Put this persisted hash table in the "user" table in an additional column and you will always have a personal tag cloud at your disposal. Don't forget to update it each time the user changes its tags.

So what did we get so far? We managed to replace the extremely slow process of tag cloud calculation (that must be performed only when tag cloud is requested) to the process of continuous tag cloud adjustment (that should be done each time tags change is requested). In other words the infrequent requests (such as getting the tag cloud) are made very quick but frequent requests (such as adding/deleting/changing tags) are made slower. Strange result, isn't it? Don't worry. In the second part I'll show a concrete distributed algorithm you can use to make tag cloud adjustment quick to the point it wouldn't slow the ordinary tags operations. And we will have the only major drawback - the complexity of the implementation.

That's all for now :). I hope you find these ideas useful.
Don't hesitate to ask questions and express your opinion.

Good luck!