第 27 章:Socket 和 Syslog

基本网络

本书的前几章,我们讨论了在网络上进行操作的服务。其中两个例子是数据库客户端/服务器和 web 服务。当需要设计新的协议,或者使用没有现成 Haskell 库的协议通信时,将需要使用 Haskell 库函数提供的底层网络工具。

本章中,我们将讨论这些底层工具。网络通讯是个大题目,可以用一整本书来讨论。本章中,我们将展示如何使用 Haskell 应用你已经掌握的底层网络知识。

Haskell 的网络函数几乎始终与常见的 C 函数调用相符。像其他在 C 上层的语言一样,你将发现其接口很眼熟。

使用 UDP 通信

UDP 将数据拆散为数据包。其不保证数据到达目的地,也不确保同一个数据包到达的次数。其用校验和的方式确保到达的数据包没有损坏。 UDP 适合用在对性能和延迟敏感的应用中,此类场景中系统的整体性能比单个数据包更重要。也可以用在 TCP 表现性能不高的场景,比如发送互不相关的短消息。适合使用 UDP 的系统的例子包括音频和视频会议、时间同步、网络文件系统、以及日志系统。

UDP 客户端例子:syslog

传统 Unix syslog 服务允许程序通过网络向某个负责记录的中央服务器发送日志信息。某些程序对性能非常敏感,而且可能会生成大量日志消息。这样的程序,将日志的开销最小化比确保每条日志被记录更重要。此外,在日志服务器无法访问时,使程序依旧可以操作或许是一种可取的设计。因此,UDP 是一种 syslog 支持的日志传输协议。这种协议比较简单,这里有一个 Haskell 实现的客户端:

  1. -- file: ch27/syslogclient.hs
  2. import Data.Bits
  3. import Network.Socket
  4. import Network.BSD
  5. import Data.List
  6. import SyslogTypes
  7.  
  8. data SyslogHandle =
  9. SyslogHandle {slSocket :: Socket,
  10. slProgram :: String,
  11. slAddress :: SockAddr}
  12.  
  13. openlog :: HostName -- ^ Remote hostname, or localhost
  14. -> String -- ^ Port number or name; 514 is default
  15. -> String -- ^ Name to log under
  16. -> IO SyslogHandle -- ^ Handle to use for logging
  17. openlog hostname port progname =
  18. do -- Look up the hostname and port. Either raises an exception
  19. -- or returns a nonempty list. First element in that list
  20. -- is supposed to be the best option.
  21. addrinfos <- getAddrInfo Nothing (Just hostname) (Just port)
  22. let serveraddr = head addrinfos
  23.  
  24. -- Establish a socket for communication
  25. sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
  26.  
  27. -- Save off the socket, program name, and server address in a handle
  28. return $ SyslogHandle sock progname (addrAddress serveraddr)
  29.  
  30. syslog :: SyslogHandle -> Facility -> Priority -> String -> IO ()
  31. syslog syslogh fac pri msg =
  32. sendstr sendmsg
  33. where code = makeCode fac pri
  34. sendmsg = "<" ++ show code ++ ">" ++ (slProgram syslogh) ++
  35. ": " ++ msg
  36.  
  37. -- Send until everything is done
  38. sendstr :: String -> IO ()
  39. sendstr [] = return ()
  40. sendstr omsg = do sent <- sendTo (slSocket syslogh) omsg
  41. (slAddress syslogh)
  42. sendstr (genericDrop sent omsg)
  43.  
  44. closelog :: SyslogHandle -> IO ()
  45. closelog syslogh = sClose (slSocket syslogh)
  46.  
  47. {- | Convert a facility and a priority into a syslog code -}
  48. makeCode :: Facility -> Priority -> Int
  49. makeCode fac pri =
  50. let faccode = codeOfFac fac
  51. pricode = fromEnum pri
  52. in
  53. (faccode `shiftL` 3) .|. pricode

