package org.wikiwebserver.distribute.se.worker;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

import org.wikiwebserver.distribute.interfaces.Persistable;
import org.wikiwebserver.distribute.interfaces.Processor;
import org.wikiwebserver.distribute.interfaces.Generator;
import org.wikiwebserver.distribute.interfaces.Task;
import org.wikiwebserver.distribute.interfaces.Transferer;
import org.wikiwebserver.distribute.se.ConfigManager;
import org.wikiwebserver.distribute.se.worker.task.ExceptionPostTask;

/**
 * Static helper that performs the worker's HTTP exchanges with the server:
 * requesting a task from the task-assignment URL ({@link #getTask()}) and
 * sending/receiving task data via the task-communication URL
 * ({@link #performTransfer(Task)}).
 *
 * <p>The scheduling state (last/next task check time and last error) is held
 * in static volatile fields and exposed through the getters at the bottom of
 * the class.
 */
public class ServerCommunicator {

    /** Connect timeout, in milliseconds, for task-assignment requests. */
    public static final int TASK_ASSIGNMENT_CONNECT_TIMEOUT = 5000;
    /** Read timeout, in milliseconds, for task-assignment requests. */
    public static final int TASK_ASSIGNMENT_READ_TIMEOUT = 10000;
    /** Connect timeout, in milliseconds, for task data transfers. */
    public static final int TASK_COMMUNICATION_CONNECT_TIMEOUT = 5000;
    /** Read timeout, in milliseconds, for task data transfers. */
    public static final int TASK_COMMUNICATION_READ_TIMEOUT = 60000;
    /** Size, in bytes, of the buffer used when copying a generated stream to the server. */
    public static final int STREAM_BUFFER_SIZE = 32 * 1024;

    /**
     * Configuration identity sent with the last successful task request.
     * The configuration headers are only re-sent when the current identity differs.
     */
    public static String lastConfigId;
    private static volatile long lastTaskCheckTime;
    private static volatile long nextTaskCheckTime;
    private static volatile String error;

    /**
     * Asks the server for a task.
     *
     * <p>Sends the node's identity, name and (when changed) configuration
     * details as request headers, then reads the assignment from the response
     * headers. A node ID supplied by the server is stored through
     * {@code ConfigManager}. If this node has a password and the task's
     * password does not match, an {@link ExceptionPostTask} carrying a
     * {@link SecurityException} is returned in place of the task.
     *
     * @return the assigned task, or {@code null} when the server assigned none
     *         or answered with a non-2xx status (see {@link #getError()})
     * @throws IllegalArgumentException if no node name is configured
     * @throws ClassNotFoundException if the task class named by the server cannot be loaded
     * @throws InstantiationException if the task class cannot be instantiated
     * @throws IllegalAccessException if the task class or its constructor is not accessible
     * @throws IOException if the HTTP exchange fails
     */
    public static Task getTask()
        throws ClassNotFoundException, InstantiationException,
        IllegalAccessException, IllegalArgumentException, IOException {

        URL taskAssignmentUrl = ConfigManager.getTaskAssignmentUrl();

        String nodeId = ConfigManager.getString("nodeId");
        String nodeIdKey = ConfigManager.getString("nodeIdKey");
        String nodePassword = ConfigManager.getString("nodePassword");
        String nodeName = ConfigManager.getString("nodeName");
        String userId = ConfigManager.getString("userId");
        String browserId = ConfigManager.getString("browserId");
        String configId = String.valueOf(ConfigManager.getConfigModifiedTime());

        if (nodeName == null) {
            throw new IllegalArgumentException("Node name required");
        }

        HttpURLConnection conn = (HttpURLConnection) taskAssignmentUrl.openConnection();
        conn.setDoInput(true);
        conn.setDoOutput(false);
        conn.setUseCaches(false);
        conn.setConnectTimeout(TASK_ASSIGNMENT_CONNECT_TIMEOUT);
        conn.setReadTimeout(TASK_ASSIGNMENT_READ_TIMEOUT);

        if (nodeId == null || nodeId.trim().length() == 0) {
            // No identity yet: ask the server to issue one
            conn.setRequestProperty("X-Node-ID-Request", "true");
        }
        else {
            conn.setRequestProperty("X-Node-ID", nodeId);
            conn.setRequestProperty("X-Node-ID-Key", nodeIdKey);
            if (userId != null && userId.trim().length() > 0) {
                conn.setRequestProperty("User-ID", userId);
            }
            if (browserId != null && browserId.trim().length() > 0) {
                conn.setRequestProperty("Browser-ID", browserId);
            }
        }

        // Update node name (URLEncoder encodes spaces as '+'; send them as %20 instead)
        String urlEncNodeName = URLEncoder.encode(nodeName, "utf8").replace("+", "%20");
        conn.setRequestProperty("X-Node-Name", urlEncNodeName);

        // Update configuration identity, only when it changed since the last successful request
        if (!configId.equals(lastConfigId)) {
            conn.setRequestProperty("X-Node-Config-ID", configId);
            conn.setRequestProperty("X-Node-OS-Name", System.getProperty("os.name"));
            boolean reqPW = (nodePassword != null && nodePassword.length() > 0);
            conn.setRequestProperty("X-Node-Requires-Password", String.valueOf(reqPW));
        }

        long time = System.currentTimeMillis();

        // Reading the first response header is what actually sends the request
        updateNextTaskCheckTime(conn, time);

        // Server may provide or update node ID
        String assignedNodeId = conn.getHeaderField("X-Node-ID");
        if (assignedNodeId != null && assignedNodeId.length() > 0) {
            ConfigManager.setString("nodeId", assignedNodeId);
            ConfigManager.setString("nodeIdKey", conn.getHeaderField("X-Node-ID-Key"));
        }

        int responseCode = conn.getResponseCode();
        if (responseCode / 100 != 2) {
            String message = conn.getResponseMessage();
            // The reason phrase may be absent; never leave error null after a failure
            error = (message != null) ? message : ("HTTP " + responseCode);
            System.err.println(error);
            // Close the error body so the connection is released
            closeQuietly(conn.getErrorStream());
            return null;
        }
        error = null;

        // The assignment travels in the headers; the body is only opened so it can be closed,
        // including when building the task fails.
        InputStream in = conn.getInputStream();
        Task task;
        try {
            task = createTask(conn, time, nodePassword);
        }
        finally {
            closeQuietly(in);
        }

        lastTaskCheckTime = time;
        lastConfigId = configId;

        return task;
    }

