// Server.java

/*
* The contents of this file are subject to the BT "ZEUS" Open Source
* Licence (L77741), Version 1.0 (the "Licence"); you may not use this file
* except in compliance with the Licence. You may obtain a copy of the Licence
* from $ZEUS_INSTALL/licence.html or alternatively from
* http://www.labs.bt.com/projects/agents/zeus/licence.htm
*
* Except as stated in Clause 7 of the Licence, software distributed under the
* Licence is distributed WITHOUT WARRANTY OF ANY KIND, either express or
* implied. See the Licence for the specific language governing rights and
* limitations under the Licence.
*
* The Original Code is within the package zeus.*.
* The Initial Developer of the Original Code is British Telecommunications
* public limited company, whose registered office is at 81 Newgate Street,
* London, EC1A 7AJ, England. Portions created by British Telecommunications
* public limited company are Copyright 1996-9. All Rights Reserved.
*
* THIS NOTICE MUST BE INCLUDED ON ANY COPY OF THIS FILE
*/



/*
 * @(#)Server.java 1.03b
 */

package zeus.actors;

import java.net.*;
import java.io.*;
import java.util.*;
import zeus.util.*;
import zeus.concepts.*;


//added for secure socket support 5/10/2001
import java.security.KeyStore;
import javax.net.*;
import javax.net.ssl.*;
import javax.security.cert.X509Certificate;
import com.sun.net.ssl.*;

/**
 * This component is part of the {@link MailBox}, and is responsible for
 * reading incoming messages. This component operates within its own thread
 * enabling the MailBox to send and receive multiple messages simultaneously. 

* * It is unlikely that developers will need to call these methods directly. * Although if the user wants to replace the default TCP/IP messaging mechanism * this could be cleanly achieved by reimplementing the methods of this class. */ public class Server extends Thread { public static final int MAX_CONNECTIONS = 20; public static final int MAX_QUEUE_LENGTH = 50; // 50 public static final int DEFAULT_PORT_MIN = 6700;//6700 public static final int DEFAULT_PORT_MAX = 7800;//6800 protected AgentContext context = null; protected SSLServerSocket listenSocket; protected Address address; protected int connection_count = 0; protected boolean processing; /** timeout was originally private, but subclasses need to use it to construct there own treads of control, so I changed it to protected (ST -14/8/00) */ protected long timeout = -1; /** Data structure holding messages as they are read in */ protected Queue inMail; /** Reference to MailBox of which this is a sub-component */ protected MailBox mbox; public Server(AgentContext context, MailBox mbox, Queue inMail) { Assert.notNull(context); this.context = context; // Get localhost details try { InetAddress ip = InetAddress.getLocalHost(); String localhost = ip.getHostAddress(); Core.DEBUG(4,"Ip Address is: " + ip); // Select port for listening boolean port_found = false; for(int port = DEFAULT_PORT_MIN; !port_found && port < DEFAULT_PORT_MAX; port++ ) { //modified 5/10/2001 for ssl try { //start of ssl modification ServerSocketFactory ssf; //set up key manager to do server authentication SSLContext ctx; KeyManagerFactory kmf; KeyStore ks; char[] passphrase = "passphrase".toCharArray(); ctx = SSLContext.getInstance("TLS"); kmf = KeyManagerFactory.getInstance("SunX509"); ks = KeyStore.getInstance("JKS"); ks.load(new FileInputStream("testkeys"), passphrase); kmf.init(ks, passphrase); ctx.init(kmf.getKeyManagers(), null, null); ssf = ctx.getServerSocketFactory(); listenSocket = (SSLServerSocket) 
ssf.createServerSocket(port,MAX_QUEUE_LENGTH); System.out.println("SSL Server has been started"); //end of ssl modification //listenSocket = new SSLServerSocket(port,MAX_QUEUE_LENGTH); port_found = true; address = new Address(context.whoami(),localhost, port,context.whatami()); context.AddressBook().add(address); } catch (Exception e) { //changes to Exception instead of IOException // e.printStackTrace(); } } if ( !port_found ) { System.err.println("Cannot get a port for listening"); System.exit(0); } } catch (UnknownHostException e) { System.err.println("Cannot get local host IP address"); System.exit(0); } catch (IOException e) { System.err.println("Cannot get a port for listening"); e.printStackTrace(); System.exit(0); } // Store variables this.inMail = inMail; this.mbox = mbox; // LL 030500 1.03b lowerStatus(); // Start the server listening for connections this.start(); } public AgentContext getAgentContext() { return context; } public synchronized void updateCount(int x) { connection_count += x; if ( x < 0 ) notify(); } public void stopProcessing() { processing = false; } public void lowerStatus() { this.setPriority(Thread.NORM_PRIORITY-2); timeout = 1000; } // LL 030500 1.03bB public void normalStatus() { this.setPriority(Thread.NORM_PRIORITY); timeout = 1000;//1000? } // LL 030500 1.03bE // The body of the server thread. Loop forever, listening for and // accepting connections from clients. For each connection, // create a Connection object to handle communication through the // new Socket. 
public void run() { processing = true; while(processing) { // System.err.print("&"); try { synchronized(this) { while ( connection_count >= MAX_CONNECTIONS ) { try { wait(); } catch(InterruptedException e) {} } } //start of ssl modification SSLSocket client = (SSLSocket)listenSocket.accept(); //end of ssl modification updateCount(1); new Connection(client, this); } catch (Exception e) { Core.DEBUG(3,"Exception listening for connections: " + e); } /* if ( timeout != -1 ) { try { sleep(300); } catch(InterruptedException e) {} } else */ yield(); } } protected void finalize() { try { if ( listenSocket != null ) listenSocket.close(); //ssl mod } catch(IOException e) { } } public Address getAddress() { return address; } public void newMsg( Performative msg ) { Address addr; Time time; if ( (time = context.currentTime()) != null ) msg.setReceiveTime(time); if ( (addr = msg.getAddress()) != null ) context.AddressBook().add( addr); inMail.enqueue(msg); mbox.notifyMonitors(msg,MailBox.RECEIVE); } }


// Last modified: Sun Oct 31 21:05:11 PST 2004