practice_code/python/test/Client/Transmission/File_operate.py

104 lines
4.4 KiB
Python
Raw Normal View History

2024-04-18 13:45:10 +08:00
import os
import csv
2024-04-19 00:48:24 +08:00
import time
2024-04-18 13:45:10 +08:00
from .Process_Client import ProcessClient
class FileOperate(ProcessClient):
2024-04-19 00:48:24 +08:00
def __init__(self, user_id):
2024-04-18 13:45:10 +08:00
ProcessClient.__init__(self)
2024-04-19 00:48:24 +08:00
self.user_id = user_id
self.root_path = None
self.Account_path = None
self.Contacts_path = None
self.History_path = None
self.file_root_path = None
self.other_file_path = None
self.image_file_path = None
2024-04-18 13:45:10 +08:00
current_file_path = __file__
current_file_name = os.path.basename(current_file_path).split('.')[0]
self.Process_client_send("Server", "Name", current_file_name)
def detection_data(self, Id):
2024-04-19 00:48:24 +08:00
self.user_id = Id
self.root_path = rf'.\{Id}'
self.Account_path = self.root_path + r'\Account.csv'
self.Contacts_path = self.root_path + r'\Contacts.csv'
self.History_path = self.root_path + r'\History.csv'
self.file_root_path = self.root_path + r'\file'
self.other_file_path = self.file_root_path + r'\other'
self.image_file_path = self.file_root_path + r'\image'
if not os.path.isdir(self.root_path):
os.mkdir(self.root_path)
if not os.path.isdir(self.file_root_path):
os.mkdir(self.file_root_path)
os.mkdir(self.other_file_path)
os.mkdir(self.image_file_path)
if not os.path.exists(self.Account_path):
with open(self.Account_path, 'w', encoding='utf-8') as f:
2024-04-18 13:45:10 +08:00
pass
2024-04-19 00:48:24 +08:00
with open(self.Account_path, 'r+', encoding='utf-8') as info:
data = csv.DictReader(info)
if os.path.getsize(self.Account_path) == 0: # 检查文件大小是否为0
date = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(0))
else:
for d in data:
date = d['UpDataTime']
2024-04-18 13:45:10 +08:00
target = "服务器"
genre = "数据更新"
2024-04-19 00:48:24 +08:00
data = {"genre": genre, "target": target, "content": date}
2024-04-18 13:45:10 +08:00
self.Process_client_send("Session_server", "send_server", data)
2024-04-20 01:36:15 +08:00
def read_data(self):
with open(self.Contacts_path, 'r', encoding='utf-8') as Contacts:
data = csv.reader(Contacts)
next(data)
for i in data:
content = {'Contact': i[0], 'Remark': i[1], 'state': i[2], 'UpDataTime': i[3]}
self.Process_client_send("Chat_main", "ChatPage_add_Contact_person", content)
with open(self.History_path, 'r', encoding='utf-8') as History_path:
data = csv.reader(History_path)
next(data)
for i in data:
content = {'send': i[0], 'receive': i[1], 'Type': i[2], 'content': i[3], 'UpDataTime': i[4]}
self.Process_client_send("Chat_main", "ChatPage_add_Contact_tab", content)
2024-04-19 00:48:24 +08:00
def save_data(self, data):
target = list(data.keys())[0]
data = list(data.values())[0]
match target:
case 'Account':
with open(self.Account_path, 'w+', encoding='utf-8', newline='') as file:
csv_file = csv.writer(file)
csv_file.writerow(list(data.keys()))
csv_file.writerow(list(data.values()))
case 'Contacts':
with open(self.Contacts_path, 'a+', encoding='utf-8', newline='') as file:
csv_file = csv.writer(file)
if os.path.getsize(self.Contacts_path) == 0: # 检查文件大小是否为0
csv_file.writerow(list(data.keys()))
csv_file.writerow(list(data.values()))
case 'History':
with open(self.History_path, 'a+', encoding='utf-8', newline='') as file:
csv_file = csv.writer(file)
if os.path.getsize(self.History_path) == 0: # 检查文件大小是否为0
csv_file.writerow(list(data.keys()))
csv_file.writerow(list(data.values()))
2024-04-20 01:36:15 +08:00
case '更新完成':
self.Process_client_send("ALL", "更新完成", eval(data))
2024-04-19 00:48:24 +08:00
2024-04-18 13:45:10 +08:00
def Process_client_pick(self, data):
2024-04-20 01:36:15 +08:00
if data['target'] in ['ALL', 'File_operate']:
2024-04-19 00:48:24 +08:00
match data['function']:
case 'detection_data':
self.detection_data(data['content'])
case 'save_data':
self.save_data(data['content'])
2024-04-20 01:36:15 +08:00
case 'read_data':
self.read_data()