Chapter 17. Shared Memory

Modern operating systems enforce address space protection between processes. This means that the OS will not allow two distinct processes to write to each other's address space. Although this is what is needed in most cases, sometimes you may want your processes to share certain parts of their address space. Shared memory primitives allow you to do just this. In fact, when used correctly, shared memory can prove to be the fastest interprocess communication mechanism between collocated processes, especially if large amounts of data need to be shared.

Shared memory is not used only as an IPC mechanism. Shared memory primitives also allow you to work with files by mapping them into memory. This allows you to perform direct memory-based operations on files instead of using file I/O operations. This comes in handy when you want a simple way to persist a data structure. (In fact, ACE_Configuration can use this technique to provide persistence for your configuration.)

ACE provides several tools to assist in your shared memory adventures. For sharing memory between applications, ACE provides a set of allocators that allow you to allocate memory that is shared. In fact, you can use a shared memory allocator with the containers discussed in Chapter 5 to create containers in shared memory. For example, you could create a hash map in shared memory and share it across processes.

ACE also provides low-level wrappers around the OS shared memory primitives. These wrappers can be used to perform memory-mapped file operations.

17.1 ACE_Malloc and ACE_Allocator

When we talked about containers in Chapter 5, we saw how we could supply special-purpose allocators to the containers. These allocators were of type ACE_Allocator and were usually passed in during container construction. The container then used the allocator to manage any memory it needed to allocate and deallocate.

ACE also includes another family of allocators, based on the ACE_Malloc class template. Unlike the ACE_Allocator family, which is based on polymorphism, the ACE_Malloc family of classes is template based. The ACE_Malloc template takes two major parameters: a lock type and a memory pool type. The lock ensures consistency of the allocator when it is used by multiple threads or processes. The memory pool is where the allocator obtains memory and where it returns memory when it is released. To vary the memory allocation mechanism you want ACE_Malloc to use, you need to instantiate it with the right pool type.

ACE comes with several memory pools, including those that allocate shared memory and those that allocate regular heap memory. The various pools are detailed in Table 17.1.

As you can see, several pools are OS specific; make sure that you use a pool that is available on your OS. For example, to create an allocator that uses a pool that is based on a memory-mapped file, you would do something like this:


typedef ACE_Malloc <ACE_MMAP_Memory_Pool, ACE_SYNCH_MUTEX> MALLOC;

If your compiler does not support nested typedefs for template classes, you need to use an ACE-provided macro for the pool type to instantiate the template. The macro names are provided in Table 17.2. Using a macro, the same example would be:


typedef ACE_Malloc<ACE_MMAP_MEMORY_POOL, ACE_SYNCH_MUTEX> MALLOC;

17.1.1 Map Interface

Besides a memory allocation interface that supports such operations as malloc(), calloc(), and free(), ACE_Malloc also supports a maplike interface. This interface is very similar to the interface supported by ACE_Map_Manager and ACE_Hash_Map_Manager, which we talked about in Chapter 5. This interface allows you to insert values in the underlying memory pool and associate them with a character string key. You can then retrieve the values you stored, using your key. The map also offers LIFO and FIFO iterators that iterate through the map in LIFO and FIFO order of insertion. In the next few sections, we will see several examples that use this interface.

[Table 17.1. Memory Pool Types (table image not reproduced here)]

17.1.2 Memory Protection Interface

In certain cases, the underlying shared memory pool allows you to change the memory protection level of individual pages in the memory pool. You could, for example, make certain pages read-only and others read/write or executable. To specify protection, ACE_Malloc offers several protect() methods that allow you to specify the protection attributes for various regions of its underlying memory pool. Note that protection is available only with certain pools, such as ACE_MMAP_Memory_Pool.

17.1.3 Sync Interface

If your memory pool is backed by a file, such as when using ACE_MMAP_Memory_Pool, you need a way to flush out the mapping file to disk at the appropriate times in your program. To allow this, ACE_Malloc includes a sync() method.

[Table 17.2. Memory Pool Macro Names (table image not reproduced here)]

17.2 Persistence with ACE_Malloc

Let's look at a simple example of using a shared memory allocator based on the ACE_MMAP_Memory_Pool class. One of the nice properties of the ACE_MMAP_Memory_Pool allocator is that it can be backed by a file. That is, whatever you allocate using this allocator is saved to a backing file. As we mentioned earlier, you can use this mechanism to provide a simple persistence mechanism for your data. We will use this and the map interface to insert several records into a shared memory allocator with an ACE_MMAP_Memory_Pool.

Let's start by creating a few easy-to-use types that define the allocator type and iterator on that type. We also declare a global pointer to the allocator we are going to create:


#include "ace/Malloc_T.h"

typedef ACE_Malloc<ACE_MMAP_MEMORY_POOL, ACE_Null_Mutex>
  ALLOCATOR;
typedef ACE_Malloc_LIFO_Iterator <ACE_MMAP_MEMORY_POOL,
                                  ACE_Null_Mutex>
  MALLOC_LIFO_ITERATOR;

ALLOCATOR  *g_allocator;

Next, we instantiate the shared memory allocator. The only option we pass to the constructor is the name of the backing file where we wish to persist the records we will be adding to the allocator:



// Backing file where the data is kept.
#define BACKING_STORE "backing.store"

int ACE_TMAIN (int argc, ACE_TCHAR *[])
{
  ACE_NEW_RETURN (g_allocator,
                  ALLOCATOR (BACKING_STORE),
                  -1);
  if (argc > 1)
    {
      showRecords ();
    }
  else
    {
      addRecords ();
    }

  g_allocator->sync ();
  delete g_allocator;
  return 0;
}

The example needs to be run twice. The first time, you need to run it with no command line arguments; in that case, it will add new records to the memory pool. The second time, you need to run with at least one argument—anything will do, as we are looking only at the number of arguments in the command line—in which case, it will iterate through the inserted records and display them on the screen.

Next, let's look at the record type that we are inserting into memory:


class Record
{
public:
  Record (int id1, int id2, char *name)
    : id1_(id1), id2_(id2), name_(0)
  {
    size_t len = ACE_OS::strlen (name) + 1;
    this->name_ =
      ACE_reinterpret_cast (char *,
                            g_allocator->malloc (len));
    ACE_OS::strcpy (this->name_, name);
  }

  ~Record () { g_allocator->free (name_); }

  char* name(void) { return name_; }
  int id1 (void) { return id1_; }
  int id2 (void) { return id2_; }

private:
  int id1_;
  int id2_;
  char *name_;
};

The Record class has three data members: two simple integers and a char pointer, name_, that represents a string. This is where the tricky part comes in; if we allocate the record in shared memory, we would allocate just enough space for the two integers and the pointer. The pointer itself would be pointing somewhere out into heap space. This is definitely not what we want, as the next time the application is run, the pointer will have a value that points to heap space that does not exist anymore—a recipe for looming disaster.

To ensure that the string name_ points to is in the shared memory pool, we explicitly allocate it by using the shared memory allocator in the constructor. The addRecords() function then creates the records themselves in the pool:


int addRecords ()
{
  char buf[32];

  for (int i = 0; i < 10; i++)
    {
      ACE_OS::sprintf (buf, "%s:%d", "Record", i);
      void *memory = g_allocator->malloc (sizeof (Record));
      if (memory == 0)
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                           ACE_TEXT ("Unable to malloc")),
                          -1);

      // Allocate and place record
      Record* newRecord = new (memory) Record (i, i+1, buf);
      if (g_allocator->bind (buf, newRecord) == -1)
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                           ACE_TEXT ("bind failed")),
                          -1);
    }

  return 0;
}

To add a record, we allocate enough memory for the record, using the shared memory allocator, and then use the placement new operator to “place” a new Record object into this memory. Next, we bind the record into the allocator, using its map interface. The key is the name of the record, and the value is the record itself. The mapping feature is not a good database, so for realistic needs it may be better to use an ACE_Hash_Map_Manager located in shared memory.
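The allocate-then-place step can be tried without ACE. The following is only a sketch under stated assumptions: Pool, PoolRecord, and make_record are hypothetical names introduced for illustration, and a fixed in-process buffer stands in for the memory-mapped pool. It shows the record and the string it points to both being carved out of the same pool.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <new>

// Hypothetical stand-in for a memory pool: a fixed buffer handed out
// front to back. This is not how ACE_Malloc manages its pool.
class Pool
{
public:
  Pool () : used_ (0) {}

  // Returns aligned storage from the buffer, or 0 when it is exhausted.
  void *malloc (std::size_t len)
  {
    const std::size_t align = alignof (std::max_align_t);
    std::size_t start = (used_ + align - 1) / align * align;
    if (start > sizeof (buf_) || len > sizeof (buf_) - start)
      return 0;
    used_ = start + len;
    return buf_ + start;
  }

  // True if p points into this pool's buffer.
  bool owns (const void *p) const
  {
    std::uintptr_t a = reinterpret_cast<std::uintptr_t> (p);
    std::uintptr_t lo = reinterpret_cast<std::uintptr_t> (buf_);
    return a >= lo && a < lo + sizeof (buf_);
  }

private:
  alignas (std::max_align_t) unsigned char buf_[1024];
  std::size_t used_;
};

// Hypothetical record whose name is allocated from the pool as well,
// so that no member points outside the pool.
struct PoolRecord
{
  PoolRecord (Pool &pool, int id, const char *name)
    : id_ (id), name_ (0)
  {
    std::size_t len = std::strlen (name) + 1;
    name_ = static_cast<char *> (pool.malloc (len));
    if (name_ != 0)
      std::memcpy (name_, name, len);
  }

  int id_;
  char *name_;
};

// Allocates raw storage from the pool, then constructs the record in it
// with placement new.
PoolRecord *make_record (Pool &pool, int id, const char *name)
{
  void *memory = pool.malloc (sizeof (PoolRecord));
  if (memory == 0)
    return 0;
  return new (memory) PoolRecord (pool, id, name);
}
```

Calling make_record (pool, 7, "Record:7") on a fresh Pool yields a record for which both the object and its name_ lie inside the pool's buffer.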


void showRecords ()
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("The following records were found:\n")));
  {
    MALLOC_LIFO_ITERATOR iter (*g_allocator);

    for (void *temp = 0; iter.next (temp) != 0; iter.advance ())
      {
        Record *record =
          ACE_reinterpret_cast (Record *, temp);
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("Record name: %C|id1:%d|id2:%d\n"),
                    record->name (),
                    record->id1 (),
                    record->id2 ()));
      }
  }
}

Finally, we illustrate LIFO iteration and the fact that the records were indeed persisted by iterating through them and displaying them on the screen. When we run the example again, we get the following results:


The following records were found:
Record name: Record:9|id1:9|id2:10
Record name: Record:8|id1:8|id2:9
Record name: Record:7|id1:7|id2:8
Record name: Record:6|id1:6|id2:7
Record name: Record:5|id1:5|id2:6
Record name: Record:4|id1:4|id2:5
Record name: Record:3|id1:3|id2:4
Record name: Record:2|id1:2|id2:3
Record name: Record:1|id1:1|id2:2
Record name: Record:0|id1:0|id2:1

17.3 Position-Independent Allocation

In the previous example, we glossed over several important issues that arise when you are using shared memory. The first issue is that it may not be possible for the underlying shared memory pool of an allocator to be assigned the same base address in all processes that wish to share it or even every time you start the same process. What does all this mean? Here is an example.

Let's say that process A creates a shared memory pool that has a base address of 0x40000000. Process B opens up the same shared memory pool but maps it to address 0x7e000000. Process A then inserts a record into the pool at 0x400001a0. If process B attempts to obtain this record at this address, the record will not be there; instead, it is mapped at 0x7e0001a0 in its address space! This issue continues to get worse, as the record itself may have pointers to other records that are all in shared memory space, but none are accessible to process B, as the base address of the pool is different for each process.

In most cases, the operating system will return the same base address for an ACE_MMAP_Memory_Pool by default. This is why the previous example worked. If the OS did not assign the same base address to the allocator, the previous example would not work. Further, if the underlying memory pool needs to grow as you allocate more memory, the system may need to remap the pool to a different base address. This means that if you keep direct pointers into the shared region, they may be invalidated during operation. (This will occur only if you use the special ACE_MMAP_Memory_Pool_Options::NEVER_FIXED option; we talk more about this and other options later.)

As usual, however, ACE comes to the rescue. ACE includes several classes that, when used together, allow you to perform position-independent memory allocation. These classes calculate offsets from the current base address and store them in shared memory. So the allocator knows that the record is located at an offset of 0x01a0 from the base address instead of knowing only that the record is at 0x400001a0. Of course, this comes with some overhead in terms of memory use and processing, but it allows you to write applications that you know will work with shared memory.
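The offset arithmetic can be checked with the addresses from the example above. This is a minimal sketch, not ACE code; to_offset and from_offset are hypothetical helper names, and addresses are treated as plain integers.

```cpp
#include <cassert>
#include <cstdint>

// Converts an address inside a mapping to an offset from the mapping's base.
inline std::uintptr_t to_offset (std::uintptr_t base, std::uintptr_t addr)
{
  return addr - base;
}

// Rebuilds the address of the same object as seen through another mapping.
inline std::uintptr_t from_offset (std::uintptr_t base, std::uintptr_t offset)
{
  return base + offset;
}
```

With process A's base of 0x40000000, the record at 0x400001a0 has offset 0x1a0; applying that offset to process B's base of 0x7e000000 gives 0x7e0001a0, which is where B finds the record.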

Let's modify our previous example to use position-independent allocation:


#include "ace/Malloc_T.h"
#include "ace/PI_Malloc.h"

typedef ACE_Malloc_T <ACE_MMAP_MEMORY_POOL,
                      ACE_Null_Mutex,
                      ACE_PI_Control_Block>
  ALLOCATOR;
typedef ACE_Malloc_LIFO_Iterator_T<ACE_MMAP_MEMORY_POOL,
                                   ACE_Null_Mutex,
                                   ACE_PI_Control_Block>
  MALLOC_LIFO_ITERATOR;

ALLOCATOR  *g_allocator;

We start by changing the typedef for our allocator. Instead of using ACE_Malloc, we use its base class, ACE_Malloc_T. This template includes one additional parameter, a control block type. A control block is allocated in the shared memory pool to provide bookkeeping information. Here, we specify that we want to use the position-independent control block, ACE_PI_Control_Block. This ensures that the find() and bind() operations will continue to work even if the underlying pool is mapped to different addresses in different runs or in different processes:


class Record
{
public:
  Record (int id1, int id2, char *name)
    : id1_(id1), id2_(id2)
  {
    size_t len = ACE_OS::strlen (name) + 1;
    char *buf =
      ACE_reinterpret_cast (char *,
                            g_allocator->malloc (len));
    ACE_OS::strcpy (buf, name);
    name_ = buf;
  }

  ~Record() { g_allocator->free (name_.addr ()); }

  char *name (void) { return name_; }
  int id1 (void) { return id1_; }
  int id2 (void) { return id2_; }

private:
  int id1_;
  int id2_;
  ACE_Based_Pointer_Basic<char> name_;
};

We also need to change our Record class a little. Instead of using a raw pointer for the name, we use a position-independent pointer embodied in the ACE_Based_Pointer_Basic class. This utility class calculates and keeps the offset of the name string instead of keeping the raw pointer to the string. If the underlying memory region is mapped to a different address, we will still get the right pointer for name_ because ACE_Based_Pointer_Basic will recalculate the pointer for the new base address. This class also overloads several useful operators, including the conversion to a raw pointer used in name(), which for the most part allow you to treat name_ as a regular pointer.
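To see why storing an offset survives remapping, consider the sketch below. It is not the ACE implementation: OffsetPtr, Block, and remap_copy are hypothetical names, and, unlike the class described above, which works from the pool's base address, this variant stores the offset relative to the pointer object's own address. Copying the raw bytes of a block to a new location stands in for mapping the pool at a different base address.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical self-relative pointer: keeps the distance from its own
// address to the target rather than the target's absolute address.
template <class T>
class OffsetPtr
{
public:
  OffsetPtr () : offset_ (0) {}

  // Records where the target lies relative to this object.
  void set (T *target)
  {
    offset_ = reinterpret_cast<char *> (target)
              - reinterpret_cast<char *> (this);
  }

  // Recomputes the target address from this object's current address.
  T *get () const
  {
    const char *self = reinterpret_cast<const char *> (this);
    return reinterpret_cast<T *> (const_cast<char *> (self + offset_));
  }

  // Lets the object be used where a raw pointer is expected.
  operator T * () const { return get (); }

private:
  std::ptrdiff_t offset_;
};

// A block laid out as it would be inside a pool: the pointer and the
// string it refers to live in the same contiguous region.
struct Block
{
  OffsetPtr<char> name;
  char text[16];
};

// Copies the raw bytes of src to dst, as if the same pool contents were
// mapped at a different address.
inline void remap_copy (Block &dst, const Block &src)
{
  std::memcpy (static_cast<void *> (&dst),
               static_cast<const void *> (&src),
               sizeof (Block));
}
```

After remap_copy (b, a), b.name resolves to b.text rather than to a.text, which is the behavior a raw pointer could not provide.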

To illustrate that this works, we explicitly map the allocator to a different base address when we are adding rather than merely showing records. To achieve this, we use the ACE_MMAP_Memory_Pool_Options class:


// Backing file where the data is kept.
#define BACKING_STORE "backing2.store"

int ACE_TMAIN (int argc, ACE_TCHAR *[])
{
  if (argc > 1)
    {
      ACE_MMAP_Memory_Pool_Options options
        (ACE_DEFAULT_BASE_ADDR,
         ACE_MMAP_Memory_Pool_Options::ALWAYS_FIXED);
      ACE_NEW_RETURN (g_allocator,
                      ALLOCATOR (BACKING_STORE,
                                 BACKING_STORE,
                                 &options),
                      -1);
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("Mapped to base address %@\n"),
                  g_allocator->base_addr ()));

      showRecords ();
    }
  else
    {
      ACE_MMAP_Memory_Pool_Options options
        (0, ACE_MMAP_Memory_Pool_Options::NEVER_FIXED);
      ACE_NEW_RETURN (g_allocator,
                      ALLOCATOR (BACKING_STORE,
                                 BACKING_STORE,
                                 &options),
                      -1);
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("Mapped to base address %@\n"),
                  g_allocator->base_addr ()));
      addRecords();
    }

  g_allocator->sync ();
  delete g_allocator;
  return 0;
}

ACE_MMAP_Memory_Pool_Options allows us to specify various options, including the base address and whether we want the OS to map to a fixed address. Other options are described in Table 17.3.

When we are adding records, we ask the OS to map the pool to any address it pleases; however, when we are showing records, we map to a fixed base address. When you run the example, you can see that the allocator is mapped to a different address each time it is run; even though this occurs, the program works flawlessly.

17.4 ACE_Malloc for Containers

As you know, you can specify special-purpose allocators for containers. The container then uses the allocator to satisfy its memory needs. Wouldn't it be nice if we could use an ACE_Malloc shared memory allocator with the container and allocate containers in shared memory? This would mean that the container would allocate memory for itself in shared memory. The problem, of course, is that the allocator needs to implement the ACE_Allocator interface, but ACE_Malloc doesn't.

No need to fret, though, because ACE includes a special adapter class for this purpose. ACE_Allocator_Adapter adapts an ACE_Malloc-based class to the ACE_Allocator interface. This class template is very easy to use. To create the adapter, simply pass in the appropriate ACE_Malloc type. For example:


typedef ACE_Malloc<ACE_MMAP_MEMORY_POOL, ACE_Null_Mutex> MALLOC;
typedef ACE_Allocator_Adapter<MALLOC> ALLOCATOR;

Besides the issue of interface compatibility, another issue crops up. Most of the container classes keep a reference to the allocator they use and use this reference for all memory operations. This reference will point to heap memory, where the shared memory allocator itself is allocated. Of course, this pointer is valid only in the original process that created the allocator and not in any other processes that wish to share the container. To overcome this problem, you must provide the container with a valid memory allocator reference for all its operations. As an example, ACE extends the ACE_Hash_Map_Manager container with a new class, ACE_Hash_Map_With_Allocator, to provide this, and you can easily extend the idea to any other containers you wish to use.
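The idea of handing the container a valid allocator on every call can be sketched in plain C++. The names Allocator and Stack below are hypothetical stand-ins, not ACE classes, and ordinary heap memory replaces the shared pool. The point is only that each operation overwrites the cached allocator pointer before using it, so a stale cached value is never dereferenced.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>

// Hypothetical allocator that counts outstanding blocks.
class Allocator
{
public:
  Allocator () : live_ (0) {}
  void *malloc (std::size_t n) { ++live_; return std::malloc (n); }
  void free (void *p) { if (p != 0) { --live_; std::free (p); } }
  int live () const { return live_; }

private:
  int live_;
};

// Hypothetical container that caches an allocator pointer, as many
// containers do. Every operation takes the caller's allocator and
// replaces the cached pointer first.
class Stack
{
public:
  explicit Stack (Allocator *a) : allocator_ (a), head_ (0) {}

  int push (int value, Allocator *a)
  {
    allocator_ = a;   // Never trust the cached pointer.
    Node *n = static_cast<Node *> (allocator_->malloc (sizeof (Node)));
    if (n == 0)
      return -1;
    n->value = value;
    n->next = head_;
    head_ = n;
    return 0;
  }

  int pop (int &value, Allocator *a)
  {
    allocator_ = a;
    if (head_ == 0)
      return -1;
    Node *n = head_;
    value = n->value;
    head_ = n->next;
    allocator_->free (n);
    return 0;
  }

private:
  struct Node { int value; Node *next; };
  Allocator *allocator_;
  Node *head_;
};
```

A Stack constructed with a null allocator pointer, which models the unusable pointer a second process would find, still works as long as every push() and pop() is given a valid allocator.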

[Table 17.3. ACE_MMAP_Memory_Pool_Options Attributes (table image not reproduced here)]

Finally, as most, if not all, ACE containers contain raw pointers, you cannot expect to use them between processes that map them to different base addresses. ACE does not include any container class that uses the position-independent pointers we showed you earlier, although you could easily create one on your own. Therefore, if you are going to use a container in shared memory, you must make sure that you can map the entire container into all sharing processes at the same base address.

17.4.1 Hash Map

Now let's get down to an example. We are going to put a hash map into shared memory. We will have a parent process add records into the hash table and have two other worker processes consume these records and remove them from the map. To ensure the map consistency, we create an ACE_Process_Mutex and use it to serialize access to the map:


#include "ace/Hash_Map_With_Allocator_T.h"
#include "ace/Malloc_T.h"
#include "ace/PI_Malloc.h"
#include "ace/Process_Mutex.h"
#include "ace/Process.h"

#define BACKING_STORE "map.store"
#define MAP_NAME "records.db"

#include "Record.h"

typedef ACE_Allocator_Adapter<ACE_Malloc_T <ACE_MMAP_MEMORY_POOL,
                                            ACE_Process_Mutex,
                                            ACE_Control_Block>
                             > ALLOCATOR;
typedef ACE_Hash_Map_With_Allocator<int, Record> MAP;

ACE_Process_Mutex coordMutex("Coord-Mutex");

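We start by creating a few convenient type definitions and creating the coordination mutex globally. We create an allocator based on ACE_Malloc_T with the standard ACE_Control_Block and use ACE_Allocator_Adapter to adapt its interface to ACE_Allocator. Because this control block is not position independent, every process must map the pool at the same base address. We define a hash map with simple integer keys and Record values.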

Now let's take a quick look at a minor change to the Record class:


class Record
{
public:
  Record () { }
  ~Record () { }

  Record (const Record& rec)
    : id1_(rec.id1_), id2_(rec.id2_)
  {
    ACE_OS::strcpy (recName_, rec.name_);
    this->name_ = recName_;
  }
  Record (int id1, int id2, char *name)
    : id1_(id1), id2_(id2)
  {
    ACE_OS::strcpy (recName_, name);
    this->name_ = recName_;
  }
  char *name (void) { return recName_; }
  int id1 (void) { return id1_; }
  int id2 (void) { return id2_; }

private:
  int id1_;
  int id2_;
  char recName_[128];
  ACE_Based_Pointer_Basic<char> name_;
};

We have written a copy constructor for the Record class, as the map requires this. During a copy, we make a deep copy of the record name. This allows the container to safely delete the memory it allocates for a Record object during an unbind. To simplify managing the name, we've also changed from allocating it separately to storing it in a member array, recName_.

Next, let's look at how we create and place the map into our allocator:


MAP* smap (ALLOCATOR *shmem_allocator)
{
  void *db = 0;
  if (shmem_allocator->find (MAP_NAME, db) == 0)
    return (MAP *) db;
  size_t hash_table_size = sizeof (MAP);
  void *hash_map = shmem_allocator->malloc (hash_table_size);
  if (hash_map == 0)
    return 0;
  new (hash_map) MAP (hash_table_size, shmem_allocator);
  if (shmem_allocator->bind (MAP_NAME, hash_map) == -1)
    {
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
                  ACE_TEXT ("allocate_map")));
      shmem_allocator->remove ();
      return 0;
    }
  return (MAP*)hash_map;
}

Because the map will be shared among processes, we have written a small routine that finds the map if it already exists or creates a new one if it has not been created yet. This helper function assumes that the caller already has control of the coordinating mutex; otherwise, you might leak a map.

First, we look for the map in the allocator, using the convenient find() method. If the map is not found, we allocate memory for a new map and use the placement new operator to place a map in this memory. We then associate it with the key MAP_NAME so that in the future, it will be found there by other processes:


int handle_parent (char *cmdLine)
{
  ACE_TRACE (ACE_TEXT ("::handle_parent"));

  ALLOCATOR * shmem_allocator = 0;
  ACE_MMAP_Memory_Pool_Options options
    (ACE_DEFAULT_BASE_ADDR,
     ACE_MMAP_Memory_Pool_Options::ALWAYS_FIXED);

  ACE_NEW_RETURN
    (shmem_allocator,
     ALLOCATOR (BACKING_STORE, BACKING_STORE, &options),
     -1);

  MAP *map = smap (shmem_allocator);

  ACE_Process processa, processb;
  ACE_Process_Options poptions;
  poptions.command_line("%s a", cmdLine);
  {
    ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon,
                      coordMutex, -1);
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%P|%t) Map has %d entries\n"),
                ACE_static_cast (int, map->current_size ())));
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("In parent, map is located at %@\n"),
                map));

    // Spawn the first child. It blocks on the coordinating mutex
    // until this guard goes out of scope.
    processa.spawn (poptions);

    // Append a few records for that child to show and eat up.
    addRecords (map, shmem_allocator);
  }
  {
    ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon,
                      coordMutex, -1);

    // Add a few more records..
    addRecords (map, shmem_allocator);

    // Let's see what's left.
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%P|%t) Parent finished adding, ")
                ACE_TEXT ("map has %d entries\n"),
                ACE_static_cast (int, map->current_size ())));

    // Have another child try to eat them up.
    processb.spawn (poptions);
  }

  processa.wait ();
  processb.wait ();

  // No processes are left and we don't want to keep the data
  // around anymore; it's now safe to remove it.
  // !!This will remove the backing store.!!
  shmem_allocator->remove ();
  delete shmem_allocator;
  return 0;
}

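int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  // Call the handlers outside ACE_ASSERT so that they still run
  // when assertions are compiled out (ACE_NDEBUG).
  int result = (argc == 1)
    ? handle_parent (argv[0])   // parent
    : handle_child ();
  ACE_ASSERT (result == 0);

  return result;
}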

When the program is started without arguments, it calls the handle_parent() function. First, we create the shared memory allocator on the heap. Next, we acquire the coordinating mutex, start a child process, and add a few records to the map. Because we still hold the coordinating mutex, the child process will not be able to process the records until the guard goes out of scope. After we release the mutex, we acquire it once again and add further records; we also spawn a second child to finish processing these records:


int addRecords(MAP *map, ALLOCATOR *shmem_allocator)
{
  ACE_TRACE (ACE_TEXT ("::addRecords"));

  char buf[32];
  int mapLength = ACE_static_cast (int, map->current_size ());
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("Map has %d entries; adding 20 more\n"),
              mapLength));

  for (int i = mapLength ; i < mapLength + 20; i++)
    {
      ACE_OS::sprintf (buf, "%s:%d", "Record", i);

      // Allocate new record on stack;
      Record newRecord (i, i+1, buf);
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("Adding a record for %d\n"), i));

      int result = map->bind (i, newRecord, shmem_allocator);
      if (result == -1)
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                           ACE_TEXT ("bind failed")), -1);
    }

  return 0;
}

The addRecords() routine is simple enough. All we do is create a new record on the stack and then bind it into the underlying map. We use a special bind() method here that takes a pointer to the allocator in addition to the key/value pair. As we explained earlier, we must specify the allocator whenever we use the hash map, because the internal reference the hash map keeps is valid only in the process that creates the hash map. Here, the parent is that process, so passing the allocator is not strictly necessary; to avoid errors, however, you should always follow this rule.

When we bind the record into the hash map, the map will use the allocator to create a copy of the record in shared memory. Now let's look at how the child process will process and then remove the record from the map:



int handle_child (void)
{
  ACE_TRACE (ACE_TEXT ("::handle_child"));

  ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, coordMutex, -1);

  ALLOCATOR * shmem_allocator = 0;
  ACE_MMAP_Memory_Pool_Options options
    (ACE_DEFAULT_BASE_ADDR,
     ACE_MMAP_Memory_Pool_Options::ALWAYS_FIXED);

  ACE_NEW_RETURN (shmem_allocator,
                  ALLOCATOR (BACKING_STORE,
                             BACKING_STORE,
                             &options),
                  -1);

  MAP *map = smap (shmem_allocator);

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%P|%t) Map has %d entries\n"),
              ACE_static_cast (int, map->current_size ())));
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("In child, map is located at %@\n"),
              map));

  processRecords (map, shmem_allocator);
  shmem_allocator->sync ();
  delete shmem_allocator;

  return 0;
}

Unlike the parent, the child first acquires the coordinating mutex and then creates the shared memory allocator. As it is going to dereference records that are created by the parent, the child must be sure that the underlying pool does not grow after it has created an allocator and mapped the pool into its address space. If not, the following scenario could occur.

  1. The child creates the allocator whose underlying map is, for example, 16K.
  2. The parent continues to add records, causing the map to grow to 20K.
  3. The child gets the coordinating mutex and starts accessing all the records in the map.
  4. The child reaches a record that lies outside its mapping, that is, in the region between 16K and 20K. The child dereferences it and receives an address exception.

The parent process does not have to worry about this, as it knows that the child never causes the map to grow.

We have solved this problem by locking everyone else out of the entire map. ACE offers another solution, which we talk about in Section 17.4.2. First, let's see how the child processes the records:


int processRecords (MAP *map, ALLOCATOR *shmem_allocator)
{
  ACE_TRACE (ACE_TEXT ("::processRecords"));

  size_t mapLength = map->current_size ();
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%P|%t) Found %d records\n"),
              ACE_static_cast (int, mapLength)));

  int *todelete = new int[mapLength];
  int i = 0;

  for (MAP::iterator iter = map->begin ();
      iter != map->end ();
      iter++)
    {
      int key = (*iter).ext_id_;
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) [%d] Preprocessing %d:%@\n"),
                  i+1, key, &(*iter).ext_id_));

      todelete[i++] = key;    // Mark message for deletion.

      // Illustrate the find feature of the map.
      Record record;
      int result = map->find (key, record, shmem_allocator);
      if (result == -1)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("Could not find record for %d\n"),
                    key));
      else
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("Record name: %C|id1:%d|id2:%d\n"),
                    record.name (), record.id1(), record.id2()));
    }
  // Delete everything we processed.
  for (int j = 0; j < i ; j++)
    {
      int result = map->unbind (todelete[j],
                                shmem_allocator);
      if (result == -1)
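        // Arguments must follow the order of the format specifiers:
        // the key for %d, then the string for %p.
        ACE_ERROR_RETURN ((LM_ERROR,
                           ACE_TEXT ("Failed on key %d: %p\n"),
                           todelete[j],
                           ACE_TEXT ("unbind")),
                          -1);
      else
        ACE_DEBUG ((LM_INFO,
                    ACE_TEXT ("Fully processed and removed %d\n"),
                    j));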
    }

  delete [] todelete;

  return 0;
}

Processing the records involves iterating through the map, collecting all the record IDs that need to be removed, and then unbinding them. Note that the find() and unbind() methods both take an additional allocator argument, just like the bind() method did earlier.
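The collect-then-remove pattern is not specific to ACE. As a minimal sketch, here it is with std::map standing in for the shared hash map; drain is a hypothetical helper name. A std::vector holds the keys, so the list of keys is released automatically on every return path.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Removes every entry from records and returns how many were removed.
// Keys are collected during iteration and erased afterward, so the
// iterator is never advanced from an entry that has just been removed.
inline std::size_t drain (std::map<int, std::string> &records)
{
  std::vector<int> todelete;
  todelete.reserve (records.size ());

  for (std::map<int, std::string>::iterator iter = records.begin ();
       iter != records.end ();
       ++iter)
    todelete.push_back (iter->first);   // Mark entry for deletion.

  std::size_t removed = 0;
  for (std::size_t j = 0; j < todelete.size (); ++j)
    removed += records.erase (todelete[j]);

  return removed;
}
```

Calling drain() on a map holding three records returns 3 and leaves the map empty.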

Although this example was a little more involved, it illustrates two issues that you need to be aware of when you are using shared memory.

  1. You must first realize that this example depends on the fact that all processes—parent and child—manage to map the shared memory pool to the same base address. If you cannot achieve this, the pointers in the hash map will be invalid.
  2. If one process causes the underlying memory pool to grow—in this case, the parent does this by calling addRecords()—this does not grow the pool in the other processes—in this case, the children calling processRecords(). When the child accesses a record that is not in range, dereferencing it will cause a fault.

17.4.2 Handling Pool Growth

You can handle the memory pool growth in three ways.

  1. You can make sure that it does not happen, by allocating enough memory to start with, thus preventing the problem from occurring. If this is possible, it is perhaps the easiest, most portable, and most efficient thing to do. However, this approach can limit scalability.
  2. You can make sure that the child process maps the pool in and uses it only when sure that the pool will not grow after it has been mapped, similar to the previous example. The problem here is that you lose all opportunities for concurrent read/write access to the entire pool.
  3. You can use your OS exception-handling mechanism in conjunction with the ACE-provided remap() functionality to remap the pool so that the faulting address is now mapped into the process. OS exception handling comes in two flavors: UNIX-based signal handling and Windows structured exception handling (SEH).
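The effect of remapping on demand can be modeled without any OS support. The sketch below is an assumption-laden simulation, not the ACE mechanism: BackingStore and View are hypothetical names, a std::vector plays the role of the backing file, and an explicit bounds check takes the place of the fault that the OS exception mechanism would deliver.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the backing store, which one process may grow.
struct BackingStore
{
  std::vector<unsigned char> bytes;
};

// Hypothetical per-process view: remembers how much of the store was
// mapped when the view was created or last remapped.
class View
{
public:
  explicit View (const BackingStore &store)
    : store_ (&store), mapped_len_ (store.bytes.size ()), remaps_ (0) {}

  // Returns the byte at offset, remapping first if the offset lies
  // beyond the mapped length. Returns 0 if the store itself is too small.
  const unsigned char *at (std::size_t offset)
  {
    if (offset >= mapped_len_)
      {
        mapped_len_ = store_->bytes.size ();   // "Remap" to the current size.
        ++remaps_;
        if (offset >= mapped_len_)
          return 0;
      }
    return &store_->bytes[offset];
  }

  std::size_t remaps () const { return remaps_; }

private:
  const BackingStore *store_;
  std::size_t mapped_len_;
  std::size_t remaps_;
};
```

If a view is created while the store is 16K and the store then grows to 20K, the first access at 18K triggers exactly one remap and then succeeds; later accesses inside the mapped range cost nothing extra.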

For those of you lucky enough to be using a platform with UNIX signals, ACE will automatically install a signal handler for SIGSEGV for you. This will cause the remapping to occur transparently. If you are using Windows, you will have to handle the exception yourself, using structured exception handling.

In this next example, we show you how to handle pool growth using Windows structured exception handling. In this example, we also extend the same idiom ACE uses to create ACE_Hash_Map_With_Allocator to build our own Unbounded_Queue class that will work with shared memory. The program will start and spawn two child processes. The parent will then add messages into a queue, and the child processes will continuously dequeue them from the queue until they get a special termination message, at which point the child processes will exit.

First, let's look at the Unbounded_Queue type that we have created:


template <class T>
class Unbounded_Queue : public ACE_Unbounded_Queue<T>
{
public:
  typedef ACE_Unbounded_Queue<T> BASE;

  Unbounded_Queue(ACE_Allocator* allocator)
    : ACE_Unbounded_Queue<T> (allocator)
  { }
  int enqueue_tail (const T &new_item, ACE_Allocator* allocator)
  {
    this->allocator_ = allocator;
    return BASE::enqueue_tail (new_item);
  }

  int dequeue_head (T &item, ACE_Allocator* allocator)
  {
    this->allocator_ = allocator;
    return BASE::dequeue_head (item);
  }

  void delete_nodes (ACE_Allocator* allocator)
  {
    this->allocator_ = allocator;
    // Qualify the call: the one-argument overload hides the base version.
    BASE::delete_nodes ();
  }
};

This simple data type overloads the enqueue and dequeue methods to allow us to specify the shared memory allocator in each method. All we do is reassign the allocator pointer to be sure that the queue uses a valid memory allocator instead of using the invalid allocator pointer it has in shared memory.

Let's look at the parent's actions first:


int handle_parent (char *cmdLine)
{
  ALLOCATOR *shmem_allocator = 0;
  ACE_MMAP_Memory_Pool_Options options
    (ACE_DEFAULT_BASE_ADDR,
     ACE_MMAP_Memory_Pool_Options::ALWAYS_FIXED);

  // Create the allocator.
  ACE_NEW_RETURN (shmem_allocator,
                  ALLOCATOR (BACKING_STORE,
                             BACKING_STORE,
                             &options),
                  -1);

  ACE_Process processa, processb;
  ACE_Process_Options poptions;
  poptions.command_line ("%s a", ACE_TEXT_ALWAYS_CHAR (cmdLine));
  processa.spawn (poptions);
  processb.spawn (poptions);

  // Give the children time to map the pool while it is still small.
  ACE_OS::sleep (2);

  for (int i = 0; i < 100; i++)
    sendRecord (i, shmem_allocator);
  sendRecord (-1, shmem_allocator);

  processa.wait ();
  processb.wait ();
  shmem_allocator->remove ();
  return 0;
}

When the parent starts, it first allocates the shared memory pool and spawns two child processes. It then sleeps briefly so that each child maps the pool while it is still small, before any queue or messages exist in it. This ensures that the pool must grow once the parent starts adding messages, after the children have already mapped it. The parent then quickly places 100 records on the queue, sends a termination message, and waits for the children to exit.

Now, let's look at the child's actions:


int handle_child (void)
{
  ALLOCATOR *shmem_allocator = 0;
  ACE_MMAP_Memory_Pool_Options options
    (ACE_DEFAULT_BASE_ADDR,
     ACE_MMAP_Memory_Pool_Options::ALWAYS_FIXED);
  ACE_NEW_RETURN (shmem_allocator,
                  ALLOCATOR (BACKING_STORE,
                             BACKING_STORE,
                             &options),
                  -1);
  g_shmem_allocator = shmem_allocator;

#if defined (WIN32)
  while (processWin32Record (shmem_allocator) != -1)
    ;
#else
  while (processRecord (shmem_allocator) != -1)
    ;
#endif
  return 0;
}

On start-up, the child goes into the handle_child() function, which creates the shared memory allocator and then loops, processing records. Note that we have elected to use polling here; in a realistic application, you would probably want to place a condition variable in shared memory and use that to notify the child when it is appropriate to read.

When running on Windows, we call a special processWin32Record() function, which uses structured exception handling to handle the remapping case. Let's first take a quick look at processRecord():


int processRecord (ALLOCATOR *shmem_allocator)
{
  ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, coordMutex, -1);

  QUEUE* queue = squeue (shmem_allocator);
  if (queue == 0)
    {
      delete shmem_allocator;
      ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                         ACE_TEXT ("Could not obtain queue")),
                        -1);
    }

  if (queue->is_empty ())  // Check for anything to process.
    return 0;

  Record record;
  if (queue->dequeue_head (record, shmem_allocator) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                         ACE_TEXT ("dequeue_head")),
                        -1);
    }

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%P|%t) Processing record|name: %C")
              ACE_TEXT ("|Record id1:%d|Record id2:%d\n"),
              record.name (), record.id1 (), record.id2 ()));
  if (record.id1 () == -1)
    queue->enqueue_tail (record, shmem_allocator);
  return record.id1 ();
}

All we do here is get the queue, check whether any messages are on it, and dequeue the message. If the message was a termination message, we put it back on the queue so that any other children can pick it up and process it.
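
The trick of putting the termination message back is easy to miss, so here is a minimal single-process sketch of it. It uses a std::deque of integers in place of the shared queue of Record objects; TERMINATE, Step, consume_one(), and run_consumer() are hypothetical names chosen for the illustration:

```cpp
#include <cassert>
#include <deque>

// Hypothetical sentinel, mirroring the record whose id1 is -1.
const int TERMINATE = -1;

enum class Step { Empty, Processed, Terminated };

// Take at most one record off the queue.
inline Step consume_one (std::deque<int> &queue, int &handled)
{
  if (queue.empty ())
    return Step::Empty;

  const int id = queue.front ();
  queue.pop_front ();
  if (id == TERMINATE)
    {
      // Put the sentinel back so the next consumer also sees it.
      queue.push_back (id);
      return Step::Terminated;
    }

  ++handled;
  return Step::Processed;
}

// Drain the queue as one consumer would and return how many records it
// handled. A single-threaded sketch cannot be refilled by a producer, so
// an empty queue also ends the loop.
inline int run_consumer (std::deque<int> &queue)
{
  int handled = 0;
  Step step;
  while ((step = consume_one (queue, handled)) == Step::Processed)
    ;
  // After a termination, the sentinel must still be queued for the others.
  assert (step != Step::Terminated || !queue.empty ());
  return handled;
}
```

Running run_consumer() twice on a queue holding three records followed by the sentinel handles three records the first time and none the second, and the sentinel is still on the queue afterward. Without the re-enqueue, only the first consumer to reach the sentinel would ever learn that it should exit.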

Finally, we get to processWin32Record():


#if defined(WIN32)

int handle_remap (EXCEPTION_POINTERS *ep)
{
  ACE_DEBUG ((LM_INFO, ACE_TEXT ("Handle a remap\n")));

  DWORD ecode = ep->ExceptionRecord->ExceptionCode;
  if (ecode != EXCEPTION_ACCESS_VIOLATION)
    return EXCEPTION_CONTINUE_SEARCH;

  void *addr =
    (void *) ep->ExceptionRecord->ExceptionInformation[1];
  if (g_shmem_allocator->alloc().memory_pool().remap (addr) == -1)
    return EXCEPTION_CONTINUE_SEARCH;
#if __X86__
  // This is 80x86-specific.
  ep->ContextRecord->Edi = (DWORD) addr;
#elif __MIPS__
  ep->ContextRecord->IntA0 =
    ep->ContextRecord->IntV0 = (DWORD) addr;
  ep->ContextRecord->IntT5 =
    ep->ContextRecord->IntA0 + 3;
#endif /* __X86__ */

  return EXCEPTION_CONTINUE_EXECUTION;
}

int processWin32Record (ALLOCATOR *shmem_allocator)
{
  ACE_SEH_TRY
  {
    return processRecord (shmem_allocator);
  }

  ACE_SEH_EXCEPT (handle_remap (GetExceptionInformation ()))
  { }

  return 0;
}
#endif /*WIN32*/

Here, we place processRecord() in an SEH __try/__except clause, wrapped by ACE_SEH_TRY and ACE_SEH_EXCEPT. If a fault occurs, the handle_remap() SEH filter function is called. It checks whether an EXCEPTION_ACCESS_VIOLATION occurred and, if so, finds the faulting address and uses the memory pool's remap() feature to map that address into the process. If remap() succeeds, the filter returns EXCEPTION_CONTINUE_EXECUTION, which resumes the program at the faulting instruction; otherwise, it returns EXCEPTION_CONTINUE_SEARCH and leaves the fault to another handler.

17.5 Wrappers

Besides the more advanced features provided by ACE_Malloc and friends, you may want to do something much simpler. For example, mapping files into memory is a common technique many web servers use to reduce the time it takes to send high hit-rate files back to client browsers.

ACE provides several wrapper classes that wrap lower-level shared memory primitives, such as mmap(), MapViewOfFileEx(), System V shared memory segments, and so on. We take a brief look at ACE_Mem_Map, which is a wrapper around the memory-mapping primitives of the OS.

In this next example, we rewrite the classic Richard Stevens example of copying files [8], using memory mapping. We map both source and destination files into memory and then use a simple memcpy() call to copy source to destination:


int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  ACE_HANDLE srcHandle = ACE_OS::open (argv[1], O_RDONLY);
  ACE_ASSERT(srcHandle != ACE_INVALID_HANDLE);

  ACE_Mem_Map srcMap (srcHandle, -1, PROT_READ, ACE_MAP_PRIVATE);
  ACE_ASSERT(srcMap.addr () != 0);

  ACE_Mem_Map destMap (argv[2],
                       srcMap.size (),
                       O_RDWR | O_CREAT,
                       ACE_DEFAULT_FILE_PERMS,
                       PROT_RDWR,
                       ACE_MAP_SHARED);
  ACE_ASSERT(destMap.addr () != 0);

  ACE_OS::memcpy (destMap.addr (),
                  srcMap.addr (),
                  srcMap.size ());
  destMap.sync ();

  srcMap.close ();
  destMap.close ();
  return 0;
}

We create two ACE_Mem_Map instances that represent the memory mappings of the source and destination files. To keep things interesting, we map the source by explicitly opening the file ourselves in read-only mode and then supplying the handle to srcMap. However, we let ACE_Mem_Map open the destination file for us in read/write/create mode. The PROT_READ and PROT_RDWR flags specify the protection mode for the pages that are mapped into memory. Here, the source file's memory-mapped pages can only be read from, whereas the destination pages can be both read and written to. Finally, we specify the sharing mode as ACE_MAP_PRIVATE for the source file and ACE_MAP_SHARED for the destination. ACE_MAP_PRIVATE indicates that if any changes are made to the in-memory pages, the changes will not be propagated back to the backing store or to any other processes that have the same file mapped into memory. ACE_MAP_SHARED implies that changes are shared and will be seen in the backing store and in other processes.

In many cases, it is not feasible to map an entire file into memory at once. Instead, you can use ACE_Mem_Map to map chunks of the file into memory. You can then operate on the chunk, release it when you are done with it, and map the next chunk. To do this, you must specify the size of the chunk you want to map and the offset into the file where the chunk will begin.
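
Choosing the offsets is the one fiddly part of chunked mapping, because the underlying primitives generally require a mapping's file offset to be a multiple of the system page size (for mmap()) or of the allocation granularity (on Windows). The following sketch computes such a plan in plain C++. Chunk and plan_chunks() are hypothetical helpers, not ACE APIs, and the granularity value is assumed to be supplied by the caller:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// One piece of the file to map: where it starts and how long it is.
struct Chunk
{
  std::size_t offset;
  std::size_t length;
};

// Split a file of file_size bytes into consecutive chunks whose offsets
// are all multiples of granularity. The requested chunk_size is rounded
// up to the next multiple of granularity; the last chunk may be shorter.
inline std::vector<Chunk> plan_chunks (std::size_t file_size,
                                       std::size_t chunk_size,
                                       std::size_t granularity)
{
  assert (granularity > 0 && chunk_size > 0);

  const std::size_t step =
    ((chunk_size + granularity - 1) / granularity) * granularity;

  std::vector<Chunk> chunks;
  for (std::size_t offset = 0; offset < file_size; offset += step)
    {
      const std::size_t remaining = file_size - offset;
      chunks.push_back (Chunk { offset, remaining < step ? remaining : step });
    }
  return chunks;
}
```

Each Chunk would then be mapped, processed, and released in turn. For a 10,000-byte file, a requested chunk size of 4,000 bytes, and a granularity of 4,096 bytes, the plan contains three chunks at offsets 0, 4,096, and 8,192, the last one 1,808 bytes long.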

17.6 Summary

In this chapter, we reviewed the ACE_Malloc family of allocators. In particular, we talked about the shared memory pools you can use with ACE_Malloc. First, we showed how you can use an ACE_Malloc allocator to build a simple persistence mechanism. We then showed how you can use position-independent pointers to build portable applications that will work no matter where an allocator pool is mapped to in virtual memory. Next, we showed how to adapt the ACE_Malloc class to the ACE_Allocator interface and use it with the containers that are supplied with ACE. We also identified the problems that come with dynamic shared memory pool growth and explained how you can tackle them. Finally, we mentioned a few of the wrapper classes that ACE provides to deal with shared memory.
