ページの先頭行へ戻る
Interstage Big Data Parallel Processing Server V1.2.0 ユーザーズガイド
FUJITSU Software

17.6.1 MapReduceアプリケーションの概要

MapReduceアプリケーションはjarファイルとして作成します。一般的には、

インターフェースにしたがってクラスを実装します。Hadoopからそのクラスのmain()メソッドが呼び出されることでJobClientが起動されます。


main()メソッドでは、コマンドラインオプションとして指定されたプロパティ(-Dで指定)を取り込むために

を介して、run()メソッドを実行します。


run()メソッドでは、はじめにHadoopジョブの実行に必要な設定やスレーブサーバで実処理を行う部品(クラス)群を指定した

オブジェクトを作成します。その後、同オブジェクトの内容にしたがってJobClientからJobTrackerにHadoopジョブの実行が依頼されます。


MapReduceアプリケーション(抜粋)

public class BDPPApp extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration defconf = getConf(); // Hadoopジョブのデフォルトのプロパティ
        JobConf conf = new JobConf(defconf, BDPPApp.class); // Hadoopジョブの定義
        
        // アプリケーション固有のコマンドラインオプションの取り込み
        Path infile = new Path(args[1]); // 入力パス
        Path outdir = new Path(args[2]); // 出力パス
        
        // ジョブで使用するプロパティや部品群(クラス)の設定
        conf.setJobName("bdpp-app"); // Hadoopジョブ名の設定
        conf.setInputPath(infile);   // 入力パスの設定
        conf.setOutputPath(outdir);  // 出力パスの設定
        conf.setMapperClass(BDPPMapper.class);   // Map処理を行うクラスの設定
        conf.setReducerClass(BDPPReducer.class); // Reduce処理を行うクラスの設定
        
        JobClient.runJob(conf); // JobTrackerへのHadoopジョブの実行依頼と、終了の待ち合わせ
    }
    
    public static void main(String[] args) throws Exception {
        // コマンドラインで指定されたプロパティの取り込みと、BDPPApp.run()の呼び出し
        int ret = ToolRunner.run(new Configuration(), new BDPPApp(), args);
        System.exit(ret);
    }
}

さらに、スレーブサーバで実処理を行う部品(クラス)としてMap処理用クラス(Mapper)と、必要に応じてReduce処理用クラス(Reducer)を用意します。


Mapper/Reducerクラスは入出力や回収の方法を規定する

クラスを継承して

インターフェースを実装します。


Mapperは1件のKey-ValueデータをHadoopから入力し、1件または複数件のKey-ValueデータをHadoopに出力するように作成します。Mapperが出力したKey-ValueデータはHadoopによってソートされ、同一のKeyを持つデータが同じReducerに渡るよう制御されます。Reducerは1件のKeyとValueの集合(Iterator)をHadoopから入力し、1件または複数件のKey-ValueデータをHadoopに出力するように作成します。


KeyとValueを使用した実際のデータ処理(入力データから出力データを作成する処理)については、通常のJavaアプリケーションと同様に記述します。

Mapperクラス(抜粋)

public class BDPPMapper<K, V> extends MapReduceBase implements Mapper<K, V, K, V> {

    public void map(K key, V value, OutputCollector<K, V> output, Reporter reporter)
    throws IOException {              // Hadoopからkeyとvalueを入力
        
        // keyとvalueからkeyMとvalueMを生成する処理
        // 例 : keyとvalue内の先頭データを連結したものをkeyMに設定
        //      value内の集計対象部分のみをvalueMに設定
        output.collect(keyM, valueM); // HadoopにkeyMとvalueMを出力
    }
}

Reducerクラス(抜粋)

public class BDPPReducer<K, V> extends MapReduceBase implements Reducer<K, V, K, V> {

    public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output, Reporter reporter)
    throws IOException {               // Hadoopからkeyとvaluesを入力
        
        while (values.hasNext()) {
            // keyとvaluesからkeyRとvalueRを生成する処理
            // 例 : values.next()の合計値をvalueRに設定
        }
        output.collect(keyR, valueR); // HadoopにkeyRとvalueRを出力
    }
}

参照

MapReduceアプリケーションやAPIの詳細については、以下を参照してください。

  • http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html

  • http://hadoop.apache.org/docs/r1.2.1/api/