    /**
     * Builds the task described by the response headers of a task-assignment request.
     *
     * @param conn connection whose response headers describe the task
     * @param time local time (ms) at which the response was requested
     * @param nodePassword this node's password, or null/empty when none is set
     * @return the task, an {@link ExceptionPostTask} when the task password is wrong,
     *         or {@code null} when no "X-Task" header is present
     */
    private static Task createTask(HttpURLConnection conn, long time, String nodePassword)
        throws ClassNotFoundException, InstantiationException, IllegalAccessException {

        String taskName = conn.getHeaderField("X-Task");
        if (taskName == null || taskName.length() == 0) {
            return null;
        }

        // NOTE(review): the class name is built from a server-supplied header without
        // validation, so the server decides which class on the classpath is instantiated.
        String className = "org.wikiwebserver.distribute.se.worker.task." + taskName + "Task";
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Class<?> taskClass = classLoader.loadClass(className);
        Task task = (Task) taskClass.newInstance();

        task.setTaskId(conn.getHeaderField("X-Task-ID"));

        // Shift the server-side expiry by the difference between the local and server clocks.
        // getDate() returns 0 when the Date header is missing; applying an offset then would
        // add the whole current epoch time to the expiry, so no offset is used in that case.
        long serverDate = conn.getDate();
        long timeOffset = (serverDate > 0) ? (time - serverDate) : 0;
        task.setExpireTime(conn.getHeaderFieldDate("X-Task-Expires", time) + timeOffset);

        task.setTaskInputMeta(conn.getHeaderField("X-Task-Input-Meta"));
        task.setTaskPassword(conn.getHeaderField("X-Task-Password"));

        // If this node has a password, check task password matches
        if (nodePassword != null && nodePassword.length() > 0) {
            if (!nodePassword.equals(task.getTaskPassword())) {
                SecurityException se = new SecurityException("Incorrect task password.");
                // The task is to send back the exception
                Task exceptionResponse = new ExceptionPostTask(se);
                exceptionResponse.setTaskId(task.getTaskId());
                task = exceptionResponse;
            }
        }

        return task;
    }

