![]() |
License / Documentation home / Help and feedback | ![]() |
We can use the elements of the toplevel server loop to provide Communicator server functionality to other systems which have their own main loops, such as scripting language interpreters (Tcl and Python, for instance), window systems (Tk and X, for instance), and distributed object systems (e.g., Xerox PARC's ILU). In this document, we show how to do this.
Recall the Communicator main loop in the timed task case:
/* The main() here is essentially the same main() as inHere's the main function from simple_mainloop_elr.c, in the main loop example which demonstrates the use of ELRs:
the Communicator library. */
int main(int argc, char **argv)
{
GalIO_ServerStruct *server;
server = GalSS_CmdlineSetupServer(argc, argv);
if (!server) {
GalUtil_Fatal("Failed to set up server!\n");
}
GalSS_StartAndRunServer(server);
exit(0);
}
int main(int argc, char **argv)The basic steps in this latter case are
{
Looper *l = SM_NewLooper();
GalSS_ELR *elr = GalSS_ELRSetupServer((GalSS_ServerArgs *) NULL,
argc, argv,
_simple_mainloop_set_timer,
_simple_mainloop_unset_timer,
_simple_mainloop_set_fd,
_simple_mainloop_unset_fd,
_simple_mainloop_behavior_fn,
(void *) l, NULL, 1);
if (!elr) {
exit(1);
}
SM_Mainloop(l);
exit(0);
}
First, in order to configure an external main loop to handle Communicator events appropriately, the external main loop must support both timer-triggered events and file-descriptor-triggered events. The callbacks for a Communicator-compliant server cannot rely exclusively on file descriptors, because sometimes there's material in the incoming or outgoing internal queues which a file descriptor poll would not catch. In principle, timer-triggered events would probably suffice, but the ELR registration is not configured to guarantee that possibility yet. So the ELR object must record functions to set up and cancel both types of events, using the chosen main loop. Here are the appropriate functions for our simple main loop example. Let's start with timers:
static void _simple_mainloop_timer_callback(void *client_data)The setting function takes an ELR object and a timer duration in milliseconds, and returns a tag that can be used to cancel the timer. The unsetting function takes the ELR object and the tag. In order to complete the implementation, the setter needs to refer to a callback function which has a function signature appropriate for the external main loop you're using. This function should call GalSS_ELRDoCallback. Now we do the same thing for file descriptors:
{
GalSS_ELRDoCallback((GalSS_ELR *) client_data, GALSS_ELR_TIMER);
}
void *_simple_mainloop_set_timer(GalSS_ELR *elr, int ms)
{
return (void *) SM_AddTimerCallback((Looper *) GalSS_ELRGetLoopData(elr),
ms, _simple_mainloop_timer_callback,
(void *) elr);
}
void _simple_mainloop_unset_timer(GalSS_ELR *elr, void *tag)
{
SM_RemoveTimerCallback((TimerCallback *) tag);
}
In addition, the ELR object records whether the timer in the main loop is persistent or not. In some cases, a timer resets itself automatically after it's fired, and continues firing until it's explicitly cancelled (that is, it's persistent). In other cases, the timer must be explicitly reset (that is, it's not persistent). For instance, tasks in the Communicator timed task loop are not persistent.
static void _simple_mainloop_fd_callback(void *client_data)
{
GalSS_ELRDoCallback((GalSS_ELR *) client_data, GALSS_ELR_FD);
}
void *_simple_mainloop_set_fd(GalSS_ELR *elr, GAL_SOCKET fd)
{
SM_AddFDCallback((Looper *) GalSS_ELRGetLoopData(elr),
fd, _simple_mainloop_fd_callback,
(void *) elr);
return (void *) fd;
}
void _simple_mainloop_unset_fd(GalSS_ELR *elr, void *tag)
{
SM_RemoveFDCallback((Looper *) GalSS_ELRGetLoopData(elr),
(GAL_SOCKET) tag);
}
Second, the ELR object records behavior associated with Communicator events. For example, you may require special things to happen when a connection starts up or shuts down (you may need to store it somewhere that the GUI can get to it, perhaps), or when a server starts up or shuts down. This is the behavior function in the simple main loop example:
void _simple_mainloop_behavior_fn(GalSS_ELR *elr, int event)In this particular case, the main loop is shut down when the server exits.
{
switch (event) {
case GAL_SERVER_LISTENER_SHUTDOWN_EVENT:
GalUtil_Warn("The server has failed.\n");
SM_RemoveAllFDCallbacks((Looper *) GalSS_ELRGetLoopData(elr));
SM_RemoveAllTimerCallbacks((Looper *) GalSS_ELRGetLoopData(elr));
break;
case GAL_SERVER_DESTRUCTION_EVENT:
GalUtil_Warn("The server has been destroyed.\n");
SM_LooperExit((Looper *) GalSS_ELRGetLoopData(elr));
break;
}
}
Finally, the ELR object records arbitrary external data which the ELR object may need to do its job. In this case, we see in the behavior function that the ELR object requires a reference to the external main loop.
enum {GALSS_ELR_TIMER, GALSS_ELR_FD};
typedef void *(*GalSS_ELTimerSetFn)(GalSS_ELR *, int
ms);
typedef void *(*GalSS_ELFDSetFn)(GalSS_ELR *, GAL_SOCKET
fd);
typedef void (*GalSS_ELUnsetFn)(GalSS_ELR *, void *tag);
typedef void (*GalSS_ELBehaviorFn)(GalSS_ELR *, int
condition);
GalSS_ELR *GalSS_ELRSetupServer(GalSS_ServerArgs
*external_arg_pkg, int argc, char **argv, GalSS_ELTimerSetFn
timer_set_fn, GalSS_ELUnsetFn timer_unset_fn, GalSS_ELFDSetFn
fd_set_fn, GalSS_ELUnsetFn fd_unset_fn, GalSS_ELBehaviorFn
behavior_fn, void *loop_data, void (*loop_data_free_fn)(void
*), int timer_is_persistent)
This function performs a number of tasks. First, it analyzes the command
line arguments using the argument package
tools, treating argc, argv, and external_arg_pkg as in GalSS_ExtractCmdlineServerArgs.
It fixes the loop type to be external using GalSS_SAFixLoopType.
It then calls GalSS_SetupServer,
builds the ELR structure using GalSS_ELRCreate
(the five function arguments and timer_is_persistent), sets the
loop data (if present) using GalSS_ELRSetLoopData
(the loop_data and loop_data_free_fn arguments), and calls
GalIO_ServerStart.
At this point, the server is configured and running, and the appropriate
callbacks should be enabled in the external main loop.
void GalSS_ELRDoCallback(GalSS_ELR
*elr, int timer_or_fd)
This function should be invoked by all external loop timer and file descriptor
callbacks. The timer_or_fd argument should be GALSS_ELR_TIMER
or GALSS_ELR_FD as appropriate.
void GalSS_ELRSetLoopData(GalSS_ELR
*elr, void *loop_data, void (*loop_data_free_fn)(void
*))
This function sets the arbitrary ELR data. If loop_data is not
NULL and loop_data_free_fn is not NULL, loop_data_free_fn
will be called on loop_data when elr is freed.
void *GalSS_ELRGetLoopData(GalSS_ELR
*elr)
Retrieves the arbitrary ELR data.
GalSS_ELR *GalSS_ELRCreate(GalIO_ServerStruct
*scomm, GalSS_ELTimerSetFn timer_set_fn, GalSS_ELUnsetFn
timer_unset_fn, GalSS_ELFDSetFn fd_set_fn, GalSS_ELUnsetFn
fd_unset_fn, GalSS_ELBehaviorFn behavior_fn, int timer_is_persistent)
This function creates the basic ELR object. The scomm is a server
object created using GalSS_SetupServer
or some similar function. The timer_set_fn, timer_unset_fn,
fd_set_fn, and fd_unset_fn are the timer and file descriptor
setting and unsetting functions illustrated here. The behavior_fn is the function
which is called when various Communicator events are fired; for any given
situation, it is called after the corresponding timer and file descriptor
setting and unsetting functions are called (so, for instance, when a new
connection object is created, the timer and file descriptors are registered
for that object, and then the behavior_fn is called). The timer_is_persistent
argument should be 1 if the timers in the external main loop reset themselves
automatically when they're fired, 0 otherwise.
When you create an ELR object, you're actually registering global information for many ELR objects, each of which is associated with a specific server, connection or broker object. When a connection is created, for instance, a local ELR object is create for it which shares the global information with all the other local ELR objects; when the connection is destroyed, the local ELR object is freed. The functions GalSS_ELRSetLoopData and GalSS_ELRGetLoopData access the shared global information, so you don't need to know about this local/global distinction in the normal case. However, under some circumstances you may want to know what sort of local ELR object you're currently holding in a callback. You can do this by using the following three functions.
GalIO_ServerStruct *GalSS_ELRSComm(GalSS_ELR
*elr)
Returns the server object associated with this ELR object.
GalIO_CommStruct *GalSS_ELRGComm(GalSS_ELR
*elr)
Returns the connection object associated with this ELR object, or NULL
if this ELR object is associated with a server.
GalIO_BrokerStruct *GalSS_ELRBroker(GalSS_ELR
*elr)
Returns the broker object associated with this ELR object, or NULL if
this ELR object is associated with a server or connection.
There is currently no support for distinguishing between inbound and outbound brokers.
void GalSS_ELRUpdatePollIntervals(GalSS_ELR
*elr, int server_client_poll_ms, int conn_ms, int
broker_ms)
There are currently three circumstances where timers are used in the ELR
infrastructure: when polling server clients trying to contact
a Hub, when polling connections, and when polling brokers. The default
timer intervals for these polls are 1000 ms, 50 ms, and 50 ms, respectively.
If you want to change the polling interval, you can do so using this function.
An argument of -1 for any of the three polling arguments here will leave
the existing poll untouched.
In rare cases, the programmer will want to change the callback behavior for a specific object type. While this is not without risks, it can be done, with one of the following five functions. The callback function must return 1 if the object is still pollable, 0 otherwise. It is strongly recommended that you begin from the existing implementations of these callback functions and modify them as little as possible.
typedef int (*GalSS_ELCallbackFn)(GalSS_ELR *elr, int timer_or_fd);
void GalSS_ELRSetConnectionCallback(GalSS_ELR
*elr, GalSS_ELCallbackFn fn)
Sets the behavior of GalSS_ELRDoCallback
when the object associated with the ELR is a connection object. The current
definition is as follows:
static int __GalSS_ELRDoConnectionCallback(GalSS_ELR *elr, int timer_or_fd)
{
if ((timer_or_fd == GALSS_ELR_FD) ||
GalIO_CommReadReady(elr->local_info->gcomm) ||
GalIO_CommWriteReady(elr->local_info->gcomm)) {
return GalIO_ConnectionCallbackHandler(elr->local_info->gcomm, 0);
} else {
return 0;
}
}
void GalSS_ELRSetBrokerOutCallback(GalSS_ELR
*elr, GalSS_ELCallbackFn fn)
Sets the behavior of GalSS_ELRDoCallback
when the object associated with the ELR is an outbound broker object. The
current definition is as follows:
static int __GalSS_ELRDoBrokerOutCallback(GalSS_ELR *elr, int timer_or_fd)
{
if (GalIO_BrokerWriteReady(elr->local_info->broker)) {
return GalIO_BrokerDataOutCallbackHandler(elr->local_info->broker);
} else {
return 0;
}
}
void GalSS_ELRSetBrokerInCallback(GalSS_ELR
*elr, GalSS_ELCallbackFn fn)
Sets the behavior of GalSS_ELRDoCallback
when the object associated with the ELR is an inbound broker object. The
current definition is as follows:
static int __GalSS_ELRDoBrokerInCallback(GalSS_ELR *elr, int timer_or_fd)
{
if ((timer_or_fd == GALSS_ELR_FD) ||
GalIO_BrokerReadReady(elr->local_info->broker)) {
return GalIO_BrokerDataInCallbackHandler(elr->local_info->broker, 0);
} else {
return 0;
}
}
void GalSS_ELRSetServerListenerCallback(GalSS_ELR
*elr, GalSS_ELCallbackFn fn)
Sets the behavior of GalSS_ELRDoCallback
when the object associated with the ELR is a server object and the current
ELR was set up to monitor connections from the Hub. The current definition
is as follows:
static int __GalSS_ELRDoServerListenerCallback(GalSS_ELR *elr,
int timer_or_fd)
{
int res = GalIO_ServerCallbackHandler(elr->local_info->scomm, 0,
(GalIO_CommStruct **) NULL);
/* 1 means it got a connection and everything's OK. */
if (res == 1)
res = 0;
return res;
}
void GalSS_ELRSetServerClientCallback(GalSS_ELR
*elr, GalSS_ELCallbackFn fn)
Sets the behavior of GalSS_ELRDoCallback
when the object associated with the ELR is a server object and the current
ELR was set up to monitor connections
to the Hub. The current definition is as follows:
static int __GalSS_ELRDoServerClientCallback(GalSS_ELR *elr,In all cases, the functions that are called are the toplevel functions that an external main loop would call to monitor the relevant object.
int timer_or_fd)
{
GalIO_ServerCheckHubContacts(elr->local_info->scomm);
return 0;
}
GalSS_ELR *GalSS_ELRCopy(GalSS_ELR
*source)
Copies an ELR object. Programmers should not need this function.
void GalSS_ELRShutdown(GalSS_ELR
*elr)
Shuts down all callbacks associated with this ELR. Programmers should
not need this function.
void GalSS_ELRDestroy(GalSS_ELR
*elr)
Frees the ELR. If the ELR is the "root" ELR, the global information is
freed as well.
int main(int argc, char **argv)This example contrasts with the default main loop in that we're using a different main loop; the way the server object is created is essentially the same. There are three sections to this main() function.
{
GalaxyCallbackRecord *gcr = (GalaxyCallbackRecord *) NULL;
int new_argc;
char **new_argv;
GalSS_ServerArgs *arg_pkg;
gcr = (GalaxyCallbackRecord *) malloc(sizeof(GalaxyCallbackRecord));
gcr->timer_cb = (TimerCallback *) NULL;
gcr->l = SM_NewLooper();
/* If you want to use the built-in server arguments, you
can use GalSS_ExtractCmdlineServerArgs. Otherwise, you can just
call GalSS_InitializeServerToplevel(). */
arg_pkg = GalSS_DefaultServerArgs();
/* Make sure it knows that we're using our own main loop. We set this
before we ever parse the server arguments, because we don't even want
the arguments pertaining to the loop type enabled for the user. */
GalSS_SAFixLoopType(arg_pkg, GAL_LOOP_EXTERNAL);
arg_pkg = GalSS_ExtractCmdlineServerArgs(arg_pkg, argc, argv,
&new_argc, &new_argv);
if (!arg_pkg) {
/* Something bad happened, or -help was passed. */
exit(1);
}
/* Now, we call GalSS_InitializeServerFromServerArgs, and we don't have
to worry about the signature of GalSS_InitializeServerToplevel. */
gcr->scomm = GalSS_SetupServer(arg_pkg, new_argc, new_argv);
GalSS_FreeArgPkg(arg_pkg);
if (!gcr->scomm) {
fprintf(stderr, "Couldn't create a server\n");
fflush(stderr);
exit(1);
}
/* Set the connect callback for the server. This gets called
whenever a new connection is established. */
GalIO_AddServerConnectCallback(gcr->scomm,
GCRAddConnectionCallback, (void *) gcr);
/* The server can be a listener when it starts out, or
it can become a listener when an outgoing broker starts up. So
we set a callback to handle whenever this happens. */
GalIO_AddServerCallback(gcr->scomm,
GAL_SERVER_LISTENER_STARTUP_EVENT,
GCRSetupServerListener, (void *) gcr);
/* Similarly, if someone calls GalIO_ContactHub, it may lead to
a new poller starting up. So we should deal with that
as a callback too. */
GalIO_AddServerCallback(gcr->scomm,
GAL_SERVER_CLIENT_POLL_STARTUP_EVENT,
GCRSetupServerClient, (void *) gcr);
/* And now, something that will shut down the loop when
the server is destroyed. */
GalIO_AddServerCallback(gcr->scomm,
GAL_SERVER_DESTRUCTION_EVENT,
GCRDestroyServer, (void *) gcr);
/* Now, start the server, and then the main loop. */
if (!GalIO_ServerStart(gcr->scomm)) {
fprintf(stderr, "Couldn't start the server\n");
fflush(stderr);
exit(1);
}
SM_Mainloop(gcr->l);
exit(0);
}
In the first section, we allocate the structures we need for this type of mainloop. In the second section, we parse the command line arguments, using the Galaxy Communicator library utilities. Notice that before we parse the arguments, we fix the loop type, so that the arguments to control the loop type will not be used. Then, we populate the package of arguments, and then set up a server based on those arguments. In the final section, we set up the callbacks which will handle the server. We do this by setting up four callbacks: one for when the listener starts up, one for when the client poll starts up, one for when a connection is established, and one for when the server is destroyed.
static void GCRServerListenerHandler(void *client_data)The function GCRSetupListenerServer is called when the listener starts up. It adds a shutdown callback to the server, and adds a file descriptor callback for the local main loop which calls GalIO_ServerCallbackHandler.
{
GalaxyCallbackRecord *gcr = (GalaxyCallbackRecord *) client_data;
GalIO_ServerCallbackHandler(gcr->scomm, 0,
(GalIO_CommStruct **) NULL);
}
static void GCRShutdownServerListener(GalIO_ServerStruct *scomm,
void *callback_data)
{
GalaxyCallbackRecord *gcr = (GalaxyCallbackRecord *) callback_data;
GalUtil_Warn("The server has failed.\n");
SM_RemoveAllFDCallbacks(gcr->l);
SM_RemoveAllTimerCallbacks(gcr->l);
}
static void GCRSetupServerListener(GalIO_ServerStruct *scomm,
void *callback_data)
{
GalaxyCallbackRecord *gcr = (GalaxyCallbackRecord *) callback_data;
/* You only need a file descriptor callback here, since
there will be no connection requests in any internal queue. */
SM_AddFDCallback(gcr->l, GalIO_GetServerListenSocket(scomm),
GCRServerListenerHandler,
(void *) gcr);
GalIO_AddServerCallback(scomm,
GAL_SERVER_LISTENER_SHUTDOWN_EVENT,
GCRShutdownServerListener,
(void *) gcr);
}
int GalIO_ServerCallbackHandler(GalIO_ServerStruct
*scomm, int read_blocking, GalIO_CommStruct **new_conn_ptr)
Polls the server scomm and sets *new_conn_pointer to the
new connection, if one is established. If new_conn_ptr is NULL, the
new connection will not be returned. The read_blocking flag should
be 1 if the handler should do a blocking read, 0 otherwise. Returns 1 if there's
a new connection, 0 if there isn't, -1 if an error was encountered and the
listener was shut down, -2 if an error was encountered and the server was
destroyed.
Next, let's look at the client startup callbacks:
/* This function is used when the server is subscribing toThis is exactly parallel to the listener case, except the client shutdown callback is associated with server destruction. The local main loop callback calls GalIO_ServerCheckHubContacts.
Hub listeners. */
static void GCRServerClientHandler(void *client_data)
{
GalaxyCallbackRecord *gcr = (GalaxyCallbackRecord *) client_data;
GalIO_ServerCheckHubContacts(gcr->scomm);
}
static void GCRShutdownServerClient(GalIO_ServerStruct *scomm,
void *callback_data)
{
TimerCallback *cb = (TimerCallback *) callback_data;
SM_RemoveTimerCallback(cb);
}
static void GCRSetupServerClient(GalIO_ServerStruct *scomm,
void *callback_data)
{
GalaxyCallbackRecord *gcr = (GalaxyCallbackRecord *) callback_data;
TimerCallback *cb;
/* Set up a periodic task to check the hub contacts. */
cb = SM_AddTimerCallback(gcr->l, 10, GCRServerClientHandler, (void *) gcr);
/* Add a shutdown callback now. */
GalIO_AddServerCallback(scomm,
GAL_SERVER_DESTRUCTION_EVENT,
GCRShutdownServerClient,
(void *) cb);
}
void GalIO_ServerCheckHubContacts(GalIO_ServerStruct
*scomm)
Polls the server scomm to make sure that all client connections
to Hub listeners are appropriately established.
Next, we have the server destruction callback, which simply exits the local loop.
static void GCRDestroyServer(GalIO_ServerStruct *scomm,
void *callback_data)
{
GalaxyCallbackRecord *gcr = (GalaxyCallbackRecord *) callback_data;
GalUtil_Warn("The server has been destroyed.\n");
SM_LooperExit(gcr->l);
}
typedef struct __connection_container {
GalIO_CommStruct *gcomm;
GalaxyCallbackRecord *gcr;
TimerCallback *t;
GAL_SOCKET fd;
} ConnectionContainer;
typedef struct __connection_container {
GalIO_CommStruct *gcomm;
GalaxyCallbackRecord *gcr;
TimerCallback *t;
GAL_SOCKET fd;
} ConnectionContainer;
/* GalIO_ConnectionCallbackHandler():
-1 means an error was encountered and the connection has been destroyed.
0 means we're in the midst of things.
1 means we're done and the connection has been destroyed. */
static void GCRConnectionDisconnect(GalIO_CommStruct *gcomm, void *caller_data)
{
ConnectionContainer *c = (ConnectionContainer *) caller_data;
GalaxyCallbackRecord *gcr = c->gcr;
SM_RemoveFDCallback(gcr->l, c->fd);
SM_RemoveTimerCallback(c->t);
free(c);
}
/* The loop cleanup is handled in the disconnect callback. */
static void GCRConnectionHandler(void *client_data)
{
ConnectionContainer *c = (ConnectionContainer *) client_data;
GalIO_CommStruct *gcomm = c->gcomm;
GalIO_ConnectionCallbackHandler(gcomm, 0);
}
static void GCRConnectionTimerHandler(void *client_data)
{
/* This is called from the timer. We could go ahead and
just call the normal connection handler, which would
try to read from the file descriptor, but since there's
already a file descriptor callback which triggers that
handler, we'll only do something if there's stuff
in the internal queues. */
ConnectionContainer *c = (ConnectionContainer *) client_data;
GalIO_CommStruct *gcomm = c->gcomm;
if (GalIO_CommReadReady(gcomm) || GalIO_CommWriteReady(gcomm)) {
GCRConnectionHandler(client_data);
}
}
static void GCRAddConnectionCallback(GalIO_ServerStruct *scomm,
GalIO_CommStruct *gcomm,
void *callback_data)
{
GalaxyCallbackRecord *gcr = (GalaxyCallbackRecord *) callback_data;
ConnectionContainer *c = (ConnectionContainer *) calloc(1, sizeof(ConnectionContainer));
c->gcr = gcr;
c->gcomm = gcomm;
c->fd = GalIO_GetCommSocket(gcomm);
/* We'll use the file descriptor callback to check the file
descriptor, and the timer callback to check the internal queue. */
SM_AddFDCallback(gcr->l, c->fd,
GCRConnectionHandler,
(void *) c);
c->t = SM_AddTimerCallback(gcr->l, 5, GCRConnectionTimerHandler,
(void *) c);
/* Finally, to support brokers, and to deal with
disconnections, we need to use the
data slot for the connection. */
/* Make sure you stop polling when the connection dies. */
GalIO_AddConnectionCallback(gcomm,
GAL_CONNECTION_SHUTDOWN_EVENT,
GCRConnectionDisconnect,
(void *) c);
/* And now, add the callbacks for the broker setups. */
GalIO_AddConnectionBrokerCallback(gcomm,
GAL_CONNECTION_BROKER_OUT_STARTUP_EVENT,
GCRSetupBrokerOut,
(void *) gcr);
GalIO_AddConnectionBrokerCallback(gcomm,
GAL_CONNECTION_BROKER_IN_STARTUP_EVENT,
GCRSetupBrokerIn,
(void *) gcr);
}
int GalIO_ConnectionCallbackHandler(GalIO_CommStruct
*gcomm, int read_blocking)
Polls the connection gcomm for dispatch function requests to process.
Does a blocking read if read_blocking is nonzero. Returns -1 if an
error was encountered and the connection has been destroyed, 1 if the connection
is done and it's been destroyed, 0 otherwise.
int GalIO_CommWriteReady(GalIO_CommStruct
*gcomm)
Returns 1 if the connection gcomm has data in its internal outbound
queue waiting to be written, 0 otherwise.
intGalIO_CommReadReady(GalIO_CommStruct
*gcomm)
Returns 1 if the connection gcomm has data in its internal inbound queue
waiting to be processed, 0 otherwise.
typedef struct __broker_container {The outgoing broker sets up a timer callback for the local main loop (the server is already monitoring the connection requests, so all the outgoing broker needs to do is flush data). In addition, the broker adds a shutdown callback for itself.
GalaxyCallbackRecord *gcr;
GalIO_BrokerStruct *b;
TimerCallback *t;
GAL_SOCKET fd;
} BrokerContainer;
/* GalIO_BrokerDataOutCallbackHandler()
returns 1 if the broker is done and has been destroyed,
0 if not done */
/* The timer disconnect is handled in the loop data finalizer. */
static void GCROutBrokerHandler(void *client_data)
{
BrokerContainer *c = (BrokerContainer *) client_data;
if (GalIO_BrokerWriteReady(c->b)) {
GalIO_BrokerDataOutCallbackHandler(c->b);
}
}
void GCRBrokerShutdown(GalIO_BrokerStruct *b, void *loop_data)
{
BrokerContainer *c = (BrokerContainer *) loop_data;
if (c->fd != GAL_INVALID_SOCKET)
SM_RemoveFDCallback(c->gcr->l, c->fd);
SM_RemoveTimerCallback(c->t);
free(c);
}
void GCRSetupBrokerOut(GalIO_CommStruct *gcomm,
GalIO_BrokerStruct *b,
void *caller_data)
{
BrokerContainer *c = (BrokerContainer *) calloc(1, sizeof(BrokerContainer));
c->b = b;
c->gcr = (GalaxyCallbackRecord *) caller_data;
c->fd = GAL_INVALID_SOCKET;
/* There's no point in an fd callback for the outgoing
broker, since it piggybacks off of the server listener. */
c->t = SM_AddTimerCallback(c->gcr->l, 1, GCROutBrokerHandler,
(void *) c);
/* Use the caller data to set up the loop finalizer. */
GalIO_AddBrokerCallback(b, GAL_BROKER_DESTRUCTION_EVENT,
GCRBrokerShutdown,
(void *) c);
}
int GalIO_BrokerDataOutCallbackHandler(GalIO_BrokerStruct
*b)
This function invokes the broker b and destroys the broker in the
appropriate circumstances. This function returns 1 if the broker is done
and has been destroyed, 0 if not done.
int GalIO_BrokerWriteReady(GalIO_BrokerStruct
*b)
This function returns 1 when the broker has data to write in its outbound
queue, 0 otherwise.
int GalSS_BrokerProxyOutCallbackHandler(GalSS_BrokerProxy
*bp)
This function calls GalIO_BrokerDataOutCallbackHandler
on the broker object associated with the outbound broker proxy bp.
As we mentioned, this function
is needed only for programming language embeddings, and should never be needed
by the programmer; all C embeddings for broker proxies are handled transparently
by the broker callbacks.
int GalSS_BrokerProxyWriteReady(GalSS_BrokerProxy
*bp)
Returns 1 if the outbound broker proxy bp has data available
to be written in its outbound queue, 0 otherwise. As we mentioned, this function is needed
only for programming language embeddings, and should never be needed by
the programmer; all C embeddings for broker proxies are handled transparently
by the broker callbacks.
/* Next, the incoming broker. This is much more like theThe callback sets up a local file descriptor and timer callback, and sets up a shutdown callback to destroy the local callbacks.
connection. */
/* GalIO_BrokerInCallbackHandler()
returns 1 if the broker is done and has been destroyed,
0 if not done, -1 if error was encountered and the broker
has been destroyed. */
static void GCRInBrokerHandler(void *client_data)
{
/* The timer disconnect must be handled in the finalizer.
Make sure that's set up correctly in the examples. */
BrokerContainer *c = (BrokerContainer *) client_data;
GalIO_BrokerStruct *b = c->b;
GalIO_BrokerDataInCallbackHandler(b, 0);
}
static void GCRInBrokerTimerHandler(void *client_data)
{
/* This is called from the timer. We could go ahead and
just call the normal in broker handler, which would
try to read from the file descriptor, but since there's
already a file descriptor callback which triggers that
handler, we'll only do something if there's stuff
in the internal queues. */
BrokerContainer *c = (BrokerContainer *) client_data;
GalIO_BrokerStruct *b = c->b;
if (GalIO_BrokerReadReady(b)) {
GCRInBrokerHandler(client_data);
}
}
void GCRSetupBrokerIn(GalIO_CommStruct *gcomm,
GalIO_BrokerStruct *b,
void *caller_data)
{
BrokerContainer *c = (BrokerContainer *) calloc(1, sizeof(BrokerContainer));
c->b = b;
c->gcr = (GalaxyCallbackRecord *) caller_data;
c->fd = GalIO_GetBrokerSocket(b);
SM_AddFDCallback(c->gcr->l, c->fd, GCRInBrokerHandler, (void *) c);
c->t = SM_AddTimerCallback(c->gcr->l, 1, GCRInBrokerTimerHandler,
(void *) c);
/* Use the caller data to set up the loop finalizer. */
GalIO_AddBrokerCallback(b, GAL_BROKER_DESTRUCTION_EVENT,
GCRBrokerShutdown,
(void *) c);
int GalIO_BrokerDataInCallbackHandler(GalIO_BrokerStruct
*b, int read_blocking)
This function polls the broker b and destroys the broker in the
appropriate circumstances. This function returns 1 if the broker is done
and has been destroyed, 0 if not done, -1 if error was encountered and the
broker has been destroyed.
int GalIO_BrokerReadReady(GalIO_BrokerStruct
*b)
This function returns 1 when the broker has data to read in its inbound
queue, 0 otherwise.
int GalSS_BrokerProxyInCallbackHandler(GalSS_BrokerProxy
*bp)
This function calls GalIO_BrokerDataOutCallbackHandler
on the broker object associated with the inbound broker proxy bp.
As we mentioned, this function
is needed only for programming language embeddings, and should never be needed
by the programmer; all C embeddings for broker proxies are handled transparently
by the broker callbacks.
int GalSS_BrokerProxyReadReady(GalSS_BrokerProxy
*bp)
As we already mentioned, there's
no requirement to handle broker proxies specially in these cases, because
the registration will happen through the normal creation callbacks associated
with the brokers embedded in the proxies themselves. However, occasionally
it will be more convenient to poll a broker directly (the Python bindings
make use of this functionality, for instance). This function returns 1 when
the broker proxy has a broker and the broker has data to read in its inbound
queue, 0 otherwise.
![]() |
License / Documentation home / Help and feedback | ![]() |