java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java 线程池+分布式

Java 线程池+分布式实现代码

作者:啊退

在 Java 开发中,"池" 通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,这篇文章主要介绍了Java 线程池+分布式实现代码,需要的朋友可以参考下

1. 线程池

在 Java 开发中,"池" 通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率。常见的池包括:

核心思想:用一个集合来维护固定大小或可配置的资源,当需要使用资源时从池中获取,使用完毕后归还给池,而不是直接销毁。

1.1 自定义线程池实现

1.1.1 线程池核心

PoolInit:线程池初始化和任务管理

ThreadPool:线程池核心类,管理工作线程

WorkThread:工作线程类,执行具体任务

1.1.2 代码示例

1.1.2.1 PoolInit
package com.lj.demo5;
import java.util.LinkedList;
import java.util.Scanner;
/**
 * 多线程与集合综合应用示例
 * 演示线程池的初始化和任务提交过程
 * 
 * 应用场景:
 * 如QQ聊天服务器,面对大量用户连接时,
 * 使用线程池管理线程资源比为每个用户创建新线程更高效
 */
public class PoolInit {
	// 存储待执行任务的链表,作为任务队列
	public static LinkedList<String> linkTaskName = new LinkedList<String>();
	// 线程池实例
	public static ThreadPool t;
	public PoolInit() {
		// 初始化线程池,指定核心线程数为3
		t = new ThreadPool(3);
		// 从控制台接收任务并提交给线程池
		Scanner scanner = new Scanner(System.in);
		String taskName = "";
		while (true) {
			System.out.println("请输入线程需要处理的任务名称:");
			taskName = scanner.next();
			// 将任务添加到任务队列
			linkTaskName.add(taskName);
			// 提交第一个任务给线程池执行
			t.executeTask(linkTaskName.get(0));
		}
	}
	public static void main(String[] args) {
		// 启动线程池
		new PoolInit();
	}
}

LinkTaskName:使用 LinkedList 作为任务队列,存储待执行的任务,构造方法中初始化线程池,并通过控制台接收用户输入的任务,每当有新任务输入,先添加到任务队列,再提交给线程池执行。

1.1.2.2 ThreadPool
package com.lj.demo5;
import java.util.Vector;
/**
 * 线程池核心类
 * 管理工作线程,负责任务的分配与调度
 */
public class ThreadPool {
	// 线程池大小
	private int num;
	// 存储工作线程的集合,使用Vector保证线程安全
	Vector<Runnable> vectors ;
	/**
	 * 构造方法,初始化线程池
	 * @param num 线程池中的线程数量
	 */
	public ThreadPool(int num) {
		this.num = num;
		// 初始化线程集合,容量为指定的线程数量
		this.vectors = new Vector<Runnable>(this.num);
		System.out.println("线程集合当前大小:" + this.vectors.size());
		System.out.println("线程集合容量:" + this.vectors.capacity());
		// 创建指定数量的工作线程并启动
		for(int i=0;i<this.vectors.capacity();i++) {
			WorkThread w = new WorkThread();
			this.vectors.add(w);
			w.start(); // 启动工作线程
		}
	}
	/**
	 * 执行任务方法
	 * 查找空闲线程并分配任务
	 * @param taskName 任务名称
	 */
	public void executeTask(String taskName) {
		// 遍历线程池中的线程,寻找空闲线程
		for(int i=0;i<this.vectors.capacity();i++) {
			WorkThread w = (WorkThread) this.vectors.get(i);
			System.out.println("线程池中的线程" + w.getName() + "的状态为:" + w.isFlag());
			// 如果线程处于空闲状态(flag为false)
			if(!w.isFlag()) {
				System.out.println(w.getName() + ",现在是空闲状态,分配新任务");
				// 设置线程为忙碌状态
				w.setFlag(true);
				// 分配任务
				w.setTaskName(taskName);
				// 从任务队列移除已分配的任务
				PoolInit.linkTaskName.remove(0);
				break; // 找到空闲线程后退出循环
			}
		}
	}
}

使用 Vector 存储工作线程,因为 Vector 是线程安全的集合,构造方法中创建并启动指定数量的工作线程

executeTask()方法将任务分配给空闲线程:

遍历所有工作线程,检查线程状态

找到空闲线程后,设置线程状态为忙碌,分配任务

从任务队列中移除已分配的任务

1.1.2.3 WorkThread
package com.lj.demo5;
/**
 * 工作线程类
 * 实际执行任务的线程
 */
