使用 TCP / IP 套接字(Sockets)

  • Post author:
  • Post category:其他


使用TCP / IP 套接字(Sockets)

TCP/IP 套接字提供了跨网络的低层控制。TCP/IP套接字是两台计算机之间的逻辑连接,通过它,计算机能够在任何时间发送或接收数据;这个连接一直保持,直到这台计算机显式发出关闭指令。它提供了高度的灵活性,但也带来了大量的问题,在这一章中我们会看到,因此,除非真的需要非常高度的控制,否则,最好还是使用更抽象的网络协议,在这一章的后面我们也会谈到。

为了使用TCP/IP 套接字所必须的类包含在命名空间System.Net,表 11-1 进行了汇总。

表 11-1 使用 TCP/IP 套接字需要的类

描述

System.Net.Sockets

.TcpListener

服务器用这个类监听入站请求。

System.Net.Sockets

.TcpClient

服务器和客户端都使用这个类,控制如何在网络上发送数据。

System.Net.Sockets

.NetworkStream

这个类用于在网络上发送和接收数据。在网络上是发送字节,因此,要发送文本,通常要打包到另外的流类型中。

System.IO.StreamReader

这个类用于包装 NetworkStream 类,用来读取文本。

StreamReader 提供两个方法 ReadLine 和 ReadToEnd,以字符串形式返回流中的数据。

在 StreamReader [ 这里好像不应该是 StreamWriter ]创建时,可以使用各种不同的文本编码,由System.Text.Encoding 类的实例提供。

System.IO.StreamWriter

这个类用于包装 NetworkStream 类,用于写文本。

StreamWriter 提供两个方法 Write 和 WriteLine,以字符串形式把数据写入流。

在 StreamWriter 创建时,可以使用各种不同的文本编码,由System.Text.Encoding 类的实例提供。

这一章的第一个例子,我们将创建一个聊天程序,有聊天服务器(见清单 11-1 )和客户端(见清单 11-2 )。聊天服务器的任务是等待并监听客户端的连接,一旦有客户端连接,必须要求客户提供用户名;还必须连续监听所有客户端的入站消息。一旦有入站消息到达,就把这个消息推给所有的客户端。客户端的任务是连接到服务器,并提供用户界面,来阅读接收到的消息,写消息并发送给其他用户。TCP/IP 连接非常适合这种类型的应用程序,因为,连接总是可用,服务器能够直接把入站消息推给客户端,而不必从客户端拉消息。

清单 11-1 聊天服务器

open System

open System.IO

open System.Net

open System.Net.Sockets

open System.Text

open System.Threading

open System.Collections.Generic

// Enhance the TcpListener class so it canhandle async connections

type System.Net.Sockets.TcpListener with

memberx.AsyncAcceptTcpClient() =

Async.FromBeginEnd(x.BeginAcceptTcpClient,x.EndAcceptTcpClient)

// Type that defines protocol forinteracting with the ClientTable

type ClientTableCommands =

|Add of (string * StreamWriter)

|Remove of string

|SendMessage of string

|ClientExists of (string * AsyncReplyChannel<bool>)

// A class that will store a list of namesof connected clients along with

// streams that allow the client to bewritten too

type ClientTable() =

//create the mail box

letmailbox = MailboxProcessor.Start(fun inbox ->

//main loop that will read messages and update the

//client name/stream writer map

letrec loop (nameMap: Map<string, StreamWriter>) =

async { let! msg = inbox.Receive()

match msg with

| Add (name, sw) ->

return! loop (Map.add name swnameMap)

| Remove name ->

return! loop (Map.remove namenameMap)

| ClientExists (name, rc) ->

rc.Reply (nameMap.ContainsKeyname)

return! loop nameMap

| SendMessage msg ->

for (_, sw) in Map.toSeq nameMapdo

try

sw.WriteLine msg

sw.Flush()

with _ -> ()

return! loop nameMap }

//start the main loop with an empty map

loopMap.empty)

///add a new client

memberx.Add(name, sw) = mailbox.Post(Add(name, sw))

///remove an existing connection

memberx.Remove(name) = mailbox.Post(Remove name)

///handles the process of sending a message to all clients

memberx.SendMessage(msg) = mailbox.Post(SendMessage msg)

///checks if a client name is taken

memberx.ClientExists(name) = mailbox.PostAndReply(fun rc -> ClientExists(name,rc))

/// perform async read on a network streampassing a continuation

/// function to handle the result

let rec asyncReadTextAndCont (stream:NetworkStream) cont =

//unfortunatly we need to specific a number of bytes to read

//this leads to any messages longer than 512 being broken into

//different messages

