package org.wikiwebserver.handler.http.responder;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;

import org.wikiwebserver.distribute.interfaces.Persistable;
import org.wikiwebserver.distribute.interfaces.WorkerNode;
import org.wikiwebserver.distribute.server.NodeCommunicationException;
import org.wikiwebserver.distribute.server.RemoteWorkerNode;
import org.wikiwebserver.distribute.server.TaskStub;
import org.wikiwebserver.distribute.server.WorkerNodeManager;
import org.wikiwebserver.handler.http.HTTPException;
import org.wikiwebserver.handler.http.HTTPHandler;
import org.wikiwebserver.handler.http.HTTPHeaders;
import org.wikiwebserver.handler.http.HTTPPostReader;
import org.wikiwebserver.handler.http.interfaces.HTTPResponder;

import page.tools.entity.NodeData;

/**
 * Responds to task requests made by remote worker nodes.
 *
 * <p>The request identifies the node through the {@code X-Node-ID} header and
 * the task through the {@code X-Task-ID} header. If the task is an output
 * generator the request body is taken as the task output; if the task is an
 * input processor the task input is sent back in the response.
 */
public class NodeTaskResponder implements HTTPResponder {

	private static final int PROXY_STREAMING_BUFFER_SIZE = 32 * 1024;
	private static final int PROXY_STREAMING_IDLE_CLOSE_PERIOD = 20 * 1000;

	/** Milliseconds to sleep between checks of the remaining request body. */
	private static final int PROXY_STREAMING_POLL_INTERVAL = 10;

	private static final String CONTENT_TYPE_STREAM = "application/octet-stream";
	private static final String CONTENT_TYPE_PERSISTABLE = "application/x-java-persistable-object";
	private static final String CONTENT_TYPE_SERIALIZED = "application/x-java-serialized-object";

	/**
	 * Validates the node and task named in the request headers, then exchanges
	 * task data with the node.
	 *
	 * @param conn the connection carrying the node's request
	 * @return the response body as a byte array when the task input is sent as
	 *         a persistable or serialized object; {@code null} otherwise
	 * @throws HTTPException if the node or task is invalid, completed or
	 *         expired, or if the task data could not be exchanged
	 * @throws IOException on I/O failure while exchanging task data
	 */
	public Object respond(HTTPHandler conn) throws IOException {

		long time = System.currentTimeMillis();
		HTTPHeaders requestHeaders = conn.getRequest().getHeaders();
		HTTPHeaders responseHeaders = conn.getResponse().getHeaders();

		String nodeId = requestHeaders.getFirst("X-Node-ID");

		if (NodeData.getNodeDataById(nodeId) == null) {
			throw new HTTPException(400, "Invalid remote worker node data");
		}

		// Will create new if node does not exist
		WorkerNode node = WorkerNodeManager.getNode(nodeId);

		if (!(node instanceof RemoteWorkerNode)) {
			throw new HTTPException(400, "Invalid remote worker node");
		}

		RemoteWorkerNode remoteNode = (RemoteWorkerNode) node;

		int delayUntilNextCheck = remoteNode.updateTaskCheckDelay();
		responseHeaders.set("X-Task-Check-Delay", String.valueOf(delayUntilNextCheck));

		String taskIdString = requestHeaders.getFirst("X-Task-ID");

		if (taskIdString == null) {
			throw new HTTPException(400, "Task-ID required");
		}

		// The request from the node will update a task
		TaskStub taskStub = remoteNode.getTaskStub(taskIdString);

		// Ensure task exists
		if (taskStub == null) {
			throw new HTTPException(400, "Task not found");
		}

		// Ensure task not already completed
		if (taskStub.isComplete()) {
			throw new HTTPException(400, "Task already completed");
		}

		// Ensure task has not expired
		if (time > taskStub.getExpireTime()) {
			throw new HTTPException(400, "Task has expired");
		}

		// Receive data from the node
		if (taskStub.isOutputGenerator()) {
			receiveTaskOutput(conn, requestHeaders, node, taskStub, time);
		}

		// Send data to the node
		if (taskStub.isInputProcessor()) {
			return sendTaskInput(conn, responseHeaders, node, taskStub);
		}

		taskStub.setComplete(true);
		return null;
	}

