XXL-JOB的搭建可以参考:参考博客
在介绍分片广播之前,我们先熟悉一下执行器在集群部署下调度中心的调度策略。

**任务超时时间:**支持自定义任务超时时间,任务运行超时将会主动中断任务。
**失败重试次数:**支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试。
分片是指调度中心将集群中的执行器标上序号,0,1,2,3…,广播是指每次调度会向集群中所有执行器发送调度请求,请求中携带分片参数。
每个执行器收到调度请求根据分片参数自行决定是否执行任务。
另外xxl-job还支持动态分片,当执行器数量有变更时,调度中心会动态修改分片的数量。
适用场景:
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
使用说明:
“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
Java语言任务获取分片参数方式:
BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务:
/*** 2、分片广播任务*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {// 分片序号,从0开始int shardIndex = XxlJobHelper.getShardIndex();// 分片总数int shardTotal = XxlJobHelper.getShardTotal();//......
}
1)在调度中心添加任务
添加成功:
启动任务,观察日志:
当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?
执行器收到调度请求后各自查询属于自己的任务,这样就保证了执行器之间不会重复执行任务。
xxl-job设计作业分片就是为了分布式执行任务,XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号并向执行器传递分片总数、分片序号这些参数,开发者需要自行处理分片项与真实数据的对应关系。
下图表示了多个执行器获取视频处理任务的结构:
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
通过作业分片方案保证了执行器之间分配的任务不重复,另外如果同一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
我们在调度中心中编辑任务,设置为调度过期策略为“忽略”,设置阻塞处理策略为“丢弃后续调度”。
这样就可以避免重复调度了。
不过,我们还需要注意保证任务处理的幂等性。
任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。执行器接收调度请求去执行任务,要有办法去判断该任务是否处理完成,如果处理完则不再处理,即使重复调度处理相同的任务也不能重复处理相同的视频。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
1)数据库约束,比如:唯一索引,主键。
2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。
既然确定了分片方案,下边就可以梳理整个视频上传及处理的业务流程了。
上传视频成功向视频处理待处理表添加记录。
视频处理的详细流程如下:
如下图是待处理任务表:
完成任务历史表的结构与待处理任务表相同。
上传视频成功向视频处理待处理表添加记录,暂时只添加对avi视频的处理记录。
根据MIME Type去判断是否是avi视频,下边列出部分MIME Type
avi视频的MIME Type是video/x-msvideo。
在视频上传的时候,我们同时也要将视频的信息录入数据库:
重点关注第三步,在第三步将视频信息存入待处理表中。
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileId, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);if (mediaFiles == null) {mediaFiles = new MediaFiles();//1.封装数据BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);mediaFiles.setId(fileId);mediaFiles.setFileId(fileId);mediaFiles.setCompanyId(companyId);mediaFiles.setFilename(fileId);mediaFiles.setBucket(bucket);mediaFiles.setFilePath(objectName);//2.图片、MP4视频可以设置url//2.1获取扩展名String extension = null;String filename = uploadFileParamsDto.getFilename();if (StringUtils.isNotEmpty(filename) && filename.indexOf(".") >= 0) {extension = filename.substring(filename.lastIndexOf("."));}//2.2媒体类型String mimeType = getMimeTypeByextension(extension);if (mimeType.indexOf("image") >= 0 || mimeType.indexOf("mp4") >= 0) {mediaFiles.setUrl("/" + bucket + "/" + objectName);}mediaFiles.setCreateDate(LocalDateTime.now());mediaFiles.setStatus("1");mediaFiles.setAuditStatus("002003");//2.3插入文件表mediaFilesMapper.insert(mediaFiles);//3.对avi视频添加到待处理任务表if (mimeType.equals("video/x-msvideo")) {MediaProcess mediaProcess = new MediaProcess();BeanUtils.copyProperties(mediaFiles, mediaProcess);mediaProcess.setStatus("1");//未处理状态mediaProcessMapper.insert(mediaProcess);}}return mediaFiles;
}
我们如何保证查询到的待处理视频记录不重复?
我们可以根据调度中心传递的分片参数和执行器总数设定特定执行器执行特定任务。
编写根据分片参数获取待处理任务的dao方法:
/*** @description 根据分片参数获取待处理任务* @param shardTotal 分片总数* @param shardindex 分片序号* @param count 任务数* @return java.util.List */@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} limit #{count}")
public List selectListByShardIndex(@Param("shardTotal") int shardTotal,@Param("shardIndex") int shardIndex,@Param("count") int count);
定义service接口和service实现方法:
我这里跳过定义接口,直接到service实现方法:
@Override
public List getMediaProcessList(int shardIndex, int shardTotal, int count) {return mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);
}
我们先跳过视频处理那一部,假设我们现在已经处理完视频了,我们需要对任务进行一系列的操作。
比如:
@Override
@Transactional
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {//查询这个任务MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);if (mediaProcess == null) {log.debug("更新任务状态时此任务:{}为空", taskId);return;}LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(MediaProcess::getId, taskId);//判断是否成功if ("3".equals(status)) {//任务失败MediaProcess mediaProcess_u = new MediaProcess();mediaProcess_u.setStatus("3");//处理失败mediaProcess_u.setErrormsg(errorMsg);//这里修改是为了保证幂等性,防止一个任务重复执行mediaProcessMapper.update(mediaProcess_u, queryWrapper);return;}//处理成功,更新状态if ("2".equals(status)) {//更新待处理任务表mediaProcess.setStatus("2");mediaProcess.setUrl(url);mediaProcess.setFinishDate(LocalDateTime.now());//这里修改是为了保证幂等性,防止一个任务重复执行mediaProcessMapper.updateById(mediaProcess);//更新文件表中的url字段MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);mediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}//如果成功将任务添加到历史纪录表MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);//如果成功则将待处理表的记录删除mediaProcessMapper.deleteById(taskId);
}
在处理视频之间先了解一下什么是视频编码。
首先我们要分清文件格式和编码格式:
FFmpeg被许多开源项目采用,QQ影音、暴风影音、VLC等。
下载:FFmpeg https://www.ffmpeg.org/download.html#build-windows
我们可通过如下命令将一个.avi后缀的视频文件转成.mp4文件:
ffmpeg.exe -i 1.avi 1.mp4
视频处理采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过服务器的cpu核数。
逻辑:
定义VideoTask类:
@Component
@Slf4j
public class VideoTask {@Autowiredprivate MediaFileProcessService mediaFileProcessService;@Autowiredprivate MediaFileService mediaFileService;@Value("${videoprocess.ffmpegpath}")private String ffmpegpath;/*** 2、分片广播任务*/@XxlJob("videoJobHander")public void videoJobHander() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();// 分片总数int shardTotal = XxlJobHelper.getShardTotal();//查询待处理任务,一次处理的任务数和cpu核心数一样List mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, 2);if (CollectionUtils.isEmpty(mediaProcessList)) {log.debug("查询到的待处理任务为0");return;}//要处理的任务数int size = mediaProcessList.size();//创建size个线程数量的线程池ExecutorService threadPool = Executors.newFixedThreadPool(size);//计数器CountDownLatch countDownLatch = new CountDownLatch(size);//遍历mediaProcessList,将任务放入线程池mediaProcessList.forEach(mediaProcess -> {//任务执行逻辑threadPool.execute(() -> {//视频处理状态String status = mediaProcess.getStatus();//保证幂等性//"2"表示处理完成if ("2".equals(status)) {log.debug("视频已经处理不用再次处理,视频信息:{}", mediaProcess);//计数器-1countDownLatch.countDown();return;}//桶String bucket = mediaProcess.getBucket();//原视频的MD5值String fileId = mediaProcess.getFileId();//存储路径String filePath = mediaProcess.getFilePath();//原视频文件名称String filename = mediaProcess.getFilename();//将要处理的文件下载到服务器上File originalFile = null;//处理结束的视频文件File mp4File = null;try {originalFile = File.createTempFile("original", null);mp4File = File.createTempFile("mp4", ".mp4");} catch (IOException e) {log.error("处理视频前创建临时文件失效");//计数器-1countDownLatch.countDown();return;}try {//将原视频下载到本地(avi)originalFile = mediaFileService.downloadFileFromMinIO(originalFile, bucket, filePath);} catch (Exception e) {log.error("下载原始文件过程出错:{},文件信息:{}", e.getMessage(), mediaProcess);//计数器-1countDownLatch.countDown();return;}//调用工具类将avi转成MP4//转换后MP4文件的名称String mp4_name = fileId + ".mp4";//转换后MP4文件的路径String mp4_path = mp4File.getAbsolutePath();Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, originalFile.getAbsolutePath(), mp4_name, mp4_path);//开始视频转换,成功将返回success,失败则返回失败原因String result = videoUtil.generateMp4();//默认处理失败String statusNew = "3";//最终访问路径String url = null;if ("success".equals(result)) {//转换成功//上传至minIO的路径String objectName = getFilePathByMd5(fileId, ".mp4");try {//上传至miniomediaFileService.addMediaFilesToMinIO(mp4_path, bucket, objectName);//处理成功statusNew = "2";url = "/" + bucket + "/" + objectName;} catch (Exception e) {log.debug("上传文件出错:{}", e.getMessage());//计数器-1countDownLatch.countDown();return;}}try {//记录任务处理结果mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), statusNew, fileId, url, result);//删除临时文件 originalFile 和 mp4Fileif(originalFile.exists()){originalFile.delete();}if(mp4File.exists()){mp4File.delete();}} catch (Exception e) {log.debug("保存任务处理结果出错:{}",e.getMessage());//计数器-1countDownLatch.countDown();return;}//计数器-1countDownLatch.countDown();});});//阻塞是为了使线程池中的任务都完成,不阻塞的话方法一下子就结束了,任务也没时间执行//阻塞到任务执行完成,当"countDownLatch"计数器归零,解除阻塞//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(30,TimeUnit.MINUTES);}private String getFilePathByMd5(String fileMd5, String fileExt) {//将文件MD5值的第一位数作为一级目录,第二位数作为二级目录return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}
}
至此,视频处理业务就完成了。
XXL-JOB的工作原理是什么?XXL-JOB是什么,怎么工作?
答:XXL-JOB分布式任务调度服务由调度中心和执行器组成,调度中心负责按任务调度策略向执行器下发任务,执行器负责接收任务执行任务。
如何保证任务不重复执行?
任务幂等性如何保证?
幂等性描述了一次和多次请求某一资源对于资源本身应该具有同样的结果。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用方案: