We found out that a relational persistence model alone is not adequate for this purpose because of the huge computational overhead it creates. Then we came to the conclusion that the tag cloud should not be recalculated each time it is requested. Instead it can be kept between requests as a hash table persisted as a binary stream. Unfortunately, this simple solution led us to another problem: we need to adjust the hash table each time someone changes the tags of any entity in the system. The problem escalated further once it became obvious that we need to keep counters for all tags in the system, not just the few hundred tags in the tag cloud. If we focused on the cloud tags only, we would lose the ability to add newly popular tags to the cloud and to remove the ones that have lost their popularity: we would have no idea which of two tags is more popular without usage information about at least one of them.
In this part I will present an algorithm effective enough to cope with the issues mentioned above.
The picture below describes the physical architecture I will use to demonstrate the concept.
The architecture presented consists of two clusters: a web cluster and a data storage cluster. The web cluster is built in the following way:
- the shared-nothing approach is used: each server in the cluster is completely independent of its neighbors, can operate even if it is the only server in the cluster, and has the standard predefined configuration and applications installed;
- servers in the web cluster are capable of communicating with each other by means of group communication software;
- all logic is performed by the web servers and is mostly driven by incoming web requests – there are no separate application servers in the system;
- there is a load balancer distributing web requests among web servers based on some simple rule (for example randomly with uniform distribution or in a round-robin fashion).
The data storage cluster keeps the persistent data required by the algorithm:
- all tags together with their usage counters,
- the binary representation of the tag cloud hash table,
- some additional information which will be required by the algorithm itself.
As you can see there are three tables in the schema:
1. "tag" – contains the list of all tags stored in the system. The table can have millions of rows especially if you have a multilingual site. The table has three columns:
- id – an integer representing the identifier of the tag in the system,
- name – the tag itself,
- count – the total number of tag usages.
2. the tag cloud version table – keeps one row per version of the tag list together with the tag cloud built for that version. The table has three columns:
- version – an integer representing an increasing counter of tag list changes,
- tag_cloud – a binary field which contains a binary representation of the tag cloud hash table corresponding to that version of the tag list,
- created – date and time of the version creation.
3. the increment table – stores persisted tag list increments. The table has four columns:
- id – an integer representing the identifier of the increment,
- increment – a binary field which contains a binary representation of the increment hash table,
- created – date and time when the increment was persisted,
- version – the version of the tag list which was formed by applying a set of increments (including this one) to the preceding version in a single step.
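To make the binary fields above a bit less abstract, here is a minimal sketch (in Java, which I will use for all illustrations; the class and method names are my own and not part of the schema) of how a tag cloud hash table could be turned into a byte array for the tag_cloud column and restored from it. Standard Java serialization is just one possible format – the only real requirement is that the whole hash table can be written and read as a single binary value.

    import java.io.*;
    import java.util.HashMap;

    // A sketch: the tag cloud as a plain hash table of "tag name -> usage counter"
    // serialized into a byte array that can be stored in the binary tag_cloud column.
    public class TagCloudSerializer {

        public static byte[] toBytes(HashMap<String, Long> tagCloud) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(tagCloud);           // standard Java serialization of the hash table
            }
            return bytes.toByteArray();              // the value for the tag_cloud binary field
        }

        @SuppressWarnings("unchecked")
        public static HashMap<String, Long> fromBytes(byte[] data)
                throws IOException, ClassNotFoundException {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return (HashMap<String, Long>) in.readObject();   // the tag cloud restored between requests
            }
        }
    }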
Reason 1. Why don’t you give me a KISS?!
According to point 3 of the web cluster description given above, web servers are the only type of servers which perform any actions with tags except permanent storage. These actions are usually a direct response to user actions such as adding or removing tags of an entity. It looks like we can make each web server update the tag cloud each time a tag operation is requested. Isn't it a good and simple option? For example, the user updates a set of tags on a photo – the server performs the direct user request and … immediately updates the tag cloud as well. Everything is excellent – the tag cloud is always up to date and no full recalculation is ever required. Aren't we done? Unfortunately, no. There is one significant drawback – performance. Do you remember that we have millions of users registered in our system? Tag operations are frequent. They are performed not only when users modify tags directly but also when they add and delete entities. It doesn't sound reasonable to add a relatively slow procedure to such frequent operations. The tag cloud update involves a few time-consuming activities:
- Network transportation. The tag cloud isn't small – it holds entire tags plus usage counters plus the table of hash values and indexes.
- Database updates. The tag list must be updated, and the tag cloud is updated if necessary.
- Synchronization. Multiple servers compete for the possibility to update the same data structures, which lowers the overall system throughput.
Reason 2. Is laziness infectious?
Let’s try to improve the previous case. What if we didn’t update the tag cloud immediately in response to every user action, but instead collected the updates for some time and applied all of them at once later? It would definitely lighten the load on the system, but it would break the constantly up-to-date state of the tag cloud. Do we really need the tag cloud to be up to date all the time?
As I wrote in the previous part of this article, the tag cloud changes slowly. It will hardly ever change with each request. And even if the tag cloud should have changed but didn’t, the user is not able to detect this fact. The user neither has an idea of what the other users do nor the ability to influence the data strongly enough on his own. The only thing that could betray delayed tag cloud updates would be showing the user both the tags and their counters. So we just discovered one of the reasons why popular web applications seldom do that :). Tags in the tag cloud are usually sized relative to each other by popularity – no absolute values are shown.
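Just to illustrate the last point, here is a small sketch of the usual relative sizing (Java again; the five buckets and the logarithmic scale are my own assumptions, not a requirement): each tag gets a size bucket based on where its counter lies between the smallest and the largest counters of the cloud, so the absolute values never reach the user.

    import java.util.Map;

    // A sketch of relative sizing: each tag gets a font bucket from 1 (smallest) to 5 (largest)
    // based on where its counter lies between the minimum and maximum counters of the cloud.
    // Only the relative order matters - the absolute counters are never shown to the user.
    public class TagCloudSizer {

        public static int fontBucket(long count, long minCount, long maxCount, int buckets) {
            if (maxCount == minCount) {
                return 1;                            // all tags are equally popular
            }
            double weight = Math.log(count - minCount + 1) / Math.log(maxCount - minCount + 1);
            return 1 + (int) Math.floor(weight * (buckets - 1));
        }

        public static void main(String[] args) {
            Map<String, Long> cloud = Map.of("travel", 12000L, "music", 4500L, "cats", 300L);
            long min = cloud.values().stream().min(Long::compare).orElse(0L);
            long max = cloud.values().stream().max(Long::compare).orElse(0L);
            cloud.forEach((tag, count) ->
                    System.out.println(tag + " -> size " + fontBucket(count, min, max, 5)));
        }
    }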
Okay, taking into account all said above, delaying the tag cloud update sounds like a good idea. How can we implement it? We need to accumulate changes and store them somewhere until the tag cloud update is performed. As you might suspect, this is the primary responsibility of the new structure called the tag list increment. The increment is a simple hash table holding all tags that were either added or removed between two tag cloud updates. The counter which corresponds to each tag in the table starts at zero; it is incremented each time the tag is added and decremented each time the tag is removed. Each web server has its own increment and keeps it in memory. As you can see, we still have to count tags, but we do it with almost no overhead using an efficient in-memory structure.
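Here is a minimal sketch of such an increment as a Java class (the class itself is hypothetical and only meant to show how cheap the in-memory counting is):

    import java.util.HashMap;
    import java.util.Map;

    // The per-server tag list increment: a hash table of "tag -> delta" where the delta
    // starts at zero, grows when a tag is added to an entity and shrinks when it is removed.
    // Request threads touch only this in-memory structure, so the cost per tag operation is tiny.
    public class TagIncrement {

        private final Map<String, Long> deltas = new HashMap<>();

        public synchronized void tagAdded(String tag) {
            deltas.merge(tag, 1L, Long::sum);
        }

        public synchronized void tagRemoved(String tag) {
            deltas.merge(tag, -1L, Long::sum);
        }

        public synchronized int size() {             // how many distinct tags the increment holds
            return deltas.size();
        }

        // Hands the accumulated deltas over for sending/merging and starts a fresh increment.
        public synchronized Map<String, Long> drain() {
            Map<String, Long> snapshot = new HashMap<>(deltas);
            deltas.clear();
            return snapshot;
        }
    }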
The last open question is when to start the tag cloud update procedure. It is obvious that the more distinct tags we have in the increment, the longer the update procedure will be. The procedure consists of merging the increment into the "tag" table and then adjusting the tag cloud based on the tag usage information obtained during merging. In other words, if we collected 5,000 distinct tags in the increment, we have to perform 5,000 INSERT/UPDATE statements. This operation is long and (most importantly!) expensive from the database point of view.
But the situation is not that bad, right? Can we just monitor increments and force the tag cloud update to start as soon as the increment reaches some reasonable number of tags (for example 100)? Unfortunately, we can’t. We cannot predict the simultaneous behavior of users at any point of time in the future. The tag increment on a particular web server can grow to 100 tags in 10 minutes or in 10 milliseconds depending on the current load and its nature. The process is entirely driven by users.
We have another option – we could spoon-feed the data storage, sending statements one by one and pausing after each one. If we don’t use a single transaction for the whole process, we can significantly decrease the load on the database at the cost of increasing the overall update time. Unfortunately, this trick doesn’t help us either, because we have a web cluster and there might be a lot of web servers (for example, the www.myspaces.com installation has more than 1000 web servers). If each server of the cluster spoon-feeds the database with its own increment for a relatively long period of time, they will interfere with each other and bring the database to its knees.
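For the sake of concreteness, spoon-feeding a single increment could look roughly like this (a sketch only; the MySQL-style upsert, the column names taken from the schema above and the pause length are all assumptions):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Map;

    // "Spoon-feeding" sketch: the increment is pushed into the "tag" table one statement at a time
    // with a pause after each one, trading total duration for a much lower load on the database.
    // The MySQL-style upsert and the column names are assumptions based on the schema above.
    public class IncrementSpoonFeeder {

        private static final String UPSERT =
                "INSERT INTO tag (name, `count`) VALUES (?, ?) " +
                "ON DUPLICATE KEY UPDATE `count` = `count` + VALUES(`count`)";

        public static void feed(Connection connection, Map<String, Long> increment, long pauseMillis)
                throws SQLException, InterruptedException {
            try (PreparedStatement statement = connection.prepareStatement(UPSERT)) {
                for (Map.Entry<String, Long> entry : increment.entrySet()) {
                    statement.setString(1, entry.getKey());
                    statement.setLong(2, entry.getValue());
                    statement.executeUpdate();      // each tag is its own small statement (auto-commit)
                    Thread.sleep(pauseMillis);      // a deliberate pause so the database can breathe
                }
            }
        }
    }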
Anyway, it looks like we have two options. Which one do you prefer?
1. A lot of servers, each sending a bunch of 5,000 SQL statements from time to time.
2. A lot of servers, each sending 100 SQL statements per unit of time for a longer period, with a much higher probability of interfering.
Conclusion: we got rid of the additional overhead per tag operation. But we still have a problem – increments are big and merging is expensive. Things become worse as we increase the number of servers in the web cluster.
Reason 3. Divide et impera!
Don’t worry – it is the final one :). Let’s take another look at the conclusion of the previous reason. We decided that increments can become big quickly and there is not much we can do about that. But a big increment is not a problem in itself; the problem is merging it. Every server has to merge its own tag increment as soon as it outgrows a certain threshold. It would be much better if only one server were responsible for merging. It would solve the problem of competition, and we could spoon-feed the database for as long as we need. So what’s the problem? Let’s just select a single web server and make the others send their increments to it instead of merging them themselves. The chosen server should collect a few increments and commit all of them into the database at once. Of course the chosen server has its own tag increment as well, and it will send it to itself just like the others do. What does this improvement give us?
Benefits:
- There is no competition for database resources.
- The total number of merges becomes lower, as now both tags and increments are aggregated.
- Merging is more efficient, as increments usually have a lot of tags in common. If we merge all increments into a single one before committing to the database, we save a lot of database operations (a sketch of this follows below).
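The sketch below illustrates the third benefit: several increments collapsed into a single one before the database is touched. The name merge_all_in_one reappears in the algorithm later; everything else in the class is my own illustration.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // merge_all_in_one sketch: collapse several increments into a single one before touching the
    // database. Tags that the increments have in common become one row operation instead of many,
    // and tags whose deltas cancel each other out disappear completely.
    public class IncrementMerger {

        public static Map<String, Long> mergeAllInOne(List<Map<String, Long>> increments) {
            Map<String, Long> merged = new HashMap<>();
            for (Map<String, Long> increment : increments) {
                increment.forEach((tag, delta) -> merged.merge(tag, delta, Long::sum));
            }
            merged.values().removeIf(delta -> delta == 0L);   // nothing to update for a zero delta
            return merged;
        }

        public static void main(String[] args) {
            Map<String, Long> fromServerA = Map.of("travel", 3L, "music", 1L);
            Map<String, Long> fromServerB = Map.of("travel", 2L, "music", -1L, "cats", 5L);
            // Five entries arrive, but only two rows ("travel" and "cats") have to be written.
            System.out.println(mergeAllInOne(List.of(fromServerA, fromServerB)));
        }
    }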
Drawbacks:
- The tag cloud update is delayed even more. But this is still not a problem (as a matter of fact it is just a question of configuration – we can tune all the thresholds to get the optimal variant).
- The update procedure becomes longer, as we have more tags to merge (the tags which differ across the collected increments). This is also okay: the merging can take longer now, and we can spoon-feed (see the first point of the benefits).
- We created a single point of failure. If the chosen server goes down, we are in serious trouble: the tag collection stops entirely and we lose increments until the server is back online.
But a failure of the centralized collection is much more serious. To prevent it we have to add some redundancy. It would be good to have two or three servers selected for increment collection (let’s name those servers "collectors"). Using the group communication system, we can create a separate communication group especially for collectors. Each collector must register itself in this group at startup. All web servers (also called "emitters") must send increments to the group instead of communicating with particular servers directly. It means that all collectors in the group will get all increments. Now it is much harder to lose an increment in case of failure, as every collector has a copy of it.
Let's decide how the group of collectors should actually work. It is obvious that:
1. Only one of the collectors may merge at any point of time, in order not to restart the competition for resources.
2. Any increment must be committed only once. Other copies must be discarded as soon as the increment is committed, to prevent duplication.
3. All collectors must share (or have access to) the full information about the merge operations done so far.
4. Due to possible failures and subsequent restarts, collectors won't be in sync from the point of view of the increments they had, have or will have.
Let's look at an example. Consider an increment sent by an emitter and delivered to two out of three collectors. Because of a network malfunction the increment is delayed and has not reached the third server yet. Meanwhile, one of the two servers has met the threshold and started merging. The network delay is big enough that the merge finishes before the increment is finally delivered to the last server. As a result, the increment is obsolete on arrival and must be discarded at once.
THE ALGORITHM
state STARTED:

    event - Spontaneously_started:
        communication_groups.enter("collectors");
        tag_list_version = database.tag_list.get_last_version();
        // learn which increments are already part of the current version
        increment_identifiers =
            database.tag_list.get_increment_identifiers(tag_list_version);
        processed_increment_identifiers.clear();
        processed_increment_identifiers.add(increment_identifiers);
        become(PROCESSING);

state PROCESSING:

    event - Receiving_tag_increment:
        increment = network.receive();
        // discard copies of increments which have already been committed
        if(!processed_increment_identifiers.contains(increment.id))
        then
            pending_increments.add(increment);
            if(pending_increments.count() > threshold)
            then
                increments_list = pending_increments.remove_first_n(threshold);
                call asynchronously Merge_increments(increments_list);
        become(PROCESSING);

    procedure - Merge_increments(increments_list):
        // only one collector at a time is allowed to merge
        lock = lock_service.acquire_application_wide_lock("collectors");
        last_version = database.tag_list.get_last_version();
        if(tag_list_version == last_version)
        then
            // nobody merged since we last looked - create the next version ourselves
            tag_list_version = tag_list_version + 1;
            transaction = database.begin_transaction();
            database.tag_list.insert_new_version(tag_list_version);
            database.tag_list.insert_increment_identifiers(
                tag_list_version, increments_list
            );
            single_increment = merge_all_in_one(increments_list);
            single_increment =
                database.tag_list.merge_with(single_increment);
            tag_cloud = database.tag_list.get_cloud(tag_list_version - 1);
            tag_cloud.merge_with(single_increment);
            database.tag_list.insert_tag_cloud(
                tag_list_version, tag_cloud
            );
            transaction.commit();
            communication_groups.
                send_notification_to("emitters", "tag list is updated");
        else
            // another collector has already merged - catch up with its work
            versions = database.tag_list.
                get_all_versions_greater(tag_list_version);
            foreach(version in versions)
            do
                identifiers =
                    database.tag_list.get_increment_identifiers(version);
                pending_increments.remove_by_identifiers_if_any(identifiers);
                processed_increment_identifiers.add(identifiers);
            tag_list_version = database.tag_list.get_last_version();
        lock.release();
I suppose it could be called pseudo code :). Tell me if I am wrong and the code is not self-explanatory or even unreadable. I hope that after the long discussion we have had you are more than capable of understanding the main ideas behind this algorithm. I will not walk through it row by row (at least for now) and will let you think it over.
You are welcome to ask questions and any comments are highly appreciated as usual.
Talk to you soon. Happy thinking.