async{ let buffer = Array.create 512 0uy

let! read = stream.AsyncRead(buffer, 0,512)

let allText = Encoding.UTF8.GetString(buffer, 0, read)

return cont stream allText }

// class that will handle clientconnections

type Server() =

//client table to hold all incoming client details

letclients = new ClientTable()

//handles each client

lethandleClient (connection: TcpClient) =

//get the stream used to read and write from the client

letstream = connection.GetStream()

//create a stream write to more easily write to the client

letsw = new StreamWriter(stream)

//handles reading the name then starts the main loop that handles

//conversations

letrec requestAndReadName (stream: NetworkStream) (name: string) =

// read the name

let name = name.Replace(Environment.NewLine,””)

// main loop that handles conversations

let rec mainLoop (stream: NetworkStream) (msg: string) =

try

// send received message to all clients

let msg = Printf.sprintf “%s: %s” name msg

clients.SendMessage msg

with _ ->

// any error reading a message causes client to disconnect

clients.Remove name

sw.Close()

Async.Start (asyncReadTextAndCont stream mainLoop)

if clients.ClientExists(name) then

// if name exists print error and relaunch request

sw.WriteLine(“ERROR – Name in use already!”)

sw.Flush()

Async.Start (asyncReadTextAndCont stream requestAndReadName)

else

// name is good lanch the main loop

clients.Add(name, sw)

Async.Start (asyncReadTextAndCont stream mainLoop)

//welcome the new client by printing “What is you name?”

sw.WriteLine(“Whatis your name? “);

sw.Flush()

//start the main loop that handles reading from the client

Async.Start(asyncReadTextAndCont stream requestAndReadName)

//create a tcp listener to handle incoming requests

letlistener = new TcpListener(IPAddress.Loopback, 4242)

//main loop that handles all new connections

letrec handleConnections() =

//start the listerner

listener.Start()

iflistener.Pending() then

// if there are pending connections, handle them

async { let! connection = listener.AsyncAcceptTcpClient()

printfn “New Connection”

// use a thread pool thread to handle the new request

ThreadPool.QueueUserWorkItem(fun _ ->

handleClient connection) |>ignore

// loop

return! handleConnections() }

else

// no pending connections, just loop

Thread.Sleep(1)

async { return! handleConnections() }

///allow tot

memberserver.Start() = Async.RunSynchronously (handleConnections())

// start the server class

(new Server()).Start()

我们从头开始看一下清单11-1 的程序。第一步定义一个类ClientTable,来管理连接到服务器的客户端;这是一个很好的示例,解释了如何如用信箱处理程序(MailboxProcessor)安全地在几个线程之间共享数据,这个方法与第十章“消息传递”的非常相似。我们回忆一下,信箱处理程序把非常客户端的消息进行排队,通过调用MailboxProcessor 类的 Receive 方法接收这些消息:

let! msg = inbox.Receive()

我们总是异步接收消息,这样,可以在等待消息期间不阻塞线程。提交消息给类,使用Post 方法:

mailbox.Post(Add(name, sw))

我们使用联合类型来定义发关和接收的消息。在这里,我们定义了四种操作,分别是Add、Remove、Current、SendMessage 和 ClientExists:

type ClientTableCommands =

|Add of (string * StreamWriter)

|Remove of string

|SendMessage of string

|ClientExists of (string * AsyncReplyChannel<bool>)

这些操作用模式匹配实现,根据接收到的消息,它在专门负责接收消息的异步工作流中,通常,我们使用无限递归的循环连续地读消息。在这个示例中,这个函数是 loop,loop 有一个参数,它是 F# 中不可变的 Map 类,负责在用字符串表示的客户端名字与用StreamWriter 表示的到这个客户端的连接之间的映射:

let rec loop (nameMap: Map<string,StreamWriter>) =

这种很好地实现了操作之间的状态共享。操作先更新这个映射,然后,再传递一个新的实例给下一次循环。Add 和Remove 的操作实现很简单,创建一个更新过的新映射,然后,传递给下一次循环。下面的代码只展示了 Add 操作,因为 Remove 操作非常相似:

| Add (name, sw) ->

return! loop (Map.add name sw nameMap)

ClientExists 操作有更有趣一点,因为必须要返回一个结果,对此,使用 AsyncReplyChannel(异步应答通道),它包含在ClientExists 联合的情况中:

| ClientExists (name, rc) ->

rc.Reply (nameMap.ContainsKey name)

return! loop nameMap

把消息传递给MailboxProcessor 类,是通过使用它的 PostAndReply方法,注意,不是前面看到过的 Post 方法,应答通道被加载到联合的情况中:

mailbox.PostAndReply(fun rc -> ClientExists(name,rc))

