practice_code/python/test/test2.py

39 lines
1.1 KiB
Python
Raw Normal View History

import json
import threading
from multiprocessing.connection import Client
class ProcessClient:
def __init__(self):
self.Process_port = 12321
self.Process_server = 'localhost'
self.Process_client_Client = Client((self.Process_server, self.Process_port))
Process_client_recv = threading.Thread(target=self.Process_client_recv)
print(self.Process_client_Client)
Process_client_recv.start()
def Process_client_send(self, target, function, content):
data = {"target": target, "function": function, "content": content}
data_json = json.dumps(data)
self.Process_client_Client.send(data_json)
def Process_client_recv(self):
while True:
try:
data_json = self.Process_client_Client.recv()
data = json.loads(data_json)
print(data)
except EOFError:
print("连接已关闭")
break
def Process_client_pick(self):
pass
if __name__ == '__main__':
xx = ProcessClient()
while True:
a = input("请输入")
xx.Process_client_send(a, a, a)