Python之路——网络编程

  • Post author:
  • Post category:python


1、server & client

server代码:

import socket,os

#server
HOST = ""
PORT = 8888
MySockServer = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) #创建一个socket实体
MySockServer.bind((HOST,PORT))  #绑定地址和端口,地址和端口为元组
#MySockServer.setblocking(True)     #设置socket阻塞模式,默认为True,阻塞
MySockServer.listen()           #监听网络连接,可设置最大的连接个数
conn,addr = MySockServer.accept()       #等待client的连接请求,返回两个参数:conn,addr
while True:
    print("Connected by",addr)
    data = conn.recv(1024).decode()     #接收client传送过来的数据,一般情况下需设置接收数据大小,socket传送的均为二进制数据
    if len(data) == 0:
        print("There is no data receve.")
        continue
    print("The reseve data is :",data)
    res_data = os.popen(data).read()
    res_size = len(res_data.encode("utf-8"))
    print(res_size)
    if res_size == 0 :
        continue
    conn.send(str(res_size).encode("utf-8"))    #先传送数据包大小,便于接收端处理,解决数据粘包的问题
    ack = conn.recv(1024)
    if ack.decode() == "OK":
        print("Receve OK")
        print(res_data)
        conn.sendall(res_data.encode("utf-8"))

client代码

import socket

HOST = "localhost"
PORT = 8888
mysocketclient =socket.socket(socket.AF_INET,socket.SOCK_STREAM)
mysocketclient.connect((HOST,PORT))     #client发起conn连接
while True:
    data = input("Please enter data:").strip()
    mysocketclient.send(data.encode("utf-8"))
    data_size = int(mysocketclient.recv(1024).decode())
    print(data_size)
    if data_size > 0:
        mysocketclient.send("OK".encode("utf-8"))
        tdata_resp = b''
        while data_size > 0:
            if data_size < 1024:
                data_resp = mysocketclient.recv(data_size)
                tdata_resp += data_resp
                print("000",data_size)
                break
            else:
                data_resp = mysocketclient.recv(1024)
                data_size -=  len(data_resp)
                tdata_resp +=  data_resp
                print(data_size)
        print(tdata_resp.decode())

以上代码实现简单的ssh服务器功能(未处理错误命令)。


socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)

参数:

family,地址簇。


socket.AF_UNIX:

用于单一的Unix系统进程间通信


socket.


AF_INET:IPV4地址(缺省默认)


socket.AF_INET6:IPV6地址


type,类型。


socket.


SOCK_STREAM:数据流,用于TCP/IP(缺省默认)


socket.


SOCK_DGRAM:数据报,用于UDP


socket.SOCK_RAW:

原始套接字,普通的套接字无法处理ICMP、IGMP等网络报文,而SOCK_RAW可以;其次,SOCK_RAW也可以处理特殊的IPv4报文;此外,利用原始套接字,可以通过IP_HDRINCL套接字选项由用户构造IP头。


socket.SOCK_RDM:

是一种可靠的UDP形式,即保证交付数据报但不保证顺序。SOCK_RAM用来提供对原始协议的低级访问,在需要执行某些特殊操作时使用,如发送ICMP报文。


socket.SOCK_SEQPACKET:

可靠的连续数据包服务


sendto(string[,flag],address)

将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议。


settimeout(timeout)

设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如 client 连接最多等待5s )


fileno()

返回套接字的文件描述符



DebugError信息:

Traceback (most recent call last):

File “E:/phthon/exercise/Noteforpython/note6_socket_client.py”, line 29, in <module>

print(tdata_resp.decode())

UnicodeDecodeError: ‘utf-8’ codec can’t decode bytes in position 828-829: unexpected end of data

不同编码制式的中文占用的存储空间长度不同,导致报错。坑了3小时!

>>> a = "王"
>>> len(a)
1
>>> len(a.encode("utf-8"))
3
>>> b = "Hello"
>>> len(b)
5
>>> len(b.encode("utf-8"))
5

2、SocketServer并发

import socketserver,os

