Group Commit

Starting with 1.5.1, BlazeGraph supports task-oriented concurrent writers. This support builds on the pre-existing task-based concurrency control in BlazeGraph. Those mechanisms were previously used only in the scale-out architecture, but they are now incorporated into the REST API and can also be used by embedded applications that are written to be aware of them.

This is a beta feature in 1.5.1 — make backups!

Benefits

There are two primary benefits from group commit.

First, you can have multiple tenants in the same database instance and the updates for one tenant will no longer block the updates for the other tenants. Thus, one tenant can be safely running an extensive update and the other tenants can still enjoy low latency updates.

Second, group commit automatically combines a sequence of updates on one (or more) tenant(s) into a single commit point on the disk. This provides higher potential throughput. It also means that it is no longer as important for applications to batch their updates since group commit will automatically perform some batching.
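
The batching effect can be illustrated with a toy model. The sketch below is not Blazegraph code: the class GroupCommitSketch and its methods are hypothetical names invented for illustration, and it only shows the idea that several write sets from different tenants can be melded into one commit point.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical names, not Blazegraph APIs).
final class GroupCommitSketch {
    // Write sets of tasks that finished normally and await the next commit.
    private final List<String> pending = new ArrayList<>();
    // One entry per commit point; each entry holds the write sets it melded.
    private final List<List<String>> commitPoints = new ArrayList<>();

    // A finished task hands over its write set; nothing is committed yet.
    synchronized void submit(String writeSet) {
        pending.add(writeSet);
    }

    // Meld every pending write set into a single commit point.
    // Returns the number of write sets that were combined.
    synchronized int commitGroup() {
        if (pending.isEmpty()) {
            return 0; // nothing to commit, no new commit point
        }
        commitPoints.add(new ArrayList<>(pending));
        int melded = pending.size();
        pending.clear();
        return melded;
    }

    synchronized int commitPointCount() {
        return commitPoints.size();
    }

    public static void main(String[] args) {
        GroupCommitSketch db = new GroupCommitSketch();
        db.submit("tenantA: insert 3 statements");
        db.submit("tenantB: delete 1 statement");
        db.submit("tenantA: insert 5 statements");
        int melded = db.commitGroup();
        // prints "3 write sets, 1 commit point(s)"
        System.out.println(melded + " write sets, " + db.commitPointCount() + " commit point(s)");
    }
}
```

Three updates reach the disk through one commit point here, which is why application-level batching matters less once group commit is enabled.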

Enabling Group Commit

Early users are encouraged to enable this using the following symbolic property. While the Journal has always supported group commit at the AbstractTask layer, we have added support for hierarchical locking, and modified the REST API to use group commit when this feature is enabled.

# Note: Default is false.
com.bigdata.journal.Journal.groupCommit=true
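
For an embedded application, the same property could presumably be set programmatically on the Properties object used to open the Journal. A minimal sketch using only java.util.Properties follows; the class GroupCommitProperties and its helper methods are hypothetical, and only the property key is taken from the line above.

```java
import java.util.Properties;

// Hypothetical helper; only the property key comes from the documentation.
final class GroupCommitProperties {
    static final String GROUP_COMMIT = "com.bigdata.journal.Journal.groupCommit";

    // Returns a new Properties object that uses 'base' as its defaults
    // and turns group commit on. 'base' itself is not modified.
    static Properties withGroupCommit(Properties base) {
        Properties p = new Properties(base);
        p.setProperty(GROUP_COMMIT, "true");
        return p;
    }

    // Reads the flag, falling back to the documented default (false).
    static boolean isGroupCommit(Properties p) {
        return Boolean.parseBoolean(p.getProperty(GROUP_COMMIT, "false"));
    }

    public static void main(String[] args) {
        Properties rws = withGroupCommit(new Properties());
        System.out.println(GROUP_COMMIT + "=" + isGroupCommit(rws));
    }
}
```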

Note: When enabling group commit for HA (or Standalone), you must also follow the group commit code guidelines when writing a StoredQuery.

HA

The HAJournal.config file includes logic to conditionally enable group commit. Just specify -DgroupCommit=true on the JVM command line.

     new NV(com.bigdata.journal.Journal.Options.GROUP_COMMIT,System.getProperty("groupCommit","false")),

REST API

If you are using the REST API, then that is all you need to do. Group commit will automatically be enabled. This can even be done with an existing Journal since there are no differences in the manner in which the data is stored on the disk.

Embedded Applications and Group Commit

If you are using the internal APIs (Sail, AbstractTripleStore, stored queries, etc.) then you need to understand what is happening when group commit is enabled and make a slight change to your code.

  • When you set this property to true, you are asserting that your application will submit all tasks for evaluation to the IConcurrencyManager associated with the Journal, and you are agreeing to let the database decide when it will perform a commit.
  • When you set this property to false (the default), you are asserting that your application will control when the database performs a commit. This is how embedded applications have historically been written.
  • Any mutation operations must use the following incantation. This incantation will submit a task that obtains the necessary locks and the task will then run. If the task exits normally (versus by throwing an exception) then it will join the next commit group. The Future.get() call will return either when the task fails or when its write set has been melded into a commit point.
AbstractApiTask.submitApiTask(IIndexManager indexManager, IApiTask task).get();
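
The contract described in the last bullet can be sketched with plain JDK classes. This is a single-threaded toy model under stated assumptions, not the Blazegraph implementation: TaskOutcomeSketch, submit() and commitGroup() are hypothetical names. It shows that a task which throws never joins a commit group, and that the future of a successful task completes only once its write set has been melded into a commit point.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

// Toy model only (hypothetical names, not Blazegraph APIs, not thread-safe).
final class TaskOutcomeSketch {
    // Futures of tasks that finished normally, mapped to their write sets.
    private final Map<CompletableFuture<String>, String> awaitingCommit = new LinkedHashMap<>();
    // Write sets that have reached a commit point.
    final List<String> committed = new ArrayList<>();

    // Runs the task body immediately. The returned future is NOT completed
    // on success; it completes later, when the commit group is committed.
    CompletableFuture<String> submit(Callable<String> task) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            awaitingCommit.put(future, task.call()); // joins the next commit group
        } catch (Exception e) {
            future.completeExceptionally(e); // failed: write set is discarded
        }
        return future;
    }

    // Melds all waiting write sets into one commit and releases their futures.
    void commitGroup() {
        for (Map.Entry<CompletableFuture<String>, String> e : awaitingCommit.entrySet()) {
            committed.add(e.getValue());
            e.getKey().complete(e.getValue());
        }
        awaitingCommit.clear();
    }
}
```

In this model a caller blocked in get() on a successful task would be released by commitGroup(), while get() on a failed task throws right away.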

There are a few “gotchas” with the group commit support. This is because commits are decided by IApiTask completion and tasks are scheduled by the concurrency manager, lock manager, and write executor service.

  • Mutation tasks that do not complete normally MUST throw an exception!
  • Applications MUST NOT call Journal.commit(). Instead, they submit an IApiTask using AbstractApiTask.submitApiTask(). The database will meld the write set of the task into a group commit sometime after the task completes successfully.
  • Servlets exposing mutation methods MUST NOT flush the response inside of their AbstractRestApiTask. ServletOutputStream.flush() commits the HTTP response to the client. As soon as that happens, the client is unblocked and may issue new operations under the assumption that the data has been committed. However, the ACID commit point for the task occurs *after* the task terminates normally. Thus the servlet must flush the response only after the task is done executing and NOT within the task body. The BigdataServlet.submitApiTask() method handles this for you, so your code looks like this:
// Example of task execution from within a BigdataServlet
try {
    submitApiTask(new MyTask(req, resp, namespace, timestamp,...)).get();
} catch (Throwable t) {
    launderThrowable(t, resp, ...);
}
  • BigdataSailConnection.commit() no longer causes the database to go through a commit point. You MUST still call conn.commit(): it flushes the assertion buffers (for asserted and retracted statements) to the indices, which is necessary for your writes to become visible. When your task ends, the indices go through a checkpoint, but this does not by itself trigger a commit. Thus, in order to use group commit, you must obtain your connection from within an IApiTask, invoke conn.commit() if things are successful, and otherwise throw an exception. The following template shows what this looks like.
// Example of a concurrent writer task using group commit APIs.
public class MyWriteTask extends AbstractApiTask {
    public Void call() throws Exception {
        BigdataSailRepositoryConnection conn = null;
        boolean success = false;
        try {
            conn = getConnection();
            // WRITE ON THE CONNECTION
            conn.commit(); // Commit the mutation.
            success = true;
            return (Void) null;
        } finally {
            if (conn != null) {
                if (!success)
                    conn.rollback();
                conn.close();
            }
        }
    }
}

Creating a Namespace

Historically applications would create a new namespace using:

Properties rws = ...
Journal journal = new Journal( rws );
rws.setProperty( BigdataSail.Options.NAMESPACE, "kb" );
LocalTripleStore triples = BigdataSail.createLTS( journal, rws );

With group commit, the proper incantation is now:

// create/re-open the journal.
final Properties rws = ...
final Journal journal = new Journal( rws );
// create the namespace.
final String namespace = "kb"; // the name of the namespace.
final Properties kbprops = new Properties(); // Note: will inherit property defaults from the Journal.
kbprops.setProperty( BigdataSail.Options.NAMESPACE, namespace ); // set the name of the namespace.
AbstractApiTask.submitApiTask(journal, new CreateKBTask(namespace, kbprops)).get(); // submit and await task to create the namespace.

The incantation for finding the created triple/quad store is:

final long timestamp = ITx.UNISOLATED; // for the unisolated view or a txId for a read-only or read/write tx.
final AbstractTripleStore tripleStore = (AbstractTripleStore) indexManager.getResourceLocator().locate(namespace, timestamp);

You can then wrap this as a BigdataSail:

BigdataSail sail = new BigdataSail(tripleStore);

Destroying a Namespace

The new incantation to destroy a namespace is:

AbstractApiTask.submitApiTask(indexManager, new DestroyKBTask(namespace)).get();

How it works

The group commit mechanisms are based on hierarchical locking and pre-declared locks. Each task pre-declares its locks, and the lock manager orders the lock requests to avoid deadlocks. Once a task owns its locks, it is executed by the WriteExecutorService. AbstractTask is responsible for isolating the task's index views, checkpointing the modified indices after the task has finished its work, and handshaking with the WriteExecutorService around group commits.

Most tasks just need to declare the namespace on which they want to operate. This will automatically obtain a lock for all indices in that namespace. Some special kinds of tasks (those that create and destroy namespaces) must also obtain a lock on the global row store (aka the GRS). This is an internal key-value store where BlazeGraph stores the namespace declarations.
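
The idea of pre-declared, ordered locks can be sketched with plain JDK locks. This is only an illustration under stated assumptions, not the Blazegraph lock manager: PreDeclaredLocksSketch and its methods are hypothetical names, the resource names "kb" and "GRS" are used purely as example strings, and hierarchical locking is not modeled. Every task acquires its declared locks in one global (sorted) order, so two tasks can never wait on each other in a cycle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Toy model only (hypothetical names, not Blazegraph APIs).
final class PreDeclaredLocksSketch {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Sort and de-duplicate the declared resources to get a global lock order.
    static List<String> lockOrder(String... declared) {
        return new ArrayList<>(new TreeSet<>(Arrays.asList(declared)));
    }

    // Acquire all declared locks up front (in sorted order), run the task
    // body, then release the locks in reverse order.
    <T> T runWithLocks(Callable<T> body, String... declared) {
        List<ReentrantLock> held = new ArrayList<>();
        try {
            for (String name : lockOrder(declared)) {
                ReentrantLock lock = locks.computeIfAbsent(name, k -> new ReentrantLock());
                lock.lock();
                held.add(lock);
            }
            return body.call();
        } catch (Exception e) {
            throw new RuntimeException(e); // a failed task surfaces as an exception
        } finally {
            for (int i = held.size() - 1; i >= 0; i--) {
                held.get(i).unlock();
            }
        }
    }

    public static void main(String[] args) {
        PreDeclaredLocksSketch manager = new PreDeclaredLocksSketch();
        // An ordinary writer declares only its namespace.
        manager.runWithLocks(() -> "wrote to kb", "kb");
        // A create/destroy style task also declares the global row store.
        System.out.println(lockOrder("kb", "GRS")); // prints "[GRS, kb]"
    }
}
```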