cleaned up request handling
This commit is contained in:
parent
c94f3e5a6b
commit
9a1f750707
1 changed files with 364 additions and 318 deletions
|
@ -121,28 +121,25 @@ public class RequestEvaluator implements Runnable {
|
|||
}
|
||||
|
||||
|
||||
/* private void startThread () {
|
||||
rtx = new Transactor (this, app.nmgr);
|
||||
evaluator.thread = rtx;
|
||||
rtx.start ();
|
||||
// yield to make sure the transactor thread reaches waiting status before the first
|
||||
// invocation request comes in
|
||||
Thread.yield ();
|
||||
} */
|
||||
|
||||
public void run () {
|
||||
|
||||
int txcount = 0;
|
||||
// first, set a local variable to the current transactor thread so we know
|
||||
// when it's time to quit because another thread took over.
|
||||
Transactor localrtx = (Transactor) Thread.currentThread ();
|
||||
|
||||
// evaluators are only initialized as needed, so we need to check that here
|
||||
if (!initialized)
|
||||
app.typemgr.initRequestEvaluator (this);
|
||||
|
||||
try {
|
||||
do {
|
||||
|
||||
// make sure there is only one thread running per instance of this class
|
||||
Transactor tx = rtx;
|
||||
if (Thread.currentThread () != tx)
|
||||
if (localrtx != rtx) {
|
||||
localrtx.closeConnections ();
|
||||
return;
|
||||
}
|
||||
|
||||
// IServer.getLogger().log ("got request "+reqtype);
|
||||
|
||||
|
@ -160,9 +157,9 @@ public class RequestEvaluator implements Runnable {
|
|||
|
||||
String requestPath = app.getName()+"/"+req.path;
|
||||
// set Timer to get some profiling data
|
||||
tx.timer.reset ();
|
||||
tx.timer.beginEvent (requestPath+" init");
|
||||
tx.begin (requestPath);
|
||||
localrtx.timer.reset ();
|
||||
localrtx.timer.beginEvent (requestPath+" init");
|
||||
localrtx.begin (requestPath);
|
||||
|
||||
Action action = null;
|
||||
|
||||
|
@ -192,7 +189,6 @@ public class RequestEvaluator implements Runnable {
|
|||
} else {
|
||||
|
||||
// march down request path...
|
||||
|
||||
// is the next path element a subnode or a property of the last one?
|
||||
// currently only used for users node
|
||||
boolean isProperty = false;
|
||||
|
@ -277,51 +273,56 @@ public class RequestEvaluator implements Runnable {
|
|||
current = getNodeWrapper (root);
|
||||
}
|
||||
|
||||
tx.timer.endEvent (requestPath+" init");
|
||||
localrtx.timer.endEvent (requestPath+" init");
|
||||
|
||||
try {
|
||||
tx.timer.beginEvent (requestPath+" execute");
|
||||
localrtx.timer.beginEvent (requestPath+" execute");
|
||||
current.doIndirectCall (evaluator, current, action.getFunctionName (), new ESValue[0]);
|
||||
tx.timer.endEvent (requestPath+" execute");
|
||||
localrtx.timer.endEvent (requestPath+" execute");
|
||||
done = true;
|
||||
} catch (RedirectException redirect) {
|
||||
res.redirect = redirect.getMessage ();
|
||||
done = true;
|
||||
}
|
||||
|
||||
// check if we're still on track
|
||||
if (tx == rtx)
|
||||
tx.commit ();
|
||||
else
|
||||
throw new TimeoutException ();
|
||||
// check if we're still the one and only or if the waiting thread has given up on us already
|
||||
commitTransaction ();
|
||||
done = true;
|
||||
|
||||
} catch (ConcurrencyException x) {
|
||||
try { tx.abort (); } catch (Exception ignore) {}
|
||||
|
||||
res.reset ();
|
||||
if (++tries < 8) {
|
||||
if (++tries < 10) {
|
||||
// try again after waiting some period
|
||||
abortTransaction (true);
|
||||
try {
|
||||
// wait a bit longer with each try
|
||||
int base = 500 * tries;
|
||||
int base = 200 * tries;
|
||||
Thread.currentThread ().sleep ((long) (base + Math.random ()*base*2));
|
||||
} catch (Exception ignore) {}
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
abortTransaction (false);
|
||||
app.errorCount += 1;
|
||||
res.write ("<b>Error in application '"+app.getName()+"':</b> <br><br><pre>Couldn't complete transaction due to heavy object traffic (tried "+tries+" times).</pre>");
|
||||
done = true;
|
||||
}
|
||||
|
||||
} catch (FrameworkException x) {
|
||||
try { tx.abort (); } catch (Exception ignore) {}
|
||||
|
||||
abortTransaction (false);
|
||||
|
||||
app.errorCount += 1;
|
||||
res.reset ();
|
||||
res.write ("<b>Error in application '"+app.getName()+"':</b> <br><br><pre>" + x.getMessage () + "</pre>");
|
||||
if (app.debug)
|
||||
x.printStackTrace ();
|
||||
|
||||
done = true;
|
||||
|
||||
} catch (Exception x) {
|
||||
try { tx.abort (); } catch (Exception ignore) {}
|
||||
|
||||
abortTransaction (false);
|
||||
|
||||
app.errorCount += 1;
|
||||
System.err.println ("### Exception in "+app.getName()+"/"+req.path+": current = "+currentNode);
|
||||
System.err.println (x);
|
||||
|
@ -332,8 +333,8 @@ public class RequestEvaluator implements Runnable {
|
|||
|
||||
// If the transactor thread has been killed by the invoker thread we don't have to
|
||||
// bother for the error message, just quit.
|
||||
if (rtx != tx)
|
||||
return;
|
||||
if (localrtx != rtx)
|
||||
break;
|
||||
|
||||
res.reset ();
|
||||
res.write ("<b>Error in application '"+app.getName()+"':</b> <br><br><pre>" + x.getMessage () + "</pre>");
|
||||
|
@ -343,7 +344,7 @@ public class RequestEvaluator implements Runnable {
|
|||
break;
|
||||
case XMLRPC:
|
||||
try {
|
||||
tx.begin (app.getName()+":xmlrpc/"+method);
|
||||
localrtx.begin (app.getName()+":xmlrpc/"+method);
|
||||
|
||||
root = app.getDataRoot ();
|
||||
|
||||
|
@ -383,14 +384,17 @@ public class RequestEvaluator implements Runnable {
|
|||
}
|
||||
|
||||
result = FesiRpcUtil.convertE2J (current.doIndirectCall (evaluator, current, method, esa));
|
||||
tx.commit ();
|
||||
commitTransaction ();
|
||||
|
||||
} catch (Exception wrong) {
|
||||
try { tx.abort (); } catch (Exception ignore) {}
|
||||
|
||||
abortTransaction (false);
|
||||
|
||||
// If the transactor thread has been killed by the invoker thread we don't have to
|
||||
// bother for the error message, just quit.
|
||||
if (evaluator.thread != Thread.currentThread())
|
||||
if (localrtx != rtx) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.exception = wrong;
|
||||
}
|
||||
|
@ -401,7 +405,7 @@ public class RequestEvaluator implements Runnable {
|
|||
// Just a human readable descriptor of this invocation
|
||||
String funcdesc = app.getName()+":internal/"+method;
|
||||
try {
|
||||
tx.begin (funcdesc);
|
||||
localrtx.begin (funcdesc);
|
||||
|
||||
root = app.getDataRoot ();
|
||||
|
||||
|
@ -422,14 +426,17 @@ public class RequestEvaluator implements Runnable {
|
|||
}
|
||||
}
|
||||
esresult = current.doIndirectCall (evaluator, current, method, new ESValue[0]);
|
||||
tx.commit ();
|
||||
commitTransaction ();
|
||||
|
||||
} catch (Throwable wrong) {
|
||||
try { tx.abort (); } catch (Exception ignore) {}
|
||||
|
||||
abortTransaction (false);
|
||||
|
||||
// If the transactor thread has been killed by the invoker thread we don't have to
|
||||
// bother for the error message, just quit.
|
||||
if (evaluator.thread != Thread.currentThread())
|
||||
if (localrtx != rtx) {
|
||||
return;
|
||||
}
|
||||
|
||||
String msg = wrong.getMessage ();
|
||||
if (msg == null || msg.length () == 0)
|
||||
|
@ -440,26 +447,60 @@ public class RequestEvaluator implements Runnable {
|
|||
break;
|
||||
|
||||
}
|
||||
reqtype = NONE;
|
||||
|
||||
// create a new Thread every 1000 requests. The current one will fade out
|
||||
if (txcount++ > 1000) {
|
||||
stopThread (); // stop thread - a new one will be created when needed
|
||||
// stop thread - a new one will be created when needed
|
||||
localrtx.closeConnections ();
|
||||
break;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
notifyAndWait ();
|
||||
|
||||
} while (localrtx == rtx);
|
||||
|
||||
} finally {
|
||||
localrtx.closeConnections ();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the transactor thread when it has successfully fulfilled a request.
|
||||
*/
|
||||
synchronized void commitTransaction () throws Exception {
|
||||
Transactor localrtx = (Transactor) Thread.currentThread ();
|
||||
if (localrtx == rtx) {
|
||||
reqtype = NONE;
|
||||
localrtx.commit ();
|
||||
} else {
|
||||
throw new TimeoutException ();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void abortTransaction (boolean retry) {
|
||||
Transactor localrtx = (Transactor) Thread.currentThread ();
|
||||
if (!retry && localrtx == rtx)
|
||||
reqtype = NONE;
|
||||
try {
|
||||
localrtx.abort ();
|
||||
} catch (Exception ignore) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tell waiting thread that we're done, then wait for next request
|
||||
*/
|
||||
synchronized void notifyAndWait () {
|
||||
Transactor localrtx = (Transactor) Thread.currentThread ();
|
||||
if (reqtype != NONE)
|
||||
return;
|
||||
notifyAll ();
|
||||
try {
|
||||
wait ();
|
||||
} catch (InterruptedException ir) {
|
||||
Thread.currentThread ().interrupt ();
|
||||
localrtx.closeConnections ();
|
||||
}
|
||||
}
|
||||
|
||||
} while (evaluator.thread == Thread.currentThread () && evaluator.thread == rtx);
|
||||
}
|
||||
|
||||
public synchronized ResponseTrans invoke (RequestTrans req, User user) throws Exception {
|
||||
this.reqtype = HTTP;
|
||||
this.req = req;
|
||||
|
@ -468,7 +509,7 @@ public class RequestEvaluator implements Runnable {
|
|||
|
||||
checkThread ();
|
||||
wait (app.requestTimeout);
|
||||
if (reqtype > 0) {
|
||||
if (reqtype != NONE) {
|
||||
IServer.getLogger().log ("Stopping Thread for Request "+app.getName()+"/"+req.path);
|
||||
stopThread ();
|
||||
res.reset ();
|
||||
|
@ -489,7 +530,7 @@ public class RequestEvaluator implements Runnable {
|
|||
|
||||
checkThread ();
|
||||
wait (app.requestTimeout);
|
||||
if (reqtype > 0) {
|
||||
if (reqtype != NONE) {
|
||||
stopThread ();
|
||||
}
|
||||
|
||||
|
@ -521,7 +562,7 @@ public class RequestEvaluator implements Runnable {
|
|||
checkThread ();
|
||||
wait (60000l*15); // give internal call more time (15 minutes) to complete
|
||||
|
||||
if (reqtype > 0) {
|
||||
if (reqtype != NONE) {
|
||||
stopThread ();
|
||||
}
|
||||
|
||||
|
@ -543,7 +584,7 @@ public class RequestEvaluator implements Runnable {
|
|||
checkThread ();
|
||||
wait (app.requestTimeout);
|
||||
|
||||
if (reqtype > 0) {
|
||||
if (reqtype != NONE) {
|
||||
stopThread ();
|
||||
}
|
||||
|
||||
|
@ -559,17 +600,20 @@ public class RequestEvaluator implements Runnable {
|
|||
*/
|
||||
public synchronized void stopThread () {
|
||||
// IServer.getLogger().log ("Stopping Thread");
|
||||
evaluator.thread = null;
|
||||
Transactor t = rtx;
|
||||
evaluator.thread = null;
|
||||
rtx = null;
|
||||
if (t != null) {
|
||||
if (reqtype != NONE) {
|
||||
try { t.abort (); } catch (Exception ignore) {}
|
||||
try {
|
||||
t.abort ();
|
||||
} catch (Exception ignore) {}
|
||||
t.kill ();
|
||||
reqtype = NONE;
|
||||
} else {
|
||||
notifyAll ();
|
||||
}
|
||||
t.cleanup ();
|
||||
t.closeConnections ();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -590,7 +634,9 @@ public class RequestEvaluator implements Runnable {
|
|||
* in those places when wrappers have to be updated if they already exist.
|
||||
*/
|
||||
public ESNode getNodeWrapperFromCache (INode n) {
|
||||
return n == null ? null : (ESNode) objectcache.get (n);
|
||||
if (n == null)
|
||||
return null;
|
||||
return (ESNode) objectcache.get (n);
|
||||
}
|
||||
|
||||
public ESNode getNodeWrapper (INode n) {
|
||||
|
|
Loading…
Add table
Reference in a new issue