Globus API C++ wrappers

The following examples show some C++ wrappers for some common Globus services.

ITSO_GASS_TRANSFER

This class provides methods to easily transfer a file from one location to another. The GLOBUS_FILE class is used to refer to a locally stored file, and GLOBUS_URL is used to refer to a remotely stored file that can be reached by the http, https, or gsiftp protocol.

The method setURL() of the GLOBUS_URL class is used to define the URL.

The method Transfer() of the ITSO_GASS_TRANSFER class executes the transfer, and the two arguments are respectively the source file and the destination file. The two arguments can either be of GLOBUS_FILE or GLOBUS_URL type.

Transfer() is non-blocking, so the Wait() method should be called later in the code to wait for the completion of the transfer.

itso_gass_copy.h
#ifndef ITSO_GASS_COPY_H
#define ITSO_GASS_COPY_H
#include "globus_common.h"
#include "globus_gass_copy.h"
#include "itso_cb.h"
#include <cstdlib>
#include <cstdio>
#include <iostream>
#include <cstring>
#include <string>


/**
 * Wraps a locally stored file as a Globus I/O handle so that it can be
 * handed to the globus_gass_copy transfer functions.
 */
class GLOBUS_FILE {
   globus_io_handle_t *io_handle;   // allocated with globus_libc_malloc, released in the destructor
   int       file_fd;               // descriptor returned by open(); closed in the destructor
      public:
   GLOBUS_FILE();
   GLOBUS_FILE(char* );
   ~GLOBUS_FILE();
   // Declared without the "GLOBUS_FILE::" prefix: extra qualification on an
   // in-class member declaration is rejected by conforming compilers.
   globus_io_handle_t * get_globus_io_handle();
};

/**
 * Holds a parsed URL together with the gass_copy transfer mode that was
 * determined for it and a private copy of the URL text.
 */
class GLOBUS_URL {
   globus_url_t url;                       // filled in by globus_url_parse()
   globus_gass_copy_url_mode_t url_mode;   // filled in by globus_gass_copy_get_url_mode()
   char*        URL;                       // strdup()'ed copy of the URL text, freed in the destructor
   public:
      GLOBUS_URL();
      ~GLOBUS_URL();
      // Both overloads return true when the URL was accepted, false otherwise.
      bool setURL(char* destURL);
      // std:: qualified so the header does not depend on a using-directive.
      bool setURL(std::string destURL);
      globus_gass_copy_url_mode_t getMode();
      char* getScheme();
      char* getURL();
};

// Empty class with no members; no use of it is visible in this listing.
class ITSO_GASS_TRANSFER_EXCEPTION { };

/**
 * Starts file transfers through the globus_gass_copy API.
 * Derives from ITSO_CB; the completion callback calls setDone() on the
 * object, so callers use the inherited Wait() to block until a transfer ends.
 */
class ITSO_GASS_TRANSFER : public ITSO_CB {
   // Copy handle and its attributes, initialised in the constructor.
   globus_gass_copy_handle_t          gass_copy_handle;
   globus_gass_copy_handleattr_t      gass_copy_handleattr;
   // Per-endpoint attributes; the pointer members are allocated by
   // setSource()/setDestination() only for the FTP or GASS url mode.
   globus_gass_transfer_requestattr_t*dest_gass_attr;
   globus_gass_copy_attr_t dest_gass_copy_attr;
   globus_gass_transfer_requestattr_t*source_gass_attr;
   globus_gass_copy_attr_t source_gass_copy_attr;
   // Modes recorded by setSource()/setDestination(); the destructor reads
   // them to decide which attribute pointers to free.
   globus_gass_copy_url_mode_t source_url_mode;
   globus_gass_copy_url_mode_t dest_url_mode;
   globus_ftp_client_operationattr_t*dest_ftp_attr;
   globus_ftp_client_operationattr_t*source_ftp_attr;
   // Prepare the copy attributes for the given source / destination URL.
   void setSource(GLOBUS_URL& );
   void setDestination(GLOBUS_URL& );
   public:
   ITSO_GASS_TRANSFER();
   ~ITSO_GASS_TRANSFER();
   // Each overload registers a transfer; the first argument is the source
   // and the second the destination.
   void Transfer(GLOBUS_FILE& , GLOBUS_URL& );
   void Transfer(GLOBUS_URL&,GLOBUS_FILE&);
   void Transfer(GLOBUS_URL& ,GLOBUS_URL& );
};


#endif

itso_gass_copy.c
/*************************************************
 * For a more complete example
 * see globus-url-copy.c
 *************************************************/
#include "itso_gass_copy.h"

// Default constructor: no file is attached. The members are given inert
// values because the destructor unconditionally closes file_fd and frees
// io_handle; closing -1 and freeing a null pointer are both harmless.
GLOBUS_FILE::GLOBUS_FILE() : io_handle(GLOBUS_NULL), file_fd(-1) {}
/**
 * Opens `filename` read-only and converts the descriptor into a Globus I/O
 * handle.
 *
 * On failure (file cannot be opened, or the handle cannot be allocated) an
 * error is printed, file_fd is left at -1 and get_globus_io_handle() returns
 * a null pointer instead of an unconverted handle.
 *
 * NOTE(review): the file is opened O_RDONLY even though a GLOBUS_FILE can be
 * passed as the destination of a transfer -- confirm the intended open mode.
 */
GLOBUS_FILE::GLOBUS_FILE(char* filename) : io_handle(GLOBUS_NULL), file_fd(-1) {
   file_fd = open(filename, O_RDONLY);
   if (file_fd < 0) {
      std::perror(filename);
      return;
   }

   io_handle = (globus_io_handle_t *)
      globus_libc_malloc(sizeof(globus_io_handle_t));
   if (io_handle == GLOBUS_NULL) {
      std::perror("GLOBUS_FILE: cannot allocate globus_io_handle_t");
      close(file_fd);
      file_fd = -1;
      return;
   }

   /* convert file into a globus_io_handle */
   globus_io_file_posix_convert(file_fd,
                                GLOBUS_NULL,
                                io_handle);
}

// Closes the file descriptor first, then releases the storage that was
// obtained for the Globus I/O handle.
GLOBUS_FILE::~GLOBUS_FILE()
{
   close(this->file_fd);
   globus_libc_free(this->io_handle);
}

