MapReduceアプリケーションはjarファイルとして作成します。一般的には、
org.apache.hadoop.util.Tool
インターフェースにしたがってクラスを実装します。Hadoopからそのクラスのmain()メソッドが呼び出されることでJobClientが起動されます。
main()メソッドでは、コマンドラインオプションとして指定されたプロパティ(-Dで指定)を取り込むために
org.apache.hadoop.util.ToolRunner
を介して、run()メソッドを実行します。
run()メソッドでは、はじめにHadoopジョブの実行に必要な設定やスレーブサーバで実処理を行う部品(クラス)群を指定した
org.apache.hadoop.mapred.JobConf
オブジェクトを作成します。その後、同オブジェクトの内容にしたがってJobClientからJobTrackerにHadoopジョブの実行が依頼されます。
例
MapReduceアプリケーション(抜粋)
public class BDPPApp extends Configured implements Tool {
public int run(String[] args) throws Exception {
Configuration defconf = getConf(); // Hadoopジョブのデフォルトのプロパティ
JobConf conf = new JobConf(defconf, BDPPApp.class); // Hadoopジョブの定義
// アプリケーション固有のコマンドラインオプションの取り込み
Path infile = new Path(args[1]); // 入力パス
Path outdir = new Path(args[2]); // 出力パス
// ジョブで使用するプロパティや部品群(クラス)の設定
conf.setJobName("bdpp-app"); // Hadoopジョブ名の設定
conf.setInputPath(infile); // 入力パスの設定
conf.setOutputPath(outdir); // 出力パスの設定
conf.setMapperClass(BDPPMapper.class); // Map処理を行うクラスの設定
conf.setReducerClass(BDPPReducer.class); // Reduce処理を行うクラスの設定
JobClient.runJob(conf); // JobTrackerへのHadoopジョブの実行依頼と、終了の待ち合わせ
}
public static void main(String[] args) throws Exception {
// コマンドラインで指定されたプロパティの取り込みと、BDPPApp.run()の呼び出し
int ret = ToolRunner.run(new Configuration(), new BDPPApp(), args);
System.exit(ret);
}
}さらに、スレーブサーバで実処理を行う部品(クラス)としてMap処理用クラス(Mapper)と、必要に応じてReduce処理用クラス(Reducer)を用意します。
Mapper/Reducerクラスは入出力や回収の方法を規定する
org.apache.hadoop.mapred.MapReduceBase
クラスを継承して
org.apache.hadoop.mapred.Mapper
org.apache.hadoop.mapred.Reducer
インターフェースを実装します。
Mapperは1件のKey-ValueデータをHadoopから入力し、1件または複数件のKey-ValueデータをHadoopに出力するように作成します。Mapperが出力したKey-ValueデータはHadoopによってソートされ、同一のKeyを持つデータが同じReducerに渡るよう制御されます。Reducerは1件のKeyとValueの集合(Iterator)をHadoopから入力し、1件または複数件のKey-ValueデータをHadoopに出力するように作成します。
KeyとValueを使用した実際のデータ処理(入力データから出力データを作成する処理)については、通常のJavaアプリケーションと同様に記述します。
例
Mapperクラス(抜粋)
public class BDPPMapper<K, V> extends MapReduceBase implements Mapper<K, V, K, V> {
public void map(K key, V value, OutputCollector<K, V> output, Reporter reporter)
throws IOException { // Hadoopからkeyとvalueを入力
// keyとvalueからkeyMとvalueMを生成する処理
// 例 : keyとvalue内の先頭データを連結したものをkeyMに設定
// value内の集計対象部分のみをvalueMに設定
output.collect(keyM, valueM); // HadoopにkeyMとvalueMを出力
}
}Reducerクラス(抜粋)
public class BDPPReducer<K, V> extends MapReduceBase implements Reducer<K, V, K, V> {
public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output, Reporter reporter)
throws IOException { // Hadoopからkeyとvaluesを入力
while (values.hasNext()) {
// keyとvaluesからkeyRとvalueRを生成する処理
// 例 : values.next()の合計値をvalueRに設定
}
output.collect(keyR, valueR); // HadoopにkeyRとvalueRを出力
}
}参照
MapReduceアプリケーションやAPIの詳細については、以下を参照してください。
http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html
http://hadoop.apache.org/docs/r1.2.1/api/