Saturday, 16 June 2007

Implementing Tag Cloud - The Nasty Way (Part 2)

In the previous part of this article I started discussing the tag cloud construction problem. Let's briefly recap.

We found that a relational persistence model is not adequate for this purpose because of the huge computational overhead it creates. We then concluded that the tag cloud should not be calculated each time it is requested. Instead, it can be kept between requests as a hash table persisted in a binary stream. Unfortunately, this simple solution led us to another problem: we need to adjust the hash table each time someone changes the tags of any entity in the system. The problem escalated further once it became obvious that we need to keep counters for all tags in the system, not just for the few hundred tags in the tag cloud. If we focused only on the cloud tags, we would lose the ability to add newly popular tags to the cloud and to remove tags that have lost their popularity. We would have no way to tell which of two tags is more popular without usage information about at least one of them.
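To make the recap concrete, here is a minimal Python sketch of the idea, assuming the cloud is simply the N most used tags chosen from the counters of all tags. `build_cloud`, `CLOUD_SIZE` and the use of `pickle` as the binary format are illustrative assumptions, not something the article prescribes.

```python
import heapq
import pickle

CLOUD_SIZE = 3  # hypothetical; a real cloud would hold a few hundred tags


def build_cloud(all_counters: dict[str, int], size: int = CLOUD_SIZE) -> dict[str, int]:
    """Return the `size` most used tags, chosen from the counters of ALL tags.

    The counters of tags outside the cloud are what let a rising tag
    replace a fading one in a later update.
    """
    return dict(heapq.nlargest(size, all_counters.items(), key=lambda kv: kv[1]))


def cloud_to_bytes(cloud: dict[str, int]) -> bytes:
    # Persist the cloud between requests as a binary blob.
    return pickle.dumps(cloud)


def cloud_from_bytes(blob: bytes) -> dict[str, int]:
    # Only unpickle data your own servers wrote; pickle is unsafe for untrusted input.
    return pickle.loads(blob)


counters = {"java": 120, "python": 95, "sunset": 40, "cat": 310, "rust": 12}
cloud = build_cloud(counters)
restored = cloud_from_bytes(cloud_to_bytes(cloud))
```

If "sunset" later gains 100 uses, only the full counter table reveals that it should now displace "python".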

In this part I will present an algorithm effective enough to cope with the issues mentioned above.

The picture below describes the physical architecture I will use to demonstrate the concept.


The architecture presented consists of two clusters: a web cluster and a data storage cluster. The web cluster is built in the following way:
  1. the shared-nothing approach is used: each server in the cluster is completely independent of its neighbors, can operate even if it is the only server in the cluster, and has a standard predefined configuration with the applications installed;
  2. servers in the web cluster can communicate with each other by means of group communication software;
  3. all logic is performed by the web servers and is mostly driven by incoming web requests – there are no separate application servers in the system;
  4. a load balancer distributes web requests among the web servers according to some simple rule (for example, randomly with a uniform distribution, or round-robin).
The organization of the data storage cluster is not important as long as it is reliable and provides consistent data to the web cluster. But we do have to define the informational schema for all the kinds of data we have so far:
  1. all tags together with their usage counters,
  2. the binary representation of the tag cloud hash table,
  3. some additional information which will be required by the algorithm itself.
We will assume that our data storage is a relational database (or a cluster of relational databases), as this is the most common approach. Please take a look at the picture below.


As you can see there are three tables in the schema:

1. "tag" – contains the list of all tags stored in the system. The table can have millions of rows, especially if you have a multilingual site. The table has three columns:
  • id – an integer representing the identifier of the tag in the system,
  • name – the tag itself,
  • count – the total number of tag usages.
2. "tag_list_version" – contains all versions of the tag list, which is represented by the “tag” table. Each time the “tag” table is changed, a new row should be added to the “tag_list_version” table indicating that a new version of the tag list was created. The table has three columns:
  • version – an integer representing an increasing counter for tag list changes,
  • tag_cloud – a binary field which contains a binary representation of the tag cloud hash table which corresponds to the version of the tag list,
  • created – date and time of the version creation.
3. "tag_increment" – contains all increments ever applied to the tag list. Each new version of the tag list is formed from the previous one by applying several tag list increments. A tag list increment (hereinafter "increment") is an algorithm-specific data structure similar to the tag cloud. It is also a hash table of <tag, usage counter> pairs, with one difference: counters can be negative. A negative value means that during the period in which the increment was collected, that tag was removed more times than it was added. I will say more about increments below. The table has four columns:
  • id – an integer representing the identifier of the increment,
  • increment – a binary field which contains a binary representation of the increment hash table,
  • created – date and time of the increment persistence,
  • version – the version of the tag list which was formed by applying a set of increments (including this one) to the preceding version in a single step.
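For reference, the three tables could be declared as below. This sketch uses SQLite (through Python's standard `sqlite3` module) as a stand-in for the storage cluster; the column types, constraints and defaults are assumptions, since the article only names the columns.

```python
import sqlite3

# Column types, constraints and defaults below are assumptions for illustration.
SCHEMA = """
CREATE TABLE tag (
    id      INTEGER PRIMARY KEY,
    name    TEXT    NOT NULL UNIQUE,
    "count" INTEGER NOT NULL DEFAULT 0          -- total number of tag usages
);
CREATE TABLE tag_list_version (
    version   INTEGER PRIMARY KEY,              -- increasing counter of tag list changes
    tag_cloud BLOB,                             -- serialized tag cloud for this version
    created   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE tag_increment (
    id        INTEGER PRIMARY KEY,              -- increment identifier
    increment BLOB NOT NULL,                    -- serialized increment hash table
    created   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    version   INTEGER NOT NULL REFERENCES tag_list_version(version)
);
"""


def create_schema(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)


conn = sqlite3.connect(":memory:")
create_schema(conn)
tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
```

The `version` column of `tag_increment` records which version a batch of increments produced, which is what lets collectors later ask "which increments are already committed?".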
Okay, we have covered the minimum required to start describing the algorithm. Let’s move on to some logical reasoning.

Reason 1. Why don’t you give me a KISS?!

According to point 3 of the web cluster description above, web servers are the only servers that perform any actions on tags, apart from the permanent storage. These actions are usually a direct response to user actions such as adding or removing tags on an entity. It looks like we could make each web server update the tag cloud each time a tag operation is requested. Isn’t that a good and simple option? For example, the user updates the set of tags on a photo; the server performs the requested change and … immediately updates the tag cloud as well. Everything is excellent: the tag cloud is always up to date and no full recalculation is ever required. Are we done? Unfortunately not. There is one significant drawback: performance. Remember that we have millions of registered users. Tag operations are frequent. They are performed not only when users modify tags directly but also when they add and delete entities. It doesn’t sound reasonable to add a relatively slow procedure to such frequent operations. The tag cloud update involves a few time-consuming activities:
  1. Network transportation. The tag cloud isn’t small: it contains the tags themselves plus their usage counters plus the table of hash values and indexes.
  2. Database updates. The tag list must be updated. The tag cloud is updated if necessary.
  3. Synchronization. Multiple servers compete for the right to update the same data structures, which reduces overall system throughput.
Conclusion: the solution is possible, but it is far from optimal under serious load.

Reason 2. Is laziness infectious?

Let’s try to improve on the previous case. What if we didn’t update the tag cloud immediately in response to every user action, but instead collected the updates for some time and applied them all at once later? That would definitely lighten the load on the system, but the tag cloud would no longer be up to date at all times. Do we really need the tag cloud to be up to date all the time?

As I wrote in the previous part of this article, the tag cloud changes slowly. It will hardly ever change with every request. And even if the tag cloud should have changed but didn’t, the user cannot detect it. The user neither knows what the other users are doing nor can influence the data strongly enough on his own. The only way to reveal delayed tag cloud updates is to show the user both the tags and their counters. So we have just discovered one of the reasons why popular web applications seldom do that :). Tags in a tag cloud are usually presented relative to each other by popularity; there are no absolute values.
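One common way to present popularity only in relative terms is to map each counter onto a small number of size classes on a logarithmic scale. The sketch below illustrates that general technique rather than describing any particular site; `size_classes` and `levels` are hypothetical names, and it assumes every counter in the cloud is at least 1.

```python
import math


def size_classes(cloud: dict[str, int], levels: int = 5) -> dict[str, int]:
    """Map each tag's counter to a relative size class from 1 to `levels`.

    Uses a logarithmic scale between the smallest and largest counter, so
    only relative popularity is exposed, never the raw counts.
    Assumes every counter is >= 1.
    """
    lo = math.log(min(cloud.values()))
    hi = math.log(max(cloud.values()))
    span = (hi - lo) or 1.0  # all counters equal: avoid division by zero
    return {
        tag: 1 + int((math.log(count) - lo) / span * (levels - 1))
        for tag, count in cloud.items()
    }


classes = size_classes({"cat": 310, "java": 120, "python": 95, "rust": 1})
```

Because only the class is rendered, a slightly stale counter usually leaves the displayed cloud unchanged.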

Taking all of the above into account, delaying the tag cloud update sounds like a good idea. How can we implement it? We need to accumulate changes and store them somewhere until the tag cloud update is performed. As you might suspect, this is the primary responsibility of the new structure called the tag list increment. The increment is a simple hash table holding all tags either added or removed between two tag cloud updates. The counter for each tag in the table is initially set to zero. It is incremented each time the tag is added and decremented each time the tag is removed. Each web server has its own increment and keeps it in memory. As you can see, we still have to count tags, but we do it with almost no overhead using an efficient in-memory structure.
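A minimal Python sketch of such a per-server increment, assuming a `collections.Counter` serves as the hash table. The class and method names are hypothetical. Dropping entries whose net change returns to zero is an extra assumption made in this sketch: such tags need no database statement.

```python
from collections import Counter


class TagIncrement:
    """In-memory accumulator of tag changes on one web server between two
    tag cloud updates. Counters may go negative when a tag was removed more
    often than it was added."""

    def __init__(self) -> None:
        self.counters: Counter[str] = Counter()

    def _bump(self, tag: str, delta: int) -> None:
        self.counters[tag] += delta
        if self.counters[tag] == 0:
            # A net change of zero needs no database work, so drop the entry.
            del self.counters[tag]

    def tag_added(self, tag: str) -> None:
        self._bump(tag, +1)

    def tag_removed(self, tag: str) -> None:
        self._bump(tag, -1)

    def distinct_tags(self) -> int:
        # Number of "tag" rows the eventual merge will have to touch.
        return len(self.counters)


inc = TagIncrement()
for tag in ["cat", "cat", "sun"]:
    inc.tag_added(tag)
for tag in ["dog", "sun"]:
    inc.tag_removed(tag)
```

After these calls the increment holds `cat: 2` and `dog: -1`; "sun" cancelled out and disappeared.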

The last open question is when to start the tag cloud update procedure. Obviously, the more distinct tags the increment contains, the longer the update procedure takes. The procedure consists of merging the increment into the “tag” table and then adjusting the tag cloud based on the tag usage information obtained during the merge. In other words, if we have collected 5,000 distinct tags in the increment, we have to perform 5,000 INSERT/UPDATE statements. This operation is likely to take a long time and, most importantly, it is expensive from the database's point of view.
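The "one statement per distinct tag" cost can be seen in a small sketch of the merge step. It assumes SQLite 3.24 or newer (for the `INSERT ... ON CONFLICT DO UPDATE` upsert) as a stand-in database; `merge_into_tag_table` is a hypothetical name, and the subsequent tag cloud adjustment is left out.

```python
import sqlite3

# One statement per distinct tag: insert the tag if it is new, otherwise
# add the (possibly negative) delta to its counter.
# UPSERT syntax needs SQLite 3.24 or newer.
UPSERT = (
    'INSERT INTO tag (name, "count") VALUES (?, ?) '
    'ON CONFLICT(name) DO UPDATE SET "count" = "count" + excluded."count"'
)


def merge_into_tag_table(conn: sqlite3.Connection, increment: dict[str, int]) -> int:
    """Apply an increment to the "tag" table; return the number of statements run."""
    with conn:  # a single transaction for the whole merge
        for name, delta in increment.items():
            conn.execute(UPSERT, (name, delta))
    return len(increment)


conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE tag (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, '
    '"count" INTEGER NOT NULL DEFAULT 0)'
)
conn.execute('INSERT INTO tag (name, "count") VALUES (?, ?)', ("cat", 5))
conn.commit()
issued = merge_into_tag_table(conn, {"cat": -2, "dog": 3})
```

With 5,000 distinct tags the loop would issue 5,000 statements inside one transaction, which is exactly the load discussed above.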

But the situation is not that bad, right? Can’t we just monitor increments and start the tag cloud update as soon as an increment contains a reasonable number of tags (for example, 100)? Unfortunately, we can’t. We cannot predict the collective behavior of users at any point in the future. The increment on a particular web server can grow to 100 tags in 10 minutes or in 10 milliseconds, depending on the current load and its nature. The process is entirely driven by users.

We have another option: we could spoonfeed the data storage, sending statements one at a time and pausing after each one. If we don’t use a single transaction for the whole process, we can significantly decrease the load on the database at the cost of a longer overall update time. Unfortunately, this trick doesn’t help us either, because we have a web cluster and there may be a lot of web servers (for example, the www.myspaces.com installation has more than 1000 web servers). If each server in the cluster spoonfeeds the database with its own increment for a relatively long period of time, they will interfere with each other and bring the database to its knees.
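Spoonfeeding can be sketched as follows, again assuming SQLite 3.24+ as a stand-in and a hypothetical `spoonfeed` helper with an arbitrary `pause_seconds` parameter. Each statement is committed on its own and followed by a pause, which trades a longer total duration for short-lived locks.

```python
import sqlite3
import time

UPSERT = (
    'INSERT INTO tag (name, "count") VALUES (?, ?) '
    'ON CONFLICT(name) DO UPDATE SET "count" = "count" + excluded."count"'
)


def spoonfeed(conn: sqlite3.Connection, increment: dict[str, int],
              pause_seconds: float = 0.05) -> float:
    """Apply an increment one statement at a time, committing and pausing after each.

    No transaction spans the whole merge, so locks are held only briefly;
    the price is a total duration of at least len(increment) * pause_seconds.
    Returns the elapsed wall-clock time in seconds.
    """
    started = time.perf_counter()
    for name, delta in increment.items():
        conn.execute(UPSERT, (name, delta))
        conn.commit()               # end the implicit transaction right away
        time.sleep(pause_seconds)   # leave the database to other clients for a while
    return time.perf_counter() - started


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE tag (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, '
             '"count" INTEGER NOT NULL DEFAULT 0)')
elapsed = spoonfeed(conn, {"cat": 1, "dog": 2, "sun": 3}, pause_seconds=0.01)
```

With thousands of web servers each running such a loop at the same time, the short transactions still overlap, which is the interference problem described above.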
So it looks like we have two options. Which one do you prefer?
  1. Many servers, each sending a batch of 5,000 SQL statements from time to time.
  2. Many servers, each sending 100 SQL statements per unit of time over a longer period, with a much higher probability of interfering with each other.
To tell the truth, neither of them looks pretty. Let’s try something else.

Conclusion: we got rid of the additional overhead on each tag operation. But we still have a problem: increments are big and merging is expensive. Things get worse as we increase the number of servers in the web cluster.

Reason 3. Divide et impera!

Don’t worry, this is the final one :). Let’s look at the conclusion of the previous reason. We decided that increments can quickly become big and there is not much we can do about that. But a big increment is not a problem in itself; the problem is merging it. Every server has to merge its own increment as soon as it outgrows a certain threshold. It would be much better if only one server were responsible for merging. That would solve the competition problem, and we could spoonfeed the database for as long as we need. So what’s the problem? Let’s just select a single web server and make the others send their increments to it instead of merging them. The chosen server collects a few increments and commits all of them to the database at once. Of course, the chosen server has its own increment as well, and it sends it to itself just as the others do. What does this improvement give us?

Benefits:
  1. There is no competition for the database resources.
  2. The total number of merges is lower, as both tags and increments are now aggregated.
  3. Merging is more efficient as increments usually have a lot of tags in common. If we merge all increments into a single one before committing to the database we will save a lot of database operations.
Drawbacks:
  1. The tag cloud update is delayed even further. But this is still not a problem (in fact, it is just a matter of configuration: we can tune all the thresholds to find the optimal setting).
  2. The update procedure takes longer, as we have more tags to merge (the distinct tags across all collected increments). This is also okay: merging is allowed to take longer now, so we can spoonfeed the database (see the first benefit).
  3. We have created a single point of failure. If the chosen server goes down, we are in serious trouble: tag collection stops completely and we lose increments until the server is back online.
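Benefit 3 can be illustrated with a possible implementation of the `merge_all_in_one` step used in the algorithm at the end of this article. It is a sketch that assumes increments arrive as plain dictionaries of tag to counter delta.

```python
from collections import Counter


def merge_all_in_one(increments: list[dict[str, int]]) -> dict[str, int]:
    """Fold the increments collected from several emitters into one.

    Counter.update() adds counts (keeping negative values), so a tag that
    appears in many increments costs a single database statement; tags whose
    net change is zero are dropped entirely.
    """
    total: Counter[str] = Counter()
    for increment in increments:
        total.update(increment)
    return {tag: n for tag, n in total.items() if n != 0}


collected = [
    {"cat": 3, "dog": 1},   # from emitter A
    {"cat": 2, "sun": 1},   # from emitter B
    {"cat": 1, "dog": -1},  # from emitter C
]
statements_if_merged_separately = sum(len(inc) for inc in collected)
merged = merge_all_in_one(collected)
```

Here six separate statements shrink to two (`cat: 6`, `sun: 1`), because "cat" is shared and "dog" nets out to zero.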
So here it is! All that’s left is to fix the “single point of failure” drawback. I’d like to point out that we don’t need precise tag counter values to build the tag cloud if we have a lot of users. That’s why we can sometimes afford to lose an entire increment, and that’s why we didn’t worry about single points of failure when discussing reason 2: any web server can and will go down from time to time, losing the tags from its current, not yet persisted increment.

But a failure of the centralized collection is much more serious. To prevent it, we have to add some redundancy. It would be good to have two or three servers selected for increment collection (let’s call them "collectors"). Using the group communication system, we can create a separate communication group just for collectors. Each collector must register itself in this group at startup. All web servers (also called "emitters") must send their increments to the group instead of communicating with individual servers directly. This means that every collector in the group receives every increment. Now it is much harder to lose an increment in case of a failure, as every collector has a copy of it.
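The effect of addressing a group instead of a server can be shown with a tiny in-process simulation. `Group` and `Collector` are hypothetical stand-ins; a real deployment would rely on a group communication toolkit (JGroups is one Java example) for membership and reliable multicast, both of which this sketch ignores.

```python
class Collector:
    """A collector that simply records every increment delivered to it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.received: list[tuple[int, dict[str, int]]] = []

    def deliver(self, increment_id: int, increment: dict[str, int]) -> None:
        self.received.append((increment_id, increment))


class Group:
    """Hypothetical in-process stand-in for a group communication channel:
    a message sent to the group is delivered to every current member."""

    def __init__(self) -> None:
        self.members: list[Collector] = []

    def enter(self, member: Collector) -> None:
        self.members.append(member)

    def leave(self, member: Collector) -> None:
        self.members.remove(member)

    def send(self, increment_id: int, increment: dict[str, int]) -> None:
        for member in self.members:
            member.deliver(increment_id, increment)


collectors = Group()
a, b, c = Collector("a"), Collector("b"), Collector("c")
for collector in (a, b, c):
    collectors.enter(collector)   # each collector registers at startup

collectors.send(1, {"cat": 2})    # an emitter addresses the group, not a server
collectors.leave(c)               # collector "c" crashes
collectors.send(2, {"dog": 1})
```

After collector "c" fails, "a" and "b" still hold every increment, so nothing is lost.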

Let's decide how the group of collectors should actually work. It is obvious that:
  1. Only one collector can merge at any point in time, so that competition for resources does not start again.
  2. Any increment must be committed only once. Other copies must be discarded as soon as the increment is committed, to prevent duplication.
  3. All collectors must share (or have access to) complete information about the merge operations done so far. Because of possible failures and subsequent restarts, collectors won't be in sync with respect to the increments they had, have or will have.
    • Let's look at an example. Consider an increment sent by an emitter and delivered to two out of three collectors. Because of a network malfunction, the increment is delayed and has not reached the third server yet. Meanwhile, one of the two servers reaches the threshold and starts merging. The network delay is long enough that the merge finishes before the increment is finally delivered to the third server. As a result, the increment is obsolete on arrival and must be discarded at once.
All these points are addressed in the final algorithm below. Please pay special attention to them: a correct implementation is crucial for the stability of the system.


THE ALGORITHM

state STARTED:

  event - Spontaneously_started:

    communication_groups.enter("collectors");
    tag_list_version = database.tag_list.get_last_version();
    increment_identifiers =
        database.tag_list.get_increment_identifiers(tag_list_version);
    processed_increment_identifiers.clear();
    processed_increment_identifiers.add(increment_identifiers);
    become(PROCESSING);

state PROCESSING:

  event - Receiving_tag_increment:

    increment = network.receive();
    if(!processed_increment_identifiers.contains(increment.id))
    then
        pending_increments.add(increment);
        if(pending_increments.count() > threshold)
        then
            increments_list = pending_increments.remove_first_n(threshold);
            call asynchronously Merge_increments(increments_list);
    become(PROCESSING);

procedure - Merge_increments(increments_list):

    lock = lock_service.acquire_application_wide_lock("collectors");
    last_version = database.tag_list.get_last_version();
    if(tag_list_version == last_version)
    then
        tag_list_version = tag_list_version + 1;
        transaction = database.begin_transaction();
        database.tag_list.insert_new_version(tag_list_version);
        database.tag_list.insert_increment_identifiers(
            tag_list_version, increments_list
        );
        single_increment = merge_all_in_one(increments_list);
        single_increment =
            database.tag_list.merge_with(single_increment);
        tag_cloud = database.tag_list.get_cloud(tag_list_version - 1);
        tag_cloud.merge_with(single_increment);
        database.tag_list.insert_tag_cloud(
            tag_list_version, tag_cloud
        );
        transaction.commit();
        communication_groups.
            send_notification_to("emitters", "tag list is updated");
    else
        versions = database.tag_list.
            get_all_versions_greater(tag_list_version);
        foreach(version in versions)
        do
            identifiers =
                database.tag_list.get_increment_identifiers(version);
            pending_increments.remove_by_identifiers_if_any(identifiers);
            processed_increment_identifiers.add(identifiers);
        tag_list_version = database.tag_list.get_last_version();
    lock.release();
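For readers who prefer something executable, here is a loose, single-process Python rendering of the collector algorithm above. It is a sketch under several assumptions: `InMemoryStorage` is a hypothetical stand-in for the three tables, a `threading.Lock` plays the application-wide lock (so it only works within one process), `Merge_increments` is called synchronously, the emitter notification is omitted, and the cloud is rebuilt from all counters instead of being adjusted from the previous version.

```python
import threading
from collections import Counter


class InMemoryStorage:
    """Hypothetical stand-in for the three tables, shared by all collectors."""

    def __init__(self) -> None:
        self.tags: Counter[str] = Counter()                    # "tag": name -> count
        self.clouds: dict[int, dict[str, int]] = {0: {}}       # "tag_list_version"
        self.increment_ids: dict[int, set[int]] = {0: set()}   # "tag_increment" ids per version
        self.lock = threading.Lock()   # plays the application-wide "collectors" lock

    def last_version(self) -> int:
        return max(self.clouds)


class Collector:
    def __init__(self, db: InMemoryStorage, threshold: int, cloud_size: int = 100) -> None:
        self.db, self.threshold, self.cloud_size = db, threshold, cloud_size
        self.pending: list[tuple[int, dict[str, int]]] = []
        # STARTED / Spontaneously_started
        self.version = db.last_version()
        self.processed: set[int] = set(db.increment_ids[self.version])

    def on_increment(self, inc_id: int, increment: dict[str, int]) -> None:
        # PROCESSING / Receiving_tag_increment
        if inc_id in self.processed:
            return  # obsolete on arrival: already committed
        self.pending.append((inc_id, increment))
        if len(self.pending) > self.threshold:
            batch = self.pending[:self.threshold]
            self.pending = self.pending[self.threshold:]
            self.merge_increments(batch)  # called synchronously here for simplicity

    def merge_increments(self, batch: list[tuple[int, dict[str, int]]]) -> None:
        with self.db.lock:
            last = self.db.last_version()
            if self.version == last:
                single = Counter()
                for _, increment in batch:
                    single.update(increment)
                self.db.tags.update(single)
                self.version += 1
                self.db.increment_ids[self.version] = {i for i, _ in batch}
                # Simplification: rebuild the cloud from all counters.
                self.db.clouds[self.version] = dict(self.db.tags.most_common(self.cloud_size))
            else:
                # Another collector won the race: catch up and forget committed copies.
                # As in the pseudocode, this batch itself is not re-queued.
                for v in range(self.version + 1, last + 1):
                    ids = self.db.increment_ids[v]
                    self.pending = [(i, inc) for i, inc in self.pending if i not in ids]
                    self.processed |= ids
                self.version = last


db = InMemoryStorage()
a, b = Collector(db, threshold=2), Collector(db, threshold=2)
increments = [(1, {"cat": 1}), (2, {"cat": 1, "dog": 1}), (3, {"sun": 1})]
for inc_id, inc in increments:
    a.on_increment(inc_id, inc)   # a merges increments 1 and 2
for inc_id, inc in increments:
    b.on_increment(inc_id, inc)   # b loses the race and only catches up
restarted = Collector(db, threshold=2)
restarted.on_increment(1, {"cat": 1})  # late copy of an already committed increment
```

Increments 1 and 2 are counted exactly once even though two collectors received them, and a collector started after the merge discards the late copy immediately, as in the network-delay example above.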

I suppose it could be called pseudocode :). Tell me if I am wrong and the code is not self-explanatory, or even unreadable. I hope that after our long discussion you are more than capable of understanding the main ideas behind this algorithm. I will not walk through it line by line (at least for now) and will let you think it over.

You are welcome to ask questions, and, as usual, any comments are highly appreciated.

Talk to you soon. Happy thinking.

10 comments:

MikeNZ said...

Thanks for your articles. This is a great insight into the complexities of tagging. I'm about to design and implement a tagging system for our opensource cms (Jojo). Everything I've done on Jojo has had scalability in mind, so I figured the tag system should too. But now I'm not so sure I want to do that degree of work ;-)

Thanks for publicizing your thinking. It's a huge help.

Bob Warfield said...

You should give us a "Part 3" that shows how to build Tag Clouds with MapReduce/Hadoop-style programming, just for fun.

I am envisioning a set of servers, with each server dedicated to recreating a subset of tag clouds. Perhaps you break them up along the user dimension, or perhaps it works better to break them up by the tags themselves.

In any event, these algorithms generate a lot of attention and people would likely want to see how to apply them here.

Yury said...

Mike, I'm glad you found this information useful. If you decide to go this way in your project, feel free to come back and ask. By the way, I'm a big fan of web content management systems. I developed quite a big one from scratch some time ago. It was an excellent experience in web technologies. Unfortunately, there were no high reliability and scalability requirements :).

Yury said...

Bob, thank you for the idea.
I started looking into the Hadoop project the moment I found it. It looks very promising. I'm thinking about building a .Net clone of it. It is difficult to say right now how well it suits the purpose of tag cloud construction, but I hope to find out quite soon. I will probably write a few words about the results later...

George Guimarães said...

Yuri,

I loved these articles about tag architecture. We're also asking ourselves about this, and our solution was like yours. But you talked about things we hadn't thought of before, so I guess we're going to redesign a few things. :)

But our blog is dead. Please, bring us new posts!

Bye,
George.

Vlad said...

Fantastic information, great articles! I'm still not getting everything, but I'm learning and I will read them again and again! Thanks.


Akhilesh Dubey said...

Nice insight into the tagging implementation. Heartily appreciated.