// Gives access to the Globus I/O handle owned by this object; the caller
// does not take ownership.
globus_io_handle_t * GLOBUS_FILE::get_globus_io_handle()
{
   return this->io_handle;
}

// URL starts out null so that the destructor's free(URL) is well defined
// even when setURL() was never called (or never succeeded).
GLOBUS_URL::GLOBUS_URL() : URL(NULL) {}
// Releases the copy of the URL text made by setURL() (strdup => free).
GLOBUS_URL::~GLOBUS_URL()
{
   free(this->URL);
}
/**
 * Validates `destURL`, records its transfer mode and stores a private copy.
 *
 * @param destURL  NUL-terminated URL text; a null pointer is rejected.
 * @return true when the URL parsed, its mode was determined and the copy was
 *         stored; false otherwise (a message is written to std::cerr).
 *
 * NOTE(review): a copy stored by an earlier successful call is not released
 * here; confirm URL is always initialised before adding a free().
 */
bool GLOBUS_URL::setURL(char* destURL) {
    if (destURL == NULL) {
        std::cerr << "can not parse destURL (null pointer)" << std::endl;
        return false;
    }
    //check if this is a valid URL
    if (globus_url_parse(destURL, &url) != GLOBUS_SUCCESS) {
        std::cerr << "can not parse destURL " << destURL << std::endl;
        return false;
    }
    //determine the transfer mode
    if (globus_gass_copy_get_url_mode(destURL, &url_mode) != GLOBUS_SUCCESS) {
        std::cerr << "failed to determine mode for destURL " << destURL << std::endl;
        return false;
    }
    // Only replace the member once the copy is known to exist.
    char* copy = strdup(destURL);
    if (copy == NULL) {
        std::cerr << "out of memory while storing destURL " << destURL << std::endl;
        return false;
    }
    URL = copy;
    return true;
}

// Convenience overload: forwards the string's character data to the
// char* overload and returns its result.
bool GLOBUS_URL::setURL(string url) {
   char* raw = const_cast<char*>(url.c_str());
   return setURL(raw);
}

// Returns the transfer mode recorded by setURL().
globus_gass_copy_url_mode_t GLOBUS_URL::getMode()
{
   return this->url_mode;
}

char* GLOBUS_URL::getScheme() {
   return url.scheme;
}
char* GLOBUS_URL::getURL() {
   return URL;
}

//***********************************************************
//
//***********************************************************
namespace itso_gass_copy {
static void
url_copy_callback(
          void * callback_arg,
          globus_gass_copy_handle_t * handle,
          globus_object_t * error)
{
            globus_bool_t         use_err = GLOBUS_FALSE;
       ITSO_CB* monitor = (ITSO_CB*)  callback_arg;

            if (error != GLOBUS_SUCCESS)
       {
            cerr << " url copy error:" <<
globus_object_printable_to_string(error) << endl;
            //monitor->setError(error);
         }
      monitor->setDone();
       return;
};
}

/**
 * Initialises the copy handle (attributes first, then the handle) and puts
 * the per-endpoint bookkeeping into a known state.
 *
 * The destructor inspects both url modes and frees the matching attribute
 * pointers; a transfer that involves a local file only sets one side, so
 * both sides must start out as "nothing allocated".
 */
ITSO_GASS_TRANSFER::ITSO_GASS_TRANSFER() {
   source_url_mode  = GLOBUS_GASS_COPY_URL_MODE_UNSUPPORTED;
   dest_url_mode    = GLOBUS_GASS_COPY_URL_MODE_UNSUPPORTED;
   source_ftp_attr  = GLOBUS_NULL;
   dest_ftp_attr    = GLOBUS_NULL;
   source_gass_attr = GLOBUS_NULL;
   dest_gass_attr   = GLOBUS_NULL;

   // handlers initialisation
   // first the attributes
   // then the handler
   globus_gass_copy_handleattr_init(&gass_copy_handleattr);
   globus_gass_copy_handle_init(&gass_copy_handle, &gass_copy_handleattr);
}

// Destroys the copy handle, then frees whichever attribute block was
// allocated for the source and for the destination, chosen by url mode.
ITSO_GASS_TRANSFER::~ITSO_GASS_TRANSFER()
{
   globus_gass_copy_handle_destroy(&gass_copy_handle);

   switch (source_url_mode) {
      case GLOBUS_GASS_COPY_URL_MODE_FTP:
         globus_libc_free(source_ftp_attr);
         break;
      case GLOBUS_GASS_COPY_URL_MODE_GASS:
         globus_libc_free(source_gass_attr);
         break;
      default:
         break;
   }

   switch (dest_url_mode) {
      case GLOBUS_GASS_COPY_URL_MODE_FTP:
         globus_libc_free(dest_ftp_attr);
         break;
      case GLOBUS_GASS_COPY_URL_MODE_GASS:
         globus_libc_free(dest_gass_attr);
         break;
      default:
         break;
   }
}

/**
 * Prepares source_gass_copy_attr for `source_url`.
 *
 * For the FTP mode an FTP operation attribute is allocated and attached; for
 * the GASS mode a request attribute is allocated, switched to binary file
 * mode and attached. Other modes only get the initialised copy attribute.
 * If the allocation fails a message is printed and nothing is attached.
 */
void ITSO_GASS_TRANSFER::setSource(GLOBUS_URL& source_url) {
   globus_gass_copy_attr_init(&source_gass_copy_attr);
   source_url_mode=source_url.getMode();

   if (source_url_mode == GLOBUS_GASS_COPY_URL_MODE_FTP) {
      source_ftp_attr = (globus_ftp_client_operationattr_t*)
         globus_libc_malloc(sizeof(globus_ftp_client_operationattr_t));
      if (source_ftp_attr == GLOBUS_NULL) {
         std::cerr << "setSource: out of memory" << std::endl;
         return;
      }
      globus_ftp_client_operationattr_init(source_ftp_attr);
      globus_gass_copy_attr_set_ftp(&source_gass_copy_attr,
                                    source_ftp_attr);
   }
   else if (source_url_mode == GLOBUS_GASS_COPY_URL_MODE_GASS) {
      source_gass_attr = (globus_gass_transfer_requestattr_t*)
         globus_libc_malloc(sizeof(globus_gass_transfer_requestattr_t));
      if (source_gass_attr == GLOBUS_NULL) {
         std::cerr << "setSource: out of memory" << std::endl;
         return;
      }
      globus_gass_transfer_requestattr_init(source_gass_attr,
                                            source_url.getScheme());
      globus_gass_transfer_requestattr_set_file_mode(
             source_gass_attr,
             GLOBUS_GASS_TRANSFER_FILE_MODE_BINARY);
      // Attached once, after the attribute is fully configured.
      globus_gass_copy_attr_set_gass(&source_gass_copy_attr,
                                     source_gass_attr);
   }
}