class MyConcorrentServer(socketserver.BaseRequestHandler):   #创建一个实体类,继承socketserver.BaseRequestHandler
    def handle(self):       #重写方法  handle(self),包括所有数据处理逻辑
        while True:
            try:
                self.data = self.request.recv(1024).strip().decode()   #接收socket数据
                print("The reseve data is :", self.data)
                res_data = os.popen(self.data).read()
                res_size = len(res_data.encode("utf-8"))
                print(res_size)
                self.request.send(str(res_size).encode("utf-8"))
                ack = self.request.recv(1024)
                if ack.decode() == "OK":
                    print("Receve OK")
                    print(res_data)
                    self.request.sendall(res_data.encode("utf-8"))       #发送socket数据
            except ConnectionResetError as e:     #抓取客户端断开异常
                print("error info:",e)
                break

if __name__ == '__main__':
    HOST = ""
    PORT = 8888
    server = socketserver.ThreadingTCPServer((HOST,PORT),MyConcorrentServer)    #多进程并发
    #server = socketserver.ForkingTCPServer((HOST, PORT), MyConcorrentServer)      #多线程并发
    server.serve_forever()

3、进程、线程

程序,一系列指令的集合,是进程运行的静态描述;

进程,程序的一次执行活动,是程序的动态描述;一个进程在一个时间段只能做一件事情;

线程,操作系统能够调用运算的最小单位,是进程中的实际运算单元,一个进程中的多个线程可以同时并发执行不同的操作

区别:

  • 线程共享创建它的进程的地址空间;进程有自己的地址空间。
  • 线程可以直接访问进程的数据段;进程拥有自己的数据段。
  • 线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
  • 线程可以对同一进程的线程进行相当大的控制;进程只能对子进程进行控制。
  • 对主线程的更改(取消、优先级更改等)可能会影响进程中其他线程的行为;对父进程的更改不会影响子进程


多线程编程:

import threading,time

# def running():
#     print("hello world!")
# t1 = threading.Thread(target=running,args=[1,])
# 直接调用生成一个线程

class MyThreading(threading.Thread):   #实例化一个线程类,继承threading.Thread
    def __init__(self,number):
        threading.Thread.__init__(self)
        self.number = number

    def run(self):          #重写RUN方法
        i = 1
        while i <= self.number:
            print("This is process :",i)
            i += 1
            time.sleep(2)
        print("the kill :",self.getName())


class MyThreading2(threading.Thread):
    def __init__(self,number):
        threading.Thread.__init__(self)
        self.number = number

    def run(self):
        n = 0
        while n <= self.number:
            print("HELLO MY WORLD :",n)
            n += 1
            time.sleep(3)
        print("the kill :", self.getName())
if __name__ == "__main__":
    t1 = MyThreading(10)    #创建一个线程
    t2 = MyThreading2(2)

    print("The process 1 name is ",t1.getName())
    print("The process 2 name is ",t2.getName())
    t2.setDaemon(True)   #设置为守护线程,当主线程退出时,守护线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
    t1.start()      #启动一个线程
    t2.start()
    # t2.join()       #等待一个线程结束


线程间通信

互斥锁

import threading,time

