# get_jobs/boss.py
# 2025-04-17 00:36:31 +08:00
#
# 270 lines
# 9.9 KiB
# Python

import json
import os
import time
from urllib.parse import quote

from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

import base
import consts
from base import Logger
class BossCore(base.Core):
    """Job-delivery automation for BOSS Zhipin (https://www.zhipin.com/).

    Extends ``base.Core`` with the site-specific pieces defined here: login
    and verification detection, city lookup from ``site.json``, and the
    ``send`` workflow that picks the next resume/province/city/keyword
    combination, opens its search page and applies the configured filters.

    NOTE(review): the resume configuration is read from ``self.esumes``. That
    spelling is used consistently in this file, but it looks like a typo for
    ``resumes`` -- confirm against ``base.Core`` before renaming.
    """

    # Cache of resolved city records: province -> city -> {"url", "code"}.
    # It is a class attribute, so every instance shares the same cache.
    citys = {}

    def __init__(self):
        """Configure the base class for zhipin.com.

        ``filter_dict`` maps the label shown on each search-page filter menu
        to the resume-configuration key that holds the wanted option(s).
        """
        super().__init__(
            name="boss",
            url_base="https://www.zhipin.com/",
            url_login="/web/user/?ka=header-login",
            send_amount=300,
            filter_dict={
                "公司行业": "industry",
                "工作经验": "experience",
                "求职类型": "jobType",
                "薪资待遇": "salary",
                "学历要求": "degree",
                "公司规模": "scale",
                "融资阶段": "stage",
            },
        )

    def detect_login(self):
        """Refresh ``self.login_status`` and save cookies on a fresh login.

        The session counts as logged in when the current page contains an
        element with class ``link-logout``. Cookies are saved only on the
        transition from "not logged in" to "logged in".
        """
        was_logged_in = self.login_status
        self.login_status = bool(
            self.driver.find_elements(By.CLASS_NAME, "link-logout")
        )
        if self.login_status and not was_logged_in:
            self.save_cookies()

    def detect_verify(self):
        """Set ``self.verify_status`` to whether a verification button is shown.

        The check looks for elements with class ``validate_button_click``.
        """
        self.verify_status = bool(
            self.driver.find_elements(By.CLASS_NAME, "validate_button_click")
        )

    def get_city_info(self, value):
        """Resolve the ``url``/``code`` record of one city from ``site.json``.

        Args:
            value: Mapping ``{province: [city, ...]}`` (a bare city string is
                also accepted). Only the first province and its first city
                are used. When no city is supplied the province name is used
                as the city name, which is what earlier versions always did.

        Returns:
            A dict with keys ``"url"`` and ``"code"``, or ``None`` when the
            province or city is unknown or the data file cannot be read.
        """
        if not value:
            Logger.warn("获取城市信息失败", None, {"province": None, "city": None})
            return None

        province, cities = next(iter(value.items()))
        if isinstance(cities, str):
            city = cities
        elif cities:
            city = next(iter(cities))
        else:
            city = province

        cached = self.citys.get(province, {}).get(city)
        if cached is not None:
            return cached

        site_file = os.path.join(self.assets_path, "site.json")
        try:
            with open(site_file, "r", encoding="utf-8") as f:
                site_data = json.load(f)

            # Look up the province first, then the city inside it.
            province_data = next(
                (p for p in site_data["siteList"] if p["name"] == province),
                None,
            )
            if province_data is None:
                Logger.warn(f"没有找到省份 {province} 的信息")
                return None

            city_data = next(
                (
                    c
                    for c in province_data["subLevelModelList"]
                    if c["name"] == city
                ),
                None,
            )
            if city_data is None:
                Logger.warn(f"没有找到城市 {city} 的信息")
                return None

            record = {"url": city_data["url"], "code": city_data["code"]}
        except (OSError, ValueError, KeyError, TypeError) as e:
            # Unreadable file, malformed JSON or an unexpected schema.
            Logger.warn("获取城市信息失败", e, {"province": province, "city": city})
            return None

        # setdefault keeps cities of this province that were cached earlier.
        self.citys.setdefault(province, {})[city] = record
        return record

    def _init_plan(self):
        """Build the delivery plan in ``self.info["resumes"]`` and persist it.

        One entry is created per resume/province/city/keyword combination.
        Each entry gets an equal share of ``self.send_amount`` as its
        ``expected`` count and starts at page 1 with nothing delivered.
        """
        combos = sum(
            len(resume["keywords"])
            * sum(len(cities) for cities in resume["citys"].values())
            for resume in self.esumes.values()
        )
        # With zero combinations no entry is created, so the share is unused.
        share = self.send_amount // combos if combos else 0

        plan = self.info["resumes"]
        for name, resume in self.esumes.items():
            plan[name] = {}
            for province, cities in resume["citys"].items():
                plan[name][province] = {}
                for city in cities:
                    plan[name][province][city] = {
                        keyword: {
                            "expected": share,
                            "actual": 0,
                            "page": 1,
                            "surplus": False,
                        }
                        for keyword in resume["keywords"]
                    }
        self.save_info()

    def _iter_plan(self):
        """Yield ``(name, province, city, keyword, entry)`` in stored order."""
        for name, provinces in self.info["resumes"].items():
            for province, cities in provinces.items():
                for city, keywords in cities.items():
                    for keyword, entry in keywords.items():
                        yield name, province, city, keyword, entry

    def _next_target(self):
        """Pick the next combination to deliver to.

        Entries flagged ``surplus`` are skipped. The first entry still below
        its ``expected`` share wins; if every entry has met its share but the
        overall ``self.send_amount`` has not been reached, the first
        non-surplus entry is used instead.

        Returns:
            ``(name, province, city, keyword)`` or ``None`` when nothing is
            left to do.
        """
        open_entries = [t for t in self._iter_plan() if not t[4]["surplus"]]
        for target in open_entries:
            if target[4]["actual"] < target[4]["expected"]:
                return target[:4]

        delivered = sum(t[4]["actual"] for t in self._iter_plan())
        if open_entries and delivered < self.send_amount:
            return open_entries[0][:4]
        return None

    def _set_filter(self, tag_name, filter_name):
        """Select ``filter_name`` in the search-page filter menu ``tag_name``.

        Returns:
            ``True`` when the option was clicked (or ``filter_name`` is
            "不限", meaning no restriction); ``False`` when the caller should
            abort because ``detect()`` intervened or no filter menu could be
            located; ``None`` when the option could not be applied but the
            caller may carry on with the remaining filters.
        """
        if filter_name == "不限":
            return True
        if self.detect():
            return False

        # Map each menu label to its label element and dropdown container.
        menus = {}
        lookup_error = None
        try:
            for label in self.driver.find_elements(
                By.CLASS_NAME, "placeholder-text"
            ):
                # The container is the label's grandparent element.
                parent = label.find_element(By.XPATH, "./..")
                container = parent.find_element(By.XPATH, "./..")
                menus[label.text] = {"father": container, "self": label}
        except Exception as e:
            # Keep whatever was collected before the failure.
            lookup_error = e

        if lookup_error is not None:
            Logger.error("筛选的一级菜单盒子查找失败", lookup_error)
        elif not menus:
            Logger.error("筛选的一级菜单盒子查找失败")
        if not menus:
            return False

        menu = menus.get(tag_name)
        if menu is None:
            Logger.info(f"设置筛选条件失败 {tag_name}-{filter_name}")
            return None

        try:
            dropdown = menu["father"].find_element(
                By.CLASS_NAME, "filter-select-dropdown"
            )
            # Options are rendered as <a> in some menus and <li> in others.
            options = dropdown.find_elements(
                By.TAG_NAME, "a"
            ) or dropdown.find_elements(By.TAG_NAME, "li")
            if not options:
                Logger.warn(f"筛选的二级菜单列表内容查找失败 {tag_name}")

            for option in options:
                text = (option.get_attribute("innerText") or "").strip()
                if text != filter_name:
                    continue
                self.human_move(menu["self"])
                self.human_click(option)
                Logger.info(f"设置筛选条件成功 {tag_name}-{filter_name}")
                self.request_await()
                try:
                    # Best effort: wait for the clicked option to be replaced
                    # by the re-rendered page. A timeout is not a failure.
                    WebDriverWait(self.driver, 10).until(
                        EC.staleness_of(option)
                    )
                except WebDriverException:
                    pass
                return True

            Logger.info(f"设置筛选条件失败 {tag_name}-{filter_name}")
        except Exception as e:
            Logger.error(f"设置筛选条件失败 {tag_name}-{filter_name}", e)
        return None

    def send(self):
        """Run one delivery step for the next pending combination.

        Steps: wait until ``detect()`` reports nothing, build the delivery
        plan on first use, choose the next target, open its search page and
        apply every filter from ``self.filter_dict``. When nothing is left to
        deliver, the browser window is closed and the process exits.

        Raises:
            SystemExit: when every combination is finished.
        """
        # Repeat the checks until they pass (replaces the earlier unbounded
        # recursion). NOTE(review): presumably detect() returns truthy after
        # it handled a login/verification interruption -- confirm in base.
        while self.detect():
            pass

        if not self.info["resumes"]:
            self._init_plan()

        target = self._next_target()
        if target is None:
            Logger.info("投递完毕")
            self.driver.close()
            # Same effect as exit(), without relying on the site module.
            raise SystemExit

        name, province, city, keyword = target
        entry = self.info["resumes"][name][province][city][keyword]
        Logger.info(f"当前投递 {name} - {province} - {city} - {keyword}")

        # Resolve the city code needed by the search URL.
        city_info = self.get_city_info({province: [city]})
        if city_info is None:
            # Unknown city: flag the entry so it is never picked again.
            entry["surplus"] = True
            self.save_info()
            return

        # Percent-encode the keyword so "C++", "C#" or "A&B" stay intact.
        query = quote(str(keyword), safe="")
        self.driver.get(
            self.get_url(
                f"/web/geek/job?query={query}"
                f"&city={city_info['code']}&page={entry['page']}"
            )
        )

        # Apply every configured filter; a False result aborts this step.
        for label, resume_key in self.filter_dict.items():
            wanted = self.esumes[name][resume_key]
            if isinstance(wanted, str):
                wanted = [wanted]
            elif not isinstance(wanted, list):
                Logger.info(f"格式错误 {label}")
                continue
            for option in wanted:
                if self._set_filter(label, option) is False:
                    return

        Logger.info("应用筛选标签完成")
        # NOTE(review): long fixed pause kept from the original; it looks like
        # a development placeholder that keeps the browser open -- confirm.
        time.sleep(1000)
# Script entry: create the automation instance and run one delivery pass.
# Keep this name at module level: code elsewhere in this file may look up the
# global ``core``.
core = BossCore()
# Configure log output (logs are saved under core.output_path)
Logger.enable_log_save(core.output_path)
# Open the site for the first time
# NOTE(review): get_url() is called without a path; presumably it returns the
# site's base URL -- confirm in base.Core.
core.request_await()
core.driver.get(core.get_url())
core.page_load_await()
# Add cookies (currently disabled)
# core.request_await()
# core.add_cookies()
# core.page_load_await()
# Deliver, then close the browser window
core.send()
core.driver.close()