/**
 * Prepares dest_gass_copy_attr for `dest_url`.
 *
 * For the FTP mode an FTP operation attribute is allocated and attached; for
 * the GASS mode a request attribute is allocated, switched to binary file
 * mode and attached. Other modes only get the initialised copy attribute.
 * If the allocation fails a message is printed and nothing is attached.
 */
void ITSO_GASS_TRANSFER::setDestination(GLOBUS_URL& dest_url) {
   globus_gass_copy_attr_init(&dest_gass_copy_attr);
   dest_url_mode=dest_url.getMode();

   if (dest_url_mode == GLOBUS_GASS_COPY_URL_MODE_FTP) {
      dest_ftp_attr = (globus_ftp_client_operationattr_t*)
         globus_libc_malloc(sizeof(globus_ftp_client_operationattr_t));
      if (dest_ftp_attr == GLOBUS_NULL) {
         std::cerr << "setDestination: out of memory" << std::endl;
         return;
      }
      globus_ftp_client_operationattr_init(dest_ftp_attr);
      globus_gass_copy_attr_set_ftp(&dest_gass_copy_attr,
                                    dest_ftp_attr);
   }
   else if (dest_url_mode == GLOBUS_GASS_COPY_URL_MODE_GASS) {
      dest_gass_attr = (globus_gass_transfer_requestattr_t*)
         globus_libc_malloc(sizeof(globus_gass_transfer_requestattr_t));
      if (dest_gass_attr == GLOBUS_NULL) {
         std::cerr << "setDestination: out of memory" << std::endl;
         return;
      }
      globus_gass_transfer_requestattr_init(dest_gass_attr,
                                            dest_url.getScheme());
      globus_gass_transfer_requestattr_set_file_mode(
             dest_gass_attr,
             GLOBUS_GASS_TRANSFER_FILE_MODE_BINARY);
      // Attached once, after the attribute is fully configured.
      globus_gass_copy_attr_set_gass(&dest_gass_copy_attr,
                                     dest_gass_attr);
   }
}

/**
 * Registers a non-blocking copy from a local file to a URL.
 *
 * @param globus_source_file  local source
 * @param destURL             remote destination
 *
 * If the registration itself is refused the completion callback will not
 * run, so the error is printed here and the object is marked done to keep a
 * later Wait() from blocking forever.
 */
void ITSO_GASS_TRANSFER::Transfer(GLOBUS_FILE& globus_source_file, GLOBUS_URL&
destURL) {
   setDestination(destURL);
   globus_result_t result = globus_gass_copy_register_handle_to_url(
         &gass_copy_handle,
         globus_source_file.get_globus_io_handle(),
         destURL.getURL(),
         &dest_gass_copy_attr,
         itso_gass_copy::url_copy_callback,
         (void *) this );
   if (result != GLOBUS_SUCCESS) {
      globus_object_t* err = globus_error_get(result);
      char* message = globus_object_printable_to_string(err);
      std::cerr << " url copy error:"
                << (message != GLOBUS_NULL ? message : "(no description)")
                << std::endl;
      globus_libc_free(message);
      globus_object_free(err);
      setDone();
   }
}

/**
 * Registers a non-blocking copy from a URL to a local file.
 *
 * @param sourceURL         remote source
 * @param globus_dest_file  local destination
 *
 * If the registration itself is refused the completion callback will not
 * run, so the error is printed here and the object is marked done to keep a
 * later Wait() from blocking forever.
 */
void ITSO_GASS_TRANSFER::Transfer(GLOBUS_URL& sourceURL,GLOBUS_FILE&
globus_dest_file) {
   setSource(sourceURL);
   globus_result_t result = globus_gass_copy_register_url_to_handle(
         &gass_copy_handle,
         sourceURL.getURL(),
         &source_gass_copy_attr,
         globus_dest_file.get_globus_io_handle(),
         itso_gass_copy::url_copy_callback,
         (void *) this );
   if (result != GLOBUS_SUCCESS) {
      globus_object_t* err = globus_error_get(result);
      char* message = globus_object_printable_to_string(err);
      std::cerr << " url copy error:"
                << (message != GLOBUS_NULL ? message : "(no description)")
                << std::endl;
      globus_libc_free(message);
      globus_object_free(err);
      setDone();
   }
}

/**
 * Registers a non-blocking copy between two URLs.
 *
 * @param sourceURL  remote source
 * @param destURL    remote destination
 *
 * The source attributes are built from sourceURL (they used to be built from
 * destURL, which picked the wrong mode/scheme whenever the two URLs used
 * different protocols). If the registration is refused the error is printed
 * and the object is marked done so that Wait() does not block forever.
 */
void ITSO_GASS_TRANSFER::Transfer(GLOBUS_URL& sourceURL,GLOBUS_URL& destURL) {
   setSource(sourceURL);
   setDestination(destURL);
   globus_result_t result = globus_gass_copy_register_url_to_url(
         &gass_copy_handle,
         sourceURL.getURL(),
         &source_gass_copy_attr,
         destURL.getURL(),
         &dest_gass_copy_attr,
         itso_gass_copy::url_copy_callback,
         (void *) this);
   if (result != GLOBUS_SUCCESS) {
      globus_object_t* err = globus_error_get(result);
      char* message = globus_object_printable_to_string(err);
      std::cerr << " url copy error:"
                << (message != GLOBUS_NULL ? message : "(no description)")
                << std::endl;
      globus_libc_free(message);
      globus_object_free(err);
      setDone();
   }
}

ITSO_GLOBUS_FTP_CLIENT

A wrapper for GridFTP capabilities.

itso_globus_ftp_client.h
#ifndef ITSO_ITSO_GLOBUS_FTP_CLIENT_H
#define ITSO_ITSO_GLOBUS_FTP_CLIENT_H
#include <cstdio>
#include <iostream>
#include "globus_ftp_client.h"
#include "itso_cb.h"

