首页 > Java > 健壮的、便捷的、异步的SocketChannel实现

健壮的、便捷的、异步的SocketChannel实现

2012年12月20日 发表评论 阅读评论

Socket通信比较常见的问题有如下几种:
1、设置收发超时;
2、正确地收发每一个bit;
3、物理线路故障的保护;
4、始终能正常工作;
5、尽量少占系统资源;
n、……
而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是: 通信。
为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。
本方案在正常的局域网连接中测试过几百万次没什么问题。缺乏更艰苦的环境,所以如果使用这些代码发生任何风险的话……
(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动)
view sourceprint?
001
// Copyright (c) 2005 Brian Wellington (bwelling@xbill.org)
002

003
package asynchronizedchannel;
004

005
import java.io.*;
006
import java.net.*;
007
import java.nio.*;
008
import java.nio.channels.*;
009

010
final class TcpChannel
011
{
012
private long endTime;
013
private SelectionKey key;
014

015
public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException
016
{
017
boolean done = false;
018
Selector selector = null;
019
this.endTime = endTime;
020
try {
021
selector = Selector.open();
022
channel.configureBlocking(false);
023
key = channel.register(selector, op);
024
done = true;
025
} finally {
026
if (!done && selector != null) {
027
selector.close();
028
}
029
if (!done) {
030
channel.close();
031
}
032
}
033
}
034

035
static void blockUntil(SelectionKey key, long endTime) throws IOException
036
{
037
long timeout = endTime – System.currentTimeMillis();
038
int nkeys = 0;
039
if (timeout > 0) {
040
nkeys = key.selector().select(timeout);
041
} else if (timeout == 0) {
042
nkeys = key.selector().selectNow();
043
}
044
if (nkeys == 0) {
045
throw new SocketTimeoutException();
046
}
047
}
048

049
void cleanup()
050
{
051
try {
052
key.selector().close();
053
key.channel().close();
054
} catch (IOException ex) {
055
ex.printStackTrace();
056
}
057
}
058

059
void bind(SocketAddress addr) throws IOException
060
{
061
SocketChannel channel = (SocketChannel) key.channel();
062
channel.socket().bind(addr);
063
}
064

065
void connect(SocketAddress addr) throws IOException
066
{
067
SocketChannel channel = (SocketChannel) key.channel();
068

069
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
070

071
try {
072
if (!key.isConnectable()) {
073
blockUntil(key, endTime);
074
}
075
if (!channel.connect(addr) && !channel.finishConnect()) {
076
throw new ConnectException();
077
}
078
} finally {
079
if (key.isValid()) {
080
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
081
}
082
}
083
}
084

085
void send(ByteBuffer buffer) throws IOException
086
{
087
Send.operate(key, buffer, endTime);
088
}
089

090
void recv(ByteBuffer buffer) throws IOException
091
{
092
Recv.operate(key, buffer, endTime);
093
}
094
}
095

096
interface Operator
097
{
098
class Operation
099
{
100
static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException
101
{
102
final SocketChannel channel = (SocketChannel) key.channel();
103
final int total = buffer.capacity();
104
key.interestOps(op);
105
try {
106
while (buffer.position() < total) {
107
if (System.currentTimeMillis() > endTime) {
108
throw new SocketTimeoutException();
109
}
110
if ((key.readyOps() & op) != 0) {
111
if (optr.io(channel, buffer) < 0) {
112
throw new EOFException();
113
}
114
} else {
115
TcpChannel.blockUntil(key, endTime);
116
}
117
}
118
} finally {
119
if (key.isValid()) {
120
key.interestOps(0);
121
}
122
}
123
}
124
}
125

126
int io(SocketChannel channel, ByteBuffer buffer) throws IOException;
127
}
128
class Send implements Operator
129
{
130
public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
131
{
132
return channel.write(buffer);
133
}
134
public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
135
{
136
Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator);
137
}
138
public static final Send operator = new Send();
139
}
140

141
class Recv implements Operator
142
{
143
public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
144
{
145
return channel.read(buffer);
146
}
147

148
public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
149
{
150
Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator);
151
}
152
public static final Recv operator = new Recv();
153
}
使用演示见以下代码。
大致说明一下,Server端开5656侦听,Client端开若干线程测试Socket通信。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示认可,否则发送“?xxxx”表示故障。
正式应用中可以再设置tryout尝试n次。
Server端,代码演示:

