YARN Installation and Basics
I. Configuring YARN
1. Modify yarn-site.xml
In hadoop/etc/hadoop/yarn-site.xml, add the following (inside the <configuration> element):
<!-- Host that runs the ResourceManager (master node) -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>linux01</value>
</property>
<!-- Auxiliary shuffle service for MR programs (HTTP download) -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- Total memory (MB) available on one NodeManager -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>
<!-- Total (logical) CPU cores available on one NodeManager -->
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>4</value>
</property>
<!-- Whether to check containers for virtual-memory overuse -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<!-- Upper limit of container virtual memory, expressed as a ratio to physical memory -->
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>
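The memory-mb and cpu-vcores values above are the total pool that a single NodeManager offers to containers; every map or reduce container later requests a slice of that pool, and a request larger than the pool can never be scheduled. A minimal sketch of how per-task requests could be set on a job Configuration (the property names are standard MapReduce settings; the numbers and the ResourceHints class name are illustrative only):

import org.apache.hadoop.conf.Configuration;

public class ResourceHints {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Per-task container requests; each one must fit inside the NodeManager
        // totals configured above (4096 MB / 4 vcores). The numbers are examples only.
        conf.setInt("mapreduce.map.memory.mb", 1024);     // memory per map container
        conf.setInt("mapreduce.map.cpu.vcores", 1);       // vcores per map container
        conf.setInt("mapreduce.reduce.memory.mb", 2048);  // memory per reduce container
        conf.setInt("mapreduce.reduce.cpu.vcores", 1);    // vcores per reduce container
        System.out.println("map memory = " + conf.get("mapreduce.map.memory.mb") + " MB");
    }
}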
2. Modify the start/stop scripts
Add the following to both start-yarn.sh and stop-yarn.sh:
YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root
3. Changes in IDEA
In the project's resources directory in IDEA, add a mapred-site.xml with the following content:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
</property>
</configuration>
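If preferred, the same three settings can also be applied on the Configuration object in the driver instead of keeping a mapred-site.xml on the classpath. A minimal sketch, assuming the same /opt/apps/hadoop-3.1.1 install path (the MapredEnvConfig class name is made up for illustration):

import org.apache.hadoop.conf.Configuration;

public class MapredEnvConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Programmatic equivalent of the mapred-site.xml above: tell the MR
        // ApplicationMaster and the map/reduce tasks where Hadoop is installed.
        String mapredHome = "HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1";
        conf.set("yarn.app.mapreduce.am.env", mapredHome);
        conf.set("mapreduce.map.env", mapredHome);
        conf.set("mapreduce.reduce.env", mapredHome);
        System.out.println("mapreduce.map.env = " + conf.get("mapreduce.map.env"));
    }
}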
II. Running on YARN
Remember to start the YARN service before submitting any jobs.
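Before submitting a job it can also help to confirm that the ResourceManager is reachable and that the NodeManagers have registered. An optional check using the standard YarnClient API (the YarnCheck class name is just for illustration; the hostname follows the configuration above):

import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnCheck {
    public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        // Point the client at the ResourceManager configured in yarn-site.xml
        conf.set("yarn.resourcemanager.hostname", "linux01");

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // List every NodeManager the ResourceManager currently sees as RUNNING
        for (NodeReport node : yarnClient.getNodeReports(NodeState.RUNNING)) {
            System.out.println(node.getNodeId() + " -> " + node.getCapability());
        }
        yarnClient.stop();
    }
}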
1. Running from Windows
The code is as follows:
package cn.doit.ab.day1120.demo02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");

        // Create the configuration object
        Configuration con = new Configuration();
        // Location of the HDFS cluster to access
        con.set("fs.defaultFS", "hdfs://Linux01:8020");
        // Run the job on YARN
        con.set("mapreduce.framework.name", "yarn");
        // Location of the YARN ResourceManager
        con.set("yarn.resourcemanager.hostname", "Linux01");
        // Cross-platform switch so the MR job can be submitted from Windows
        con.set("mapreduce.app-submission.cross-platform", "true");

        // Create the Job object, the entity that runs the program
        Job job = Job.getInstance(con, "wc");
        job.setJar("C:\\Users\\刘宾\\Desktop\\wc.jar");

        // Register the mapper and reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);

        // Set the output key/value types
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Number of reduce tasks
        job.setNumReduceTasks(1);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("/a/b"));
        FileOutputFormat.setOutputPath(job, new Path("/a/res"));

        // Submit and wait for completion
        boolean b = job.waitForCompletion(true);
    }
}
map:
package cn.doit.ab.day1120.demo02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused output key/value objects: the word and a count of 1
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each line on whitespace and emit (word, 1) for every word
        String s = value.toString();
        String[] ss = s.split("\\s+");
        for (String s1 : ss) {
            k.set(s1);
            context.write(k, v);
        }
    }
}
reduce:
package cn.doit.ab.day1120.demo02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Every value emitted by the mapper is 1, so counting the elements gives the word count
        int count = 0;
        for (IntWritable value : values) {
            count++;
        }
        context.write(key, new IntWritable(count));
    }
}
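A side note, not part of the original code: the reducer above counts records, which works because every mapper output value is 1. If you also want to register a combiner with job.setCombinerClass(...), the reduce logic must instead sum the incoming partial counts, because after the combine step the values are no longer all 1. A minimal sketch of such a summing reducer/combiner (the WordCountSumReducer class name is hypothetical):

package cn.doit.ab.day1120.demo02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the partial counts so the same class works as both combiner and reducer
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

With such a class, the driver could set both job.setCombinerClass(WordCountSumReducer.class) and job.setReducerClass(WordCountSumReducer.class) to reduce shuffle traffic.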
2. Running from Linux
The code is as follows:
package cn.doit.ab.day1120.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import java.net.URI;
import java.util.HashMap;
public class Order {

    static class OrderMapper extends Mapper<LongWritable, Text, Text, Text> {

        HashMap<String, String> uidMap = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load the cached user file into memory for the map-side join
            BufferedReader reader = new BufferedReader(new FileReader("user.txt"));
            String s = null;
            while ((s = reader.readLine()) != null) {
                String[] split = s.split(",");
                uidMap.put(split[0], s);
            }
        }

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Join each order record with its user record by uid
            String s = value.toString();
            String[] split = s.split(",");
            String uid = split[1];
            k.set(uid);
            String user = uidMap.getOrDefault(uid, "null,null,null,null,null");
            String order = split[0] + "," + user;
            v.set(order);
            context.write(k, v);
        }
    }

    /**
     * Run on Linux
     * @param args
     */
    public static void main(String[] args) throws Exception {
        // Create the configuration object
        Configuration con = new Configuration();
        // Run the job on YARN
        con.set("mapreduce.framework.name", "yarn");
        // Location of the YARN ResourceManager
        con.set("yarn.resourcemanager.hostname", "Linux01");

        // Create the Job object, the entity that runs the program
        Job job = Job.getInstance(con, "LiuBin");
        job.setJarByClass(Order.class);

        // Register the mapper (no reducer class, so the identity reducer is used)
        job.setMapperClass(OrderMapper.class);

        // Distribute the user file to every map task
        URI uri = new URI("hdfs://linux01:8020/anlI/Order1/input/uid/user.txt");
        job.addCacheFile(uri);

        // Set the output key/value types
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Number of reduce tasks
        job.setNumReduceTasks(1);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("/anlI/Order1/input/oid"));
        FileOutputFormat.setOutputPath(job, new Path("/anlI/Order1/res"));

        // Submit and wait for completion
        boolean b = job.waitForCompletion(true);
    }
}
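One detail worth knowing: job.addCacheFile(...) makes YARN symlink user.txt into each task's working directory, which is why setup() can open it by its bare file name. A slightly more defensive variant of that method (purely illustrative, not part of the original; intended as a drop-in replacement for the setup() in OrderMapper above) resolves the name from context.getCacheFiles() and closes the reader with try-with-resources:

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Resolve the cached file name instead of hard-coding "user.txt"
            String fileName = "user.txt";   // fallback: the name used above
            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles != null && cacheFiles.length > 0) {
                fileName = new Path(cacheFiles[0].getPath()).getName();
            }
            // try-with-resources ensures the reader is closed even on errors
            try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
                String s;
                while ((s = reader.readLine()) != null) {
                    String[] split = s.split(",");
                    uidMap.put(split[0], s);
                }
            }
        }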