对于本专栏的前面几篇文章的操作,基本都是读取本地或 HDFS 中的文件,如果有的数据是存在 DB
中的我们要怎么处理呢?
Hadoop
为我们提供了 DBInputFormat
和 DBOutputFormat
两个类。顾名思义 DBInputFormat
负责从数据库中读取数据,DBOutputFormat
负责把数据最终写入数据库中。
不过如果要把数据库内容映射成对象实体,还需要该实体实现 DBWritable
接口,其中 readFields
方法用来指定获取数据库中的字段,write
方法用于指定写入数据库字段。
下面还是使用本专栏上几篇文章所使用的COVID-19
案例进行试验,首先将文本类型的数据集导入 Mysql
数据库中,然后读取表信息作为数据集分析每个州的 cases
和 deaths
的总数,并将计算结果写入 Mysql
。
COVID-19
案例地址:
https://blog.csdn.net/qq_43692950/article/details/127475811
首先在 Mysql
中创建表:
CREATE TABLE `covid_input` (`id` int NOT NULL AUTO_INCREMENT,`date` datetime DEFAULT NULL,`county` varchar(255) DEFAULT NULL,`state` varchar(255) DEFAULT NULL,`fips` varchar(255) DEFAULT NULL,`cases` int DEFAULT NULL,`deaths` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
创建对象实体类,实现 WritableComparable
、DBWritable
:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable, DBWritable {private String date; //日期private String county; // 县private String state; // 州private String fips; // 县编码codeprivate Long cases;//确诊病例数private Long deaths;//死亡病例数/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(date);out.writeUTF(county);out.writeUTF(state);out.writeUTF(fips);out.writeLong(cases);out.writeLong(deaths);}/*** 反序列化方法 注意顺序*/@Overridepublic void readFields(DataInput in) throws IOException {this.date = in.readUTF();this.county = in.readUTF();this.state = in.readUTF();this.fips = in.readUTF();this.cases = in.readLong();this.deaths = in.readLong();}/*** 指定写入 DB 中的字段,parameterIndex对应 DBOutputFormat.setOutput 中指定的 fieldNames*/@Overridepublic void write(PreparedStatement preparedStatement) throws SQLException {preparedStatement.setString(1, date);preparedStatement.setString(2, county);preparedStatement.setString(3, state);preparedStatement.setString(4, fips);preparedStatement.setLong(5, cases);preparedStatement.setLong(6, deaths);}/*** 从数据库读取字段信息,由于是读取的文本文件写入 Mysql,没有读取 DB*/@Overridepublic void readFields(ResultSet resultSet) throws SQLException {}//排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序@Overridepublic int compareTo(CountEntity o) {int i = this.state.compareTo(o.getState());if (i == 0) {return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);}return i;}}
编写 Mapper
类,由于这里无需聚合分组操作,可以不做 Reduces
,直接去 Mapper
的输出结果到 Mysql
即可,因此这里 key
输出实体对象,Value
为 Null
占位
@Slf4j
public class DBMapper extends Mapper {CountEntity outValue = new CountEntity();NullWritable outKey = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(",");if (fields.length >= 6){outValue.setDate(fields[0]);outValue.setCounty(fields[1]);outValue.setState(fields[2]);outValue.setFips(fields[3]);outValue.setCases(Long.parseLong(fields[4]));outValue.setDeaths(Long.parseLong(fields[5]));context.write(outValue, outKey);}}
}
最后编写驱动类,声明输出表及字段:
public class DBDriver extends Configured implements Tool {public static void main(String[] args) throws Exception{Configuration conf = new Configuration();//配置当前作业需要使用的JDBC信息DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT","root","root");int status = ToolRunner.run(conf, new DBDriver(), args);System.exit(status);}@Overridepublic int run(String[] args) throws Exception {Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());// 设置作业驱动类job.setJarByClass(DBDriver.class);// 设置作业mapper reducer类job.setMapperClass(DBMapper.class);// 设置作业mapper阶段输出key value数据类型job.setMapOutputKeyClass(CountEntity.class);job.setMapOutputValueClass(NullWritable.class);//这里无需Recucesjob.setNumReduceTasks(0);// 配置作业的输入数据路径FileInputFormat.addInputPath(job, new Path("D:/test/input"));// 配置作业的输出job.setOutputFormatClass(DBOutputFormat.class);DBOutputFormat.setOutput(job,"covid_input","date", "county", "state", "fips", "cases","deaths");return job.waitForCompletion(true)? 0:1;}
}
执行驱动类:
执行成功后,到 Mysql 查看结果:
已经写入成功,下面基于该表统计每个 state
州 的 cases
和 deaths
总数。
现在和上面不同的是,输入和输出都是DB
,首先创建结果输出表:
CREATE TABLE `covid_output` (`id` int NOT NULL AUTO_INCREMENT,`state` varchar(255) DEFAULT NULL,`cases` int DEFAULT NULL,`deaths` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
修改 CountEntity
实体,指定读取和输出 DB
字段:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable, DBWritable {private String date; //日期private String county; // 县private String state; // 州private String fips; // 县编码codeprivate Long cases;//确诊病例数private Long deaths;//死亡病例数/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(date);out.writeUTF(county);out.writeUTF(state);out.writeUTF(fips);out.writeLong(cases);out.writeLong(deaths);}/*** 反序列化方法 注意顺序*/@Overridepublic void readFields(DataInput in) throws IOException {this.date = in.readUTF();this.county = in.readUTF();this.state = in.readUTF();this.fips = in.readUTF();this.cases = in.readLong();this.deaths = in.readLong();}/*** 由于输出covid_output表字段*/@Overridepublic void write(PreparedStatement preparedStatement) throws SQLException {preparedStatement.setString(1, state);preparedStatement.setLong(2, cases);preparedStatement.setLong(3, deaths);}/*** 读取covid_input 表中的字段*/@Overridepublic void readFields(ResultSet resultSet) throws SQLException {this.date = resultSet.getString("date");this.county = resultSet.getString("county");this.state = resultSet.getString("state");this.fips = resultSet.getString("fips");this.cases = resultSet.getLong("cases");this.deaths = resultSet.getLong("deaths");}//排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序@Overridepublic int compareTo(CountEntity o) {int i = this.state.compareTo(o.getState());if (i == 0) {return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);}return i;}}
编写 Mapper
类,将 state
最为 key
, value
行数据输出至 Reduces 中
public class DBMapper extends Mapper {Text outKey = new Text();@Overrideprotected void map(LongWritable key, CountEntity value, Context context) throws IOException, InterruptedException {outKey.set(value.getState());context.write(outKey, value);}
}
在 Reduces
中对 cases
和 deaths
进行求和,key
即为输出的结果:
public class DBReducer extends Reducer {CountEntity outKey = new CountEntity();NullWritable outValue = NullWritable.get();@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {long totalCases = 0;long totalDeaths = 0;for (CountEntity value : values) {totalCases += value.getCases();totalDeaths += value.getDeaths();}outKey.setState(key.toString());outKey.setCases(totalCases);outKey.setDeaths(totalDeaths);context.write(outKey, outValue);}
}
最后编写驱动类,指定输入输出方式:
public class DBDriver extends Configured implements Tool {public static void main(String[] args) throws Exception {//配置文件对象Configuration conf = new Configuration();//配置当前作业需要使用的JDBC信息DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT","root","root");conf.set("mapreduce.framework.name", "local");int status = ToolRunner.run(conf, new DBDriver(), args);System.exit(status);}@Overridepublic int run(String[] args) throws Exception {// 创建作业实例Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());// 设置作业驱动类job.setJarByClass(DBDriver.class);// 设置作业mapper reducer类job.setMapperClass(DBMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(CountEntity.class);job.setReducerClass(DBReducer.class);job.setOutputKeyClass(CountEntity.class);job.setOutputValueClass(NullWritable.class);job.setInputFormatClass(DBInputFormat.class);DBInputFormat.setInput(job,CountEntity.class,"SELECT date,county,state,fips,cases,deaths from covid_input","SELECT count(*) from covid_input");// 配置作业的输出job.setOutputFormatClass(DBOutputFormat.class);DBOutputFormat.setOutput(job,"covid_output","state", "cases", "deaths");return job.waitForCompletion(true) ? 0 : 1;}
}
执行该驱动类:
执行成功后,到 Mysql 中查看结果:
最后可以检验数据的正确性:
SELECTstate,sum(cases) AS total_cases,sum(deaths) AS total_deaths
FROM`covid_input`
GROUP BY state
上一篇:关于蓝桥杯单片机组自学的经验分享
下一篇:【每日渗透笔记】文件上传绕过尝试