重构聊天室案例,使用线程来实现一个服务器端可以同时接收多个客户端的信息。通信过程如表-1所示:
表- 1客户端与服务器端通信过程
客户端A,B,C . . .可以同时去连接服务器,和服务器进行通信。
之前的聊天室案例中,已经实现了客户端和服务器端一对一的通信。现在需要实现多个客户端连接同一个服务器,则需要服务器端循环等待多个客户端发送的请求。
为实现此功能,首先需要在服务器端定义内部类,并在该内部类中设置线程要执行的任务。此案例中,线程要执行的任务即为接收对应的客户端的消息并打印显示。该内部类的代码如下:
/** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); // 循环读取客户端发送的信息 while (true) { System.out.println("客户端说:" + br.readLine()); } } catch (Exception e) { e.printStackTrace(); } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
其次,在服务器端 Server 类中的 start 方法中,需要使用while (true)循环,在循环中阻塞等待多个客户端的连接。当有客户端连接时,则使用上文定义的任务作为线程的任务,并启动一个该任务对应的线程,来处理服务器和该客户端之间的通信,代码如下:
public void start() { try { //循环监听客户端的连接 while(true){ System.out.println("等待客户端连接..."); // 监听客户端的连接 Socket socket = serverSocket.accept(); System.out.println("客户端已连接!"); //启动一个线程来完成针对该客户端的交互 ClientHandler handler = new ClientHandler(socket); new Thread(handler).start(); } } catch (Exception e) { e.printStackTrace(); } }
而客户端的实现与V2版本完全相同。
实现此案例需要按照如下步骤进行。
步骤一:新建类Server
创建Server类,在该类的构造方法中,创建ServerSocket类的对象。在实例化对象时使用构造方法“ServerSocket(int port)”来构造ServerSocket类的对象,以申请服务的端口号,代码如下所示:
package com.tarena.part3; import java.net.ServerSocket; import java.net.Socket; /** * 服务端应用程序 */ public class Server { // 服务端Socket private ServerSocket serverSocket; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); } catch (Exception e) { e.printStackTrace(); } } }
步骤二:定义服务器线程要执行的任务
在Server类中定义成员内部类ClientHandler。该内部类需要实现Runnable接口并实现该接口的run方法。在该方法中实现线程要执行的任务,在此,线程要执行的任务即为接收对应的客户端的消息和向对应的客户端发送消息,代码如下所示:
package com.tarena.part3; import java.net.ServerSocket; import java.net.Socket; #cold_boldimport java.io.BufferedReader; #cold_boldimport java.io.IOException; #cold_boldimport java.io.InputStream; #cold_boldimport java.io.InputStreamReader; /** * 服务端应用程序 */ public class Server { // 服务端Socket private ServerSocket serverSocket; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); } catch (Exception e) { e.printStackTrace(); } } /** * 线程体,用于并发处理不同客户端的交互 */ #cold_bold private class ClientHandler implements Runnable { #cold_bold // 该线程用于处理的客户端 #cold_bold private Socket socket; #cold_bold public ClientHandler(Socket socket) { #cold_bold this.socket = socket; #cold_bold } #cold_bold @Override #cold_bold public void run() { #cold_bold try { #cold_bold InputStream in = socket.getInputStream(); #cold_bold InputStreamReader isr = new InputStreamReader(in, "UTF-8"); #cold_bold BufferedReader br = new BufferedReader(isr); #cold_bold #cold_bold // 循环读取客户端发送的信息 #cold_bold while (true) { #cold_bold System.out.println("客户端说:" + br.readLine()); #cold_bold } #cold_bold } catch (Exception e) { #cold_bold e.printStackTrace(); #cold_bold } finally { #cold_bold if (socket != null) { #cold_bold try { #cold_bold socket.close(); #cold_bold } catch (IOException e) { #cold_bold e.printStackTrace(); #cold_bold } #cold_bold } #cold_bold } #cold_bold } #cold_bold } }
步骤三:启动线程
在Server类中添加start方法。在该方法中使用while (true)循环,在循环中阻塞等待多个客户端的连接。当有客户端连接时则使用上文定义的任务作为线程的任务,并启动一个该任务对应的线程,来处理服务器和该客户端之间的通信,代码如下所示:
package com.tarena.part3; import java.net.ServerSocket; import java.net.Socket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; /** * 服务端应用程序 */ public class Server { // 服务端Socket private ServerSocket serverSocket; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); } catch (Exception e) { e.printStackTrace(); } } /** * 服务端开启方法 */ #cold_bold public void start() { #cold_bold try { #cold_bold //循环监听客户端的连接 #cold_bold while(true){ #cold_bold System.out.println("等待客户端连接..."); #cold_bold // 监听客户端的连接 #cold_bold Socket socket = serverSocket.accept(); #cold_bold System.out.println("客户端已连接!"); #cold_bold #cold_bold //启动一个线程来完成针对该客户端的交互 #cold_bold ClientHandler handler = new ClientHandler(socket); #cold_bold new Thread(handler).start(); #cold_bold } #cold_bold } catch (Exception e) { #cold_bold e.printStackTrace(); #cold_bold } #cold_bold } /** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); // 循环读取客户端发送的信息 while (true) { System.out.println("客户端说:" + br.readLine()); } } catch (Exception e) { e.printStackTrace(); } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
步骤四:编写启动服务器的代码
在Server类的main方法中,编写启动服务器端的代码。代码如下所示:
package com.tarena.part3; import java.net.ServerSocket; import java.net.Socket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; /** * 服务端应用程序 */ public class Server { // 服务端Socket private ServerSocket serverSocket; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); } catch (Exception e) { e.printStackTrace(); } } /** * 服务端开启方法 */ public void start() { try { //循环监听客户端的连接 while(true){ System.out.println("等待客户端连接..."); // 监听客户端的连接 Socket socket = serverSocket.accept(); System.out.println("客户端已连接!"); //启动一个线程来完成针对该客户端的交互 ClientHandler handler = new ClientHandler(socket); new Thread(handler).start(); } } catch (Exception e) { e.printStackTrace(); } } #cold_bold #cold_bold public static void main(String[] args) { #cold_bold Server server = new Server(); #cold_bold server.start(); #cold_bold } /** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); // 循环读取客户端发送的信息 while (true) { System.out.println("客户端说:" + br.readLine()); } } catch (Exception e) { e.printStackTrace(); } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
步骤五:编写客户端的代码
新建Client类,并在该类中编写实现客户端需要的代码。Client 类的代码和上一个案例中的代码相同,详见完整代码。
本案例中,类Server的完整代码如下所示:
package com.tarena.part3; import java.net.ServerSocket; import java.net.Socket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; /** * 服务端应用程序 */ public class Server { // 服务端Socket private ServerSocket serverSocket; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); } catch (Exception e) { e.printStackTrace(); } } /** * 服务端开启方法 */ public void start() { try { //循环监听客户端的连接 while(true){ System.out.println("等待客户端连接..."); // 监听客户端的连接 Socket socket = serverSocket.accept(); System.out.println("客户端已连接!"); //启动一个线程来完成针对该客户端的交互 ClientHandler handler = new ClientHandler(socket); new Thread(handler).start(); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Server server = new Server(); server.start(); } /** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); // 循环读取客户端发送的信息 while (true) { System.out.println("客户端说:" + br.readLine()); } } catch (Exception e) { e.printStackTrace(); } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
类Client的完整代码如下所示:
package com.tarena.part3; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.util.Scanner; /** * 客户端应用程序 */ public class Client { //客户端Socket private Socket socket; /** * 构造方法,用于初始化 */ public Client(){ try { socket = new Socket("localhost",8088); } catch (Exception e) { e.printStackTrace(); } } /** * 客户端工作方法 */ public void start(){ try { OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); PrintWriter pw = new PrintWriter(osw,true); //创建Scanner读取用户输入内容 Scanner scanner = new Scanner(System.in); while(true){ pw.println(scanner.nextLine()); } } catch (Exception e) { e.printStackTrace(); } finally{ if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) { Client client = new Client(); client.start(); } }
重构聊天室案例,使服务端可以将用户的信息转发给所有客户端,并在每个客户端控制台上显示。通信过程如表-2所示:
表- 2客户端与服务器端通信过程
之前的聊天室案例中,已经实现了多个客户端可以连接同一个服务器端的通信。现在需要实现服务端对某个客户端发送的信息进行广播(转发给所有客户端)的工作,并且使客户端在接收到服务端转发的信息后输出到控制台。
为实现此功能,首先需要在服务器端定义一个集合类型的属性,用于存储所有客户端的输出流。代码如下:
public class Server { // 服务端Socket private ServerSocket serverSocket; // 所有客户端输出流 private List<PrintWriter> allOut; }
然后在Server的内部类中run方法的最开始处将客户端的输出流存入该集合。之后,每当客户端发送信息后就遍历集合,将信息写入集合中所有的输出流中(相当于将信息转发给所有的客户端)。代码如下:
public void run() { PrintWriter pw = null; try { //将客户端的输出流存入共享集合,以便广播消息 OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); pw = new PrintWriter(osw,true); /* * 将用户信息存入共享集合 */ allOut.add(pw); }
随后将客户端发送过来的信息转发给所有的客户端,代码如下:
String message = null; // 循环读取客户端发送的信息 while ((message = br.readLine())!=null) { /* * 遍历所有输出流,将该客户端发送的信息转发给所有客户端 */ for(PrintWriter o : allOut){ o.println(message); } }
之后在客户端中定义一个内部类,并定义线程要执行的任务,这里循环读取服务端发送过来的信息,并打印到控制台。代码如下:
private class ServerHander implements Runnable{ @Override public void run() { try { InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); while(true){ System.out.println(br.readLine()); } } catch (Exception e) { e.printStackTrace(); } } }
最后,在Client的start方法中启动一个该任务对应的线程,来处理服务器和该客户端之间的通信。代码如下:
//将接收服务端信息的线程启动 ServerHander handler = new ServerHander(); Thread t = new Thread(handler); t.setDaemon(true); t.start();
步骤一:定义Server类
定义 Server类,并在类中定义属性allOut,该属性使用PrintWriter作为集合的泛型,用于存储输出流。在Server类的构造方法中,创建java.util.ArrayList类的实例来初始化该属性。代码如下所示:
public class Server { // 服务端Socket private ServerSocket serverSocket; // 所有客户端输出流 #cold_bold private List<PrintWriter> allOut; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); #cold_bold allOut = new ArrayList<PrintWriter>(); } catch (Exception e) { e.printStackTrace(); } } }
步骤二: 定义 Server的内部类 ClientHandler
为类 Server定义内部类ClientHandler,并向集合添加客户端的输出流。
在内部类的run方法中,通过该线程处理的客户端的Socket获取向该客户端发送信息的输出流,并将该输出流包装为PrintWriter后存入集合allOut中;当客户端断线,需要将输出流从共享集合中删除。代码如下所示:
private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { PrintWriter pw = null; try { //将客户端的输出流存入共享集合,以便广播消息 OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); pw = new PrintWriter(osw,true); /* * 将用户信息存入共享集合 */ #cold_bold allOut.add(pw); } catch (Exception e) { e.printStackTrace(); } finally { /* * 当客户端断线,要将输出流从共享集合中删除 */ #cold_bold allOut.remove(pw); if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
步骤三:修改 run 方法,将客户端发送的信息转发给所有客户端
继续为run方法添加代码:在获取到客户端发送过来的信息后,遍历集合allOut,将获取到的信息写入该集合中每一个输出流中,从而将该信息发送给所有客户端。代码如下所示:
private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { PrintWriter pw = null; try { //将客户端的输出流存入共享集合,以便广播消息 OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); pw = new PrintWriter(osw,true); /* * 将用户信息存入共享集合 */ allOut.add(pw); #cold_bold InputStream in = socket.getInputStream(); #cold_bold InputStreamReader isr = new InputStreamReader(in, "UTF-8"); #cold_bold BufferedReader br = new BufferedReader(isr); #cold_bold #cold_bold String message = null; #cold_bold // 循环读取客户端发送的信息 #cold_bold while ((message = br.readLine())!=null) { #cold_bold /* #cold_bold * 遍历所有输出流,将该客户端发送的信息转发给所有客户端 #cold_bold */ #cold_bold for(PrintWriter o : allOut){ #cold_bold o.println(message); #cold_bold } #cold_bold } } catch (Exception e) { e.printStackTrace(); } finally { /* * 当客户端断线,要将输出流从共享集合中删除 */ allOut.remove(pw); if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
步骤四:定义Server类的start和main方法
为Server类定义 start 方法和main方法,这两个方法的代码和之前的案例相同,不再赘述。
步骤五:定义客户端线程要执行的任务
在Client类中定义成员内部类ServerHander。该内部类需要实现Runnable接口并实现该接口的run方法。在该方法中实现线程要执行的任务,在此,线程要执行的任务为循环接收服务端的消息并打印到控制台。代码如下所示:
private class ServerHander implements Runnable{ @Override public void run() { try { InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); while(true){ System.out.println(br.readLine()); } } catch (Exception e) { e.printStackTrace(); } } }
步骤六:修改 Client 类的 start方法,创建并启动线程
修改 Client 类中的start方法,在该方法中,创建并启动线程。代码如下所示:
public void start(){ try { //将接收服务端信息的线程启动 ServerHander handler = new ServerHander(); Thread t = new Thread(handler); t.setDaemon(true); t.start(); OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); PrintWriter pw = new PrintWriter(osw,true); //创建Scanner读取用户输入内容 Scanner scanner = new Scanner(System.in); while(true){ pw.println(scanner.nextLine()); } } catch (Exception e) { e.printStackTrace(); } finally{ if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
本案例中,类Server的完整代码如下所示:
package com.tarena.part4; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List; /** * 服务端应用程序 */ public class Server { // 服务端Socket private ServerSocket serverSocket; // 所有客户端输出流 private List<PrintWriter> allOut; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); allOut = new ArrayList<PrintWriter>(); } catch (Exception e) { e.printStackTrace(); } } /** * 服务端开启方法 */ public void start() { try { //循环监听客户端的连接 while(true){ System.out.println("等待客户端连接..."); // 监听客户端的连接 Socket socket = serverSocket.accept(); System.out.println("客户端已连接!"); //启动一个线程来完成针对该客户端的交互 ClientHandler handler = new ClientHandler(socket); new Thread(handler).start(); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Server server = new Server(); server.start(); } /** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { PrintWriter pw = null; try { //将客户端的输出流存入共享集合,以便广播消息 OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); pw = new PrintWriter(osw,true); /* * 将用户信息存入共享集合 */ allOut.add(pw); InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); String message = null; // 循环读取客户端发送的信息 while ((message = br.readLine())!=null) { /* * 遍历所有输出流,将该客户端发送的信息转发给所有客户端 */ for(PrintWriter o : allOut){ o.println(message); } } } catch (Exception e) { e.printStackTrace(); } finally { /* * 当客户端断线,要将输出流从共享集合中删除 */ allOut.remove(pw); if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
本案例中,类Client的完整代码如下所示:
package com.tarena.part4; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.util.Scanner; /** * 客户端应用程序 * 第四步:实现服务端可以将信息广播 */ public class Client { //客户端Socket private Socket socket; /** * 构造方法,用于初始化 */ public Client(){ try { socket = new Socket("localhost",8088); } catch (Exception e) { e.printStackTrace(); } } /** * 客户端工作方法 */ public void start(){ try { //将接收服务端信息的线程启动 ServerHander handler = new ServerHander(); Thread t = new Thread(handler); t.setDaemon(true); t.start(); OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); PrintWriter pw = new PrintWriter(osw,true); //创建Scanner读取用户输入内容 Scanner scanner = new Scanner(System.in); while(true){ pw.println(scanner.nextLine()); } } catch (Exception e) { e.printStackTrace(); } finally{ if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) { Client client = new Client(); client.start(); } /** * 该线程用于接收服务端发送过来的信息 */ private class ServerHander implements Runnable{ @Override public void run() { try { InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); while(true){ System.out.println(br.readLine()); } } catch (Exception e) { e.printStackTrace(); } } } }
现有的聊天室功能虽然已经完成,但是由于客户端的频繁连接与断开,会使得服务端频繁的创建及销毁线程。随着客户端的增加,服务端的线程也在增加,这无疑会对服务端的资源造成浪费,并且由于过多的线程导致的过度切换也会为服务端带来崩溃的风险。与此同时,多个线程会共享服务端的集合属性allOut,这里还存在着多线程并发的安全问题。
为此,需要重构聊天室案例,使用线程池技术来解决服务端多线程问题,并解决多线程并发的安全问题。
之前的聊天室案例中,已经实现了聊天室的基本功能,现在需要对程序进行优化,使程序更加健壮。因此,需要使用线程池来控制客户端连接后启动和管理线程,并解决由于多线程共享Server属性allOut所引起的并发安全问题。
为实现此功能,首先需要在服务器端定义一个线程池类型的属性,用于管理服务端的线程创建及管理。代码如下所示:
public class Server { // 服务端Socket private ServerSocket serverSocket; // 所有客户端输出流 private List<PrintWriter> allOut; // 线程池 private ExecutorService threadPool; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); allOut = new ArrayList<PrintWriter>(); threadPool = Executors.newFixedThreadPool(40); } catch (Exception e) { e.printStackTrace(); } } … }
其次我们修改Server的start方法,将原来创建并启动线程的代码替换为使用线程池管理的方式。代码如下所示:
/** * 服务端开启方法 */ public void start() { try { //循环监听客户端的连接 while(true){ System.out.println("等待客户端连接..."); // 监听客户端的连接 Socket socket = serverSocket.accept(); System.out.println("客户端已连接!"); //启动一个线程来完成针对该客户端的交互 ClientHandler handler = new ClientHandler(socket); threadPool.execute(handler); } } catch (Exception e) { e.printStackTrace(); } }
然后在Server中添加三个方法,用于操作属性allOut,并使用同步锁,使三个方法变为同步的。代码如下所示:
/** * 将输出流存入共享集合,与下面两个方法互斥,保证同步安全 * @param out */ private synchronized void addOut(PrintWriter out){ allOut.add(out); } /** * 将给定输出流从共享集合删除 * @param out */ private synchronized void removeOut(PrintWriter out){ allOut.remove(out); } /** * 将消息转发给所有客户端 * @param message */ private synchronized void sendMessage(String message){ for(PrintWriter o : allOut){ o.println(message); } }
最后,将原来操作向集合中添加,删除元素。遍历集合并将信息写入每一个输出流的操作改造为调用三个方法,以确保同步安全。代码如下所示:
/* * 将用户信息存入共享集合 * 需要同步 */ addOut(pw); … // 循环读取客户端发送的信息 while ((message = br.readLine())!=null) { /* * 遍历所有输出流,将该客户端发送的信息转发给所有客户端 * 需要同步 */ sendMessage(message); } … /* * 当客户端断线,要将输出流从共享集合中删除 * 需要同步 */ removeOut(pw); …
步骤一:定义 Server类
定义Server类,并在Server类中添加ExecutorService类型的属性threadPool,并在构造方法中将其初始化。初始化时,使用固定大小的线程池,线程数量为40。这里使用Executors类的newFixedThreadPool(int threads)方法来创建固定大小的线程池。代码如下所示:
public class Server { // 服务端Socket private ServerSocket serverSocket; // 所有客户端输出流 private List<PrintWriter> allOut; // 线程池 private ExecutorService threadPool; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); allOut = new ArrayList<PrintWriter>(); threadPool = Executors.newFixedThreadPool(40); } catch (Exception e) { e.printStackTrace(); } } … }
步骤二:为 Server 类创建 start 方法
为 Server 类创建 start 方法。在该方法中,将原代码中创建并启动线程的方式,改为使用线程池来完成。创建内部类实例后,使用ExecutorService的execute(Runnable runn)方法来启动线程。代码如下所示:
/** * 服务端开启方法 */ public void start() { try { //循环监听客户端的连接 while(true){ System.out.println("等待客户端连接..."); // 监听客户端的连接 Socket socket = serverSocket.accept(); System.out.println("客户端已连接!"); //启动一个线程来完成针对该客户端的交互 ClientHandler handler = new ClientHandler(socket); threadPool.execute(handler); } } catch (Exception e) { e.printStackTrace(); } }
步骤三:为 Server 类定义 addOut 方法
定义 addOut 方法,该方法向Server的属性allOut集合中添加输出流,并使用synchronized关键字修饰,使该方法变为同步方法。代码如下所示:
/** * 将输出流存入共享集合,与下面两个方法互斥,保证同步安全 * @param out */ private synchronized void addOut(PrintWriter out){ allOut.add(out); }
步骤四:为 Server 类定义 removeOut 方法
定义removeOut方法,该方法从Server的属性allOut集合中删除输出流,并使用synchronized关键字修饰,使该方法变为同步方法。代码如下所示:
/** * 将给定输出流从共享集合删除 * @param out */ private synchronized void removeOut(PrintWriter out){ allOut.remove(out); }
步骤五:为 Server 类定义 sendMessage方法
定义sendMessage方法,该方法用于遍历Server的属性allOut集合元素,将信息写入每一个输出流,并使用synchronized关键字修饰,使该方法变为同步方法。代码如下所示:
/** * 将消息转发给所有客户端 * @param message */ private synchronized void sendMessage(String message){ for(PrintWriter o : allOut){ o.println(message); } }
步骤六:创建内部类
创建 Server的内部类 ClientHandler,在内部类中定义run方法。在run方法中,调用前面步骤中所创建的 addOut方法。代码如下所示:
/** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { PrintWriter pw = null; try { //将客户端的输出流存入共享集合,以便广播消息 OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); pw = new PrintWriter(osw,true); /* * 将用户信息存入共享集合 * 需要同步 */ addOut(pw); } } }
步骤七:继续为run方法添加代码
继续为run方法添加代码,调用sendMessage方法,实现将信息转发给所有客户端。代码如下所示:
/** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { PrintWriter pw = null; try { //将客户端的输出流存入共享集合,以便广播消息 OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); pw = new PrintWriter(osw,true); /* * 将用户信息存入共享集合 * 需要同步 */ addOut(pw); #cold_bold InputStream in = socket.getInputStream(); #cold_bold InputStreamReader isr = new InputStreamReader(in, "UTF-8"); #cold_bold BufferedReader br = new BufferedReader(isr); #cold_bold #cold_bold String message = null; #cold_bold // 循环读取客户端发送的信息 #cold_bold while ((message = br.readLine())!=null) { #cold_bold /* #cold_bold * 遍历所有输出流,将该客户端发送的信息转发给所有客户端 #cold_bold * 需要同步 #cold_bold */ #cold_bold sendMessage(message); #cold_bold } #cold_bold } catch (Exception e) { #cold_bold e.printStackTrace(); #cold_bold } } }
步骤八:删除输出流
继续为run方法添加代码,调用removeOut方法从集合allOut中删除输出流。代码如下所示:
/** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { PrintWriter pw = null; try { //将客户端的输出流存入共享集合,以便广播消息 OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); pw = new PrintWriter(osw,true); /* * 将用户信息存入共享集合 * 需要同步 */ addOut(pw); InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); String message = null; // 循环读取客户端发送的信息 while ((message = br.readLine())!=null) { /* * 遍历所有输出流,将该客户端发送的信息转发给所有客户端 * 需要同步 */ sendMessage(message); } } catch (Exception e) { e.printStackTrace(); #cold_bold } finally { #cold_bold /* #cold_bold * 当客户端断线,要将输出流从共享集合中删除 #cold_bold * 需要同步 #cold_bold */ #cold_bold removeOut(pw); #cold_bold #cold_bold if (socket != null) { #cold_bold try { #cold_bold socket.close(); #cold_bold } catch (IOException e) { #cold_bold e.printStackTrace(); #cold_bold } #cold_bold } #cold_bold } } }
本案例中,类Server的完整代码如下所示:
package com.tarena.part5; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 服务端应用程序 */ public class Server { // 服务端Socket private ServerSocket serverSocket; // 所有客户端输出流 private List<PrintWriter> allOut; // 线程池 private ExecutorService threadPool; /** * 构造方法,用于初始化 */ public Server() { try { serverSocket = new ServerSocket(8088); allOut = new ArrayList<PrintWriter>(); threadPool = Executors.newFixedThreadPool(40); } catch (Exception e) { e.printStackTrace(); } } /** * 服务端开启方法 */ public void start() { try { //循环监听客户端的连接 while(true){ System.out.println("等待客户端连接..."); // 监听客户端的连接 Socket socket = serverSocket.accept(); System.out.println("客户端已连接!"); //启动一个线程来完成针对该客户端的交互 ClientHandler handler = new ClientHandler(socket); threadPool.execute(handler); } } catch (Exception e) { e.printStackTrace(); } } /** * 将输出流存入共享集合,与下面两个方法互斥,保证同步安全 * @param out */ private synchronized void addOut(PrintWriter out){ allOut.add(out); } /** * 将给定输出流从共享集合删除 * @param out */ private synchronized void removeOut(PrintWriter out){ allOut.remove(out); } /** * 将消息转发给所有客户端 * @param message */ private synchronized void sendMessage(String message){ for(PrintWriter o : allOut){ o.println(message); } } public static void main(String[] args) { Server server = new Server(); server.start(); } /** * 线程体,用于并发处理不同客户端的交互 */ private class ClientHandler implements Runnable { // 该线程用于处理的客户端 private Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } @Override public void run() { PrintWriter pw = null; try { //将客户端的输出流存入共享集合,以便广播消息 OutputStream out = socket.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(out,"UTF-8"); pw = new PrintWriter(osw,true); /* * 将用户信息存入共享集合 * 需要同步 */ addOut(pw); InputStream in = socket.getInputStream(); InputStreamReader isr = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(isr); String message = null; // 循环读取客户端发送的信息 while ((message = br.readLine())!=null) { /* * 遍历所有输出流,将该客户端发送的信息转发给所有客户端 * 需要同步 */ sendMessage(message); } } catch (Exception e) { e.printStackTrace(); } finally { /* * 当客户端断线,要将输出流从共享集合中删除 * 需要同步 */ removeOut(pw); if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
本案例中,类Client没有修改,参照之前代码即可。
使用Java的DatagramSocket实现客户端和服务器端通信。通信过程如表-3所示:
表-3客户端与服务器端通信过程
实现此案例需要按照如下步骤进行。
步骤一:创建客户端类和服务器端类
首先,创建类Client表示客户端;然后,创建类Server表示服务器端,Client类代码如下所示:
public class Client { public static void main(String[] args) { } }
Server类代码如下所示:
public class Server{ public static void main(String[] args) { } }
步骤二:客户端向服务器发送数据,服务器端接收该数据
要实现客户端向服务器发送数据“Hello! I'm Client”,详细过程如下:
1)创建Socket实例。在Client类中新建start方法,在该方法中,创建用于接收和发送UDP的DatagramSocket类的实例。客户端使用无参数的构造方法构造DatagramSocket类的实例即可。
2)构建发送包。使用如下构造方法来构造数据包DatagramPacket类的对象:
DatagramPacket(byte[] buf, int length, InetAddress clientAddress, int clientPort)
上述构造方法表示从buf数组中,取出length长度的数据创建数据包对象,该数据包对象的目标地址是clientAddress、端口是clientPort。本案例的目标IP地址是127.0.0.1,即本机、目标端口是8088。
3)发送数据。使用DatagramSocket类提供的send方法,向目标地址和端口发送报文。
Client类代码如下所示:
import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; public class Client { private void start() { try { #cold_bold DatagramSocket client = new DatagramSocket(); #cold_bold String sendStr = "Hello! I'm Client"; #cold_bold byte[] sendBuf; #cold_bold sendBuf = sendStr.getBytes(); #cold_bold InetAddress addr = InetAddress.getByName("127.0.0.1"); #cold_bold int port = 8088; #cold_bold DatagramPacket sendPacket = new DatagramPacket(sendBuf, #cold_bold sendBuf.length, addr, port); #cold_bold #cold_bold client.send(sendPacket); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { } }
要实现服务器接收客户端发送的数据,详细过程如下:
1)创建Socket实例。在Server类中新建start方法,在该方法中,首先创建用于接收和发送UDP的DatagramSocket类的实例。服务器端使用构造方法“DatagramSocket(int port)”来构造DatagramSocket类的实例。该构造方法可以固定监听port端口的报文。
2)构建接收包。使用如下构造方法来构造数据包DatagramPacket类的对象:
DatagramPacket(byte[] buf, int length)
上述构造方法表示将数据包中length长度的数据装进buf数组。
3)接收数据。使用DatagramSocket类的receive方法接收数据报文。
Server类代码如下所示:
import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; public class Server { public void start() { try { #cold_bold DatagramSocket server = new DatagramSocket(8088); #cold_bold byte[] recvBuf = new byte[100]; #cold_bold DatagramPacket recvPacket = new DatagramPacket(recvBuf, recvBuf.length); #cold_bold server.receive(recvPacket); #cold_bold String recvStr = new String(recvPacket.getData(), 0, #cold_bold recvPacket.getLength()); #cold_bold System.out.println("客户端说:" + recvStr); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { } }
步骤三:服务器向客户端发送数据,客户端接收数据
要实现服务器向客户端发送数据“Hello ! I'm Server”,详细过程如下:
1)构建发送包。从上一步客户端发送的数据包中获取客户端的地址和端口号,使用如下构造方法来构造数据包DatagramPacket类的对象:
DatagramPacket(byte[] buf, int length, InetAddress clientAddress, int clientPort)
上述构造方法表示从buf数组中,取出length长度的数据创建数据包对象,该数据包对象的目标地址是clientAddress、端口是clientPort。
2)发送数据。使用DatagramSocket类提供的send方法,向目标地址和端口发送报文。
Server类代码如下所示:
import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; public class Server { public void start() { try { DatagramSocket server = new DatagramSocket(8088); byte[] recvBuf = new byte[100]; DatagramPacket recvPacket = new DatagramPacket(recvBuf, recvBuf.length); server.receive(recvPacket); String recvStr = new String(recvPacket.getData(), 0, recvPacket.getLength()); System.out.println("客户端说:" + recvStr); #cold_bold int port = recvPacket.getPort(); #cold_bold InetAddress addr = recvPacket.getAddress(); #cold_bold String sendStr = "Hello ! I'm Server"; #cold_bold byte[] sendBuf; #cold_bold sendBuf = sendStr.getBytes(); #cold_bold DatagramPacket sendPacket = new DatagramPacket(sendBuf, #cold_bold sendBuf.length, addr, port); #cold_bold server.send(sendPacket); #cold_bold server.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Server server = new Server(); server.start(); } }
客户端接收数据的过程和服务器端接收数据的过程类似,在这里不再赘述。
Client类代码如下所示:
import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; public class Client { private void start() { try { DatagramSocket client = new DatagramSocket(); String sendStr = "Hello! I'm Client"; byte[] sendBuf; sendBuf = sendStr.getBytes(); InetAddress addr = InetAddress.getByName("127.0.0.1"); int port = 8088; DatagramPacket sendPacket = new DatagramPacket(sendBuf, sendBuf.length, addr, port); client.send(sendPacket); #cold_bold #cold_bold byte[] recvBuf = new byte[100]; #cold_bold DatagramPacket recvPacket = new DatagramPacket(recvBuf, recvBuf.length); #cold_bold client.receive(recvPacket); #cold_bold String recvStr = new String(recvPacket.getData(), 0, #cold_bold recvPacket.getLength()); #cold_bold System.out.println("服务端说:" + recvStr); #cold_bold #cold_bold client.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Client client = new Client(); client.start(); } }
步骤四:添加客户端和服务器端启动的代码
在Client类的main方法中调用该类的start方法、在Server类的main方法中调用该类start方法。
Client类代码如下所示:
import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; public class Client { private void start() { try { DatagramSocket client = new DatagramSocket(); String sendStr = "Hello! I'm Client"; byte[] sendBuf; sendBuf = sendStr.getBytes(); InetAddress addr = InetAddress.getByName("127.0.0.1"); int port = 8088; DatagramPacket sendPacket = new DatagramPacket(sendBuf, sendBuf.length, addr, port); client.send(sendPacket); byte[] recvBuf = new byte[100]; DatagramPacket recvPacket = new DatagramPacket(recvBuf, recvBuf.length); client.receive(recvPacket); String recvStr = new String(recvPacket.getData(), 0, recvPacket.getLength()); System.out.println("服务端说:" + recvStr); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { #cold_bold Client client = new Client(); #cold_bold client.start(); } }
Server类代码如下所示:
import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; public class Server { public void start() { try { DatagramSocket server = new DatagramSocket(8088); byte[] recvBuf = new byte[100]; DatagramPacket recvPacket = new DatagramPacket(recvBuf, recvBuf.length); server.receive(recvPacket); String recvStr = new String(recvPacket.getData(), 0, recvPacket.getLength()); System.out.println("客户端说:" + recvStr); int port = recvPacket.getPort(); InetAddress addr = recvPacket.getAddress(); String sendStr = "Hello ! I'm Server"; byte[] sendBuf; sendBuf = sendStr.getBytes(); DatagramPacket sendPacket = new DatagramPacket(sendBuf, sendBuf.length, addr, port); server.send(sendPacket); server.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { #cold_bold Server server = new Server(); #cold_bold server.start(); } }
步骤五:测试
测试客户端和服务器端是否通信成功。首先,启动服务器端;然后再启动客户端。
客户端控制台运行输出如下:
服务端说:Hello ! I'm Server
服务器端控制台运行输出如下:
客户端说:Hello! I'm Client
本案例中,Client类的完整代码如下所示:
import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; public class Client { private void start() { try { DatagramSocket client = new DatagramSocket(); String sendStr = "Hello! I'm Client"; byte[] sendBuf; sendBuf = sendStr.getBytes(); InetAddress addr = InetAddress.getByName("127.0.0.1"); int port = 8088; DatagramPacket sendPacket = new DatagramPacket(sendBuf, sendBuf.length, addr, port); client.send(sendPacket); byte[] recvBuf = new byte[100]; DatagramPacket recvPacket = new DatagramPacket(recvBuf, recvBuf.length); client.receive(recvPacket); String recvStr = new String(recvPacket.getData(), 0, recvPacket.getLength()); System.out.println("服务端说:" + recvStr); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { #cold_bold Client client = new Client(); #cold_bold client.start(); } }
Server类的完整代码如下所示:
import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; public class Server { public void start() { try { DatagramSocket server = new DatagramSocket(8088); byte[] recvBuf = new byte[100]; DatagramPacket recvPacket = new DatagramPacket(recvBuf, recvBuf.length); server.receive(recvPacket); String recvStr = new String(recvPacket.getData(), 0, recvPacket.getLength()); System.out.println("客户端说:" + recvStr); int port = recvPacket.getPort(); InetAddress addr = recvPacket.getAddress(); String sendStr = "Hello ! I'm Server"; byte[] sendBuf; sendBuf = sendStr.getBytes(); DatagramPacket sendPacket = new DatagramPacket(sendBuf, sendBuf.length, addr, port); server.send(sendPacket); server.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Server server = new Server(); server.start(); } }