可能最有趣的操作是SendMessage,需要枚举出所有的客户端,然后把消息传递给它们。执行这个在MailboxProcessor 类当中,因为这个类实现了一个排队系统,这样,就能保证只有一个消息传递给所有的客户端只有一次;这种方法还保证了一个消息文本不会和其他消息混在一起,以及消息的到达顺序不会改变:

| SendMessage msg ->

for(_, sw) in Map.to_seq nameMap do

try

sw.WriteLine msg

sw.Flush()

with _ -> ()

下面,我们将看到代码中最困难的部分:如何有效地从连接的客户端读消息。要有效地读消息,必须使用异步的方式读,才能保证宝贵的服务器线程不会在等待客户端期间被阻塞,因为客户端发送消息相对并不频繁。F# 通过使用异步工作流,定代码已经很容易了;然而,要想让 F# 的异步工作流运行在最好的状态,还必须有大量的操作能够并发执行。在这里,我们想重复地执行一个异步操作,这是可能的,但有点复杂,因为我们必须使用连续的进行传递(continuation style passing)。我们定义一个函数 asyncReadTextAndCont,它异步地读网络流,并这个结果字符串和原始的网络流传递给它的连续函数(continuation function)。这里的连续函数是 cont:

/// perform async read on a network streampassing a continuation

/// function to handle the result

let rec asyncReadTextAndCont (stream:NetworkStream) cont =

async { let buffer = Array.create 512 0uy

let! read = stream.AsyncRead(buffer, 0, 512)

let allText = acc + Encoding.UTF8.GetString(buffer, 0, read)

return cont stream allText }

所以要注意这个函数的重要的一点是,当读取发生,物理线程将从函数返回,并可能返回到线程池;然而,我们不必担心物理线程太多的问题,因为当异步输入输出操作完成时,它会重新启动,结果会传递给 cont 函数。

然后,使用这个函数执行读客户端的所有任务,例如,主递归循环可能像这样:

let rec mainLoop (stream: NetworkStream)(msg: string) =

try

// send received message to all clients

let msg = Printf.sprintf “%s: %s” name msg

clients.SendMessage msg

with _ ->

// any error reading a message causes client to disconnect

clients.Remove name

sw.Close()

Async.Start (asyncReadTextAndCont stream mainLoop)

把接收到的 msg 字符串作为消息,执行发送操作;然后,使用asyncReadTextAndCont 函数进行递归循环,把 mainLoop 函数作为一个参数传递给它,再使用 Async.Start 函数发送消息,以fire-and-forget (启动后就不管了)模式启动异步工作流,就是说,它不会阻塞,并等待工作流的完成。

接着,创建TcpListener 类的实例。这个类是完成监听入站连接工作的,通常用被监听服务器的 IP 地址和端口号来初始化;当启动监听器时,告诉它监听的IP 地址。通常,它监听和这台计算机上网卡相关的所有 IP 地址的所有通信;然而,这仅是一个演示程序,因此,告诉TcpListener 类监听IPAddress.Loopback,表示只选取本地计算机的请求。使用端口号是判断网络通信只为这个应用程序服务,而不是别的。TcpListener 类一次只允许一个监听器监听一个端口。端口号的选择有点随意性,但要大于 1023,因为端口号 0 到 1023 是保留给专门的应用程序的。因此,我们在最后定义的函数handleConnections 中,使用 TcpListener 实例创建的监听器端口4242:

let listener = new TcpListener(IPAddress.Loopback,4242)

[

原文中为 server,就是来自上个版本,未做修改。

另外,本程序与原来的版本相比,作了较大的修改,或者说,是完全重写。

]

这个函数是个无限循环,它监听新的客户端连接,并创建新的线程来管理。看下面的代码,一旦有连接,就能检索出这个连接的实例,在新的线程池线程上启动管理它的工作。

let! connection =listener.AsyncAcceptTcpClient()

printfn “New Connection”

// use a thread pool thread to handle thenew request

ThreadPool.QueueUserWorkItem(fun _ ->handleClient connection) |> ignore

现在,我们知道了服务器是如何工作的,下面要看看客户端了,它在很多方面比服务器简单得多。清单11-2 是客户端的完整代码,注意,要引用Systems.Windows.Forms.dll 才能编译;在清单的后面是相关的代码讨论。

清单 11-2 聊天客户端

open System

open System.ComponentModel

open System.IO

open System.Net.Sockets

open System.Threading

open System.Windows.Forms

let form =

//create the form

