from typing import Dict, Any, Optional from app.services.ingest.registry import PlatformConfig, DedupFieldSpec, register from app.services.ingest.remote_push import safe_get, safe_join def _extract_job_id(data: Dict[str, Any]) -> Optional[str]: # 新格式: encryptJobId 在顶层 # 旧格式: jobBaseInfoVO.jobId val = data.get("encryptJobId") if not val: job_base = data.get("jobBaseInfoVO") or {} val = job_base.get("jobId") or job_base.get("encryptJobId") return str(val) if val else None def _extract_company_name(data: Dict[str, Any]) -> Optional[str]: name = data.get("brandName") or data.get("name") or (data.get("companyFullInfoVO") or {}).get("name") return str(name) if name else None def _build_boss_push(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: # 兼容新旧两种 API 格式 # 旧格式: brandComInfoVO / jobBaseInfoVO / bossBaseInfoVO 嵌套 # 新格式: 扁平结构,字段直接在顶层 boss_base = data.get("bossBaseInfoVO") or {} job_base = data.get("jobBaseInfoVO") or {} brand = data.get("brandComInfoVO") or {} # 职位信息:优先新格式顶层字段,fallback 旧格式嵌套字段 title = data.get("jobName") or safe_get(job_base, "positionName") description = data.get("jobDesc") or safe_get(job_base, "jobDesc") education = data.get("jobDegree") or safe_get(job_base, "degreeName") years = data.get("jobExperience") or safe_get(job_base, "experienceName") skills = data.get("skills") or job_base.get("requiredSkills") or [] welfares = data.get("welfares") or job_base.get("salaryWelfareInfo") or [] encrypt_job_id = data.get("encryptJobId") or safe_get(job_base, "encryptJobId") # 薪资:新格式用 salaryDesc,旧格式用 lowSalary-highSalary salary = data.get("salaryDesc") or "" if not salary: low = safe_get(job_base, "lowSalary") high = safe_get(job_base, "highSalary") salary = f"{low}-{high}" if low or high else "" # 位置:新格式用 cityName/districtName/businessName,旧格式用 locationName/locationDesc city = data.get("cityName") or "" district = data.get("districtName") or "" business = data.get("businessName") or "" location = f"{city}{district}{business}".strip() or safe_get(job_base, "locationName", "位置信息未找到") position = location or safe_get(job_base, "locationDesc", "位置信息未找到") # 公司信息:优先新格式顶层字段,fallback 旧格式嵌套字段 brand_name = data.get("brandName") or safe_get(brand, "brandName") brand_scale = data.get("brandScale") or safe_get(brand, "scaleName") brand_stage = data.get("brandStage") or safe_get(brand, "stageName") encrypt_brand_id = safe_get(brand, "encryptBrandId") brand_industry = safe_get(brand, "industryName") return { "source_type": "Boss直聘", "name": brand_name, "common_name": brand_name or safe_get(boss_base, "brandName"), "title": title, "title_addr": title, "description": description, "education": education, "skill": safe_join(skills), "welfare": safe_join(welfares), "years": years, "salary": salary, "location": location, "position": position, "job_type": "全职", "size": brand_scale, "employer_type": "全职", "industry": brand_industry, "job_1st_class": "", "job_2nd_class": "", "job_3rd_class": "", "job_4th_class": "", "date": "", "start_date": "", "end_date": "", "age": "", "sex": "", "number": "", "url": f"https://www.zhipin.com/job_detail/{encrypt_job_id}.html" if encrypt_job_id else "", "company_id": encrypt_brand_id, "company_name": brand_name, "company_url": f"https://www.zhipin.com/gongsi/{encrypt_brand_id}.html" if encrypt_brand_id else "", "company_desc": safe_get(brand, "introduce"), "base_data": data, } register(PlatformConfig( platform="boss", channel="mini", data_type="job", table="boss_job", dedup_fields=(DedupFieldSpec(column="job_id", extractor=_extract_job_id),), push_mapper=_build_boss_push, )) register(PlatformConfig( platform="boss", channel="mini", data_type="company", table="boss_company", dedup_fields=(DedupFieldSpec(column="company_name", extractor=_extract_company_name),), )) # 公司关联职位(通过 company_jobs_sync 写入,与搜索职位 mini 区分) register(PlatformConfig( platform="boss", channel="company", data_type="job", table="boss_job", dedup_fields=(DedupFieldSpec(column="job_id", extractor=_extract_job_id),), push_mapper=_build_boss_push, ))