public class WorkThread extends Thread {
	// 线程状态标志:false表示空闲,true表示忙碌
	private boolean flag;
	// 当前执行的任务名称
	private String taskName;
	/**
	 * 线程运行方法
	 * 循环处理任务:有任务则执行,无任务则等待
	 */
	@Override
	public synchronized void run() {
		while (true) {
			if (this.isFlag()) { // 有任务要执行
				System.out.println(Thread.currentThread().getName() + "正在执行任务:" + this.taskName);
				try {
					// 模拟任务执行时间(30秒)
					Thread.sleep(30 * 1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName() + "已完成任务:" + this.taskName + ",状态变为空闲");
				// 任务完成,设置为空闲状态
				this.setFlag(false);
			} else { // 无任务
				System.out.println("任务队列中等待的任务数量:" + PoolInit.linkTaskName.size());
				// 检查是否有未处理的任务
				if (PoolInit.linkTaskName.size() > 0) {
					String taskName = PoolInit.linkTaskName.get(0);
					PoolInit.t.executeTask(taskName);
					System.out.println(Thread.currentThread().getName() + 
							"发现未处理任务,尝试处理:" + taskName);
				} else {
					System.out.println(Thread.currentThread().getName() + ":当前没有任务,进入等待状态...");
					try {
						// 进入等待状态,释放锁
						this.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}
	// getter和setter方法
	public boolean isFlag() {
		return flag;
	}
	/**
	 * 设置线程状态
	 * 当设置为忙碌状态时,唤醒等待的线程
	 */
	public synchronized void setFlag(boolean flag) {
		this.flag = flag;
		if (this.flag) {
			// 有新任务,唤醒线程
			this.notify();
		}
	}
	public String getTaskName() {
		return taskName;
	}
	public void setTaskName(String taskName) {
		this.taskName = taskName;
	}
}

flag变量用于标识线程状态:false(空闲),true(忙碌)

run()方法是线程的核心,通过循环处理任务:

        任务完成后,将线程状态设为空闲

        如果有等待任务,通知线程池分配任务;如果没有,则进入等待状态

1.2 总结流程

  1. 初始化线程池,创建指定数量的工作线程并启动
  2. 工作线程启动后进入等待状态,等待接收任务
  3. 用户输入任务,添加到任务队列
  4. 线程池查找空闲线程,分配任务并唤醒线程
  5. 线程执行任务,完成后回到空闲状态
  6. 重复步骤 3-5,直到程序结束

2. Java分布式处理

分布式系统是由多个独立计算机组成的系统,这些计算机通过网络协同工作,对外表现为一个整体。

下面我通过 WebService 实现一个简单的分布式服务。

2.1 WebService 服务接口

package com.lj.data;
import javax.jws.WebMethod;
import javax.jws.WebService;
/**
 * WebService服务接口
 * 定义远程可调用的方法
 * 
 * RPC(远程过程调用)机制:
 * - RMI:客户端和服务器端都必须是Java
 * - WebService:支持跨语言、跨平台通信
 * - 其他:hessian, thrift, grpc, dubbo等
 */
@WebService(targetNamespace = "http://lj.com/wsdl")
public interface IAdminService {
	/**
	 * 定义WebService方法
	 * @return 字符串结果
	 */
	@WebMethod
	public String queryStr();
}

2.2 WebService 服务实现

package com.lj.data.impl;
import javax.jws.WebService;
import com.lj.data.IAdminService;
/**
 * WebService服务实现类
 * 提供接口中定义的方法的具体实现
 */
@WebService(
    portName = "admin",//端口名称
    serviceName = "AdminServiceImpl",//服务名称
    targetNamespace = "http://lj.com/wsdl",//必须与接口的命名空间一致
    endpointInterface = "com.lj.data.IAdminService"//指定实现的接口
)
public class AdminServiceImpl implements IAdminService {
	/**
	 * 实现查询方法
	 * @return 返回示例字符串
	 */
	@Override
	public String queryStr() {
		return "你好,世界";
	}
}

2.3 发布 WebService 服务

package com.lj.webserviceclient;
import javax.xml.ws.Endpoint;
import com.lj.data.IAdminService;
import com.lj.data.impl.AdminServiceImpl;
/**
 * WebService服务发布类
 * 将服务发布到指定地址,供客户端调用
 */
public class App {
    public static void main(String[] args) {
    	System.out.println("开始发布WebService远程服务...");
    	// 创建服务实现类实例
    	IAdminService adminService = new AdminServiceImpl();
    	// 发布服务,指定服务地址
    	Endpoint.publish("http://127.0.0.1:8225/AdminServiceImpl/admin", adminService);
    	System.out.println("WebService服务发布成功!");
    }
}

3.4 WebService 客户端调用

package com.lj.webserviceclient;
import java.net.URL;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;
import com.lj.data.IAdminService;
/**
 * WebService客户端
 * 调用远程发布的WebService服务
 */
public class App {
    public static void main(String[] args) {
        try {
        	// 服务的URL地址
            URL url = new URL("http://127.0.0.1:8225/AdminServiceImpl/admin");
            // 创建QName对象,指定命名空间和服务名称
            QName qname = new QName("http://lj.com/wsdl", "AdminServiceImpl");
            // 创建Service对象
            Service service = Service.create(url, qname);
            // 获取服务接口实例
            IAdminService adminService = service.getPort(IAdminService.class);
            // 调用远程方法
            String message = adminService.queryStr();
            System.out.println("调用远程WebService服务的结果为: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

到此这篇关于Java 线程池+分布式实现代码的文章就介绍到这了,更多相关Java 线程池+分布式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文