Managing pgq-queues

One of the core components of Skytools is pgq. It provides a generic queuing interface, which allows you to deliver messages from a provider to an arbitrary number of consumers.

The question is: What is the point of a queue in general? A queue has some very useful properties. First of all, it guarantees the delivery of a message. In addition to that, it makes sure that the order in which messages are put into the queue is preserved. This is essential in the case of replication, because we have to make sure that messages cannot overtake each other.

The idea of a queue is to be able to send anything from an entity producing the data to any other host participating in the system. This is not only suitable for replication but for a lot more—you can use pgq as an infrastructure to flexibly dispatch information. Practical examples for this could be shopping cart purchases, bank transfers, or user messages. Replicating a full table is in this sense more or less a special case.

In general a queue knows two operations:

  • Enqueue: To put a message into the queue
  • Dequeue: To fetch a message from the queue (this is also called "consuming" a message)

Those two operations are the backbone of every queue-based application.
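
In pgq, these two operations map onto a handful of SQL functions, all of which are demonstrated in detail in the following sections. As a rough sketch, using the DemoQueue queue we are about to create:

-- enqueue: add a message to a queue
SELECT pgq.insert_event('DemoQueue', 'some_key', 'some_payload');

-- dequeue: fetch the next batch of messages, read them,
-- and finally mark the whole batch as done
SELECT pgq.next_batch('DemoQueue', 'Consume_1');
SELECT * FROM pgq.get_batch_events(1);
SELECT pgq.finish_batch(1);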

Tip

What we define as a queue in Skytools is something you would call a "topic" in JMS terminology.

Running pgq

To use pgq inside a database you have to install it as a normal PostgreSQL extension. If the installation process has worked properly, you can simply run the following instruction:

test=# CREATE EXTENSION pgq;
CREATE EXTENSION

Now that all modules have been loaded into the database, we can create a simple queue.

Creating queues and adding data

For the purpose of this example we create a queue named DemoQueue:

test=# SELECT pgq.create_queue('DemoQueue');
create_queue 
--------------
            1
(1 row)
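
The return value indicates whether the queue was actually created: pgq.create_queue returns 1 if a new queue has been created and 0 if a queue with that name already exists. Calling the function a second time is therefore harmless:

test=# SELECT pgq.create_queue('DemoQueue');
create_queue 
--------------
            0
(1 row)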

Internally, the queue is just an entry inside a pgq bookkeeping table:

test=# \x
Expanded display is on.
test=# SELECT * FROM pgq.queue;
-[ RECORD 1 ]------------+------------------------------
queue_id                 | 1
queue_name               | DemoQueue
queue_ntables            | 3
queue_cur_table          | 0
queue_rotation_period    | 02:00:00
queue_switch_step1       | 489693
queue_switch_step2       | 489693
queue_switch_time        | 2013-05-14 16:35:38.132693+02
queue_external_ticker    | f
queue_disable_insert     | f
queue_ticker_paused      | f
queue_ticker_max_count   | 500
queue_ticker_max_lag     | 00:00:03
queue_ticker_idle_period | 00:01:00
queue_per_tx_limit       | 
queue_data_pfx           | pgq.event_1
queue_event_seq          | pgq.event_1_id_seq
queue_tick_seq           | pgq.event_1_tick_seq

The bookkeeping table reveals some essential information about the queue internals. In this specific example it tells us how many internal tables pgq uses to handle our queue (queue_ntables), which of those tables is currently active (queue_cur_table), how often the tables are rotated (queue_rotation_period), and so on. In practice this information is not relevant to ordinary users; it is merely an internal detail.
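
Instead of querying pgq.queue directly, you can also call pgq.get_queue_info, which exposes the same bookkeeping data in a friendlier form along with some live statistics (the exact set of columns returned depends on your pgq version):

test=# SELECT queue_name, ticker_lag, ev_per_sec
         FROM pgq.get_queue_info('DemoQueue');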

Once the queue has been created, we can add data to it. The function for doing so, pgq.insert_event, takes three parameters in its basic form: the first parameter is the name of the queue, and the second and third parameters are the data values to enqueue. In many cases it makes a lot of sense to use both values: the first one can nicely represent a key, while the second one can be seen as the payload of the message. Here is an example:

test=# BEGIN;
BEGIN
test=# SELECT pgq.insert_event('DemoQueue', 
  'some_key_1', 'some_data_1');
insert_event 
--------------
            1
(1 row)

test=# SELECT pgq.insert_event('DemoQueue', 
  'some_key_2', 'some_data_2');
insert_event 
--------------
            2
(1 row)

test=# COMMIT;
COMMIT
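
Internally, the two values end up as the ev_type and ev_data fields of the event. If you need to ship more context per message, pgq.insert_event also comes in an extended form that accepts four additional text fields; they show up as ev_extra1 through ev_extra4 when the events are fetched later on. A call (shown here for illustration only, not executed as part of our example) could look like this:

SELECT pgq.insert_event('DemoQueue', 'some_key_3', 'some_data_3',
    'extra_1', 'extra_2', 'extra_3', 'extra_4');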

Adding consumers

In our case we have added two messages containing some sample data. Now we can register two consumers, which are supposed to receive those messages in the proper order:

test=# BEGIN;
BEGIN
test=# SELECT pgq.register_consumer('DemoQueue', 
  'Consume_1');
register_consumer 
-------------------
                 1
(1 row)

test=# SELECT pgq.register_consumer('DemoQueue',
  'Consume_2');
register_consumer 
-------------------
                 1
(1 row)

test=# COMMIT;
COMMIT

Two consumers have been created. A message will be marked as processed as soon as both consumers have fetched the message and marked it as done.
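
Which consumers are registered on a queue, and how far they are lagging behind, can be checked at any time using pgq.get_consumer_info (again, the exact set of columns depends on the pgq version in use):

test=# SELECT consumer_name, lag, pending_events
         FROM pgq.get_consumer_info('DemoQueue');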

Configuring the ticker

Before we can actually see how the messages can be consumed, we have to briefly discuss the way pgq works. How does the consumer know which rows are there to consume? Managing a queue is not simple: just imagine two concurrent transactions adding rows. A transaction can only be replicated once all the transactions it depends on have been replicated as well.

Here is an example:

Connection 1              Connection 2
------------------------  ------------------------
INSERT ... VALUES (1)
BEGIN;                    BEGIN;
INSERT ... VALUES (2)     INSERT ... VALUES (3)
COMMIT;                   INSERT ... VALUES (4)
                          COMMIT;

Remember: if we manage queues, we have to make sure that the total order is maintained, so we can only hand row number 3 to a consumer once the transaction that also writes row number 4 has committed. And if we were to provide row number 3 to the consumer before the second transaction in connection 1 has finished, row number 3 would effectively overtake row number 2. This must not happen.

In the case of pgq, a so-called ticker process takes care of those little details.
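
Internally the ticker relies on transaction ID snapshots: at regular intervals it writes a tick into the queue, which essentially records the result of PostgreSQL's txid_current_snapshot() function. A batch then consists of the events between two ticks that are visible in the later snapshot but not in the earlier one, so events from unfinished transactions can never leak into a batch. You can inspect such a snapshot yourself (the values shown are purely illustrative; the format is xmin:xmax:list-of-in-progress-transactions):

test=# SELECT txid_current_snapshot();
 txid_current_snapshot 
-----------------------
 489695:489701:489697
(1 row)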

The ticker process (pgqd) handles the queues for us and decides which data is ready to be consumed by whom. To make the ticker process work, we create two directories: one will hold the logfiles, and the other one is going to store the PID files created by the ticker process:

hs@hs-VirtualBox:~$ mkdir log
hs@hs-VirtualBox:~$ mkdir pid

Once we have created those directories we have to come up with a config file for the ticker:

[pgqd]
logfile = ~/log/pgqd.log
pidfile = ~/pid/pgqd.pid

## optional parameters ##
# libpq connect string without dbname=
base_connstr = host=localhost

# startup db to query other databases
initial_database = postgres

# limit ticker to specific databases
database_list = test

# log into syslog
syslog = 0
syslog_ident = pgqd

## optional timeouts ##
# how often to check for new databases
check_period = 60

# how often to flush retry queue
retry_period = 30

# how often to do maintenance
maint_period = 120

# how often to run ticker
ticker_period = 1

As we have mentioned already, the ticker is in charge of those queues. To make sure that this works nicely, we have to point the ticker to the PostgreSQL instance. Keep in mind that the connect string in base_connstr is deliberately incomplete: it must not contain dbname, because pgqd appends the database name itself when connecting (host=localhost thus becomes host=localhost dbname=test for our test database). Ideally you will also use the database_list directive here to make sure that only those databases that really need a ticker are handled.

As far as logging is concerned, you have two options: you can either log to syslog directly or send the log to a logfile. In our example we have decided against syslog (syslog has been set to 0 in our config file). Finally, there are some parameters configuring how often queue maintenance, the retry queue, and the ticker itself should be run.

The ticker can be started easily:

hs@hs-VirtualBox:~/skytools$ pgqd ticker.ini
2013-05-14 17:01:38.006 23053 LOG Starting pgqd 3.1.4
2013-05-14 17:01:38.059 23053 LOG test: pgq version ok: 3.1.3
2013-05-14 17:02:08.010 23053 LOG {ticks: 30, maint: 1, retry: 0}
2013-05-14 17:02:38.012 23053 LOG {ticks: 30, maint: 0, retry: 0}

The important thing here is that the ticker can also be started as a daemon directly. The -d command line option will automatically send the process to the background and decouple it from the active terminal.
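
A daemonized start therefore looks like this (log messages then go to the logfile configured in ticker.ini; the PID shown is, of course, just an example):

hs@hs-VirtualBox:~/skytools$ pgqd -d ticker.ini
hs@hs-VirtualBox:~/skytools$ cat ~/pid/pgqd.pid
23071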

Consuming messages

Just adding messages to the queue is only half the story; at some point we will also want to consume this data. To do so, a consumer first asks pgq for the next batch of work by calling pgq.next_batch. The system will return a number identifying the batch:

test=# BEGIN;
BEGIN
test=# SELECT pgq.next_batch('DemoQueue', 'Consume_1');
next_batch 
------------
          1
(1 row)

Once we have the ID of the batch, we can fetch the data itself:

test=# \x
Expanded display is on.
test=# SELECT * FROM pgq.get_batch_events(1);
-[ RECORD 1 ]----------------------------
ev_id     | 1
ev_time   | 2013-05-14 16:43:39.854199+02
ev_txid   | 489695
ev_retry  | 
ev_type   | some_key_1
ev_data   | some_data_1
ev_extra1 | 
ev_extra2 | 
ev_extra3 | 
ev_extra4 | 
-[ RECORD 2 ]----------------------------
ev_id     | 2
ev_time   | 2013-05-14 16:43:39.854199+02
ev_txid   | 489695
ev_retry  | 
ev_type   | some_key_2
ev_data   | some_data_2
ev_extra1 | 
ev_extra2 | 
ev_extra3 | 
ev_extra4 | 

test=# COMMIT;
COMMIT

In our case the batch consists of two messages. It is important to know that messages enqueued in separate transactions or by many different connections might still end up in the same batch of work for the consumer. This is entirely intended behavior; the correct order is preserved nevertheless.

Once a batch has been processed by the consumer, it can be marked as done:

test=# SELECT pgq.finish_batch(1);
finish_batch 
--------------
            1
(1 row)

This means that the data is gone from the queue for this consumer; consequently, pgq.get_batch_events will return an error for this batch ID:

test=# SELECT * FROM pgq.get_batch_events(1);
ERROR:  batch not found
CONTEXT:  PL/pgSQL function pgq.get_batch_events(bigint) line 16 at assignment

Tip

The message is only gone for this consumer. Every other consumer registered on the queue will still be able to consume it exactly once.
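
Our second consumer, Consume_2, has therefore not lost anything: the same two events are still waiting for it. A minimal sketch of a complete consumer pass, wrapped into a PL/pgSQL DO block, might look as follows (in a real application each event would be processed instead of merely counted; note that pgq.next_batch returns NULL when there is currently nothing to consume):

DO $$
DECLARE
    batch_id bigint;
    n        bigint;
BEGIN
    LOOP
        -- ask pgq for the next batch for this consumer
        batch_id := pgq.next_batch('DemoQueue', 'Consume_2');

        -- NULL means: no new batch available at the moment
        EXIT WHEN batch_id IS NULL;

        -- "process" the events; here we simply count them
        SELECT count(*) INTO n
          FROM pgq.get_batch_events(batch_id);
        RAISE NOTICE 'batch %: % events', batch_id, n;

        -- tell pgq that this batch is done
        PERFORM pgq.finish_batch(batch_id);
    END LOOP;
END;
$$;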

Dropping queues

If a queue is no longer needed, it can be dropped. But you cannot simply call pgq.drop_queue at any time; dropping the queue is only possible once all consumers have unregistered:

test=# SELECT pgq.drop_queue('DemoQueue');
ERROR:  cannot drop queue, consumers still attached
CONTEXT:  PL/pgSQL function pgq.drop_queue(text) line 10 at RETURN

To unregister the consumers, we can do the following:

test=# SELECT pgq.unregister_consumer('DemoQueue', 
  'Consume_1');
unregister_consumer 
---------------------
                   1
(1 row)

test=# SELECT pgq.unregister_consumer('DemoQueue',
  'Consume_2');
unregister_consumer 
---------------------
                   1
(1 row)

Now we can safely drop the queue:

test=# SELECT pgq.drop_queue('DemoQueue');
drop_queue 
------------
          1
(1 row)

Using pgq for large projects

pgq has proven to be especially useful if you have to model a flow of messages that has to be transactional. The beauty of pgq is that you can put basically anything into a queue; you can decide freely on the type of messages and their format (as long as you are using text).
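
For example, nothing stops you from using the key field for a message type and shipping a structured payload such as JSON in the data field. The following call is purely illustrative (the event name and the JSON layout are made up, and it assumes a queue like our DemoQueue from before):

SELECT pgq.insert_event('DemoQueue', 'cart_checkout',
    '{"cart_id": 42, "user": "hs", "total": 199.90}');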

It is important to see that pgq is not just something that is purely related to replication; it has a much wider scope and offers a solid technological base for countless applications.
