Custom Hadoop OutputFormat
In many scenarios, the default output format and RecordWriter classes are not the best fit for a given requirement. We can create our own output class by extending the FileOutputFormat class and overriding its methods to achieve our goal. In this section, we discuss how to create a custom output class and a custom RecordWriter, and how to use them in a MapReduce program.
To create a custom OutputFormat class, extend the FileOutputFormat class, implement the getRecordWriter method, and add whatever logic you need. This method mainly contains the logic for constructing the RecordWriter that the job will use.
The RecordWriter is responsible for writing the output key-value pairs from the Mapper or Reducer phase to the output file, so we can create a custom RecordWriter class by extending the RecordWriter class.
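To make the shape of these two pieces concrete before the full example, here is a minimal compiling skeleton (the class name SkeletonOutputFormat and its trivial write logic are illustrative only; the complete implementation used in this article follows in the next sections):
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Skeleton only: extend FileOutputFormat and return a custom RecordWriter from getRecordWriter()
public class SkeletonOutputFormat<K, V> extends FileOutputFormat<K, V> {
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Path file = getDefaultWorkFile(job, "");  // output file for this task attempt
        FSDataOutputStream out = file.getFileSystem(job.getConfiguration()).create(file, false);
        return new RecordWriter<K, V>() {
            @Override
            public void write(K key, V value) throws IOException {
                // a custom RecordWriter decides how each key-value pair is serialized
                out.write((key + "\t" + value + "\n").getBytes("UTF-8"));
            }
            @Override
            public void close(TaskAttemptContext context) throws IOException {
                out.close();
            }
        };
    }
}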
Problem Description
We know that the default record separator used by TextOutputFormat is NEW_LINE. Consider a scenario where the output produced by our mapper/reducer is, by default, terminated with NEW_LINE characters, but we want to configure a different record separator instead of the NEW_LINE character.
In this example, for clarity, we use the word-count program together with a custom output format class. We will set a custom record separator through the custom configuration property mapreduce.output.textoutputformat.recordseparator.
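For reference, the driver shown later supplies both separators through the job configuration. A minimal sketch of that configuration step (the helper class name is hypothetical; the full driver appears below) looks like this:
import org.apache.hadoop.mapreduce.Job;

// Sketch: set the custom record and field separators on the job configuration
public class SeparatorConfig {
    public static void configureSeparators(Job job) {
        // record separator written between output records (instead of NEW_LINE)
        job.getConfiguration().set("mapreduce.output.textoutputformat.recordseparator", "<==>");
        // field separator written between key and value (instead of \t)
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ";");
    }
}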
1. Create a Java Maven Project
Maven dependencies (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xueai8</groupId>
    <artifactId>HadoopDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- Hadoop common dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!-- HDFS filesystem dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!-- MapReduce-related dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!-- JUnit dependency -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compiler plugin used to compile the project -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!-- Specify the JDK version and charset for compilation; if omitted,
                     Maven 3 defaults to JDK 1.5 and Maven 2 to JDK 1.3 -->
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>   <!-- JDK version of the source code -->
                    <target>1.8</target>   <!-- target bytecode version -->
                    <encoding>UTF-8</encoding> <!-- source file encoding -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
WordCountLineRecordWriter.java:
This is the custom RecordWriter class; it must extend the RecordWriter<K, V> class.
package com.xueai8.customoutput;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Custom RecordWriter class.
 */
public class WordCountLineRecordWriter<K, V> extends RecordWriter<K, V> {

    protected DataOutputStream out;
    private final byte[] recordSeprator;  // record separator
    private final byte[] fieldSeprator;   // field separator

    // Specify the field separator and the record separator
    public WordCountLineRecordWriter(DataOutputStream out, String fieldSeprator, String recordSeprator) {
        this.out = out;
        this.fieldSeprator = fieldSeprator.getBytes(StandardCharsets.UTF_8);
        this.recordSeprator = recordSeprator.getBytes(StandardCharsets.UTF_8);
    }

    // Use the default field separator and record separator
    public WordCountLineRecordWriter(DataOutputStream out) {
        this(out, "\t", "\n");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            this.out.write(to.getBytes(), 0, to.getLength());
        } else {
            this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (!nullKey || !nullValue) {
            if (!nullKey) {
                this.writeObject(key);
            }
            if (!nullKey && !nullValue) {
                this.out.write(this.fieldSeprator);
            }
            if (!nullValue) {
                this.writeObject(value);
            }
            this.out.write(recordSeprator);  // write the custom record separator instead of NEW_LINE
        }
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        this.out.close();
    }
}
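As a quick local sanity check (not part of the MapReduce job itself), the writer can be exercised against an in-memory stream. The class below is an illustrative sketch that assumes the Hadoop client libraries from the pom above are on the classpath:
package com.xueai8.customoutput;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

public class WordCountLineRecordWriterCheck {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        WordCountLineRecordWriter<Text, IntWritable> writer =
                new WordCountLineRecordWriter<>(new DataOutputStream(buffer), ";", "<==>");
        writer.write(new Text("bye"), new IntWritable(3));
        writer.write(new Text("hadoop"), new IntWritable(4));
        writer.close(null);  // close() only closes the stream, so no TaskAttemptContext is needed here
        // Prints: bye;3<==>hadoop;4<==>
        System.out.println(buffer.toString("UTF-8"));
    }
}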
WordCountOutputFormat.java:
The custom Hadoop OutputFormat class, used to format the output.
package com.xueai8.customoutput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

public class WordCountOutputFormat<K, V> extends FileOutputFormat<K, V> {

    public static final String FIELD_SEPARATOR = "mapreduce.output.textoutputformat.separator";
    public static final String RECORD_SEPARATOR = "mapreduce.output.textoutputformat.recordseparator";

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);

        // Field separator; defaults to \t
        String fieldSeprator = conf.get(FIELD_SEPARATOR, "\t");
        // Record separator; defaults to \n
        String recordSeprator = conf.get(RECORD_SEPARATOR, "\n");

        // Compressed-output handling
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }

        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        if (isCompressed) {
            return new WordCountLineRecordWriter<>(new DataOutputStream(codec.createOutputStream(fileOut)),
                    fieldSeprator, recordSeprator);
        } else {
            return new WordCountLineRecordWriter<>(fileOut, fieldSeprator, recordSeprator);
        }
    }
}
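Note that getRecordWriter() above also honors compressed output. The example driver in this article does not enable compression, but a driver could turn that branch on roughly as follows (sketch only; the helper class name is hypothetical):
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch: enable compressed output so that the isCompressed branch in getRecordWriter() is taken
public class CompressionConfig {
    public static void enableGzipOutput(Job job) {
        FileOutputFormat.setCompressOutput(job, true);                    // compress the job output
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);  // use the gzip codec
    }
}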
WordCountMapper.java:
The Mapper class.
package com.xueai8.customoutput;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /*
         * 1. Convert the line to lower case
         * 2. Replace all punctuation with spaces
         * 3. Tokenize the input line
         * 4. Emit each word with a count of 1
         */
        String line = value.toString().toLowerCase().replaceAll("\\p{Punct}", " ");
        StringTokenizer st = new StringTokenizer(line, " ");
        while (st.hasMoreTokens()) {
            word.set(st.nextToken());
            context.write(word, one);
        }
    }
}
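The only non-obvious part of the mapper is the normalization line. As a quick standalone illustration (the class name TokenizeCheck is hypothetical; the sample text is a fragment of the input file used in the deployment section below):
import java.util.StringTokenizer;

// Lower-case the line, replace punctuation with spaces, then tokenize on whitespace
public class TokenizeCheck {
    public static void main(String[] args) {
        String line = "Hello Hadoop hello, every one!".toLowerCase().replaceAll("\\p{Punct}", " ");
        StringTokenizer st = new StringTokenizer(line, " ");
        while (st.hasMoreTokens()) {
            System.out.println(st.nextToken());  // hello, hadoop, hello, every, one
        }
    }
}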
WordCountReducer.java:
The Reducer class.
package com.xueai8.customoutput;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum = sum + value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
WordCountDriver.java:
The driver class. Note that we use the ToolRunner utility here.
package com.xueai8.customoutput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new WordCountDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Two arguments are required to run this program:");
            System.out.println("[ 1 ] input path");
            System.out.println("[ 2 ] output path");
            return -1;
        }

        // ToolRunner has already parsed the generic options, so reuse the configuration it prepared
        Configuration conf = getConf();
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        /*
         * Uncomment the following three lines to enable local debugging of the MapReduce job
         */
        /*
        conf.set("fs.defaultFS", "local");
        conf.set("mapreduce.job.maps","1");
        conf.set("mapreduce.job.reduces","1");
        */

        Job job = Job.getInstance(conf, "Hadoop Example");
        job.setJarByClass(WordCountDriver.class);

        // set mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // set reducer
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // set input format and output format
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(WordCountOutputFormat.class);

        // Custom record separator, set in the job configuration
        job.getConfiguration().set("mapreduce.output.textoutputformat.recordseparator", "<==>");
        // Custom field separator instead of the default \t character
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ";");

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        job.setSpeculativeExecution(false);  // disable speculative execution for this job

        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }
}
2. Configure log4j
Add a log4j configuration file named log4j.properties under the src/main/resources directory with the following content:
log4j.rootLogger = info,stdout
### Log to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
3. Package the Project
Open the terminal window at the bottom of IDEA and run the packaging command "mvn clean package".
If everything goes well, Maven reports that the jar has been built successfully.
Looking at the project structure afterwards, you will see a new target directory; the packaged jar is located there.
4. Deploy the Project
Follow the steps below.
1. Start the HDFS and YARN clusters. In a Linux terminal window, run the following scripts:
$ start-dfs.sh
$ start-yarn.sh
Check that the processes have started and the cluster is running normally. In the Linux terminal window, run the following command:
$ jps
You should see the following five processes running, which indicates that the cluster is working properly:
5542 NodeManager
5191 SecondaryNameNode
4857 NameNode
5418 ResourceManager
4975 DataNode
2. First create a local input data file named word.txt with the following content:
Hello World Bye World Hello Hadoop Bye Hadoop Bye Hadoop Hello Hadoop hello, every one!
3. Upload the data file word.txt to the /data/mr/ directory on HDFS.
$ hdfs dfs -mkdir -p /data/mr
$ hdfs dfs -put word.txt /data/mr/
$ hdfs dfs -ls /data/mr/
4. Submit the job to the Hadoop cluster. (If the jar was built on Windows, copy it to the Linux machine first.)
In the terminal window, run the following job submission command:
$ hadoop jar HadoopDemo-1.0-SNAPSHOT.jar com.xueai8.customoutput.WordCountDriver /data/mr /data/mr-output
5. Check the output.
In the terminal window, run the following HDFS commands to view the output:
$ hdfs dfs -ls /data/mr-output
$ hdfs dfs -cat /data/mr-output/part-r-00000
Because records are separated by "<==>" instead of NEW_LINE and fields by ";" instead of \t, the final word-count result appears as a single line:
bye;3<==>every;1<==>hadoop;4<==>hello;4<==>one;1<==>world;2<==>