Step 3 - 从Dump(User,Question)中提取我们需要的数据为了提取数据,我们将借助Hadoop来分配。首先要写一个简单的Mapper。就像前面提到过的,我们需要弄 清楚所有PostTypeId=2的文章中的{OwnerUserId,ParentId}。这是因为我们要之后要为推荐工作输入的 是{user,item}。基于此,首先要把Posts.XML加载到HDFS。你可以使用hadoop fs命令把本地文件复制 到指定的输入路径。

现在,是时候开始写一个用户映射来提取数据了。我们将使用Hadoop On Azure .NET SDK来写Mapduce任务。 不是我们在配置部分指明输入目录和输出目录。启动Visual Studio,创建一个C#控制台程序。如果你记得我之前 写的文章,你会知道hadoop fs是用来访问HDFS文件系统,当然如果你知道一些基本的*nix命令如 Is,cat等等 会更好。 注意:: (之前的文章) 忽略HDInsight前面部分,你可以理解更多关于Map Reduce模型和Hadoop on Azure。 你需要通过Nuget包管理器来安装Hadop SDK for .NET上的Hadoop Map Reduce包。 1 | install -package Microsoft.Hadoop.MapReduce |
有下面的代码,我们可以 具体如下: 002 | using System.Collections.Generic; |
003 | using System.Globalization; |
006 | using System.Xml.Linq; |
007 | using Microsoft.Hadoop.MapReduce; |
009 | namespace StackExtractor |
014 | public class UserQuestionsMapper : MapperBase |
016 | public override void Map( string inputLine, MapperContext context) |
020 | var obj = XElement.Parse(inputLine); |
021 | var postType = obj.Attribute( "PostTypeId" ); |
022 | if (postType != null && postType.Value == "2" ) |
024 | var owner = obj.Attribute( "OwnerUserId" ); |
025 | var parent = obj.Attribute( "ParentId" ); |
028 | if (owner != null && parent != null ) |
030 | context.EmitLine( string .Format( "{0},{1}" , owner.Value, parent.Value)); |
043 | public class UserQuestionsExtractionJob : HadoopJob |
045 | public override HadoopJobConfiguration Configure(ExecutorContext context) |
047 | var config = new HadoopJobConfiguration(); |
048 | config.DeleteOutputFolder = true ; |
049 | config.InputPath = "/input/Cooking" ; |
050 | config.OutputFolder = "/output/Cooking" ; |
062 | public static void Main() |
066 | var azureCluster = new Uri( "https://{yoururl}.azurehdinsight.net:563" ); |
067 | const string clusterUserName = "admin" ; |
068 | const string clusterPassword = "{yourpassword}" ; |
072 | const string hadoopUserName = "Hadoop" ; |
075 | const string azureStorageAccount = "{yourstorage}.blob.core.windows.net" ; |
076 | const string azureStorageKey = |
078 | const string azureStorageContainer = "{yourcontainer}" ; |
079 | const bool createContinerIfNotExist = true ; |
080 | Console.WriteLine( "Connecting : {0} " , DateTime.Now); |
082 | var hadoop = Hadoop.Connect(azureCluster, |
088 | azureStorageContainer, |
089 | createContinerIfNotExist); |
091 | Console.WriteLine( "Starting: {0} " , DateTime.Now); |
092 | var result = hadoop.MapReduceJob.ExecuteJob(); |
093 | var info = result.Info; |
095 | Console.WriteLine( "Done: {0} " , DateTime.Now); |
096 | Console.WriteLine( "\nInfo From Server\n----------------------" ); |
097 | Console.WriteLine( "StandardError: " + info.StandardError); |
098 | Console.WriteLine( "\n----------------------" ); |
099 | Console.WriteLine( "StandardOut: " + info.StandardOut); |
100 | Console.WriteLine( "\n----------------------" ); |
101 | Console.WriteLine( "ExitCode: " + info.ExitCode); |
105 | Console.WriteLine( "Error: {0} " , ex.StackTrace.ToString(CultureInfo.InvariantCulture)); |
107 | Console.WriteLine( "Press Any Key To Exit.." ); |
现在编译和运行上的程序。 执行工作(ExecuteJob)会上传所需的二进制文件到集群,并初始化一个Hadoop数 据流工作(Streaming Job),它会在集群上运行我们的映射(Mappers),并输入存储在输入文件夹中的Posts文件。 我们的控制台程序把任务提交到云端,并等待执行的结果。Hadoop SDK将更新映射-归并二进制文件到二进制容 器(blob)中,并组建所需命令行来执行任务(见之前写的理解如何手动实现的文章-)。你可以点击桌面快捷方式 中的Hadoop映射归并状态追踪来检查头结点中的任务。
|