Sunday 16 February 2014

Parallel Processing In Sython

Pythons are water creatures (or is that Anacondas?) Oh well - here is a random picture of a river.
CPython is not great at multi-threading - but Jython on the other hand is BRILLIANT at multi-threading.

This all goes back to Sonic Field, which before porting to Jython used a command called Do. This took a closure, executed it and returned a future. A Do task would execute in parallel with the rest of the program until such time as its result was required. By making retrieving the result from a future part of the dereferencing of variables, the Do semantics were dead easy to use:
  1. Define a task just like any other piece of code, just wrap it in a closure
  2. Pass the closure to the executor
  3. Return a future
  4. Automatically wait for the future to return its result when we need it, not when we created it
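The four steps above can be sketched in a few lines. This is only a rough illustration using CPython 3's standard concurrent.futures module rather than Sonic Field's own Do command; the names do, slow_square and pool are made up for the example, and the final wait is written out by hand where SFPL did it automatically.

```python
from concurrent.futures import ThreadPoolExecutor
import time

# Stand-in for the executor (an assumption, not Sonic Field's own pool)
pool = ThreadPoolExecutor(max_workers=4)

def do(closure):
    # Steps 2 and 3: pass the closure to the executor and return a future
    return pool.submit(closure)

def slow_square(x):
    # Step 1: ordinary code, just wrapped in a closure
    def task():
        time.sleep(0.1)
        return x * x
    return do(task)

# All four tasks are submitted before any result is asked for
futures = [slow_square(i) for i in range(4)]

# Step 4: wait for each result only when it is needed
squares = [f.result() for f in futures]
pool.shutdown()
print(squares)  # [0, 1, 4, 9]
```

The caller never sees a thread; it only sees values that may not be ready yet.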
Well, pretty much the same thing can be done in Jython using the standard Executor/Future features of the JDK. Here are the guts of the required Python code (which borrows heavily from the Jython docs - thanks).

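import sys
import threading
import time
# InterruptedException and Thread are used by shutdown_and_await_termination
from java.lang import InterruptedException, Thread
from java.util.concurrent import Executors, TimeUnit
from java.util.concurrent import Callable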

SF_MAX_CONCURRENT = 16
SF_POOL = Executors.newFixedThreadPool(SF_MAX_CONCURRENT)

class sf_callable(Callable):
    def __init__(self,toDo):
        self.toDo=toDo
        
    def call(self):
        return self.toDo()

def sf_do(toDo):
    task=sf_callable(toDo)
    return SF_POOL.submit(task)

from java.util.concurrent import TimeUnit

def shutdown_and_await_termination(pool, timeout):
    pool.shutdown()
    try:
        if not pool.awaitTermination(timeout, TimeUnit.SECONDS):
            pool.shutdownNow()
            if (not pool.awaitTermination(timeout, TimeUnit.SECONDS)):
                print >> sys.stderr, "Pool did not terminate"
    except InterruptedException, ex:
        # (Re-)Cancel if current thread also interrupted
        pool.shutdownNow()
        # Preserve interrupt status
        Thread.currentThread().interrupt()
        
def shutdownConcurrnt():
    shutdown_and_await_termination(SF_POOL, 5)

The key is that all functions in Jython are actually closures which can be executed. To take advantage of this we wrap the closure in a Callable object. If we just hand the closure to the executor directly, it is interpreted as a Runnable and the .get of the Future returns null. Wrapping it in a Callable solves this.

class sf_callable(Callable):
    def __init__(self,toDo):
        self.toDo=toDo
        
    def call(self):
        return self.toDo()

Now we make a simple function which does the wrapping and creates the future:

def sf_do(toDo):
    task=sf_callable(toDo)
    return SF_POOL.submit(task)

sf_do will return a Future object. To get the result of the task being executed we need to call .get() on the Future. That can be done by hand in Python. For the Sonic Field code, all objects coming in from Python into the Java audio processor code go through a specialised casting class. This simply checks for Future objects and calls .get() on them. Thus, the conversion from Futures to their returned values is transparent. This is also the place where the transparent swapping in of swapped out audio signals happens.

    public static Object checkAutoTranslation(Object o) throws SFPL_RuntimeException
    {
        if (o instanceof SFMemoryManager) try
        {
            return ((SFMemoryManager) o).readInObject();
        }
        catch (ClassNotFoundException | IOException e)
        {
            throw new SFPL_RuntimeException(Messages.getString("Caster.6"), e); //$NON-NLS-1$
        }

        if (o instanceof FutureTask)
        {
            FutureTask doer = (FutureTask) o;
            try
            {
                Object d = doer.get();
                return checkAutoTranslation(d);
            }
            catch (Throwable t)
            {
                throw new SFPL_RuntimeException(t);
            }
        }

        if (o == null)
        {
            throw new SFPL_RuntimeException(Messages.getString("Caster.12"));             //$NON-NLS-1$
        }

        return o;
    }
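The unwrapping idea can also be sketched in plain Python. This is a minimal illustration assuming CPython 3's concurrent.futures instead of the JDK classes; the helper name resolve is hypothetical and it leaves out the memory manager part entirely.

```python
from concurrent.futures import Future, ThreadPoolExecutor

def resolve(value):
    # Hypothetical helper: keep unwrapping while the value is a future
    while isinstance(value, Future):
        value = value.result()  # blocks until the task has finished
    if value is None:
        # Mirrors the null check in the Java caster
        raise ValueError("nothing to resolve")
    return value

pool = ThreadPoolExecutor(max_workers=2)
inner = pool.submit(lambda: 21 * 2)
outer = pool.submit(lambda: inner)  # a future whose result is another future

# Plain values, futures and futures of futures all come out as plain values
values = [resolve(7), resolve(inner), resolve(outer)]
pool.shutdown()
print(values)  # [7, 42, 42]
```

Calling something like resolve at the boundary means the rest of the code never has to ask whether it holds a value or a promise of one.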

So code can pass around values which might be signals, futures which produce signals, swapped out signals or even futures which will produce swapped out signals and all the necessary dereferencing happens transparently. OK - so how do we use this technique? Here is an example Sython script:

import math
import random

execfile("patches/python/concurrent.py")

def fixSize(signal):
    mag=sf.MaxValue(signal)
    return sf.NumericVolume(signal,1.0/mag)
 
def fixSizeSat(signal):
     return fixSize(sf.Saturate(fixSize(signal)))
    
def saturatedNode(pitch,a,d,s,r,v):
    def saturateNode_():
        l=a+d+s+r
        signal1=sf.SineWave(l,pitch)
        signal2=sf.SineWave(l,2*pitch*1.003)
        signal3=sf.SineWave(l,3*pitch*1.005)
        envelope= sf.NumericShape(
                 (0,0),
                 (a,1),
                 (a+d,0.75),
                 (a+d+s,0.25),
                 (a+d+s+r,0)
        )
        sat=(20-pitch/1000)
        if sat<1:
            sat=1
            
        def doSat(sigIn):
            temp=sf.NumericVolume(sf.Multiply(sigIn,envelope),sat)
            return sf.Normalise(sf.Clean(sf.Saturate(temp)))
        
        signal=sf.Mix(
            doSat(signal1),
            sf.DB_6(doSat(signal2)),
            sf.DB_15(doSat(signal3))
        )
        signal=fixSize(signal)
        hf=sf.Clip(sf.NumericVolume(signal,3))
    
        r1=fixSizeSat(sf.RBJPeaking(hf,pitch*1.3,0.5,75))
        r2=fixSizeSat(sf.RBJPeaking(hf,pitch*2.1,0.5,75))
        r3=fixSizeSat(sf.RBJPeaking(hf,pitch*2.9,0.5,75))
    
        signal=sf.Mix(
            sf.DB_6(signal),
            sf.DB_1(r1),
            sf.DB_4(r2),
            sf.DB_6(r3)
        )
        signal=sf.Clean(sf.NumericVolume(signal,v))
        envelope= sf.NumericShape(
                 (0,1),
                 (a+d+s+r-125,1),
                 (a+d+s+r,0)
        )
        print "Returning from do task"
        return sf.Multiply(envelope,signal)
    return sf_do(saturateNode_)

all=[sf.Silence(100)]

for x in range(12, 32):
    signal=saturatedNode(math.pow(2,x*0.325),125,500,1000,2500,1)
    print "All:" + all.__str__() + " , " + "Signal: " + signal.__str__()
    all.append(signal)

all=sf.Concatenate(all)
random.seed(0.128)
all=sf.Normalise(all)
sf.WriteFile32((all,all),"temp/temp.wav")
shutdownConcurrnt()

Let's have a look at the definition of saturatedNode:

def saturatedNode(pitch,a,d,s,r,v):
    def saturateNode_():

I create a closure immediately which closes around the contents of the function. This means the closure can do exactly what the function would do with exactly the same names; however, it can do it anywhere in time or space. We have taken the function and detached its work into a separate, self-contained task.

        return sf.Multiply(envelope,signal)
    return sf_do(saturateNode_)

At the return end of the function, I return from the closure as though it were the outer function and return sf_do(<>) from the outer function. This makes writing code to be executed asynchronously near effortless (just as it was in SFPL) while still using standard Python code.

Warning! There is a problem. Python has a floppy approach to closure. It does not quite close around values and it does not quite close around references and it certainly does not let you choose which. Like most things in Python, it does not seem possible to properly declare your intention or express it in the code; you just have to hack around it. In this case we need to realise that Python closes around names.
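Closing around names has a practical consequence for task closures, which a small sketch in plain Python can show. The function names make_late, make_bound and node are invented for the illustration. A closure created in a loop looks its name up when it finally runs, not when it was defined, whereas a closure created inside a function call (the shape saturatedNode has) sees that call's own arguments.

```python
def make_late():
    tasks = []
    for pitch in (100, 200, 300):
        def task():
            return pitch  # looked up by name when task() runs
        tasks.append(task)
    return tasks

def make_bound():
    def node(pitch):  # same shape as saturatedNode(pitch, ...)
        def task():
            return pitch  # this call's own pitch
        return task
    return [node(p) for p in (100, 200, 300)]

late = [t() for t in make_late()]
bound = [t() for t in make_bound()]
print(late)   # [300, 300, 300]
print(bound)  # [100, 200, 300]
```

Passing the values in as parameters of an outer function, as the script above does, is what keeps each task's inputs separate.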

So, if we do anything which would cause the setting of a local variable, the variable will be reified:

n=4
def y():
    n=5
z=y
z()
print n

z has the value of the closure y. We execute it, which sets n inside the closure. The printed result is still 4, because the assignment reified n as a local variable of y and left the outer n untouched. If we just dereference the variable and update the thing it references, then we are accessing the external scope:

n=[]
def y():
    n.append("dog")
z=y
z()
print n[0]

This will yield dog because we are not creating a local variable; n is being accessed from the outer scope. This means we MUST NOT DO THIS in closures used for tasks. Because Python has no way of enforcing stuff at compile time (no compile time) we just need to be careful to reify all local variables and be patient if we get bugs by forgetting to do so :(
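The two behaviours can be checked side by side, together with the safer shape for a task. This is a minimal sketch assuming CPython 3 and its concurrent.futures module; rebinds, mutates, make_task and shared are hypothetical names, not part of Sonic Field.

```python
from concurrent.futures import ThreadPoolExecutor

n = 4
def rebinds():
    n = 5  # assignment makes n a local of rebinds
    return n

shared = []
def mutates():
    shared.append("dog")  # no assignment, so the outer name is used

def make_task(tag):
    def task():
        local = []  # reified local, private to each run of the task
        local.append(tag)
        return local
    return task

rebinds()
mutates()

pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(make_task(t)) for t in ("a", "b")]
results = [f.result() for f in futures]
pool.shutdown()

print(n)          # 4
print(shared[0])  # dog
print(results)    # [['a'], ['b']]
```

Each task builds and returns its own list, so nothing it touches is visible to any other task until the future hands the result back.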
