Python TCP服务器v1.6 – multiprocessing多进程及Ctrl-c(SIGINT)退出

Python TCP服务器v1.6 – multiprocessing多进程及Ctrl-c(SIGINT)退出多线程 threading 与多进程 multiprocess 线程 所有的线程运行在同一个进程中 共享相同的运行环境 每个独立的线程有一个程序入口 顺序执行序列和程序的出口 python 对线程的支持并不是非常好 GIL 所以你可以在很多文章上批评 python 的多线程的弊端 GIL 作为解释器的一个 Bug 一样的存在 我们也有一定的解决方法 用 Ctype 绕过解释器是我们一般的解决方法 主要用 multiprocess 来绕过多线程的瓶颈 并且退出线程很麻烦 进程 程序的一次执行 程

TCP聊天服务器套接字v1.6

| 多线程threading 与 多进程multiprocess

线程:所有的线程运行在同一个进程中,共享相同的运行环境。每个独立的线程有一个程序入口,顺序执行序列和程序的出口
python对线程的支持并不是非常好(GIL),所以你可以在很多文章上批评python的多线程的弊端
GIL作为解释器的一个Bug一样的存在,我们也有一定的解决方法,用Ctype绕过解释器是我们一般的解决方法,主要用multiprocessing来绕过多线程的瓶颈。并且退出线程很麻烦.




进程:程序的一次执行(程序载入内存,系统分配资源运行)。每个进程有自己的内存空间,数据栈等,进程之间可以进行通讯,但是不能共享信息。退出进程模块中有一个函数Process.terminate()可以直接调用

| signal

Python中要捕获信号,需要signal包来处理。

| 使用

from signal import SIGINT, signal import multiprocessing 
if __name__ == "__main__": server = Server("127.0.0.1", 429) process = multiprocessing.Process(target=server.run) def quit(signum, frame): if process.is_alive(): process.terminate() signal(SIGINT, quit) print("[i]Press Ctrl-c to shutdown the server.") process.start() while True: if not process.is_alive(): logger.info("Server shutdown complete.") break os.system("pause") 

感觉这次更新内容不怎么多

|全部代码

files
server.py
内个data.py就不传了