    /**
     * Exchanges a task's data with the server.
     *
     * <p>If the task is a {@link Generator}, its output is sent as the request
     * body. If the task is a {@link Processor}, the response body is handed to
     * it, decoded according to the response's Content-Type.
     *
     * @param task the task to transfer data for; must implement {@link Transferer}
     * @throws IllegalArgumentException if the task is not a {@link Transferer}, or if
     *         the generated stream is longer than {@code Integer.MAX_VALUE} bytes
     * @throws IOException if the server answers with a non-2xx status or the exchange fails
     * @throws Exception any other exception raised while generating or processing data
     */
    public static void performTransfer(Task task) throws Exception {

        URL taskCommunicationUrl = ConfigManager.getTaskCommunicationUrl();

        if (!(task instanceof Transferer)) {
            throw new IllegalArgumentException("Task does not require data transfer");
        }

        String nodeId = ConfigManager.getString("nodeId");

        HttpURLConnection conn = (HttpURLConnection) taskCommunicationUrl.openConnection();
        conn.setDoOutput(task instanceof Generator);
        conn.setDoInput(true);
        conn.setUseCaches(false);
        conn.setConnectTimeout(TASK_COMMUNICATION_CONNECT_TIMEOUT);
        conn.setReadTimeout(TASK_COMMUNICATION_READ_TIMEOUT);

        conn.setRequestProperty("X-Node-ID", nodeId);
        conn.setRequestProperty("X-Task-ID", task.getTaskId());

        if (task instanceof Generator) {
            sendOutput(conn, task, (Generator) task);
        }

        long time = System.currentTimeMillis();

        // Find when the next task should be requested from the server
        updateNextTaskCheckTime(conn, time);

        if (conn.getResponseCode() / 100 != 2) {
            String message = conn.getResponseMessage();
            // Close the error body so the connection is released
            closeQuietly(conn.getErrorStream());
            throw new IOException("Failed to write output: " + message);
        }

        InputStream in = conn.getInputStream();
        try {
            if (task instanceof Processor) {
                receiveInput(conn, in, task, (Processor) task);
            }
        }
        finally {
            // Closes the response body on every path, including when processing fails
            closeQuietly(in);
        }
    }

    /**
     * Generates the task's output and writes it as the request body, choosing the
     * Content-Type from the output's runtime type: raw bytes/stream, {@link Persistable},
     * or Java-serialized object.
     */
    private static void sendOutput(HttpURLConnection conn, Task task, Generator gen)
        throws Exception {

        Object output;
        try {
            output = gen.generate();
        }
        catch (IOException ex) {
            // Deliberate: the failure itself is sent to the server in place of the output
            output = ex;
        }

        // Read after generate(), as in the original ordering
        if (task.getTaskOutputMeta() != null) {
            conn.setRequestProperty("X-Task-Output-Meta", task.getTaskOutputMeta());
        }

        if (output instanceof byte[] || output instanceof InputStream) {
            try {
                conn.setRequestProperty("Content-Type", "application/octet-stream");

                long length = gen.getStreamLength();
                if (length > Integer.MAX_VALUE) {
                    throw new IllegalArgumentException("Output too large. (2GB max)");
                }
                else if (length > 0) {
                    conn.setFixedLengthStreamingMode((int) length);
                }

                OutputStream out = conn.getOutputStream();
                try {
                    if (output instanceof byte[]) {
                        out.write((byte[]) output);
                    }
                    else {
                        copy((InputStream) output, out);
                    }
                    // Flush explicitly: errors raised inside the quiet close below are ignored
                    out.flush();
                }
                finally {
                    closeQuietly(out);
                }
            }
            finally {
                // The generated stream is closed even when the request could not be opened
                if (output instanceof InputStream) {
                    closeQuietly((InputStream) output);
                }
            }
        }
        else if (output instanceof Persistable) {
            conn.setRequestProperty("Content-Type", "application/x-java-persistable-object");
            DataOutputStream dout = new DataOutputStream(conn.getOutputStream());
            try {
                // The class name precedes the persisted state so the receiver can instantiate it
                dout.writeUTF(output.getClass().getName());
                ((Persistable) output).persist(dout);
                dout.flush();
            }
            finally {
                closeQuietly(dout);
            }
        }
        else {
            conn.setRequestProperty("Content-Type", "application/x-java-serialized-object");
            OutputStream out = conn.getOutputStream();
            try {
                ObjectOutputStream oos = new ObjectOutputStream(out);
                oos.writeObject(output);
                // ObjectOutputStream buffers; flush here so write errors are not swallowed
                oos.flush();
            }
            finally {
                closeQuietly(out);
            }
        }
    }

