Big Data Tutorial (8.6): The YARN Client Job Submission Flow, and Custom Partition Programming
Published: 2019-06-28



In the previous post I walked through how MapReduce parallelizes work. This post continues with the flow a YARN client goes through when submitting a job, followed by custom partition programming.

1. The YARN client job submission flow

(Original figure: a diagram of the YARN client job submission flow.) In outline: the client asks the ResourceManager for a new application ID; it writes the job resources (the job jar, the computed input splits, and the serialized job configuration) to a staging directory on HDFS; it then submits the application to the ResourceManager, which allocates a container on a NodeManager and starts the MRAppMaster in it; the MRAppMaster requests further containers for the map and reduce tasks, launches them, and reports progress back to the client until the job completes.
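From the client's point of view, job.waitForCompletion(true) is just an asynchronous submit() plus a status-polling loop against the ResourceManager. Below is a minimal sketch of that loop using only the public org.apache.hadoop.mapreduce.Job API; the SubmitAndPoll class name is ours, and the job setup is assumed to be the same as in the FlowCount driver shown later.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitAndPoll {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // ... same mapper/reducer/partitioner/input/output setup as in FlowCount below ...

        job.submit();               // hand the jar, splits, and conf over to the ResourceManager
        while (!job.isComplete()) { // poll the ResourceManager/AppMaster for progress
            System.out.printf("map %.0f%% reduce %.0f%%%n",
                    job.mapProgress() * 100, job.reduceProgress() * 100);
            Thread.sleep(2000);
        }
        System.out.println("succeeded: " + job.isSuccessful());
    }
}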

2. Custom partition programming

FlowBean (the output value class)

package com.empire.hadoop.mr.provinceflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // Deserialization instantiates the class reflectively through the
    // no-arg constructor, so one must be defined explicitly.
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serialization method.
     */
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method. Note: fields must be read in exactly the
     * same order they were written.
     */
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }
}
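Because readFields() must consume fields in exactly the order write() produced them, a cheap round-trip check catches ordering mistakes early. A minimal sketch, assuming FlowBean is on the classpath (the RoundTrip class name is ours, not from the original post):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import com.empire.hadoop.mr.provinceflow.FlowBean;

public class RoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(100, 200);

        // serialize the bean the same way the MapReduce framework would
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // deserialize into a fresh instance created via the no-arg constructor
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(original + " | " + copy); // both print 100  200  300
    }
}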

ProvincePartitioner (the custom partitioner class)

package com.empire.hadoop.mr.provinceflow;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * K2, V2 correspond to the map output key/value types.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>();
    static {
        proviceDict.put("136", 0);
        proviceDict.put("137", 1);
        proviceDict.put("138", 2);
        proviceDict.put("139", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String prefix = key.toString().substring(0, 3);
        Integer provinceId = proviceDict.get(prefix);
        return provinceId == null ? 4 : provinceId;
    }
}
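Since getPartition() is a pure function of the key, it can be sanity-checked without a cluster. A small sketch of ours, using phone numbers taken from the test data shown later:

import org.apache.hadoop.io.Text;

import com.empire.hadoop.mr.provinceflow.FlowBean;
import com.empire.hadoop.mr.provinceflow.ProvincePartitioner;

public class PartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner p = new ProvincePartitioner();
        FlowBean dummy = new FlowBean(0, 0);
        // prefixes 136-139 map to partitions 0-3; any other prefix falls through to 4
        System.out.println(p.getPartition(new Text("13602846565"), dummy, 5)); // 0
        System.out.println(p.getPartition(new Text("13925057413"), dummy, 5)); // 3
        System.out.println(p.getPartition(new Text("84138413"), dummy, 5));    // 4 (unknown prefix)
    }
}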

FlowCount (the MapReduce driver class)

package com.empire.hadoop.mr.provinceflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();     // convert the line to a String
            String[] fields = line.split("\t"); // split into fields
            String phoneNbr = fields[1];        // take the phone number
            // take the upstream and downstream traffic
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        // input looks like: <183323, bean1><183323, bean2><183323, bean3>...
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_dFlow = 0;
            // iterate over all beans, accumulating upstream and downstream traffic separately
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getdFlow();
            }
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         */
        Job job = Job.getInstance(conf);

        /* job.setJar("/home/hadoop/wc.jar"); */
        // specify the local path of the jar containing this program
        job.setJarByClass(FlowCount.class);

        // specify the mapper/reducer business classes this job uses
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // specify our custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // and set the number of reduce tasks to match the number of partitions
        job.setNumReduceTasks(5);

        // specify the kv types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // specify the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // specify the directory of the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job's configuration and the jar containing its classes to YARN
        /* job.submit(); */
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
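Before packaging for the cluster, the same driver can be run inside a single JVM for debugging: with mapreduce.framework.name set to local, Hadoop uses the LocalJobRunner, and with fs.defaultFS pointing at file:/// it reads and writes the local filesystem. A brief sketch of the two settings (standard Hadoop 2.x configuration keys); the rest of main() stays exactly as above:

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local"); // run in-process via the LocalJobRunner
conf.set("fs.defaultFS", "file:///");          // resolve input/output paths on the local filesystem
Job job = Job.getInstance(conf);
// ... identical mapper/reducer/partitioner/path setup as above ...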

3. Running the jar and checking the results

# submit to the hadoop cluster
hadoop jar flowcount_patitioner_aaron.jar com.empire.hadoop.mr.provinceflow.FlowCount /user/hadoop/flowcount /flowcountpatitioner
# list the output directory
hdfs dfs -ls /flowcountpatitioner
# view one of the result files
hdfs dfs -cat /flowcountpatitioner/part-r-00000

Run log:

18/11/29 07:26:20 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032
18/11/29 07:26:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/11/29 07:26:22 INFO input.FileInputFormat: Total input files to process : 5
18/11/29 07:26:22 INFO mapreduce.JobSubmitter: number of splits:5
18/11/29 07:26:22 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/11/29 07:26:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1543447570289_0001
18/11/29 07:26:24 INFO impl.YarnClientImpl: Submitted application application_1543447570289_0001
18/11/29 07:26:24 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1543447570289_0001/
18/11/29 07:26:24 INFO mapreduce.Job: Running job: job_1543447570289_0001
18/11/29 07:26:36 INFO mapreduce.Job: Job job_1543447570289_0001 running in uber mode : false
18/11/29 07:26:36 INFO mapreduce.Job:  map 0% reduce 0%
18/11/29 07:26:45 INFO mapreduce.Job:  map 20% reduce 0%
18/11/29 07:27:02 INFO mapreduce.Job:  map 40% reduce 1%
18/11/29 07:27:04 INFO mapreduce.Job:  map 100% reduce 1%
18/11/29 07:27:05 INFO mapreduce.Job:  map 100% reduce 8%
18/11/29 07:27:06 INFO mapreduce.Job:  map 100% reduce 60%
18/11/29 07:27:07 INFO mapreduce.Job:  map 100% reduce 100%
18/11/29 07:27:07 INFO mapreduce.Job: Job job_1543447570289_0001 completed successfully
18/11/29 07:27:08 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=4195
                FILE: Number of bytes written=1986755
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=11574
                HDFS: Number of bytes written=594
                HDFS: Number of read operations=30
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=10
        Job Counters
                Killed map tasks=1
                Launched map tasks=6
                Launched reduce tasks=5
                Data-local map tasks=6
                Total time spent by all maps in occupied slots (ms)=111307
                Total time spent by all reduces in occupied slots (ms)=93581
                Total time spent by all map tasks (ms)=111307
                Total time spent by all reduce tasks (ms)=93581
                Total vcore-milliseconds taken by all map tasks=111307
                Total vcore-milliseconds taken by all reduce tasks=93581
                Total megabyte-milliseconds taken by all map tasks=113978368
                Total megabyte-milliseconds taken by all reduce tasks=95826944
        Map-Reduce Framework
                Map input records=110
                Map output records=110
                Map output bytes=3945
                Map output materialized bytes=4315
                Input split bytes=624
                Combine input records=0
                Combine output records=0
                Reduce input groups=21
                Reduce shuffle bytes=4315
                Reduce input records=110
                Reduce output records=21
                Spilled Records=220
                Shuffled Maps =25
                Failed Shuffles=0
                Merged Map outputs=25
                GC time elapsed (ms)=3300
                CPU time spent (ms)=5980
                Physical memory (bytes) snapshot=1349332992
                Virtual memory (bytes) snapshot=8470929408
                Total committed heap usage (bytes)=689782784
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=10950
        File Output Format Counters
                Bytes Written=594

Output files:

[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /flowcountpatitioner
Found 6 items
-rw-r--r--   2 hadoop supergroup          0 2018-11-29 07:27 /flowcountpatitioner/_SUCCESS
-rw-r--r--   2 hadoop supergroup         58 2018-11-29 07:27 /flowcountpatitioner/part-r-00000
-rw-r--r--   2 hadoop supergroup        113 2018-11-29 07:27 /flowcountpatitioner/part-r-00001
-rw-r--r--   2 hadoop supergroup         24 2018-11-29 07:27 /flowcountpatitioner/part-r-00002
-rw-r--r--   2 hadoop supergroup        112 2018-11-29 07:27 /flowcountpatitioner/part-r-00003
-rw-r--r--   2 hadoop supergroup        287 2018-11-29 07:27 /flowcountpatitioner/part-r-00004
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00000
13602846565     9690    14550   24240
13660577991     34800   3450    38250
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00001
13719199419     1200    0       1200
13726230503     12405   123405  135810
13726238888     12405   123405  135810
13760778710     600     600     1200
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00002
13826544101     1320    0       1320
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00003
13922314466     15040   18600   33640
13925057413     55290   241215  296505
13926251106     1200    0       1200
13926435656     660     7560    8220
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00004
13480253104     900     900     1800
13502468823     36675   551745  588420
13560436666     5580    4770    10350
13560439658     10170   29460   39630
15013685858     18295   17690   35985
15920133257     15780   14680   30460
15989002119     9690    900     10590
18211575961     7635    10530   18165
18320173382     47655   12060   59715
84138413        20580   7160    27740

4. Summary

(1) The default partitioner implementation is HashPartitioner: a record lands in the partition given by key.hashCode() modulo the number of launched reduce tasks, so the total number of partitions equals numReduceTasks. Its source is quoted below, followed by a small worked example.

/**
 * Partition keys by their {@link Object#hashCode()}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

    public void configure(JobConf job) {}

    /** Use {@link Object#hashCode()} to partition. */
    public int getPartition(K2 key, V2 value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
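To see the formula in action, it can be evaluated by hand for a sample key. A sketch of ours; Text.hashCode() is deterministic, so a given key always lands in the same partition for a fixed reduce task count:

import org.apache.hadoop.io.Text;

public class HashDemo {
    public static void main(String[] args) {
        Text key = new Text("13726230503");
        int numReduceTasks = 4;
        // masking with Integer.MAX_VALUE clears the sign bit, so the result
        // of % is never negative even when hashCode() is negative
        int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        System.out.println(key + " -> partition " + partition);
    }
}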

(2) The input splits are fixed once the job is submitted, but the number of map tasks launched does not necessarily equal the number of splits: when a map task runs slowly, the cluster suspects something is wrong with it and launches a second attempt for the same split; the two attempts race, and the result of whichever finishes first is used. This is speculative execution, and it is likely what the counters above show: 5 splits, but "Launched map tasks=6" and "Killed map tasks=1". It can be switched off per job when duplicate attempts are undesirable, as sketched below.
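A brief sketch of the per-job switches, using the standard Hadoop 2.x configuration keys; this snippet slots into the driver's main() before the job is created:

Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.speculative", false);    // no backup map attempts
conf.setBoolean("mapreduce.reduce.speculative", false); // no backup reduce attempts
Job job = Job.getInstance(conf);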

(3) After overriding the partitioner, we should normally set the number of reduce tasks in code to match the number of partitions. If it is not set, a single reduce task is launched by default and the job still runs correctly, just producing one output file. Setting it to a value greater than 1 but smaller than the number of partitions fails, because records assigned to the missing partitions have nowhere to go. Setting it larger than the number of partitions works; the extra reduce tasks simply receive no data and produce empty output files. (Readers with a cluster at hand are encouraged to verify these conclusions; a defensive variant of the partitioner is sketched below.)
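One way to make the partitioner tolerate a reduce task count smaller than the planned number of partitions is to clamp its return value with a modulo. This is an illustrative variant of getPartition(), not the code the job above actually ran:

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
    String prefix = key.toString().substring(0, 3);
    Integer provinceId = proviceDict.get(prefix);
    int partition = (provinceId == null) ? 4 : provinceId;
    // never return a value >= numPartitions, so the job still runs
    // even when fewer reduce tasks than partitions are configured
    return partition % numPartitions;
}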

 

Closing words: that is all for this post. If you found it useful, please give it a like; if you are interested in my other big-data articles, please follow the blog, and feel free to get in touch any time.

Reposted from: https://my.oschina.net/u/2371923/blog/2962789