import os import socket # 导入 socket 模块 from threading import Thread import logging # from color import Text, Background, Print from signal import SIGINT, signal import data import multiprocessing import time __version__ = 1.6 def threading(Daemon, kwargs): thread = Thread(kwargs) thread.setDaemon(Daemon) thread.start() return thread def ignore(function): def i(*args, kwargs): try: function(*args, kwargs) except: return return i logger = logging.getLogger(__name__) logger.setLevel(level=logging.INFO) handler = logging.FileHandler("log.txt") handler.setLevel(logging.INFO) handler.setFormatter(logging.Formatter("[%(asctime)s(%(levelname)s)]: %(message)s")) logger.addHandler(handler) console = logging.StreamHandler() console.setLevel(logging.INFO) console.setFormatter(logging.Formatter("[%(asctime)s(%(levelname)s)]: %(message)s")) logger.addHandler(handler) logger.addHandler(console) bytecount = 1024 class Command_Handler(object): def __init__(self, bind): """Bind Client class""" assert isinstance(bind, Client) self.client = bind def _function(self, _list): data = { 
   "/info": { 
   "-v": self.get_version(), "-id": self.get_id(), "-i": self.info(), "-h": self.help(), "-name": self.name()}, } _dict = data for n in range(len(_list)): if type(_dict) == dict: _dict = _dict.get(_list[n], self.unknown(" ".join(_list))) else: break if type(_dict) == dict: _dict = "Error:\nThis command must take more arguments. Such as %s." % list( _dict.keys()) return _dict @staticmethod def help(): return """/info [-v] [-id] [-i] -v : get version of program. -id : get your id. -i : get information. -h : help. -name : get your name For example, /info -id""" @staticmethod def get_version(): return "version : " + str(__version__) def get_id(self): return "Your id is {}.".format(id(self.client)) def name(self): return "Your name is {}.".format(self.client.username) def info(self): return f"Socket Server[version { 
     self.get_version()}] By zmh." def unknown(self, s): return """Error: No command named "%s". Please search [/info -h] to help. %s""" % (s, self.help()) def cut(self, string): return string.strip().split() def handler(self, c): return "[command]%s\n%s" % ( c, str(self._function(self.cut(c)))) def iscommand(self, i): return i.strip().startswith("/") class Server(object): join_message = "Server> %s(%s) 连接服务器. 当前在线人数: %s" user_message = "%s(%s)%s> %s" quit_message = "%s(%s) 下线了, %s" def __init__(self, addr, port, backlog=10, encode='utf8'): self.address = addr, port self.backlog = backlog self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.bind(self.address) self.socket.listen(backlog) self.connect = [] self.encode = encode self.user_record = data.user() def clear_socket(self, clear_ms = 500): logger.info(f"Clear the closed socket once every { 
     clear_ms} ms.") while True: del_list = list(filter(lambda c: hasattr(c, 'Quitted') or (not c.isOpen()), self.connect)) for user in del_list: self.connect.remove(user) #if del_list: # logger.info(f"Clear the closed client socket, number is {len(del_list)}.") #else: # logger.info('None of the sockets have been cleaned.') time.sleep(clear_ms / 1000) def run(self): logger.debug(f"pid { 
     os.getpid()}.") logger.info(f"Server [{ 
     ':'.join(map(lambda i: str(i), self.address))}] on.") logger.info("Backlog number: " + str(self.backlog)) logger.info('The CODEC is sent as ' + self.encode) threading(Daemon=True, target=self.clear_socket) self.accept_client() def _get_Clients(self) -> list: def func(c): return c.__filter__() return list(filter(func, self.connect)) def _get_sockets(self): # return int return len(self._get_Clients()) def _str_sockets(self): return f"当前人数 { 
     self._get_sockets()}" def ServerMessage(self, mes, inc=True): for user in self._get_Clients(): if user.__filter__(): user._send(mes) def UserMessage(self, address, _user, mes, inc=True): if not mes: return for user in self.connect: if user.__filter__(): username = user.username send_message = Server.user_message % ("brown" if _user == username else "red", _user, address, "(我自己)" if _user == username else "", mes) user._send(send_message) logger.info(f"{ 
     address}[{ 
     _user}] : { 
     mes}") def error_handle(self): for user in filter(lambda c: not c.isOpen(), self.connect): print(user) self.connect.remove(user) def accept_client(self): while True: logger.info("The server is listening on the port.") client, address = self.socket.accept() # 阻塞,等待客户端连接 NewClient = Client(client, address[0], self) self.connect.append(NewClient) NewClient.run() logger.info(f'The address { 
     address[0]} is connected to the server') def quit(self, username, address): QuitMessage = Server.quit_message % (username, address, self._str_sockets()) logger.info(QuitMessage) self.ServerMessage(QuitMessage, False) def login(self, username, address): logger.info(f"{ 
     address}[{ 
     username}] 登录服务器 , " + self._str_sockets()) self.ServerMessage(Server.join_message % (username, address, self._get_sockets())) class Client(object): class QuitError(Exception): def __init__(self, *args): super().__init__(*args) def __init__(self, socket, addr, server: Server): self.socket = socket self.addr = addr if not isinstance(server, Server): raise ValueError self.server = server self.encode = self.server.encode self.com = Command_Handler(self) @self.error def _recv(self) -> bytes: return self.socket.recv(bytecount  2).decode(encoding=self.encode).strip() self._recv = lambda: _recv(self) @self.error def _send(self, message=str()) -> None: self.socket.sendall(message.encode(self.encode)) self._send = lambda m: _send(self, m) def __del__(self): self.socket.close() def isLogin(self) -> bool: return hasattr(self, "_login") and self._login def isOpen(self) -> bool: return not getattr(self.socket, "_closed", True) def __filter__(self) -> bool: """返回是否在线并已可接受消息""" return self.isLogin() and self.isOpen() def recv(self) -> str: data = self._recv() while not data: data = self._recv() # while not (data := self._recv()): # pass # 我的PythonIDE是3.8, PyCharm是3.7(anaconda 32x),而赋值表达式是3.8加进来的. return data @ignore def login(self): self._send(f'欢迎来到服务器[{ 
     self.server.address[0]}].您的ip地址为{ 
     self.socket.getpeername()[0]}') self.username = self.recv()[:8] if self.server.user_record.__in__(self.username): self._send("请输入您的密码: (右下[send]键发送)") i = self.recv() if self.server.user_record.handler(self.username, i): self._send(f'欢迎回来, { 
     self.username}.') else: self._send('密码错误,请重试.') self.__del__() else: def normal(string): return (4 <= len(string) <= 10) and not ('\n' in string) while True: self._send( "[i]提示: 密码需在4 ~ 10位之间, 且不能换行.\n请输入您的密码: (右下[send]键发送)") p1 = self.recv() if normal(p1): break while True: self._send("再次输入您的密码: (右下[send]键发送)") p2 = self.recv() if p1 == p2: break else: self._send("密码与前次不符!") self.server.user_record.handler(self.username, p1) self._send(f'初来乍到, { 
     self.username}') self._login = True self.server.login(self.username, self.addr) def quit(self) -> None: if hasattr(self, 'Quitted'): return self.Quitted = True if self.isOpen() is True: self.socket.close() self.server.quit(self.username, self.addr) @ignore def forever_receive(self): self.login() while self.__filter__(): string = self.recv() if string == Client.QuitError: return if self.com.iscommand(string): self._send(self.com.handler(string)) else: self.server.UserMessage(self.addr, self.username, string) def error(self, func): def function(*args, kwargs): try: res = func(*args, kwargs) return res except ConnectionAbortedError as e: self.quit() except Exception: logger.exception("error") return Client.QuitError return function def run(self): self.thread = threading(True, target=self.forever_receive) def get_host_ip() -> str: """get current IP address""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip if __name__ == "__main__": server = Server("127.0.0.1", 429) process = multiprocessing.Process(target=server.run) def quit(signum, frame): if process.is_alive(): process.terminate() signal(SIGINT, quit) print("[i]Press Ctrl-c to shutdown the server.") process.start() while True: if not process.is_alive(): logger.info("Server shutdown complete.") break os.system("pause") 

最后, 公布大家一个事,v1.7会带来服务端的GUI界面(PyQt5)!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/210915.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午11:36
下一篇 2026年3月18日 下午11:37


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号