这段程序需要 SyslogTypes.hs ,代码如下:

  1. -- file: ch27/SyslogTypes.hs
  2. module SyslogTypes where
  3. {- | Priorities define how important a log message is. -}
  4.  
  5. data Priority =
  6. DEBUG -- ^ Debug messages
  7. | INFO -- ^ Information
  8. | NOTICE -- ^ Normal runtime conditions
  9. | WARNING -- ^ General Warnings
  10. | ERROR -- ^ General Errors
  11. | CRITICAL -- ^ Severe situations
  12. | ALERT -- ^ Take immediate action
  13. | EMERGENCY -- ^ System is unusable
  14. deriving (Eq, Ord, Show, Read, Enum)
  15.  
  16. {- | Facilities are used by the system to determine where messages
  17. are sent. -}
  18.  
  19. data Facility =
  20. KERN -- ^ Kernel messages
  21. | USER -- ^ General userland messages
  22. | MAIL -- ^ E-Mail system
  23. | DAEMON -- ^ Daemon (server process) messages
  24. | AUTH -- ^ Authentication or security messages
  25. | SYSLOG -- ^ Internal syslog messages
  26. | LPR -- ^ Printer messages
  27. | NEWS -- ^ Usenet news
  28. | UUCP -- ^ UUCP messages
  29. | CRON -- ^ Cron messages
  30. | AUTHPRIV -- ^ Private authentication messages
  31. | FTP -- ^ FTP messages
  32. | LOCAL0
  33. | LOCAL1
  34. | LOCAL2
  35. | LOCAL3
  36. | LOCAL4
  37. | LOCAL5
  38. | LOCAL6
  39. | LOCAL7
  40. deriving (Eq, Show, Read)
  41.  
  42. facToCode = [
  43. (KERN, 0),
  44. (USER, 1),
  45. (MAIL, 2),
  46. (DAEMON, 3),
  47. (AUTH, 4),
  48. (SYSLOG, 5),
  49. (LPR, 6),
  50. (NEWS, 7),
  51. (UUCP, 8),
  52. (CRON, 9),
  53. (AUTHPRIV, 10),
  54. (FTP, 11),
  55. (LOCAL0, 16),
  56. (LOCAL1, 17),
  57. (LOCAL2, 18),
  58. (LOCAL3, 19),
  59. (LOCAL4, 20),
  60. (LOCAL5, 21),
  61. (LOCAL6, 22),
  62. (LOCAL7, 23)
  63. ]
  64.  
  65. codeToFac = map (\(x, y) -> (y, x)) facToCode
  66.  
  67.  
  68. {- | We can't use enum here because the numbering is discontiguous -}
  69. codeOfFac :: Facility -> Int
  70. codeOfFac f = case lookup f facToCode of
  71. Just x -> x
  72. _ -> error $ "Internal error in codeOfFac"
  73.  
  74. facOfCode :: Int -> Facility
  75. facOfCode f = case lookup f codeToFac of
  76. Just x -> x
  77. _ -> error $ "Invalid code in facOfCode"

可以用 ghci 向本地的 syslog 服务器发送消息。服务器可以使用本章实现的例子,也可以使用其它的在 Linux 或者 POSIX 系统中的 syslog 服务器。注意,这些服务器默认禁用了 UDP 端口,你需要启用 UDP 以使 syslog 接收 UDP 消息。

可以使用下面这样的命令向本地 syslog 服务器发送一条消息:

  1. ghci> :load syslogclient.hs
  2. [1 of 2] Compiling SyslogTypes ( SyslogTypes.hs, interpreted )
  3. [2 of 2] Compiling Main ( syslogclient.hs, interpreted )
  4. Ok, modules loaded: SyslogTypes, Main.
  5. ghci> h <- openlog "localhost" "514" "testprog"
  6. Loading package parsec-2.1.0.0 ... linking ... done.
  7. Loading package network-2.1.0.0 ... linking ... done.
  8. ghci> syslog h USER INFO "This is my message"
  9. ghci> closelog h

UDP Syslog 服务器

UDP 服务器会在服务器上绑定某个端口。其接收直接发到这个端口的包,并处理它们。UDP 是无状态的,面向包的协议,程序员通常使用 recvFrom 这个调用接收消息和发送机信息,在发送响应时会用到发送机信息。

  1. -- file: ch27/syslogserver.hs
  2. import Data.Bits
  3. import Network.Socket
  4. import Network.BSD
  5. import Data.List
  6.  
  7. type HandlerFunc = SockAddr -> String -> IO ()
  8.  
  9. serveLog :: String -- ^ Port number or name; 514 is default
  10. -> HandlerFunc -- ^ Function to handle incoming messages
  11. -> IO ()
  12. serveLog port handlerfunc = withSocketsDo $
  13. do -- Look up the port. Either raises an exception or returns
  14. -- a nonempty list.
  15. addrinfos <- getAddrInfo
  16. (Just (defaultHints {addrFlags = [AI_PASSIVE]}))
  17. Nothing (Just port)
  18. let serveraddr = head addrinfos
  19.  
  20. -- Create a socket
  21. sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
  22.  
  23. -- Bind it to the address we're listening to
  24. bindSocket sock (addrAddress serveraddr)
  25.  
  26. -- Loop forever processing incoming data. Ctrl-C to abort.
  27. procMessages sock
  28. where procMessages sock =
  29. do -- Receive one UDP packet, maximum length 1024 bytes,
  30. -- and save its content into msg and its source
  31. -- IP and port into addr
  32. (msg, _, addr) <- recvFrom sock 1024
  33. -- Handle it
  34. handlerfunc addr msg
  35. -- And process more messages
  36. procMessages sock
  37.  
  38. -- A simple handler that prints incoming packets
  39. plainHandler :: HandlerFunc
  40. plainHandler addr msg =
  41. putStrLn $ "From " ++ show addr ++ ": " ++ msg

这段程序可以在 ghci 中执行。执行 serveLog "1514" plainHandler 将建立一个监听 1514 端口的 UDP 服务器。其使用 plainHandler 将每条收到的 UDP 包打印出来。按下 Ctrl-C 可以终止这个程序。

Note

处理错误。 执行时收到了 bind: permission denied 消息?要确保端口值比 1024 大。某些操作系统不允许 root 之外的用户使用小于 1024 的端口。

使用 TCP 通信

TCP 被设计为确保互联网上的数据尽可能可靠地传输。 TCP 是数据流传输。虽然流在传输时会被操作系统拆散为一个个单独的包,但是应用程序并不需要关心包的边界。TCP 负责确保如果流被传送到应用程序,它就是完整的、无改动、仅传输一次且保证顺序。显然,如果线缆被破坏会导致流量无法送达,任何协议都无法克服这类限制。

与 UDP 相比,这带来一些折衷。首先,在 TCP 会话开始必须传递一些包以建立连接。其次,对于每个短会话,UDP 将有性能优势。另外,TCP 会努力确保数据到达。如果会话的一端尝试向远端发送数据,但是没有收到响应,它将周期性的尝试重新传输数据直至放弃。这使得 TCP 面对丢包时比较健壮可靠。可是,它同样意味着 TCP 不是实时传输协议(如实况音频或视频传输)的最佳选择。

处理多个 TCP 流

TCP 的连接是有状态的。这意味着每个客户机和服务器之间都有一条专用的逻辑“频道”,而不是像 UDP 一样只是处理一次性的数据包。这简化了客户端开发者的工作。服务器端程序几乎总是需要同时处理多条 TCP 连接。如何做到这一点呢?

在服务器端,首先需要创建一个 socket 并绑定到某个端口,就像 UDP 一样。但这回不是重复监听从任意地址发来的数据,取而代之,你的主循环将围绕 accept 调用编写。每当有一个客户机连接,服务器操作系统为其分配一个新的 socket 。所以我们的主 socket 只用来监听进来的连接,但从不发送数据。我们也获得了多个子 socket 可以同时使用,每个子 socket 从属于一个逻辑上的 TCP 会话。

在 Haskell 中,通常使用 forkIO 创建一个单独的轻量级线程以处理与子 socket 的通信。对此, Haskell 拥有一个高效的内部实现,执行得非常好。

TCP Syslog 服务器

让我们使用 TCP 的实现来替换 UDP 的 syslog 服务器。假设一条消息并不是定义为单独的包,而是以一个尾部的字符 ‘n’ 结束。任意客户端可以使用 TCP 连接向服务器发送 0 或多条消息。我们可以像下面这样实现:

  1. -- file: ch27/syslogtcpserver.hs
  2. import Data.Bits
  3. import Network.Socket
  4. import Network.BSD
  5. import Data.List
  6. import Control.Concurrent
  7. import Control.Concurrent.MVar
  8. import System.IO
  9.  
  10. type HandlerFunc = SockAddr -> String -> IO ()
  11.  
  12. serveLog :: String -- ^ Port number or name; 514 is default
  13. -> HandlerFunc -- ^ Function to handle incoming messages
  14. -> IO ()
  15. serveLog port handlerfunc = withSocketsDo $
  16. do -- Look up the port. Either raises an exception or returns
  17. -- a nonempty list.
  18. addrinfos <- getAddrInfo
  19. (Just (defaultHints {addrFlags = [AI_PASSIVE]}))
  20. Nothing (Just port)
  21. let serveraddr = head addrinfos
  22.  
  23. -- Create a socket
  24. sock <- socket (addrFamily serveraddr) Stream defaultProtocol
  25.  
  26. -- Bind it to the address we're listening to
  27. bindSocket sock (addrAddress serveraddr)
  28.  
  29. -- Start listening for connection requests. Maximum queue size
  30. -- of 5 connection requests waiting to be accepted.
  31. listen sock 5
  32.  
  33. -- Create a lock to use for synchronizing access to the handler
  34. lock <- newMVar ()
  35.  
  36. -- Loop forever waiting for connections. Ctrl-C to abort.
  37. procRequests lock sock
  38.  
  39. where
  40. -- | Process incoming connection requests
  41. procRequests :: MVar () -> Socket -> IO ()
  42. procRequests lock mastersock =
  43. do (connsock, clientaddr) <- accept mastersock
  44. handle lock clientaddr
  45. "syslogtcpserver.hs: client connnected"
  46. forkIO $ procMessages lock connsock clientaddr
  47. procRequests lock mastersock
  48.  
  49. -- | Process incoming messages
  50. procMessages :: MVar () -> Socket -> SockAddr -> IO ()
  51. procMessages lock connsock clientaddr =
  52. do connhdl <- socketToHandle connsock ReadMode
  53. hSetBuffering connhdl LineBuffering
  54. messages <- hGetContents connhdl
  55. mapM_ (handle lock clientaddr) (lines messages)
  56. hClose connhdl
  57. handle lock clientaddr
  58. "syslogtcpserver.hs: client disconnected"
  59.  
  60. -- Lock the handler before passing data to it.
  61. handle :: MVar () -> HandlerFunc
  62. -- This type is the same as
  63. -- handle :: MVar () -> SockAddr -> String -> IO ()
  64. handle lock clientaddr msg =
  65. withMVar lock
  66. (\a -> handlerfunc clientaddr msg >> return a)
  67.  
  68. -- A simple handler that prints incoming packets
  69. plainHandler :: HandlerFunc
  70. plainHandler addr msg =
  71. putStrLn $ "From " ++ show addr ++ ": " ++ msg

SyslogTypes 的实现,见 UDP 客户端例子:syslog

让我们读一下源码。主循环是 procRequests ,这是一个死循环,用于等待来自客户端的新连接。 accept 调用将一直阻塞,直到一个客户端来连接。当有客户端连接,我们获得一个新 socket 和客户机地址。我们向处理函数发送一条关于新连接的消息,接着使用 forkIO 建立一个线程处理来自客户机的数据。这条线程执行 procMessages

处理 TCP 数据时,为了方便,通常将 socket 转换为 Haskell 句柄。我们也同样处理,并明确设置了缓冲 – 一个 TCP 通信的要点。接着,设置惰性读取 socket 句柄。对每个传入的行,我们都将其传给 handle 。当没有更多数据时 – 远端已经关闭了 socket – 我们输出一条会话结束的消息。

因为可能同时收到多条消息,我们需要确保没有将多条消息同时写入一个处理函数。那将导致混乱的输出。我们使用了一个简单的锁以序列化对处理函数的访问,并且编写了一个简单的 handle 函数处理它。

你可以使用下面我们将展示的客户机代码测试,或者直接使用 telnet 程序来连接这个服务器。你向其发送的每一行输入都将被服务器原样返回。我们来试一下:

  1. ghci> :load syslogtcpserver.hs
  2. [1 of 1] Compiling Main ( syslogtcpserver.hs, interpreted )
  3. Ok, modules loaded: Main.
  4. ghci> serveLog "10514" plainHandler
  5. Loading package parsec-2.1.0.0 ... linking ... done.
  6. Loading package network-2.1.0.0 ... linking ... done.

此处,服务器从 10514 端口监听新连接。在有某个客户机过来连接之前,它什么事儿都不做。我们可以使用 telnet 来连接这个服务器:

  1. ~$ telnet localhost 10514
  2. Trying 127.0.0.1...
  3. Connected to localhost.
  4. Escape character is '^]'.
  5. Test message
  6. ^]
  7. telnet> quit
  8. Connection closed.