// _(call): stores the result of `call` in a variable named `r` that the
// caller must have declared; when it is not GLOBUS_SUCCESS the error text is
// written to std::cerr and the program exits with status 1.
// Every line but the last ends in a backslash so that the whole statement
// sequence belongs to the macro.
#define _(a) r=a; \
        if (r!=GLOBUS_SUCCESS) { \
      std::cerr << globus_object_printable_to_string(globus_error_get(r)); \
      exit(1); \
       }


#define MAX_BUFFER_SIZE 2048
#define SUCCESS 0

//*************************************************************************


/**
 * Uploads a local file with the Globus FTP client API.
 * Derives from ITSO_CB; the completion callback calls setDone() on the
 * object so that the inherited Wait() can be used to block until the end.
 */
class ITSO_GLOBUS_FTP_CLIENT : public ITSO_CB {
   FILE*     fd;        // local file, opened by the constructor with fopen()
    globus_byte_t           buffer[MAX_BUFFER_SIZE];   // block read from fd and handed to register_write
   globus_ftp_client_handle_t              handle;     // initialised in the constructor, destroyed in the destructor
   public:
    // (local file name, destination URL): opens the file and starts the put.
    ITSO_GLOBUS_FTP_CLIENT(char*,char*);
   ~ITSO_GLOBUS_FTP_CLIENT();
   // Reads the first block and registers it for writing at offset 0.
   void StartTransfer();
   // Reads the next block and registers it at offset + length; invoked from
   // the data callback with the values of the block just written.
   void Transfer( globus_byte_t*, globus_size_t&,globus_off_t&);
};
#endif
#ifndef ITSO_ITSO_GLOBUS_FTP_CLIENT_H
#define ITSO_ITSO_GLOBUS_FTP_CLIENT_H
#include <cstdio>
#include <iostream>
#include "globus_ftp_client.h"
#include "itso_cb.h"

// _(call): stores the result of `call` in a variable named `r` that the
// caller must have declared; when it is not GLOBUS_SUCCESS the error text is
// written to std::cerr and the program exits with status 1.
// Every line but the last ends in a backslash so that the whole statement
// sequence belongs to the macro.
#define _(a) r=a; \
        if (r!=GLOBUS_SUCCESS) { \
      std::cerr << globus_object_printable_to_string(globus_error_get(r)); \
      exit(1); \
       }

#define MAX_BUFFER_SIZE 2048
#define SUCCESS 0


//*************************************************************************
/**
 * Uploads a local file with the Globus FTP client API.
 * Derives from ITSO_CB; the completion callback calls setDone() on the
 * object so that the inherited Wait() can be used to block until the end.
 */
class ITSO_GLOBUS_FTP_CLIENT : public ITSO_CB {
   FILE*     fd;        // local file, opened by the constructor with fopen()
    globus_byte_t          buffer[MAX_BUFFER_SIZE];   // block read from fd and handed to register_write
   globus_ftp_client_handle_t             handle;     // initialised in the constructor, destroyed in the destructor
   public:
    // (local file name, destination URL): opens the file and starts the put.
    ITSO_GLOBUS_FTP_CLIENT(char*,char*);
   ~ITSO_GLOBUS_FTP_CLIENT();
   // Reads the first block and registers it for writing at offset 0.
   void StartTransfer();
   // Reads the next block and registers it at offset + length; invoked from
   // the data callback with the values of the block just written.
   void Transfer( globus_byte_t*, globus_size_t&,globus_off_t&);
};
#endif

itso_globus_ftp_client.C
#include "itso_globus_ftp_client.h"

namespace itso_globus_ftp_client {
static
void
done_cb(
        void*                user_arg,
        globus_ftp_client_handle_t *             handle,
        globus_object_t *                        err)
{
    ITSO_CB* f=(ITSO_CB*) user_arg;
if(err)
    {
        cerr << globus_object_printable_to_string(err);
    };
    f->setDone();
    return;
};

static
void
data_cb(
    void *                                      user_arg,
    globus_ftp_client_handle_t *                handle,
    globus_object_t *                           err,
    globus_byte_t *                             buffer,
    globus_size_t                               length,
    globus_off_t                                offset,
    globus_bool_t                               eof)
{
    ITSO_GLOBUS_FTP_CLIENT* l = (ITSO_GLOBUS_FTP_CLIENT*) user_arg;

    if(err)
    {
        fprintf(stderr, "%s", globus_object_printable_to_string(err));
    }
    else
    {
       if(!eof)
       l->Transfer(
                                buffer,
                                length,
                                offset
       );
    } /* else */
    return;
} /* data_cb */
};

/**
 * Opens the local file `f` for reading and starts a put to the URL `dst`.
 *
 * @param f    path of the local file to upload
 * @param dst  destination URL
 *
 * The FTP handle is always initialised so the destructor can destroy it.
 * When the file cannot be opened, or the put cannot be started, the error is
 * printed and the object is marked done because no completion callback will
 * arrive to release a later Wait().
 */
ITSO_GLOBUS_FTP_CLIENT::ITSO_GLOBUS_FTP_CLIENT(char* f,char* dst) {
      fd = fopen(f,"r");
      globus_ftp_client_handle_init(&handle,  GLOBUS_NULL);
      if (fd == NULL) {
         perror(f);
         setDone();
         return;
      }

      globus_result_t r = globus_ftp_client_put(
                            &handle,
                            dst,
                            GLOBUS_NULL,
                            GLOBUS_NULL,
                            itso_globus_ftp_client::done_cb,
                            this);
      if (r != GLOBUS_SUCCESS) {
         globus_object_t* err = globus_error_get(r);
         char* message = globus_object_printable_to_string(err);
         if (message != GLOBUS_NULL) {
            fprintf(stderr, "%s\n", message);
            globus_libc_free(message);
         }
         globus_object_free(err);
         setDone();
      }
}
// Closes the local file (if fopen() succeeded -- fclose(NULL) is undefined)
// and destroys the FTP client handle.
ITSO_GLOBUS_FTP_CLIENT::~ITSO_GLOBUS_FTP_CLIENT() {
      if (fd != NULL) {
         fclose(fd);
      }
      globus_ftp_client_handle_destroy(&handle);
}
void ITSO_GLOBUS_FTP_CLIENT::StartTransfer() {
        int rc;
           rc = fread(buffer, 1, MAX_BUFFER_SIZE, fd);
           globus_ftp_client_register_write(
            &handle,
               buffer,
            rc,
               0,
            feof(fd) != SUCCESS,
           itso_globus_ftp_client::data_cb,
                (void*) this);
   };

