diff --git a/src/helma/xmlrpc/XmlRpcClient.java b/src/helma/xmlrpc/XmlRpcClient.java index fdaf41f4..c6fcaa80 100644 --- a/src/helma/xmlrpc/XmlRpcClient.java +++ b/src/helma/xmlrpc/XmlRpcClient.java @@ -19,8 +19,17 @@ public class XmlRpcClient implements XmlRpcHandler { URL url; String auth; + int maxThreads = 50; - /** + Stack pool = new Stack (); + int workers = 0; + + long calls = 0; + int roundtrip = 1000; + + CallData first, last; + + /** * Construct a XML-RPC client with this URL. */ public XmlRpcClient (URL url) { @@ -71,11 +80,15 @@ public class XmlRpcClient implements XmlRpcHandler { */ public Object execute (String method, Vector params) throws XmlRpcException, IOException { Worker worker = getWorker (); + long start = System.currentTimeMillis (); try { Object retval = worker.execute (method, params); + long end = System.currentTimeMillis (); + calls++; + roundtrip = (int) ((roundtrip*4)+(end-start))/5; return retval; } finally { - worker.reset (); + releaseWorker (worker); } } @@ -86,77 +99,117 @@ public class XmlRpcClient implements XmlRpcHandler { * */ public void executeAsync (String method, Vector params, AsyncCallback callback) { + if (workers > 2) { + enqueue (method, params, callback); + return; + } Worker worker = null; try { worker = getWorker (); worker.executeAsync (method, params, callback); } catch (IOException iox) { - if (callback != null) - callback.handleError (iox, url, method); + // make a queued worker that doesn't run immediately + enqueue (method, params, callback); } } - Stack pool = new Stack (); - int workers = 0; - private final Worker getWorker () throws IOException { + synchronized Worker getWorker () throws IOException { try { - return (Worker) pool.pop (); + Worker w = (Worker) pool.pop (); + workers += 1; + return w; } catch (EmptyStackException x) { - if (workers < 100) { + if (workers < maxThreads) { workers += 1; return new Worker (); } throw new IOException ("XML-RPC System overload"); } } - + + /** + * Release possibly big per-call object references to allow them to be garbage collected + */ + synchronized void releaseWorker (Worker w) { + w.result = null; + w.call = null; + if (pool.size() < 20 && !w.fault) + pool.push (w); + workers -= 1; + } + + + synchronized void enqueue (String method, Vector params, AsyncCallback callback) { + CallData call = new CallData (method, params, callback); + if (last == null) + first = last = call; + else { + last.next = call; + last = call; + } + } + + synchronized CallData dequeue () { + if (first == null) + return null; + // if (workers > 2 && workers*4 > roundtrip) + // return null; + CallData call = first; + if (first == last) + first = last = null; + else + first = first.next; + return call; + } class Worker extends XmlRpc implements Runnable { - String method; - Vector params; boolean fault; Object result = null; StringBuffer strbuf; - AsyncCallback callback = null; - public Worker () throws IOException { + CallData call; + + public Worker () { super (); } - public Object execute (String method, Vector params) throws XmlRpcException, IOException { - this.method = method; - this.params = params; - return executeCall (); - } public void executeAsync (String method, Vector params, AsyncCallback callback) { - this.method = method; - this.params = params; - this.callback = callback; + this.call = new CallData (method, params, callback); Thread t = new Thread (this); t.start (); } public void run () { + while (call != null) { + runAsync (call.method, call.params, call.callback); +// System.err.println (workers+" ---- "+ roundtrip); + call = dequeue (); + // if (call != null) System.err.print ("."); + } + releaseWorker (this); + } + + void runAsync (String method, Vector params, AsyncCallback callback) { Object res = null; + long start = System.currentTimeMillis (); try { - res = executeCall (); + res = execute (method, params); // notify callback object if (callback != null) callback.handleResult (res, url, method); } catch (Exception x) { if (callback != null) callback.handleError (x, url, method); - } finally { - reset (); } - System.err.println ("GOT: "+result); + calls++; + long end = System.currentTimeMillis (); + roundtrip = (int) ((roundtrip*4)+(end-start))/5; } - - private Object executeCall () throws XmlRpcException, IOException { + Object execute (String method, Vector params) throws XmlRpcException, IOException { fault = false; long now = System.currentTimeMillis (); try { @@ -246,23 +299,30 @@ public class XmlRpcClient implements XmlRpcHandler { super.startElement (name, atts); } - /** - * Release possibly big per-call object references to allow them to be garbage collected - */ - public void reset () { - result = null; - params = null; - callback = null; - if (workers < 20 && !fault) - pool.push (this); - else - workers -= 1; - - } } // end of inner class Worker - + + class CallData { + + String method; + Vector params; + AsyncCallback callback; + CallData next; + + /** + * Make a call to be queued and then executed by the next free async thread + */ + public CallData (String method, Vector params, AsyncCallback callback) { + this.method = method; + this.params = params; + this.callback = callback; + this.next = null; + } + + } + + /** * Just for testing. */ @@ -279,7 +339,7 @@ public class XmlRpcClient implements XmlRpcHandler { } XmlRpcClient client = new XmlRpcClientLite (url); try { - /* System.err.println (*/client.executeAsync (method, v, null); + System.err.println (client.execute (method, v)); } catch (Exception ex) { System.err.println ("Error: "+ex.getMessage()); }