Bucketing, Multiplexing and Combining in Hadoop - Part 1
this is the first blog post in a series which looks at some data organization patterns in mapreduce. we’ll look at how to bucket output across multiple files in a single task, how to multiplex data across multiple files, and also how to coalesce data. these are all common patterns that are useful to have in your mapreduce toolkit. we’ll kick things off with a look at bucketing data outputs in your map or reduce tasks. by default when using a fileoutputformat-derived outputformat (such as textoutputformat), all the outputs for a reduce task (or a map task in a map-only job) are written to a single file in hdfs. imagine a situation where you have user activity logs being streamed into hdfs, and you want to write a mapreduce job to better organize the incoming data. as an example a large organization with multiple products may want to bucket the logs based on the product. to do this you’ll need the ability to write to multiple output files in a single task. let’s take a look at how we can make that happen. multipleoutputformat there are a few ways you can achieve your goal, and the first option we’ll look at is the multipleoutputformat class in hadoop. this is an abstract class that lets you do the following: define the output path for each and every key/value output record being emitted by a task. incorporate the input paths into the output directory for map-only jobs. redefine the key and value that are used to write to the underlying recordwriter . this is useful in situations where you want to remove data from the outputs as it duplicates data in the filename. for each output path, define the recordwriter that should be used to write the outputs. ok enough with the words - let’s look at some data and code. first up is the simple data we’ll use in our example - imagine you work at a fruit market with locations in multiple cities, and you have a purchase transaction stream which contains the store location along with the fruit that was purchased. cupertino apple sunnyvale banana cupertino pear to help bucket your data for future analysis, you want to bin each record into city-specific files. for the simple data set above you don’t want to filter, project or transform your data, just bucket it out, so a simple identity map-only job will do the job. to force more than one mapper, we’ll write the data to two separate files. $ tab="$(printf '\t')" $ hdfs -put - file1.txt << eof cupertino${tab}apple sunnyvale${tab}banana eof $ hdfs -put - file2.txt << eof cupertino${tab}pear eof here’s the code which will let you write city-specific output files. import org.apache.commons.lang.stringutils; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.identitymapper; import org.apache.hadoop.mapred.lib.multipletextoutputformat; import org.apache.hadoop.util.progressable; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; import java.io.ioexception; import java.util.arrays; /** * an example of how to use {@link org.apache.hadoop.mapred.lib.multipleoutputformat}. */ public class mofexample extends configured implements tool { /** * create output files based on the output record's key name. */ static class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(text key, text value, string name) { return key.tostring() + "/" + name; } } /** * the main job driver. */ public int run(final string[] args) throws exception { string csvinputs = stringutils.join(arrays.copyofrange(args, 0, args.length - 1), ","); path outputdir = new path(args[args.length - 1]); jobconf jobconf = new jobconf(super.getconf()); jobconf.setjarbyclass(mofexample.class); jobconf.setnumreducetasks(0); jobconf.setmapperclass(identitymapper.class); jobconf.setinputformat(keyvaluetextinputformat.class); jobconf.setoutputformat(keybasedmultipletextoutputformat.class); fileinputformat.setinputpaths(jobconf, csvinputs); fileoutputformat.setoutputpath(jobconf, outputdir); return jobclient.runjob(jobconf).issuccessful() ? 0 : 1; } /** * main entry point for the utility. * * @param args arguments * @throws exception when something goes wrong */ public static void main(final string[] args) throws exception { int res = toolrunner.run(new configuration(), new mofexample(), args); system.exit(res); } } run this code and you’ll see the following files in hdfs, where /output is the job output directory: $ hadoop fs -lsr /output /output/cupertino/part-00000 /output/cupertino/part-00001 /output/sunnyvale/part-00000 if you look at the output files you’ll see that the files contain the correct buckets. $ hadoop fs -lsr /output/cupertino/* cupertino apple cupertino pear $ hadoop fs -lsr /output/sunnyvale/* sunnyvale banana awesome, you have your data bucketed by store. now that we have everything working, let’s look at what we did to get there. we had to do two things to get this working: extend multipletextoutputformat this is where the magic happened - let’s look at that class again. static class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(text key, text value, string name) { return key.tostring() + "/" + name; } } you are working with text, which is why you extended multipletextoutputformat , a class that in turn extends multipleoutputformat . multipletextoutputformat is a simple class which instructs the multipleoutputformat to use textoutputformat as the underlying output format for writing out the records. if you were to use multipleoutputformat as-is it behaves as if you were using the regular textoutputformat , which is to say that it’ll only write to a single output file. to write data to multiple files you had to extend it, as with the example above. the generatefilenameforkeyvalue method allows you to return the output path for an input record. the third argument, name , is the original fileoutputformat -created filename, which is in the form “part-nnnnn”, where “nnnnn” is the task index, to ensure uniqueness. to avoid file collisions, it’s a good idea to make sure your generated output paths are unique, and leveraging the original output file is certainly a good way of doing this. in our example we’re using the key as the directory name, and then writing to the original fileoutputformat filename within that directory. specify the outputformat the next step was easy - specify that this output format should be used for your job: jobconf.setoutputformat(keybasedmultipletextoutputformat.class); earlier we also mentioned that you can use the input path as part of the output path, which we will look at next. using the input filename as part of the output filename in map-only jobs what if we wanted to keep the input filename as part of the output filename? this only works for map-only jobs, and can be accomplished by overriding the getinputfilebasedoutputfilename method. let’s look at the following code to understand how this method fits into the overall sequence of actions that the multipleoutputformat class performs: public void write(k key, v value) throws ioexception { // get the file name based on the key string keybasedpath = generatefilenameforkeyvalue(key, value, myname); // get the file name based on the input file name string finalpath = getinputfilebasedoutputfilename(myjob, keybasedpath); // get the actual key k actualkey = generateactualkey(key, value); v actualvalue = generateactualvalue(key, value); recordwriter rw = this.recordwriters.get(finalpath); if (rw == null) { // if we don't have the record writer yet for the final path, create // one // and add it to the cache rw = getbaserecordwriter(myfs, myjob, finalpath, myprogressable); this.recordwriters.put(finalpath, rw); } rw.write(actualkey, actualvalue); }; the getinputfilebasedoutputfilename method is called with the output of generatefilenameforkeyvalue , which contains our already-customized output file. our new keybasedmultipletextoutputformat can now be updated to override getinputfilebasedoutputfilename and append the original input filename to the output filename: static class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(object key, object value, string name) { return key.tostring() + "/" + name; } @override protected string getinputfilebasedoutputfilename(jobconf job, string name) { string infilename = new path(job.get("map.input.file")).getname(); return name + "-" + infilename; } if you run with your modified outputformat class you’ll see the following files in hdfs, confirming that the input filenames are now concatenated to the end of each output file. $ hadoop fs -lsr /output /output/cupertino/part-00000-file1.txt /output/cupertino/part-00001-file2.txt /output/sunnyvale/part-00000-file1.txt the implementation of getinputfilebasedoutputfilename in multipleoutputformat doesn’t do anything interesting by default, but if you set the value of the mapred.outputformat.numoftrailinglegs configurable to an integer greater than 0, then the getinputfilebasedoutputfilename will use part of the input path as the output path. let’s see what happens when we set the value to 1: jobconf.setint("mapred.outputformat.numoftrailinglegs", 1); the output files in hdfs now exactly mirror the input files used for the job: $ hadoop fs -lsr /output /output/file1.txt /output/file2.txt if we set mapred.outputformat.numoftrailinglegs to 2, and our input files exist in the /inputs directory, then our output directory looks like this: $ hadoop fs -lsr /output /output/input/file1.txt /output/input/file2.txt basically as you keep incrementing mapred.outputformat.numoftrailinglegs , then multipleoutputformat will continue to go up the parent directories of the input file and use them in the output path. modifying the output key and value it’s very possible that the actual key and value you want to emit are different from those that were used to determine the output file. in our example, we took the output key and wrote to a directory using the key name. if you do that keeping the key in the output file may be redundant. how would we modify the output record so that the key isn’t written? multipleoutputformat has your back with the generateactualkey method. class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(text key, text value, string name) { return key.tostring() + "/" + name; } @override protected text generateactualkey(text key, text value) { return null; } } the returned value from this method replaces the key that’s supplied to the underlying recordwriter , so if you return null as in the above example, no key will be written to the file. $ hadoop fs -lsr /output/cupertino/* apple pear $ hadoop fs -lsr /output/sunnyvale/* banana you can achieve the same result for the output value by overriding the generateactualvalue method. changing the recordwriter in our final step we’ll look at how you can leverage multiple recordwriter classes for different output files. this is accomplished by overriding the getrecordwriter method. in the example below we’re leveraging the same textoutputformat for all the files, but it gives you a sense of what can be accomplished. static class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(text key, text value, string name) { return key.tostring() + "/" + name; } @override public recordwriter getrecordwriter(filesystem fs, jobconf job, string name, progressable prog) throws ioexception { if (name.startswith("apple")) { return new textoutputformat().getrecordwriter(fs, job, name, prog); } else if (name.startswith("banana")) { return new textoutputformat().getrecordwriter(fs, job, name, prog); } return super.getrecordwriter(fs, job, name, prog); } } conclusion when using multipleoutputformat , give some thought to the number of distinct files that each reducer will create. it would be prudent to plan your bucketing so that you have a relatively small number of files. in this post we extended multipletextoutputformat , which is a simple extension of multipleoutputformat that supports text outputs. multiplesequencefileoutputformat also exists to support sequencefiles in a similar fashion. so what are the shortcomings with the multipleoutputformat class? if you have a job that uses both map and reduce phases, then multipleoutputformat can’t be used in the map-side to write outputs. of course, multipleoutputformat works fine in map-only jobs. all recordwriter classes must support exactly the same output record types. for example, you wouldn’t be able to support a recordwriter that emitted for one output file, and have another recordwriter that emitted . multipleoutputformat exists in the mapred package, so it won’t work with a job that requires use of the mapreduce package. all is not lost if you bump into either one of these issues, as you’ll discover in the next blog post.
May 20, 2013
by Alex Holmes
·
5,675 Views
·
0 Likes