@H_502_6@
文章目录
@H_502_6@
环境
hadoop :1.0.0
java :1.8.0_171@H_502_6@ @H_502_33@启动haoop,并配置远程调试
指定远程调试监听端口8888
export HADOOP_CLIENT_OPTS="-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y"@H_502_6@配置idea
在org.apache.hadoop.util.RunJar的main函数入口下打断点
选择第三部配置的remote.hadoop.然后单击蜘蛛图标
最终结果如下:
代码解析
runJar
Manifest manifest = jarFile.getManifest(); if (manifest != null) { mainClassName = manifest.getMainAttributes().getValue("Main-Class"); } jarFile.close();
File tmpDir = new File(new Configuration().get("hadoop.tmp.dir")); tmpDir.mkdirs(); if (!tmpDir.isDirectory()) { System.err.println("Mkdirs Failed to create " + tmpDir); System.exit(-1); } final File workdir = File.createTempFile("hadoop-unjar", "", tmpDir); workdir.delete(); workdir.mkdirs(); if (!workdir.isDirectory()) { System.err.println("Mkdirs Failed to create " + workdir); System.exit(-1); }
Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { FileUtil.fullyDelete(workdir); } catch (IOException e) { } } });
- 解压jar包,使用动态代理得到mainClass,然后并执行
ClassLoader loader = new urlclassloader(classpath.toArray(new URL[0])); Thread.currentThread().setContextClassLoader(loader); Class<?> mainClass = Class.forName(mainClassName, true, loader); Method main = mainClass.getmethod("main", new Class[] {Array.newInstance(String.class, 0).getClass()}); String[] newArgs = Arrays.asList(args).subList(firstArg, args.length).toArray(new String[0]); try { main.invoke(null, new Object[] { newArgs }); } catch (InvocationTargetException e) { throw e.getTargetException(); }
自定义mapreduce程序
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { jobClient.monitorAndPrintJob(conf, info); } else { info.waitForCompletion(); } return isSuccessful(); } public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); // Connect to the JobTracker and submit the job // connect()函数里初始化了 jobClient connect(); // 这里正是提交任务 info = jobClient.submitJobInternal(conf); super.setJobID(info.getID()); state = JobState.RUNNING; }
这个函数只做两件事,第一件事是提交任务,第二件事等待任务结束。
观察submit()的实现,程序依靠JobClient类来实现连接到集群和提交任务。@H_502_6@
- JobClient解析
public JobClient(JobConf conf) throws IOException { setConf(conf); // init()函数通过通过动态代理获得jobSubmitClient的代理对象 init(conf); }
构造函数里调用了init()方法,其实现如下:@H_502_6@
public void init(JobConf conf) throws IOException { String tracker = conf.get("mapred.job.tracker", "local"); tasklogtimeout = conf.getInt( TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT); this.ugi = UserGroup@R_637_404[email protected](); if ("local".equals(tracker)) { conf.setNumMapTasks(1); // 使用本地模式 // LocalJobRunner实现了JobSubmissionProtocol接口,并没有调用JobTracker this.jobSubmitClient = new LocalJobRunner(conf); } else { // 使用非本地模式 this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf); } }
由init()函数可知:如果mapred.job.tracker的配置值是local或者没有配置,则jobSubmitClient的实例是一个本地已经实现的LocalJobRunner。如果不是local,则jobSubmitClient只是一个RPC客户端,真正的实现是在远程的JobTracker。@H_502_6@
- JobTracker解析
我们上面说到的东西我们画个图来总结一下:
@H_502_6@
@H_502_33@
- job 的 submit()函数调用connect函数
- connect函数创建jobclient
- jobclient对象在创建的时候,构造函数里调用init()函数
- init()函数根据配置信息选择创建LocalJobRunner还是Jobtracker的Rpc代理。
- job submit()函数通过函数jobclient的submitJobInternal提交任务。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。