Introduce NodeChangeListener interface that can be used to get notifications

about node changes (inserts, updates, deletes) from the NodeManager.
Change Transactor to implement NodeChangeListener.
This commit is contained in:
hns 2004-10-22 12:57:15 +00:00
parent a95dcd16a4
commit 9d53d33ef9
4 changed files with 152 additions and 100 deletions

View file

@ -0,0 +1,29 @@
/*
* Helma License Notice
*
* The contents of this file are subject to the Helma License
* Version 2.0 (the "License"). You may not use this file except in
* compliance with the License. A copy of the License is available at
* http://adele.helma.org/download/helma/license.txt
*
* Copyright 1998-2003 Helma Software. All Rights Reserved.
*
* $RCSfile$
* $Author$
* $Revision$
* $Date$
*/
package helma.objectmodel.db;
import java.util.List;
public interface NodeChangeListener {
/**
* Called when a transaction is committed that has created, modified or
* deleted one or more nodes.
*/
public void nodesChanged(List inserted, List updated, List deleted);
}

View file

@ -36,12 +36,12 @@ public final class NodeManager {
protected Application app; protected Application app;
private ObjectCache cache; private ObjectCache cache;
private Replicator replicator;
protected IDatabase db; protected IDatabase db;
protected IDGenerator idgen; protected IDGenerator idgen;
private long idBaseValue = 1L; private long idBaseValue = 1L;
private boolean logSql; private boolean logSql;
protected boolean logReplication; protected boolean logReplication;
private ArrayList listeners = new ArrayList();
// a wrapper that catches some Exceptions while accessing this NM // a wrapper that catches some Exceptions while accessing this NM
public final WrappedNodeManager safe; public final WrappedNodeManager safe;
@ -72,10 +72,9 @@ public final class NodeManager {
app.logEvent("Setting up replication listener at " + replicationUrl); app.logEvent("Setting up replication listener at " + replicationUrl);
} }
replicator = new Replicator(this); Replicator replicator = new Replicator(this);
replicator.addUrl(replicationUrl); replicator.addUrl(replicationUrl);
} else { addNodeChangeListener(replicator);
replicator = null;
} }
// get the initial id generator value // get the initial id generator value
@ -1802,13 +1801,40 @@ public final class NodeManager {
} }
/** /**
* Get a replicator for this node cache. A replicator is used to transfer updates * Add a listener that is notified each time a transaction commits
* in this node manager to other node managers in remote servers via RMI. * that adds, modifies or deletes any Nodes.
*/ */
protected Replicator getReplicator() { public void addNodeChangeListener(NodeChangeListener listener) {
return replicator; listeners.add(listener);
} }
/**
* Remove a previously added NodeChangeListener.
*/
public void removeNodeChangeListener(NodeChangeListener listener) {
listeners.remove(listener);
}
/**
* Let transactors know if they should collect and fire NodeChangeListener
* events
*/
protected boolean hasNodeChangeListeners() {
return listeners.size() > 0;
}
/**
* Called by transactors after committing.
*/
protected void fireNodeChangeEvent(List inserted, List updated, List deleted) {
int l = listeners.size();
for (int i=0; i<l; i++) {
((NodeChangeListener) listeners.get(i)).nodesChanged(inserted, updated, deleted);
}
}
/** /**
* Receive notification from a remote app that objects in its cache have been * Receive notification from a remote app that objects in its cache have been
* modified. * modified.

View file

@ -18,11 +18,12 @@ package helma.objectmodel.db;
import java.rmi.Naming; import java.rmi.Naming;
import java.util.Vector; import java.util.Vector;
import java.util.List;
/** /**
* This class replicates the updates of transactions to other applications via RMI * This class replicates the updates of transactions to other applications via RMI
*/ */
public class Replicator implements Runnable { public class Replicator implements Runnable, NodeChangeListener {
Vector urls; Vector urls;
Vector add; Vector add;
Vector delete; Vector delete;
@ -99,32 +100,17 @@ public class Replicator implements Runnable {
} }
} }
/**
*
*
* @param n ...
*/
public synchronized void addNewNode(Node n) {
add.addElement(n);
}
/** /**
* * Called when a transaction is committed that has created, modified or
* * deleted one or more nodes.
* @param n ...
*/ */
public synchronized void addModifiedNode(Node n) { public synchronized void nodesChanged(List inserted, List updated, List deleted) {
add.addElement(n); add.addAll(inserted);
add.addAll(updated);
delete.addAll(deleted);
} }
/**
*
*
* @param n ...
*/
public synchronized void addDeletedNode(Node n) {
delete.addElement(n);
}
private synchronized boolean prepareReplication() { private synchronized boolean prepareReplication() {
if ((add.size() == 0) && (delete.size() == 0)) { if ((add.size() == 0) && (delete.size() == 0)) {

View file

@ -20,9 +20,7 @@ import helma.objectmodel.DatabaseException;
import helma.objectmodel.ITransaction; import helma.objectmodel.ITransaction;
import java.sql.Connection; import java.sql.Connection;
import java.util.HashMap; import java.util.*;
import java.util.HashSet;
import java.util.Iterator;
/** /**
* A subclass of thread that keeps track of changed nodes and triggers * A subclass of thread that keeps track of changed nodes and triggers
@ -227,102 +225,115 @@ public class Transactor extends Thread {
int updated = 0; int updated = 0;
int deleted = 0; int deleted = 0;
Object[] dirty = dirtyNodes.values().toArray(); if (!dirtyNodes.isEmpty()) {
Object[] dirty = dirtyNodes.values().toArray();
// the replicator to send update notifications to, if defined ArrayList insertedNodes = new ArrayList();
Replicator replicator = nmgr.getReplicator(); ArrayList updatedNodes = new ArrayList();
// the set to collect DbMappings to be marked as changed ArrayList deletedNodes = new ArrayList();
HashSet dirtyDbMappings = new HashSet(); // if nodemanager has listeners collect dirty nodes
boolean hasListeners = nmgr.hasNodeChangeListeners();
for (int i = 0; i < dirty.length; i++) { // the set to collect DbMappings to be marked as changed
Node node = (Node) dirty[i]; HashSet dirtyDbMappings = new HashSet();
// update nodes in db for (int i = 0; i < dirty.length; i++) {
int nstate = node.getState(); Node node = (Node) dirty[i];
if (nstate == Node.NEW) { // update nodes in db
nmgr.insertNode(nmgr.db, txn, node); int nstate = node.getState();
dirtyDbMappings.add(node.getDbMapping());
node.setState(Node.CLEAN);
// register node with nodemanager cache if (nstate == Node.NEW) {
nmgr.registerNode(node); nmgr.insertNode(nmgr.db, txn, node);
if (replicator != null) {
replicator.addNewNode(node);
}
inserted++;
nmgr.app.logEvent("inserted: Node " + node.getPrototype() + "/" +
node.getID());
} else if (nstate == Node.MODIFIED) {
// only mark DbMapping as dirty if updateNode returns true
if (nmgr.updateNode(nmgr.db, txn, node)) {
dirtyDbMappings.add(node.getDbMapping()); dirtyDbMappings.add(node.getDbMapping());
} node.setState(Node.CLEAN);
node.setState(Node.CLEAN);
// update node with nodemanager cache // register node with nodemanager cache
nmgr.registerNode(node); nmgr.registerNode(node);
if (replicator != null) { if (hasListeners) {
replicator.addModifiedNode(node); insertedNodes.add(node);
} }
updated++; inserted++;
nmgr.app.logEvent("updated: Node " + node.getPrototype() + "/" + nmgr.app.logEvent("inserted: Node " + node.getPrototype() + "/" +
node.getID()); node.getID());
} else if (nstate == Node.DELETED) { } else if (nstate == Node.MODIFIED) {
nmgr.deleteNode(nmgr.db, txn, node); // only mark DbMapping as dirty if updateNode returns true
dirtyDbMappings.add(node.getDbMapping()); if (nmgr.updateNode(nmgr.db, txn, node)) {
dirtyDbMappings.add(node.getDbMapping());
}
node.setState(Node.CLEAN);
// remove node from nodemanager cache // update node with nodemanager cache
nmgr.evictNode(node); nmgr.registerNode(node);
if (replicator != null) { if (hasListeners) {
replicator.addDeletedNode(node); updatedNodes.add(node);
}
updated++;
nmgr.app.logEvent("updated: Node " + node.getPrototype() + "/" +
node.getID());
} else if (nstate == Node.DELETED) {
nmgr.deleteNode(nmgr.db, txn, node);
dirtyDbMappings.add(node.getDbMapping());
// remove node from nodemanager cache
nmgr.evictNode(node);
if (hasListeners) {
deletedNodes.add(node);
}
deleted++;
} }
deleted++; node.clearWriteLock();
} }
node.clearWriteLock(); // set last data change times in db-mappings
long now = System.currentTimeMillis();
for (Iterator i = dirtyDbMappings.iterator(); i.hasNext(); ) {
DbMapping dbm = (DbMapping) i.next();
if (dbm != null) {
dbm.setLastDataChange(now);
}
}
// save the id-generator for the embedded db, if necessary
if (nmgr.idgen.dirty) {
nmgr.db.saveIDGenerator(txn, nmgr.idgen);
nmgr.idgen.dirty = false;
}
if (hasListeners) {
nmgr.fireNodeChangeEvent(insertedNodes, updatedNodes, deletedNodes);
}
} }
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
// set last data change times in db-mappings if (!parentNodes.isEmpty()) {
for (Iterator i = dirtyDbMappings.iterator(); i.hasNext(); ) { // set last subnode change times in parent nodes
DbMapping dbm = (DbMapping) i.next(); for (Iterator i = parentNodes.iterator(); i.hasNext(); ) {
if (dbm != null) { Node node = (Node) i.next();
dbm.setLastDataChange(now); node.setLastSubnodeChange(now);
} }
} }
// set last subnode change times in parent nodes
for (Iterator i = parentNodes.iterator(); i.hasNext(); ) {
Node node = (Node) i.next();
node.setLastSubnodeChange(now);
}
// clear the node collections // clear the node collections
dirtyNodes.clear(); dirtyNodes.clear();
cleanNodes.clear(); cleanNodes.clear();
parentNodes.clear(); parentNodes.clear();
// save the id-generator for the embedded db, if necessary
if (nmgr.idgen.dirty) {
nmgr.db.saveIDGenerator(txn, nmgr.idgen);
nmgr.idgen.dirty = false;
}
if (active) { if (active) {
active = false; active = false;
nmgr.db.commitTransaction(txn); nmgr.db.commitTransaction(txn);
txn = null; txn = null;
} }
nmgr.app.logAccess(tname + " " + dirty.length + " marked, " + inserted + nmgr.app.logAccess(tname + " " + inserted +
" inserted, " + updated + " inserted, " + updated +
" updated, " + deleted + " deleted in " + " updated, " + deleted + " deleted in " +
(now - tstart) + " millis"); (now - tstart) + " millis");