Galaxy Communicator Documentation:

Brokering and Audio Data

License / Documentation home / Help and feedback

In Communicator, there are two ways of moving data from server to server. First, you can package the data into a frame and send it to another server via the Hub, using the standard frame dispatch functions. Alternatively, you can use the Hub to establish a direct connection between servers (called a broker connection), through which you can send data. This latter technique is most typically used for transmitting audio data from the audio server to the recognizer, and from the synthesizer to the audio server; however, it can be used between any two servers, for any type of data. In this document, we describe and exemplify this brokering tool.

As of version 4.0, you can now encapsulate this brokered data in an object called a broker proxy, for which there is a basic frame object type. These broker proxies improve on the original broker method (which will continue to be supported) in two ways:

As of version 4.0, we recommend that everyone upgrade from the old broker API to the new broker proxy API, because of the advantages it presents. We've constructed an example which consists of two sides of a broker connection which transmits the contents of a file containing raw 8-bit audio data and and outputs the data to the audio device. This example illustrates all the possible combinations of brokering, and we'll refer to this example in the discussion below.

By default, the brokering mechanisms use the timed task loop. For information on how to handle brokering when you can't use the timed task loop, look here.

If you use threads, be careful to read the section about delaying broker activation.



The types of broker proxies

Broker proxies are designed specifically for conveying references to data through frames. Typically, what this means is that when you create a broker proxy, you almost always want to wrap a Gal_Object around it. However, this is not always the case. Accordingly, most of the functions in the broker proxy API come in pairs: one for the Gal_Object, and one for the proxy directly.

There are four different types of broker proxies, which can be differentiated by the type information encoded in them, as well as whether they're created from a Gal_Object (see GalSS_ProxifyObject) or just an object type (see GalSS_ProxifyObjectType).

Atomic proxies