于此同时,在我们运行 TCP 服务器的终端上,你将看到如下输出:

  1. From 127.0.0.1:38790: syslogtcpserver.hs: client connnected
  2. From 127.0.0.1:38790: Test message
  3. From 127.0.0.1:38790: syslogtcpserver.hs: client disconnected

其显示一个客户端从本机 (127.0.0.1) 的 38790 端口连上了主机。连接之后,它发送了一条消息,然后断开。当你扮演一个 TCP 客户端时,操作系统将分配一个未被使用的端口给你。通常这个端口在你每次运行程序时都不一样。

TCP Syslog 客户端

现在,为我们的 TCP syslog 协议编写一个客户端。这个客户端与 UDP 客户端类似,但是有一些变化。首先,因为 TCP 是流式协议,我们可以使用句柄传输数据而不需要使用底层的 socket 操作。其次,不在需要在 SyslogHandle 中保存目的地址,因为我们将使用 connect 建立 TCP 连接。最后,我们需要一个途径,以区分不同的消息。UDP 中,这很容易,因为每条消息都是不相关的逻辑包。TCP 中,我们将仅使用换行符 ‘n’ 来作为消息结尾的标识,尽管这意味着不能在单条消息中发送多行信息。这是代码:

  1. -- file: ch27/syslogtcpclient.hs
  2. import Data.Bits
  3. import Network.Socket
  4. import Network.BSD
  5. import Data.List
  6. import SyslogTypes
  7. import System.IO
  8.  
  9. data SyslogHandle =
  10. SyslogHandle {slHandle :: Handle,
  11. slProgram :: String}
  12.  
  13. openlog :: HostName -- ^ Remote hostname, or localhost
  14. -> String -- ^ Port number or name; 514 is default
  15. -> String -- ^ Name to log under
  16. -> IO SyslogHandle -- ^ Handle to use for logging
  17. openlog hostname port progname =
  18. do -- Look up the hostname and port. Either raises an exception
  19. -- or returns a nonempty list. First element in that list
  20. -- is supposed to be the best option.
  21. addrinfos <- getAddrInfo Nothing (Just hostname) (Just port)
  22. let serveraddr = head addrinfos
  23.  
  24. -- Establish a socket for communication
  25. sock <- socket (addrFamily serveraddr) Stream defaultProtocol
  26.  
  27. -- Mark the socket for keep-alive handling since it may be idle
  28. -- for long periods of time
  29. setSocketOption sock KeepAlive 1
  30.  
  31. -- Connect to server
  32. connect sock (addrAddress serveraddr)
  33.  
  34. -- Make a Handle out of it for convenience
  35. h <- socketToHandle sock WriteMode
  36.  
  37. -- We're going to set buffering to BlockBuffering and then
  38. -- explicitly call hFlush after each message, below, so that
  39. -- messages get logged immediately
  40. hSetBuffering h (BlockBuffering Nothing)
  41.  
  42. -- Save off the socket, program name, and server address in a handle
  43. return $ SyslogHandle h progname
  44.  
  45. syslog :: SyslogHandle -> Facility -> Priority -> String -> IO ()
  46. syslog syslogh fac pri msg =
  47. do hPutStrLn (slHandle syslogh) sendmsg
  48. -- Make sure that we send data immediately
  49. hFlush (slHandle syslogh)
  50. where code = makeCode fac pri
  51. sendmsg = "<" ++ show code ++ ">" ++ (slProgram syslogh) ++
  52. ": " ++ msg
  53.  
  54. closelog :: SyslogHandle -> IO ()
  55. closelog syslogh = hClose (slHandle syslogh)
  56.  
  57. {- | Convert a facility and a priority into a syslog code -}
  58. makeCode :: Facility -> Priority -> Int
  59. makeCode fac pri =
  60. let faccode = codeOfFac fac
  61. pricode = fromEnum pri
  62. in
  63. (faccode `shiftL` 3) .|. pricode