#--------互斥锁:Mutex---------
class MyThreadCreatr(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        global list1        #获取全局变量
        tt = True
        while tt:
            lock.acquire()  #操作全局变量前加锁
            if list1[len(list1)-1] < 20:
                list1.append(len(list1))
            else:
                tt = False
            lock.release()      #操作全局变量后立即释放锁
            # time.sleep(1)
            print("changed by ",self.getName())

if __name__ == "__main__":
    list1 = [0,]
    lock = threading.Lock()     #生成一个互斥锁实例
    for i in range(5):
        t = MyThreadCreatr()
        t.start()
    while True:
        if len(threading.enumerate()) == 1:
            #返回当前所有线程对象的列表。该列表包括daemonic线程、current_thread()创建的虚拟线程对象和主线程。
            # 它排除终止的线程和尚未启动的线程。
            print(list1)
            break

信号量:设置允许多少个线程操作共享数据

import threading,time

#--------信号量:Semaphore---------
class MyThreadCreatr(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        global list1        #获取全局变量
        tt = 3
        while tt != 0:
            sema.acquire()  #操作全局变量前设置信号量
            if list1[len(list1)-1] < 20:
                list1.append(len(list1))
                tt -= 1
            else:
                tt = 0
            time.sleep(2)
            sema.release()      #操作全局变量后立即释放
            time.sleep(2)
            print("changed by ",self.getName())


if __name__ == "__main__":
    list1 = [0,]
    sema = threading.Semaphore(3)     #生成一个信号量实例
    for i in range(6):
        t = MyThreadCreatr()
        t.start()
    while True:
        if threading.active_count() == 1:
            print(list1)
            break

事件:

import threading,time

#--------事件:Event---------
class MyThreadCreatr(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        global list1        #获取全局变量
        tt = True
        while tt :
            if not event.isSet():       #根据事件的状态执行相关逻辑
                if list1[len(list1)-1] < 10:
                    list1.append(len(list1))
                else:
                    tt = False
                time.sleep(1)
                print("changed by ",self.getName())

if __name__ == "__main__":
    list1 = [0,]
    ee = 1
    event = threading.Event()     #生成一个事件实例
    if not event.isSet():       #首先设置事件初始状态
        event.set()
    for i in range(2):
        t = MyThreadCreatr()
        t.start()
    while True:
        if ee == 3 and event.isSet():       #根据条件改变事件的状态
            event.clear()
            ee = 1
            print("event is clear...")
            time.sleep(2)
            event.set()
        else:
            ee += 1
            print("event is set,please waitting...")
        if threading.active_count() == 1:
            print(list1)
            break

队列,生产者消费者模型:

queue.Queue       先进先出

queue.LifoQueue      先进后出   类似于堆栈

queue.PriorityQueue      可设置处理优先级的队列

import threading,time , queue,random

class product1(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        tt = 0
        while tt < 10:
            if myq.qsize() < 20:
                it = random.randrange(97,122)
                myq.put(it)
                tt += 1
                time.sleep(random.randrange(1,5))
            else:
                time.sleep(random.randrange(1, 5))
class product2(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        tt = 0
        while tt < 10:
            if myq.qsize() <10:
                it = random.randrange(65,90)
                myq.put(it)
                tt += 1
                time.sleep(random.randrange(1,5))
            else:
                time.sleep(random.randrange(1, 5))
class customer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        while True:
            if myq.empty():
                print("the queue is empty...")
                time.sleep(2)
            else:
                data = myq.get()
                print("the letter is {},by the threading {}".format(chr(data),self.getName()))
                time.sleep(1)

if __name__ == "__main__":
    myq = queue.Queue()
    for i in range(2):
        t1 = product1()
        t2 = product2()
        t3 = customer()
        t1.start()
        t2.start()
        t3.start()

进程编程,进程间通信

进程间队列:

from multiprocessing import Process,Queue
import os

class mypr(Process):
    def __init__(self,queue):
        Process.__init__(self)
        self.queue = queue
    def run(self):
        self.name = "dd"
        self.queue.put(self.name)
        self.queue.put(self.pid)
        self.queue.put(self.ident)
        self.queue.put(os.getppid())

if __name__ == "__main__":
    myq = Queue()
    # pr = Process(target=run(),args=("dd"))   #直接调用新进程
    pr = mypr(myq)
    pr.start()
    pr.join()
    while not myq.empty():
        print(myq.get())

管道pip

from multiprocessing import Pipe,Process

class mypr(Process):
    def __init__(self,conn):
        Process.__init__(self)
        self.conn = conn
    def run(self):
        self.name = "HELLO"
        self.conn.send(self.name)
if __name__ == "__main__":
    in_pipe,out_pipe = Pipe()       #定义管道端口
    pr = mypr(in_pipe)
    pr.start()
    pr.join()
    print(out_pipe.recv())

Manager,进程池:

from multiprocessing import Process,Manager,Pool
import time,os

def run(list):
    list.append("iiii")
    time.sleep(1)
    print("the children process is ",os.getpid())
def runback():
    print("this process is over...",os.getpid())

if __name__ == "__main__":
    manager = Manager()     #初始化一个manager,管理进程间的共享数据
    list1 = manager.list(range(3))      #创建一个共享列表,manager可创建字典,变量等几乎所有的数据类型
    pool = Pool(5)      #初始化进程池,设置同时运行的进程数
    for i in range(10):
        # pool.apply(func=run,args=(list1,))            #Pool 创建进程,串行执行阻塞
        pool.apply_async(func=run,args=(list1,),callback=runback()) #Pool 创建进程,并行执行不阻塞,回调函数由父进程执行
    print(list1)
    pool.close()
    pool.join()     #Pool的close 和join 方法顺序需注意

参考文章:


http://www.cnblogs.com/wupeiqi/articles/5040823.html



版权声明:本文为newbo_yb原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。