org.edg.info
Class ContinuousConsumerInstance

java.lang.Object
  extended by org.edg.info.InstanceBase
      extended by org.edg.info.ConsumerInstance
          extended by org.edg.info.ContinuousConsumerInstance

public class ContinuousConsumerInstance
extends ConsumerInstance

ContinuousConsumerInstances only ever need to contact one publisher in order to answer a query. The registry will return a list of alternative publishers that the OTCI can use, and the instance will try to identify the one that is "closest". If it fails to contact the closest publisher when the query is run, then the next closest will be used.
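The fallback behaviour described above can be sketched as a walk over a list of publishers that is already ordered from closest to farthest. This is a minimal illustration, not the R-GMA implementation: the class `PublisherFailover`, the method `firstReachable`, the use of plain strings for publisher URLs, and the `canContact` predicate are all assumptions introduced for the example.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical helper; not part of org.edg.info.
class PublisherFailover {

    /**
     * Returns the first publisher that can be contacted, trying them in the
     * order given. The list is assumed to be sorted closest-first.
     */
    static Optional<String> firstReachable(List<String> publishersByDistance,
                                           Predicate<String> canContact) {
        for (String publisher : publishersByDistance) {
            if (canContact.test(publisher)) {
                return Optional.of(publisher);
            }
            // Contact failed: fall through to the next closest publisher.
        }
        return Optional.empty(); // no alternative publisher could be reached
    }

    public static void main(String[] args) {
        List<String> ordered = List.of(
                "http://near.example/publisher",
                "http://mid.example/publisher",
                "http://far.example/publisher");
        // Simulate the closest publisher being unreachable.
        Optional<String> chosen = firstReachable(ordered, url -> !url.contains("near"));
        System.out.println(chosen.orElse("none"));
    }
}
```

How "closest" is measured is not stated in this page, so the sketch takes the ordering as an input rather than computing it.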


Field Summary
 
Fields inherited from class org.edg.info.InstanceBase
DEFAULT_TERMINATION_INTERVAL_MS, s_maxTerminationInterval, s_minTerminationInterval
 
Constructor Summary
ContinuousConsumerInstance(java.lang.String selectStmt, java.net.URL servletURL, int connectionId, long terminationInterval, int cFlags, StreamingServer streamingServer, TimeoutThread timeoutThread, org.glite.rgma.services.config.ConsumerConfig consumerConfig)
          Constructs a ContinuousConsumerInstance.
 
Method Summary
 void addProducer(org.edg.info.system.ServletConnection connection)
          Adds the Producer URL to the Consumer List and starts executing if necessary.
 void destroy()
          XXX-AC: note, this is called from a synchronized block in the instanceTracker.
 void flushQueue(java.lang.String queueName, java.lang.String host)
          DOCUMENT ME!
 Message getCurrentMessageInFastQueue()
           
 Message getCurrentMessageInSlowQueue()
           
 Message[] getMessagesInFastQueue()
           
 Message[] getMessagesInSlowQueue()
           
 org.glite.rgma.system.ProducerTableEntryList getProducerTableEntryList()
           
 boolean isExecuting()
          DOCUMENT ME!
 void removeProducer(org.edg.info.system.ServletConnection sc)
          removeProducer will remove the producer from connections and halt any messages that are still being sent to it.
 void setProducerConnections(java.util.Vector servletConnections)
          DOCUMENT ME!
 boolean updateRegistry(long currentTime, boolean registryDown)
          DOCUMENT ME!
 
Methods inherited from class org.edg.info.ConsumerInstance
abort, canPop, count, getClientHostName, getLastPopTimeMillis, getNumTuplesInStore, getQuery, getQueryProperties, getTableNames, hasAborted, incCompletedExecutes, pop, pop, setClientHostName, start, start
 
Methods inherited from class org.edg.info.InstanceBase
canDestroy, disconnect, getCreationTimeMillis, getLastRegistryUpdateTimeMillis, getRegistryUpdateIntervalMillis, getResourceId, getStatus, getTerminationInterval, getTerminationIntervalMillis, getUserLastContactTimeMillis, isDestroyed, isTupleCheckingEnabled, notifyRegistrationThread, reconnect, setCloseCalled, setDestroyed, setRegistrationThread, setTerminationInterval, showSignOfLife, updateLastContactTime, wasCloseCalled
 
Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

ContinuousConsumerInstance

public ContinuousConsumerInstance(java.lang.String selectStmt,
                                  java.net.URL servletURL,
                                  int connectionId,
                                  long terminationInterval,
                                  int cFlags,
                                  StreamingServer streamingServer,
                                  TimeoutThread timeoutThread,
                                  org.glite.rgma.services.config.ConsumerConfig consumerConfig)
                           throws org.glite.rgma.system.RGMAInternalException,
                                  org.glite.rgma.system.RGMAUserException
Constructs a ContinuousConsumerInstance.

Parameters:
selectStmt - SQL SELECT statement.
servletURL - The servlet's URL.
connectionId - Consumer connection ID.
terminationInterval - Termination interval.
cFlags - Consumer flags (CONTINUOUS, OLD etc)
streamingServer - Reference to the StreamingServer.
timeoutThread - Thread used for start(long).
consumerConfig - TODO
Throws:
org.glite.rgma.system.RGMAException - Thrown by init()
org.glite.rgma.system.RGMAInternalException
org.glite.rgma.system.RGMAUserException
Method Detail

isExecuting

public boolean isExecuting()
                    throws org.glite.rgma.system.RGMAUserException
DOCUMENT ME!

Specified by:
isExecuting in class ConsumerInstance
Returns:
DOCUMENT ME!
Throws:
org.glite.rgma.system.RGMAUserException - DOCUMENT ME!

addProducer

public void addProducer(org.edg.info.system.ServletConnection connection)
Adds the Producer URL to the Consumer List and starts executing if necessary.
XXX - RB: Things to watch out for: The start() and addProducer() are both synchronized methods. The start() method will not return until all its streaming threads have been started. (XXX-AC: not any more!) Each streaming thread will contact the StreamProducer. The StreamProducer will contact the Consumer when it knows about a new Producer. So adding any locks on the StreamProducer may cause a deadlock.

Parameters:
connection - the ServletConnection Object of the Producer.
Throws:
org.glite.rgma.system.RGMAException - if this Instance Object has been closed.
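The deadlock warning in the addProducer description is an instance of a general hazard: calling out to another component while holding a lock that the component's callback also needs. One common way to reduce that risk is to update state under the lock and make the outbound call only after the lock has been released. The sketch below illustrates that pattern under stated assumptions; `ConsumerSketch` and its string URLs are hypothetical stand-ins, and nothing here is taken from the actual ContinuousConsumerInstance source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in; not the real ContinuousConsumerInstance.
class ConsumerSketch {
    private final List<String> producers = new ArrayList<>();
    private boolean started;

    /** Marks the consumer as started and returns a snapshot of producers to contact. */
    synchronized List<String> start() {
        started = true;
        return new ArrayList<>(producers); // copy, so callers iterate without the lock
    }

    /**
     * Records the producer under the lock. Returns true if the caller should
     * now open a stream to it; the caller does so after this method returns,
     * that is, after the lock has been released.
     */
    synchronized boolean addProducer(String producerUrl) {
        if (producers.contains(producerUrl)) {
            return false; // already known, nothing to start
        }
        producers.add(producerUrl);
        return started;
    }

    synchronized int producerCount() {
        return producers.size();
    }

    public static void main(String[] args) {
        ConsumerSketch consumer = new ConsumerSketch();
        consumer.addProducer("http://p1.example/producer");
        for (String url : consumer.start()) {
            System.out.println("contact " + url); // outbound call made outside the lock
        }
        if (consumer.addProducer("http://p2.example/producer")) {
            System.out.println("contact http://p2.example/producer");
        }
    }
}
```

Because neither synchronized method calls out to another component, a callback arriving from a producer never has to wait on a lock that is held across a remote call.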

destroy

public void destroy()
Description copied from class: ConsumerInstance
XXX-AC: note, this is called from a synchronized block in the instanceTracker.

Overrides:
destroy in class ConsumerInstance
See Also:
org.edg.info.InstanceBase#destroy()

removeProducer

public void removeProducer(org.edg.info.system.ServletConnection sc)
removeProducer will remove the producer from connections and halt any messages that are still being sent to it.

XXX-AC: any synchronization needed? I don't think so.

Parameters:
sc - Servlet connection

flushQueue

public void flushQueue(java.lang.String queueName,
                       java.lang.String host)
DOCUMENT ME!

Specified by:
flushQueue in class ConsumerInstance
Parameters:
queueName - DOCUMENT ME!
host - DOCUMENT ME!

setProducerConnections

public void setProducerConnections(java.util.Vector servletConnections)
DOCUMENT ME!

Specified by:
setProducerConnections in class ConsumerInstance
Parameters:
servletConnections - DOCUMENT ME!

updateRegistry

public boolean updateRegistry(long currentTime,
                              boolean registryDown)
                       throws org.glite.rgma.system.RGMAException,
                              org.glite.rgma.system.RemoteException
DOCUMENT ME!

Overrides:
updateRegistry in class InstanceBase
Parameters:
currentTime - DOCUMENT ME!
registryDown - DOCUMENT ME!
Returns:
DOCUMENT ME!
Throws:
org.glite.rgma.system.RGMAException - DOCUMENT ME!
org.glite.rgma.system.RemoteException - DOCUMENT ME!

getProducerTableEntryList

public org.glite.rgma.system.ProducerTableEntryList getProducerTableEntryList()
Overrides:
getProducerTableEntryList in class ConsumerInstance
Returns:
A list of producers the consumer is currently using.

getMessagesInFastQueue

public Message[] getMessagesInFastQueue()
Overrides:
getMessagesInFastQueue in class ConsumerInstance
Returns:
A list of messages stored in the fast message queue.

getMessagesInSlowQueue

public Message[] getMessagesInSlowQueue()
Overrides:
getMessagesInSlowQueue in class ConsumerInstance
Returns:
A list of messages stored in the slow message queue.

getCurrentMessageInFastQueue

public Message getCurrentMessageInFastQueue()
Overrides:
getCurrentMessageInFastQueue in class ConsumerInstance
Returns:
The current message being sent in the fast queue. Returns null if no message exists.

getCurrentMessageInSlowQueue

public Message getCurrentMessageInSlowQueue()
Overrides:
getCurrentMessageInSlowQueue in class ConsumerInstance
Returns:
The current message being sent in the slow queue. Returns null if no message exists.
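
Since both getCurrentMessageIn...Queue accessors are documented to return null when no message exists, callers need a null check before using the result. A minimal sketch of that handling follows. The `QueueView` interface is a hypothetical stand-in for the two accessors, and `String` is used in place of the real `Message` type so that the example is self-contained.

```java
import java.util.Optional;

// Hypothetical illustration; QueueView is not part of org.edg.info.
class CurrentMessageSketch {

    /** Stand-in for the two accessors documented above; either may return null. */
    interface QueueView {
        String getCurrentMessageInFastQueue();
        String getCurrentMessageInSlowQueue();
    }

    /** Builds a one-line status string, treating null as "no message in flight". */
    static String describe(QueueView view) {
        String fast = Optional.ofNullable(view.getCurrentMessageInFastQueue()).orElse("<idle>");
        String slow = Optional.ofNullable(view.getCurrentMessageInSlowQueue()).orElse("<idle>");
        return "fast=" + fast + ", slow=" + slow;
    }

    public static void main(String[] args) {
        QueueView view = new QueueView() {
            public String getCurrentMessageInFastQueue() { return "tuple-batch-7"; }
            public String getCurrentMessageInSlowQueue() { return null; }
        };
        System.out.println(describe(view));
    }
}
```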