One of the core components of Skytools is pgq. It provides a generic queuing interface, which allows you to deliver messages from a provider to an arbitrary number of consumers.
The question is: What is the point of a queue in general? A queue has some very useful properties. First of all, it guarantees the delivery of a message. In addition, it makes sure that the order in which messages are put into the queue is preserved. This is highly important in the case of replication, because we must make sure that messages cannot overtake each other.
The idea of a queue is to be able to send anything from an entity producing the data to any other host participating in the system. This is not only suitable for replication but for a lot more: you can use pgq as an infrastructure to flexibly dispatch information. Practical examples could be shopping cart purchases, bank transfers, or user messages. Replicating a full table is, in this sense, more or less a special case.
In general a queue knows two operations:
- Enqueue: a producer adds a new message to the end of the queue.
- Dequeue: a consumer fetches the oldest message from the queue.
Those two operations are the backbone of every "queue"-based application.
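As a minimal illustration (plain Python, not pgq's actual implementation), a queue with exactly these two operations looks like this:

```python
from collections import deque

class SimpleQueue:
    """A minimal FIFO queue exposing the two core operations."""

    def __init__(self):
        self._items = deque()

    def enqueue(self, message):
        # Producer side: append the message to the tail of the queue.
        self._items.append(message)

    def dequeue(self):
        # Consumer side: remove and return the message at the head,
        # so messages leave the queue in the order they were added.
        return self._items.popleft()

q = SimpleQueue()
q.enqueue("bank transfer #1")
q.enqueue("bank transfer #2")
print(q.dequeue())  # bank transfer #1
print(q.dequeue())  # bank transfer #2
```

Delivery order is guaranteed because consumers can only take messages from the head of the queue.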
To use pgq inside a database you have to install it as a normal PostgreSQL extension. If the installation process has worked properly, you can simply run the following instruction:
test=# CREATE EXTENSION pgq;
CREATE EXTENSION
Now that all modules have been loaded into the database, we can create a simple queue. For the purpose of this example we create a queue named DemoQueue:
test=# SELECT pgq.create_queue('DemoQueue');
create_queue
--------------
1
(1 row)
If the queue has been created successfully, a number will be returned. Internally the queue is just an entry inside some pgq bookkeeping table:
test=# \x
Expanded display is on.
test=# SELECT * FROM pgq.queue;
-[ RECORD 1 ]------------+------------------------------
queue_id | 1
queue_name | DemoQueue
queue_ntables | 3
queue_cur_table | 0
queue_rotation_period | 02:00:00
queue_switch_step1 | 489693
queue_switch_step2 | 489693
queue_switch_time | 2013-05-14 16:35:38.132693+02
queue_external_ticker | f
queue_disable_insert | f
queue_ticker_paused | f
queue_ticker_max_count | 500
queue_ticker_max_lag | 00:00:03
queue_ticker_idle_period | 00:01:00
queue_per_tx_limit |
queue_data_pfx | pgq.event_1
queue_event_seq | pgq.event_1_id_seq
queue_tick_seq | pgq.event_1_tick_seq
The bookkeeping table outlines some essential information about our queue internals. In this specific example it will tell us how many internal tables pgq will use to handle our queue, which table is active at the moment, how often tables are switched, and so on. In practice this information is not relevant to ordinary users; it is merely an internal detail.
Once the queue has been created, we can add data to it. The function to do that takes three parameters: the first parameter is the name of the queue; the second and third parameters are the data values to enqueue. In many cases it makes sense to use two values here: the first value can represent a key, while the second value can be seen as the payload of the message. Here is an example:
test=# BEGIN;
BEGIN
test=# SELECT pgq.insert_event('DemoQueue', 'some_key_1', 'some_data_1');
insert_event
--------------
1
(1 row)

test=# SELECT pgq.insert_event('DemoQueue', 'some_key_2', 'some_data_2');
insert_event
--------------
2
(1 row)

test=# COMMIT;
COMMIT
In our case we have added two rows featuring some sample data. Now we can register two consumers, which are supposed to get those messages in the proper order:
test=# BEGIN;
BEGIN
test=# SELECT pgq.register_consumer('DemoQueue', 'Consume_1');
register_consumer
-------------------
1
(1 row)

test=# SELECT pgq.register_consumer('DemoQueue', 'Consume_2');
register_consumer
-------------------
1
(1 row)

test=# COMMIT;
COMMIT
Two consumers have been created. A message will be marked as processed as soon as both consumers have fetched the message and marked it as done.
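The rule that a message only disappears once every registered consumer has confirmed it can be sketched in a few lines of Python (an illustrative model with made-up names, not how pgq is implemented internally):

```python
class FanOutQueue:
    """Each registered consumer sees every message; a message is
    fully processed only once all consumers have finished it."""

    def __init__(self, consumers):
        self._consumers = set(consumers)
        # message -> set of consumers that still have to finish it
        self._pending = {}

    def insert(self, message):
        self._pending[message] = set(self._consumers)

    def finish(self, consumer, message):
        self._pending[message].discard(consumer)
        if not self._pending[message]:
            # The last consumer is done: the message can be discarded.
            del self._pending[message]

    def is_done(self, message):
        return message not in self._pending

q = FanOutQueue(["Consume_1", "Consume_2"])
q.insert("some_data_1")
q.finish("Consume_1", "some_data_1")
print(q.is_done("some_data_1"))  # False: Consume_2 has not finished yet
q.finish("Consume_2", "some_data_1")
print(q.is_done("some_data_1"))  # True: both consumers are done
```

The key point is that one slow consumer keeps the message alive for everybody; the queue cannot discard it early.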
Before we can actually see how the messages can be consumed, we have to briefly discuss the way pgq works. How does the consumer know which rows are there to consume? Managing a queue is not simple. Just imagine two concurrent transactions adding rows. A transaction's messages can only be handed out once all transactions it depends on have committed.
Here is an example:
Connection 1: | Connection 2:
---|---
INSERT row 1; COMMIT |
BEGIN |
INSERT row 2 | INSERT row 3; COMMIT
INSERT row 4 |
COMMIT |
Remember: if we manage queues, we have to make sure that the total order is maintained, so we can only provide row number 3 to the consumer once the transaction writing row number 4 has committed. If we provided row number 3 to the consumer before the second transaction in connection 1 had finished, row number 3 would effectively overtake row number 2. This must not happen.
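Using assumed transaction ids, this decision can be simulated: an event may only be handed out if no earlier event in the queue was written by a transaction that is still open (a simplified sketch; the real ticker works with transaction snapshots):

```python
# Each queued event carries the id of the transaction that wrote it.
# txid 101 (rows 2 and 4) is still open; txids 100 and 102 have committed.
events = [(1, 100), (2, 101), (3, 102), (4, 101)]  # (row, txid)
committed = {100, 102}

def deliverable(events, committed):
    """Return the rows that may be handed to consumers right now."""
    out = []
    for row, txid in events:
        if txid not in committed:
            # An open transaction wrote this row: everything from here
            # on must wait, otherwise later rows would overtake it.
            break
        out.append(row)
    return out

print(deliverable(events, committed))          # [1]: row 3 must wait
print(deliverable(events, committed | {101}))  # [1, 2, 3, 4]
```

As soon as the open transaction commits, the held-back rows become deliverable in one go, still in the correct order.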
In the case of pgq, a so-called ticker process will take care of those little details.
The ticker (pgqd) process will handle the queue for us and decide which messages are ready to be consumed by whom. To make the ticker process work, we create two directories. One will hold logfiles, and the other one is going to store the pid files created by the ticker process:
hs@hs-VirtualBox:~$ mkdir log
hs@hs-VirtualBox:~$ mkdir pid
Once we have created those directories we have to come up with a config file for the ticker:
[pgqd]
logfile = ~/log/pgqd.log
pidfile = ~/pid/pgqd.pid

## optional parameters ##

# libpq connect string without dbname=
base_connstr = host=localhost

# startup db to query other databases
initial_database = postgres

# limit ticker to specific databases
database_list = test

# log into syslog
syslog = 0
syslog_ident = pgqd

## optional timeouts ##

# how often to check for new databases
check_period = 60

# how often to flush retry queue
retry_period = 30

# how often to do maintenance
maint_period = 120

# how often to run ticker
ticker_period = 1
As we have mentioned already, the ticker is in charge of those queues. To make sure that this works nicely, we have to point the ticker to the PostgreSQL instance. Keep in mind that the connect string will be autocompleted (some information is already known by the infrastructure and is used for autocompletion). Ideally you will use the database_list directive here to make sure that only those databases that are really needed will be handled.
As far as logging is concerned, you have two options here: you can either log directly to syslog or send the log to a logfile. In our example we have decided not to use syslog (syslog has been set to 0 in our config file). Finally, there are some parameters to configure how often queue maintenance should be performed, and so on.
The ticker can be started easily:
hs@hs-VirtualBox:~/skytools$ pgqd ticker.ini
2013-05-14 17:01:38.006 23053 LOG Starting pgqd 3.1.4
2013-05-14 17:01:38.059 23053 LOG test: pgq version ok: 3.1.3
2013-05-14 17:02:08.010 23053 LOG {ticks: 30, maint: 1, retry: 0}
2013-05-14 17:02:38.012 23053 LOG {ticks: 30, maint: 0, retry: 0}
The important thing here is that the ticker can also be started as a daemon directly. The -d command line option will automatically send the process to the background and decouple it from the active terminal.
Just adding messages to the queue might not be all we want. At some point we will also want to consume this data. To do so, we can call pgq.next_batch. The system will return a number identifying the batch:
test=# BEGIN;
BEGIN
test=# SELECT pgq.next_batch('DemoQueue', 'Consume_1');
next_batch
------------
1
(1 row)
Once we have the ID of the batch, we can fetch the data itself:
test=# \x
Expanded display is on.
test=# SELECT * FROM pgq.get_batch_events(1);
-[ RECORD 1 ]----------------------------
ev_id     | 1
ev_time   | 2013-05-14 16:43:39.854199+02
ev_txid   | 489695
ev_retry  |
ev_type   | some_key_1
ev_data   | some_data_1
ev_extra1 |
ev_extra2 |
ev_extra3 |
ev_extra4 |
-[ RECORD 2 ]----------------------------
ev_id     | 2
ev_time   | 2013-05-14 16:43:39.854199+02
ev_txid   | 489695
ev_retry  |
ev_type   | some_key_2
ev_data   | some_data_2
ev_extra1 |
ev_extra2 |
ev_extra3 |
ev_extra4 |

test=# COMMIT;
COMMIT
In our case the batch consists of two messages. It is important to know that messages enqueued in separate transactions, or by many different connections, may still end up in the same batch for the consumer. This is intended behavior, and the correct order is still preserved.
Once a batch has been processed by the consumer, it can be marked as done:
test=# SELECT pgq.finish_batch(1);
finish_batch
--------------
1
(1 row)
This means that the data is gone from the queue; logically, pgq.get_batch_events will return an error for this batch ID:
test=# SELECT * FROM pgq.get_batch_events(1);
ERROR: batch not found
CONTEXT: PL/pgSQL function pgq.get_batch_events(bigint) line 16 at assignment
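Putting the pieces together, the consumer cycle we just walked through (insert events, fetch a batch, read its events, finish the batch) can be mimicked with a small in-memory model (hypothetical Python, not the actual pgq API):

```python
class BatchQueue:
    """In-memory model of the pgq consumer cycle."""

    def __init__(self):
        self._events = []    # events not yet assigned to a batch
        self._batches = {}   # open batches: batch id -> list of events
        self._next_id = 1

    def insert_event(self, key, data):
        self._events.append((key, data))

    def next_batch(self):
        # Hand out everything currently deliverable as one batch.
        batch_id = self._next_id
        self._next_id += 1
        self._batches[batch_id] = self._events
        self._events = []
        return batch_id

    def get_batch_events(self, batch_id):
        if batch_id not in self._batches:
            raise KeyError("batch not found")
        return self._batches[batch_id]

    def finish_batch(self, batch_id):
        # Once finished, the batch is gone; fetching it again fails.
        del self._batches[batch_id]

q = BatchQueue()
q.insert_event("some_key_1", "some_data_1")
q.insert_event("some_key_2", "some_data_2")
batch = q.next_batch()
print(q.get_batch_events(batch))  # both events, in insertion order
q.finish_batch(batch)
# q.get_batch_events(batch) would now raise "batch not found"
```

As in the psql session, a finished batch cannot be fetched again; this is what allows the queue to reclaim storage for processed events.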
If a queue is no longer needed, it can be dropped. However, you cannot simply call pgq.drop_queue. Dropping the queue is only possible once all consumers have unregistered:
test=# SELECT pgq.drop_queue('DemoQueue');
ERROR: cannot drop queue, consumers still attached
CONTEXT: PL/pgSQL function pgq.drop_queue(text) line 10 at RETURN
To unregister the consumer we can do the following:
test=# SELECT pgq.unregister_consumer('DemoQueue', 'Consume_1');
unregister_consumer
---------------------
1
(1 row)

test=# SELECT pgq.unregister_consumer('DemoQueue', 'Consume_2');
unregister_consumer
---------------------
1
(1 row)
Now we can safely drop the queue:
test=# SELECT pgq.drop_queue('DemoQueue');
drop_queue
------------
1
(1 row)
pgq has proven to be especially useful if you have to model a flow of messages that has to be transactional. The beauty of pgq is that you can put basically anything into a queue: you are free to decide on the type of messages and their format (as long as you are using text). It is important to see that pgq is not something purely related to replication; it has a much wider scope and offers a solid technology base for countless applications.