![]() |
License / Documentation home / Help and feedback | ![]() |
In Communicator, there are two ways of moving data from server to server. First, you can package the data into a frame and send it to another server via the Hub, using the standard frame dispatch functions. Alternatively, you can use the Hub to establish a direct connection between servers (called a broker connection), through which you can send data. This latter technique is most typically used for transmitting audio data from the audio server to the recognizer, and from the synthesizer to the audio server; however, it can be used between any two servers, for any type of data. In this document, we describe and exemplify this brokering tool.
As of version 4.0, you can now encapsulate this brokered data in an object called a broker proxy, for which there is a basic frame object type. These broker proxies improve on the original broker method (which will continue to be supported) in two ways:
As of version 4.0, we recommend that everyone upgrade from the old broker
API to the new broker proxy API, because of the advantages it presents.
We've constructed an example which consists
of two sides of a broker connection which transmits the contents of
a file containing raw 8-bit audio data and and outputs the data to the
audio device. This example illustrates all the possible combinations
of brokering, and we'll refer to this example in the discussion below.
By default, the brokering mechanisms use the timed task loop. For information
on how to handle brokering when you can't use the timed task loop, look
here.
If you use threads, be careful to read the section about delaying broker activation.
There are four different types of broker proxies, which can be differentiated by the type information encoded in them, as well as whether they're created from a Gal_Object (see GalSS_ProxifyObject) or just an object type (see GalSS_ProxifyObjectType).
List proxies are a little tricky, because lists can appear inside lists.
So if you write a list object to a list proxy, the object will be treated
as a list within the overall list. If you want to append the new list
to the list proxy, you must write the individual elements of the new list.
This means that when you create a list proxy from an object, the individual
elements of the list object are written, not the list object itself.
int GalSS_ProxySelfTerminates(GalSS_BrokerProxy
*p)
int GalSS_ObjProxySelfTerminates(Gal_Object
obj)
Returns 1 if the broker proxy p, or the broker proxy
contained in object obj of type GAL_PROXY, is an outbound broker
which does not require an explicit call to GalSS_ProxyDone. This will be the case
for all non-expandable lists and arrays, and for all other types. All
expandable lists and arrays require an explicit call to GalSS_ProxyDone.
When you create an outbound broker proxy, you can do it either via an object
or an object type. We'll illustrate the first situation here. This case is
not illustrated in our audio example, mostly because it's more appropriate
to atomic proxies; however, for the sake
of parallelism, we'll use audio data here as well:
static Gal_Frame prepare_audio_frame(GalSS_Environment *env,The important steps here are to set up the outgoing broker proxy, sending the frame containing that proxy to the Hub (in this case, by a call to GalSS_EnvWriteFrame, not shown), writing the appropriate data to the broker proxy, and announcing that no more data will be sent.
char *filename)
{
Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
int total = 0;
char *buf = (char *) NULL;
Gal_Object proxy;
Gal_Object data;
/* In the code omitted here, the data from the named file is
read into buf, and total is the number of bytes in buf */
/* .... */
/* We create an non-expandable binary object from the data in buf.
The new object will manage buf. */
data = Gal_BinaryObject((void *) buf, total);
proxy = GalSS_ObjProxifyObject(env, data, 0, 10);
if (proxy) {
Gal_SetProp(f, ":binary_proxy", proxy);
}
return f;
}
GalSS_BrokerProxy *GalSS_ProxifyObject(GalSS_Environment
*env, Gal_Object obj, int poll_ms, int timeout_seconds)
Gal_Object GalSS_ObjProxifyObject(GalSS_Environment
*env, Gal_Object obj, int poll_ms, int timeout_seconds)
A broker proxy is created based on the object obj. GalSS_ObjProxifyObject
wraps a Gal_Object around this proxy, so it can be directly inserted
into the frame. An outbound broker
is constructed and associated with the broker proxy. The outbound broker
uses the server's listener as its listener port. If the server is a Hub client and doesn't have an open
listener, the broker sets up a listener for the server to host. The outbound
broker accesses the server via the connection in env. The poll_ms
is the time in milliseconds that the timed task loop is supposed to check
the broker connection; as with GalIO_ServerInit, 0 is the
default (100 ms), a positive number is an actual number of milliseconds,
and < 0 means not to set up a timed task. The timeout_seconds
is how long before the broker expires; -1 means never expire, 0 means
use the default (which is 10 seconds). You can write to an expired broker
proxy, but it will accept no more connections; when all the data is written
to the existing connections, the broker inside the proxy will be destroyed.
It's important to set the timeout intelligently, because outgoing brokers
can accept multiple connections. This means that they must cache the
data which is written through them for subsequent subscribers. If the
broker never times out, significant memory bloat might result.
If you need to delay broker activation,
pass -1 as poll_ms.
Memory management
The proxy owns the object obj; that is, obj will be freed when the proxy is freed. The underlying broker, however, is neither shut down nor destroyed; it manages its own cleanup, and may continue to run after the proxy which created it has been destroyed.
static Gal_Frame prepare_audio_frame(GalSS_Environment *env,
char *filename)
{
Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
int total = 0;
char *buf = (char *) NULL;
Gal_Object proxy;
/* In the code omitted here, the data from the named file is
read into buf, and total is the number of bytes in buf */
/* .... */
proxy = GalSS_ObjProxifyObjectType(env, GAL_BINARY, 0, 10);
if (proxy) {
GalSS_ObjProxyArrayAdd(proxy, buf, total);
/* Because the data object is a streaming object, it must
be marked as done. */
GalSS_ObjProxyDone(proxy);
Gal_SetProp(f, ":binary_proxy", proxy);
}
return f;
}
GalSS_BrokerProxy *GalSS_ProxifyObjectType(GalSS_Environment
*env, Gal_ObjectType t, int poll_ms, int timeout_seconds)
Gal_Object GalSS_ObjProxifyObjectType(GalSS_Environment
*env, Gal_ObjectType t, int poll_ms, int timeout_seconds)
Identical to GalSS_ProxifyObject,
except that the proxy encodes type information without an actual Gal_Object
to base it on. If t is -1, the proxy is an "any" proxy, which has no type restrictions. All
array, list and
"any" proxies created with these functions are not self-terminating.
If you need to delay
broker activation, pass -1 as poll_ms.
int GalSS_ProxyWrite(GalSS_BrokerProxy
*p, Gal_Object obj, int manage_memory)
int GalSS_ObjProxyWrite(Gal_Object
proxy_obj, Gal_Object obj, int manage_memory)
Writes obj to proxy p, or to the proxy in the Gal_Object
proxy_obj. The data in the object will be written to the underlying
outbound proxy. If the proxy was created from an expandable object, obj
will be added to the underlying object. If manage_memory is 1,
the proxy will take responsibility for freeing obj when it's done
with it. Returns 1 for success, -1 for failure.
int GalSS_ProxyListAdd(GalSS_BrokerProxy
*p, Gal_Object elt)
int GalSS_ObjProxyListAdd(Gal_Object
obj, Gal_Object elt)
Writes elt to the proxy p, or the proxy in the Gal_Object
obj, if that proxy represents an expandable list type which has
not been terminated yet, otherwise does nothing. Returns 1 for success,
-1 for failure. If it succeeds, it takes responsibility for freeing elt
when it's done with it. If the proxy was created from an object, elt
will be added to the underlying object.
int GalSS_ProxyArrayAdd(GalSS_BrokerProxy
*p, void *data, int size)
int GalSS_ObjProxyArrayAdd(Gal_Object
obj, void *data, int size)
Writes data of size elements to the array proxy
p, or the array proxy in the object obj, if that proxy
represents an expandable array type which has not been terminated yet,
otherwise does nothing. Returns 1 for success, -1 for failure. If the
proxy was created from an object, data will be added to the underlying
object. This function is convenient for streaming array data to a broker
without wrapping a Gal_Object around it.
void GalSS_ProxyDone(GalSS_BrokerProxy
*p)
void GalSS_ObjProxyDone(Gal_Object
obj)
Marks the outbound proxy p, or the proxy in the object
obj, as done. This means that no further data can be written
to it.
For some applications, this may be acceptable, but it's almost certainly the case that you'll want to "stream" audio data, so that the receiving broker can start making use of it as soon as possible (say, to start speech recognition). In these cases, you need to exploit the timed task loop to send outbound data at periodic intervals (say, by polling the audio device and relaying the data when it's available), so that the server has a chance to accept connections.
Here's a version of the broker proxy setup which exemplifies this strategy.
This is a bit unnatural, since all the data is immediately available
in a file, but it illustrates the streaming:
static Gal_Frame prepare_audio_frame(GalSS_Environment *env,Note that we use Gal_AddTask to set up a task to start writing the data in 10 milliseconds. Here's the task:
char *filename)
{
Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
FILE *fp;
OutData *o = (OutData *) calloc(1, sizeof(OutData));
Gal_Object proxy;
/* In the code omitted here, the file is opened */
/* .... */
proxy = GalSS_ObjProxifyObjectType(env, GAL_BINARY, 0, 10);
if (proxy) {
o->proxy = proxy;
o->fp = fp;
/* The frame f will be freed on write, so we have to
copy it to make sure the proxy survives, since
we plan on writing to it. */
Gal_SetProp(f, ":binary_proxy", Gal_CopyObject(proxy));
Gal_AddTask(__write_data, (void *) o, 10, 0, NULL);
}
return f;
}
static void __write_data(Gal_TaskPkg *p)We keep resetting the task until we run out of data.
{
OutData *o = (OutData *) Gal_TaskPkgData(p);
FILE *fp = o->fp;
char *buf = (char *) malloc(BLOCKSIZE * sizeof(char));
int count = fread(buf, sizeof(char), BLOCKSIZE, fp);
if (count) {
GalSS_ObjProxyArrayAdd(o->proxy, buf, count);
}
free(buf);
if (count == BLOCKSIZE) {
/* Not done yet. */
Gal_ReAddTask(p, (void *) o, 10, 0, NULL);
} else {
GalSS_ObjProxyDone(o->proxy);
/* Freeing the proxy doesn't free the underlying
broker; it frees itself when it's done writing
data to its connections. */
GalSS_FreeBrokerProxy(o->proxy);
fclose(fp);
free(o);
}
}
Gal_Frame receive_audio(Gal_Frame f, void *server_data)When the server receives the receive_audio message, it invokes this dispatch function. The information in the broker proxy is used to contact the sending server; the data is transferred and converted into a Gal_Object.
{
GalSS_Environment *env = (GalSS_Environment *) server_data;
GalSS_BrokerProxy *proxy;
/* We get the proxy object, and if it's a proxy, we
synchronously retrieve the data. The object which is
returned is cached in the proxy object, which retains
"ownership" of it. Therefore, it will be freed when
the dispatch function exits. */
proxy = Gal_GetObject(f, ":binary_proxy");
if (Gal_Proxyp(proxy)) {
Gal_Object bdata = GalSS_ObjUnproxifyObject(env, proxy);
if (Gal_Binaryp(bdata)) {
int size;
void *data;
data = Gal_BinaryValue(bdata, &size);
/* Now, do with the data whatever you want. */
/* ... */
}
}
return (Gal_Frame) NULL;
}
Gal_Object GalSS_UnproxifyObject(GalSS_Environment
*env, GalSS_BrokerProxy *p)
Gal_Object GalSS_ObjUnproxifyObject(GalSS_Environment
*env, Gal_Object obj)
The proxy p, or the proxy inside the Gal_Object obj,
creates an inbound broker which contacts the sending server and transfers
the data. The data is converted into a Gal_Object, which is returned.
Memory management
The proxy retains ownership of the object it returns; if you want to hold onto it beyond the life of the proxy, you must copy it.
Note that your choice of whether to retrieve by object or by typed callback
is independent of how the broker proxy was created. You can create an
outbound broker from an object, and read the inbound side by stream, or
vice versa. The outbound side will determine how many chunks of
data you get, however. If you write a single large array to an outbound
broker, you'll read a single chunk on the inbound side, even if you try
to read by stream.
Here's an illustration of this second strategy.
/* This is the data handler for the proxy. In the waytypedef void (*GalSS_ProxyDataHandler)(GalSS_Environment *env, Gal_ObjectType proxy_type, Gal_Object elt, void *caller_data);
we've set up, it's called once, at the end. Remember, we
own the object passed in, so we have to free it. */
void proxy_audio_handler(GalSS_Environment *env, Gal_ObjectType proxy_type,
Gal_Object elt, void *caller_data)
{
if (proxy_type == GAL_BINARY) {
int size;
void *data;
data = Gal_BinaryValue(elt, &size);
/* Now, do with the data whatever you want. */
/* ... */
}
Gal_FreeObject(elt);
}
/* This is the done handler for the proxy. */
void __proxy_report_done(GalSS_Environment *env, Gal_ObjectType proxy_type,
void *caller_data)
{
__env_notify(env, "Audio received.");
}
/* This is the abort handler for the proxy. */
void __proxy_report_abort(GalSS_Environment *env, Gal_ObjectType proxy_type,
void *caller_data)
{
__env_notify(env, "Audio aborted.");
}
Gal_Frame receive_audio(Gal_Frame f, void *server_data)
{
GalSS_Environment *env = (GalSS_Environment *) server_data;
GalSS_BrokerProxy *proxy;
/* We get the proxy object, and proceed asynchronously. The
object passed into the callback is owned by the callback.
We use a non-immediate callback, which will cache all the
data for us and call the data handler when it's done. */
proxy = Gal_GetObject(f, ":binary_proxy");
if (Gal_Proxyp(proxy)) {
GalSS_ObjUnproxify(env, proxy, proxy_audio_handler,
__proxy_report_done, __proxy_report_abort,
0, 0, NULL, NULL);
}
}
return (Gal_Frame) NULL;
}
typedef void (*GalSS_ProxyDataEventHandler)(GalSS_Environment
*env, Gal_ObjectType proxy_type, void *caller_data);
This is the function signature for the event callbacks associated
with the proxy. The proxy_type is the type associated with the proxy;
it can be -1, indicating an "any" proxy. The
caller_data is arbitrary data passed to the inbound proxy when it's
set up.
void GalSS_Unproxify(GalSS_Environment
*env, GalSS_BrokerProxy *p, GalSS_ProxyDataHandler fn,
GalSS_ProxyDataEventHandler done_fn, GalSS_ProxyDataEventHandler
abort_fn, int immediate, int poll_ms, void *caller_data,
void (*caller_data_free_fn)(void *))
void GalSS_ObjUnproxify(GalSS_Environment
*env, Gal_Object obj, GalSS_ProxyDataHandler fn,
GalSS_ProxyDataEventHandler done_fn, GalSS_ProxyDataEventHandler
abort_fn, int immediate, int poll_ms, void *caller_data,
void (*caller_data_free_fn)(void *))
The call environment env is stored away and made available
to fn, done_fn and abort_fn via the function GalSS_BrokerGetEnvironment. The
proxy p, or the proxy in the object obj, establishes a
broker connection to the appropriate server. The fn is a function
of type GalSS_ProxyDataHandler
which will be called every time an object is read from the broker stream
(if immediate is 1), or once on the accumulated data once all the
data has been read (if immediate is 0). The done_fn is called
on the proxy after all the data has been processed; the abort_fn
is called if the connection aborts before completing. The caller_data
is arbitrary data which will be stored in the broker proxy, and will
be freed when the broker proxy is destroyed using the caller_data_free_fn.
This data is passed to all the callbacks. As usual, poll_ms is the
number of milliseconds for the timed task; 0 means 100 ms, > 0 is
a value in ms, < 0 means not to set up a timed task.
If you need to delay
broker activation, pass -1 as poll_ms.
Memory management
The data passed to the data handler callback function is not freed by the caller. It is up to the callback function itself to free the data when it's done with it.
Here's an example of a dispatch function:
Gal_Frame receive_audio(Gal_Frame f, void *server_data)
{
DataHandler *d = (DataHandler *) malloc(sizeof(DataHandler));
GalSS_Environment *env = (GalSS_Environment *) server_data;
GalIO_BrokerStruct *b;
GalSS_ProxyObject *proxy = Gal_GetObject(f, ":binary_proxy");
d->data_buf = (char *) NULL;
d->size = 0;
if (proxy) {
b = GalSS_EnvBrokerProxyObjInInit(env, proxy, env_audio_handler,
0, d, __FreeDataHandler);
if (b) {
GalIO_AddBrokerCallback(b, GAL_BROKER_ABORT_EVENT,
__report_abort, (void *) NULL);
GalIO_AddBrokerCallback(b, GAL_BROKER_DATA_DONE_EVENT,
__report_done, (void *) NULL);
GalIO_SetBrokerActive(b);
}
} else {
free(d);
}
return (Gal_Frame) NULL;
}
GalIO_BrokerStruct *GalSS_EnvBrokerProxyInInit(GalSS_Environment
*env, GalSS_BrokerProxy *p, GalIO_BrokerDataHandler
fnptr, int poll_ms, void *refptr, void (*free_fn)(void
*))
GalIO_BrokerStruct *GalSS_EnvBrokerProxyObjInInit(GalSS_Environment
*env, Gal_Object proxy, GalIO_BrokerDataHandler fnptr,
int poll_ms, void *refptr, void (*free_fn)(void
*))
Identical to GalIO_EnvBrokerDataInInit,
except that all connection information is stored in the proxy p,
or in the proxy in the object proxy. For details about the types
of the callbacks, etc., see GalIO_EnvBrokerDataInInit.
GalSS_BrokerProxy *GalSS_CreateBrokerProxy(const
char *host, int port, const char *call_id, Gal_ObjectType
object_type, Gal_Object obj_data)
This function creates a broker proxy which represents obj_data
of obj_type being available at host on port, indexed
by the unique call_id. This function is used internally; the
programmer should never need it.
void GalSS_FreeBrokerProxy(GalSS_BrokerProxy
*p)
The broker proxy p is freed. If p contains a Gal_Object
(either by virtue of being an outbound broker wrapped around an object
or an inbound broker which has read its data), the object is freed using
Gal_FreeObject.
GalSS_BrokerProxy *GalSS_CopyBrokerProxy(GalSS_BrokerProxy
*bp)
The broker proxy bp is copied. If bp contains a Gal_Object
(either by virtue of being an outbound broker wrapped around an object
or an inbound broker which has read its data), the object is copied
using Gal_CopyObject.
GalIO_BrokerStruct *GalSS_BrokerProxyBroker(GalSS_BrokerProxy
*bp)
Returns the broker object associated with the broker proxy bp.
This broker can be an inbound or outbound broker. It's not advisable
to manipulate this broker object directly, except to start the broker.
Gal_ObjectType GalSS_BrokerProxyObjectType(GalSS_BrokerProxy
*bp)
Returns the object type associated with the broker proxy bp.
The type can be -1, which means that there are no type restrictions
imposed on the broker proxy.
Gal_Object GalSS_BrokerProxyObject(GalSS_BrokerProxy
*bp)
Returns the object associated with the broker proxy bp.
If you use this object for anything, remember that the broker proxy "owns"
it, so if you intend that it outlast the broker proxy, you should copy
the object using Gal_CopyObject
before you store it anywhere.
GAL_SOCKET GalSS_GetBrokerProxySocket(GalSS_BrokerProxy
*bp)
Returns the socket associated with the inbound broker proxy
bp.
void GalSS_ForceProxyExpiration(GalSS_BrokerProxy
*bp)
Forces the broker associated with the outbound broker proxy
bp to time out. See GalIO_ForceBrokerExpiration.
static Gal_Frame prepare_audio_frame(GalSS_Environment *env,The important steps here are to set up the outgoing broker stream, constructing a well-formed frame for initiating a broker connection, sending the frame to the Hub (in this case, by a call to GalSS_EnvWriteFrame, not shown), writing the appropriate data to the connection, and announcing that no more data will be sent.
char *filename)
{
Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
int total = 0;
char *buf = (char *) NULL;
GalIO_BrokerStruct *b;
/* In the code omitted here, the data from the named file is
read into buf, and total is the number of bytes in buf */
/* .... */
/* Now that we have the audio, we write the binary data
through the broker. */
b = GalIO_BrokerDataOutInit(GalSS_EnvComm(env), 0, 10);
if (b && (GalIO_GetBrokerListenPort(b) > 0)) {
GalIO_BrokerPopulateFrame(b, f, ":binary_host", ":binary_port");
GalIO_BrokerWriteBinary(b, buf, total);
GalIO_BrokerDataOutDone(b);
}
free(buf);
return f;
}
If you need to delay
broker activation, pass -1 as poll_ms.
void GalIO_BrokerPopulateFrame(GalIO_BrokerStruct
*b, Gal_Frame f, const char *host_key, const
char *port_key)
Populates the frame f with the appropriate broker information,
including the call ID, host and port. The host_key is the key
to store the hostname in the frame, and the port_key is the key
to store the port in. The call ID is stored in the key designated by GAL_BROKER_CALL_ID_FRAME_KEY.
unsigned short GalIO_GetBrokerListenPort(GalIO_BrokerStruct
*b)
Returns the port number the broker b is listening on.
If this value is not 0, the broker has been set up correctly.
char *GalIO_GetBrokerCallID(GalIO_BrokerStruct
*b)
Returns the call ID the broker b is using. You should
not typically need to use this function.
void GalIO_FrameSetBrokerCallID(Gal_Frame
f, const char *call_id)
Sets the unique ID for broker confirmation. You should not typically
need to use this function.
char *GalIO_IPAddress(void)
This function returns the IP address of the host, for use in
creating contact information for the broker connection. You should not
typically need to use this function.
int GalIO_BrokerWriteFrame(GalIO_BrokerStruct
*b, Gal_Frame frame)
int GalIO_BrokerWriteInt(GalIO_BrokerStruct
*b, int i)
int GalIO_BrokerWriteFloat(GalIO_BrokerStruct
*b, float f)
int GalIO_BrokerWriteList(GalIO_BrokerStruct
*b, Gal_Object *elts, int n_elts)
int GalIO_BrokerWriteString(GalIO_BrokerStruct
*b, char *str)
int GalIO_BrokerWriteBinary(GalIO_BrokerStruct
*b, void *data, int n_bytes)
int GalIO_BrokerWriteInt16(GalIO_BrokerStruct
*b, void *data, int n_ints)
int GalIO_BrokerWriteInt32(GalIO_BrokerStruct
*b, void *data, int n_ints)
int GalIO_BrokerWriteInt64(GalIO_BrokerStruct
*b, void *data, int n_ints)
int GalIO_BrokerWriteFloat32(GalIO_BrokerStruct
*b, void *data, int n_floats)
int GalIO_BrokerWriteFloat64(GalIO_BrokerStruct
*b, void *data, int n_floats)
These functions send data through the broker connection. The
encoding for these elements is identical to their encoding within frames
during message transport. These functions correspond in the obvious way
to the Communicator object types.
int GalIO_BrokerWriteObject(GalIO_BrokerStruct
*b, Gal_Object o)
Write a Gal_Object of any type through the broker connection.
The encoding for these elements is identical to their encoding within
frames during message transport.
voidGalIO_ForceBrokerExpiration(GalIO_BrokerStruct
*b)
This function causes the broker to expire immediately. You can
use this in conjunction with GalIO_AddBrokerCallback
to force a broker to expire after it's accepted a certain number of
connections, for instance.
int GalIO_BrokerIsDone(GalIO_BrokerStruct
*b)
Returns 1 if the broker is already marked as done, 0 otherwise.
static Gal_Frame prepare_audio_frame(GalIO_CommStruct *gcomm, char *filename)
{
Gal_Frame f = Gal_MakeFrame("main", GAL_CLAUSE);
FILE *fp;
GalIO_BrokerStruct *b;
OutData *o = (OutData *) calloc(1, sizeof(OutData));
/* In the code omitted here, the file is opened */
/* .... */
/* If we're the sending direction, we read binary data from the file,
relaying it to the broker, until we find EOF. */
b = GalIO_BrokerDataOutInit(gcomm, 0, 10);
if (b && (GalIO_GetBrokerListenPort(b) > 0)) {
GalIO_BrokerPopulateFrame(b, f, ":binary_host", ":binary_port");
o->fp = fp;
o->b = b;
Gal_AddTask(__write_data, (void *) o, 10, 0, NULL);
}
return f;
}
static void __write_data(Gal_TaskPkg *p) {
OutData *o = (OutData *) Gal_TaskPkgData(p);
FILE *fp = o->fp;
GalIO_BrokerStruct *b = o->b;
char *buf = (char *) malloc(BLOCKSIZE * sizeof(char));
int count;
count = fread(buf, sizeof(char), BLOCKSIZE, fp);
if (count) {
GalIO_BrokerWriteBinary(b, buf, count);
}
free(buf);
if (count == BLOCKSIZE) {
/* Not done yet. */
Gal_ReAddTask(p, (void *) o, 10, 0, NULL);
} else {
GalIO_BrokerDataOutDone(b);
fclose(fp);
free(o);
}
}
Gal_Frame receive_audio(Gal_Frame f, void *server_data)When the server receives the receive_audio message, it invokes this dispatch function. If it can find the appropriate host and port set, it sets up a broker receiving connection, makes it active, and exits. This connection takes a callback function and data to be stored in the broker structure for the callback function to use. If the broker is set up appropriately, this function will also set up other callbacks which are invoked when the broker is notified of a broker abort or that data is done (see the documentation on GalIO_AddBrokerCallback and the event-driven programming model in general for further details).
{
DataHandler *d = (DataHandler *) malloc(sizeof(DataHandler));
GalIO_BrokerStruct *b;
char *host = Gal_GetString(f, ":binary_host");
int port = Gal_GetInt(f, ":binary_port");
d->data_buf = (char *) NULL;
d->size = 0;
if (host && port) {
b = GalSS_EnvBrokerDataInInit((GalSS_Environment *) server_data,
host, port, f,
env_audio_handler, 0, d, __FreeDataHandler);
if (b) {
GalIO_AddBrokerCallback(b, GAL_BROKER_ABORT_EVENT,
__report_abort, (void *) NULL);
GalIO_AddBrokerCallback(b, GAL_BROKER_DATA_DONE_EVENT,
__report_done, (void *) NULL);
GalIO_SetBrokerActive(b);
}
} else {
free(d);
}
return (Gal_Frame) NULL;
}
GalIO_BrokerStruct *GalSS_EnvBrokerDataInInit(GalSS_Environment
*env, const char *host, unsigned short port,
Gal_Frame frame, GalIO_BrokerDataHandler fnptr, int poll_ms,
void *refptr, void (*free_fn)(void *))
The call environment env is stored away and made available
to the broker data handler fnptr via the function GalSS_BrokerGetEnvironment.
The host and port are the location of the server to contact.
The frame is the frame which delivered the connection message,
which is forwarded to the other end of the broker connection to verify
that the correct connection is being made. It must contain the :call_id
key. The fnptr is a function of type GalIO_BrokerDataHandler which will be
called when there is data to be read, and the refptr is arbitrary
data which will be stored in the broker structure (for instance, an
object to collect the binary data, as show in the example here) and will
be freed when the broker is destroyed using the free_fn. This
data can be retrieved using the function GalIO_GetBrokerData, as illustrated below.
As usual, poll_ms is the number of milliseconds for the timed task;
0 means 100 ms, > 0 is a value in ms, < 0 means not to set up a
timed task.
If you need to delay
broker activation, pass -1 as poll_ms.
Here's an example of a callback function:Memory management
The data passed to the callback function is not freed by the caller. It is up to the callback function itself to free the data when it's done with it.
static void env_audio_handler(GalIO_BrokerStruct *broker_struct,
void *data, Gal_ObjectType data_type,
int n_samples)
{
DataHandler *d = (DataHandler *) GalIO_GetBrokerData(broker_struct);
switch (data_type) {
case GAL_BINARY:
if (d->data_buf)
d->data_buf = (char *) realloc(d->data_buf, n_samples + d->size);
else
d->data_buf = (char *) malloc(n_samples + d->size);
bcopy(data, d->data_buf + d->size, n_samples);
d->size += n_samples;
free(data);
break;
default:
GalUtil_Warn("Unknown data type %s\n", Gal_ObjectTypeString(data_type));
}
}
When the broker receives a notification that it's done, it calls its GAL_BROKER_DATA_DONE_EVENT callback to finish things up (in the case of this example, the callback writes the audio data to /dev/audio and sends a notification message back to the Hub). Here's an example of a broker event callback, in this case the one that's fired when the broker is notified of an abort:
void __report_abort(GalIO_BrokerStruct *b, void *data)You can see here how the callback uses GalSS_BrokerGetEnvironment to send a message with the appropriate context to the Hub.
{
Gal_Frame f;
GalSS_Environment *env = GalSS_BrokerGetEnvironment(b);
f = Gal_MakeFrame("notify", GAL_CLAUSE);
Gal_SetProp(f, ":notification", Gal_StringObject("Audio aborted."));
GalSS_EnvWriteFrame(env, f, 0);
Gal_FreeFrame(f);
}
void GalIO_SetBrokerActive(GalIO_BrokerStruct
*b)
Enables the broker to process the data handler. Nothing will
happen with the data until this function is called on the broker structure.
This is to allow the user to control when the broker structures are activated
(if, for instance, they play audio output as a side effect).
typedef void (*GalIO_BrokerDataHandler)(GalIO_BrokerStruct
*broker_struct, void *object, Gal_ObjectType object_type,
int object_count)
This is the type of the callback functions. The broker_struct
is the broker structure which has been set up to receive the data.
The object is the data itself. The type is the type of
the object, and the object_count is how many elements of the data
there are (this is interesting mostly in the case of array data sent by
GalIO_BrokerWriteBinary, GalIO_BrokerWriteInt16, etc.).
void *GalIO_GetBrokerData(GalIO_BrokerStruct
*b)
This function retrieves from the broker structure b the
data which was passed in to the refptr argument to GalIO_CommBrokerDataInInit.
void GalIO_SetBrokerData(GalIO_BrokerStruct
*b, void *caller_data, void (*free_fn)(void *))
If you need to update the caller data at any point, this function
will free the old data using the previously set free function and update
the caller data to caller_data and the free function to free_fn.
void GalIO_BrokerDataDone(GalIO_BrokerStruct
*b)
This function should be called on the broker structure if the
client decides it's done before all the data is read.
void GalSS_BrokerSetEnvironment(GalIO_BrokerStruct
*b, GalSS_Environment *env)
Sets the environment associated with the broker b to
env. You should not typically need to use this function.
GalSS_Environment *GalSS_BrokerGetEnvironment(GalIO_BrokerStruct
*b)
Retrieves the environment associated with the broker b.
Gal_Frame GalIO_GetBrokerFrame(GalIO_BrokerStruct
*b)
Returns the internal copy of the frame passed to GalIO_CommBrokerDataInInit. You should
not typically need to use this function.
If you're dealing with multiple incoming broker objects simultaneously, you might wish to use the broker queues. Broker objects can be connected in a doubly-linked list for the purposes of ordering their processing.
GalIO_BrokerStruct *GalIO_BrokerStructQueueAppend(GalIO_BrokerStruct
*b, GalIO_BrokerStruct *bqueue)
Adds the broker structure b to the end of the queue bqueue.
If bqueue is NULL, b is returned, otherwise bqueue
is returned.
GalIO_BrokerStruct *GalIO_BrokerStructQueuePop(GalIO_BrokerStruct
*bqueue)
Removed all finished broker structure from the queue and returns
the broker structure which corresponds to the new head of the active
queue. The broker structure is destroyed.
GalIO_BrokerStruct *GalIO_BrokerStructDequeue(GalIO_BrokerStruct
*b, GalIO_BrokerStruct *bqueue)
Removes a broker structure from the queue and returns the new
queue.
Here's a list of functions which modify the broker or proxy, which would
require you to delay broker activation in the threaded case:
Gal_Frame receive_audio(Gal_Frame f, void *server_data)
{
DataHandler *d = (DataHandler *) malloc(sizeof(DataHandler));
GalIO_BrokerStruct *b;
char *host = Gal_GetString(f, ":binary_host");
int port = Gal_GetInt(f, ":binary_port");
d->data_buf = (char *) NULL;
d->size = 0;
if (host && port) {
b = GalSS_EnvBrokerDataInInit((GalSS_Environment *) server_data,
host, port, f,
env_audio_handler, -1, d, __FreeDataHandler);
if (b) {
GalIO_AddBrokerCallback(b, GAL_BROKER_ABORT_EVENT,
__report_abort, (void *) NULL);
GalIO_AddBrokerCallback(b, GAL_BROKER_DATA_DONE_EVENT,
__report_done, (void *) NULL);
GalIO_SetBrokerActive(b);
GalSS_EnvStartBroker((GalSS_Environment *) server_data, b, 0);
}
} else {
free(d);
}
return (Gal_Frame) NULL;
}
void GalIO_CommStartBroker(GalIO_CommStruct
*gcomm, GalIO_BrokerStruct *b, int poll_ms)
Updates the polling interval for the broker b to poll_ms,
and takes the appropriate actions associated with broker startup for the
connection gcomm to start the polling task.
void GalSS_EnvStartBroker(GalSS_Environment
*env, GalIO_BrokerStruct *b, int poll_ms)
Identical to GalIO_CommStartBroker, except uses the connection stored in
the environment env
GalIO_BrokerStruct *GalIO_BrokerDataInInit(const
char *host, unsigned short port, Gal_Frame frame, GalIO_BrokerDataHandler
fnptr, void *caller_data, int poll_ms)
Like GalIO_CommBrokerDataInInit,
but doesn't allow the programmer to pass in the host connection or
a free function for the data.
void *GalIO_GetBrokerCallerData(GalIO_BrokerStruct
*b)
Identical to GalIO_GetBrokerData
but does not conform to consistent naming conventions.
typedef void (*GalIO_BrokerDataFinalizer)(GalIO_BrokerStruct
*, void *caller_data)
void GalIO_BrokerSetFinalizer(GalIO_BrokerStruct
*b, GalIO_BrokerDataFinalizer finalizer)
Sets a finalizer for the broker structure. This finalizer is
called when the broker structure is destroyed. This can be used, for
instance, to pop a broker queue and activate the next element in the
queue. The finalizer is called with the contents of the refptr argument
passed in to GalIO_BrokerDataInInit. This function has been superseded
by the addition of a free function in GalIO_CommBrokerDataInInit and
by the callback architecture described here.
GalIO_BrokerStruct *GalIO_CommBrokerDataInInit(GalIO_CommStruct
*host_gcomm, const char *host, unsigned short port,
Gal_Frame frame, GalIO_BrokerDataHandler fnptr, int
poll_ms, void *caller_data, void (*caller_data_free_fn)(void
*))
This function provides almost the same functionality as GalSS_EnvBrokerDataInInit, except that
the host_gcomm is the connection which hosts the callback from which
the broker was created; it is provided to support the event-driven programming
model. All other arguments are as they are for GalSS_EnvBrokerDataInInit.
![]() |
License / Documentation home / Help and feedback | ![]() |