Now implements an asynchronous client

This commit is contained in:
hns 2001-01-24 15:05:54 +00:00
parent 3cfea3a250
commit 7376855af9

View file

@ -19,6 +19,15 @@ public class XmlRpcClient implements XmlRpcHandler {
URL url;
String auth;
int maxThreads = 50;
Stack pool = new Stack ();
int workers = 0;
long calls = 0;
int roundtrip = 1000;
CallData first, last;
/**
* Construct a XML-RPC client with this URL.
@ -71,11 +80,15 @@ public class XmlRpcClient implements XmlRpcHandler {
*/
public Object execute (String method, Vector params) throws XmlRpcException, IOException {
Worker worker = getWorker ();
long start = System.currentTimeMillis ();
try {
Object retval = worker.execute (method, params);
long end = System.currentTimeMillis ();
calls++;
roundtrip = (int) ((roundtrip*4)+(end-start))/5;
return retval;
} finally {
worker.reset ();
releaseWorker (worker);
}
}
@ -86,24 +99,28 @@ public class XmlRpcClient implements XmlRpcHandler {
*
*/
public void executeAsync (String method, Vector params, AsyncCallback callback) {
if (workers > 2) {
enqueue (method, params, callback);
return;
}
Worker worker = null;
try {
worker = getWorker ();
worker.executeAsync (method, params, callback);
} catch (IOException iox) {
if (callback != null)
callback.handleError (iox, url, method);
// make a queued worker that doesn't run immediately
enqueue (method, params, callback);
}
}
Stack pool = new Stack ();
int workers = 0;
private final Worker getWorker () throws IOException {
synchronized Worker getWorker () throws IOException {
try {
return (Worker) pool.pop ();
Worker w = (Worker) pool.pop ();
workers += 1;
return w;
} catch (EmptyStackException x) {
if (workers < 100) {
if (workers < maxThreads) {
workers += 1;
return new Worker ();
}
@ -111,52 +128,88 @@ public class XmlRpcClient implements XmlRpcHandler {
}
}
/**
* Release possibly big per-call object references to allow them to be garbage collected
*/
synchronized void releaseWorker (Worker w) {
w.result = null;
w.call = null;
if (pool.size() < 20 && !w.fault)
pool.push (w);
workers -= 1;
}
synchronized void enqueue (String method, Vector params, AsyncCallback callback) {
CallData call = new CallData (method, params, callback);
if (last == null)
first = last = call;
else {
last.next = call;
last = call;
}
}
synchronized CallData dequeue () {
if (first == null)
return null;
// if (workers > 2 && workers*4 > roundtrip)
// return null;
CallData call = first;
if (first == last)
first = last = null;
else
first = first.next;
return call;
}
class Worker extends XmlRpc implements Runnable {
String method;
Vector params;
boolean fault;
Object result = null;
StringBuffer strbuf;
AsyncCallback callback = null;
public Worker () throws IOException {
CallData call;
public Worker () {
super ();
}
public Object execute (String method, Vector params) throws XmlRpcException, IOException {
this.method = method;
this.params = params;
return executeCall ();
}
public void executeAsync (String method, Vector params, AsyncCallback callback) {
this.method = method;
this.params = params;
this.callback = callback;
this.call = new CallData (method, params, callback);
Thread t = new Thread (this);
t.start ();
}
public void run () {
while (call != null) {
runAsync (call.method, call.params, call.callback);
// System.err.println (workers+" ---- "+ roundtrip);
call = dequeue ();
// if (call != null) System.err.print (".");
}
releaseWorker (this);
}
void runAsync (String method, Vector params, AsyncCallback callback) {
Object res = null;
long start = System.currentTimeMillis ();
try {
res = executeCall ();
res = execute (method, params);
// notify callback object
if (callback != null)
callback.handleResult (res, url, method);
} catch (Exception x) {
if (callback != null)
callback.handleError (x, url, method);
} finally {
reset ();
}
System.err.println ("GOT: "+result);
calls++;
long end = System.currentTimeMillis ();
roundtrip = (int) ((roundtrip*4)+(end-start))/5;
}
private Object executeCall () throws XmlRpcException, IOException {
Object execute (String method, Vector params) throws XmlRpcException, IOException {
fault = false;
long now = System.currentTimeMillis ();
try {
@ -246,23 +299,30 @@ public class XmlRpcClient implements XmlRpcHandler {
super.startElement (name, atts);
}
/**
* Release possibly big per-call object references to allow them to be garbage collected
*/
public void reset () {
result = null;
params = null;
callback = null;
if (workers < 20 && !fault)
pool.push (this);
else
workers -= 1;
}
} // end of inner class Worker
class CallData {
String method;
Vector params;
AsyncCallback callback;
CallData next;
/**
* Make a call to be queued and then executed by the next free async thread
*/
public CallData (String method, Vector params, AsyncCallback callback) {
this.method = method;
this.params = params;
this.callback = callback;
this.next = null;
}
}
/**
* Just for testing.
*/
@ -279,7 +339,7 @@ public class XmlRpcClient implements XmlRpcHandler {
}
XmlRpcClient client = new XmlRpcClientLite (url);
try {
/* System.err.println (*/client.executeAsync (method, v, null);
System.err.println (client.execute (method, v));
} catch (Exception ex) {
System.err.println ("Error: "+ex.getMessage());
}