voldemort.client.protocol.admin
Class BaseStreamingClient

java.lang.Object
  extended by voldemort.client.protocol.admin.BaseStreamingClient
Direct Known Subclasses:
StreamingClient

public class BaseStreamingClient
extends java.lang.Object

The streaming API allows events to be sent into Voldemort stores asynchronously. All partition and replication logic is handled internally. The user is expected to provide two callbacks: one for performing periodic checkpoints and one for recovering the streaming process from the last checkpoint. NOTE: The API is not thread safe; if multiple threads use it, no guarantees can be made about the correctness of the checkpointing mechanism. Currently it is expected to be used by a single thread per data source.


Field Summary
protected  AdminClient adminClient
           
protected  java.util.List<java.lang.Integer> blackListedNodes
           
protected static int CHECKPOINT_COMMIT_SIZE
           
protected  int entriesProcessed
           
protected  java.util.List<java.lang.Integer> faultyNodes
           
protected static boolean MARKED_BAD
           
protected  java.util.HashMap<Pair<java.lang.String,java.lang.Integer>,java.lang.Boolean> nodeIdStoreInitialized
           
protected  java.util.HashMap<Pair<java.lang.String,java.lang.Integer>,java.io.DataOutputStream> nodeIdStoreToOutputStreamRequest
           
protected  java.util.List<Node> nodesToStream
           
protected  RoutingStrategy routingStrategy
           
protected  java.util.HashMap<java.lang.String,RoutingStrategy> storeToRoutingStrategy
           
protected  SocketPool streamingSocketPool
           
protected  EventThrottler throttler
           
 
Constructor Summary
BaseStreamingClient(StreamingClientConfig config)
           
 
Method Summary
protected  void addStoreToSession(java.lang.String store)
          Add another store destination to an existing streaming session
 void blacklistNode(int nodeId)
          Mark a node as blacklisted
protected  void close(java.net.Socket socket)
           
 void closeStreamingSession()
          Close the streaming session: flush all network buffers and call the commit callback.
 void closeStreamingSession(java.util.concurrent.Callable resetCheckpointCallback)
           
 void closeStreamingSessions()
          Close the streaming session: flush all network buffers and call the commit callback.
 void closeStreamingSessions(java.util.concurrent.Callable resetCheckpointCallback)
           
 void commitToVoldemort()
          Flush the network buffer and write all entries to the server, then wait for an ack from the server. This is a blocking call.
protected  void finalize()
           
 AdminClient getAdminClient()
           
 java.util.List<java.lang.Integer> getFaultyNodes()
           
 void initStreamingSession(java.lang.String store, java.util.concurrent.Callable checkpointCallback, java.util.concurrent.Callable recoveryCallback, boolean allowMerge)
           
 void initStreamingSessions(java.util.List<java.lang.String> stores, java.util.concurrent.Callable checkpointCallback, java.util.concurrent.Callable recoveryCallback, boolean allowMerge)
           
 void initStreamingSessions(java.util.List<java.lang.String> stores, java.util.concurrent.Callable checkpointCallback, java.util.concurrent.Callable recoveryCallback, boolean allowMerge, java.util.List<java.lang.Integer> blackListedNodes)
           
 void removeStoreFromSession(java.util.List<java.lang.String> storeNameToRemove)
          Remove a list of stores from the session. First commit all entries for these stores, then clean up resources.
 void streamingPut(ByteArray key, Versioned<byte[]> value)
          A Streaming Put call
 void streamingPut(ByteArray key, Versioned<byte[]> value, java.lang.String storeName)
           
 void unmarkBad()
          Reset streaming session by unmarking it as bad
 void updateThrottleLimit(int throttleQPS)
           
 
Methods inherited from class java.lang.Object
clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

streamingSocketPool

protected SocketPool streamingSocketPool

routingStrategy

protected RoutingStrategy routingStrategy

CHECKPOINT_COMMIT_SIZE

protected static int CHECKPOINT_COMMIT_SIZE

entriesProcessed

protected int entriesProcessed

MARKED_BAD

protected static boolean MARKED_BAD

throttler

protected EventThrottler throttler

adminClient

protected AdminClient adminClient

storeToRoutingStrategy

protected java.util.HashMap<java.lang.String,RoutingStrategy> storeToRoutingStrategy

nodeIdStoreInitialized

protected java.util.HashMap<Pair<java.lang.String,java.lang.Integer>,java.lang.Boolean> nodeIdStoreInitialized

nodeIdStoreToOutputStreamRequest

protected java.util.HashMap<Pair<java.lang.String,java.lang.Integer>,java.io.DataOutputStream> nodeIdStoreToOutputStreamRequest

nodesToStream

protected java.util.List<Node> nodesToStream

blackListedNodes

protected java.util.List<java.lang.Integer> blackListedNodes

faultyNodes

protected java.util.List<java.lang.Integer> faultyNodes
Constructor Detail

BaseStreamingClient

public BaseStreamingClient(StreamingClientConfig config)
Method Detail

getAdminClient

public AdminClient getAdminClient()

getFaultyNodes

public java.util.List<java.lang.Integer> getFaultyNodes()

updateThrottleLimit

public void updateThrottleLimit(int throttleQPS)

initStreamingSession

public void initStreamingSession(java.lang.String store,
                                 java.util.concurrent.Callable checkpointCallback,
                                 java.util.concurrent.Callable recoveryCallback,
                                 boolean allowMerge)
Parameters:
store - the name of the store to be streamed to
checkpointCallback - the callback that lets the user record progress, up to the last event delivered. This callable is invoked periodically by the client.
recoveryCallback - the callback that lets the user rewind the upstream source to the position recorded by the last completed call to checkpointCallback. It is invoked whenever an exception occurs during the streaming session.
allowMerge - whether streamed events may be merged with online writes. If false, all online writes made since the last streaming session completed are lost at the end of the current streaming session.
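
The checkpoint and recovery contract described in the parameters above can be illustrated with a small, self-contained sketch. UpstreamCursor is a hypothetical stand-in for a rewindable data source and is not part of Voldemort; the sketch only shows what the two Callable arguments might do, assuming the upstream position can be saved and restored as a simple counter.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a rewindable upstream source; not a Voldemort class.
class UpstreamCursor {
    private long position = 0;      // index of the next event to read
    private long checkpointed = 0;  // position saved by the last completed checkpoint

    long position() { return position; }

    void advance() { position++; }

    // Candidate for checkpointCallback: records progress made so far.
    Callable<Object> checkpointCallback() {
        return () -> { checkpointed = position; return null; };
    }

    // Candidate for recoveryCallback: rewinds to the last recorded position.
    Callable<Object> recoveryCallback() {
        return () -> { position = checkpointed; return null; };
    }

    // Simulates: read 5 events, checkpoint, read 2 more, then fail and recover.
    static long simulate() {
        UpstreamCursor cursor = new UpstreamCursor();
        Callable<Object> checkpoint = cursor.checkpointCallback();
        Callable<Object> recovery = cursor.recoveryCallback();
        try {
            for (int i = 0; i < 5; i++) cursor.advance();
            checkpoint.call();
            for (int i = 0; i < 2; i++) cursor.advance();
            recovery.call(); // the two uncheckpointed events will be re-read
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return cursor.position();
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints 5
    }
}
```

In real use, the two callables would be passed as the checkpointCallback and recoveryCallback arguments of initStreamingSession; the client, not the caller, decides when to invoke them.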

streamingPut

public void streamingPut(ByteArray key,
                         Versioned<byte[]> value)
A Streaming Put call

Parameters:
key - The key
value - The value

closeStreamingSession

public void closeStreamingSession(java.util.concurrent.Callable resetCheckpointCallback)
Parameters:
resetCheckpointCallback - the callback that lets the user clean up the checkpoint at the end of the streaming session so that a new session can, if necessary, start from position 0.

closeStreamingSession

public void closeStreamingSession()
Close the streaming session: flush all network buffers and call the commit callback.


close

protected void close(java.net.Socket socket)

finalize

protected void finalize()
Overrides:
finalize in class java.lang.Object

initStreamingSessions

public void initStreamingSessions(java.util.List<java.lang.String> stores,
                                  java.util.concurrent.Callable checkpointCallback,
                                  java.util.concurrent.Callable recoveryCallback,
                                  boolean allowMerge)
Parameters:
stores - the list of names of the stores to be streamed to
checkpointCallback - the callback that lets the user record progress, up to the last event delivered. This callable is invoked periodically by the client.
recoveryCallback - the callback that lets the user rewind the upstream source to the position recorded by the last completed call to checkpointCallback. It is invoked whenever an exception occurs during the streaming session.
allowMerge - whether streamed events may be merged with online writes. If false, all online writes made since the last streaming session completed are lost at the end of the current streaming session.

initStreamingSessions

public void initStreamingSessions(java.util.List<java.lang.String> stores,
                                  java.util.concurrent.Callable checkpointCallback,
                                  java.util.concurrent.Callable recoveryCallback,
                                  boolean allowMerge,
                                  java.util.List<java.lang.Integer> blackListedNodes)
Parameters:
stores - the list of names of the stores to be streamed to
checkpointCallback - the callback that lets the user record progress, up to the last event delivered. This callable is invoked periodically by the client.
recoveryCallback - the callback that lets the user rewind the upstream source to the position recorded by the last completed call to checkpointCallback. It is invoked whenever an exception occurs during the streaming session.
allowMerge - whether streamed events may be merged with online writes. If false, all online writes made since the last streaming session completed are lost at the end of the current streaming session.
blackListedNodes - the list of nodes not to stream to; their data can probably be recovered later from the replicas
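
As a rough illustration of what excluding blacklisted nodes amounts to, the sketch below filters a list of node ids against a blacklist. BlacklistSketch and its nodesToStream method are hypothetical names used only for this example; how the client actually builds its nodesToStream field is not stated here, so treat this as an assumption about the general idea rather than the implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; illustrates filtering only, not Voldemort's own logic.
class BlacklistSketch {

    // Returns the node ids that remain after dropping every blacklisted id.
    // A null blacklist is treated as "nothing blacklisted".
    static List<Integer> nodesToStream(List<Integer> allNodeIds,
                                       List<Integer> blackListedNodes) {
        List<Integer> result = new ArrayList<>();
        for (Integer id : allNodeIds) {
            if (blackListedNodes == null || !blackListedNodes.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(0, 1, 2, 3);
        List<Integer> blacklisted = Arrays.asList(1, 3);
        System.out.println(nodesToStream(all, blacklisted)); // prints [0, 2]
    }
}
```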

addStoreToSession

protected void addStoreToSession(java.lang.String store)
Add another store destination to an existing streaming session

Parameters:
store - the name of the store to stream to

removeStoreFromSession

public void removeStoreFromSession(java.util.List<java.lang.String> storeNameToRemove)
Remove a list of stores from the session. First commit all entries for these stores, then clean up resources.

Parameters:
storeNameToRemove - List of stores to be removed from the current streaming session

streamingPut

public void streamingPut(ByteArray key,
                         Versioned<byte[]> value,
                         java.lang.String storeName)
Parameters:
key - The key
value - The value
storeName - the name of the store to write to. If a store is added midway through a streaming session, the client does not play catch-up: entries processed earlier in the session are not applied to that store.

commitToVoldemort

public void commitToVoldemort()
Flush the network buffer and write all entries to the server, then wait for an ack from the server. This is a blocking call. It is invoked each time a commit batch size of entries has been processed, and also when the session is closed.
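
The batching behaviour described above (commit after every batch of entries, and once more on close) can be modelled with a minimal sketch. BatchCommitSketch is a hypothetical class written for this example; its commitSize field plays the role of CHECKPOINT_COMMIT_SIZE, and the in-memory buffer stands in for the network buffer. It does not reproduce the real client's I/O or acknowledgement handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of batch commits; not the Voldemort implementation.
class BatchCommitSketch {
    private final int commitSize;                  // stands in for CHECKPOINT_COMMIT_SIZE
    private final List<String> buffer = new ArrayList<>();
    private int entriesProcessed = 0;
    private int commits = 0;

    BatchCommitSketch(int commitSize) {
        this.commitSize = commitSize;
    }

    // Buffers one entry and commits when a full batch has been processed.
    void put(String entry) {
        buffer.add(entry);
        entriesProcessed++;
        if (entriesProcessed % commitSize == 0) {
            commit();
        }
    }

    // Models the blocking flush: the buffer is emptied and the commit counted.
    void commit() {
        buffer.clear();
        commits++;
    }

    // Closing the session commits whatever is still buffered.
    void close() {
        commit();
    }

    int commits() { return commits; }

    int buffered() { return buffer.size(); }

    public static void main(String[] args) {
        BatchCommitSketch session = new BatchCommitSketch(10);
        for (int i = 0; i < 25; i++) {
            session.put("entry-" + i);
        }
        System.out.println(session.commits()); // prints 2
        session.close();
        System.out.println(session.commits()); // prints 3
    }
}
```

With a batch size of 10 and 25 entries, two commits happen during streaming and the remaining five entries are committed by close.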


unmarkBad

public void unmarkBad()
Reset streaming session by unmarking it as bad


blacklistNode

public void blacklistNode(int nodeId)
Mark a node as blacklisted

Parameters:
nodeId - Integer node id of the node to be blacklisted

closeStreamingSessions

public void closeStreamingSessions(java.util.concurrent.Callable resetCheckpointCallback)
Parameters:
resetCheckpointCallback - the callback that lets the user clean up the checkpoint at the end of the streaming session so that a new session can, if necessary, start from position 0.

closeStreamingSessions

public void closeStreamingSessions()
Close the streaming session: flush all network buffers and call the commit callback.



Jay Kreps, Roshan Sumbaly, Alex Feinberg, Bhupesh Bansal, Lei Gao, Chinmay Soman, Vinoth Chandar, Zhongjie Wu