/**
 * Reads the next block of the local file and registers it for writing.
 *
 * @param buffer  block storage to refill (the block that was just written)
 * @param length  size of the block that was just written
 * @param offset  file offset of the block that was just written; the new
 *                block is registered at offset + length
 *
 * On a read error the message is printed and the operation is aborted so
 * that the completion callback still fires; returning without registering
 * anything would leave the put pending.
 */
void ITSO_GLOBUS_FTP_CLIENT::Transfer(
       globus_byte_t*                          buffer,
       globus_size_t&                           length,
       globus_off_t&                            offset
           )
{
    globus_size_t count = fread(buffer, 1, MAX_BUFFER_SIZE, fd);
    if (ferror(fd) != SUCCESS)
    {
        printf("Read error in function data_cb; errno = %d\n", errno);
        globus_ftp_client_abort(&handle);
        return;
    }
    globus_ftp_client_register_write(
        &handle,
        buffer,
        count,
        offset + length,
        feof(fd) != SUCCESS,   // non-zero once the end of the file was reached
        itso_globus_ftp_client::data_cb,
        (void *) this);
}

ITSO_CB

Below is a sample callback mechanism.

itso_cb.h
#ifndef ITSO_CB_H
#define ITSO_CB_H
#include <cstdio>
#include <iostream>
#include <globus_common.h>


/**
 * Small completion monitor: a "done" flag protected by a Globus mutex and
 * signalled through a Globus condition variable.
 *
 *  - setDone()  sets the flag and signals the condition;
 *  - Wait()     blocks until the flag is set;
 *  - Continue() clears the flag so the object can be reused;
 *  - IsDone()   returns the current value of the flag.
 */
class ITSO_CB {
   globus_mutex_t mutex;
   globus_cond_t cond;
   globus_bool_t done;
     public:
   ITSO_CB() {
      globus_mutex_init(&mutex, GLOBUS_NULL);
      globus_cond_init(&cond, GLOBUS_NULL);
      done = GLOBUS_FALSE ;
   }
   // Virtual: the class has a virtual member and serves as a base class, so
   // destroying a derived object through an ITSO_CB pointer must be safe.
   virtual ~ITSO_CB() {
      globus_mutex_destroy(&mutex);
      globus_cond_destroy(&cond);
   }
   globus_bool_t IsDone();
   void setDone();
   void Continue();
   virtual void Wait();
};
#endif

itso_cb.C
#include "itso_cb.h"
// Returns the current value of the done flag (no locking is performed here).
globus_bool_t ITSO_CB::IsDone()
{
   return this->done;
}

void ITSO_CB::setDone() {
              globus_mutex_lock(&mutex);
              done = GLOBUS_TRUE;
         globus_cond_signal(&cond);
         globus_mutex_unlock(&mutex);
            }

void ITSO_CB::Continue() {
              globus_mutex_lock(&mutex);
              done = GLOBUS_FALSE;
         globus_mutex_unlock(&mutex);
            }

// Blocks the caller until the done flag is set. The flag is re-tested after
// every wake-up of the condition variable.
void ITSO_CB::Wait()
{
   globus_mutex_lock(&mutex);
   for (;;) {
      if (IsDone()) {
         break;
      }
      globus_cond_wait(&cond, &mutex);
   }
   globus_mutex_unlock(&mutex);
}

ITSO_GRAM_JOB

This class provides methods to easily submit a job to a Globus grid. It works in an asynchronous way:

  • The Submit() method takes a host name and the RSL string to submit the job and returns immediately.

  • The Wait() method waits for the completion of the job.

The class is derived from ITSO_CB and provides the following methods to check the status of the job:

  • IsDone() to check the status of the job (finished or not).

  • HasFailed() to check if the GRAM submission has failed. Note that this method will not detect if the executable aborted during execution or if it hangs.

The class can be used several times to submit different jobs, with either a different host name or a different RSL string. Call the Continue() method before calling Submit() again.

itso_gram_job.h
#ifndef ITSO_GRAM_JOB_H
#define ITSO_GRAM_JOB_H
#include <cstdio>
#include <string>
#include "globus_gram_client.h"
#include "itso_cb.h"


class ITSO_GRAM_JOB : public ITSO_CB {
   char* job_contact;
        char* callback_contact; /* This is the identifier for
                                   * globus_gram_job_request

                */
   bool   failed;// used to check if a job has failed
    public:
   ITSO_GRAM_JOB();
   ~ITSO_GRAM_JOB();
   bool Submit(string,string);
   void Cancel();
   void SetJobContact(const char*);
   void Wait();
   void SetFailed();
   bool HasFailed();
};

#endif

itso_gram_jobs_callback.h
#ifndef ITSO_GRAM_JOBS_CALLBACK_H
#define ITSO_GRAM_JOBS_CALLBACK_H
#include <cstdio>
#include <string>
#include <map>
#include "globus_gram_client.h"
#include "itso_cb.h"


class ITSO_GRAM_JOBS_CALLBACK;
class ITSO_GRAM_JOB;

class ITSO_GRAM_JOBS_CALLBACK {
   globus_mutex_t JobsTableMutex;
   char* callback_contact; /* This is the identifier for
                                  * the callback, returned by
                                  * globus_gram_job_request
                   */
   map<string,ITSO_GRAM_JOB*> JobsTable;
   void  Lock();
   void  UnLock();
     public:
   ITSO_GRAM_JOBS_CALLBACK();
   ~ITSO_GRAM_JOBS_CALLBACK();
   void      Add(string,ITSO_GRAM_JOB*);
   void      Remove(char*);
   char*     GetURL();
   ITSO_GRAM_JOB*GetJob(char*);
};

/**
 * One GRAM job that reports through a shared ITSO_GRAM_JOBS_CALLBACK.
 * Derives from ITSO_CB; the inherited done flag tracks the end of the job.
 */
