修改数据清洗相关代码

This commit is contained in:
zk
2026-06-02 14:45:05 +08:00
parent 9a58722282
commit b9d8c2724a
5 changed files with 49 additions and 33 deletions
@@ -19,6 +19,6 @@ public interface AppJobDataMapper extends CommonMapper<AppJobData> {
* 查询待清洗数据并加行锁(SELECT ... FOR UPDATE) * 查询待清洗数据并加行锁(SELECT ... FOR UPDATE)
* <p>必须在事务内调用,配合状态更新实现原子锁定</p> * <p>必须在事务内调用,配合状态更新实现原子锁定</p>
*/ */
@Select("SELECT * FROM app_job_data WHERE clean_status = 0 AND is_valid = 1 LIMIT #{limit} FOR UPDATE") @Select("SELECT * FROM app_job_data WHERE clean_status = 'pending' LIMIT #{limit} FOR UPDATE")
List<AppJobData> selectForUpdate(@Param("limit") int limit); List<AppJobData> selectForUpdate(@Param("limit") int limit);
} }
@@ -1,6 +1,7 @@
package org.jiayunet.pojo.po; package org.jiayunet.pojo.po;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data; import lombok.Data;
@@ -20,8 +21,9 @@ public class AppJobData {
@TableId(type = IdType.AUTO) @TableId(type = IdType.AUTO)
private Long id; private Long id;
/** 关联爬取任务ID */ /** 关联urllistid */
private Long taskCrawlId; @TableField("urllistid")
private Long urllistId;
/** 职位名称 */ /** 职位名称 */
private String jobTitle; private String jobTitle;
@@ -47,33 +49,30 @@ public class AppJobData {
/** 详情页URL */ /** 详情页URL */
private String detailUrl; private String detailUrl;
/** 招聘分类: 0=校招, 1=实习, 2=社招, 3=其他 */
private Integer recruitCategory;
/** 内容哈希值,用于查重 */ /** 内容哈希值,用于查重 */
private String contentHash; private String contentHash;
/** 数据来源 0=官网 1=平台 */ /** 数据来源 0=官网 1=平台 */
private Integer sources; private Integer sources;
/** 是否独立URL 0=页内展示 1=独立页面 */ /** 发布日期 */
private Integer isIndependentUrl;
/** 是否有效 0=无效 1=有效 */
private Integer isValid;
/** 有效期 */
private Instant expireAt; private Instant expireAt;
/** 验证状态 pending=待验证 checking=验证中 checked=已验证 */
private String checkStatus;
/** 清洗状态 0=待清洗 1=清洗中 2=已入库 3=已丢弃 */
private Integer cleanStatus;
/** 上次验证时间 */
private Instant lastCheckAt;
/** 创建时间 */ /** 创建时间 */
private Instant createdAt; private Instant createdAt;
/** 更新时间 */ /** 更新时间 */
private Instant updatedAt; private Instant updatedAt;
/** 清洗状态: pending=待清洗 cleaning=清洗中 cleaned=已清洗 discarded=已丢弃 */
private String cleanStatus;
/** 清洗开始时间 */
private Instant cleanStartedAt;
/** 清洗完成时间 */
private Instant cleanedAt;
} }
@@ -77,6 +77,12 @@ public class Job {
/** 爬虫原始数据ID,用于去重 */ /** 爬虫原始数据ID,用于去重 */
private String sourceId; private String sourceId;
/** 招聘分类 0=校招 1=实习 2=社招 3=其他 */
private Integer recruitCategory;
/** 发布日期 */
private Instant expireAt;
/** 状态 0=上架 1=下架 2=已失效 */ /** 状态 0=上架 1=下架 2=已失效 */
private Integer status; private Integer status;
@@ -102,9 +102,10 @@ public class JobCleanService {
public void recoverZombie() { public void recoverZombie() {
int recovered = appJobDataMapper.update(null, int recovered = appJobDataMapper.update(null,
new LambdaUpdateWrapper<AppJobData>() new LambdaUpdateWrapper<AppJobData>()
.set(AppJobData::getCleanStatus, 0) .set(AppJobData::getCleanStatus, "pending")
.eq(AppJobData::getCleanStatus, 1) .set(AppJobData::getCleanStartedAt, null)
.lt(AppJobData::getUpdatedAt, Instant.now().minusSeconds(600))); .eq(AppJobData::getCleanStatus, "cleaning")
.lt(AppJobData::getCleanStartedAt, Instant.now().minusSeconds(600)));
if (recovered > 0) { if (recovered > 0) {
log.info("僵尸恢复:重置{}条数据", recovered); log.info("僵尸恢复:重置{}条数据", recovered);
@@ -118,7 +119,7 @@ public class JobCleanService {
public void cleanOne(AppJobData data) { public void cleanOne(AppJobData data) {
// 1. 前置校验 // 1. 前置校验
if (data.getDescription() == null || data.getDescription().length() < 20) { if (data.getDescription() == null || data.getDescription().length() < 20) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 3); jobCleanTransactionService.updateCleanStatus(data.getId(), "discarded");
return; return;
} }
@@ -134,7 +135,7 @@ public class JobCleanService {
// valid 校验 // valid 校验
if (!root.path("valid").asBoolean(false)) { if (!root.path("valid").asBoolean(false)) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 3); jobCleanTransactionService.updateCleanStatus(data.getId(), "discarded");
return; return;
} }
@@ -142,7 +143,7 @@ public class JobCleanService {
String sourceId = String.valueOf(data.getId()); String sourceId = String.valueOf(data.getId());
Long existJob = jobMapper.selectCount(new LambdaQueryWrapper<Job>().eq(Job::getSourceId, sourceId)); Long existJob = jobMapper.selectCount(new LambdaQueryWrapper<Job>().eq(Job::getSourceId, sourceId));
if (existJob > 0) { if (existJob > 0) {
jobCleanTransactionService.updateCleanStatus(data.getId(), 2); jobCleanTransactionService.updateCleanStatus(data.getId(), "cleaned");
return; return;
} }
@@ -86,6 +86,10 @@ public class JobCleanTransactionService {
Long requiredIndustryId = root.path("requiredIndustryId").asLong(0); Long requiredIndustryId = root.path("requiredIndustryId").asLong(0);
job.setRequiredIndustryId(requiredIndustryId == 0 ? null : requiredIndustryId); job.setRequiredIndustryId(requiredIndustryId == 0 ? null : requiredIndustryId);
// 从原始数据透传 recruit_category 和 expire_at
job.setRecruitCategory(data.getRecruitCategory());
job.setExpireAt(data.getExpireAt());
job.setSourceUrl(data.getDetailUrl()); job.setSourceUrl(data.getDetailUrl());
job.setSourceId(sourceId); job.setSourceId(sourceId);
job.setStatus(0); job.setStatus(0);
@@ -106,8 +110,8 @@ public class JobCleanTransactionService {
jobRegionRelationMapper.batchInsert(relations); jobRegionRelationMapper.batchInsert(relations);
} }
// 更新清洗状态 // 更新清洗状态为已清洗
updateCleanStatus(data.getId(), 2); updateCleanStatus(data.getId(), "cleaned");
} }
/** /**
@@ -162,11 +166,16 @@ public class JobCleanTransactionService {
} }
/** 更新清洗状态 */ /** 更新清洗状态 */
public void updateCleanStatus(Long id, int status) { public void updateCleanStatus(Long id, String status) {
appJobDataMapper.update(null, LambdaUpdateWrapper<AppJobData> wrapper = new LambdaUpdateWrapper<AppJobData>()
new LambdaUpdateWrapper<AppJobData>() .set(AppJobData::getCleanStatus, status)
.set(AppJobData::getCleanStatus, status) .eq(AppJobData::getId, id);
.eq(AppJobData::getId, id));
if ("cleaned".equals(status)) {
wrapper.set(AppJobData::getCleanedAt, Instant.now());
}
appJobDataMapper.update(null, wrapper);
} }
/** /**
@@ -185,7 +194,8 @@ public class JobCleanTransactionService {
List<Long> ids = dataList.stream().map(AppJobData::getId).toList(); List<Long> ids = dataList.stream().map(AppJobData::getId).toList();
appJobDataMapper.update(null, appJobDataMapper.update(null,
new LambdaUpdateWrapper<AppJobData>() new LambdaUpdateWrapper<AppJobData>()
.set(AppJobData::getCleanStatus, 1) .set(AppJobData::getCleanStatus, "cleaning")
.set(AppJobData::getCleanStartedAt, Instant.now())
.in(AppJobData::getId, ids)); .in(AppJobData::getId, ids));
return dataList; return dataList;