javascript技巧

关注公众号 jb51net

关闭
首页 > 网络编程 > JavaScript > javascript技巧 > javaScript worker线程池

javaScript中worker实现线程池的示例代码

作者:不穿铠甲的穿山甲

本文主要介绍了javaScript中worker实现线程池,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

目录结构:

在这里插入图片描述

Lock

class Lock {
	static locks = {};
	static getInstance(key = ""){
		let instance = this.locks[key];
		if(null == instance){
			instance = new Lock();
			this.locks[key] = instance;
		}
		
		return instance;
	}
	
	constructor(){
		const sab = new SharedArrayBuffer(4);
		this._arr = new Int32Array(sab);
		Atomics.store(this._arr, 0, 0);
	}
	
	acquireLock() {
		let acquired = false;
		while (!acquired) {
			const expected = 0;
			const actual = Atomics.compareExchange(this._arr, 0, expected, 1);
			acquired = (actual === expected);
			if(acquired){
			//	console.log('Lock acquired in worker thread');
				return;
			}
			//console.log('Lock acquired in worker thread, wait');
			Atomics.wait(this._arr, 0);
			//console.log('Lock acquired in worker thread',"被唤醒");
		}
	}
	
	releaseLock() {
	//	console.log('Lock released in worker thread');
		Atomics.store(this._arr, 0, 0);
		Atomics.notify(this._arr, 0);
	}
	
}


module.exports = Lock;

BlockingQueue

let Lock = require('./Lock.js');

class BlockingQueue{
	constructor(key = ""){
		this._data = [];
		this._lock = Lock.getInstance(key);
	}
	
	getSize(){
		try{
			this._lock.acquireLock();
			return this._data.length;
		}catch(e){
			console.error(e);
		}finally{
			this._lock.releaseLock();
		}
	}
	
	offer(message){
		try{
			this._lock.acquireLock();
			this._data.push(message);
		}catch(e){
			console.error(e);
		}finally{
			this._lock.releaseLock();
		}
	}
	
	pop(){
		try{
			this._lock.acquireLock();
			return this._data.pop();
		}catch(e){
			console.error(e);
		}finally{
			this._lock.releaseLock();
		}
	}
	
	popList(count = 1){
		try{
			this._lock.acquireLock();
			let arr = [];
			count = count > this._data.length ? this._data.length : count;
			for(let index = 0; index < count; index ++){
				arr.push(this._data.pop());
			}
			return arr;
		}catch(e){
			console.error(e);
		}finally{
			this._lock.releaseLock();
		}
	}
	
	remove(arr){
		try{
			this._lock.acquireLock();
			this._data = this._data.filter(item => !arr.includes(item));
		}catch(e){
			console.error(e);
		}finally{
			this._lock.releaseLock();
		}
	}
	
	remove(item){
		try{
			this._lock.acquireLock();
			this._data = this._data.filter(em => !em == item);
		}catch(e){
			console.error(e);
		}finally{
			this._lock.releaseLock();
		}
	}
	
	getArr(){
		try{
			this._lock.acquireLock();
			return this._data;
		}catch(e){
			console.error(e);
		}finally{
			this._lock.releaseLock();
		}
	}
	
}

module.exports = BlockingQueue;

Message

class Message{
	constructor(data){
		this._data = data;
	}
	
	getData(){
		return this._data;
	}

}

module.exports = Message;

线程实现

worker.js

const {
	parentPort,
	threadId
} = require('worker_threads');

parentPort.on('message', (msg) => {
	let data = msg.data;
	let fun = eval(msg.fun);
	fun(data);
	console.log("threadId=",threadId, " data=", msg)
	//parentPort.postMessage(msg);
});

Thread

const {
	Worker
} = require('worker_threads');
var path = require('path')
let workerJsPath = path.resolve(__dirname +'/worker.js')

class Thread {
	
	constructor(runnable, name = "") {
		this._name = name;
		this._target = runnable;
		this._interrupted = false;
	}

	run() {
		this._target.run()
	}

	start() {
		this._initForkThread();
	}
	
	stop(){
		this._worker.terminate();
	}
	
	_initForkThread(){
		this._worker = new Worker(workerJsPath);
		// this._worker.on('message', (msg) => {
		// 	this._target.data = msg;
		// 	this._target.status = "done";
		// 	console.log("收到回复", msg)
		// });
		
		this._worker.on('exit', (msg) => {
			this._interrupted = true;
			console.log("线程中断")
		});
		
	}
	
	isInterrupted(){
		return this._interrupted;
	}
	
	getName(){
		return this._name;
	}
	
	postMessage(message){
		this._worker.postMessage({
			fun: `${this._target}`,
			data: message.data
		});
	}

}

module.exports = Thread;

TaskThread

class TaskThread{
	
	constructor(thread, queue){
		this._thread = thread;
		this._queue = queue;
	}
	
	start(){
		this._thread.start();
		try{
			let start = performance.now();
			let interval = setInterval(() => {
				if(!this.isInterrupted()){
					let message = this._queue.pop();
					if(null == message){
						return;
					}
					this._thread.postMessage(message);
				}else{
					clearInterval(interval);
				}
			}, 500);

		}catch(e){
			console.error("错误信息",e);
		}
	}
	
	isInterrupted(){
		return this._thread.isInterrupted();
	}
}

module.exports = TaskThread;

ThreadPool

let Thread = require("./Thread.js")
let BlockingQueue = require("./BlockingQueue.js")
let TaskThread = require("./TaskThread.js")

class ThreadPool{
	constructor(threadCount = 4){
		this._count = threadCount;
		this._queue = new BlockingQueue("queue"); 
		this._threads = new BlockingQueue("thread")
		setInterval(() => {
			if(0 == this._queue.getSize()){
				return;
			}
			//移除无效线程
			let removeThreads = this._threads.getArr().map(thread => thread.isInterrupted());
			this._threads.remove(removeThreads);
			
			// //检验线程数
			let activeThreadCount = this._threads.getSize();
			if(activeThreadCount < this._count && this._queue.getSize() > 0){
				this.createThread(this._count - activeThreadCount);
			}
			
		}, 10);
	}
	
	createThread(count){
		for(let index = 0; index < count; index ++ ){
			let thread = new Thread((m) => {
				console.log(m)
			}, "fork_task_" + index);
			
			let taskThread = new TaskThread(thread, this._queue);
			this._threads.offer(taskThread);
			taskThread.start();
		}
	}
	
	addTask(message){
		this._queue.offer(message);
	}
}

module.exports = ThreadPool;

测试用例:
test.js

let ThreadPool = require("./ThreadPool.js")

let threadPool = new ThreadPool();

threadPool.addTask({
	data: "99999"
})

threadPool.addTask({
	data: "88888"
})

threadPool.addTask({
	data: "77777"
})

运行test.js

node test.js

在这里插入图片描述

到此这篇关于javaScript中worker实现线程池的文章就介绍到这了,更多相关javaScript worker线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文