These proxies encode the types GAL_FRAME, GAL_STRING, GAL_INT, GAL_FLOAT, and GAL_PROXY (yes, you can wrap a proxy around a proxy, although we can't think of any reason to do so). These proxies are self-terminating, in that you can write exactly one object to them, after which they'll automatically mark themselves as done. If you create the proxy from an object instead of an object type, it'll be marked as done immediately, since creating a proxy from an object is exactly the same as creating it from an object type and writing the object to it. So for atomic proxies, it's most straightforward to create the proxy from an object.

Array proxies

These proxies encode one of the array types: GAL_BINARY, GAL_INT_16, etc. If the proxy is created from an array object, and the object is not expandable (see Gal_BinaryObject), the proxy is self-terminating; otherwise, multiple objects of the appropriate type may be written to it, and it must be terminated explicitly (see GalSS_ProxyDone). These multiple objects are treated as successive portions of the same array.

List proxies

These proxies encode the GAL_LIST type. If the proxy is created from an object, and the object is not expandable (see Gal_ListObject), the proxy is self-terminating; otherwise (see Gal_CreateListObject), multiple objects may be written to it, and it must be terminated explicitly (see GalSS_ProxyDone). These objects may be of any type; each one is interpreted as a member of the list.

List proxies are a little tricky, because lists can appear inside lists. So if you write a list object to a list proxy, the object will be treated as a list within the overall list. If you want to append the new list to the list proxy, you must write the individual elements of the new list. This means that when you create a list proxy from an object, the individual elements of the list object are written, not the list object itself.

"Any" proxies

It's not necessary for a proxy to encode a specific type. If you create a proxy using GalSS_ProxifyObjectType and you use -1 as the type, the proxy will not encode any type. This proxy will not be self-terminating. Like a list proxy, you can write any type of object to it; but unlike a list proxy, the elements you write will not be interpreted as elements of a single object, but rather just an unstructured sequence of objects. This behavior is very similar to the original lower-level broker API; however, you still derive the benefit of having the broker information automatically encoded in a distinguished and recognizable way.

int GalSS_ProxySelfTerminates(GalSS_BrokerProxy *p)
int GalSS_ObjProxySelfTerminates(Gal_Object obj)
Returns 1 if the broker proxy p, or the broker proxy contained in object obj of type GAL_PROXY, is an outbound broker which does not require an explicit call to GalSS_ProxyDone. This will be the case for all non-expandable lists and arrays, and for all other types. All expandable lists and arrays require an explicit call to GalSS_ProxyDone.



Sending broker proxy data

When you create an outbound broker proxy, you can do it either via an object or an object type. We'll illustrate the first situation here. This case is not illustrated in our audio example, mostly because it's more appropriate to atomic proxies; however, for the sake of parallelism, we'll use audio data here as well:

static Gal_Frame prepare_audio_frame(GalSS_Environment *env,
                                     char *filename)
{
  Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
  int total = 0;
  char *buf = (char *) NULL;
  Gal_Object proxy;
  Gal_Object data;
 
  /* In the code omitted here, the data from the named file is
     read into buf, and total is the number of bytes in buf */
 
  /* .... */

  /* We create an non-expandable binary object from the data in buf.
     The new object will manage buf. */

  data = Gal_BinaryObject((void *) buf, total);

  proxy = GalSS_ObjProxifyObject(env, data, 0, 10);
  if (proxy) {
    Gal_SetProp(f, ":binary_proxy", proxy);
  }
 
  return f;
}

The important steps here are to set up the outgoing broker proxy, sending the frame containing that proxy to the Hub (in this case, by a call to GalSS_EnvWriteFrame, not shown), writing the appropriate data to the broker proxy, and announcing that no more data will be sent.

Step 1: setting up the outgoing broker proxy

GalSS_BrokerProxy *GalSS_ProxifyObject(GalSS_Environment *env, Gal_Object obj, int poll_ms, int timeout_seconds)
Gal_Object GalSS_ObjProxifyObject(GalSS_Environment *env, Gal_Object obj, int poll_ms, int timeout_seconds)
A broker proxy is created based on the object obj. GalSS_ObjProxifyObject wraps a Gal_Object around this proxy, so it can be directly inserted into the frame. An outbound broker is constructed and associated with the broker proxy. The outbound broker uses the server's listener as its listener port. If the server is a Hub client and doesn't have an open listener, the broker sets up a listener for the server to host. The outbound broker  accesses the server via the connection in env. The poll_ms is the time in milliseconds that the timed task loop is supposed to check the broker connection; as with GalIO_ServerInit, 0 is the default (100 ms), a positive number is an actual number of milliseconds, and < 0 means not to set up a timed task. The timeout_seconds is how long before the broker expires; -1 means never expire, 0 means use the default (which is 10 seconds). You can write to an expired broker proxy, but it will accept no more connections; when all the data is written to the existing connections, the broker inside the proxy will be destroyed.

It's important to set the timeout intelligently, because outgoing brokers can accept multiple connections. This means that they must cache the data which is written through them for subsequent subscribers. If the broker never times out, significant memory bloat might result.

If you need to delay broker activation, pass -1 as poll_ms.

Memory management

The proxy owns the object obj; that is, obj will be freed when the proxy is freed. The underlying broker, however, is neither shut down nor destroyed; it manages its own cleanup, and may continue to run after the proxy which created it has been destroyed.

Using object types instead of objects

You may neither need or want to rely on an intermediate Gal_Object to base your broker proxy on. In some circumstances, you may not want to associate a type with the broker proxy at all. For these situations, there's an alternative way of creating an outbound broker, based on object types instead of objects. We illustrate from our audio example:
static Gal_Frame prepare_audio_frame(GalSS_Environment *env,
                                     char *filename)
{
  Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
  int total = 0;
  char *buf = (char *) NULL;
  Gal_Object proxy;
 
  /* In the code omitted here, the data from the named file is
     read into buf, and total is the number of bytes in buf */
 
  /* .... */

  proxy = GalSS_ObjProxifyObjectType(env, GAL_BINARY, 0, 10);
  if (proxy) {
    GalSS_ObjProxyArrayAdd(proxy, buf, total);
    /* Because the data object is a streaming object, it must
       be marked as done. */
    GalSS_ObjProxyDone(proxy);
    Gal_SetProp(f, ":binary_proxy", proxy);
  }
 
  return f;
}

GalSS_BrokerProxy *GalSS_ProxifyObjectType(GalSS_Environment *env, Gal_ObjectType t, int poll_ms, int timeout_seconds)
Gal_Object GalSS_ObjProxifyObjectType(GalSS_Environment *env, Gal_ObjectType t, int poll_ms, int timeout_seconds)
Identical to GalSS_ProxifyObject, except that the proxy encodes type information without an actual Gal_Object to base it on. If t is -1, the proxy is an "any" proxy, which has no type restrictions. All array, list and "any" proxies created with these functions are not self-terminating.

If you need to delay broker activation, pass -1 as poll_ms.

Step 2: writing data to the broker proxy, if necessary

If you create a proxy from an object, and the proxy is self-terminating, you won't be able to write any more data to the proxy, even if you wanted to. However, if you create a proxy from an object type, or you created it from an expandable list or array object, you can write more data to the proxy.

int GalSS_ProxyWrite(GalSS_BrokerProxy *p, Gal_Object obj, int manage_memory)
int GalSS_ObjProxyWrite(Gal_Object proxy_obj, Gal_Object obj, int manage_memory)
Writes obj to proxy p, or to the proxy in the Gal_Object proxy_obj. The data in the object will be written to the underlying outbound proxy. If the proxy was created from an expandable object, obj will be added to the underlying object. If manage_memory is 1, the proxy will take responsibility for freeing obj when it's done with it. Returns 1 for success, -1 for failure.

int GalSS_ProxyListAdd(GalSS_BrokerProxy *p, Gal_Object elt)
int GalSS_ObjProxyListAdd(Gal_Object obj, Gal_Object elt)
Writes elt to the proxy p, or the proxy in the Gal_Object obj, if that proxy represents an expandable list type which has not been terminated yet, otherwise does nothing. Returns 1 for success, -1 for failure. If it succeeds, it takes responsibility for freeing elt when it's done with it. If the proxy was created from an object, elt will be added to the underlying object. 

int GalSS_ProxyArrayAdd(GalSS_BrokerProxy *p, void *data, int size)
int GalSS_ObjProxyArrayAdd(Gal_Object obj, void *data, int size)
Writes data of size elements to the array proxy p, or the array proxy in the object obj, if that proxy represents an expandable array type which has not been terminated yet, otherwise does nothing. Returns 1 for success, -1 for failure. If the proxy was created from an object, data will be added to the underlying object. This function is convenient for streaming array data to a broker without wrapping a Gal_Object around it.

Step 3: announcing the end of the data, if necessary

If proxies are self-terminating, you don't need to explicitly mark them as done. If you do need to do so, use this function.

void GalSS_ProxyDone(GalSS_BrokerProxy *p)
void GalSS_ObjProxyDone(Gal_Object obj)
Marks the outbound proxy p, or the proxy in the object obj, as done. This means that no further data can be written to it.


Streaming data to proxies

There is a minor problem with the example we've presented here: the broker proxy setup both establishes the broker proxy and writes the data. In an application without threads, this means that the receiving broker won't be able to connect to the sending broker until all the data is written to the outgoing broker proxy (because the server won't have a chance to accept connections for the outgoing broker until the broker setup returns).

For some applications, this may be acceptable, but it's almost certainly the case that you'll want to "stream" audio data, so that the receiving broker can start making use of it as soon as possible (say, to start speech recognition). In these cases, you need to exploit the timed task loop to send outbound data at periodic intervals (say, by polling the audio device and relaying the data when it's available), so that the server has a chance to accept connections.

Here's a version of the broker proxy setup which exemplifies this strategy. This is a bit unnatural, since all the data is immediately available in a file, but it illustrates the streaming:

static Gal_Frame prepare_audio_frame(GalSS_Environment *env,
                                     char *filename)
{
  Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
  FILE *fp;
  OutData *o = (OutData *) calloc(1, sizeof(OutData));
  Gal_Object proxy;
 
  /* In the code omitted here, the file is opened */
 
  /* .... */

  proxy = GalSS_ObjProxifyObjectType(env, GAL_BINARY, 0, 10);
  if (proxy) {
    o->proxy = proxy;
    o->fp = fp;
    /* The frame f will be freed on write, so we have to
       copy it to make sure the proxy survives, since
       we plan on writing to it. */
    Gal_SetProp(f, ":binary_proxy", Gal_CopyObject(proxy));
    Gal_AddTask(__write_data, (void *) o, 10, 0, NULL);
  }
 
  return f;
}
Note that we use Gal_AddTask to set up a task to start writing the data in 10 milliseconds. Here's the task:
static void __write_data(Gal_TaskPkg *p)
{
  OutData *o = (OutData *) Gal_TaskPkgData(p);
  FILE *fp = o->fp;  
  char *buf = (char *) malloc(BLOCKSIZE * sizeof(char));
  int count = fread(buf, sizeof(char), BLOCKSIZE, fp);
 
  if (count) {
    GalSS_ObjProxyArrayAdd(o->proxy, buf, count);
  }
  free(buf);
  if (count == BLOCKSIZE) {
    /* Not done yet. */
    Gal_ReAddTask(p, (void *) o, 10, 0, NULL);
  } else {
    GalSS_ObjProxyDone(o->proxy);
    /* Freeing the proxy doesn't free the underlying
       broker; it frees itself when it's done writing
       data to its connections. */
    GalSS_FreeBrokerProxy(o->proxy);
    fclose(fp);
    free(o);
  }
}
We keep resetting the task until we run out of data.


Receiving broker proxies

There are three different ways of receiving data via broker proxies: either by object, by typed callback, or by untyped callback. We cover each of these in turn.

Receiving data by object

This is by far the simplest way of receiving broker data. Here's how it works in our audio example:
Gal_Frame receive_audio(Gal_Frame f, void *server_data)
{  
  GalSS_Environment *env = (GalSS_Environment *) server_data;
  GalSS_BrokerProxy *proxy;

  /* We get the proxy object, and if it's a proxy, we
     synchronously retrieve the data. The object which is
     returned is cached in the proxy object, which retains
     "ownership" of it. Therefore, it will be freed when
     the dispatch function exits. */
  proxy = Gal_GetObject(f, ":binary_proxy");

  if (Gal_Proxyp(proxy)) {
    Gal_Object bdata = GalSS_ObjUnproxifyObject(env, proxy);

    if (Gal_Binaryp(bdata)) {
      int size;
      void *data;

      data = Gal_BinaryValue(bdata, &size);

      /* Now, do with the data whatever you want. */

      /* ... */
    }
  }
  return (Gal_Frame) NULL;
}
When the server receives the receive_audio message, it invokes this dispatch function. The information in the broker proxy is used to contact the sending server; the data is transferred and converted into a Gal_Object.

Gal_Object GalSS_UnproxifyObject(GalSS_Environment *env, GalSS_BrokerProxy *p)
Gal_Object GalSS_ObjUnproxifyObject(GalSS_Environment *env, Gal_Object obj)
The proxy p, or the proxy inside the Gal_Object obj, creates an inbound broker which contacts the sending server and transfers the data. The data is converted into a Gal_Object, which is returned.

Memory management

The proxy retains ownership of the object it returns; if you want to hold onto it beyond the life of the proxy, you must copy it.

Receiving data by typed callback

Receiving data by object, while convenient, exhibits problems analogous to those faced on the outbound side: without threads, nothing happens in the server until all the data is received. You may prefer to set up a callback to handle the data, either bit by bit or when it's all been received.

Note that your choice of whether to retrieve by object or by typed callback is independent of how the broker proxy was created. You can create an outbound broker from an object, and read the inbound side by stream, or vice versa. The outbound side will determine how many chunks of data you get, however. If you write a single large array to an outbound broker, you'll read a single chunk on the inbound side, even if you try to read by stream.

Here's an illustration of this second strategy.

/* This is the data handler for the proxy. In the way
   we've set up, it's called once, at the end. Remember, we
   own the object passed in, so we have to free it. */

void proxy_audio_handler(GalSS_Environment *env, Gal_ObjectType proxy_type,
                         Gal_Object elt, void *caller_data)
{
  if (proxy_type == GAL_BINARY) {
    int size;
    void *data;

    data = Gal_BinaryValue(elt, &size);

    /* Now, do with the data whatever you want. */

    /* ... */
  }
  Gal_FreeObject(elt);
}

/* This is the done handler for the proxy. */

void __proxy_report_done(GalSS_Environment *env, Gal_ObjectType proxy_type,
                         void *caller_data)
{
  __env_notify(env, "Audio received.");
}

/* This is the abort handler for the proxy. */

void __proxy_report_abort(GalSS_Environment *env, Gal_ObjectType proxy_type,
                          void *caller_data)
{
  __env_notify(env, "Audio aborted.");
}

Gal_Frame receive_audio(Gal_Frame f, void *server_data)
{  
  GalSS_Environment *env = (GalSS_Environment *) server_data;
  GalSS_BrokerProxy *proxy;

  /* We get the proxy object, and proceed asynchronously. The
     object passed into the callback is owned by the callback.
     We use a non-immediate callback, which will cache all the
     data for us and call the data handler when it's done. */
  proxy = Gal_GetObject(f, ":binary_proxy");

  if (Gal_Proxyp(proxy)) {
    GalSS_ObjUnproxify(env, proxy, proxy_audio_handler,
                       __proxy_report_done, __proxy_report_abort,
                       0, 0, NULL, NULL);

    }
  }
  return (Gal_Frame) NULL;
}
typedef void (*GalSS_ProxyDataHandler)(GalSS_Environment *env, Gal_ObjectType proxy_type, Gal_Object elt, void *caller_data);
This is the function signature for the data callbacks. The proxy_type is the type associated with the proxy; it can be -1, indicating an "any" proxy. The elt is the data; it can either represent all the data read, or a portion of it, depending on how the inbound proxy is set up. The caller_data is arbitrary data passed to the inbound proxy when it's set up. 

typedef void (*GalSS_ProxyDataEventHandler)(GalSS_Environment *env, Gal_ObjectType proxy_type, void *caller_data);
This is the function signature for the event callbacks associated with the proxy. The proxy_type is the type associated with the proxy; it can be -1, indicating an "any" proxy. The caller_data is arbitrary data passed to the inbound proxy when it's set up.

void GalSS_Unproxify(GalSS_Environment *env, GalSS_BrokerProxy *p, GalSS_ProxyDataHandler fn, GalSS_ProxyDataEventHandler done_fn, GalSS_ProxyDataEventHandler abort_fn, int immediate, int poll_ms, void *caller_data, void (*caller_data_free_fn)(void *))
void GalSS_ObjUnproxify(GalSS_Environment *env, Gal_Object obj, GalSS_ProxyDataHandler fn, GalSS_ProxyDataEventHandler done_fn, GalSS_ProxyDataEventHandler abort_fn, int immediate, int poll_ms, void *caller_data, void (*caller_data_free_fn)(void *))
The call environment env is stored away and made available to fn, done_fn and abort_fn via the function GalSS_BrokerGetEnvironment. The proxy p, or the proxy in the object obj, establishes a broker connection to the appropriate server. The fn is a function of type GalSS_ProxyDataHandler which will be called every time an object is read from the broker stream (if immediate is 1), or once on the accumulated data once all the data has been read (if immediate is 0). The done_fn is called on the proxy after all the data has been processed; the abort_fn is called if the connection aborts before completing. The caller_data is arbitrary data which will be stored in the broker proxy, and will be freed when the broker proxy is destroyed using the caller_data_free_fn. This data is passed to all the callbacks. As usual, poll_ms is the number of milliseconds for the timed task; 0 means 100 ms, > 0 is a value in ms, < 0 means not to set up a timed task.

If you need to delay broker activation, pass -1 as poll_ms.

Memory management

The data passed to the data handler callback function is not freed by the caller. It is up to the callback function itself to free the data when it's done with it.

Receiving data by untyped callback

This final method is the closest method to the original broker API. The broker proxy doesn't do any work for you beyond encapsulating the contact information; you're entirely responsible for collecting the data and establishing the callbacks. We don't recommend using this mechanism, even if you're decoding an "any" proxy. However, if you want a simple upgrade path to using proxies without having to rewrite the code for the receiving end, this is the path of least resistance.

Here's an example of a dispatch function:

Gal_Frame receive_audio(Gal_Frame f, void *server_data)
{
  DataHandler *d = (DataHandler *) malloc(sizeof(DataHandler));
  GalSS_Environment *env = (GalSS_Environment *) server_data;
  GalIO_BrokerStruct *b;
  GalSS_ProxyObject *proxy = Gal_GetObject(f, ":binary_proxy");
  d->data_buf = (char *) NULL;
  d->size = 0;
  if (proxy) {
    b = GalSS_EnvBrokerProxyObjInInit(env, proxy, env_audio_handler,
                                      0, d, __FreeDataHandler);
    if (b) {  
      GalIO_AddBrokerCallback(b, GAL_BROKER_ABORT_EVENT,
                              __report_abort, (void *) NULL);
      GalIO_AddBrokerCallback(b, GAL_BROKER_DATA_DONE_EVENT,
                              __report_done, (void *) NULL);
      GalIO_SetBrokerActive(b); 
    }
  } else {
    free(d);
  }
  return (Gal_Frame) NULL;
}

GalIO_BrokerStruct *GalSS_EnvBrokerProxyInInit(GalSS_Environment *env, GalSS_BrokerProxy *p, GalIO_BrokerDataHandler fnptr, int poll_ms, void *refptr, void (*free_fn)(void *))
GalIO_BrokerStruct *GalSS_EnvBrokerProxyObjInInit(GalSS_Environment *env, Gal_Object proxy, GalIO_BrokerDataHandler fnptr, int poll_ms, void *refptr, void (*free_fn)(void *))
Identical to GalIO_EnvBrokerDataInInit, except that all connection information is stored in the proxy p, or in the proxy in the object proxy. For details about the types of the callbacks, etc., see GalIO_EnvBrokerDataInInit.  


Administrative functions for broker proxy objects

GalSS_BrokerProxy *GalSS_CreateBrokerProxy(const char *host, int port, const char *call_id, Gal_ObjectType object_type, Gal_Object obj_data)
This function creates a broker proxy which represents obj_data of obj_type being available at host on port, indexed by the unique call_id. This function is used internally; the programmer should never need it.

 void GalSS_FreeBrokerProxy(GalSS_BrokerProxy *p)
The broker proxy p is freed. If p contains a Gal_Object (either by virtue of being an outbound broker wrapped around an object or an inbound broker which has read its data), the object is freed using Gal_FreeObject.

GalSS_BrokerProxy *GalSS_CopyBrokerProxy(GalSS_BrokerProxy *bp)
The broker proxy bp is copied. If bp contains a Gal_Object (either by virtue of being an outbound broker wrapped around an object or an inbound broker which has read its data), the object is copied using Gal_CopyObject.

GalIO_BrokerStruct *GalSS_BrokerProxyBroker(GalSS_BrokerProxy *bp)
Returns the broker object associated with the broker proxy bp.  This broker can be an inbound or outbound broker. It's not advisable to manipulate this broker object directly, except to start the broker.

Gal_ObjectType GalSS_BrokerProxyObjectType(GalSS_BrokerProxy *bp)
Returns the object type associated with the broker proxy bp. The type can be -1, which means that there are no type restrictions imposed on the broker proxy.

Gal_Object GalSS_BrokerProxyObject(GalSS_BrokerProxy *bp)
Returns the object associated with the broker proxy bp. If you use this object for anything, remember that the broker proxy "owns" it, so if you intend that it outlast the broker proxy, you should copy the object using Gal_CopyObject before you store it anywhere.

GAL_SOCKET GalSS_GetBrokerProxySocket(GalSS_BrokerProxy *bp)
Returns the socket associated with the inbound broker proxy bp.

void GalSS_ForceProxyExpiration(GalSS_BrokerProxy *bp)
Forces the broker associated with the outbound broker proxy bp to time out. See GalIO_ForceBrokerExpiration


Sending broker data

We now turn to the original broker API. Here's a code fragment which illustrates how the sender uses the broker connection:
static Gal_Frame prepare_audio_frame(GalSS_Environment *env,
                                     char *filename)
{
  Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
  int total = 0;
  char *buf = (char *) NULL;
  GalIO_BrokerStruct *b;
  /* In the code omitted here, the data from the named file is
     read into buf, and total is the number of bytes in buf */
  /* .... */
  /* Now that we have the audio, we write the binary data
     through the broker. */
  b = GalIO_BrokerDataOutInit(GalSS_EnvComm(env), 0, 10);
  if (b && (GalIO_GetBrokerListenPort(b) > 0)) {
    GalIO_BrokerPopulateFrame(b, f, ":binary_host", ":binary_port");
    GalIO_BrokerWriteBinary(b, buf, total);
    GalIO_BrokerDataOutDone(b);
  }
  free(buf);
  return f;
}
 
The important steps here are to set up the outgoing broker stream, constructing a well-formed frame for initiating a broker connection, sending the frame to the Hub (in this case, by a call to GalSS_EnvWriteFrame, not shown), writing the appropriate data to the connection, and announcing that no more data will be sent.

Step 1: setting up the outgoing broker stream

GalIO_BrokerStruct *GalIO_BrokerDataOutInit(GalIO_CommStruct *gcomm, int poll_ms, int timeout_seconds)
The outgoing broker uses the server's listener as its listener port. If the server is a Hub client and doesn't have an open listener, the broker sets up a listener for the server to host. The outgoing broker accesses the server via gcomm. The poll_ms and timeout_seconds are as for GalSS_ProxifyObject;  see that function for more details about the broker object.

If you need to delay broker activation, pass -1 as poll_ms.

Step 2: constructing a well-formed frame

If you're not using a broker proxy, you must ensure that all the information that would go in the proxy (host, port, unique ID) is inserted into the outgoing frame (and is relayed to the receiving server by the Hub). A well-formed frame for initiating a broker connection contains the following keys: As of version 3.0, it is no longer the programmer's responsibility to construct a unique call ID for the brokers. In addition to being more convenient, much existing brokering code doesn't take into account the possibility that servers may have multiple outgoing brokers simultaneously available.

void GalIO_BrokerPopulateFrame(GalIO_BrokerStruct *b, Gal_Frame f, const char *host_key, const char *port_key)
Populates the frame f with the appropriate broker information, including the call ID, host and port. The host_key is the key to store the hostname in the frame, and the port_key is the key to store the port in. The call ID is stored in the key designated by GAL_BROKER_CALL_ID_FRAME_KEY.

unsigned short GalIO_GetBrokerListenPort(GalIO_BrokerStruct *b)
Returns the port number the broker b is listening on. If this value is not 0, the broker has been set up correctly.

char *GalIO_GetBrokerCallID(GalIO_BrokerStruct *b)
Returns the call ID the broker b is using. You should not typically need to use this function.

void GalIO_FrameSetBrokerCallID(Gal_Frame f, const char *call_id)
Sets the unique ID for broker confirmation. You should not typically need to use this function.

char *GalIO_IPAddress(void)
This function returns the IP address of the host, for use in creating contact information for the broker connection. You should not typically need to use this function.

Step 3: writing data to the broker

Since brokers now accept multiple connections, all data which is written to an outgoing broker is cached until the broker expires. Each new client is guaranteed of getting all the data written to the broker, in the order in which it was written.

int GalIO_BrokerWriteFrame(GalIO_BrokerStruct *b, Gal_Frame frame)
int GalIO_BrokerWriteInt(GalIO_BrokerStruct *b, int i)
int GalIO_BrokerWriteFloat(GalIO_BrokerStruct *b, float f)
int GalIO_BrokerWriteList(GalIO_BrokerStruct *b, Gal_Object *elts, int n_elts)
int GalIO_BrokerWriteString(GalIO_BrokerStruct *b, char *str)
int GalIO_BrokerWriteBinary(GalIO_BrokerStruct *b, void *data, int n_bytes)
int GalIO_BrokerWriteInt16(GalIO_BrokerStruct *b, void *data, int n_ints)
int GalIO_BrokerWriteInt32(GalIO_BrokerStruct *b, void *data, int n_ints)
int GalIO_BrokerWriteInt64(GalIO_BrokerStruct *b, void *data, int n_ints)
int GalIO_BrokerWriteFloat32(GalIO_BrokerStruct *b, void *data, int n_floats)
int GalIO_BrokerWriteFloat64(GalIO_BrokerStruct *b, void *data, int n_floats)
These functions send data through the broker connection. The encoding for these elements is identical to their encoding within frames during message transport. These functions correspond in the obvious way to the Communicator object types.

int GalIO_BrokerWriteObject(GalIO_BrokerStruct *b, Gal_Object o)
Write a Gal_Object of any type through the broker connection. The encoding for these elements is identical to their encoding within frames during message transport.

Step 4: announcing the end of the data

void GalIO_BrokerDataOutDone(GalIO_BrokerStruct *b)
This function should be called on the broker structure when all the data has been sent. If you do not call this function, the incoming broker on the other end of the broker connection will never terminate.

voidGalIO_ForceBrokerExpiration(GalIO_BrokerStruct *b)
This function causes the broker to expire immediately. You can use this in conjunction with GalIO_AddBrokerCallback to force a broker to expire after it's accepted a certain number of connections, for instance.

int GalIO_BrokerIsDone(GalIO_BrokerStruct *b)
Returns 1 if the broker is already marked as done, 0 otherwise.


Streaming broker data

The streaming problem described for proxies also exists for the old API, for the same reasons. Here's the solution to that problem in terms of the old API:
static Gal_Frame prepare_audio_frame(GalIO_CommStruct *gcomm, char *filename)
{
  Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
  FILE *fp;
  GalIO_BrokerStruct *b;
  OutData *o = (OutData *) calloc(1, sizeof(OutData));
  /* In the code omitted here, the file is opened */
  /* .... */
  /* If we're the sending direction, we read binary data from the file,
     relaying it to the broker, until we find EOF. */
  b = GalIO_BrokerDataOutInit(gcomm, 0, 10);
  if (b && (GalIO_GetBrokerListenPort(b) > 0)) {
    GalIO_BrokerPopulateFrame(b, f, ":binary_host", ":binary_port");
    o->fp = fp;
    o->b = b;
    Gal_AddTask(__write_data, (void *) o, 10, 0, NULL);
  }
  return f;
}

static void __write_data(Gal_TaskPkg *p) {
  OutData *o = (OutData *) Gal_TaskPkgData(p);
  FILE *fp = o->fp;
  GalIO_BrokerStruct *b = o->b;
  char *buf = (char *) malloc(BLOCKSIZE * sizeof(char));
  int count;
  count = fread(buf, sizeof(char), BLOCKSIZE, fp);
  if (count) {
    GalIO_BrokerWriteBinary(b, buf, count);
  }
  free(buf);
  if (count == BLOCKSIZE) {
    /* Not done yet. */
    Gal_ReAddTask(p, (void *) o, 10, 0, NULL);
  } else {
    GalIO_BrokerDataOutDone(b);
    fclose(fp);
    free(o);
  }
}


Receiving broker data

Here's the other end of the connection from our example, using the old broker API. The DataHandler structure is not part of the Communicator package; we use it for illustration:
Gal_Frame receive_audio(Gal_Frame f, void *server_data)
{
  DataHandler *d = (DataHandler *) malloc(sizeof(DataHandler));
  GalIO_BrokerStruct *b;
  char *host = Gal_GetString(f, ":binary_host");
  int port = Gal_GetInt(f, ":binary_port");
  d->data_buf = (char *) NULL;
  d->size = 0;
  if (host && port) {
    b = GalSS_EnvBrokerDataInInit((GalSS_Environment *) server_data,
                                  host, port, f,
                                  env_audio_handler, 0, d, __FreeDataHandler);
    if (b) {  
      GalIO_AddBrokerCallback(b, GAL_BROKER_ABORT_EVENT,
                              __report_abort, (void *) NULL);
      GalIO_AddBrokerCallback(b, GAL_BROKER_DATA_DONE_EVENT,
                              __report_done, (void *) NULL);
      GalIO_SetBrokerActive(b); 
    }
  } else {
    free(d);
  }
  return (Gal_Frame) NULL;
}
When the server receives the receive_audio message, it invokes this dispatch function. If it can find the appropriate host and port set, it sets up a broker receiving connection, makes it active, and exits. This connection takes a callback function and data to be stored in the broker structure for the callback function to use. If the broker is set up appropriately, this function will also set up other callbacks which are invoked when the broker is notified of a broker abort or that data is done (see the documentation on GalIO_AddBrokerCallback and the event-driven programming model in general for further details).

GalIO_BrokerStruct *GalSS_EnvBrokerDataInInit(GalSS_Environment *env, const char *host, unsigned short port, Gal_Frame frame, GalIO_BrokerDataHandler fnptr, int poll_ms, void *refptr, void (*free_fn)(void *))
The call environment env is stored away and made available to the broker data handler fnptr via the function GalSS_BrokerGetEnvironment. The host and port are the location of the server to contact. The frame is the frame which delivered the connection message, which is forwarded to the other end of the broker connection to verify that the correct connection is being made. It must contain the :call_id key. The fnptr is a function of type GalIO_BrokerDataHandler which will be called when there is data to be read, and the refptr is arbitrary data which will be stored in the broker structure (for instance, an object to collect the binary data, as show in the example here) and will be freed when the broker is destroyed using the free_fn. This data can be retrieved using the function GalIO_GetBrokerData, as illustrated below. As usual, poll_ms is the number of milliseconds for the timed task; 0 means 100 ms, > 0 is a value in ms, < 0 means not to set up a timed task.

If you need to delay broker activation, pass -1 as poll_ms.

Memory management

The data passed to the callback function is not freed by the caller. It is up to the callback function itself to free the data when it's done with it.
Here's an example of a callback function:
static void env_audio_handler(GalIO_BrokerStruct *broker_struct,
                              void *data, Gal_ObjectType data_type,
                              int n_samples)
{
  DataHandler *d = (DataHandler *) GalIO_GetBrokerData(broker_struct);
  switch (data_type) {
  case GAL_BINARY:
    if (d->data_buf)
      d->data_buf = (char *) realloc(d->data_buf, n_samples + d->size);
    else
      d->data_buf = (char *) malloc(n_samples + d->size);
    bcopy(data, d->data_buf + d->size, n_samples);
    d->size += n_samples;
    free(data);
    break;
  default:
    GalUtil_Warn("Unknown data type %s\n", Gal_ObjectTypeString(data_type));
  }
}
 
You can see how the types which are checked correspond to the types of the data which are sent. You can also see how the DataHandler object is retrieved from the broker structure. Finally, observe that the callback function frees the data it's passed after processing it.

When the broker receives a notification that it's done, it calls its GAL_BROKER_DATA_DONE_EVENT callback to finish things up (in the case of this example, the callback writes the audio data to /dev/audio and sends a notification message back to the Hub). Here's an example of a broker event callback, in this case the one that's fired when the broker is notified of an abort:

void __report_abort(GalIO_BrokerStruct *b, void *data)
{
  Gal_Frame f;
  GalSS_Environment *env = GalSS_BrokerGetEnvironment(b);

  f = Gal_MakeFrame("notify", GAL_CLAUSE);
  Gal_SetProp(f, ":notification", Gal_StringObject("Audio aborted."));
  GalSS_EnvWriteFrame(env, f, 0);
  Gal_FreeFrame(f);
}
You can see here how the callback uses GalSS_BrokerGetEnvironment to send a message with the appropriate context to the Hub.

void GalIO_SetBrokerActive(GalIO_BrokerStruct *b)
Enables the broker to process the data handler. Nothing will happen with the data until this function is called on the broker structure. This is to allow the user to control when the broker structures are activated (if, for instance, they play audio output as a side effect).

typedef void (*GalIO_BrokerDataHandler)(GalIO_BrokerStruct *broker_struct, void *object, Gal_ObjectType object_type, int object_count)
This is the type of the callback functions. The broker_struct is the broker structure which has been set up to receive the data. The object is the data itself. The type is the type of the object, and the object_count is how many elements of the data there are (this is interesting mostly in the case of array data sent by GalIO_BrokerWriteBinary, GalIO_BrokerWriteInt16, etc.).

void *GalIO_GetBrokerData(GalIO_BrokerStruct *b)
This function retrieves from the broker structure b the data which was passed in to the refptr argument to GalIO_CommBrokerDataInInit.

void GalIO_SetBrokerData(GalIO_BrokerStruct *b, void *caller_data, void (*free_fn)(void *))
If you need to update the caller data at any point, this function will free the old data using the previously set free function and update the caller data to caller_data and the free function to free_fn.

void GalIO_BrokerDataDone(GalIO_BrokerStruct *b)
This function should be called on the broker structure if the client decides it's done before all the data is read.

void GalSS_BrokerSetEnvironment(GalIO_BrokerStruct *b, GalSS_Environment *env)
Sets the environment associated with the broker b to env. You should not typically need to use this function.

GalSS_Environment *GalSS_BrokerGetEnvironment(GalIO_BrokerStruct *b)
Retrieves the environment associated with the broker b.

Gal_Frame GalIO_GetBrokerFrame(GalIO_BrokerStruct *b)
Returns the internal copy of the frame passed to GalIO_CommBrokerDataInInit. You should not typically need to use this function.

If you're dealing with multiple incoming broker objects simultaneously, you might wish to use the broker queues. Broker objects can be connected in a doubly-linked list for the purposes of ordering their processing.

GalIO_BrokerStruct *GalIO_BrokerStructQueueAppend(GalIO_BrokerStruct *b, GalIO_BrokerStruct *bqueue)
Adds the broker structure b to the end of the queue bqueue. If bqueue is NULL, b is returned, otherwise bqueue is returned.

GalIO_BrokerStruct *GalIO_BrokerStructQueuePop(GalIO_BrokerStruct *bqueue)
Removed all finished broker structure from the queue and returns the broker structure which corresponds to the new head of the active queue. The broker structure is destroyed.

GalIO_BrokerStruct *GalIO_BrokerStructDequeue(GalIO_BrokerStruct *b, GalIO_BrokerStruct *bqueue)
Removes a broker structure from the queue and returns the new queue.


Delaying broker activation

When you construct a broker or a broker proxy, typically the task associated with the broker proxy is started immediately. In some cases, especially in threading, this isn't what you want. If you need to modify the broker in any way, and you expect to run the code in threaded mode, you should pass in -1 for the value of poll_ms in all these functions and use GalIO_CommStartBroker or GalSS_EnvStartBroker when the broker is set up. If you're using a proxy, use the function GalSS_BrokerProxyBroker to access the underlying broker.

Here's a list of functions which modify the broker or proxy, which would require you to delay broker activation in the threaded case:

Once all the changes you want are set, you can start the broker using GalIO_CommStartBroker or GalSS_EnvStartBroker. Here's a version of an example of receiving broker data which uses this approach: 
Gal_Frame receive_audio(Gal_Frame f, void *server_data)
{
  DataHandler *d = (DataHandler *) malloc(sizeof(DataHandler));
  GalIO_BrokerStruct *b;
  char *host = Gal_GetString(f, ":binary_host");
  int port = Gal_GetInt(f, ":binary_port");
  d->data_buf = (char *) NULL;
  d->size = 0;
  if (host && port) {
    b = GalSS_EnvBrokerDataInInit((GalSS_Environment *) server_data,
                                  host, port, f,
                                  env_audio_handler, -1, d, __FreeDataHandler);

    if (b) {  
      GalIO_AddBrokerCallback(b, GAL_BROKER_ABORT_EVENT,
                              __report_abort, (void *) NULL);
      GalIO_AddBrokerCallback(b, GAL_BROKER_DATA_DONE_EVENT,
                              __report_done, (void *) NULL);
      GalIO_SetBrokerActive(b);
      GalSS_EnvStartBroker((GalSS_Environment *) server_data, b, 0);
    }
  } else {
    free(d);
  }
  return (Gal_Frame) NULL;
}

void GalIO_CommStartBroker(GalIO_CommStruct *gcomm, GalIO_BrokerStruct *b, int poll_ms)
Updates the polling interval for the broker b to poll_ms, and takes the appropriate actions associated with broker startup for the connection gcomm to start the polling task.

void GalSS_EnvStartBroker(GalSS_Environment *env, GalIO_BrokerStruct *b, int poll_ms)
Identical to GalIO_CommStartBroker, except uses the connection stored in the environment env


Backward compatibility

These functions are provided for backward compatibility with releases of the Galaxy Communicator infrastructure previous to 3.0. They are all inadequate in one way or another, and should be avoided. They will not be deprecated or removed yet, however.

GalIO_BrokerStruct *GalIO_BrokerDataInInit(const char *host, unsigned short port, Gal_Frame frame, GalIO_BrokerDataHandler fnptr, void *caller_data, int poll_ms)
Like GalIO_CommBrokerDataInInit, but doesn't allow the programmer to pass in the host connection or a free function for the data.

void *GalIO_GetBrokerCallerData(GalIO_BrokerStruct *b)
Identical to GalIO_GetBrokerData but does not conform to consistent naming conventions.

typedef void (*GalIO_BrokerDataFinalizer)(GalIO_BrokerStruct *, void *caller_data)
void GalIO_BrokerSetFinalizer(GalIO_BrokerStruct *b, GalIO_BrokerDataFinalizer finalizer)
Sets a finalizer for the broker structure. This finalizer is called when the broker structure is destroyed. This can be used, for instance, to pop a broker queue and activate the next element in the queue. The finalizer is called with the contents of the refptr argument passed in to GalIO_BrokerDataInInit. This function has been superseded by the addition of a free function in GalIO_CommBrokerDataInInit and by the callback architecture described here.

GalIO_BrokerStruct *GalIO_CommBrokerDataInInit(GalIO_CommStruct *host_gcomm, const char *host, unsigned short port, Gal_Frame frame, GalIO_BrokerDataHandler fnptr, int poll_ms, void *caller_data, void (*caller_data_free_fn)(void *))
This function provides almost the same functionality as GalSS_EnvBrokerDataInInit, except that the host_gcomm is the connection which hosts the callback from which the broker was created; it is provided to support the event-driven programming model. All other arguments are as they are for GalSS_EnvBrokerDataInInit.


License / Documentation home / Help and feedback
Last updated July 9, 2002