可以在 ghci 中试着运行它。如果还没有关闭之前的 TCP 服务器,你的会话看上去可能会像是这样:

  1. ghci> :load syslogtcpclient.hs
  2. Loading package base ... linking ... done.
  3. [1 of 2] Compiling SyslogTypes ( SyslogTypes.hs, interpreted )
  4. [2 of 2] Compiling Main ( syslogtcpclient.hs, interpreted )
  5. Ok, modules loaded: Main, SyslogTypes.
  6. ghci> openlog "localhost" "10514" "tcptest"
  7. Loading package parsec-2.1.0.0 ... linking ... done.
  8. Loading package network-2.1.0.0 ... linking ... done.
  9. ghci> sl <- openlog "localhost" "10514" "tcptest"
  10. ghci> syslog sl USER INFO "This is my TCP message"
  11. ghci> syslog sl USER INFO "This is my TCP message again"
  12. ghci> closelog sl

结束时,服务器上将看到这样的输出:

  1. From 127.0.0.1:46319: syslogtcpserver.hs: client connnected
  2. From 127.0.0.1:46319: <9>tcptest: This is my TCP message
  3. From 127.0.0.1:46319: <9>tcptest: This is my TCP message again
  4. From 127.0.0.1:46319: syslogtcpserver.hs: client disconnected

<9> 是优先级和设施代码,和之前 UDP 例子中的意思一样。