Straight to the code. First, the Mapper:
package com.winksi.dotstat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DotProducedMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is a comma-separated record with 13 fields.
        String line = value.toString();
        String[] fields = line.split(",");
        String imsi = fields[0];
        String adccompany = fields[1];
        String phone = fields[2];
        String cur_time = fields[3];
        String call_length = fields[4];
        String call_type = fields[5];
        String show_length = fields[6];
        String pkgname = fields[7];
        String ip = fields[8];
        String model = fields[9];
        String ossdk = fields[10];
        String av = fields[11];
        String cityId = fields[12];
        // Key: (model, adccompany, OSSDK, av, phone); value: IMSI, so the
        // reducer can count total records (PV) and distinct IMSIs (UV).
        StringBuilder strKey = new StringBuilder();
        strKey.append(model).append(",").append(adccompany).append(",")
              .append(ossdk.toUpperCase()).append(",").append(av).append(",")
              .append(phone);
        context.write(new Text(strKey.toString()), new Text(imsi));
    }
}
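A quick way to sanity-check what the mapper emits is an MRUnit test. This is not part of the original post; it assumes the Apache MRUnit library is on the test classpath, and all field values in the sample record are made up for illustration:

package com.winksi.dotstat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class DotProducedMapperTest {
    @Test
    public void emitsCompositeKeyAndImsi() throws Exception {
        // One sample record with the 13 comma-separated fields the mapper
        // expects (illustrative values only).
        String line = "460001234567890,cmcc,13800000000,20140101120000,"
                + "60,1,5,com.example.app,10.0.0.1,MI3,android4.2,2.0,010";
        MapDriver.newMapDriver(new DotProducedMapper())
                .withInput(new LongWritable(0L), new Text(line))
                // Expected key: model,adccompany,OSSDK (upper-cased),av,phone
                .withOutput(new Text("MI3,cmcc,ANDROID4.2,2.0,13800000000"),
                            new Text("460001234567890"))
                .runTest();
    }
}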
And the Reducer. One fix over the original code: with the new org.apache.hadoop.mapreduce API, reduce() must take an Iterable<Text>, not an Iterator<Text>; otherwise it never overrides the base-class method and the job silently runs the default identity reducer.

package com.winksi.dotstat;

import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DotProducedReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // count: total records for this key (PV); set: distinct IMSIs (UV).
        int count = 0;
        Set<String> set = new HashSet<String>();
        for (Text value : values) {
            set.add(value.toString());
            count++;
        }
        StringBuilder sb = new StringBuilder();
        sb.append(count).append(",").append(set.size());
        // The job runs daily over the previous day's logs, so the output key
        // is stamped with yesterday's date.
        Date yesterdayDate = new Date(new Date().getTime() - 24L * 60 * 60 * 1000);
        String yesterday = DotStatPvUv.getCurrentDay(yesterdayDate);
        StringBuilder sbKey = new StringBuilder();
        sbKey.append(key.toString()).append(",").append(yesterday);
        context.write(new Text(sbKey.toString()), new Text(sb.toString()));
    }
}
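DotStatPvUv.getCurrentDay(...) is not shown in the post. A minimal sketch of what it presumably does; the "yyyy-MM-dd" pattern is an assumption, the author's actual format string is unknown:

package com.winksi.dotstat;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DotStatPvUv {
    // Formats a Date as the day string appended to the reducer's output key.
    // NOTE: the actual pattern used by the author is unknown; "yyyy-MM-dd"
    // is a guess.
    public static String getCurrentDay(Date date) {
        return new SimpleDateFormat("yyyy-MM-dd").format(date);
    }
}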
The Spring Hadoop wiring (hadoop-mr.xml, loaded by the test driver below):
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/hadoop"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">
<context:property-placeholder location="classpath:config.properties" />
<configuration>
fs.default.name=${hd.fs}
</configuration>
<job id="dotProductJob"
input-path="${input.path}"
output-path="${output.path}"
mapper="com.winksi.dotstat.DotProducedMapper"
reducer="com.winksi.dotstat.DotProducedReducer"/>
<job-runner id="runner" run-at-startup="true"
job-ref="dotProductJob" />
</beans:beans>
Maven dependencies:
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>1.0.2.RELEASE-cdh4</version>
</dependency>
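The ${hd.fs}, ${input.path} and ${output.path} placeholders in hadoop-mr.xml are resolved from config.properties on the classpath. The post does not show that file; a minimal example (all values illustrative, adjust to your cluster) might be:

# config.properties -- example values only
hd.fs=hdfs://localhost:8020
input.path=/data/dotstat/input
output.path=/data/dotstat/output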
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>${hadoop.core.version}</version>
</dependency>
Running the job is just a matter of bootstrapping the Spring context; run-at-startup="true" on the job-runner above launches the job immediately:
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Test {
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("hadoop-mr.xml");
    }
}
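For reference, each line of the job's output (default TextOutputFormat, tab between key and value) then looks like this, with illustrative numbers and a date format that depends on DotStatPvUv.getCurrentDay:

MI3,cmcc,ANDROID4.2,2.0,13800000000,2014-01-01	37,12

i.e. model,adccompany,OSSDK,av,phone,date followed by PV,UV.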
Running it threw an error:
Exception in thread "main" java.io.IOException: Cannot run program "cygpath": CreateProcess error=2, The system cannot find the file specified
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:459)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:201)
    at org.apache.hadoop.util.Shell.run(Shell.java:183)
    at org.apache.hadoop.fs.FileUtil$CygPathCommand.<init>(FileUtil.java:413)
    at org.apache.hadoop.fs.FileUtil.makeShellPath(FileUtil.java:439)
    at org.apache.hadoop.fs.FileUtil.makeShellPath(FileUtil.java:466)
    at org.apache.hadoop.fs.RawLocalFileSystem.execCommand(RawLocalFileSystem.java:559)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:551)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:355)
    at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:212)
The fix: install Cygwin on the Windows development machine, then add Cygwin's bin directory (e.g. "D:\Program\cygwin\bin") to the environment variables.
Note that it has to go on the system Path variable, not CLASSPATH. With Cygwin's bin on the Path, you can run commands like ls and cd in a Windows DOS window just as on Linux. At first I kept adding "D:\Program\cygwin\bin" to the system CLASSPATH variable and failed for ages before realizing it was in the wrong place. After adding it, try ls and cd in a cmd window; once they work, restart MyEclipse and rerun the code, and the problem is gone.
------------------------------------------------------------------------------
Earlier I was stuck on which Mapper and Reducer to extend: there are two classes with the same names to choose from (the old org.apache.hadoop.mapred API versus the new org.apache.hadoop.mapreduce API). Details here:
http://wuyanzan60688.blog.163.com/blog/static/127776163201310164244955/