* Merge -r 8778:8790 from branches/refactor_transactor.

This commit is contained in:
hns 2008-02-08 15:58:14 +00:00
parent a44af35c59
commit 58b7de53c4
12 changed files with 393 additions and 145 deletions

View file

@ -626,8 +626,8 @@ public class ResponseBean implements Serializable {
* @throws Exception thrown if commit fails * @throws Exception thrown if commit fails
*/ */
public void commit() throws Exception { public void commit() throws Exception {
if (Thread.currentThread() instanceof Transactor) { Transactor tx = Transactor.getInstance();
Transactor tx = (Transactor) Thread.currentThread(); if (tx != null) {
String tname = tx.getTransactionName(); String tname = tx.getTransactionName();
tx.commit(); tx.commit();
tx.begin(tname); tx.begin(tname);
@ -640,8 +640,8 @@ public class ResponseBean implements Serializable {
* @throws Exception thrown if rollback fails * @throws Exception thrown if rollback fails
*/ */
public void rollback() throws Exception { public void rollback() throws Exception {
if (Thread.currentThread() instanceof Transactor) { Transactor tx = Transactor.getInstance();
Transactor tx = (Transactor) Thread.currentThread(); if (tx != null) {
String tname = tx.getTransactionName(); String tname = tx.getTransactionName();
tx.abort(); tx.abort();
tx.begin(tname); tx.begin(tname);

View file

@ -53,7 +53,9 @@ public final class RequestEvaluator implements Runnable {
private volatile ResponseTrans res; private volatile ResponseTrans res;
// the one and only transactor thread // the one and only transactor thread
private volatile Transactor rtx; private volatile Thread thread;
private volatile Transactor transactor;
// the type of request to be serviced, // the type of request to be serviced,
// used to coordinate worker and waiter threads // used to coordinate worker and waiter threads
@ -128,13 +130,13 @@ public final class RequestEvaluator implements Runnable {
public void run() { public void run() {
// first, set a local variable to the current transactor thread so we know // first, set a local variable to the current transactor thread so we know
// when it's time to quit because another thread took over. // when it's time to quit because another thread took over.
Transactor localrtx = (Transactor) Thread.currentThread(); Thread localThread = Thread.currentThread();
// spans whole execution loop - close connections in finally clause // spans whole execution loop - close connections in finally clause
try { try {
// while this thread is serving requests // while this thread is serving requests
while (localrtx == rtx) { while (localThread == thread) {
// object reference to ressolve request path // object reference to ressolve request path
Object currentElement; Object currentElement;
@ -153,7 +155,7 @@ public final class RequestEvaluator implements Runnable {
String functionName = function instanceof String ? String functionName = function instanceof String ?
(String) function : null; (String) function : null;
while (!done && localrtx == rtx) { while (!done && localThread == thread) {
// catch errors in path resolution and script execution // catch errors in path resolution and script execution
try { try {
@ -161,7 +163,7 @@ public final class RequestEvaluator implements Runnable {
initScriptingEngine(); initScriptingEngine();
app.setCurrentRequestEvaluator(this); app.setCurrentRequestEvaluator(this);
// update scripting prototypes // update scripting prototypes
scriptingEngine.updatePrototypes(); scriptingEngine.enterContext();
// avoid going into transaction if called function doesn't exist. // avoid going into transaction if called function doesn't exist.
@ -196,7 +198,8 @@ public final class RequestEvaluator implements Runnable {
txname.append((error == null) ? req.getPath() : "error"); txname.append((error == null) ? req.getPath() : "error");
// begin transaction // begin transaction
localrtx.begin(txname.toString()); transactor = Transactor.getInstance(app.nmgr);
transactor.begin(txname.toString());
Object root = app.getDataRoot(); Object root = app.getDataRoot();
initGlobals(root, requestPath); initGlobals(root, requestPath);
@ -412,7 +415,7 @@ public final class RequestEvaluator implements Runnable {
} }
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
commitTransaction(); commitTransaction();
@ -459,13 +462,13 @@ public final class RequestEvaluator implements Runnable {
ScriptingEngine.ARGS_WRAP_XMLRPC, ScriptingEngine.ARGS_WRAP_XMLRPC,
false); false);
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
commitTransaction(); commitTransaction();
} catch (Exception x) { } catch (Exception x) {
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
abortTransaction(); abortTransaction();
@ -473,7 +476,7 @@ public final class RequestEvaluator implements Runnable {
// If the transactor thread has been killed by the invoker thread we don't have to // If the transactor thread has been killed by the invoker thread we don't have to
// bother for the error message, just quit. // bother for the error message, just quit.
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
@ -495,13 +498,13 @@ public final class RequestEvaluator implements Runnable {
ScriptingEngine.ARGS_WRAP_DEFAULT, ScriptingEngine.ARGS_WRAP_DEFAULT,
true); true);
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
commitTransaction(); commitTransaction();
} catch (Exception x) { } catch (Exception x) {
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
abortTransaction(); abortTransaction();
@ -509,7 +512,7 @@ public final class RequestEvaluator implements Runnable {
// If the transactor thread has been killed by the invoker thread we don't have to // If the transactor thread has been killed by the invoker thread we don't have to
// bother for the error message, just quit. // bother for the error message, just quit.
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
@ -524,7 +527,7 @@ public final class RequestEvaluator implements Runnable {
// res.abort() just aborts the transaction and // res.abort() just aborts the transaction and
// leaves the response untouched // leaves the response untouched
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
abortTransaction(); abortTransaction();
@ -535,7 +538,7 @@ public final class RequestEvaluator implements Runnable {
if (++tries < 8) { if (++tries < 8) {
// try again after waiting some period // try again after waiting some period
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
abortTransaction(); abortTransaction();
@ -549,11 +552,12 @@ public final class RequestEvaluator implements Runnable {
res.reportError(interrupt); res.reportError(interrupt);
done = true; done = true;
// and release resources and thread // and release resources and thread
rtx = null; thread = null;
transactor = null;
} }
} else { } else {
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
abortTransaction(); abortTransaction();
@ -563,16 +567,15 @@ public final class RequestEvaluator implements Runnable {
done = true; done = true;
} }
} catch (Throwable x) { } catch (Throwable x) {
String txname = localrtx.getTransactionName();
// check if request is still valid, or if the requesting thread has stopped waiting already // check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
abortTransaction(); abortTransaction();
// If the transactor thread has been killed by the invoker thread we don't have to // If the transactor thread has been killed by the invoker thread we don't have to
// bother for the error message, just quit. // bother for the error message, just quit.
if (localrtx != rtx) { if (localThread != thread) {
return; return;
} }
@ -589,6 +592,8 @@ public final class RequestEvaluator implements Runnable {
done = false; done = false;
error = x; error = x;
Transactor tx = Transactor.getInstance();
String txname = tx == null ? "no-txn" : tx.getTransactionName();
app.logError(txname + ": " + error, x); app.logError(txname + ": " + error, x);
if (req.isXmlRpc()) { if (req.isXmlRpc()) {
@ -607,18 +612,18 @@ public final class RequestEvaluator implements Runnable {
} }
} finally { } finally {
app.setCurrentRequestEvaluator(null); app.setCurrentRequestEvaluator(null);
}
}
// exit execution context // exit execution context
if (scriptingEngine != null) if (scriptingEngine != null)
scriptingEngine.exitContext(); scriptingEngine.exitContext();
}
}
notifyAndWait(); notifyAndWait();
} }
} finally { } finally {
localrtx.closeConnections(); Transactor tx = Transactor.getInstance();
if (tx != null) tx.closeConnections();
} }
} }
@ -627,10 +632,12 @@ public final class RequestEvaluator implements Runnable {
* @throws Exception transaction couldn't be committed * @throws Exception transaction couldn't be committed
*/ */
synchronized void commitTransaction() throws Exception { synchronized void commitTransaction() throws Exception {
Transactor localrtx = (Transactor) Thread.currentThread(); Thread localThread = Thread.currentThread();
if (localrtx == rtx) { if (localThread == thread) {
localrtx.commit(); Transactor tx = Transactor.getInstance();
if (tx != null)
tx.commit();
} else { } else {
throw new TimeoutException(); throw new TimeoutException();
} }
@ -640,8 +647,8 @@ public final class RequestEvaluator implements Runnable {
* Called by the transactor thread when the request didn't terminate successfully. * Called by the transactor thread when the request didn't terminate successfully.
*/ */
synchronized void abortTransaction() { synchronized void abortTransaction() {
Transactor localrtx = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstance();
localrtx.abort(); if (tx != null) tx.abort();
} }
/** /**
@ -652,11 +659,11 @@ public final class RequestEvaluator implements Runnable {
throw new ApplicationStoppedException(); throw new ApplicationStoppedException();
} }
if ((rtx == null) || !rtx.isAlive()) { if ((thread == null) || !thread.isAlive()) {
// app.logEvent ("Starting Thread"); // app.logEvent ("Starting Thread");
rtx = new Transactor(this, app.threadgroup, app.nmgr); thread = new Thread(app.threadgroup, this);
rtx.setContextClassLoader(app.getClassLoader()); thread.setContextClassLoader(app.getClassLoader());
rtx.start(); thread.start();
} else { } else {
notifyAll(); notifyAll();
} }
@ -666,14 +673,17 @@ public final class RequestEvaluator implements Runnable {
* Tell waiting thread that we're done, then wait for next request * Tell waiting thread that we're done, then wait for next request
*/ */
synchronized void notifyAndWait() { synchronized void notifyAndWait() {
Transactor localrtx = (Transactor) Thread.currentThread(); Thread localThread = Thread.currentThread();
// make sure there is only one thread running per instance of this class // make sure there is only one thread running per instance of this class
// if localrtx != rtx, the current thread has been aborted and there's no need to notify // if localrtx != rtx, the current thread has been aborted and there's no need to notify
if (localrtx != rtx) { if (localThread != thread) {
// A new request came in while we were finishing the last one. // A new request came in while we were finishing the last one.
// Return to run() to get the work done. // Return to run() to get the work done.
localrtx.closeConnections(); Transactor tx = Transactor.getInstance();
if (tx != null) {
tx.closeConnections();
}
return; return;
} }
@ -685,16 +695,18 @@ public final class RequestEvaluator implements Runnable {
wait(1000 * 60 * 10); wait(1000 * 60 * 10);
} catch (InterruptedException ix) { } catch (InterruptedException ix) {
// we got interrrupted, releases resources and thread // we got interrrupted, releases resources and thread
rtx = null; thread = null;
transactor = null;
} }
// if no request arrived, release ressources and thread // if no request arrived, release ressources and thread
if ((reqtype == NONE) && (rtx == localrtx)) { if ((reqtype == NONE) && (thread == localThread)) {
// comment this in to release not just the thread, but also the scripting engine. // comment this in to release not just the thread, but also the scripting engine.
// currently we don't do this because of the risk of memory leaks (objects from // currently we don't do this because of the risk of memory leaks (objects from
// framework referencing into the scripting engine) // framework referencing into the scripting engine)
// scriptingEngine = null; // scriptingEngine = null;
rtx = null; thread = null;
transactor = null;
} }
} }
@ -704,8 +716,9 @@ public final class RequestEvaluator implements Runnable {
* thread. If currently active kill the request, otherwise just notify. * thread. If currently active kill the request, otherwise just notify.
*/ */
synchronized boolean stopTransactor() { synchronized boolean stopTransactor() {
Transactor t = rtx; Transactor t = transactor;
rtx = null; thread = null;
transactor = null;
boolean stopped = false; boolean stopped = false;
if (t != null && t.isActive()) { if (t != null && t.isActive()) {
// let the scripting engine know that the // let the scripting engine know that the
@ -971,7 +984,7 @@ public final class RequestEvaluator implements Runnable {
globals.put("path", requestPath); globals.put("path", requestPath);
// enter execution context // enter execution context
scriptingEngine.enterContext(globals); scriptingEngine.setGlobals(globals);
} }
/** /**
@ -1092,8 +1105,8 @@ public final class RequestEvaluator implements Runnable {
* *
* @return the current transactor thread * @return the current transactor thread
*/ */
public synchronized Transactor getThread() { public synchronized Thread getThread() {
return rtx; return thread;
} }
/** /**

View file

@ -16,8 +16,8 @@
package helma.framework.core; package helma.framework.core;
import helma.objectmodel.INode; import helma.objectmodel.INode;
import helma.objectmodel.db.Node;
import helma.objectmodel.db.NodeHandle; import helma.objectmodel.db.NodeHandle;
import helma.objectmodel.db.Transactor;
import helma.scripting.ScriptingEngine; import helma.scripting.ScriptingEngine;
import java.util.*; import java.util.*;
@ -206,8 +206,10 @@ public class SessionManager {
} }
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
Transactor tx = Transactor.getInstance(app.getNodeManager());
try { try {
tx.begin("sessionloader");
// load the stored data: // load the stored data:
InputStream istream = new BufferedInputStream(new FileInputStream(f)); InputStream istream = new BufferedInputStream(new FileInputStream(f));
ObjectInputStream p = new ObjectInputStream(istream); ObjectInputStream p = new ObjectInputStream(istream);
@ -230,11 +232,17 @@ public class SessionManager {
istream.close(); istream.close();
sessions = newSessions; sessions = newSessions;
app.logEvent("loaded " + newSessions.size() + " sessions from file"); app.logEvent("loaded " + newSessions.size() + " sessions from file");
tx.commit();
} catch (FileNotFoundException fnf) { } catch (FileNotFoundException fnf) {
// suppress error message if session file doesn't exist // suppress error message if session file doesn't exist
tx.abort();
} catch (Exception e) { } catch (Exception e) {
app.logError("error loading session data.", e); app.logError("error loading session data.", e);
tx.abort();
} finally {
tx.closeConnections();
} }
} }
/** /**

View file

@ -69,9 +69,8 @@ public class DbSource {
public synchronized Connection getConnection() public synchronized Connection getConnection()
throws ClassNotFoundException, SQLException { throws ClassNotFoundException, SQLException {
Connection con; Connection con;
Transactor tx = null; Transactor tx = Transactor.getInstance();
if (Thread.currentThread() instanceof Transactor) { if (tx != null) {
tx = (Transactor) Thread.currentThread();
con = tx.getConnection(this); con = tx.getConnection(this);
} else { } else {
con = getThreadLocalConnection(); con = getThreadLocalConnection();

View file

@ -266,9 +266,9 @@ public final class Node implements INode, Serializable {
return; // no need to lock transient node return; // no need to lock transient node
} }
Transactor current = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstanceOrFail();
if (!current.isActive()) { if (!tx.isActive()) {
throw new helma.framework.TimeoutException(); throw new helma.framework.TimeoutException();
} }
@ -279,14 +279,14 @@ public final class Node implements INode, Serializable {
" was invalidated by another thread."); " was invalidated by another thread.");
} }
if ((lock != null) && (lock != current) && lock.isAlive() && lock.isActive()) { if ((lock != null) && (lock != tx) && lock.isAlive() && lock.isActive()) {
// nmgr.logEvent("Concurrency conflict for " + this + ", lock held by " + lock); // nmgr.logEvent("Concurrency conflict for " + this + ", lock held by " + lock);
throw new ConcurrencyException("Tried to modify " + this + throw new ConcurrencyException("Tried to modify " + this +
" from two threads at the same time."); " from two threads at the same time.");
} }
current.visitDirtyNode(this); tx.visitDirtyNode(this);
lock = current; lock = tx;
} }
/** /**
@ -306,9 +306,8 @@ public final class Node implements INode, Serializable {
state = s; state = s;
if (Thread.currentThread() instanceof Transactor) { Transactor tx = Transactor.getInstance();
Transactor tx = (Transactor) Thread.currentThread(); if (tx != null) {
if (s == CLEAN) { if (s == CLEAN) {
clearWriteLock(); clearWriteLock();
tx.dropDirtyNode(this); tx.dropDirtyNode(this);
@ -332,11 +331,13 @@ public final class Node implements INode, Serializable {
// the process of being persistified - except if "manual" subnoderelation is set. // the process of being persistified - except if "manual" subnoderelation is set.
if ((state == TRANSIENT || state == NEW) && subnodeRelation == null) { if ((state == TRANSIENT || state == NEW) && subnodeRelation == null) {
return; return;
} else if (Thread.currentThread() instanceof Transactor) { } else {
Transactor tx = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstance();
if (tx != null) {
tx.visitParentNode(this); tx.visitParentNode(this);
} }
} }
}
/** /**
* Notify the node's parent that its child collection needs to be reloaded * Notify the node's parent that its child collection needs to be reloaded
@ -934,7 +935,7 @@ public final class Node implements INode, Serializable {
} }
if (state != TRANSIENT) { if (state != TRANSIENT) {
Transactor tx = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstanceOrFail();
SyntheticKey key = new SyntheticKey(this.getKey(), prop); SyntheticKey key = new SyntheticKey(this.getKey(), prop);
tx.visitCleanNode(key, node); tx.visitCleanNode(key, node);
nmgr.registerNode(node, key); nmgr.registerNode(node, key);
@ -1250,7 +1251,7 @@ public final class Node implements INode, Serializable {
// nodemanager. Otherwise, we just evict whatever was there before // nodemanager. Otherwise, we just evict whatever was there before
if (create) { if (create) {
// register group node with transactor // register group node with transactor
Transactor tx = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstanceOrFail();
tx.visitCleanNode(node); tx.visitCleanNode(node);
nmgr.registerNode(node); nmgr.registerNode(node);
} else { } else {
@ -2387,7 +2388,7 @@ public final class Node implements INode, Serializable {
// this is done anyway when the node becomes persistent. // this is done anyway when the node becomes persistent.
if (n.state != TRANSIENT) { if (n.state != TRANSIENT) {
// check node in with transactor cache // check node in with transactor cache
Transactor tx = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstanceOrFail();
// tx.visitCleanNode (new DbKey (dbm, nID), n); // tx.visitCleanNode (new DbKey (dbm, nID), n);
// UPDATE: using n.getKey() instead of manually constructing key. HW 2002/09/13 // UPDATE: using n.getKey() instead of manually constructing key. HW 2002/09/13
@ -2539,9 +2540,9 @@ public final class Node implements INode, Serializable {
getHandle().becomePersistent(); getHandle().becomePersistent();
// register node with the transactor // register node with the transactor
Transactor current = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstanceOrFail();
current.visitDirtyNode(this); tx.visitDirtyNode(this);
current.visitCleanNode(this); tx.visitCleanNode(this);
// recursively make children persistable // recursively make children persistable
makeChildrenPersistable(); makeChildrenPersistable();

View file

@ -149,7 +149,7 @@ public final class NodeManager {
public void deleteNode(Node node) throws Exception { public void deleteNode(Node node) throws Exception {
if (node != null) { if (node != null) {
synchronized (this) { synchronized (this) {
Transactor tx = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstanceOrFail();
node.setState(Node.INVALID); node.setState(Node.INVALID);
deleteNode(db, tx.txn, node); deleteNode(db, tx.txn, node);
@ -162,7 +162,7 @@ public final class NodeManager {
* a reference to another node via a NodeHandle/Key. * a reference to another node via a NodeHandle/Key.
*/ */
public Node getNode(Key key) throws Exception { public Node getNode(Key key) throws Exception {
Transactor tx = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstanceOrFail();
// See if Transactor has already come across this node // See if Transactor has already come across this node
Node node = tx.getCleanNode(key); Node node = tx.getCleanNode(key);
@ -212,7 +212,7 @@ public final class NodeManager {
return null; return null;
} }
Transactor tx = (Transactor) Thread.currentThread(); Transactor tx = Transactor.getInstanceOrFail();
Key key; Key key;
DbMapping otherDbm = rel == null ? null : rel.otherType; DbMapping otherDbm = rel == null ? null : rel.otherType;
@ -408,8 +408,9 @@ public final class NodeManager {
public void evictKey(Key key) { public void evictKey(Key key) {
cache.remove(key); cache.remove(key);
// also drop key from thread-local transactor cache // also drop key from thread-local transactor cache
if (Thread.currentThread() instanceof Transactor) { Transactor tx = Transactor.getInstance();
((Transactor) Thread.currentThread()).dropCleanNode(key); if (tx != null) {
tx.dropCleanNode(key);
} }
} }
@ -1856,16 +1857,19 @@ public final class NodeManager {
if (id == null) { if (id == null) {
return null; return null;
} else if (Thread.currentThread() instanceof Transactor) { } else {
Transactor tx = Transactor.getInstance();
if (tx != null) {
// Check if the node is already registered with the transactor - // Check if the node is already registered with the transactor -
// it may be in the process of being DELETED, but do return the // it may be in the process of being DELETED, but do return the
// new node if the old one has been marked as INVALID. // new node if the old one has been marked as INVALID.
DbKey key = new DbKey(dbmap, id); DbKey key = new DbKey(dbmap, id);
Node dirtyNode = ((Transactor) Thread.currentThread()).getDirtyNode(key); Node dirtyNode = tx.getDirtyNode(key);
if (dirtyNode != null && dirtyNode.getState() != Node.INVALID) { if (dirtyNode != null && dirtyNode.getState() != Node.INVALID) {
return dirtyNode; return dirtyNode;
} }
} }
}
Hashtable propMap = new Hashtable(); Hashtable propMap = new Hashtable();
DbColumn[] columns2 = dbmap.getColumns(); DbColumn[] columns2 = dbmap.getColumns();

View file

@ -28,7 +28,7 @@ import java.util.*;
* A subclass of thread that keeps track of changed nodes and triggers * A subclass of thread that keeps track of changed nodes and triggers
* changes in the database when a transaction is commited. * changes in the database when a transaction is commited.
*/ */
public class Transactor extends Thread { public class Transactor {
// The associated node manager // The associated node manager
NodeManager nmgr; NodeManager nmgr;
@ -61,15 +61,18 @@ public class Transactor extends Thread {
// a name to log the transaction. For HTTP transactions this is the rerquest path // a name to log the transaction. For HTTP transactions this is the rerquest path
private String tname; private String tname;
// the thread we're associated with
private Thread thread;
private static final ThreadLocal <Transactor> txtor = new ThreadLocal <Transactor> ();
/** /**
* Creates a new Transactor object. * Creates a new Transactor object.
* *
* @param runnable ... * @param nmgr the NodeManager used to fetch and persist nodes.
* @param group ...
* @param nmgr ...
*/ */
public Transactor(Runnable runnable, ThreadGroup group, NodeManager nmgr) { private Transactor(NodeManager nmgr) {
super(group, runnable, group.getName()); this.thread = Thread.currentThread();
this.nmgr = nmgr; this.nmgr = nmgr;
dirtyNodes = new HashMap(); dirtyNodes = new HashMap();
@ -82,6 +85,41 @@ public class Transactor extends Thread {
killed = false; killed = false;
} }
/**
* Get the transactor for the current thread or null if none exists.
* @return the transactor associated with the current thread
*/
public static Transactor getInstance() {
return txtor.get();
}
/**
* Get the transactor for the current thread or throw a IllegalStateException if none exists.
* @return the transactor associated with the current thread
* @throws IllegalStateException if no transactor is associated with the current thread
*/
public static Transactor getInstanceOrFail() throws IllegalStateException {
Transactor tx = txtor.get();
if (tx == null)
throw new IllegalStateException("Operation requires a Transactor, " +
"but current thread does not have one.");
return tx;
}
/**
* Get the transactor for the current thread, creating a new one if none exists.
* @param nmgr the NodeManager used to create the transactor
* @return the transactor associated with the current thread
*/
public static Transactor getInstance(NodeManager nmgr) {
Transactor t = txtor.get();
if (t == null) {
t = new Transactor(nmgr);
txtor.set(t);
}
return t;
}
/** /**
* Mark a Node as modified/created/deleted during this transaction * Mark a Node as modified/created/deleted during this transaction
* *
@ -185,6 +223,15 @@ public class Transactor extends Thread {
return active; return active;
} }
/**
* Check whether the thread associated with this transactor is alive.
* This is a proxy to Thread.isAlive().
* @return true if the thread running this transactor is currently alive.
*/
public boolean isAlive() {
return thread != null && thread.isAlive();
}
/** /**
* Register a db connection with this transactor thread. * Register a db connection with this transactor thread.
* @param src the db source * @param src the db source
@ -430,28 +477,29 @@ public class Transactor extends Thread {
* Kill this transaction thread. Used as last measure only. * Kill this transaction thread. Used as last measure only.
*/ */
public synchronized void kill() { public synchronized void kill() {
killed = true; killed = true;
interrupt(); thread.interrupt();
// Interrupt the thread if it has not noticed the flag (e.g. because it is busy // Interrupt the thread if it has not noticed the flag (e.g. because it is busy
// reading from a network socket). // reading from a network socket).
if (isAlive()) { if (thread.isAlive()) {
interrupt(); thread.interrupt();
try { try {
join(1000); thread.join(1000);
} catch (InterruptedException ir) { } catch (InterruptedException ir) {
// interrupted by other thread // interrupted by other thread
} }
} }
if (isAlive() && "true".equals(nmgr.app.getProperty("requestTimeoutStop"))) { if (thread.isAlive() && "true".equals(nmgr.app.getProperty("requestTimeoutStop"))) {
// still running - check if we ought to stop() it // still running - check if we ought to stop() it
try { try {
Thread.sleep(2000); Thread.sleep(2000);
if (isAlive()) { if (thread.isAlive()) {
// thread is still running, pull emergency break // thread is still running, pull emergency break
nmgr.app.logEvent("Stopping Thread for Transactor " + this); nmgr.app.logEvent("Stopping Thread for Transactor " + this);
stop(); thread.stop();
} }
} catch (InterruptedException ir) { } catch (InterruptedException ir) {
// interrupted by other thread // interrupted by other thread

View file

@ -18,7 +18,6 @@ package helma.objectmodel.db;
import helma.objectmodel.ObjectNotFoundException; import helma.objectmodel.ObjectNotFoundException;
import java.util.List;
import java.util.Vector; import java.util.Vector;
/** /**
@ -56,11 +55,17 @@ public final class WrappedNodeManager {
* @return * @return
*/ */
public Node getNode(Key key) { public Node getNode(Key key) {
Transactor tx = checkLocalTransactor();
try { try {
return nmgr.getNode(key); beginLocalTransaction(tx, "getNode");
Node node = nmgr.getNode(key);
commitLocalTransaction(tx);
return node;
} catch (ObjectNotFoundException x) { } catch (ObjectNotFoundException x) {
abortLocalTransaction(tx);
return null; return null;
} catch (Exception x) { } catch (Exception x) {
abortLocalTransaction(tx);
nmgr.app.logError("Error retrieving Node for " + key, x); nmgr.app.logError("Error retrieving Node for " + key, x);
throw new RuntimeException("Error retrieving Node", x); throw new RuntimeException("Error retrieving Node", x);
} }
@ -75,11 +80,17 @@ public final class WrappedNodeManager {
* @return * @return
*/ */
public Node getNode(Node home, String id, Relation rel) { public Node getNode(Node home, String id, Relation rel) {
Transactor tx = checkLocalTransactor();
try { try {
return nmgr.getNode(home, id, rel); beginLocalTransaction(tx, "getNode");
Node node = nmgr.getNode(home, id, rel);
commitLocalTransaction(tx);
return node;
} catch (ObjectNotFoundException x) { } catch (ObjectNotFoundException x) {
abortLocalTransaction(tx);
return null; return null;
} catch (Exception x) { } catch (Exception x) {
abortLocalTransaction(tx);
nmgr.app.logError("Error retrieving Node \"" + id + "\" from " + home, x); nmgr.app.logError("Error retrieving Node \"" + id + "\" from " + home, x);
throw new RuntimeException("Error retrieving Node", x); throw new RuntimeException("Error retrieving Node", x);
} }
@ -94,9 +105,14 @@ public final class WrappedNodeManager {
* @return * @return
*/ */
public SubnodeList getNodes(Node home, Relation rel) { public SubnodeList getNodes(Node home, Relation rel) {
Transactor tx = checkLocalTransactor();
try { try {
return nmgr.getNodes(home, rel); beginLocalTransaction(tx, "getNodes");
SubnodeList list = nmgr.getNodes(home, rel);
commitLocalTransaction(tx);
return list;
} catch (Exception x) { } catch (Exception x) {
abortLocalTransaction(tx);
throw new RuntimeException("Error retrieving Nodes", x); throw new RuntimeException("Error retrieving Nodes", x);
} }
} }
@ -150,9 +166,13 @@ public final class WrappedNodeManager {
* @param node * @param node
*/ */
public void deleteNode(Node node) { public void deleteNode(Node node) {
Transactor tx = checkLocalTransactor();
try { try {
beginLocalTransaction(tx, "deleteNode");
nmgr.deleteNode(node); nmgr.deleteNode(node);
commitLocalTransaction(tx);
} catch (Exception x) { } catch (Exception x) {
abortLocalTransaction(tx);
throw new RuntimeException("Error deleting Node", x); throw new RuntimeException("Error deleting Node", x);
} }
} }
@ -236,9 +256,14 @@ public final class WrappedNodeManager {
* Gets the application's root node. * Gets the application's root node.
*/ */
public Node getRootNode() { public Node getRootNode() {
Transactor tx = checkLocalTransactor();
try { try {
return nmgr.getRootNode(); beginLocalTransaction(tx, "getRootNode");
Node node = nmgr.getRootNode();
commitLocalTransaction(tx);
return node;
} catch (Exception x) { } catch (Exception x) {
abortLocalTransaction(tx);
throw new RuntimeException(x.toString(), x); throw new RuntimeException(x.toString(), x);
} }
} }
@ -275,4 +300,45 @@ public final class WrappedNodeManager {
public DbMapping getDbMapping(String name) { public DbMapping getDbMapping(String name) {
return nmgr.app.getDbMapping(name); return nmgr.app.getDbMapping(name);
} }
// helper methods to wrap execution inside local transactions
private Transactor checkLocalTransactor() {
Transactor tx = Transactor.getInstance();
if (tx != null) {
// transactor already associated with current thread - return null
return null;
}
return Transactor.getInstance(nmgr);
}
private void beginLocalTransaction(Transactor tx, String name) {
if (tx != null) {
try {
tx.begin(name);
} catch (Exception x) {
nmgr.app.logError("Error in beginLocalTransaction", x);
}
}
}
private void commitLocalTransaction(Transactor tx) {
if (tx != null) {
try {
tx.commit();
} catch (Exception x) {
nmgr.app.logError("Error in commitLocalTransaction", x);
}
}
}
private void abortLocalTransaction(Transactor tx) {
if (tx != null) {
try {
tx.abort();
} catch (Exception x) {
nmgr.app.logError("Error in abortLocalTransaction", x);
}
}
}
} }

View file

@ -16,7 +16,6 @@
package helma.scripting; package helma.scripting;
import helma.framework.IPathElement;
import helma.framework.repository.Resource; import helma.framework.repository.Resource;
import helma.framework.core.Application; import helma.framework.core.Application;
import helma.framework.core.RequestEvaluator; import helma.framework.core.RequestEvaluator;
@ -56,22 +55,28 @@ public interface ScriptingEngine {
/** /**
* Init the scripting engine with an application and a request evaluator * Init the scripting engine with an application and a request evaluator
* @param app the application
* @param reval the request evaluator
*/ */
public void init(Application app, RequestEvaluator reval); public void init(Application app, RequestEvaluator reval);
/** /**
* This method is called before an execution context for a request * This method is called when an execution context for a request
* evaluation is entered to let the Engine know it should update * evaluation is entered to let the Engine know it should update
* its prototype information * its prototype information
* @throws IOException an I/O exception occurred
* @throws ScriptingException a script related exception occurred
*/ */
public void updatePrototypes() throws IOException, ScriptingException; public void enterContext() throws IOException, ScriptingException;
/** /**
* This method is called when an execution context for a request * This method is called when an execution context for a request
* evaluation is entered. The globals parameter contains the global values * evaluation is entered. The globals parameter contains the global values
* to be applied during this execution context. * to be applied during this execution context.
* @param globals map of global variables
* @throws ScriptingException a script related exception occurred
*/ */
public void enterContext(Map globals) throws ScriptingException; public void setGlobals(Map globals) throws ScriptingException;
/** /**
* This method is called to let the scripting engine know that the current * This method is called to let the scripting engine know that the current

View file

@ -192,6 +192,7 @@ public final class RhinoCore implements ScopeProvider {
} }
void initDebugger(Context context) { void initDebugger(Context context) {
context.setGeneratingDebug(true);
try { try {
if (debugger == null) { if (debugger == null) {
debugger = new HelmaDebugger(app.getName()); debugger = new HelmaDebugger(app.getName());
@ -1120,6 +1121,7 @@ public final class RhinoCore implements ScopeProvider {
protected void onContextCreated(Context cx) { protected void onContextCreated(Context cx) {
cx.setWrapFactory(wrapper); cx.setWrapFactory(wrapper);
cx.setOptimizationLevel(optLevel); cx.setOptimizationLevel(optLevel);
// cx.setInstructionObserverThreshold(5000);
if (cx.isValidLanguageVersion(languageVersion)) { if (cx.isValidLanguageVersion(languageVersion)) {
cx.setLanguageVersion(languageVersion); cx.setLanguageVersion(languageVersion);
} else { } else {
@ -1143,5 +1145,16 @@ public final class RhinoCore implements ScopeProvider {
return super.hasFeature(cx, featureIndex); return super.hasFeature(cx, featureIndex);
} }
} }
/**
* Implementation of
* {@link Context#observeInstructionCount(int instructionCount)}.
* This can be used to customize {@link Context} without introducing
* additional subclasses.
*/
/* protected void observeInstructionCount(Context cx, int instructionCount) {
if (instructionCount >= 0xfffffff)
throw new EvaluatorException("Exceeded instruction count, interrupting");
} */
} }
} }

View file

@ -26,7 +26,6 @@ import helma.main.Server;
import helma.objectmodel.*; import helma.objectmodel.*;
import helma.objectmodel.db.DbMapping; import helma.objectmodel.db.DbMapping;
import helma.objectmodel.db.Relation; import helma.objectmodel.db.Relation;
import helma.objectmodel.db.NodeHandle;
import helma.scripting.*; import helma.scripting.*;
import helma.scripting.rhino.debug.Tracer; import helma.scripting.rhino.debug.Tracer;
import helma.util.StringUtils; import helma.util.StringUtils;
@ -54,7 +53,7 @@ public class RhinoEngine implements ScriptingEngine {
// the per-thread global object // the per-thread global object
GlobalObject global; GlobalObject global;
// the request evaluator instance owning this fesi evaluator // the request evaluator instance owning this rhino engine
RequestEvaluator reval; RequestEvaluator reval;
// the rhino core // the rhino core
@ -149,7 +148,7 @@ public class RhinoEngine implements ScriptingEngine {
* This method is called before an execution context is entered to let the * This method is called before an execution context is entered to let the
* engine know it should update its prototype information. * engine know it should update its prototype information.
*/ */
public synchronized void updatePrototypes() throws IOException { public synchronized void enterContext() throws IOException {
// remember the current thread as our thread - we do this here so // remember the current thread as our thread - we do this here so
// the thread is already set when the RequestEvaluator calls // the thread is already set when the RequestEvaluator calls
// Application.getDataRoot(), which may result in a function invocation // Application.getDataRoot(), which may result in a function invocation
@ -173,7 +172,7 @@ public class RhinoEngine implements ScriptingEngine {
* evaluation is entered. The globals parameter contains the global values * evaluation is entered. The globals parameter contains the global values
* to be applied during this execution context. * to be applied during this execution context.
*/ */
public synchronized void enterContext(Map globals) throws ScriptingException { public synchronized void setGlobals(Map globals) throws ScriptingException {
// remember the current thread as our thread // remember the current thread as our thread
thread = Thread.currentThread(); thread = Thread.currentThread();
@ -523,15 +522,17 @@ public class RhinoEngine implements ScriptingEngine {
*/ */
public void serialize(Object obj, OutputStream out) throws IOException { public void serialize(Object obj, OutputStream out) throws IOException {
core.contextFactory.enter(); core.contextFactory.enter();
engines.set(this);
try { try {
// use a special ScriptableOutputStream that unwraps Wrappers // use a special ScriptableOutputStream that unwraps Wrappers
ScriptableOutputStream sout = new ScriptableOutputStream(out, core.global) { ScriptableOutputStream sout = new ScriptableOutputStream(out, core.global) {
protected Object replaceObject(Object obj) throws IOException { protected Object replaceObject(Object obj) throws IOException {
// FIXME doesn't work because we need a transactor thread for deserialization if (obj instanceof HopObject)
// if (obj instanceof Wrapper) return new HopObjectProxy((HopObject) obj);
// obj = ((Wrapper) obj).unwrap(); if (obj instanceof helma.objectmodel.db.Node)
// if (obj instanceof helma.objectmodel.db.Node) return new HopObjectProxy((helma.objectmodel.db.Node) obj);
// return ((helma.objectmodel.db.Node) obj).getHandle(); if (obj instanceof GlobalObject)
return new GlobalProxy((GlobalObject) obj);
if (obj instanceof ApplicationBean) if (obj instanceof ApplicationBean)
return new ScriptBeanProxy("app"); return new ScriptBeanProxy("app");
if (obj instanceof RequestBean) if (obj instanceof RequestBean)
@ -543,8 +544,8 @@ public class RhinoEngine implements ScriptingEngine {
return super.replaceObject(obj); return super.replaceObject(obj);
} }
}; };
sout.addExcludedName("Xml"); // sout.addExcludedName("Xml");
sout.addExcludedName("global"); // sout.addExcludedName("global");
sout.writeObject(obj); sout.writeObject(obj);
sout.flush(); sout.flush();
@ -564,15 +565,12 @@ public class RhinoEngine implements ScriptingEngine {
*/ */
public Object deserialize(InputStream in) throws IOException, ClassNotFoundException { public Object deserialize(InputStream in) throws IOException, ClassNotFoundException {
core.contextFactory.enter(); core.contextFactory.enter();
engines.set(this);
try { try {
ObjectInputStream sin = new ScriptableInputStream(in, core.global) { ObjectInputStream sin = new ScriptableInputStream(in, core.global) {
protected Object resolveObject(Object obj) throws IOException { protected Object resolveObject(Object obj) throws IOException {
if (obj instanceof NodeHandle) { if (obj instanceof SerializationProxy) {
// FIXME doesn't work unless we have a transactor thread return ((SerializationProxy) obj).getObject(RhinoEngine.this);
// Object node = ((NodeHandle) obj).getNode(app.getNodeManager().safe);
// return Context.toObject(node, global);
} else if (obj instanceof ScriptBeanProxy) {
return ((ScriptBeanProxy) obj).getObject();
} }
return super.resolveObject(obj); return super.resolveObject(obj);
} }
@ -610,7 +608,7 @@ public class RhinoEngine implements ScriptingEngine {
} }
/** /**
* Return the RequestEvaluator owning and driving this FESI evaluator. * Return the RequestEvaluator owningthis rhino engine.
*/ */
public RequestEvaluator getRequestEvaluator() { public RequestEvaluator getRequestEvaluator() {
return reval; return reval;
@ -687,24 +685,4 @@ public class RhinoEngine implements ScriptingEngine {
return skin; return skin;
} }
/**
* Serialization proxy for app, req, res, path objects.
*/
class ScriptBeanProxy implements Serializable {
String name;
ScriptBeanProxy(String name) {
this.name = name;
}
/**
* Lookup the actual object in the current scope
* @return the object represented by this proxy
*/
Object getObject() {
return global.get(name, global);
}
}
} }

View file

@ -0,0 +1,113 @@
/*
* Helma License Notice
*
* The contents of this file are subject to the Helma License
* Version 2.0 (the "License"). You may not use this file except in
* compliance with the License. A copy of the License is available at
* http://adele.helma.org/download/helma/license.txt
*
* Copyright 2008 Helma Software. All Rights Reserved.
*
* $RCSfile$
* $Author$
* $Revision$
* $Date$
*/
package helma.scripting.rhino;
import helma.objectmodel.INode;
import helma.objectmodel.db.NodeHandle;
import org.mozilla.javascript.Context;
import java.io.Serializable;
/**
* Serialization proxy/placeholder interface. This is used for
* for various Helma and Rhino related classes..
*/
public interface SerializationProxy extends Serializable {
public Object getObject(RhinoEngine engine);
}
/**
* Serialization proxy for app, req, res, path objects.
*/
class ScriptBeanProxy implements SerializationProxy {
String name;
ScriptBeanProxy(String name) {
this.name = name;
}
/**
* Lookup the actual object in the current scope
*
* @return the object represented by this proxy
*/
public Object getObject(RhinoEngine engine) {
return engine.global.get(name, engine.global);
}
}
/**
* Serialization proxy for global scope
*/
class GlobalProxy implements SerializationProxy {
boolean shared;
GlobalProxy(GlobalObject scope) {
shared = !scope.isThreadScope;
}
/**
* Lookup the actual object in the current scope
*
* @return the object represented by this proxy
*/
public Object getObject(RhinoEngine engine) {
return shared ? engine.core.global : engine.global;
}
}
/**
* Serialization proxy for various flavors of HopObjects/Nodes
*/
class HopObjectProxy implements SerializationProxy {
Object ref;
boolean wrapped = false;
HopObjectProxy(HopObject obj) {
INode n = obj.getNode();
if (n == null)
ref = obj.getClassName();
else {
if (n instanceof helma.objectmodel.db.Node)
ref = new NodeHandle(((helma.objectmodel.db.Node) n).getKey());
else
ref = n;
}
wrapped = true;
}
HopObjectProxy(helma.objectmodel.db.Node node) {
ref = new NodeHandle(node.getKey());
}
/**
* Lookup the actual object in the current scope
*
* @return the object represented by this proxy
*/
public Object getObject(RhinoEngine engine) {
if (ref instanceof String)
return engine.core.getPrototype((String) ref);
else if (ref instanceof NodeHandle) {
Object n = ((NodeHandle) ref).getNode(engine.app.getWrappedNodeManager());
return wrapped ? Context.toObject(n, engine.global) : n;
}
return Context.toObject(ref, engine.global);
}
}