001
package asynchronizedchannel;
002

003
import java.io.IOException;
004
import java.net.InetSocketAddress;
005
import java.nio.ByteBuffer;
006
import java.nio.channels.SelectionKey;
007
import java.nio.channels.Selector;
008
import java.nio.channels.ServerSocketChannel;
009
import java.nio.channels.SocketChannel;
010
import java.security.MessageDigest;
011
import java.util.Iterator;
012

013
public class Server
014
{
015

016
/**
017
* 服务端通信范例程序主函数
018
*
019
* @param args
020
* @throws IOException
021
*/
022
public static void main(String[] args) throws IOException
023
{
024
// Create the selector
025
final Selector selector = Selector.open();
026
final ServerSocketChannel server = ServerSocketChannel.open();
027
server.configureBlocking(false);
028
server.socket().bind(new InetSocketAddress("xx.xx.xx.xx", 5656), 5);
029
// Register both channels with selector
030
server.register(selector, SelectionKey.OP_ACCEPT);
031
new Thread(new Daemon(selector)).start();
032
}
033
}
034

035
class Daemon implements Runnable
036
{
037
private final Selector selector;
038

039
Daemon(Selector selector)
040
{
041
this.selector = selector;
042
}
043

044
public void run()
045
{
046
while (true) {
047
try {
048
// Wait for an event
049
selector.select();
050

051
// Get list of selection keys with pending events
052
Iterator it = selector.selectedKeys().iterator();
053

054
// Process each key
055
while (it.hasNext()) {
056
// Get the selection key
057
SelectionKey selKey = it.next();
058

059
// Remove it from the list to indicate that it is being processed
060
it.remove();
061

062
// Check if it’s a connection request
063
if (selKey.isAcceptable()) {
064
// Get channel with connection request
065
ServerSocketChannel server = (ServerSocketChannel) selKey.channel();
066
// Accept the connection request.
067
// If serverSocketChannel is blocking, this method blocks.
068
// The returned channel is in blocking mode.
069
SocketChannel channel = server.accept();
070

071
// If serverSocketChannel is non-blocking, sChannel may be null
072
if (channel != null) {
073
// Use the socket channel to communicate with the client
074
new Thread(new ServerHandler(channel)).start();
075
} else {
076
System.out.println(“—No Connection—”);
077
// There were no pending connection requests; try again later.
078
// To be notified of connection requests,
079
}
080
}
081
}
082
} catch (Exception ex) {
083
ex.printStackTrace();
084
}
085
}
086
}
087
}
088

089
class ServerHandler implements Runnable
090
{
091
private static final long timeout = 30 * 1000; // 设置超时时间为30秒
092
private static int counter = 0;
093
private final TcpChannel channel;
094
private final MessageDigest md;
095

096
ServerHandler(SocketChannel channel) throws Exception
097
{
098
this.channel = new TcpChannel(channel, System.currentTimeMillis() + timeout, SelectionKey.OP_READ);
099
md = MessageDigest.getInstance(“md5″);
100
}
101

102
public void run()
103
{
104
try {
105
while (true) {
106
work();
107
synchronized (ServerHandler.class) {
108
if ((++counter & 65535) == 0) {
109
System.out.println(counter);
110
}
111
}
112
}
113
} catch (Exception e) {
114
e.printStackTrace();
115
} finally {
116
channel.cleanup();
117
}
118
}
119

120
private void work() throws IOException
121
{ // 模拟工作流程
122
byte[] cache = new byte[256], reply = new byte[5];
123
read(cache, reply);
124
}
125

126
private void read(byte[] cache, byte[] reply) throws IOException
127
{ // 从套接字读入数据
128
channel.recv(ByteBuffer.wrap(cache));
129
md.reset();
130
md.update(cache, 0, 240);
131
byte[] md5 = md.digest(); // 使用前240字节产生MD5校验码
132
if (!ExtArrays.partialEquals(md5, 0, cache, 240, 16)) { // 与后16字节比较
133
reply[0] = ‘?’;
134
System.out.println(“MISMATCH!”);
135
} else {
136
reply[0] = ‘.’;
137
}
138
channel.send(ByteBuffer.wrap(reply)); // 返回接收结果
139
}
140
}
141

