/* * The contents of this file are subject to the BT "ZEUS" Open Source * Licence (L77741), Version 1.0 (the "Licence"); you may not use this file * except in compliance with the Licence. You may obtain a copy of the Licence * from $ZEUS_INSTALL/licence.html or alternatively from * http://www.labs.bt.com/projects/agents/zeus/licence.htm * * Except as stated in Clause 7 of the Licence, software distributed under the * Licence is distributed WITHOUT WARRANTY OF ANY KIND, either express or * implied. See the Licence for the specific language governing rights and * limitations under the Licence. * * The Original Code is within the package zeus.*. * The Initial Developer of the Original Code is British Telecommunications * public limited company, whose registered office is at 81 Newgate Street, * London, EC1A 7AJ, England. Portions created by British Telecommunications * public limited company are Copyright 1996-9. All Rights Reserved. * * THIS NOTICE MUST BE INCLUDED ON ANY COPY OF THIS FILE */ /* * @(#)PostMan.java 1.00 */ package zeus.actors; import java.net.*; import java.io.*; import java.util.*; import zeus.util.*; import zeus.concepts.*; //added for secure socket support 5/10/2001 import javax.net.ssl.*; /** * This component is part of the {@link MailBox}, and is responsible for * dispatching messages on demand to their recipients. This component * operates within its own thread enabling the MailBox to send and receive * multiple messages simultaneously.* * It is unlikely that developers will need to call these methods directly. * Although if the user wants to replace the default TCP/IP messaging mechanism * this could be cleanly achieved by reimplementing the methods of this class. */ public class PostMan extends Thread { protected final int MAX_RETRY = 10; /** Data structure holding messages pending dispatch */ protected Queue outMail = null; /** Data structure holding CC'ed messages pending dispatch to Visualisers */ protected Queue ccMail = null; /** Reference to MailBox of which this is a sub-component */ protected MailBox mbox = null; protected Address myAddress = null; protected boolean dispatching; protected Hashtable waitQueue = new Hashtable(); public PostMan() { ; } public PostMan(MailBox mbox, Queue outMail, Address myAddress) { this(mbox,outMail,null,myAddress); } public PostMan(MailBox mbox, Queue outMail, Queue ccMail, Address myAddress) { this.mbox = mbox; this.outMail = outMail; this.ccMail = ccMail; this.myAddress = myAddress; if ( ccMail == null ) this.setPriority(Thread.NORM_PRIORITY-2); this.start(); } public void stopDispatching() { dispatching = false; } public void lowerStatus() { this.setPriority(Thread.NORM_PRIORITY-2); } public void run() { Performative msg, query; String receiver; Address addr; dispatching = true; // 1.04? 31/00/08 sgt // lowerStatus(); // sgt while( dispatching ) { msg = (Performative) outMail.dequeue(); Enumeration allRec = msg.getReceivers(); if (allRec == null) { mbox.postErrorMsg(msg,"No reveivers specified"); } else { // 1.04 bug fix // I don't like this code, it is too complex, however... // I put some comments in and reformatted to try and make it // a bit clearer. // the fix is that while there are more elements in the receivers // part of the performative you must attempt to send a message to them // all while (allRec.hasMoreElements()) { //1 receiver = (String) allRec.nextElement(); if (( addr = mbox.lookup(receiver)) != null) { //2 if ( postMsg(msg,addr) ) { if ( ccMail != null ) mbox.informVisualisers(msg); } else { // The receiver cannot be contacted at the given // address - we assume the address is wrong, and // delete it from the address book. mbox.postErrorMsg(msg,"Cannot contact reciever"); // mbox.del(addr); } } else {//3 String key = mbox.addressSought(receiver); if ( key == null ) mbox.postErrorMsg(msg,"Cannot find address of receiver"); else {//4 Vector list = (Vector)waitQueue.get(key); if ( list == null ) { list = new Vector(); waitQueue.put(key,list); } list.addElement(msg); }// end else 4 }// end else 3 }//end if 2 try { sleep (10); } catch (Exception e) { e.printStackTrace(); } }// end while 1 // end fix // this is the code I removed. /* if ( (receiver = msg.getReceiver()) == null ) mbox.postErrorMsg(msg,"No receiver specified"); else { if ( (addr = mbox.lookup(receiver)) != null ) { // receiver's address found if ( postMsg(msg,addr) ) { if ( ccMail != null ) mbox.informVisualisers(msg); } else { // The receiver cannot be contacted at the given // address - we assume the address is wrong, and // delete it from the address book. mbox.postErrorMsg(msg,"Cannot contact reciever"); // mbox.del(addr); } }*/ // try sleep here yield(); } } public void addressReceived(String key) { synchronized( waitQueue ) { Vector list = (Vector)waitQueue.remove(key); for(int i = 0; list != null && i < list.size(); i++ ) outMail.enqueue(list.elementAt(i)); list = null; // GC } } public boolean postMsg( Performative msg, Address addr ) { PrintWriter out = null; boolean isOk = false; int nTry = 0; msg.setSender(myAddress.getName()); msg.setAddress(myAddress); while( !isOk && nTry++ < MAX_RETRY ) { try { //start of ssl modification SSLSocketFactory sslFact = (SSLSocketFactory)SSLSocketFactory.getDefault(); SSLSocket socket = (SSLSocket)sslFact.createSocket( addr.getHost(), addr.getPort() ); //end of ssl modification //Socket socket = new Socket( addr.getHost(), addr.getPort() ); //old code out = new PrintWriter( socket.getOutputStream(), true ); Time time; if ( (time = mbox.getAgentContext().currentTime()) != null ) msg.setSendTime(time); out.println( msg ); // Send msg out.flush(); // flush out-stream isOk = true; if ( (time = mbox.getAgentContext().currentTime()) != null ) msg.setReceiveTime(time); // tell mbox to notifyMonitors of delivery mbox.notifyMonitors(msg,MailBox.DISPATCH); } catch (IOException e) { Core.DEBUG(3,"IOException: " + e); // try { yield(); // sleep (1000); // } // catch(InterruptedException e1) { // } } finally { if (out != null) out.close(); } } return isOk; } }