Software Transactional Memory

>>> from md.stm import *

Optimistic Concurrency

This simple example uses transactional memory to implement a serial datatype: a counter that hands out consecutive values.

>>> import time, threading

>>> class serial(cursor):
...     def __init__(self, value=0):
...         self.value = value
...
...     def inc(self):
...         self.value += 1
...         return self.value

Concurrent operations on a shared serial instance are simulated by forcing a thread to become inactive after acquiring the next value of a sequence, but before committing its transaction.

The worker1 thread will acquire the value 1 and become inactive. In the meantime, worker2 acquires the values 1 and 2 and commits its transaction. When worker1 reactivates, it also gets the value 2, but when it tries to commit, a CannotCommit exception is raised because worker2 already committed new state. This forces the transaction in worker1 to be re-run. The second time, it succeeds and acquires the values 3 and 4.

>>> def next2(ser, sleep=None):
...     first = ser.inc()
...     if sleep is not None:
...         time.sleep(sleep)
...     return (first, ser.inc())

>>> def show(label, work):
...     print label, work

>>> def worker1():
...     show('worker1', transactionally(next2, count, sleep=0.1))

>>> def worker2():
...     show('worker2', transactionally(next2, count))

>>> with transaction():
...     count = serial()

>>> t1 = threading.Thread(target=worker1)
>>> t2 = threading.Thread(target=worker2)
>>> t1.start(); t2.start(); t1.join()
worker2 (1, 2)
worker1 (3, 4)
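
The retry behaviour shown above can be sketched without md.stm. This is a minimal illustration of optimistic commit-and-retry, not the library's implementation. VersionedCell, Conflict, and run_optimistically are hypothetical names, and a version counter stands in for whatever conflict detection raises CannotCommit. The sketch targets Python 3 and uses events instead of time.sleep() so that the interleaving is deterministic.

```python
import threading


class Conflict(Exception):
    """Raised when a commit finds that the shared state has moved on."""


class VersionedCell(object):
    """A value plus a version number that is bumped on every commit."""

    def __init__(self, value=0):
        self._lock = threading.Lock()
        self.version = 0
        self.value = value

    def snapshot(self):
        with self._lock:
            return self.version, self.value

    def commit(self, seen_version, new_value):
        # The lock guards only this short compare-and-set step.
        with self._lock:
            if self.version != seen_version:
                raise Conflict()
            self.version += 1
            self.value = new_value


def run_optimistically(cell, work):
    """Re-run work(value) until its result commits without a conflict."""
    attempts = 0
    while True:
        attempts += 1
        seen_version, value = cell.snapshot()
        result, new_value = work(value)
        try:
            cell.commit(seen_version, new_value)
        except Conflict:
            continue  # stale snapshot: start over from fresh state
        return result, attempts


def next2(value):
    """Take the next two values; return (result, new counter value)."""
    return (value + 1, value + 2), value + 2


cell = VersionedCell()
results = {}
snapshot_taken = threading.Event()
other_committed = threading.Event()


def stalled_next2(value):
    snapshot_taken.set()      # worker1 now holds a soon-to-be-stale snapshot
    other_committed.wait()    # stall until worker2 has committed
    return next2(value)


def worker1():
    results['worker1'] = run_optimistically(cell, stalled_next2)


def worker2():
    snapshot_taken.wait()
    results['worker2'] = run_optimistically(cell, next2)
    other_committed.set()


t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t1.start(); t2.start(); t1.join(); t2.join()
```

In this sketch worker2 commits (1, 2) on its first attempt. worker1 computes the same pair from its stale snapshot, fails to commit, re-runs, and commits (3, 4) on its second attempt.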

Custom Cursor

Extending stm.cursor is easiest, but it is possible to make an independent stm.Cursor type from scratch. This example defines a transactional MutableMapping implementation called tmap.

>>> from collections import MutableMapping

>>> class tmap(Cursor, MutableMapping):
...
...    def __new__(cls, *args, **kwargs):
...        return allocate(object.__new__(cls), {})
...
...    def __init__(self, items=(), **kwargs):
...        self.update(items, **kwargs)
...
...    def __repr__(self):
...        return '<%s %r>' % (type(self).__name__, sorted(self.iteritems()))
...
...    def __iter__(self):
...        return iter(readable(self))
...
...    def __len__(self):
...        return len(readable(self))
...
...    def __contains__(self, key):
...        return key in readable(self)
...
...    def __getitem__(self, key):
...        return readable(self)[key]
...
...    def __setitem__(self, key, value):
...        writable(self)[key] = value
...
...    def __delitem__(self, key):
...        del writable(self)[key]

>>> with transaction():
...     t1 = save(tmap(a=1, b=2))
...     with transaction():
...         t1.update(a=20, c=3)
...         print rollback(t1), '(rollback)'
...         t1['d'] = 4
<tmap [('a', 1), ('b', 2)]> (rollback)

>>> t1
<tmap [('a', 1), ('b', 2), ('d', 4)]>
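
The readable/writable split used by tmap can be pictured as copy-on-write against a chain of journals. The following toy model is an assumption made for illustration and does not reproduce md.stm internals. Journal and its methods are hypothetical names, and the sketch targets Python 3. It replays the nested transaction above with plain dictionaries.

```python
import copy


class Journal(object):
    """Toy journal: maps a key to this transaction's private state copy."""

    def __init__(self, parent=None):
        self.parent = parent
        self.changed = {}

    def readable(self, key):
        # Look in this journal first, then in each enclosing one.
        journal = self
        while journal is not None:
            if key in journal.changed:
                return journal.changed[key]
            journal = journal.parent
        raise KeyError(key)

    def writable(self, key):
        # Copy on first write so enclosing journals stay untouched.
        if key not in self.changed:
            self.changed[key] = copy.deepcopy(self.readable(key))
        return self.changed[key]

    def rollback(self, key):
        # Drop the private copy; reads fall through to the parent again.
        self.changed.pop(key, None)

    def commit(self):
        # Publish private copies to the enclosing journal.
        self.parent.changed.update(self.changed)


outer = Journal()
outer.changed['t1'] = {'a': 1, 'b': 2}       # stands in for save(tmap(a=1, b=2))

inner = Journal(parent=outer)
inner.writable('t1').update(a=20, c=3)
inner.rollback('t1')
after_rollback = dict(inner.readable('t1'))  # back to the outer state
inner.writable('t1')['d'] = 4
inner.commit()
final_state = outer.readable('t1')
```

After the rollback the inner journal sees the outer state again, and only the later write to 'd' reaches the outer journal on commit.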

shelf – Persistent Transactional Memory

This is a more advanced example. cursor is extended to have a persistent identity and memory is extended to fetch and store persistent cursors from a shelf.

Basic Use

Here is a basic example. Transactional memory is initialized with a shelf instead of the default (memory).

>>> import gc
>>> from docs.examples.stm_shelf import *

>>> def collect():
...     gc.collect()

>>> initialize(shelf('/tmp/stm.db'))

A pcursor works like a cursor and optionally takes a keyword argument, __id__, that will become the new cursor’s persistent id. If __id__ is not given, a unique persistent id will be generated for the new object.

The fetch() operation retrieves the cursor associated with a persistent id. The pid() operator returns the persistent id associated with a cursor.

>>> with transaction():
...     foo = pcursor(__id__='foo')
...     foo.a = pdict(value='A')
>>> pid(foo)
'foo'

>>> del foo; collect()
>>> foo = fetch('foo'); foo.a
pdict([('value', 'A')])

Unsaved or uncommitted changes are not written to the backing store.

>>> with transaction(autosave=False):
...     foo.a['value'] = 'changed'

>>> del foo; collect()
>>> fetch('foo').a
pdict([('value', 'A')])
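
The standard library's shelve module, which backs the shelf in this example, behaves in a comparable way on its own: a value read from a shelf is an unpickled copy, and changes to it stay in memory until the value is stored again. The sketch below shows only that stdlib behaviour (with the default writeback=False). It does not use md.stm, the temporary path is made up for the demonstration, and it targets Python 3.

```python
import os
import shelve
import shutil
import tempfile

workdir = tempfile.mkdtemp()
path = os.path.join(workdir, 'sketch.db')

store = shelve.open(path, protocol=-1)
store['foo'] = {'value': 'A'}
store.sync()

working = store['foo']            # a private, unpickled copy
working['value'] = 'changed'      # modifies the copy only
unsaved = store['foo']['value']   # the shelf still holds the old state

store['foo'] = working            # explicit write, comparable to a commit
store.sync()
store.close()

store = shelve.open(path)
saved = store['foo']['value']     # the new state survives a reopen
store.close()
shutil.rmtree(workdir)
```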

Normal cursors and persistent cursors can be mixed. When normal cursors or plain data are used, they are persisted as “part of” the persistent cursor rather than being given a unique persistent identity.

>>> with transaction():
...     bar = pcursor(__id__='bar')
...     bar.b = tdict(value=tlist(['B']))

>>> del bar; collect()
>>> bar = fetch('bar'); bar.b
tdict([('value', tlist(['B']))])

>>> with transaction():
...     bar.b['value'].append('B2')
>>> bar.b
tdict([('value', tlist(['B', 'B2']))])

>>> with transaction():
...     bar.b['value'].append('B3')
...     print rollback(bar.b['value']), '(rolled back)'
...     bar.b['mumble'] = 'quux'
tlist(['B', 'B2']) (rolled back)
>>> sorted(bar.b.items())
[('mumble', 'quux'), ('value', tlist(['B', 'B2']))]
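
The difference between data stored as part of a persistent cursor and data with its own persistent identity can be illustrated with pickle alone. The sketch below uses pickle's persistent_id and persistent_load hooks, which is a different mechanism from the `__reduce__` hook in the implementation further down. Shared, RefPickler, RefUnpickler, and the registry dictionary are hypothetical names, and the code targets Python 3.

```python
import io
import pickle


class Shared(object):
    """Stands in for an object that has its own persistent id."""

    def __init__(self, pid, payload):
        self.pid = pid
        self.payload = payload


class RefPickler(pickle.Pickler):
    def persistent_id(self, obj):
        # Objects with identity are written as a bare id.  Returning
        # None tells pickle to embed the object in the usual way.
        if isinstance(obj, Shared):
            return obj.pid
        return None


class RefUnpickler(pickle.Unpickler):
    def __init__(self, stream, registry):
        super().__init__(stream)
        self.registry = registry

    def persistent_load(self, pid):
        # Resolve a stored id back to the live object.
        return self.registry[pid]


def dumps(obj):
    stream = io.BytesIO()
    RefPickler(stream, protocol=2).dump(obj)
    return stream.getvalue()


def loads(data, registry):
    return RefUnpickler(io.BytesIO(data), registry).load()


other = Shared('bar', ['payload-B'])
state = {'plain': ['x', 'y'], 'link': other}

data = dumps(state)
restored = loads(data, {'bar': other})
```

The plain list is embedded in the pickle and comes back as a copy, while the Shared object is stored only as the id 'bar' and resolves to the same live object on load.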

Implementation

This rudimentary implementation is mainly concerned with tracking persistent ids.

An interesting feature is the way pcursor dereferencing is handled. If one pcursor refers to another, only a reference is stored in the shelf. When the outer pcursor is loaded, the reference it contains is dereferenced on demand rather than at load time. Cursors are tracked through weak references, so they can be collected automatically once they are no longer in use. If pcursor A refers to pcursor B, B can be collected as soon as code no longer references it directly, even if A remains strongly referenced. Because A holds a lazy reference to B rather than a strong one, B can be reloaded on demand.
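
A minimal sketch of this pattern, assuming a plain dictionary in place of the shelf: Record, LazyRef, and Store are hypothetical names rather than md.stm classes, and the code targets Python 3. The registry of live objects is a weakref.WeakValueDictionary, so an entry disappears once nothing else refers to the object.

```python
import gc
import weakref


class Record(object):
    """Stands in for a loaded persistent cursor."""

    def __init__(self, pid, fields):
        self.pid = pid
        self.fields = fields


class LazyRef(object):
    """Holds only an id and resolves it through the store on each use."""

    def __init__(self, store, pid):
        self.store = store
        self.pid = pid

    def get(self):
        return self.store.fetch(self.pid)


class Store(object):
    def __init__(self, backing):
        self.backing = backing                      # id -> state dict
        self.live = weakref.WeakValueDictionary()   # id -> live Record
        self.loads = 0

    def fetch(self, pid):
        record = self.live.get(pid)
        if record is None:
            # Not in memory: load from the backing store and register it.
            self.loads += 1
            record = Record(pid, dict(self.backing[pid]))
            self.live[pid] = record
        return record


store = Store({'A': {'name': 'a'}, 'B': {'name': 'b'}})

a = store.fetch('A')
a.child = LazyRef(store, 'B')      # A refers to B by id only

b = a.child.get()                  # first use loads B
same_object = a.child.get() is b   # B is reused while strongly referenced
loads_while_held = store.loads

del b
gc.collect()
dropped = 'B' not in store.live    # B was collected although A is alive

a.child.get()                      # next use loads B again
loads_after_reload = store.loads
```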

Transaction journals use strong references, so anything fetched from the backing store will not be released during the scope of the transaction. This makes transactions a sort of temporary object cache; lazy references will only be loaded the first time they are used in a transaction.
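
The caching effect can be sketched with a list that pins whatever is fetched until the transaction ends. This is an illustration under assumptions, not md.stm's journal: Thing, fetch, transaction, and the loads list are hypothetical, and the code targets Python 3.

```python
import contextlib
import gc
import weakref


class Thing(object):
    def __init__(self, pid):
        self.pid = pid


live = weakref.WeakValueDictionary()   # id -> live object, weakly held
loads = []                             # records every load from "storage"


def fetch(pid, journal=None):
    thing = live.get(pid)
    if thing is None:
        loads.append(pid)
        thing = live[pid] = Thing(pid)
    if journal is not None:
        journal.append(thing)          # strong reference for the transaction
    return thing


@contextlib.contextmanager
def transaction():
    journal = []
    try:
        yield journal
    finally:
        del journal[:]                 # release everything that was pinned


with transaction() as journal:
    fetch('x', journal)                # result is not bound to any name
    gc.collect()
    fetch('x', journal)                # still alive: no second load
    loads_inside = list(loads)

gc.collect()
fetch('x')                             # journal is gone: loaded again
loads_after = list(loads)
```

Inside the transaction the object is loaded once even though no variable holds it. After the transaction ends, the next fetch has to load it again.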

from __future__ import absolute_import
import os, shelve, uuid, weakref, copy
from collections import Iterator
from md import stm
from md.stm.transaction import current_memory
from md.stm.journal import alloc, copy_state, is_deleted

__all__ = (
    'pid', 'pcursor', 'pdict', 'plist', 'pset',
    'fetch', 'shelf', 'current_memory'
)


### Persistent Cursor

def pid(cursor):
    return cursor.__pid__

class PCursor(object):
    __slots__ = ()

    def __new__(cls, *args, **kwargs):
	return allocated(cls, cls.StateType(), kwargs.get('__id__'))

    def __copy__(self):
	return allocated(type(self), copy_state(stm.readable(self)))

    def __reduce__(self):
	return (delayed, (type(self), pid(self)))

    __id__ = property(pid)

def persist(name, base):
    """Create a persistent type from any transactional type."""

    def __init__(self, *args, **kwargs):
	kwargs.pop('__id__', None)
	base.__init__(self, *args, **kwargs)

    return type(name, (PCursor, base), dict(
	    __slots__ = ('__pid__', ),
	    __module__ = __name__,
	    __init__ = __init__,
    ))

pcursor = persist('pcursor', stm.cursor)
pdict = persist('pdict', stm.tdict)
plist = persist('plist', stm.tlist)
pset = persist('pset', stm.tset)

def allocated(cls, state, id=None):
    return stm.allocate(set_pid(object.__new__(cls), id), state)

def set_pid(cursor, id=None):
    object.__setattr__(cursor, '__pid__', id or uuid.uuid4().hex)
    return cursor


### Persistent Memory

def delayed(cls, id):
    return current_memory().delayed(cls, id)

def fetch(id):
    return current_memory().fetch(id)

class shelf(stm.memory):
    def __init__(self, path, check_read=True, check_write=True):
	super(shelf, self).__init__(path, check_read, check_write)
	self.pcursors = weakref.WeakValueDictionary()
	self.store = None
	self.open()

    def open(self):
	if self.store is None:
	    self.store = shelve.open(self.name, protocol=-1)

    def close(self):
	if self.store is not None:
	    self.store.close()
	    self.store = None

    def destroy(self):
	self.close()
	os.unlink(self.name)

    def clear(self):
	if self.store:
	    self.store.clear(); self.store.sync()
	    self.mem.clear()
	    self.pcursors.clear()

    def read_saved(self, cursor):
	try:
	    return super(shelf, self).read_saved(cursor)
	except KeyError:
	    return load_state(self, cursor)

    def write_changes(self, nested, changed):
	if isinstance(changed, Iterator):
	    changed = list(changed)
	super(shelf, self).write_changes(nested, changed)
	self.store_changes(nested, changed)

    def store_changes(self, nested, changed):
	## Store (cls, state) tuples rather than directly pickling a
	## cursor.  Pickling it would only make a lazy reference.
	for (cursor, state) in changed:
	    if isinstance(cursor, PCursor):
		key = verify_pid(self, cursor)
		if is_deleted(state):
		    del self.store[key]
		else:
		    self.store[key] = (type(cursor), state)
	self.store.sync()

    def delayed(self, cls, id):
	"""Return a persistent cursor, but do not load its state."""
	with self.write_lock:
	    try:
		return self.pcursors[id]
	    except KeyError:
		return identify(self, cls, id)

    def fetch(self, id):
	"""Return a persistent cursor; make sure its state is
	loaded."""
	with self.write_lock:
	    try:
		## At least a lazy reference exists.  State is
		## unknown.
		cursor = self.pcursors[id]
		state = None
	    except KeyError:
		## No reference of any kind exists.  Make one.
		(cls, state) = self.store[id]
		cursor = identify(self, cls, id)

	    try:
		## State is already loaded for this cursor.  Use
		## super() here to maybe avoid loading state twice.
		super(shelf, self).read_saved(cursor)
	    except KeyError:
		## No state is loaded.  Load it (possibly using state
		## already retrieved from the store).
		load_state(self, cursor, state)

	return cursor

def identify(memory, cls, id):
    """Associate a new cursor with a persistent id."""
    memory.pcursors[id] = cursor = object.__new__(cls)
    return set_pid(cursor, id)

def verify_pid(memory, cursor):
    """Verify that the cursor is identical to the one associated with
    its persistent id in memory.  This is a sanity check."""
    key = pid(cursor)
    memory.pcursors.setdefault(key, cursor)
    if memory.pcursors[key] is not cursor:
	raise ValueError(
	    'This persistent id is already associated with another cursor',
	    key, cursor
	)
    return key

def load_state(memory, cursor, state=None):
    """Allocate the state for the cursor, loading it from the backing
    store if necessary."""
    with memory.write_lock:
	if state is None:
	    (cls, state) = memory.store[pid(cursor)]
	alloc(memory, cursor, state)
	return state