letform = new Form(Text = “F# Talk Client”)

//text box to show the messages received

letoutput =

newTextBox(Dock = DockStyle.Fill,

ReadOnly = true,

Multiline = true)

form.Controls.Add(output)

//text box to allow the user to send messages

letinput = new TextBox(Dock = DockStyle.Bottom, Multiline = true)

form.Controls.Add(input)

//create a new tcp client to handle the network connections

lettc = new TcpClient()

tc.Connect(“localhost”,4242)

//loop that handles reading from the tcp client

letload() =

letrun() =

let sr = new StreamReader(tc.GetStream())

while(true) do

let text = sr.ReadLine()

if text <> null && text <> “” then

// we need to invoke back to the “gui thread”

// to be able to safely interact with the controls

form.Invoke(new MethodInvoker(fun () ->

output.AppendText(text + Environment.NewLine)

output.SelectionStart <- output.Text.Length))

|> ignore

//create a new thread to run this loop

lett = new Thread(new ThreadStart(run))

t.Start()

//start the loop that handles reading from the tcp client

//when the form has loaded

form.Load.Add(fun_ -> load())

letsw = new StreamWriter(tc.GetStream())

//handles the key up event – if the user has entered a line

//of text then send the message to the server

letkeyUp () =

if(input.Lines.Length> 1) then

let text = input.Text

if (text <> null && text <> “”) then

try

sw.WriteLine(text)

sw.Flush()

with err ->

MessageBox.Show(sprintf”Server error\n\n%O” err)

|> ignore

input.Text <- “”

//wire up the key up event handler

input.KeyUp.Add(fun_ -> keyUp ())

//when the form closes it’s necessary to explicitly exit the app

//as there are other threads running in the back ground

form.Closing.Add(fun_ ->

Application.Exit()

Environment.Exit(0))

//return the form to the top level

form

// show the form and start the apps eventloop

[<STAThread>]

do Application.Run(form)

运行前面的代码,产生如图11-2 所示的客户端服务器程序。

图 11-2 聊天客户端服务器程序

现在我们就来看一下清单11-2 中的客户端是如何工作的。代码的第一部分完成窗体各部分的初始化,这不是我们现在感兴趣的,有关Windows 窗体程序工作原理的详细内容可以回头看第八章。清单11-2 中与TCP/IP 套接字编程相关的第一部分是连接服务器,通过创建TcpClient 类的实例,然后调用它的Connect 方法:

let tc = new TcpClient()

tc.Connect(“localhost”, 4242)

在这里,我们指定localhost,即表示本地计算机,端口 4242,与服务器监听的端口相同。在更实际的例子中,可以用服务器的 DNS 名称,或者由让用户指定 DNS 名。因为我们在同一台计算机上运行这个示例程序,localhost 也是不错的选择。

Load 函数负责从服务器读取数据,把它附加到窗体的 Load 事件,是为了保证窗体装载并初始化完成后就能运行,我们需要与窗体的控件进行交互:

form.Load.Add(fun _ -> load())

[

原文中是temp,有误。

]

为了保证及时读取服务器上的所有数据,需要创建一个新的线程去读所有的入站请求;先定义函数run,然后,使用它启动一个新线程:

let t = new Thread(new ThreadStart(run))

t.Start()

在run 的定义中,先创建StreamReader,从连接中读文本;然后,使用无限循环,这样,保证线程不退出,能够连续从连接中读数据。发现数据之后,要用窗体的Invoke 方法更新窗体,这是因为不能从创建这个窗体之外的线程中更新窗体:

form.Invoke(new MethodInvoker(fun () ->

output.AppendText(text+ Environment.NewLine)

output.SelectionStart<- output.Text.Length))

[

原文中是temp,有误。

]

客户端的另外一部分,也是重要功能,是把消息写到服务器,这是在keyUp 函数中完成的,它被附加到输入文本框(input)的KeyUp 事件,这样,在文本框中每按一次键,下面的代码会触发:

input.KeyUp.Add(fun _ -> keyUp () )

keyUp 函数的实现是非常简单,发现超过一行,表示已按过 Enter 键,就通过网络发送所有可用的文本,并清除文本框。

现在,我们已经知道如何实现客户端和服务器了,再看看有关这个应用程序的一般问题。在清单11-1 和 11-2 中,在每次网络操作后都调用了Flush();否则,要等到流的缓存已满,数据才会通过网络传输,这会导致一个用户必须输入很多消息,才能出现在其他用户的屏幕上。

这种方法也有一些问题,特别是在服务器端。为每个入站的客户端分配一个线程,保证了对每个客户端能有很好的响应;但是,随着客户连接数的增加,对这些线程需要进行上下文切换(context switching,参见第十章“线程、内存、锁定和阻塞”一节)的数量也会增加,这样,服务器的整体性能就会下降。另外,每个客户端都要有它自己的线程,因此,客户端的最大数就受限于进程所能包含的最大线程数。这些问题是可以解决的,但是,通常简单的办法,是使用一些更加抽象的协议,下一节会有讨论。