hadoop MapReduce运营商案例关于用户基站停留数据统计

如果需要文件和代码的话可评论区留言邮箱,我给你发源代码

本文来自博客园,作者:Arway,转载请注明原文链接:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html

实验要求

统计每个用户在不同时段中各个基站的停留时间。

1.功能描述

用户的手机,连接到不同的基站会产生一条记录。
数据格式为:用户标识 设备标识 基站位置 通讯的日期 通讯时间
example: 0000009999 0054785806 00000089 2016-02-21 21:55:37

需要得到的数据格式为:
用户标识 时段 基站位置 停留时间
example: 0000000001 09-18 00000003 15
用户0000000001在09-18点这个时间段在基站00000003停留了15分钟

2.实现思路

程序运行支持传入时间段,比如“09-18-24”,表示分为0点到9点,9点到18点,18点到24点三个时间段。

  • (1)Mapper阶段
    对输入的数据,算出它属于哪个时间段。
    k1:每行记录在文本中的偏移量。
    v2:一条记录
    k2用“用户ID,时间段”输出。
    v2用“基站位置,时间”。时间用unix time

  • (2)Reducer阶段
    对获取的v3(v3是一个集合,每个元素是v2,相当于按照k2对v2分组)进行排序,以时间升序排序。
    计算两两之间的时间间隔,保存到另一个集合中,两个不同的时间间隔中,从基站A移动到基站B,这样获取到在A基站的停留的时间。
    同理从基站B移动到基站C,基站C移动到基站D,依次类推,所有的时间都获取到。再把时间累加起来,就可以获取到总的时间。

本文来自博客园,作者:Arway,转载请注明原文链接:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html

代码实现

PhoneMain.java