class ITSO_GRAM_JOB : public ITSO_CB {
   char*        jobcontact;               // contact string of the submitted job, NULL when none
   bool         failed;                   // set through SetFailed()
   ITSO_GRAM_JOBS_CALLBACK* callback;     // shared callback endpoint, not owned
      public:
   // Initialisers are listed in declaration order, which is the order in
   // which the members are actually initialised.
   ITSO_GRAM_JOB(ITSO_GRAM_JOBS_CALLBACK* f) :
      jobcontact(NULL),failed(false),callback(f) {}
   ~ITSO_GRAM_JOB() {}
   bool Submit(std::string,std::string);
   void Cancel();
   void SetJobContact(char*);
   void Wait();
   void SetFailed();
   bool HasFailed();
};
#endif

itso_gram_jobs_callback.C
#include "itso_gram_jobs_callback.h"

namespace itso_gram_jobs {
/**
 * Job-state callback registered by ITSO_GRAM_JOBS_CALLBACK.
 *
 * @param user_callback_arg  the ITSO_GRAM_JOBS_CALLBACK that registered it
 * @param job_contact        contact string of the job whose state changed
 * @param state              new GRAM job state
 * @param errorcode          GRAM error code (unused)
 *
 * On FAILED and DONE the job is removed from the table and, when a job
 * object is registered for the contact, that object is released.
 */
static void callback_func(void * user_callback_arg,
                   char * job_contact,
                   int state,
                   int errorcode)
{
    ITSO_GRAM_JOBS_CALLBACK*  Monitor = (ITSO_GRAM_JOBS_CALLBACK*)
user_callback_arg;

    // May be null when no job is registered under this contact.
    ITSO_GRAM_JOB* job = Monitor->GetJob(job_contact);

    switch(state)
    {
    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_STAGE_IN:
        std::cout << "Staging file in on: " << job_contact << std::endl;
        break;
    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_STAGE_OUT:
        std::cout << "Staging file out  on: " << job_contact << std::endl;
        break;
    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_PENDING:
        break; /* Reports state change to the user */
    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_ACTIVE:
        break; /* Reports state change to the user */

    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED:
        Monitor->Remove(job_contact);
        if (job != NULL) {
            // Flag the failure before waking the waiter.
            job->SetFailed();
            job->setDone();
        }
        std::cerr << "Job Failed on: " << job_contact << std::endl;
        break; /* Reports state change to the user */

    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_DONE:
        std::cout << "Job Finished on: " << job_contact << std::endl;
        Monitor->Remove(job_contact);
        if (job != NULL) {
            job->setDone();
        }
        break; /* Reports state change to the user */
    }
}

/**
 * Submission callback passed to globus_gram_client_register_job_request.
 *
 * @param user_callback_arg  the ITSO_GRAM_JOB that submitted the request
 * @param failure_code       0 when the request was accepted
 * @param job_contact        contact string of the new job
 * @param state              initial job state (unused)
 * @param errorcode          GRAM error code (unused)
 *
 * On success the contact is stored in the job. On failure the job is marked
 * failed first and only then released, so a thread returning from Wait()
 * already sees HasFailed() == true. The contact is printed only on success
 * (presumably it is not set on failure -- TODO confirm).
 */
static void request_callback(void * user_callback_arg,
                             globus_gram_protocol_error_t failure_code,
                             const char * job_contact,
                             globus_gram_protocol_job_state_t state,
                             globus_gram_protocol_error_t errorcode) {
    ITSO_GRAM_JOB*  Request = (ITSO_GRAM_JOB*) user_callback_arg;

    if (failure_code==0) {
        std::cout << "Contact on the server " << job_contact << std::endl;
        Request->SetJobContact(const_cast<char*>(job_contact));
    }
    else {
        std::cout << "Error during the code submission" << std::endl
                  << "Error Code:" << failure_code << std::endl;
        Request->SetFailed();
        Request->setDone();
    }
}
}

void ITSO_GRAM_JOB::SetJobContact(char* c) {
    jobcontact=c;
    callback->Add(c,this);
};

/**
 * Registers a non-blocking GRAM job request.
 *
 * @param res  resource manager contact (host) to submit to
 * @param rsl  RSL description of the job
 * @return true when the request could NOT be registered (the GRAM error is
 *         printed and the job is flagged as failed), false when it was
 *         registered. Note the polarity: true means error.
 */
bool ITSO_GRAM_JOB::Submit(string res, string rsl) {
    failed=false;
    int rc = globus_gram_client_register_job_request(res.c_str(),
                         rsl.c_str(),
                         GLOBUS_GRAM_PROTOCOL_JOB_STATE_ALL,
                         callback->GetURL(),
                         GLOBUS_GRAM_CLIENT_NO_ATTR,
                         itso_gram_jobs::request_callback,
                         (void*) this);
    if (rc != 0) /* if there is an error */
    {
        printf("TEST: gram error: %d - %s\n",
                rc,
                /* translate the error into english */
                globus_gram_client_error_string(rc));
        failed = true;
        return true;
    }
    return false;
}

// Blocks until the job has been marked done, releases the stored job
// contact (it is of no use once the job is over) and re-arms the object.
void ITSO_GRAM_JOB::Wait()
{
   ITSO_CB::Wait();

   if (jobcontact == NULL) {
      Continue();
      return;
   }

   globus_gram_client_job_contact_free(jobcontact);
   jobcontact = NULL;
   Continue();
}