	/**
	 * Reads the task output from the POST body, decoding it according to the
	 * request's Content-Type. Any content type other than the two Java object
	 * types (including a missing header) is handed to the task as a raw stream.
	 */
	private static void receiveTaskOutput(HTTPHandler conn, HTTPHeaders requestHeaders,
			WorkerNode node, TaskStub taskStub, long startTime) throws IOException {

		if (!"POST".equalsIgnoreCase(conn.getRequest().getMethod())) {
			throw new HTTPException(400, "POST expected from task");
		}

		taskStub.setTaskOutputMeta(requestHeaders.getFirst("X-Task-Output-Meta"));

		// Constant-first comparison: the header may be absent (null)
		String contentType = requestHeaders.getFirst("Content-Type");

		if (CONTENT_TYPE_PERSISTABLE.equals(contentType)) {
			readPersistableOutput(conn, taskStub);
		}
		else if (CONTENT_TYPE_SERIALIZED.equals(contentType)) {
			readSerializedOutput(conn, taskStub);
		}
		else {
			HTTPPostReader in = (HTTPPostReader) conn.getRequest().getData();
			taskStub.setOutput(in);

			// Wait for data transfer to complete
			awaitStreamConsumed(in, taskStub, startTime);

			// Expect another task soon
			node.reduceWaitTime();
		}
	}

	/**
	 * Reads a class name followed by that class's persisted state from the
	 * request body and stores the resurrected object as the task output.
	 */
	private static void readPersistableOutput(HTTPHandler conn, TaskStub taskStub)
			throws IOException {

		DataInputStream din = null;
		try {
			din = new DataInputStream((HTTPPostReader) conn.getRequest().getData());
			try {
				String className = din.readUTF();

				// The class name is supplied by the remote node: load it without
				// running static initializers and verify it is a Persistable
				// before anything is instantiated.
				Class<? extends Persistable> persistableClass = Class
						.forName(className, false, NodeTaskResponder.class.getClassLoader())
						.asSubclass(Persistable.class);

				Persistable persistable = persistableClass.getDeclaredConstructor().newInstance();
				persistable.resurrect(din);
				taskStub.setOutput(persistable);

			} catch (Exception ex) {
				String msg = "Failed to understand persistable data from node";
				// NOTE(review): 300 is not an error status; kept because nodes may depend on it
				throw new HTTPException(300, msg, ex);
			}
		}
		finally {
			if (din != null) {
				try { din.close(); } catch (Exception ignored) { /* best-effort close */ }
			}
		}
	}

	/**
	 * Deserializes a Java object from the request body and stores it as the
	 * task output.
	 */
	private static void readSerializedOutput(HTTPHandler conn, TaskStub taskStub)
			throws IOException {

		// NOTE(review): native Java deserialization of data received from a node;
		// only safe if nodes are trusted - consider restricting accepted classes.
		ObjectInputStream ois = null;
		try {
			ois = new ObjectInputStream((HTTPPostReader) conn.getRequest().getData());
			try {
				taskStub.setOutput(ois.readObject());
			} catch (ClassNotFoundException ex) {
				String msg = "Failed to understand serialized data from node";
				// NOTE(review): 300 is not an error status; kept because nodes may depend on it
				throw new HTTPException(300, msg, ex);
			}
		}
		finally {
			if (ois != null) {
				try { ois.close(); } catch (Exception ignored) { /* best-effort close */ }
			}
		}
	}

