More shifting and shoving and cleaning up in RequestEvaluator.

This commit is contained in:
hns 2005-01-26 15:43:56 +00:00
parent 1c635f8b5c
commit 479a70509d
2 changed files with 122 additions and 129 deletions

View file

@ -398,7 +398,7 @@ public final class Application implements IPathElement, Runnable {
for (Enumeration e = allThreads.elements(); e.hasMoreElements();) { for (Enumeration e = allThreads.elements(); e.hasMoreElements();) {
RequestEvaluator ev = (RequestEvaluator) e.nextElement(); RequestEvaluator ev = (RequestEvaluator) e.nextElement();
ev.stopThread(); ev.stopTransactor();
} }
} }
@ -532,7 +532,7 @@ public final class Application implements IPathElement, Runnable {
allThreads.removeElement(re); allThreads.removeElement(re);
// typemgr.unregisterRequestEvaluator (re); // typemgr.unregisterRequestEvaluator (re);
re.stopThread(); re.stopTransactor();
} catch (EmptyStackException empty) { } catch (EmptyStackException empty) {
return false; return false;
} }
@ -574,13 +574,13 @@ public final class Application implements IPathElement, Runnable {
ev = (RequestEvaluator) activeRequests.get(req); ev = (RequestEvaluator) activeRequests.get(req);
if (ev != null) { if (ev != null) {
res = ev.attachRequest(req); res = ev.attachHttpRequest(req);
} }
if (res == null) { if (res == null) {
primaryRequest = true; primaryRequest = true;
// if attachRequest returns null this means we came too late // if attachHttpRequest returns null this means we came too late
// and the other request was finished in the meantime // and the other request was finished in the meantime
// check if the properties file has been updated // check if the properties file has been updated
updateProperties(); updateProperties();
@ -595,7 +595,7 @@ public final class Application implements IPathElement, Runnable {
} catch (Exception x) { } catch (Exception x) {
errorCount += 1; errorCount += 1;
res = new ResponseTrans(); res = new ResponseTrans();
res.write("Error in application: <b>" + x.getMessage() + "</b>"); res.writeErrorReport(name, x.getMessage());
} finally { } finally {
if (primaryRequest) { if (primaryRequest) {
activeRequests.remove(req); activeRequests.remove(req);

View file

@ -114,28 +114,27 @@ public final class RequestEvaluator implements Runnable {
// update scripting prototypes // update scripting prototypes
scriptingEngine.updatePrototypes(); scriptingEngine.updatePrototypes();
// System.err.println ("Type check overhead: "+(System.currentTimeMillis ()-startCheck)+" millis"); // root object reference
// object refs to ressolve request path
Object root; Object root;
// System.err.println ("Type check overhead: "+(System.currentTimeMillis ()-startCheck)+" millis"); // object reference to ressolve request path
// object refs to ressolve request path
Object currentElement; Object currentElement;
// request path object
requestPath = new RequestPath(app); requestPath = new RequestPath(app);
int tries = 0; int tries = 0;
boolean done = false; boolean done = false;
String error = null; String error = null;
while (!done) { while (!done && localrtx == rtx) {
currentElement = null; currentElement = null;
try { try {
// TODO: transaction names are not set for internal/xmlrpc/external requests
// used for logging // Transaction name is used for logging etc.
StringBuffer txname = new StringBuffer(app.getName()); StringBuffer txname = new StringBuffer(app.getName());
txname.append("/"); txname.append(":").append(req.getMethod().toLowerCase()).append(":");
txname.append((error == null) ? req.path : "error"); txname.append((error == null) ? req.path : "error");
// begin transaction // begin transaction
@ -387,7 +386,7 @@ public final class RequestEvaluator implements Runnable {
ScriptingEngine.ARGS_WRAP_XMLRPC); ScriptingEngine.ARGS_WRAP_XMLRPC);
commitTransaction(); commitTransaction();
} catch (Exception x) { } catch (Exception x) {
abortTransaction(false); abortTransaction();
app.logEvent("Exception in " + Thread.currentThread() + ": " + app.logEvent("Exception in " + Thread.currentThread() + ": " +
x); x);
@ -406,10 +405,6 @@ public final class RequestEvaluator implements Runnable {
case INTERNAL: case INTERNAL:
// TODO: transaction names are not set for internal/xmlrpc/external requests
// Just a human readable descriptor of this invocation
// String funcdesc = app.getName() + ":internal:" + functionName;
// if thisObject is an instance of NodeHandle, get the node object itself. // if thisObject is an instance of NodeHandle, get the node object itself.
if ((thisObject != null) && thisObject instanceof NodeHandle) { if ((thisObject != null) && thisObject instanceof NodeHandle) {
thisObject = ((NodeHandle) thisObject).getNode(app.nmgr.safe); thisObject = ((NodeHandle) thisObject).getNode(app.nmgr.safe);
@ -444,7 +439,7 @@ public final class RequestEvaluator implements Runnable {
ScriptingEngine.ARGS_WRAP_DEFAULT); ScriptingEngine.ARGS_WRAP_DEFAULT);
commitTransaction(); commitTransaction();
} catch (Exception x) { } catch (Exception x) {
abortTransaction(false); abortTransaction();
app.logEvent("Exception in " + Thread.currentThread() + app.logEvent("Exception in " + Thread.currentThread() +
": " + x); ": " + x);
@ -468,7 +463,7 @@ public final class RequestEvaluator implements Runnable {
if (++tries < 8) { if (++tries < 8) {
// try again after waiting some period // try again after waiting some period
abortTransaction(true); abortTransaction();
try { try {
// wait a bit longer with each try // wait a bit longer with each try
@ -480,25 +475,17 @@ public final class RequestEvaluator implements Runnable {
continue; continue;
} else { } else {
abortTransaction(false); abortTransaction();
if (error == null) { if (error == null)
app.errorCount += 1; error = "Application too busy, please try again later";
// set done to false so that the error will be processed // error in error action. use traditional minimal error message
done = false; res.writeErrorReport(app.getName(), error);
error = "Couldn't complete transaction due to heavy object traffic (tried " + done = true;
tries + " times)";
} else {
// error in error action. use traditional minimal error message
res.write("<b>Error in application '" +
app.getName() + "':</b> <br><br><pre>" +
error + "</pre>");
done = true;
}
} }
} catch (Throwable x) { } catch (Throwable x) {
abortTransaction(false); abortTransaction();
// If the transactor thread has been killed by the invoker thread we don't have to // If the transactor thread has been killed by the invoker thread we don't have to
// bother for the error message, just quit. // bother for the error message, just quit.
@ -532,9 +519,7 @@ public final class RequestEvaluator implements Runnable {
} }
} else { } else {
// error in error action. use traditional minimal error message // error in error action. use traditional minimal error message
res.write("<b>Error in application '" + res.writeErrorReport(app.getName(), error);
app.getName() + "':</b> <br><br><pre>" +
error + "</pre>");
done = true; done = true;
} }
} }
@ -543,16 +528,10 @@ public final class RequestEvaluator implements Runnable {
// exit execution context // exit execution context
scriptingEngine.exitContext(); scriptingEngine.exitContext();
// make sure there is only one thread running per instance of this class
// if localrtx != rtx, the current thread has been aborted and there's no need to notify
if (localrtx != rtx) {
localrtx.closeConnections();
return;
}
notifyAndWait(); notifyAndWait();
} while (localrtx == rtx); } while (localrtx == rtx);
} finally { } finally {
localrtx.closeConnections(); localrtx.closeConnections();
} }
@ -565,36 +544,53 @@ public final class RequestEvaluator implements Runnable {
Transactor localrtx = (Transactor) Thread.currentThread(); Transactor localrtx = (Transactor) Thread.currentThread();
if (localrtx == rtx) { if (localrtx == rtx) {
reqtype = NONE;
localrtx.commit(); localrtx.commit();
} else { } else {
throw new TimeoutException(); throw new TimeoutException();
} }
} }
synchronized void abortTransaction(boolean retry) { synchronized void abortTransaction() {
Transactor localrtx = (Transactor) Thread.currentThread(); Transactor localrtx = (Transactor) Thread.currentThread();
if (!retry && (localrtx == rtx)) {
reqtype = NONE;
}
try { try {
localrtx.abort(); localrtx.abort();
} catch (Exception ignore) { } catch (Exception ignore) {
} }
} }
private synchronized void startTransactor() {
if (!app.isRunning()) {
throw new ApplicationStoppedException();
}
if ((rtx == null) || !rtx.isAlive()) {
// app.logEvent ("Starting Thread");
rtx = new Transactor(this, app.threadgroup, app.nmgr);
rtx.setContextClassLoader(app.getClassLoader());
rtx.start();
} else {
notifyAll();
}
}
/** /**
* Tell waiting thread that we're done, then wait for next request * Tell waiting thread that we're done, then wait for next request
*/ */
synchronized void notifyAndWait() { synchronized void notifyAndWait() {
Transactor localrtx = (Transactor) Thread.currentThread(); Transactor localrtx = (Transactor) Thread.currentThread();
if (reqtype != NONE) { // make sure there is only one thread running per instance of this class
return; // is there a new request already? // if localrtx != rtx, the current thread has been aborted and there's no need to notify
if (localrtx != rtx) {
// A new request came in while we were finishing the last one.
// Return to run() to get the work done.
localrtx.closeConnections();
return;
} }
reqtype = NONE;
notifyAll(); notifyAll();
try { try {
@ -609,7 +605,41 @@ public final class RequestEvaluator implements Runnable {
// scriptingEngine = null; // scriptingEngine = null;
rtx = null; rtx = null;
} }
} catch (InterruptedException ir) { } catch (InterruptedException ix) {
}
}
/**
* Stop this request evaluator's current thread. This is called by the
* waiting thread when it times out and stops waiting, or from an outside
* thread. If currently active kill the request, otherwise just notify.
*/
public synchronized void stopTransactor() {
Transactor t = rtx;
rtx = null;
if (t != null && t.isActive()) {
// let the scripting engine know that the
// current transaction is being aborted.
if (scriptingEngine != null) {
scriptingEngine.abort();
}
app.logEvent("Killing Thread " + t);
reqtype = NONE;
t.kill();
try {
t.abort();
} catch (Exception ignore) {
}
t.closeConnections();
notifyAll();
} }
} }
@ -629,40 +659,45 @@ public final class RequestEvaluator implements Runnable {
app.activeRequests.put(req, this); app.activeRequests.put(req, this);
checkThread(); startTransactor();
wait(app.requestTimeout); wait(app.requestTimeout);
if (reqtype != NONE) { if (reqtype != NONE) {
app.logEvent("Stopping Thread for Request " + app.getName() + "/" + req.path); app.logEvent("Stopping Thread for Request " + app.getName() + "/" + req.path);
stopThread(); stopTransactor();
res.reset(); res.reset();
res.write("<b>Error in application '" + app.getName() + res.writeErrorReport(app.getName(), "Request timed out");
"':</b> <br><br><pre>Request timed out.</pre>");
} }
return res; return res;
} }
/** /**
* This checks if the Evaluator is already executing an equal request. If so, attach to it and * This checks if the Evaluator is already executing an equal request.
* wait for it to complete. Otherwise return null, so the application knows it has to run the request. * If so, attach to it and wait for it to complete. Otherwise return null,
* so the application knows it has to run the request.
*/ */
public synchronized ResponseTrans attachRequest(RequestTrans req) public synchronized ResponseTrans attachHttpRequest(RequestTrans req)
throws InterruptedException { throws Exception {
if ((this.req == null) || (res == null) || !this.req.equals(req)) { // Get a reference to the res object at the time we enter
ResponseTrans localRes = res;
if ((localRes == null) || !req.equals(this.req)) {
return null; return null;
} }
// we already know our response object
ResponseTrans r = res;
if (reqtype != NONE) { if (reqtype != NONE) {
wait(app.requestTimeout); wait(app.requestTimeout);
} }
return r; return localRes;
} }
/*
* TODO invokeXmlRpc(), invokeExternal() and invokeInternal() are basically the same
* and should be unified
*/
/** /**
* *
* *
@ -675,15 +710,16 @@ public final class RequestEvaluator implements Runnable {
*/ */
public synchronized Object invokeXmlRpc(String functionName, Object[] args) public synchronized Object invokeXmlRpc(String functionName, Object[] args)
throws Exception { throws Exception {
initObjects(XMLRPC, RequestTrans.XMLRPC); initObjects(functionName, XMLRPC, RequestTrans.XMLRPC);
this.functionName = functionName; this.functionName = functionName;
this.args = args; this.args = args;
checkThread(); startTransactor();
wait(app.requestTimeout); wait(app.requestTimeout);
if (reqtype != NONE) { if (reqtype != NONE) {
stopThread(); stopTransactor();
exception = new RuntimeException("Request timed out");
} }
// reset res for garbage collection (res.data may hold reference to evaluator) // reset res for garbage collection (res.data may hold reference to evaluator)
@ -710,15 +746,16 @@ public final class RequestEvaluator implements Runnable {
*/ */
public synchronized Object invokeExternal(String functionName, Object[] args) public synchronized Object invokeExternal(String functionName, Object[] args)
throws Exception { throws Exception {
initObjects(EXTERNAL, RequestTrans.EXTERNAL); initObjects(functionName, EXTERNAL, RequestTrans.EXTERNAL);
this.functionName = functionName; this.functionName = functionName;
this.args = args; this.args = args;
checkThread(); startTransactor();
wait(); wait();
if (reqtype != NONE) { if (reqtype != NONE) {
stopThread(); stopTransactor();
exception = new RuntimeException("Request timed out");
} }
// reset res for garbage collection (res.data may hold reference to evaluator) // reset res for garbage collection (res.data may hold reference to evaluator)
@ -780,16 +817,17 @@ public final class RequestEvaluator implements Runnable {
public synchronized Object invokeInternal(Object object, String functionName, public synchronized Object invokeInternal(Object object, String functionName,
Object[] args, long timeout) Object[] args, long timeout)
throws Exception { throws Exception {
initObjects(INTERNAL, RequestTrans.INTERNAL); initObjects(functionName, INTERNAL, RequestTrans.INTERNAL);
thisObject = object; thisObject = object;
this.functionName = functionName; this.functionName = functionName;
this.args = args; this.args = args;
checkThread(); startTransactor();
wait(timeout); wait(timeout);
if (reqtype != NONE) { if (reqtype != NONE) {
stopThread(); stopTransactor();
exception = new RuntimeException("Request timed out");
} }
// reset res for garbage collection (res.data may hold reference to evaluator) // reset res for garbage collection (res.data may hold reference to evaluator)
@ -814,20 +852,23 @@ public final class RequestEvaluator implements Runnable {
this.reqtype = HTTP; this.reqtype = HTTP;
this.session = session; this.session = session;
res = new ResponseTrans(req); res = new ResponseTrans(req);
// result = null; result = null;
// exception = null; exception = null;
} }
/** /**
* Init this evaluator's objects for an internal, external or XML-RPC type * Init this evaluator's objects for an internal, external or XML-RPC type
* request. * request.
* *
* @param functionName
* @param reqtype * @param reqtype
* @param reqtypeName * @param reqtypeName
*/ */
private void initObjects(int reqtype, String reqtypeName) { private void initObjects(String functionName, int reqtype, String reqtypeName) {
this.functionName = functionName;
this.reqtype = reqtype; this.reqtype = reqtype;
this.req = new RequestTrans(reqtypeName); req = new RequestTrans(reqtypeName);
req.path = functionName;
session = new Session(functionName, app); session = new Session(functionName, app);
res = new ResponseTrans(req); res = new ResponseTrans(req);
result = null; result = null;
@ -875,54 +916,6 @@ public final class RequestEvaluator implements Runnable {
return null; return null;
} }
/**
* Stop this request evaluator's current thread.
* If currently active kill the request, otherwise just notify.
*/
public synchronized void stopThread() {
Transactor t = rtx;
// let the scripting engine know that the
// current transaction is being aborted.
if (scriptingEngine != null) {
scriptingEngine.abort();
}
rtx = null;
if (t != null) {
if (reqtype != NONE) {
app.logEvent("Killing Thread " + t);
reqtype = NONE;
t.kill();
try {
t.abort();
} catch (Exception ignore) {
}
} else {
notifyAll();
}
t.closeConnections();
}
}
private synchronized void checkThread() {
if (!app.isRunning()) {
throw new ApplicationStoppedException();
}
if ((rtx == null) || !rtx.isAlive()) {
// app.logEvent ("Starting Thread");
rtx = new Transactor(this, app.threadgroup, app.nmgr);
rtx.setContextClassLoader(app.getClassLoader());
rtx.start();
} else {
notifyAll();
}
}
/** /**
* Null out some fields, mostly for the sake of garbage collection. * Null out some fields, mostly for the sake of garbage collection.
*/ */