java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java BIO聊天程序

Java BIO实现聊天程序

作者:java硕哥

这篇文章主要为大家详细介绍了Java BIO实现聊天程序,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文实例为大家分享了Java BIO实现聊天程序的具体代码,供大家参考,具体内容如下

我们使用一个聊天程序来说本文的主题

1、BIO 客户端服务器通讯

public class ChatServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9000);
        while (true) {
            try {
                System.out.println("聊天服务已启动,等待客户连接....");
                Socket socket = serverSocket.accept();
                System.out.printf("建立了与%s的连接!\n",socket.getRemoteSocketAddress());
                loopReadRequest(socket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static String loopReadRequest(Socket socket) throws IOException {
        InputStreamReader reader = new InputStreamReader(socket.getInputStream());
        StringBuilder sb = new StringBuilder();
        char[] cbuf = new char[256];
        
        // 循环读取socket的输入数据流
        while (true) {
            // read方法,读出内容写入 char 数组,read 方法会一直阻塞
            // 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket)
            // 正常读取时方法会返回读取的字符数,当输入流结束时(对方关闭了socket)方法返回 -1
            int readed = reader.read(cbuf);
   SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
   // 客户端执行了socket.close()
            if (readed == -1) {
                System.out.println(remoteSocketAddress + " 断开了连接!");
                reader.close();
                socket.close();
                break;
            }

            String readedStr = new String(cbuf, 0, readed);
            sb.append(readedStr);

      // ready()用来判断流是否可被读取,如果reader缓冲区不是空则返回true,否则返回false
            if (!reader.ready()) {//reader缓冲区为空,表示数据流已读完
                // 数据流已读完,此时向客户端发送响应
                socket.getOutputStream().write((remoteSocketAddress+"你好,"+sb+"已收到").getBytes());
                System.out.println("收到内容:"+sb);
                // 清除sb的内容,准备接收下一个请求内容
                sb.setLength(0);
                System.out.println("等待客户端消息....");
            }
        }
        return sb.toString();
    }
}

public class ChatClient {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("localhost", 9000);
            Scanner scanner = new Scanner(System.in);
            while (true) {
                System.out.print(">");
                String line = scanner.nextLine();
                if("".equals(line)){
                    continue;
                }
                if ("quit".equals(line)) {
                    scanner.close();
                    socket.close();
                    break;
                }
                socket.getOutputStream().write(line.getBytes());
                System.out.println(readRequest(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static String readRequest(Socket socket) throws IOException {
        InputStreamReader reader = new InputStreamReader(socket.getInputStream());
        StringBuilder sb = new StringBuilder();
        char[] cbuf = new char[256];
        while (true) {
            int readed = reader.read(cbuf);
            // 读出内容写入 char 数组,read 方法会一直阻塞
            // 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket)
            // 正常读取,方法会返回读取的字符数,而当输入流结束(对方关闭了socket)则返回 -1
            if (readed == -1) {
                System.out.println(socket.getRemoteSocketAddress() + " 断开了连接!");
                reader.close();
                socket.close();
                break;
            }

            String readedStr = new String(cbuf, 0, readed);
            sb.append(readedStr);
            if(!reader.ready()){
                break;
            }
        }
        return sb.toString();
    }
}

ChatServer与ChatClient建立了长连接,且ChatServer阻塞等待ChatClient发送消息过来,程序中 Server端只能与一个Client建立连接。程序这么写,只能实现一个客户端和服务端进行通信。

如何支持多个Client的连接呢? 使用独立的线程去读取socket

2、多线程实现单聊,群聊

单聊发送 格式:-c 对方端口号 消息内容, 群聊直接发送信息就可以了,具体发送逻辑看下面的程序

public class ChatServer {
    private static Map<String, Socket> connnectedSockets = new ConcurrentHashMap<>();

    public static void main(String[] args) throws IOException {

        // 1、服务端初始化工作
        ServerSocket serverSocket = new ServerSocket(9000);
        ExecutorService executorService = getExecutorService();

        // 2、主线程- 循环阻塞接收新的连接请求
        while (true) {
            Socket socket = serverSocket.accept();
            cacheSocket(socket);

            // 3、一个socket对应一个读取任务,交给线程池中的线程执行
            // 如果使用fixed线程池,会操作读取任务分配不到线程的情况
            // 现象就是发送的消息别人收不到(暂存在Socket缓存中)
            executorService.submit(createLoopReadTask(socket));
        }
    }

    private static Runnable createLoopReadTask(Socket socket) {
        return new Runnable() {
            public void run() {
                try {
                    loopReadRequestAndRedirect(socket);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    private static ExecutorService getExecutorService() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        int nThreads = Runtime.getRuntime().availableProcessors();
        nThreads = 1;
        // 如果只设置一个线程,那么最先连接进来的客户端可以发送消息
        // 因为程序阻塞读取第一个socket连接的数据流,没有其他线程资源去读后面建立的socket了
        executorService = Executors.newFixedThreadPool(nThreads);
        return executorService;
    }

    private static void cacheSocket(Socket socket) {
        SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
        String[] split = remoteSocketAddress.toString().split(":");
        connnectedSockets.put(split[1], socket);
    }

    public static String loopReadRequestAndRedirect(Socket socket) throws IOException {
        InputStreamReader reader = new InputStreamReader(socket.getInputStream());
        StringBuilder sb = new StringBuilder();
        char[] cbuf = new char[256];
        while (true) {
            SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
            System.out.println(Thread.currentThread() + "执行 " + remoteSocketAddress + "发送的消息");
            // 读出内容写入 char 数组,read 方法会一直阻塞
            // 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket)
            // 正常读取时方法会返回读取的字符数,当输入流结束(对方关闭了socket)时返回 -1
            int readed = reader.read(cbuf);

            if (readed == -1) {
                System.out.println(remoteSocketAddress + " 断开了连接!");
                reader.close();
                socket.close();
                break;
            }

            String readedStr = new String(cbuf, 0, readed);
            sb.append(readedStr);

            //ready()用来判断流是否可被读取,如果reader缓冲区不是空则返回true,否则返回false
            boolean oneReqeustStreamReaded = !reader.ready();
            if (oneReqeustStreamReaded) {
                String requestContent = sb.toString().trim();
                String prifix = requestContent.substring(0, 2);
                // 单聊
                if ("-c".equals(prifix)) {
                    requestContent = requestContent.substring(3);
                    String port = requestContent.substring(0, requestContent.indexOf(" "));
                    requestContent = requestContent.replaceFirst(port, "");
                    sendToOneSocket(connnectedSockets.get(port), requestContent);
                    // 群聊
                } else {
                    // 向客户端发送响应
                    socket.getOutputStream().write(("您发送的消息-'" + sb + "' 已收到").getBytes());
                    sendToAllSocket(sb.toString(), socket);
                }
                sb.setLength(0);
            }
        }
        return sb.toString();
    }

    /**
     * 发送消息给某个socket
     *
     * @param socket
     * @param msg
     */
    private static void sendToOneSocket(Socket socket, String msg) {
        // 对于同一个socket,同一时刻只有一个线程使用它发送消息
        synchronized (socket) {
            try {
                socket.getOutputStream().write(msg.getBytes("UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 发送消息给所有的socket
     *
     * @param msg
     */
    private static void sendToAllSocket(String msg, Socket selfSocket) {
        for (String key : connnectedSockets.keySet()) {
            Socket socket = connnectedSockets.get(key);
            if (socket.equals(selfSocket)) {
                continue;
            }
            sendToOneSocket(socket, msg);
        }
    }
}


public class ChatClient {
    public static void main(String[] args) throws IOException {
        new ChatClient().start();
    }

    public void start() throws IOException {
        Socket socket = new Socket("localhost", 9000);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        
        Runnable readTask = new Runnable() {
            public void run() {
                try {
                    loopReadRequest(socket);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        executorService.submit(readTask);

        Runnable sendMsgTask = new Runnable() {
            public void run() {
                try {
                    Scanner scanner = new Scanner(System.in);
                    while (true) {
                        System.out.print(">");
                        String line = scanner.nextLine();
                        if ("".equals(line)) {
                            continue;
                        }
                        if ("quit".equals(line)) {
                            scanner.close();
                            socket.close();
                            break;
                        }
                        socket.getOutputStream().write(line.getBytes());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        executorService.submit(sendMsgTask);

    }

    public void loopReadRequest(Socket socket) throws IOException {
        InputStreamReader reader = new InputStreamReader(socket.getInputStream());
        StringBuilder sb = new StringBuilder();
        char[] cbuf = new char[256];
        while (true) {
            int readed = reader.read(cbuf);
            // 读出内容写入 char 数组,read 方法会一直阻塞
            // 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket)
            // 正常读取,方法会返回读取的字符数,而当输入流结束(对方关闭了socket)则返回 -1
            if (readed == -1) {
                System.out.println(socket.getRemoteSocketAddress() + " 断开了连接!");
                reader.close();
                socket.close();
                break;
            }

            String readedStr = new String(cbuf, 0, readed);
            sb.append(readedStr);
            if (!reader.ready()) {
                System.out.println(sb);
                sb.setLength(0);
            }
        }
    }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文