	/**
	 * Polls the request body until nothing remains to be read from it.
	 * Fails if no data has been read for PROXY_STREAMING_IDLE_CLOSE_PERIOD
	 * milliseconds, or if the task is completed meanwhile. An interrupt does
	 * not stop the polling, but the thread's interrupt flag is restored on exit.
	 */
	private static void awaitStreamConsumed(HTTPPostReader in, TaskStub taskStub, long startTime)
			throws IOException {

		boolean interrupted = false;
		try {
			long lastReadTime = startTime;
			int remaining = in.remaining();

			while (remaining > 0) {

				// Wait for data to flow
				try {
					Thread.sleep(PROXY_STREAMING_POLL_INTERVAL);
				} catch (InterruptedException ex) {
					interrupted = true;
				}

				long now = System.currentTimeMillis();

				// Read the counter once per poll so no progress goes unnoticed
				int nowRemaining = in.remaining();
				if (nowRemaining < remaining) {
					lastReadTime = now;
				}

				// If no data transferred in a while, time out
				if (now - lastReadTime > PROXY_STREAMING_IDLE_CLOSE_PERIOD) {
					throw new NodeCommunicationException("Timeout while streaming data from node");
				}

				// Ensure task not completed (in another connection)
				if (taskStub.isComplete()) {
					throw new HTTPException(400, "Concurrency issue, task already completed");
				}

				remaining = nowRemaining;
			}
		}
		finally {
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * Waits for the task input if necessary, then sends it to the node:
	 * streams are copied straight to the connection, other objects are
	 * returned as a byte array for the caller to send.
	 *
	 * @return the encoded input, or {@code null} if it was streamed directly
	 */
	private static Object sendTaskInput(HTTPHandler conn, HTTPHeaders responseHeaders,
			WorkerNode node, TaskStub taskStub) throws IOException {

		awaitTaskInput(taskStub);

		// Read the input once so the type checks and the data sent agree
		Object input = taskStub.getInput();

		responseHeaders.set("X-Task-Input-Meta", taskStub.getTaskInputMeta());

		conn.getResponse().setCode(200);
		conn.getResponse().setInfo("Data to node follows");

		if (input instanceof InputStream) {
			responseHeaders.set("Content-Type", CONTENT_TYPE_STREAM);
			InputStream in = (InputStream) input;
			try {
				OutputStream out = conn.getOutputStream();
				byte[] buffer = new byte[PROXY_STREAMING_BUFFER_SIZE];
				int r = in.read(buffer);
				while (r > 0) {
					out.write(buffer, 0, r);
					r = in.read(buffer);
				}

				// Expect another task soon
				node.reduceWaitTime();
				taskStub.setComplete(true);
				return null;
			}
			catch (IOException ex) {
				String msg = "Failed to stream data to node";
				// NOTE(review): 300 is not an error status; kept because nodes may depend on it
				throw new HTTPException(300, msg, ex);
			}
			finally {
				try { in.close(); } catch (Exception ignored) { /* best-effort close */ }
			}
		}

		if (input instanceof Persistable) {
			responseHeaders.set("Content-Type", CONTENT_TYPE_PERSISTABLE);

			// Class name first, so the receiver knows what to instantiate
			ByteArrayOutputStream bos = new ByteArrayOutputStream();
			DataOutputStream dout = new DataOutputStream(bos);
			dout.writeUTF(input.getClass().getName());
			((Persistable) input).persist(dout);
			dout.flush();

			taskStub.setComplete(true);
			return bos.toByteArray();
		}

		responseHeaders.set("Content-Type", CONTENT_TYPE_SERIALIZED);

		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		ObjectOutputStream oos = new ObjectOutputStream(bos);
		oos.writeObject(input);
		oos.close();

		taskStub.setComplete(true);
		return bos.toByteArray();
	}

	/**
	 * Blocks until the task has input or the task's expire time passes.
	 * Presumably whoever supplies the input notifies on the task stub's
	 * monitor - verify against TaskStub.
	 *
	 * @throws HTTPException with status 500 if the task expires or the thread
	 *         is interrupted before input is available
	 */
	private static void awaitTaskInput(TaskStub taskStub) throws IOException {
		synchronized (taskStub) {
			// Loop guards against spurious wake-ups and notifications that
			// do not actually supply the input
			while (taskStub.getInput() == null) {

				// Recomputed each pass; wait(0) would block forever, so a
				// deadline that has been reached is reported as expiry instead
				long timeout = taskStub.getExpireTime() - System.currentTimeMillis();
				if (timeout <= 0) {
					String msg = "Task expired while waiting for data to send to node";
					throw new HTTPException(500, msg);
				}

				try {
					taskStub.wait(timeout);
				} catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
					String msg = "Interrupted while waiting for data to send to node";
					throw new HTTPException(500, msg);
				}
			}
		}
	}
}

