目录
2. SequenceFileOutputFormat(不常用)
OutputFormat接口实现类
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。
几种常见的OutputFormat实现类:
1.文本输出TextOutputFormat
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。
2. SequenceFileOutputFormat(不常用)
将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。(一般用于后续不操作的文件)
3.自定义OutputFormat
根据用户需求,自定义实现输出。
3.1使用场景
为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat
例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。
3.2自定义OutputFormat步骤
(1) 自定义一个类继承FileOutputFormat。
(2)改写RecordWriter,具体改写输出数据的方法write()
3.3自定义OutputFormat案例实操
1)需求
过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。
2)需求分析
(1)输入数据
(2)期望输出数据
atguigu.log | other.log |
http://www.atguigu.com | http://www.baidu.com http://www.baidu.com http://www.google.com http://cn.bing.com http://www.sohu.com http://www.sina.com http://www.sin2a.com http://www.sin2desa.com http://www.sindsafa.com |
(3)自定义一个OutputFormat类
(1)创建一个类LogRecordWriter继承RecordWriter
(2)创建两个文件的输出流: atguiguOut、otherOut;如果输入数据包含atguigu,输出到atguiguOut流;如果不包含atguigu,输出到otherOut流
(4)驱动类Driver
要将自定义的输出格式组件设置到job中:job.setOutputFormatClass(LogOutputFormat.class);
3.4案例代码展示
(1)编写LogMapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Forwards every input line unchanged as the map-output key. No parsing
     * happens here; the routing decision (atguigu vs. other) is made later by
     * the custom RecordWriter on the reduce side.
     *
     * @param key     byte offset of the line within the input split (unused)
     * @param value   one raw log line
     * @param context Hadoop context used to emit (line, null) pairs
     * @throws IOException          if emitting the record fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // NullWritable.get() returns a shared singleton — no per-record allocation.
        context.write(value, NullWritable.get());
    }
}
(2)编写LogReducer
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Emits the key once per grouped occurrence. Iterating the values and
     * writing inside the loop keeps duplicate log lines in the output; a
     * single write outside the loop would silently de-duplicate them.
     *
     * @param key     one distinct log line
     * @param values  one NullWritable per original occurrence of the line
     * @param context Hadoop context used to emit the line(s)
     * @throws IOException          if emitting a record fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        for (NullWritable ignored : values) {
            context.write(key, NullWritable.get());
        }
    }
}
(3)编写LogDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {

    /**
     * Configures and submits the log-splitting job that routes lines to
     * atguigu/other output files via the custom {@link LogOutputFormat}.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // LogRecordWriter writes to two FIXED file paths. With more than one
        // reduce task, every reducer re-creates (and thereby overwrites) the
        // same two files, losing all but one task's output — so exactly one
        // reducer must be used with this OutputFormat.
        job.setNumReduceTasks(1);

        // Plug in the custom OutputFormat instead of the default TextOutputFormat.
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\in\\log"));
        // Even with custom record paths, FileOutputFormat needs an output dir
        // for the _SUCCESS marker file.
        FileOutputFormat.setOutputPath(job, new Path("F:\\out\\log_out2"));

        // Surface the job result as the process exit code instead of
        // discarding the boolean returned by waitForCompletion.
        boolean ok = job.waitForCompletion(true);
        System.exit(ok ? 0 : 1);
    }
}
(4)编写LogOutputFormat
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 自定义的OutputFormat需要继承Hadoop提供的OutputFormat
*/
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
/**
* 返回RecordWriter
* @param job
* @return
* @throws IOException
* @throws InterruptedException
*/
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
LogRecordWriter lrw = new LogRecordWriter(job);
return lrw;
}
}
(5)编写LogRecordWriter
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 自定义LogRecordWriter 需要继承Hadoop提供的RecordWriter
*/
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
// 定义输出路径
private String atguiguPath = "F:\\logs\\atguigu.txt";
private String otherPath = "F:\\logs\\other.txt";
private FileSystem fs;
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
/**
* 初始化工作
* @param job
*/
public LogRecordWriter(TaskAttemptContext job) throws IOException {
// 获取Hadoop的文件系统对象
fs = FileSystem.get(job.getConfiguration());
// 获取输出流 atguiguOut
atguiguOut = fs.create(new Path(atguiguPath));
// 获取输出流otherOut
otherOut = fs.create(new Path(otherPath));
}
/**
* 实现数据写出的逻辑
* @param key
* @param value
* @throws IOException
* @throws InterruptedException
*/
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// 获取当前输入数据
String logData = key.toString();
if(logData.contains("atguigu")){
atguiguOut.writeBytes(logData + "\n");
}else {
otherOut.writeBytes(logData + "\n");
}
}
/**
* 关闭资源
* @param context
* @throws IOException
* @throws InterruptedException
*/
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
文章评论