MapReduceアプリケーションはjarファイルとして作成します。一般的には、
org.apache.hadoop.util.Tool
インターフェースにしたがってクラスを実装します。Hadoopからそのクラスのmain()メソッドが呼び出されることでJobClientが起動されます。
main()メソッドでは、コマンドラインオプションとして指定されたプロパティ(-Dで指定)を取り込むために
org.apache.hadoop.util.ToolRunner
を介して、run()メソッドを実行します。
run()メソッドでは、はじめにHadoopジョブの実行に必要な設定やスレーブサーバで実処理を行う部品(クラス)群を指定した
org.apache.hadoop.mapred.JobConf
オブジェクトを作成します。その後、同オブジェクトの内容にしたがってJobClientからJobTrackerにHadoopジョブの実行が依頼されます。
例
MapReduceアプリケーション(抜粋)
public class BDPPApp extends Configured implements Tool { public int run(String[] args) throws Exception { Configuration defconf = getConf(); // Hadoopジョブのデフォルトのプロパティ JobConf conf = new JobConf(defconf, BDPPApp.class); // Hadoopジョブの定義 // アプリケーション固有のコマンドラインオプションの取り込み Path infile = new Path(args[1]); // 入力パス Path outdir = new Path(args[2]); // 出力パス // ジョブで使用するプロパティや部品群(クラス)の設定 conf.setJobName("bdpp-app"); // Hadoopジョブ名の設定 conf.setInputPath(infile); // 入力パスの設定 conf.setOutputPath(outdir); // 出力パスの設定 conf.setMapperClass(BDPPMapper.class); // Map処理を行うクラスの設定 conf.setReducerClass(BDPPReducer.class); // Reduce処理を行うクラスの設定 JobClient.runJob(conf); // JobTrackerへのHadoopジョブの実行依頼と、終了の待ち合わせ } public static void main(String[] args) throws Exception { // コマンドラインで指定されたプロパティの取り込みと、BDPPApp.run()の呼び出し int ret = ToolRunner.run(new Configuration(), new BDPPApp(), args); System.exit(ret); } }
さらに、スレーブサーバで実処理を行う部品(クラス)としてMap処理用クラス(Mapper)と、必要に応じてReduce処理用クラス(Reducer)を用意します。
Mapper/Reducerクラスは入出力や回収の方法を規定する
org.apache.hadoop.mapred.MapReduceBase
クラスを継承して
org.apache.hadoop.mapred.Mapper
org.apache.hadoop.mapred.Reducer
インターフェースを実装します。
Mapperは1件のKey-ValueデータをHadoopから入力し、1件または複数件のKey-ValueデータをHadoopに出力するように作成します。Mapperが出力したKey-ValueデータはHadoopによってソートされ、同一のKeyを持つデータが同じReducerに渡るよう制御されます。Reducerは1件のKeyとValueの集合(Iterator)をHadoopから入力し、1件または複数件のKey-ValueデータをHadoopに出力するように作成します。
KeyとValueを使用した実際のデータ処理(入力データから出力データを作成する処理)については、通常のJavaアプリケーションと同様に記述します。
例
Mapperクラス(抜粋)
public class BDPPMapper<K, V> extends MapReduceBase implements Mapper<K, V, K, V> { public void map(K key, V value, OutputCollector<K, V> output, Reporter reporter) throws IOException { // Hadoopからkeyとvalueを入力 // keyとvalueからkeyMとvalueMを生成する処理 // 例 : keyとvalue内の先頭データを連結したものをkeyMに設定 // value内の集計対象部分のみをvalueMに設定 output.collect(keyM, valueM); // HadoopにkeyMとvalueMを出力 } }
Reducerクラス(抜粋)
public class BDPPReducer<K, V> extends MapReduceBase implements Reducer<K, V, K, V> { public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output, Reporter reporter) throws IOException { // Hadoopからkeyとvaluesを入力 while (values.hasNext()) { // keyとvaluesからkeyRとvalueRを生成する処理 // 例 : values.next()の合計値をvalueRに設定 } output.collect(keyR, valueR); // HadoopにkeyRとvalueRを出力 } }
参照
MapReduceアプリケーションやAPIの詳細については、以下を参照してください。
http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html
http://hadoop.apache.org/docs/r1.2.1/api/