PostMan.java

/*
* The contents of this file are subject to the BT "ZEUS" Open Source
* Licence (L77741), Version 1.0 (the "Licence"); you may not use this file
* except in compliance with the Licence. You may obtain a copy of the Licence
* from $ZEUS_INSTALL/licence.html or alternatively from
* http://www.labs.bt.com/projects/agents/zeus/licence.htm
*
* Except as stated in Clause 7 of the Licence, software distributed under the
* Licence is distributed WITHOUT WARRANTY OF ANY KIND, either express or
* implied. See the Licence for the specific language governing rights and
* limitations under the Licence.
*
* The Original Code is within the package zeus.*.
* The Initial Developer of the Original Code is British Telecommunications
* public limited company, whose registered office is at 81 Newgate Street,
* London, EC1A 7AJ, England. Portions created by British Telecommunications
* public limited company are Copyright 1996-9. All Rights Reserved.
*
* THIS NOTICE MUST BE INCLUDED ON ANY COPY OF THIS FILE
*/



/*
 * @(#)PostMan.java 1.00
 */

package zeus.actors;

import java.net.*;
import java.io.*;
import java.util.*;
import zeus.util.*;
import zeus.concepts.*;

//added for secure socket support 5/10/2001
import javax.net.ssl.*;
    
/**
 * This component is part of the {@link MailBox}, and is responsible for
 * dispatching messages on demand to their recipients. This component
 * operates within its own thread enabling the MailBox to send and receive
 * multiple messages simultaneously. 

* * It is unlikely that developers will need to call these methods directly. * Although if the user wants to replace the default TCP/IP messaging mechanism * this could be cleanly achieved by reimplementing the methods of this class. */ public class PostMan extends Thread { protected final int MAX_RETRY = 10; /** Data structure holding messages pending dispatch */ protected Queue outMail = null; /** Data structure holding CC'ed messages pending dispatch to Visualisers */ protected Queue ccMail = null; /** Reference to MailBox of which this is a sub-component */ protected MailBox mbox = null; protected Address myAddress = null; protected boolean dispatching; protected Hashtable waitQueue = new Hashtable(); public PostMan() { ; } public PostMan(MailBox mbox, Queue outMail, Address myAddress) { this(mbox,outMail,null,myAddress); } public PostMan(MailBox mbox, Queue outMail, Queue ccMail, Address myAddress) { this.mbox = mbox; this.outMail = outMail; this.ccMail = ccMail; this.myAddress = myAddress; if ( ccMail == null ) this.setPriority(Thread.NORM_PRIORITY-2); this.start(); } public void stopDispatching() { dispatching = false; } public void lowerStatus() { this.setPriority(Thread.NORM_PRIORITY-2); } public void run() { Performative msg, query; String receiver; Address addr; dispatching = true; // 1.04? 31/00/08 sgt // lowerStatus(); // sgt while( dispatching ) { msg = (Performative) outMail.dequeue(); Enumeration allRec = msg.getReceivers(); if (allRec == null) { mbox.postErrorMsg(msg,"No reveivers specified"); } else { // 1.04 bug fix // I don't like this code, it is too complex, however... // I put some comments in and reformatted to try and make it // a bit clearer. // the fix is that while there are more elements in the receivers // part of the performative you must attempt to send a message to them // all while (allRec.hasMoreElements()) { //1 receiver = (String) allRec.nextElement(); if (( addr = mbox.lookup(receiver)) != null) { //2 if ( postMsg(msg,addr) ) { if ( ccMail != null ) mbox.informVisualisers(msg); } else { // The receiver cannot be contacted at the given // address - we assume the address is wrong, and // delete it from the address book. mbox.postErrorMsg(msg,"Cannot contact reciever"); // mbox.del(addr); } } else {//3 String key = mbox.addressSought(receiver); if ( key == null ) mbox.postErrorMsg(msg,"Cannot find address of receiver"); else {//4 Vector list = (Vector)waitQueue.get(key); if ( list == null ) { list = new Vector(); waitQueue.put(key,list); } list.addElement(msg); }// end else 4 }// end else 3 }//end if 2 try { sleep (10); } catch (Exception e) { e.printStackTrace(); } }// end while 1 // end fix // this is the code I removed. /* if ( (receiver = msg.getReceiver()) == null ) mbox.postErrorMsg(msg,"No receiver specified"); else { if ( (addr = mbox.lookup(receiver)) != null ) { // receiver's address found if ( postMsg(msg,addr) ) { if ( ccMail != null ) mbox.informVisualisers(msg); } else { // The receiver cannot be contacted at the given // address - we assume the address is wrong, and // delete it from the address book. mbox.postErrorMsg(msg,"Cannot contact reciever"); // mbox.del(addr); } }*/ // try sleep here yield(); } } public void addressReceived(String key) { synchronized( waitQueue ) { Vector list = (Vector)waitQueue.remove(key); for(int i = 0; list != null && i < list.size(); i++ ) outMail.enqueue(list.elementAt(i)); list = null; // GC } } public boolean postMsg( Performative msg, Address addr ) { PrintWriter out = null; boolean isOk = false; int nTry = 0; msg.setSender(myAddress.getName()); msg.setAddress(myAddress); while( !isOk && nTry++ < MAX_RETRY ) { try { //start of ssl modification SSLSocketFactory sslFact = (SSLSocketFactory)SSLSocketFactory.getDefault(); SSLSocket socket = (SSLSocket)sslFact.createSocket( addr.getHost(), addr.getPort() ); //end of ssl modification //Socket socket = new Socket( addr.getHost(), addr.getPort() ); //old code out = new PrintWriter( socket.getOutputStream(), true ); Time time; if ( (time = mbox.getAgentContext().currentTime()) != null ) msg.setSendTime(time); out.println( msg ); // Send msg out.flush(); // flush out-stream isOk = true; if ( (time = mbox.getAgentContext().currentTime()) != null ) msg.setReceiveTime(time); // tell mbox to notifyMonitors of delivery mbox.notifyMonitors(msg,MailBox.DISPATCH); } catch (IOException e) { Core.DEBUG(3,"IOException: " + e); // try { yield(); // sleep (1000); // } // catch(InterruptedException e1) { // } } finally { if (out != null) out.close(); } } return isOk; } }


Last modified: Sun Oct 31 21:03:42 PST 2004