void ITSO_GRAM_JOB::Cancel() {
    int rc;
    printf("	TEST: sending cancel to job manager...
");

    if ((rc = globus_gram_client_job_cancel(jobcontact)) != 0)
    {
       printf("	TEST: Failed to cancel job.
");
       printf("	TEST: gram error: %d - %s
",
               rc,
               globus_gram_client_error_string(rc));
    }
    else
    {
       printf("	TEST: job cancel was successful.
");
    }
};

void ITSO_GRAM_JOB::SetFailed() {
   failed=true;
};
bool ITSO_GRAM_JOB::HasFailed() {
   return failed;
};

/**
 * Initialises the table mutex and opens the GRAM callback endpoint whose
 * contact string is later returned by GetURL().
 *
 * The mutex is initialised with GLOBUS_NULL attributes (the undefined name
 * ITSO_NULL was used before). When the endpoint cannot be opened the GRAM
 * error is printed and callback_contact stays null.
 */
ITSO_GRAM_JOBS_CALLBACK::ITSO_GRAM_JOBS_CALLBACK() {
   callback_contact = GLOBUS_NULL;
   globus_mutex_init(&JobsTableMutex, GLOBUS_NULL);

   int rc = globus_gram_client_callback_allow(
                itso_gram_jobs::callback_func,
                (void *) this,
                &callback_contact);
   if (rc != GLOBUS_SUCCESS || callback_contact == GLOBUS_NULL) {
      std::cerr << "Gram callback could not be opened: "
                << globus_gram_client_error_string(rc) << std::endl;
      return;
   }
   std::cout << "Gram contact " << callback_contact << std::endl;
}

char*  ITSO_GRAM_JOBS_CALLBACK::GetURL() {
   return callback_contact;
}

/**
 * Looks up the job registered under the contact string `s`.
 *
 * @param s  job contact string; null yields a null result
 * @return the registered job, or null when there is none.
 *
 * Uses find() under the table lock: operator[] would insert an empty entry
 * for every unknown contact, and Add()/Remove() modify the table under the
 * same lock.
 */
ITSO_GRAM_JOB*  ITSO_GRAM_JOBS_CALLBACK::GetJob(char* s) {
   if (s == NULL) {
      return NULL;
   }

   ITSO_GRAM_JOB* job = NULL;
   Lock();
   std::map<std::string,ITSO_GRAM_JOB*>::iterator entry = JobsTable.find(s);
   if (entry != JobsTable.end()) {
      job = entry->second;
   }
   UnLock();
   return job;
}

// Announces the shutdown, closes the GRAM callback endpoint, releases the
// contact string and finally destroys the table mutex.
ITSO_GRAM_JOBS_CALLBACK::~ITSO_GRAM_JOBS_CALLBACK()
{
   cout << callback_contact;
   cout << " destroyed" << endl;

   globus_gram_client_callback_disallow(callback_contact);
   globus_free(callback_contact);

   globus_mutex_destroy(&JobsTableMutex);
}

// Registers (or replaces) the job stored under `jobcontact`, holding the
// table lock for the duration of the update.
void ITSO_GRAM_JOBS_CALLBACK::Add(string jobcontact,ITSO_GRAM_JOB* job)
{
   Lock();
   ITSO_GRAM_JOB*& slot = JobsTable[jobcontact];
   slot = job;
   UnLock();
}

void ITSO_GRAM_JOBS_CALLBACK::Remove(char* jobcontact){
   Lock();
   JobsTable.erase(jobcontact);
   UnLock();
};

void ITSO_GRAM_JOBS_CALLBACK::Lock() { globus_mutex_lock(&JobsTableMutex); };
void ITSO_GRAM_JOBS_CALLBACK::UnLock() { globus_mutex_unlock(&JobsTableMutex);
};

itso_gram_job.C
#include "itso_gram_job.h"

namespace itso_gram_job {
/**
 * Job-state callback registered by ITSO_GRAM_JOB::Submit().
 *
 * @param user_callback_arg  the ITSO_GRAM_JOB that submitted the job
 * @param job_contact        contact string of the job whose state changed
 * @param state              new GRAM job state
 * @param errorcode          GRAM error code (unused)
 *
 * FAILED marks the job failed and done; DONE marks it done.
 */
static void callback_func(void * user_callback_arg,
                   char * job_contact,
                   int state,
                   int errorcode)
{
    ITSO_GRAM_JOB*  Monitor = (ITSO_GRAM_JOB*) user_callback_arg;

    switch(state)
    {
    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_STAGE_IN:
          std::cout << "Staging file in on: " << job_contact << std::endl;
          break;
    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_STAGE_OUT:
          std::cout << "Staging file out  on: " << job_contact << std::endl;
          break;
    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_PENDING:
          break; /* Reports state change to the user */

    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_ACTIVE:
          break; /* Reports state change to the user */

    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED:
          std::cerr << "Job Failed on: " << job_contact << std::endl;
          Monitor->SetFailed();
          Monitor->setDone();
          break; /* Reports state change to the user */

    case GLOBUS_GRAM_PROTOCOL_JOB_STATE_DONE:
          std::cout << "Job Finished on: " << job_contact << std::endl;
          Monitor->setDone();
          break; /* Reports state change to the user */
    }
}

/**
 * Submission callback passed to globus_gram_client_register_job_request.
 *
 * @param user_callback_arg  the ITSO_GRAM_JOB that submitted the request
 * @param failure_code       0 when the request was accepted
 * @param job_contact        contact string of the new job
 * @param state              initial job state (unused)
 * @param errorcode          GRAM error code (unused)
 *
 * When the request failed the job is marked failed and done before the
 * request monitor is released, so that Wait() returns instead of blocking
 * on a job that presumably never reports a state (TODO confirm). The
 * contact is printed only on success.
 */
static void request_callback(void * user_callback_arg,
      globus_gram_protocol_error_t        failure_code,
      const char *                        job_contact,
      globus_gram_protocol_job_state_t    state,
      globus_gram_protocol_error_t errorcode)
{
    ITSO_GRAM_JOB*  Request = (ITSO_GRAM_JOB*) user_callback_arg;

    if (failure_code != 0) {
        std::cout << "Error during the code submission" << std::endl
                  << "Error Code:" << failure_code << std::endl;
        Request->SetFailed();
        Request->setDone();
    }
    else {
        std::cout << "Contact on the server " << job_contact << std::endl;
    }
    Request->SetRequestDone(job_contact);
}
}

// Starts with no job contact, no callback contact and the failure flag
// cleared, so that HasFailed(), Cancel() and Wait() never read
// indeterminate members.
ITSO_GRAM_JOB::ITSO_GRAM_JOB()
   : job_contact(NULL), callback_contact(NULL), failed(false) {
}
// Nothing is released here; the body is intentionally empty.
ITSO_GRAM_JOB::~ITSO_GRAM_JOB()
{
}

void ITSO_GRAM_JOB::SetRequestDone( const char* j) {
   job_contact = const_cast<char*>(j);
   request_cb.setDone();
}

void ITSO_GRAM_JOB::Submit(string res, string rsl) {
    failed=false;
    globus_gram_client_callback_allow(itso_gram_job::callback_func,
                       (void *) this,
                       &callback_contact);
    int rc = globus_gram_client_register_job_request(res.c_str(),
                          rsl.c_str(),
                          GLOBUS_GRAM_PROTOCOL_JOB_STATE_ALL,
                          callback_contact,
                          GLOBUS_GRAM_CLIENT_NO_ATTR,
                          itso_gram_job::request_callback,
                          (void*) this);
    if (rc != 0) /* if there is an error */
    {
        printf("TEST: gram error: %d - %s
",
                rc,
                /* translate the error into english */
                globus_gram_client_error_string(rc));
        return;
     }
};

/**
 * Blocks until the request callback has run and the job has been marked
 * done, then releases the job contact and re-arms both monitors so the
 * object can be used for another Submit().
 */
void ITSO_GRAM_JOB::Wait() {
   request_cb.Wait();
   ITSO_CB::Wait();
   /* Free up the resources of the job_contact, as the job is over, and
    * the contact is now useless. The pointer is reset so that a later
    * Wait() or Cancel() cannot touch the freed string.
    */
   if (job_contact != NULL) {
      globus_gram_client_job_contact_free(job_contact);
      job_contact = NULL;
   }
   request_cb.Continue();
   ITSO_CB::Continue();
}

void ITSO_GRAM_JOB::Cancel() {
    int rc;
    printf("	TEST: sending cancel to job manager...
");

    if ((rc = globus_gram_client_job_cancel(job_contact)) != 0)
    {
       printf("	TEST: Failed to cancel job.
");
       printf("	TEST: gram error: %d - %s
",
              rc,
              globus_gram_client_error_string(rc));
    }
    else
    {
      printf("	TEST: job cancel was successful.
");
    }
};

void ITSO_GRAM_JOB::SetFailed() {
   failed=true;
}

bool ITSO_GRAM_JOB::HasFailed() {
   return failed;
}

StartGASSServer() and StopGASSServer()

These two functions provide an easy way to start and stop a local GASS server. StartGASSServer() takes one argument: the port number on which the GASS server must listen.

As the callback function used for globus_gass_server_ez_init() does not take any argument, an object cannot be passed to the callbacks. Consequently, if the application needs to start two local GASS servers, two different callback functions must be used and globus_gass_server_ez_init() must be called twice with a different callback each time.

itso_gass_server.h
#ifndef ITSO_GASS_SERVER_H
#define ITSO_GASS_SERVER_H

#include <unistd.h>
#include "globus_common.h"
#include "globus_gass_server_ez.h"
#include <iostream>

namespace itso_gass_server {

// Starts a local GASS server listening on the given port.
// NOTE(review): the definition in itso_gass_server.C gives the parameter a
// default value (10000) that is not visible through this declaration.
void StartGASSServer(int);

// Shuts the GASS server down and deactivates the Globus module.
void StopGASSServer();

};

#endif

itso_gass_server.C
#include "itso_gass_server.h"

namespace itso_gass_server {

globus_mutex_t                mutex;
globus_cond_t                 cond;
bool                      done;
globus_gass_transfer_listener_t GassServerListener;

void callback_c_function() {
       globus_mutex_lock(&mutex);
       done = true;
       globus_cond_signal(&cond);
}

void StartGASSServer(int port=10000) {
   // Never forget to activate GLOBUS module
   globus_module_activate(GLOBUS_GASS_SERVER_EZ_MODULE);

   // let s define options for our GASS server
   unsigned long server_ez_opts=0UL;

   //Files openfor writing will be written a line at a time
   //so multiple writers can access them safely.
   server_ez_opts |= GLOBUS_GASS_SERVER_EZ_LINE_BUFFER;

   //URLs that have ~ character, will be expanded to the home
   //directory of the user who is running the server
   server_ez_opts |= GLOBUS_GASS_SERVER_EZ_TILDE_EXPAND;

   //URLs that have ~user character, will be expanded to the home
   //directory of the user on the server machine
   server_ez_opts |= GLOBUS_GASS_SERVER_EZ_TILDE_USER_EXPAND;

   //"get" requests will be fullfilled
   server_ez_opts |= GLOBUS_GASS_SERVER_EZ_READ_ENABLE;

   //"put" requests will be fullfilled
   server_ez_opts |= GLOBUS_GASS_SERVER_EZ_WRITE_ENABLE;

   // for put requets on /dev/stdout will be redirected to the standard
   // output stream of the gass server
   server_ez_opts |= GLOBUS_GASS_SERVER_EZ_STDOUT_ENABLE;

   // for put requets on /dev/stderr will be redirected to the standard
   // output stream of the gass server
   server_ez_opts |= GLOBUS_GASS_SERVER_EZ_STDERR_ENABLE;
   // "put requests" to the URL https://host/dev/globus_gass_client_shutdown
   // will cause the callback function to be called.  this allows
   // the GASS client to communicate shutdown requests to the server
   server_ez_opts |= GLOBUS_GASS_SERVER_EZ_CLIENT_SHUTDOWN_ENABLE;
   
   // Secure
   char* scheme="https";
   //unsecure
   //char* scheme="http";
        globus_gass_transfer_listenerattr_t  attr;
        globus_gass_transfer_listenerattr_init(&attr, scheme);

        //we want to listen on post 10000
   globus_gass_transfer_listenerattr_set_port(&attr, port);

   //Now, we can start this gass server !
        globus_gass_transfer_requestattr_t  * reqattr      = GLOBUS_NULL;
//purpose unknown

        globus_mutex_init(&mutex, GLOBUS_NULL);
        globus_cond_init(&cond, GLOBUS_NULL);
        done = false;

   int err = globus_gass_server_ez_init(&GassServerListener,
                                         &attr,
                                         scheme,
                                         GLOBUS_NULL, //purpose unknown
                                         server_ez_opts,
                 callback_c_function); //or GLOBUS_NULL otherwise
                 //GLOBUS_NULL); //or GLOBUS_NULL otherwise

      if((err != GLOBUS_SUCCESS)) {
              cerr << "Error: initializing GASS (" << err << ")" << endl;
             exit(1);
      }

        char *
gass_server_url=globus_gass_transfer_listener_get_base_url(GassServerListener);
   cout << "we are listening on " << gass_server_url << endl;

};

void StopGASSServer() {
   globus_gass_server_ez_shutdown(GassServerListener);
   globus_module_deactivate(GLOBUS_GASS_SERVER_EZ_MODULE);
};

};

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.142.166.31