Dealing with success – splitting tables over multiple databases

Now, let's roll forward a little in time and assume you have been successful enough to attract tens of thousands of users and your single database starts creaking under the load.

My general rule of thumb is to start planning for a bigger machine, or for splitting the database, once you are over 80 percent utilization for at least a few hours a day. It is good to have a plan earlier, but this is the point where you have to start actually carrying it out.

What expansion plans work and when?

There are a couple of popular ways to grow database-backed systems. Depending on your use case, not all ways will work.

Moving to a bigger server

If you suspect that you are near the top load for your service or product, you can simply move to a more powerful server. This may not be the best long-term scaling solution if you are still in the middle, or even at the beginning, of your growth. You will run out of bigger machines to buy long before you are done. Servers also become disproportionately more expensive as their size increases, and you will be left with at least one different, and thus not easily replaceable, server once you implement a proper scaling solution.

On the other hand, this will work for some time and is often the easiest way to get some headroom while implementing real scaling solutions.

Master-slave replication – moving reads to slave

Master-slave replication, either trigger-based or WAL-based, works reasonably well in cases where the large majority of database accesses are reads. Some things that fall under this case are website content managers, blogs, and other publishing systems.

As our chat system has a roughly 1:1 ratio of writes to reads, moving reads to a separate server buys us nothing. The replication itself is more expensive than the possible win from reading from a second server.

Multimaster replication

Multimaster replication is even worse than master-slave(s) when the problem is scaling a write-heavy workload. It has all the problems of master-slave, plus it introduces extra load via cross-partition locking or conflict-resolution requirements, which further slows down the whole cluster.

Data partitioning across multiple servers

The obvious solution to scaling writes is to split them between several servers. Ideally, you could have, for example, four servers, with each getting exactly one fourth of the load.

In this case, each server would hold a quarter of users and messages, and serve a quarter of all requests.

To make the change transparent for database clients, we introduce a layer of proxy databases. These proxy databases can either reside on the same hosts as the partition databases or be on their own host. The role of the proxy databases is to pretend to be the database for clients, but in fact delegate the real work to partitions by calling the right function in the right partition database.

This client transparency is not terribly important if you have just one application accessing the database; in that case, you could do the splitting in the client application. It becomes very handy as your system grows to several applications, perhaps using many different platforms and frameworks on the client side.

Having a separate layer of proxy databases enables easy management of data splitting, so that the client applications don't need to know anything about the underlying data architecture. They just call the functions they need and that's all they need to know. In fact, you can switch out the whole database structure without the clients ever noticing anything, except the better performance from the new architecture.

More on how exactly the proxy works later. For now, let us tackle splitting the data.

Splitting the data

If we split the data, we need a simple and efficient way to determine which server stores each data row. If the data had an integer primary key, you could just go round-robin, store the first row on the first server, the second row on the second, and so on. This would give you a fairly even distribution, even when rows with certain IDs are missing.

The partitioning function for selecting between four servers would be simply:

partition_nr = id & 3

The partitioning mask 3 (binary 11) selects the lower two bits. For eight partitions, you would use 7 (binary 111), and for 64 servers, 63 (binary 111111). It is not as easy with things like usernames, where putting all names starting with A on the first server, B on the second, and so on, does not produce an even distribution.
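This mask arithmetic is easy to try out. The following stand-alone Python sketch (an illustration, not part of any proxy function) shows the round-robin behavior:

```python
# Partition selection by masking the low bits of an integer key.
# The mask is number_of_partitions - 1, which only works when the
# partition count is a power of two (3 -> 4, 7 -> 8, 63 -> 64).

def partition_nr(key, n_partitions=4):
    assert n_partitions & (n_partitions - 1) == 0, "must be a power of two"
    return key & (n_partitions - 1)

# Sequential ids go round-robin over the four partitions ...
print([partition_nr(i) for i in range(8)])   # -> [0, 1, 2, 3, 0, 1, 2, 3]

# ... and missing ids just leave gaps without skewing the distribution.
print(partition_nr(14, 8))                   # 14 & 7 -> 6
```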

Turning the username into a fairly evenly distributed integer via a hash function solves this problem, and the result can be used directly to select the partition:

partition_nr = hashtext(username) & 3

This would distribute the users in the following manner:

hannu=# SELECT username, hashtext(username) & 3 as partition_nr FROM user_info; 
-[ RECORD 1 ]+-------- 
username     | bob 
partition_nr | 1 
-[ RECORD 2 ]+-------- 
username     | jane 
partition_nr | 2 
-[ RECORD 3 ]+-------- 
username     | tom 
partition_nr | 1 
-[ RECORD 4 ]+-------- 
username     | mary 
partition_nr | 3 
-[ RECORD 5 ]+-------- 
username     | jill 
partition_nr | 2 
-[ RECORD 6 ]+-------- 
username     | abigail 
partition_nr | 3 
-[ RECORD 7 ]+-------- 
username     | ted 
partition_nr | 3 
-[ RECORD 8 ]+-------- 
username     | alfonso 
partition_nr | 0 

So, partition 0 gets the user alfonso, partition 1 gets bob and tom, partition 2 gets jane and jill, and partition 3 gets mary, abigail, and ted. The distribution is not exactly one fourth to each partition, but as the number of partitions increases, it will be pretty close to even where this actually matters.
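This evening-out is easy to simulate outside the database. Note that Python's built-in hash() is randomized per process for strings on Python 3, so the sketch below uses zlib.crc32 as a deterministic stand-in for hashtext(); the individual values differ from PostgreSQL's, but the evening-out behavior is similar:

```python
import zlib
from collections import Counter

def partition_nr(username, mask=3):
    # crc32 as a deterministic stand-in for PostgreSQL's hashtext()
    return zlib.crc32(username.encode()) & mask

# With a handful of users the split is uneven, as in the psql output
# above; with thousands of users it approaches one fourth per partition.
users = ['user%d' % i for i in range(10000)]
counts = Counter(partition_nr(u) for u in users)
print(sorted(counts.items()))   # four counts, each in the region of 2500
```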

If we had no PL/Proxy language, we could write the partitioning functions in most untrusted PL languages. For example, a simple login proxy function written in PL/PythonU looks like this:

CREATE OR REPLACE FUNCTION pylogin(
    IN i_username text, IN i_pwdhash text,
    OUT status int, OUT message text ) 
AS $$
    import psycopg2
    partitions = [
        'dbname=chap10p0 port=5433',
        'dbname=chap10p1 port=5433',
        'dbname=chap10p2 port=5433',
        'dbname=chap10p3 port=5433',
    ]
    # note: Python's hash() is not the same function as PostgreSQL's
    # hashtext(), so all code partitioning this data must use the same one
    partition_nr = hash(i_username) & 3
    con = psycopg2.connect(partitions[partition_nr])
    cur = con.cursor()
    cur.execute('select * from login(%s,%s)', ( i_username, i_pwdhash))
    status, message = cur.fetchone()
    return (status, message)
$$ LANGUAGE plpythonu SECURITY DEFINER;

Here, we defined a set of four partition databases, given by their connect strings and stored as a list in variable partitions.

When executing the function, we first evaluate the hash function on the username argument (hash(i_username)) and extract the two lowest bits from it (& 3) to get an index into the partitions list (the partition number) for the call.

Then, we open a connection to a partition database using the connect string selected by the partition number (con=psycopg2.connect(partitions[partition_nr])).

Finally, we execute a remote query in the partition database and return the results of this to the caller of this proxy function.

This works reasonably well, but it is suboptimal in at least two places:

  • First, it opens a new database connection each time the function is called, which kills performance
  • Second, it is a maintenance nightmare if you hard-wire the partition information in full, in all functions

The performance problem can be solved by caching the open connections, and the maintenance problem can be solved by having a single function returning the partition information. However, even when we do these changes and stay with PL/Pythonu for partitioning, we will still be doing a lot of copy and paste programming in each of our proxy functions.
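The connection-caching fix is straightforward: PL/Python gives each function a private SD dictionary that survives between calls, and open connections can be stored there keyed by partition number. The following is a stand-alone sketch of the idea, with a stub class in place of psycopg2.connect:

```python
class StubConnection:
    """Stands in for a psycopg2 connection in this sketch."""
    opened = 0   # how many "real" connections were made

    def __init__(self, connstr):
        self.connstr = connstr
        StubConnection.opened += 1

# Inside a PL/Python function, this dict is provided by the runtime as SD.
SD = {}

def get_connection(partition_nr, partitions):
    # Connect on first use, reuse the open connection afterwards.
    cache = SD.setdefault('connections', {})
    if partition_nr not in cache:
        cache[partition_nr] = StubConnection(partitions[partition_nr])
    return cache[partition_nr]

partitions = ['dbname=chap10p%d port=5433' % i for i in range(4)]
for _ in range(100):             # one hundred calls ...
    get_connection(2, partitions)
print(StubConnection.opened)     # -> 1 (... but only one connection opened)
```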

Once we had reached the preceding conclusions while growing our database systems at Skype, the next logical step was quite obvious: we needed a special partitioning language that would do just this one thing, calling remote SQL functions, and do it as fast as possible. And thus, the PL/Proxy database partitioning language was born.

PL/Proxy – the partitioning language

The rest of this chapter is devoted to the PL/Proxy language. First, we will install it. Then, we will look at its syntax and ways to configure the partitions for its use. Finally, we will discuss how to do the actual data migration from a single database to a partitioned one and then look at several usage examples.

Installing PL/Proxy

If you are on Debian, Ubuntu, or a Red Hat variant, installing the language is easy.

First, you have to install the required packages on your operating system:

sudo apt-get install postgresql-9.4-plproxy 

Or:

sudo yum install plproxy94 

If you need to install PL/Proxy from source, you can download it from http://pgfoundry.org/projects/plproxy, extract the sources in the contrib folder of your PostgreSQL source tree, and run make and make install.

To activate PL/Proxy in a database, run the plproxy.sql file, which is part of the source code or the package you installed. On PostgreSQL 9.1 and later, you can instead simply run CREATE EXTENSION plproxy; in the target database.

The PL/Proxy language syntax

The PL/Proxy language itself is very simple. The purpose of a PL/Proxy function is to hand the processing off to another server, so the language needs only six statements:

  • CONNECT or CLUSTER and RUN ON for selecting the target database partition
  • SELECT and TARGET for specifying the query to run
  • SPLIT for splitting an ARRAY argument between several sub arrays for running on multiple partitions

CONNECT, CLUSTER, and RUN ON

The first group of statements handles remote connectivity to the partitions. They determine which database to run the query on. You specify the exact partition to run the query on using CONNECT:

CONNECT 'connect string' ;

Here, connect string determines the database to connect to. It is the standard PostgreSQL connect string you would use to connect to the database from a client application, for example: dbname=p0 port=5433 user=test host=localhost.

Or, you can specify a cluster name using CLUSTER:

CLUSTER 'usercluster';

Finally, you can specify a partition number using RUN ON:

RUN ON part_func(arg[, ...]) ; 

part_func() can be any existing or user-defined PostgreSQL function returning an integer. PL/Proxy calls that function with the given arguments and then uses the N lower bits of the result to select a connection to a cluster partition.

There are two more versions of the RUN ON statement:

RUN ON ANY;

This means that the function can be executed on any partition in a cluster. This can be used when all the required data for a function is present on all partitions.

The other version is:

RUN ON ALL;

This runs the statement on all partitions in parallel and then returns a concatenation of results from the partitions. This has at least three main uses:

  • For cases when you don't know where the required data row is, such as when getting data using non-partition keys. For example, getting a user by e-mail when the table is partitioned by username.
  • Running aggregate functions over larger subsets of data, say counting all users, or getting all the users who have a certain user in their friends' lists.
  • Manipulating data that needs to be the same on all partitions. For example, when you have a price list that other functions are using, then one simple way to manage this price list is using a RUN ON ALL function.
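The RUN ON ALL behavior underlying these uses is a scatter-gather pattern: send the same query to every partition, then concatenate the results. It can be sketched with plain Python data structures (the partition contents here are invented for illustration):

```python
# Each "partition" holds only its own slice of the user_info table.
partitions = [
    {'alfonso': 'alfonso@example.com'},                        # partition 0
    {'bob': 'bob@example.com', 'tom': 'tom@example.com'},      # partition 1
    {'jane': 'jane@example.com', 'jill': 'jill@example.com'},  # partition 2
    {'mary': 'mary@example.com'},                              # partition 3
]

def run_on_all(query):
    """Run the same query on every partition and concatenate the results."""
    results = []
    for part in partitions:
        results.extend(query(part))
    return results

# Getting a user by a non-partition key (e-mail): only one partition
# holds the row, but without the username we must ask all of them.
hits = run_on_all(lambda part: [u for u, e in part.items()
                                if e == 'tom@example.com'])
print(hits)    # -> ['tom']

# An aggregate over the whole cluster: count all users.
total = sum(run_on_all(lambda part: [len(part)]))
print(total)   # -> 6
```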

SELECT and TARGET

The default behavior of a PL/Proxy function, if no SELECT or TARGET is present, is to call the function with the exact same signature as itself in the remote partition.

Suppose we have the function:

CREATE OR REPLACE FUNCTION login(
    IN i_username text, IN i_pwdhash text,
    OUT status int, OUT message text ) 
AS $$
    CONNECT 'dbname=chap10 host=10.10.10.1';
$$ LANGUAGE plproxy SECURITY DEFINER;

If it is defined in the schema public, then the call select * from login('bob', 'secret') connects to the database chap10 on host 10.10.10.1 and runs the following SQL statement there:

SELECT * FROM public.login('bob', 'secret')

This retrieves the result and returns it to its caller.

If you don't want to define a function inside the remote database, you can substitute the default select * from <thisfunction>(<arg1>, ...) call with your own by writing it in the function body of the PL/Proxy function:

CREATE OR REPLACE FUNCTION get_user_email(i_username text) 
RETURNS SETOF text AS $$
    CONNECT 'dbname=chap10 host=10.10.10.1';
    SELECT email FROM user_info where username = i_username;
$$ LANGUAGE plproxy SECURITY DEFINER;

Only a single SELECT is supported; for any other, or more complex, SQL statements you have to write a remote function and call it.

The third option is to call a function similar to itself, but named differently. For example, if you have a proxy function defined not in a separate proxy database but in a partition, you may want it to target the local database for some data:

CREATE OR REPLACE FUNCTION public.get_user_email(i_username text) RETURNS SETOF text AS $$
    CLUSTER 'messaging';
    
    RUN ON hashtext(i_username);
    TARGET local.get_user_email;
$$ LANGUAGE plproxy SECURITY DEFINER;

In this setup, the local version of get_user_email() is in the schema local on all partitions. Therefore, if one of the partitions connects back to the same database that it is defined in, it avoids circular calling.

SPLIT – distributing array elements over several partitions

The last PL/Proxy statement is for cases where you want some bigger chunk of work to be done in appropriate partitions.

For example, if you have a function to create several users in one call and you still want to be able to use it after partitioning, the SPLIT statement is a way to tell PL/Proxy to split the arrays between the partitions based on the partitioning function:

CREATE or REPLACE FUNCTION create_new_users(
    IN i_username text[], IN i_pwdhash text[], IN i_email text[],
    OUT status int, OUT message text )  RETURNS SETOF RECORD
AS $$
BEGIN
  FOR i IN 1..array_length(i_username,1) LOOP
      SELECT *
        INTO status, message
        FROM new_user(i_username[i], i_pwdhash[i], i_email[i]);
      RETURN NEXT;
  END LOOP;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

The following PL/Proxy function definition, created on the proxy database, can be used to split the calls across the partitions:

CREATE or REPLACE FUNCTION create_new_users(
    IN i_username text[], IN i_pwdhash text[], IN i_email text[],
    OUT status int, OUT message text )  RETURNS SETOF RECORD
AS $$
  CLUSTER 'messaging';
  RUN ON hashtext(i_username);
  SPLIT  i_username,  i_pwdhash,  i_email;
$$ LANGUAGE plproxy SECURITY DEFINER;

It would be called by sending in three arrays to the function:

SELECT * FROM create_new_users(
    ARRAY['bob', 'jane', 'tom'],
    ARRAY[md5('bobs_pwd'), md5('janes_pwd'), md5('toms_pwd')],
    ARRAY['[email protected]', '[email protected]', '[email protected]']
);

It will result in two parallel calls to partitions 1 and 2 (as, using hashtext(i_username), bob and tom map to partition 1 and jane to partition 2, as explained earlier), with the following arguments for partition 1:

SELECT * FROM create_new_users(
    ARRAY['bob', 'tom'],
    ARRAY['6c6e5b564fb0b192f66b2a0a60c751bb','edcc36c33f7529f430a1bc6eb7191dfe'],
    ARRAY['[email protected]','[email protected]']
);

And this for partition 2:

SELECT * FROM create_new_users(
    ARRAY['jane'],
    ARRAY['cbbf391d3ef4c60afd851d851bda2dc8'],
    ARRAY['[email protected]']
);

Then, it returns a concatenation of the results:

status | message 
--------+--------- 
    200 | OK 
    200 | OK 
    200 | OK 
(3 rows) 

The distribution of data

First, what is a cluster in PL/Proxy? Well, a cluster is the set of partitions that make up the whole database. Each cluster consists of a number of partitions, as determined by the cluster configuration. Each partition is uniquely specified by its connect string, and the list of connect strings is what makes up the cluster. The position of a partition in this list determines its partition number: the first element in the list is partition 0, the second is partition 1, and so on.

The partition is selected by taking the output of the RUN ON function and masking it with the right number of bits to map it on the partitions. So, if hashtext(i_username) returns 14 and there are four partitions (2 bits, mask binary 11, or 3 in decimal), the partition number will be 14 & 3 = 2, and the function will be called on partition 2 (counting from zero), which is the third element in the partition list.

Note

The constraint that the number of partitions has to be a power of two may seem an unnecessary restriction at first, but it makes sure that it is, and will remain, easy to expand the number of partitions without redistributing all the data.

For example, if you tried to move from three partitions to four, most likely three fourths of the data rows in partitions 0 to 2 would have to be moved to cover partitions 0 to 3 evenly. On the other hand, when moving from four to eight partitions, the data for new partitions 0 and 4 is exactly what was previously on partition 0 (since the partition number is taken from the lower bits of the hash), new partitions 1 and 5 share the old partition 1, and so on. That is, your data does not need to be moved immediately, and half of the data does not need to be moved at all.
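The arithmetic behind this is easy to verify: with partition numbers taken from the lower bits of the hash, going from mask 3 to mask 7 can only split each old partition p between the new partitions p and p + 4 (a stand-alone sketch):

```python
# When doubling from 4 to 8 partitions, each old partition's rows can
# only split between two new partitions: p and p + 4.
old_mask, new_mask = 3, 7

split = {}
for old_part in range(4):
    # All hash values that landed on old_part under the 2-bit mask ...
    hashes = [h for h in range(1000) if h & old_mask == old_part]
    # ... and where the 3-bit mask sends them.
    split[old_part] = sorted({h & new_mask for h in hashes})

print(split)   # -> {0: [0, 4], 1: [1, 5], 2: [2, 6], 3: [3, 7]}
```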

The actual configuration of the cluster, the definition of partitions, can be done in two ways: either by using a set of functions in the schema plproxy, or by taking advantage of SQL/MED connection management (SQL/MED is available in PostgreSQL 8.4 and later). You can read more about SQL/MED at https://wiki.postgresql.org/wiki/SQL/MED.

Configuring the PL/Proxy cluster using functions

This is the original way to configure PL/Proxy, which works on all versions of PostgreSQL. When a query needs to be forwarded to a remote database, the function plproxy.get_cluster_partitions(cluster) is invoked by PL/Proxy to get the connection string to use for each partition.

The following function is an example, which returns information for a cluster with four partitions, p0 to p3:

CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text) 
RETURNS SETOF text AS $$ 
BEGIN 
    IF cluster_name = 'messaging' THEN 
        RETURN NEXT 'dbname=p0'; 
        RETURN NEXT 'dbname=p1'; 
        RETURN NEXT 'dbname=p2'; 
        RETURN NEXT 'dbname=p3'; 
    ELSE
        RAISE EXCEPTION 'Unknown cluster'; 
    END IF; 
END; 
$$ LANGUAGE plpgsql; 

A production application might query some configuration tables, or even read some configuration files to return the connection strings. Once again, the number of partitions returned must be a power of two. If you are absolutely sure that some partitions are never used, you can return empty strings for these.

We also need to define a plproxy.get_cluster_version(cluster_name) function. It is called on each request, and if the cluster version has not changed, a cached result from plproxy.get_cluster_partitions() can be reused. So, it is best to make this function as fast as possible:

CREATE OR REPLACE FUNCTION plproxy.get_cluster_version(cluster_name text) 
RETURNS int4 AS $$ 
BEGIN 
    IF cluster_name = 'messaging' THEN 
        RETURN 1; 
    ELSE
        RAISE EXCEPTION 'Unknown cluster'; 
    END IF; 
END; 
$$ LANGUAGE plpgsql; 

The last function needed is plproxy.get_cluster_config, which enables you to configure the behavior of PL/Proxy. This sample will set the connection lifetime to 10 minutes:

CREATE OR REPLACE FUNCTION plproxy.get_cluster_config( 
    in cluster_name text, 
    out key text, 
    out val text) 
RETURNS SETOF record AS $$ 
BEGIN 
    -- let's use the same config for all clusters 
    key := 'connection_lifetime'; 
    val := 10*60; 
    RETURN NEXT; 
    RETURN; 
END; 
$$ LANGUAGE plpgsql; 

Configuring the PL/Proxy cluster using SQL/MED

Since version 8.4, PostgreSQL has support for an SQL standard for the management of external data, usually referred to as SQL/MED. SQL/MED is simply a standard way to describe access to external data sources. Using functions to configure partitions is arguably insecure, as any caller of plproxy.get_cluster_partitions() can learn the connection strings for partitions, which may contain sensitive information such as passwords. PL/Proxy therefore also provides a way to do the cluster configuration using SQL/MED, which follows standard SQL security practices.

The same configuration, as discussed earlier, when done using SQL/MED, is as follows:

  1. First, create a foreign data wrapper called plproxy:
    proxy1=# CREATE FOREIGN DATA WRAPPER plproxy;
  2. Then, create an external server that defines both the connection options and the partitions:
    proxy1=# CREATE SERVER messaging FOREIGN DATA WRAPPER plproxy
    proxy1-# OPTIONS (connection_lifetime '1800',
    proxy1(#          p0 'dbname=p0',
    proxy1(#          p1 'dbname=p1',
    proxy1(#          p2 'dbname=p2',
    proxy1(#          p3 'dbname=p3'
    proxy1(# );
    CREATE SERVER
  3. Then, create a user mapping, either for PUBLIC, so that all users can use the server:
    proxy1=# CREATE USER MAPPING FOR PUBLIC SERVER messaging;
    CREATE USER MAPPING

    Or, to some specific users or groups:

    proxy1=# CREATE USER MAPPING FOR bob SERVER messaging
    proxy1-#   OPTIONS (user 'plproxy', password 'very.secret');
    CREATE USER MAPPING
  4. Finally, grant usage on the cluster to the users who need to use it:
    proxy1=# GRANT USAGE ON FOREIGN SERVER messaging TO bob;
    GRANT

Note

More info on SQL/MED, as implemented in PostgreSQL, can be found at http://www.postgresql.org/docs/current/static/sql-createforeigndatawrapper.html.

Moving data from the single to the partitioned database

If you can schedule some downtime, and your new partition databases are as big as your original single database, the easiest way to partition the data is to make a full copy of the database on each of the nodes and then simply delete the rows that do not belong to that partition:

pg_dump chap10 | psql p0
psql p0 -c 'delete from message where hashtext(to_user) & 3 <> 0'
psql p0 -c 'delete from user_info where hashtext(username) & 3 <> 0'

Repeat this for partitions p1 to p3, each time deleting the rows that don't match the partition number (psql p1 -c 'delete … & 3 <> 1').

Note

Remember to vacuum when you are finished deleting the rows. PostgreSQL will leave the dead rows in the data tables, so do a little maintenance while you have some downtime.

When trying to delete from user_info, you will notice that you can't do it without dropping the foreign key on message.from_user.

Here, we could decide that it is okay to keep the messages on the receiver's partition only and that, if needed, the sent messages can be retrieved using a RUN ON ALL function. So, we will drop the foreign key on message.from_user:

psql p0 -c 'alter table message drop constraint message_from_user_fkey'

There are other options when splitting the data that require less disk space, if you are willing to do more manual work.

For example, you can copy over just the schema using pg_dump -s and then use COPY with a SELECT statement to move over just the needed rows:

pg_dump -s chap10 | psql p0
psql chap10 -c "COPY (select * from message where hashtext(to_user) & 3 = 0) TO stdout" | psql p0 -c 'COPY message FROM stdin'

Or even set up a specially designed Londiste replica and do the switch from a single database to a partitioned cluster in only seconds, once the replica has reached a stable state.