    /**
     * Hands the response body to the processor, decoded according to the response's
     * Content-Type. A missing or unrecognised Content-Type passes the raw stream.
     * The caller remains responsible for the final close of {@code in}.
     */
    private static void receiveInput(HttpURLConnection conn, InputStream in,
            Task task, Processor proc) throws Exception {

        task.setTaskInputMeta(conn.getHeaderField("X-Task-Input-Meta"));

        // May be null when the server sends no Content-Type; compare null-safely
        String contentType = conn.getHeaderField("Content-Type");

        if ("application/x-java-serialized-object".equals(contentType)) {
            // NOTE(review): native Java deserialization of data received over the network;
            // safe only if the server is trusted.
            ObjectInputStream ois = new ObjectInputStream(in);
            Object input = ois.readObject();
            // The object is fully read; release the connection before processing it
            closeQuietly(ois);
            proc.process(input);
        }
        else if ("application/x-java-persistable-object".equals(contentType)) {
            DataInputStream din = new DataInputStream(in);

            // NOTE(review): the class to instantiate is named by the response body
            String className = din.readUTF();
            Class<?> persistableClass = Class.forName(className);

            Persistable persistable = (Persistable) persistableClass.newInstance();
            persistable.resurrect(din);
            closeQuietly(din);

            proc.process(persistable);
        }
        else {
            proc.process(in);
        }
    }

    /**
     * Reads the "X-Task-Check-Delay" response header and, when present and numeric,
     * schedules the next task check that many milliseconds after {@code time}.
     * A malformed value is reported and ignored so that an advisory header cannot
     * abort an otherwise successful exchange.
     */
    private static void updateNextTaskCheckTime(HttpURLConnection conn, long time) {
        String delayString = conn.getHeaderField("X-Task-Check-Delay");
        if (delayString == null) {
            return;
        }
        try {
            int nextTaskCheckDelay = Integer.parseInt(delayString.trim());
            nextTaskCheckTime = time + nextTaskCheckDelay;
        }
        catch (NumberFormatException ex) {
            System.err.println("Ignoring invalid X-Task-Check-Delay header: " + delayString);
        }
    }

    /** Copies everything from {@code in} to {@code out}; closes neither stream. */
    private static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[STREAM_BUFFER_SIZE];
        int r;
        // End of stream is signalled by -1; a zero-length read must not end the copy early
        while ((r = in.read(buffer)) != -1) {
            out.write(buffer, 0, r);
        }
    }

    /** Closes the stream if non-null, ignoring any failure (best-effort cleanup). */
    private static void closeQuietly(InputStream stream) {
        if (stream != null) {
            try { stream.close(); } catch (Exception ignored) { /* best effort */ }
        }
    }

    /** Closes the stream if non-null, ignoring any failure (best-effort cleanup). */
    private static void closeQuietly(OutputStream stream) {
        if (stream != null) {
            try { stream.close(); } catch (Exception ignored) { /* best effort */ }
        }
    }

    /**
     * @return the local time (ms since epoch) of the last task request that completed
     *         successfully, or 0 if there has been none
     */
    public static long getLastTaskCheckTime() {
        return lastTaskCheckTime;
    }

    /**
     * Sets the next task check time to the current time.
     *
     * @return the new next task check time (ms since epoch)
     */
    public static long checkNow() {
        return nextTaskCheckTime = System.currentTimeMillis();
    }

    /**
     * @return the time (ms since epoch) at which the next task check is scheduled,
     *         as last set from an "X-Task-Check-Delay" header or {@link #checkNow()}
     */
    public static long getNextTaskCheckTime() {
        return nextTaskCheckTime;
    }

    /**
     * @return the failure description recorded by the last {@link #getTask()} call that
     *         received a response status, or {@code null} if that status was 2xx
     */
    public static String getError() {
        return error;
    }
}

