1、server & client
server代码:
import socket,os
#server
HOST = ""
PORT = 8888
MySockServer = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) #创建一个socket实体
MySockServer.bind((HOST,PORT)) #绑定地址和端口,地址和端口为元组
#MySockServer.setblocking(True) #设置socket阻塞模式,默认为True,阻塞
MySockServer.listen() #监听网络连接,可设置最大的连接个数
conn,addr = MySockServer.accept() #等待client的连接请求,返回两个参数:conn,addr
while True:
print("Connected by",addr)
data = conn.recv(1024).decode() #接收client传送过来的数据,一般情况下需设置接收数据大小,socket传送的均为二进制数据
if len(data) == 0:
print("There is no data receve.")
continue
print("The reseve data is :",data)
res_data = os.popen(data).read()
res_size = len(res_data.encode("utf-8"))
print(res_size)
if res_size == 0 :
continue
conn.send(str(res_size).encode("utf-8")) #先传送数据包大小,便于接收端处理,解决数据粘包的问题
ack = conn.recv(1024)
if ack.decode() == "OK":
print("Receve OK")
print(res_data)
conn.sendall(res_data.encode("utf-8"))
client代码
import socket
HOST = "localhost"
PORT = 8888
mysocketclient =socket.socket(socket.AF_INET,socket.SOCK_STREAM)
mysocketclient.connect((HOST,PORT)) #client发起conn连接
while True:
data = input("Please enter data:").strip()
mysocketclient.send(data.encode("utf-8"))
data_size = int(mysocketclient.recv(1024).decode())
print(data_size)
if data_size > 0:
mysocketclient.send("OK".encode("utf-8"))
tdata_resp = b''
while data_size > 0:
if data_size < 1024:
data_resp = mysocketclient.recv(data_size)
tdata_resp += data_resp
print("000",data_size)
break
else:
data_resp = mysocketclient.recv(1024)
data_size -= len(data_resp)
tdata_resp += data_resp
print(data_size)
print(tdata_resp.decode())
以上代码实现简单的ssh服务器功能(未处理错误命令)。
socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)
参数:
family,地址簇。
socket.AF_UNIX:
用于单一的Unix系统进程间通信
socket.
AF_INET:IPV4地址(缺省默认)
socket.AF_INET6:IPV6地址
type,类型。
socket.
SOCK_STREAM:数据流,用于TCP/IP(缺省默认)
socket.
SOCK_DGRAM:数据报,用于UDP
socket.SOCK_RAW:
原始套接字,普通的套接字无法处理ICMP、IGMP等网络报文,而SOCK_RAW可以;其次,SOCK_RAW也可以处理特殊的IPv4报文;此外,利用原始套接字,可以通过IP_HDRINCL套接字选项由用户构造IP头。
socket.SOCK_RDM:
是一种可靠的UDP形式,即保证交付数据报但不保证顺序。SOCK_RAM用来提供对原始协议的低级访问,在需要执行某些特殊操作时使用,如发送ICMP报文。
socket.SOCK_SEQPACKET:
可靠的连续数据包服务
sendto(string[,flag],address)
将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议。
settimeout(timeout)
设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如 client 连接最多等待5s )
fileno()
返回套接字的文件描述符
DebugError信息:
Traceback (most recent call last):
File “E:/phthon/exercise/Noteforpython/note6_socket_client.py”, line 29, in <module>
print(tdata_resp.decode())
UnicodeDecodeError: ‘utf-8’ codec can’t decode bytes in position 828-829: unexpected end of data
不同编码制式的中文占用的存储空间长度不同,导致报错。坑了3小时!
>>> a = "王"
>>> len(a)
1
>>> len(a.encode("utf-8"))
3
>>> b = "Hello"
>>> len(b)
5
>>> len(b.encode("utf-8"))
5
2、SocketServer并发
import socketserver,os
class MyConcorrentServer(socketserver.BaseRequestHandler): #创建一个实体类,继承socketserver.BaseRequestHandler
def handle(self): #重写方法 handle(self),包括所有数据处理逻辑
while True:
try:
self.data = self.request.recv(1024).strip().decode() #接收socket数据
print("The reseve data is :", self.data)
res_data = os.popen(self.data).read()
res_size = len(res_data.encode("utf-8"))
print(res_size)
self.request.send(str(res_size).encode("utf-8"))
ack = self.request.recv(1024)
if ack.decode() == "OK":
print("Receve OK")
print(res_data)
self.request.sendall(res_data.encode("utf-8")) #发送socket数据
except ConnectionResetError as e: #抓取客户端断开异常
print("error info:",e)
break
if __name__ == '__main__':
HOST = ""
PORT = 8888
server = socketserver.ThreadingTCPServer((HOST,PORT),MyConcorrentServer) #多进程并发
#server = socketserver.ForkingTCPServer((HOST, PORT), MyConcorrentServer) #多线程并发
server.serve_forever()
3、进程、线程
程序,一系列指令的集合,是进程运行的静态描述;
进程,程序的一次执行活动,是程序的动态描述;一个进程在一个时间段只能做一件事情;
线程,操作系统能够调用运算的最小单位,是进程中的实际运算单元,一个进程中的多个线程可以同时并发执行不同的操作
区别:
- 线程共享创建它的进程的地址空间;进程有自己的地址空间。
- 线程可以直接访问进程的数据段;进程拥有自己的数据段。
- 线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
- 线程可以对同一进程的线程进行相当大的控制;进程只能对子进程进行控制。
- 对主线程的更改(取消、优先级更改等)可能会影响进程中其他线程的行为;对父进程的更改不会影响子进程
多线程编程:
import threading,time
# def running():
# print("hello world!")
# t1 = threading.Thread(target=running,args=[1,])
# 直接调用生成一个线程
class MyThreading(threading.Thread): #实例化一个线程类,继承threading.Thread
def __init__(self,number):
threading.Thread.__init__(self)
self.number = number
def run(self): #重写RUN方法
i = 1
while i <= self.number:
print("This is process :",i)
i += 1
time.sleep(2)
print("the kill :",self.getName())
class MyThreading2(threading.Thread):
def __init__(self,number):
threading.Thread.__init__(self)
self.number = number
def run(self):
n = 0
while n <= self.number:
print("HELLO MY WORLD :",n)
n += 1
time.sleep(3)
print("the kill :", self.getName())
if __name__ == "__main__":
t1 = MyThreading(10) #创建一个线程
t2 = MyThreading2(2)
print("The process 1 name is ",t1.getName())
print("The process 2 name is ",t2.getName())
t2.setDaemon(True) #设置为守护线程,当主线程退出时,守护线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
t1.start() #启动一个线程
t2.start()
# t2.join() #等待一个线程结束
线程间通信
互斥锁
import threading,time
#--------互斥锁:Mutex---------
class MyThreadCreatr(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global list1 #获取全局变量
tt = True
while tt:
lock.acquire() #操作全局变量前加锁
if list1[len(list1)-1] < 20:
list1.append(len(list1))
else:
tt = False
lock.release() #操作全局变量后立即释放锁
# time.sleep(1)
print("changed by ",self.getName())
if __name__ == "__main__":
list1 = [0,]
lock = threading.Lock() #生成一个互斥锁实例
for i in range(5):
t = MyThreadCreatr()
t.start()
while True:
if len(threading.enumerate()) == 1:
#返回当前所有线程对象的列表。该列表包括daemonic线程、current_thread()创建的虚拟线程对象和主线程。
# 它排除终止的线程和尚未启动的线程。
print(list1)
break
信号量:设置允许多少个线程操作共享数据
import threading,time
#--------信号量:Semaphore---------
class MyThreadCreatr(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global list1 #获取全局变量
tt = 3
while tt != 0:
sema.acquire() #操作全局变量前设置信号量
if list1[len(list1)-1] < 20:
list1.append(len(list1))
tt -= 1
else:
tt = 0
time.sleep(2)
sema.release() #操作全局变量后立即释放
time.sleep(2)
print("changed by ",self.getName())
if __name__ == "__main__":
list1 = [0,]
sema = threading.Semaphore(3) #生成一个信号量实例
for i in range(6):
t = MyThreadCreatr()
t.start()
while True:
if threading.active_count() == 1:
print(list1)
break
事件:
import threading,time
#--------事件:Event---------
class MyThreadCreatr(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global list1 #获取全局变量
tt = True
while tt :
if not event.isSet(): #根据事件的状态执行相关逻辑
if list1[len(list1)-1] < 10:
list1.append(len(list1))
else:
tt = False
time.sleep(1)
print("changed by ",self.getName())
if __name__ == "__main__":
list1 = [0,]
ee = 1
event = threading.Event() #生成一个事件实例
if not event.isSet(): #首先设置事件初始状态
event.set()
for i in range(2):
t = MyThreadCreatr()
t.start()
while True:
if ee == 3 and event.isSet(): #根据条件改变事件的状态
event.clear()
ee = 1
print("event is clear...")
time.sleep(2)
event.set()
else:
ee += 1
print("event is set,please waitting...")
if threading.active_count() == 1:
print(list1)
break
队列,生产者消费者模型:
queue.Queue 先进先出
queue.LifoQueue 先进后出 类似于堆栈
queue.PriorityQueue 可设置处理优先级的队列
import threading,time , queue,random
class product1(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
tt = 0
while tt < 10:
if myq.qsize() < 20:
it = random.randrange(97,122)
myq.put(it)
tt += 1
time.sleep(random.randrange(1,5))
else:
time.sleep(random.randrange(1, 5))
class product2(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
tt = 0
while tt < 10:
if myq.qsize() <10:
it = random.randrange(65,90)
myq.put(it)
tt += 1
time.sleep(random.randrange(1,5))
else:
time.sleep(random.randrange(1, 5))
class customer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
if myq.empty():
print("the queue is empty...")
time.sleep(2)
else:
data = myq.get()
print("the letter is {},by the threading {}".format(chr(data),self.getName()))
time.sleep(1)
if __name__ == "__main__":
myq = queue.Queue()
for i in range(2):
t1 = product1()
t2 = product2()
t3 = customer()
t1.start()
t2.start()
t3.start()
进程编程,进程间通信
进程间队列:
from multiprocessing import Process,Queue
import os
class mypr(Process):
def __init__(self,queue):
Process.__init__(self)
self.queue = queue
def run(self):
self.name = "dd"
self.queue.put(self.name)
self.queue.put(self.pid)
self.queue.put(self.ident)
self.queue.put(os.getppid())
if __name__ == "__main__":
myq = Queue()
# pr = Process(target=run(),args=("dd")) #直接调用新进程
pr = mypr(myq)
pr.start()
pr.join()
while not myq.empty():
print(myq.get())
管道pip
from multiprocessing import Pipe,Process
class mypr(Process):
def __init__(self,conn):
Process.__init__(self)
self.conn = conn
def run(self):
self.name = "HELLO"
self.conn.send(self.name)
if __name__ == "__main__":
in_pipe,out_pipe = Pipe() #定义管道端口
pr = mypr(in_pipe)
pr.start()
pr.join()
print(out_pipe.recv())
Manager,进程池:
from multiprocessing import Process,Manager,Pool
import time,os
def run(list):
list.append("iiii")
time.sleep(1)
print("the children process is ",os.getpid())
def runback():
print("this process is over...",os.getpid())
if __name__ == "__main__":
manager = Manager() #初始化一个manager,管理进程间的共享数据
list1 = manager.list(range(3)) #创建一个共享列表,manager可创建字典,变量等几乎所有的数据类型
pool = Pool(5) #初始化进程池,设置同时运行的进程数
for i in range(10):
# pool.apply(func=run,args=(list1,)) #Pool 创建进程,串行执行阻塞
pool.apply_async(func=run,args=(list1,),callback=runback()) #Pool 创建进程,并行执行不阻塞,回调函数由父进程执行
print(list1)
pool.close()
pool.join() #Pool的close 和join 方法顺序需注意
参考文章: