Java - Login, share a hadoop Jar file, schedule a map reduce job with jar, Logout
Java - Login, share a hadoop Jar file, schedule a map reduce job with jar, Logout
import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLConnection; import java.net.URLEncoder; import com.ikanow.infinit.e.data_model.api.ResponsePojo; public class test { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //login String address = "http://infinite.ikanow.com/api/auth/login/sterling_archer@ikanow.com/WZRHGrsBESr8wYFZ9sx0tPURuZgG2lmzyvWpwXPKz8U%3D"; //don't forget to URLEncode your arguments String loginresult = sendRequest(address, null); ResponsePojo response = ResponsePojo.fromApi(loginresult, ResponsePojo.class); if ( response.getResponse().isSuccess()) { //get map reduce file bytes, send in POST to add binary call String mapreduceFile = "C:/Users/Burch/Desktop/mapreducefile.jar"; byte[] fileBytes = getBytesFromFile(new File(mapreduceFile)); String shareTitle = "Archer Sum Map Reduce"; String shareDesc = "Sums up the sources by community, by Sterling Archer"; String createCommunityAddress = "http://infinite.ikanow.com/api/social/share/add/binary/" + URLEncoder.encode(shareTitle,"UTF-8") + "/" + URLEncoder.encode(shareDesc,"UTF-8"); String communityresult = sendRequest(createCommunityAddress, fileBytes); //The response returns only the id of the share response = ResponsePojo.fromApi(communityresult, ResponsePojo.class); if ( response.getResponse().isSuccess() ) { String shareID = (String)response.getData(); String jobTitle = "Test Job"; String jobDesc = "testing a mr job"; String communityIds = "4c927585d591d31d7b37097a"; //can get communities from social/person/get String jarURL = "$infinite/share/get/" + shareID; //the web url of the share we just added $infinite will be converted to the local api serverside String timeToRun = "0"; //if we want this job to run in the future we will put the ms time to run at String frequencyToRun = "NONE"; //see api page for available enums, we only want it to run once so we put NONE String mapperClass = "com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24TokenizerMapper"; //package name and class of mappper String reducerClass = "com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24IntSumReducer"; //package name and class of reducer String combinerClass = "com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24IntSumReducer"; //this can be the same as reducer usually String query = "null"; //we want everything so we can put null, otherwise you can put json mongodb query String inputcollection = "DOC_METADATA"; //see API for what is available String outputKey = "org.apache.hadoop.io.Text"; String outputValue = "org.apache.hadoop.io.IntWritable"; //now we want to schedule a map reduce job with this jar String scheduleJobAddress = "http://infinite.ikanow.com/api/custom/mapreduce/schedulejob/" + URLEncoder.encode(jobTitle,"UTF-8") + "/" + URLEncoder.encode(jobDesc,"UTF-8") + "/" + URLEncoder.encode(communityIds,"UTF-8") + "/" + URLEncoder.encode(jarURL,"UTF-8") + "/" + URLEncoder.encode(timeToRun,"UTF-8") + "/" + URLEncoder.encode(frequencyToRun,"UTF-8") + "/" + URLEncoder.encode(mapperClass,"UTF-8") + "/" + URLEncoder.encode(reducerClass,"UTF-8") + "/" + URLEncoder.encode(combinerClass,"UTF-8") + "/" + URLEncoder.encode(query,"UTF-8") + "/" + URLEncoder.encode(inputcollection,"UTF-8") + "/" + URLEncoder.encode(outputKey,"UTF-8") + "/" + URLEncoder.encode(outputValue,"UTF-8"); //to use ids, you need to import a mongo.jar available at https://github.com/mongodb/mongo-java-driver/downloads String schedulejobresult = sendRequest(scheduleJobAddress, null); response = ResponsePojo.fromApi(schedulejobresult, ResponsePojo.class); if ( response.getResponse().isSuccess()) { //scheduled job successfully, get results ID, will be available once job has finished running String mapreduceResultID = (String) response.getData(); System.out.println(mapreduceResultID); } } } else { System.out.println("error logging in: " + response.getResponse().getMessage()); } //logout when we are done, this will deactivate our cookie sendRequest("http://infinite.ikanow.com/api/auth/logout", null); } private static String cookie = null; public static String sendRequest(String urlAddress, byte[] postData ) throws Exception { URL url = new URL(urlAddress); URLConnection urlConnection = url.openConnection(); if ( cookie != null ) urlConnection.setRequestProperty("Cookie", cookie); if ( postData != null ) { urlConnection.setDoOutput(true); urlConnection.setRequestProperty("Accept-Charset", "UTF-8"); urlConnection.setRequestProperty("Content-Type", "application/java-archive" + ";charset=" + "UTF-8"); ((HttpURLConnection)urlConnection).setRequestMethod("POST"); OutputStream output = urlConnection.getOutputStream(); output.write(postData); output.close(); } else { ((HttpURLConnection)urlConnection).setRequestMethod("GET"); } //read back result BufferedReader inStream = new BufferedReader(new InputStreamReader(urlConnection.getInputStream())); StringBuilder strBuilder = new StringBuilder(); String buffer; while ( (buffer = inStream.readLine()) != null ) { strBuilder.append(buffer); } inStream.close(); //save cookie if cookie is null if ( cookie == null ) { String headername; for ( int i = 1; (headername = urlConnection.getHeaderFieldKey(i)) != null; i++ ) { if ( headername.equals("Set-Cookie") ) { cookie = urlConnection.getHeaderField(i); break; } } } return strBuilder.toString(); } /** * from: http://www.exampledepot.com/egs/java.io/file2bytearray.html * * @param file * @return * @throws IOException */ public static byte[] getBytesFromFile(File file) throws IOException { InputStream is = new FileInputStream(file); // Get the size of the file long length = file.length(); // You cannot create an array using a long type. // It needs to be an int type. // Before converting to an int type, check // to ensure that file is not larger than Integer.MAX_VALUE. if (length > Integer.MAX_VALUE) { // File is too large } // Create the byte array to hold the data byte[] bytes = new byte[(int)length]; // Read in the bytes int offset = 0; int numRead = 0; while (offset < bytes.length && (numRead=is.read(bytes, offset, bytes.length-offset)) >= 0) { offset += numRead; } // Ensure all the bytes have been read in if (offset < bytes.length) { throw new IOException("Could not completely read file "+file.getName()); } // Close the input stream and return bytes is.close(); return bytes; } }