package phoneMapReduce;  import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException;  /**  * Created by ue50 on 11/13/19.  */ public class PhoneMain {     public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException     {         //String.equals()比较字符串的值是否相同         if(args == null || "0".equals(args[0]))         {             throw new RuntimeException("argument is not right!");         }         //Configuration是作业的配置信息类         Configuration configuration = new Configuration();         //set(String name, String value)设置配置项         configuration.set("timeRange", args[0]);          Job job = Job.getInstance(configuration);         job.setJarByClass(PhoneMain.class);          job.setMapperClass(PhoneMapper.class);         job.setMapOutputKeyClass(Text.class);         job.setMapOutputKeyClass(Text.class);          job.setReducerClass(PhoneReducer.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(Text.class);          //FileInputFormat.setInputPaths(job, new Path("hdfs://xdata-m0:8020/user/ue50/pos.txt"));         //FileOutputFormat.setOutputPath(job, new Path("hdfs://xdata-m0:8020/user/ue50/out"));          FileInputFormat.setInputPaths(job, new Path(args[1]));         FileOutputFormat.setOutputPath(job, new Path(args[2]));          job.waitForCompletion(true);     } } 

Mapper阶段
PhoneMapper.java

package phoneMapReduce;  import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException;  /**  * Created by ue50 on 11/13/19.  */ public class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> {     private int[] timeRangeList;     @Override     //setup()被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作     protected void setup(Context context) throws IOException,InterruptedException     {         //Configuration是作业的配置信息类,通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息         Configuration configuration = context.getConfiguration();          //get(String name)根据配置项的键name获取相应的值         String timeRange = configuration.get("timeRange");//运行时传入的时间段,比如“09-18-24”         String[] timeRangeString = timeRange.split("-");          timeRangeList = new int[timeRangeString.length];         for(int i = 0; i < timeRangeString.length;i++)         {             //timeRangeList数组保存传入的时间,如:09、18、24             timeRangeList[i] = Integer.parseInt(timeRangeString[i]);         }     }      @Override     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException     {         String values[] = value.toString().split("\s+");//对一条记录"用户标识 设备标识   基站位置 通讯的时间"按空格拆分         String userId = values[0];//用户标识         String baseStation = values[2];//基站位置         String timeString = values[4];//访问时间,如:21:55:37          String[] times = timeString.split(":");//对访问时间按':'拆分         int hour = Integer.parseInt(times[0]);//小时          //startHour、endHour时间段的起止时间         int startHour = 0;         int endHour = 0;         for(int i = 0; i < timeRangeList.length; i++)         {             if(hour < timeRangeList[i])             {                 if(i == 0)                 {                     startHour = 0;                 }                 else                 {                     startHour = timeRangeList[i-1];                 }                 endHour = timeRangeList[i];                 break;             }         }          if(startHour == 0 && endHour == 0)         {             return;         }          //k2:用户标识  时间段  v2:基站位置-访问时间         context.write(new Text(userId + "t" + startHour + "-" + endHour + "t"), new Text(baseStation + "-" + timeString));     } } 

Reducer阶段

package phoneMapReduce;  import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*;  /**  * Created by ue50 on 11/13/19.  */ public class PhoneReducer extends Reducer<Text, Text, Text, LongWritable> {     @Override     protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException     {         List<String> valueList = new LinkedList<String>();//基于链表的动态数组          //Map是一种把键对象和值对象映射的集合,TreeMap是一个有序的key-value集合,         //它是通过红黑树实现的,TreeMap中的元素默认按照key的自然排序排列         Map<String, Long> residenceTimeMap = new TreeMap<String, Long>();          for(Text value : values)         {             String item = value.toString();             valueList.add(item);//"基站位置-访问时间"的集合         }          if(valueList == null || valueList.size() <= 1)         {             return;         }          //Comparator是比较器         //Collections.sort()方法中的自定义比较器,根据比较器的实现逻辑对valueList进行排序         Collections.sort(valueList, new Comparator<String>() {//匿名内部类             @Override             //重写比较器中的比较方法:compare方法             public int compare(String o1, String o2) {                 o1 = o1.split("-")[1];                 o2 = o2.split("-")[1];                 return o1.compareTo(o2);//根据访问时间对valueList排序,第一个参数.compareTo(第二个参数)升序             }         });           for(int i = 0;i < valueList.size()-1; i++)         {             String station = valueList.get(i).split("-")[0];//基站位置             String time1 = valueList.get(i).split("-")[1];//访问时间             String time2 = valueList.get(i + 1).split("-")[1];              //对日期/时间进行格式化,HH:24小时制             DateFormat dateFormat = new SimpleDateFormat("HH:hh:ss");             //Date对象用于处理日期与时间             Date date1 = null;             Date date2 = null;             try{                 date1 = dateFormat.parse(time1);//parse():把String型的字符串转换成特定格式的Date类型                 date2 = dateFormat.parse(time2);             }catch (ParseException e)             {                 e.printStackTrace();             }              //date1.before(date2),当date1小于date2时,返回TRUE,当大于等于时,返回false;             if(date1.before(date2))             {                 long time = date2.getTime() - date1.getTime();//getTime方法返回的是毫秒数                  Long count = residenceTimeMap.get(station);//返回key关联的值,没有值返回null                 if(count == null)                 {                     residenceTimeMap.put(station, time);//<基站位置,停留时间>                 }                 else                 {                     residenceTimeMap.put(station, count + time);//将停留时间累积                 }             }         }          valueList = null;          //TreeMap的keySet():以升序返回一个具有TreeMap键的Set视图         Set<String> keySet = residenceTimeMap.keySet();//keySet:<基站位置>         for(String mapKey : keySet)         {             long minute = residenceTimeMap.get(mapKey);//停留时间毫秒             minute = minute/1000/60;//分钟             //minute = minute/1000;//秒              context.write(new Text(key +"t" + mapKey +"t"), new LongWritable(minute));         }          residenceTimeMap = null;     } } 

如果需要文件和代码的话可评论区留言邮箱,我给你发源代码

发表评论

相关文章