Custom - Map Reduce - Schedule Job

/custom/mapreduce/schedulejob/{jobtitle}/{jobdesc}/{communityIds}/{jarURL}/{timeToRun}/{frequencyToRun}/{mapperClass}/{reducerClass}/{combinerClass}/{query}/{inputcollection}/{outputKey}/{outputValue}

Schedules a Hadoop map reduce job. Returns the output collection ID in the data field of the response if the job is successfully queued to run.

A detailed guide to creating plugins is available in the Hadoop Plugin Guide.

A simple web-based utility is available for uploading JARs and managing jobs.

Authentication

Required, see Auth - Login.

Arguments

jobtitle (required)
A descriptive name of the job being submitted.

jobdesc (required)
A description of what the submitted job is attempting to do.

communityIds (required)
Community ID, or IDs (comma-separated), that the map reduce job will run against. These are appended to the Mongo query.

jarURL (required)
A URL to the location of the JAR file to run for the job; this can be in the Shares table or hosted elsewhere on the web. Any permission errors will fail silently and the job will not complete.

timeToRun (required)
The earliest time after which the job should run, as a Java epoch time in milliseconds (a long). For example, submit 0 to run the job as soon as possible, or 1420106400000 to run it after January 1, 2015.
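
For illustration, a minimal Java sketch of computing such a timestamp (here for midnight UTC on January 1, 2015; the exact value depends on the time zone assumed):

import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class EpochExample {
    public static void main(String[] args) {
        // Midnight on January 1, 2015 (UTC) as Java epoch milliseconds
        Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear(); // zero out the time-of-day fields
        cal.set(2015, Calendar.JANUARY, 1);
        System.out.println(cal.getTimeInMillis()); // prints 1420070400000
    }
}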

frequencyToRun (required)
How often the job should be run: NONE, HOURLY, DAILY, WEEKLY, or MONTHLY. This will cause the job to be resubmitted after running; use NONE if you only want the job to run once.

mapperClass (required)
The Java classpath to the job's mapper; it should be in the form package.file$class

reducerClass (optional)
The Java classpath to the job's reducer; it should be in the form package.file$class

combinerClass (optional)
The Java classpath to the job's combiner; it should be in the form package.file$class (use the reducer if you have not written a combiner, or submit null). If no reducer is present, then only the mapper (or combiner) is run, and records with duplicate keys will overwrite each other in an arbitrary order.

query (required)
The Mongo query used to get the job's data. {} is a blank query, or you can submit null.

Only indexed fields should be used in the query; this is discussed further in the Hadoop Plugin Guide.

Note that MongoDB uses some JSON extensions that must be used in queries from the command line:

  • When querying the ObjectId type (eg "_id"), it should be queried as the object '{ "$oid": "<object id string>" }'
  • When querying a Date type, it should be queried as the object '{ "$date": "<date in Java time, ie milliseconds since 01 Jan 1970>" }'

The full list is here: http://docs.mongodb.org/manual/reference/mongodb-extended-json/
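
For illustration, a minimal sketch in Java of building such an extended-JSON query and URL-encoding it for the query path segment (the ObjectId and date values below are placeholders, not real IDs):

import java.net.URLEncoder;

public class QueryEncodeExample {
    public static void main(String[] args) throws Exception {
        // Select documents whose _id is greater than a given (placeholder) ObjectId
        // and whose created date is on or after 01 Jan 2013 (1356998400000 ms)
        String query = "{\"_id\":{\"$gt\":{\"$oid\":\"50bec661d4d8d089d7b81e9d\"}},"
                     + "\"created\":{\"$gte\":{\"$date\":1356998400000}}}";
        // The query occupies one path segment of the REST URL, so it must be URL-encoded
        System.out.println(URLEncoder.encode(query, "UTF-8"));
    }
}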

From March 2014, the query can also be an Infinit.e query JSON object. This provides a much more powerful query interface (including over full text and non-indexed fields), but it comes at the cost of some performance (particularly compared to "$srctags"), so use old-style queries where possible.

In both cases you can also add post-processing directives. The preferred method of specifying post-processing fields is now to use additional optional fields at the top level of the query, as follows:

{
    // Output post-processing
    "$output": {
        "limit": int,            // a record limit; how it is applied depends on limitAllData below
        "limitAllData": boolean, // if true, the above limit is applied to the entire collection; if false, just to this iteration's records
        "sortField": string,     // field to sort on (defaults to "_id"), supports dot notation
        "sortDirection": int,    // -1 or 1 for descending or ascending
        "indexes": [{}] or {}    // a JSON object or list of JSON objects defining simple or compound MongoDB indexes
    },
    // Other control fields
    "$limit": int,      // if specified, will process this number of records in one giant split (used for debugging)
    "$fields": {},      // usual MongoDB specification of the fields to provide to the mapper, eg {"_id":0, "entities":1} (defaults to all if {})
    // More advanced parameters:
    "$reducers": int,   // specifies the number of reducers to use (default: 1)
    "$mapper_key_class": string,   // allows the mapper to output a different key class than the reducer (fully specified class name)
    "$mapper_value_class": string, // allows the mapper to output a different value class than the reducer (fully specified class name)
    // Can mostly be left at their defaults:
    "$splits": int,       // the maximum number of splits before the standard MongoInputFormat class is used (which is very inefficient when a query is applied), default 10
    "$docsPerSplit": int, // the maximum number of docs per split before the standard MongoInputFormat class is used (which is very inefficient when a query is applied), default 12.5K
    "$srctags": string or {...}, // a MongoDB query that is applied against the source tags (not the document tags) and converts to a list of sources (ie very efficient)
                                 // (note this backs out if too many sources - currently >5000 - are selected, so should be treated as a recommendation - ie mappers might still be called on non-matching sources)
    "$tmin": string, "$tmax": string, // maps Infinit.e-query style time strings (including "now", "now-1d" etc) onto an indexed field in the specified collection to support time-boxing the input
                                      // (supported across the following input types: docs (mongo query and infinit.e query), records, custom table)

    "$caches": [ string ], // a list of ids pointing to JARs that are then added to the classpath cache, or other shares that can be accessed via the Hadoop distributed cache
                           // (currently JS scripting engine only: can also be a list of ids/job titles pointing to other jobs that can then be accessed via _custom)

    // Record specific:
    // (ES-formatted query needed, with $tmin/$tmax support)
    "$streaming": boolean, // if present, searches only the live/streaming records interface if true, or only the stashed/demo records interface if false; searches both if not present
    "$types": string,      // comma-separated list of ES types to filter on
    //
    // The MongoDB or Infinit.e or ES query as before, eg "sourceKey": "key.to.source", or "qt": [ { "etext": "string" } ], etc
    //
}

Note that "$srctags", "$splits", "$docsPerSplit" are not supported when using the platform query JSON format.

See the Hadoop Plugin Guide for more information. (This also covers the different "query" format for file access; see also "inputcollection" below.)
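
As a concrete illustration, here is a minimal Java sketch that assembles a query combining an indexed field filter with a few of the control fields above (the sourceKey value and all of the settings are hypothetical):

public class QueryBuilderExample {
    public static void main(String[] args) {
        // Filter on an indexed sourceKey, return only the "entities" field to the
        // mapper, cap the output collection at 10000 records, and use 4 reducers
        String query = "{"
            + "\"sourceKey\":\"news.acme.example\"," // hypothetical source key
            + "\"$fields\":{\"entities\":1},"
            + "\"$output\":{\"limit\":10000,\"limitAllData\":true},"
            + "\"$reducers\":4"
            + "}";
        System.out.println(query);
    }
}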

UPDATE: From August 2013 the format above replaces the old format, and from January 2014 the old format will no longer work. For reference, the old format was an array of the form [{mongodb query},{postproc}|null,{fields}], where postproc is a JSON object of the form below, and "fields" is the usual MongoDB "projection" object for specifying the fields returned from queries.

{
	"limit":int,
	"sortField":"field.field.field",
	"sortDirection":-1|1,
	"limitAllData":true|false
}

inputcollection (required)
The Mongo collection you want to use as input. You can submit DOC_METADATA to get the documents' metadata, DOC_CONTENT to get the document contents, or grab a previous map reduce job's results table in your communities by submitting its ID or title (you must be a member of that community).

From March 2014 this can also be "filesystem", which can read files directly from HDFS. This is discussed further under Advanced Topics in the Hadoop Plugin Guide.

outputKey (required)
The classpath for the map reduce output key class, usually org.apache.hadoop.io.Text

outputValue (required)
The classpath for the map reduce output value class, usually org.apache.hadoop.io.IntWritable
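
To make the class naming and the output key/value types concrete, here is a minimal sketch modeled on the classic Hadoop word count, matching the Test$TokenizerMapper / Test$IntSumReducer classes used in the example below. Treat it as illustrative only: the input key/value types your mapper actually receives depend on the chosen inputcollection (e.g. Mongo-based inputs supply BSON objects rather than Text lines).

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class Test {
    // Referenced as <package>.Test$TokenizerMapper in the mapperClass argument
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE); // key/value types match outputKey/outputValue
            }
        }
    }

    // Referenced as <package>.Test$IntSumReducer in the reducerClass (and combinerClass) argument
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}

Note that it is the static nested classes that produce the package.file$class form expected by the mapperClass/reducerClass/combinerClass arguments.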

json (optional)
Currently you can pass in a JSON object containing any custom arguments you want passed to the map reduce job at runtime; these arguments will be available in the job configuration in your mapper/reducer.
format: { "arguments":"any string you want here" }
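
As a sketch of how a mapper might then retrieve those arguments at runtime: the platform is assumed here to expose the string via the Hadoop job configuration, but the property name "infinit.e.userargs" below is hypothetical and used for illustration only; consult the Hadoop Plugin Guide for the actual key.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ArgAwareMapper extends Mapper<Object, Text, Text, IntWritable> {
    private String arguments;

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        // HYPOTHETICAL property name - check the Hadoop Plugin Guide for the
        // key the platform actually sets when the "json" argument is supplied
        arguments = conf.get("infinit.e.userargs", "");
    }
}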

selfMerge (optional, POST payload only)
true/false; if true, your previous job's output is also sent back to your mapper, e.g. your job can get data both from "inputcollection" and from the output of this job's last run.

Example

http://infinite.ikanow.com/api/custom/mapreduce/schedulejob/TestJob/Testing%20map%20reduce/4e9c77ef17ef3523b657a890/%24infinite%2Fshare%2Fget%2F4eafed58233558b98055c872/0/NONE/com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24TokenizerMapper/com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24IntSumReducer/com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24IntSumReducer/null/doc_metadata/org.apache.hadoop.io.Text/org.apache.hadoop.io.IntWritable

See also the worked examples: cURL - Login, Get Map Reduce Jobs, Get Map Reduce Job Results, Logout; Java Example.

Method: POST

Example using curl:

curl -XPOST 'http://infinite.ikanow.com/api/custom/mapreduce/schedulejob/TestJob/Testing%20map%20reduce/4e9c77ef17ef3523b657a890/%24infinite%2Fshare%2Fget%2F4eafed58233558b98055c872/0/NONE/com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24TokenizerMapper/com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24IntSumReducer/com.ikanow.infinit.e.core.mapreduce.examplejars.Test%24IntSumReducer/null/doc_metadata/org.apache.hadoop.io.Text/org.apache.hadoop.io.IntWritable' -d '{ "arguments":"Sterling Archer" }'
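
A rough Java equivalent of the cURL call above (a sketch only: it assumes you already hold an authenticated session cookie from Auth - Login, and cookie handling is omitted for brevity):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class ScheduleJobExample {
    public static void main(String[] args) throws Exception {
        String base = "http://infinite.ikanow.com/api/custom/mapreduce/schedulejob";
        // Each path segment must be URL-encoded individually
        String[] segments = {
            "TestJob",                                      // jobtitle
            "Testing map reduce",                           // jobdesc
            "4e9c77ef17ef3523b657a890",                     // communityIds
            "$infinite/share/get/4eafed58233558b98055c872", // jarURL
            "0",                                            // timeToRun
            "NONE",                                         // frequencyToRun
            "com.ikanow.infinit.e.core.mapreduce.examplejars.Test$TokenizerMapper",
            "com.ikanow.infinit.e.core.mapreduce.examplejars.Test$IntSumReducer",
            "com.ikanow.infinit.e.core.mapreduce.examplejars.Test$IntSumReducer",
            "null",                                         // query
            "doc_metadata",                                 // inputcollection
            "org.apache.hadoop.io.Text",                    // outputKey
            "org.apache.hadoop.io.IntWritable"              // outputValue
        };
        StringBuilder url = new StringBuilder(base);
        for (String segment : segments) {
            // URLEncoder form-encodes spaces as '+', so swap in '%20' for path use
            url.append('/').append(URLEncoder.encode(segment, "UTF-8").replace("+", "%20"));
        }
        HttpURLConnection conn = (HttpURLConnection) new URL(url.toString()).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        // The optional "json" custom-arguments object goes in the POST body
        try (OutputStream os = conn.getOutputStream()) {
            os.write("{ \"arguments\":\"Sterling Archer\" }".getBytes("UTF-8"));
        }
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}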

Example Response
{"response":{"action":"Schedule MapReduce Job","success":true,"message":"Job scheduled successfully, will run on: Wed Dec 31 19:00:00 EST 1969","time":246},"data":"4f2007dd8196fe53a52c25a1"}
Error Response
{"response":{"action":"Schedule MapReduce Job","success":false,"message":"You are not allowed to use the given input collection.","time":142}}