Hadoop3 - MapReduce DB 操作
创始人
2024-03-31 23:15:37
0

一、MapReduce DB 操作

对于本专栏的前面几篇文章的操作,基本都是读取本地或 HDFS 中的文件,如果有的数据是存在 DB 中的我们要怎么处理呢?

Hadoop 为我们提供了 DBInputFormatDBOutputFormat 两个类。顾名思义 DBInputFormat 负责从数据库中读取数据,DBOutputFormat负责把数据最终写入数据库中。

不过如果要把数据库内容映射成对象实体,还需要该实体实现 DBWritable 接口,其中 readFields 方法用来指定获取数据库中的字段,write 方法用于指定写入数据库字段。

下面还是使用本专栏上几篇文章所使用的COVID-19 案例进行试验,首先将文本类型的数据集导入 Mysql 数据库中,然后读取表信息作为数据集分析每个州的 casesdeaths 的总数,并将计算结果写入 Mysql

COVID-19 案例地址:

https://blog.csdn.net/qq_43692950/article/details/127475811

二、文本类型的数据集导入 Mysql 数据库

首先在 Mysql 中创建表:

CREATE TABLE `covid_input` (`id` int NOT NULL AUTO_INCREMENT,`date` datetime DEFAULT NULL,`county` varchar(255) DEFAULT NULL,`state` varchar(255) DEFAULT NULL,`fips` varchar(255) DEFAULT NULL,`cases` int DEFAULT NULL,`deaths` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

创建对象实体类,实现 WritableComparableDBWritable

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable, DBWritable {private String date; //日期private String county; // 县private String state; // 州private String fips; // 县编码codeprivate Long cases;//确诊病例数private Long deaths;//死亡病例数/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(date);out.writeUTF(county);out.writeUTF(state);out.writeUTF(fips);out.writeLong(cases);out.writeLong(deaths);}/*** 反序列化方法 注意顺序*/@Overridepublic void readFields(DataInput in) throws IOException {this.date = in.readUTF();this.county = in.readUTF();this.state = in.readUTF();this.fips = in.readUTF();this.cases = in.readLong();this.deaths = in.readLong();}/*** 指定写入 DB 中的字段,parameterIndex对应 DBOutputFormat.setOutput 中指定的 fieldNames*/@Overridepublic void write(PreparedStatement preparedStatement) throws SQLException {preparedStatement.setString(1, date);preparedStatement.setString(2, county);preparedStatement.setString(3, state);preparedStatement.setString(4, fips);preparedStatement.setLong(5, cases);preparedStatement.setLong(6, deaths);}/***  从数据库读取字段信息,由于是读取的文本文件写入 Mysql,没有读取 DB*/@Overridepublic void readFields(ResultSet resultSet) throws SQLException {}//排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序@Overridepublic int compareTo(CountEntity o) {int i = this.state.compareTo(o.getState());if (i == 0) {return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);}return i;}}

编写 Mapper 类,由于这里无需聚合分组操作,可以不做 Reduces ,直接去 Mapper 的输出结果到 Mysql 即可,因此这里 key 输出实体对象,ValueNull 占位

@Slf4j
public class DBMapper extends Mapper {CountEntity outValue = new CountEntity();NullWritable outKey = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(",");if (fields.length >= 6){outValue.setDate(fields[0]);outValue.setCounty(fields[1]);outValue.setState(fields[2]);outValue.setFips(fields[3]);outValue.setCases(Long.parseLong(fields[4]));outValue.setDeaths(Long.parseLong(fields[5]));context.write(outValue, outKey);}}
}

最后编写驱动类,声明输出表及字段:

public class DBDriver extends Configured implements Tool {public static void main(String[] args) throws Exception{Configuration conf = new Configuration();//配置当前作业需要使用的JDBC信息DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT","root","root");int status = ToolRunner.run(conf, new DBDriver(), args);System.exit(status);}@Overridepublic int run(String[] args) throws Exception {Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());// 设置作业驱动类job.setJarByClass(DBDriver.class);// 设置作业mapper reducer类job.setMapperClass(DBMapper.class);// 设置作业mapper阶段输出key value数据类型job.setMapOutputKeyClass(CountEntity.class);job.setMapOutputValueClass(NullWritable.class);//这里无需Recucesjob.setNumReduceTasks(0);// 配置作业的输入数据路径FileInputFormat.addInputPath(job, new Path("D:/test/input"));// 配置作业的输出job.setOutputFormatClass(DBOutputFormat.class);DBOutputFormat.setOutput(job,"covid_input","date", "county", "state", "fips", "cases","deaths");return job.waitForCompletion(true)? 0:1;}
}

执行驱动类:

在这里插入图片描述
执行成功后,到 Mysql 查看结果:

在这里插入图片描述

已经写入成功,下面基于该表统计每个 state 州 的 casesdeaths 总数。

三、计算各个州的累积cases、deaths

现在和上面不同的是,输入和输出都是DB,首先创建结果输出表:

CREATE TABLE `covid_output` (`id` int NOT NULL AUTO_INCREMENT,`state` varchar(255) DEFAULT NULL,`cases` int DEFAULT NULL,`deaths` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

修改 CountEntity 实体,指定读取和输出 DB 字段:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable, DBWritable {private String date; //日期private String county; // 县private String state; // 州private String fips; // 县编码codeprivate Long cases;//确诊病例数private Long deaths;//死亡病例数/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(date);out.writeUTF(county);out.writeUTF(state);out.writeUTF(fips);out.writeLong(cases);out.writeLong(deaths);}/*** 反序列化方法 注意顺序*/@Overridepublic void readFields(DataInput in) throws IOException {this.date = in.readUTF();this.county = in.readUTF();this.state = in.readUTF();this.fips = in.readUTF();this.cases = in.readLong();this.deaths = in.readLong();}/*** 由于输出covid_output表字段*/@Overridepublic void write(PreparedStatement preparedStatement) throws SQLException {preparedStatement.setString(1, state);preparedStatement.setLong(2, cases);preparedStatement.setLong(3, deaths);}/*** 读取covid_input 表中的字段*/@Overridepublic void readFields(ResultSet resultSet) throws SQLException {this.date = resultSet.getString("date");this.county = resultSet.getString("county");this.state = resultSet.getString("state");this.fips = resultSet.getString("fips");this.cases = resultSet.getLong("cases");this.deaths = resultSet.getLong("deaths");}//排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序@Overridepublic int compareTo(CountEntity o) {int i = this.state.compareTo(o.getState());if (i == 0) {return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);}return i;}}

编写 Mapper 类,将 state 最为 keyvalue 行数据输出至 Reduces 中

public class DBMapper extends Mapper {Text outKey = new Text();@Overrideprotected void map(LongWritable key, CountEntity value, Context context) throws IOException, InterruptedException {outKey.set(value.getState());context.write(outKey, value);}
}

Reduces 中对 casesdeaths 进行求和,key 即为输出的结果:

public class DBReducer extends Reducer {CountEntity outKey = new CountEntity();NullWritable outValue = NullWritable.get();@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {long totalCases = 0;long totalDeaths = 0;for (CountEntity value : values) {totalCases += value.getCases();totalDeaths += value.getDeaths();}outKey.setState(key.toString());outKey.setCases(totalCases);outKey.setDeaths(totalDeaths);context.write(outKey, outValue);}
}

最后编写驱动类,指定输入输出方式:

public class DBDriver extends Configured implements Tool {public static void main(String[] args) throws Exception {//配置文件对象Configuration conf = new Configuration();//配置当前作业需要使用的JDBC信息DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT","root","root");conf.set("mapreduce.framework.name", "local");int status = ToolRunner.run(conf, new DBDriver(), args);System.exit(status);}@Overridepublic int run(String[] args) throws Exception {// 创建作业实例Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());// 设置作业驱动类job.setJarByClass(DBDriver.class);// 设置作业mapper reducer类job.setMapperClass(DBMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(CountEntity.class);job.setReducerClass(DBReducer.class);job.setOutputKeyClass(CountEntity.class);job.setOutputValueClass(NullWritable.class);job.setInputFormatClass(DBInputFormat.class);DBInputFormat.setInput(job,CountEntity.class,"SELECT date,county,state,fips,cases,deaths from covid_input","SELECT count(*) from covid_input");// 配置作业的输出job.setOutputFormatClass(DBOutputFormat.class);DBOutputFormat.setOutput(job,"covid_output","state", "cases", "deaths");return job.waitForCompletion(true) ? 0 : 1;}
}

执行该驱动类:

在这里插入图片描述
执行成功后,到 Mysql 中查看结果:

在这里插入图片描述

最后可以检验数据的正确性:

SELECTstate,sum(cases) AS total_cases,sum(deaths) AS total_deaths
FROM`covid_input`
GROUP BY state

在这里插入图片描述

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...