142
/** Static helpers for byte arrays; not instantiable. */
final class ExtArrays
{
    private ExtArrays()
    {
    }

    /**
     * Compares {@code len} bytes of {@code a} starting at {@code offset_a} with {@code len} bytes
     * of {@code b} starting at {@code offset_b}.
     *
     * @param a        first array; {@code null} yields {@code false}
     * @param offset_a start index in {@code a}
     * @param b        second array; {@code null} yields {@code false}
     * @param offset_b start index in {@code b}
     * @param len      number of bytes to compare
     * @return {@code true} if both ranges lie inside their arrays and hold equal bytes (a
     *         zero-length range inside both arrays is equal); {@code false} if either array is
     *         {@code null}, an offset or {@code len} is negative, a range runs past the end of
     *         its array, or any byte differs
     */
    public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len)
    {
        if (a == null || b == null) {
            return false;
        }
        if (offset_a < 0 || offset_b < 0 || len < 0) {
            return false;
        }
        // Written as a subtraction so that offset + len cannot overflow int.
        if (len > a.length - offset_a || len > b.length - offset_b) {
            return false;
        }
        for (int k = 0; k < len; k++) {
            if (a[offset_a + k] != b[offset_b + k]) {
                return false;
            }
        }
        return true;
    }
}
Client端,代码演示:
01
package asynchronizedchannel;
02

03
import java.io.IOException;
04
import java.net.InetSocketAddress;
05
import java.nio.ByteBuffer;
06
import java.nio.channels.SelectionKey;
07
import java.nio.channels.SocketChannel;
08
import java.security.DigestException;
09
import java.security.MessageDigest;
10
import java.util.Random;
11

12
public class Client
13
{
14
private static int id = 0;
15
/**
16
* 客户端通信范例程序主函数
17
*
18
* @param args
19
* @throws Exception
20
*/
21
public static void main(String[] args) throws Exception
22
{
23
new Thread(new ClientHandler(id++)).start();
24
new Thread(new ClientHandler(id++)).start();
25
new Thread(new ClientHandler(id++)).start();
26
new Thread(new ClientHandler(id++)).start();
27
new Thread(new ClientHandler(id++)).start();
28
}
29

30
}
31

32
class ClientHandler implements Runnable
33
{
34
private static final long timeout = 30 * 1000; // 设置超时时间为30秒
35
private final TcpChannel channel;
36

37
private final int id;
38

39
private final MessageDigest md;
40
private final Random rand;
41

42
ClientHandler(int id) throws Exception
43
{
44
this.id = id;
45
channel = new TcpChannel(SocketChannel.open(), System.currentTimeMillis() + timeout, SelectionKey.OP_WRITE);
46
md = MessageDigest.getInstance(“md5″);
47
rand = new Random();
48
}
49

50
@Override
51
public void run()
52
{
53
try {
54
channel.connect(new InetSocketAddress(“xx.xx.xx.xx”, 5656));
55
int i = 0;
56
while (true) {
57
work();
58
if ((++i & 16383) == 0) {
59
System.out.println(String.format(“client(%1$d): %2$d”, id, i));
60
}
61
Thread.yield();
62
}
63
} catch (Exception e) {
64
e.printStackTrace();
65
} finally {
66
channel.cleanup();
67
}
68
}
69

70
private void work() throws IOException, DigestException
71
{
72
byte[] cache = new byte[256], reply = new byte[5];
73
write(cache, reply);
74
}
75

76
private void write(byte[] cache, byte[] reply) throws DigestException, IOException
77
{
78
rand.nextBytes(cache); // 只用前面的240字节
79
md.reset();
80
md.update(cache, 0, 240);
81
md.digest(cache, 240, 16); // MD5校验码占后面16字节
82
ByteBuffer buffer = ByteBuffer.wrap(cache);
83
channel.send(buffer);
84
buffer = ByteBuffer.wrap(reply);
85
channel.recv(buffer);
86
if (reply[0] != ‘.’) { // 若接收的结果不正确,可以考虑尝试再次发送
87
System.out.println(“MISMATCH!”);
88
}
89
}
90
}
重点说明:
发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。

分类: Java 标签:
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.