JavaScript中worker实现线程池的示例代码
作者:不穿铠甲的穿山甲
本文主要介绍了如何在JavaScript中使用worker实现线程池,文中通过示例代码介绍得非常详细,对大家的学习或者工作具有一定的参考价值,需要的朋友们可以跟随小编一起学习
目录结构:
Lock
// Values stored in the single Int32 slot that backs each lock.
const UNLOCKED = 0;
const LOCKED = 1;

/**
 * Non-reentrant mutual-exclusion lock built on Atomics over a 4-byte
 * SharedArrayBuffer. Slot 0 holds UNLOCKED (0) or LOCKED (1).
 *
 * NOTE(review): the buffer is allocated in the constructor and never posted
 * to another thread in this file, so the lock only coordinates callers that
 * share this same instance — confirm how it is meant to reach worker threads.
 */
class Lock {
    /**
     * Registry of named locks handed out by getInstance(). It has no
     * prototype, so keys such as "toString" or "constructor" cannot resolve
     * to members inherited from Object.prototype.
     */
    static locks = Object.create(null);

    /**
     * Returns the lock registered under `key`, creating it on first use.
     *
     * @param {string} [key=""] registry name of the lock
     * @returns {Lock} the same instance for every call with the same key
     */
    static getInstance(key = "") {
        let instance = this.locks[key];
        if (instance == null) {
            instance = new Lock();
            this.locks[key] = instance;
        }
        return instance;
    }

    /** Creates an unlocked lock backed by its own SharedArrayBuffer. */
    constructor() {
        const sab = new SharedArrayBuffer(4);
        this._arr = new Int32Array(sab);
        Atomics.store(this._arr, 0, UNLOCKED);
    }

    /**
     * Blocks the calling thread until the lock has been acquired.
     * Calling this again on the same thread without releasing first blocks
     * forever, because no owner is tracked.
     */
    acquireLock() {
        // compareExchange returns the previous value: seeing UNLOCKED means
        // this call performed the UNLOCKED -> LOCKED transition and now owns
        // the lock.
        while (Atomics.compareExchange(this._arr, 0, UNLOCKED, LOCKED) !== UNLOCKED) {
            // Park only while the slot still reads LOCKED. Passing the
            // expected value matters: without it the comparison is against 0,
            // which spins while the lock is held and can sleep while the lock
            // is actually free.
            Atomics.wait(this._arr, 0, LOCKED);
        }
    }

    /** Marks the lock as free and wakes the threads parked in acquireLock(). */
    releaseLock() {
        Atomics.store(this._arr, 0, UNLOCKED);
        Atomics.notify(this._arr, 0);
    }
}

module.exports = Lock;
BlockingQueue
let Lock = require('./Lock.js'); class BlockingQueue{ constructor(key = ""){ this._data = []; this._lock = Lock.getInstance(key); } getSize(){ try{ this._lock.acquireLock(); return this._data.length; }catch(e){ console.error(e); }finally{ this._lock.releaseLock(); } } offer(message){ try{ this._lock.acquireLock(); this._data.push(message); }catch(e){ console.error(e); }finally{ this._lock.releaseLock(); } } pop(){ try{ this._lock.acquireLock(); return this._data.pop(); }catch(e){ console.error(e); }finally{ this._lock.releaseLock(); } } popList(count = 1){ try{ this._lock.acquireLock(); let arr = []; count = count > this._data.length ? this._data.length : count; for(let index = 0; index < count; index ++){ arr.push(this._data.pop()); } return arr; }catch(e){ console.error(e); }finally{ this._lock.releaseLock(); } } remove(arr){ try{ this._lock.acquireLock(); this._data = this._data.filter(item => !arr.includes(item)); }catch(e){ console.error(e); }finally{ this._lock.releaseLock(); } } remove(item){ try{ this._lock.acquireLock(); this._data = this._data.filter(em => !em == item); }catch(e){ console.error(e); }finally{ this._lock.releaseLock(); } } getArr(){ try{ this._lock.acquireLock(); return this._data; }catch(e){ console.error(e); }finally{ this._lock.releaseLock(); } } } module.exports = BlockingQueue;
Message
/**
 * Thin wrapper that holds one payload value.
 */
class Message {
    /**
     * @param {*} data payload to carry
     */
    constructor(data) {
        this._data = data;
    }

    /**
     * @returns {*} the payload passed to the constructor
     */
    getData() {
        const payload = this._data;
        return payload;
    }
}

module.exports = Message;
线程实现
worker.js
const { parentPort, threadId } = require('worker_threads'); parentPort.on('message', (msg) => { let data = msg.data; let fun = eval(msg.fun); fun(data); console.log("threadId=",threadId, " data=", msg) //parentPort.postMessage(msg); });
Thread
const { Worker } = require('worker_threads');
const path = require('path');

// Script every worker executes; it sits next to this file.
const workerJsPath = path.join(__dirname, 'worker.js');

/**
 * Wrapper around one worker_threads Worker running worker.js.
 * The runnable given to the constructor is not executed here: postMessage()
 * sends its source text to the worker together with the task data.
 */
class Thread {
    /**
     * @param {Function|{run: Function}} runnable task to execute
     * @param {string} [name=""] label returned by getName()
     */
    constructor(runnable, name = "") {
        this._name = name;
        this._target = runnable;
        this._interrupted = false;
        this._worker = null;
    }

    /**
     * Invokes the runnable directly on the calling thread (no worker is
     * involved). Accepts both an object exposing run() and a plain function.
     */
    run() {
        if (this._target != null && typeof this._target.run === 'function') {
            this._target.run();
        } else {
            this._target();
        }
    }

    /**
     * Spawns the worker. Does nothing while a previously started worker is
     * still alive, so repeated calls cannot leak workers.
     */
    start() {
        if (this._worker !== null && !this._interrupted) {
            return;
        }
        this._initForkThread();
    }

    /**
     * Terminates the worker, if one was started.
     * @returns {Promise} settles once termination has finished
     */
    stop() {
        if (this._worker === null) {
            return Promise.resolve();
        }
        return this._worker.terminate();
    }

    /** Creates the Worker and wires up its lifecycle events. */
    _initForkThread() {
        this._interrupted = false;
        this._worker = new Worker(workerJsPath);
        // Without an 'error' listener an uncaught exception inside the worker
        // is re-thrown here as an unhandled 'error' event and takes the whole
        // process down. The 'exit' handler below still runs afterwards.
        this._worker.on('error', (err) => {
            console.error("线程异常", err);
        });
        this._worker.on('exit', () => {
            this._interrupted = true;
            console.log("线程中断")
        });
    }

    /** @returns {boolean} true once the worker has exited */
    isInterrupted() {
        return this._interrupted;
    }

    /** @returns {string} the name given to the constructor */
    getName() {
        return this._name;
    }

    /**
     * Sends one task to the worker: the runnable's source text as `fun` and
     * `message.data` as `data`.
     *
     * @param {{data: *}} message task wrapper whose `data` is forwarded
     * @throws {Error} if start() has not been called yet
     */
    postMessage(message) {
        if (this._worker === null) {
            throw new Error(`Thread "${this._name}" has not been started`);
        }
        this._worker.postMessage({
            // Only the source text crosses the thread boundary, so the
            // runnable cannot rely on variables captured from its scope.
            fun: `${this._target}`,
            data: message.data
        });
    }
}

module.exports = Thread;
TaskThread
/**
 * Couples one thread with a task queue: once started it polls the queue on a
 * timer and forwards each element it takes to the thread.
 */
class TaskThread {
    /**
     * @param {object} thread object providing start(), stop(),
     *   isInterrupted() and postMessage(message)
     * @param {object} queue object providing pop()
     * @param {number} [pollIntervalMs=500] delay between two queue polls
     */
    constructor(thread, queue, pollIntervalMs = 500) {
        this._thread = thread;
        this._queue = queue;
        this._pollIntervalMs = pollIntervalMs;
        this._timer = null;
        this._started = false;
    }

    /**
     * Starts the thread and begins polling the queue.
     * Only the first call has an effect, so a repeated call cannot create a
     * second timer.
     */
    start() {
        if (this._started) {
            return;
        }
        this._started = true;
        this._thread.start();
        this._timer = setInterval(() => this._poll(), this._pollIntervalMs);
    }

    /**
     * Stops polling and stops the underlying thread. Safe to call before
     * start() and more than once.
     */
    stop() {
        this._clearTimer();
        if (this._started) {
            this._thread.stop();
        }
    }

    /** @returns {boolean} whether the underlying thread reports interruption */
    isInterrupted() {
        return this._thread.isInterrupted();
    }

    /** One timer tick: forward at most one queued element to the thread. */
    _poll() {
        if (this.isInterrupted()) {
            // The thread is gone, so there is nothing left to feed.
            this._clearTimer();
            return;
        }
        // The guard lives inside the callback because a try/catch around
        // setInterval() itself never sees errors raised by later ticks.
        try {
            const message = this._queue.pop();
            if (message == null) {
                return;
            }
            this._thread.postMessage(message);
        } catch (e) {
            console.error("错误信息", e);
        }
    }

    /** Cancels the polling timer if it is running. */
    _clearTimer() {
        if (this._timer !== null) {
            clearInterval(this._timer);
            this._timer = null;
        }
    }
}

module.exports = TaskThread;
ThreadPool
let Thread = require("./Thread.js")
let BlockingQueue = require("./BlockingQueue.js")
let TaskThread = require("./TaskThread.js")

/**
 * Pool of worker-backed threads fed from one shared task queue.
 * A supervisor timer tops the pool back up to the configured size whenever
 * tasks are waiting.
 */
class ThreadPool {
    /**
     * @param {number} [threadCount=4] number of threads to keep alive while
     *   tasks are queued
     * @param {Function} [task] function each worker applies to a task's
     *   `data`; defaults to logging it. Thread sends only the function's
     *   source text to the worker, so it must not depend on variables from
     *   its enclosing scope.
     */
    constructor(threadCount = 4, task = (m) => { console.log(m) }) {
        this._count = threadCount;
        this._task = task;
        this._queue = new BlockingQueue("queue");
        // Only this thread touches the registry, so a plain array suffices.
        this._threads = [];
        this._created = 0;
        // The handle is kept so that shutdown() can cancel the timer.
        this._supervisor = setInterval(() => this._supervise(), 10);
    }

    /** One supervisor tick: drop dead threads, then refill the pool. */
    _supervise() {
        if (!(this._queue.getSize() > 0)) {
            return;
        }
        // Keep the entries whose thread is still alive. This used to map the
        // threads to booleans and pass those to remove(), which emptied the
        // registry and spawned a fresh set of workers on every tick.
        this._threads = this._threads.filter(({ runner }) => !runner.isInterrupted());
        const missing = this._count - this._threads.length;
        if (missing > 0) {
            this.createThread(missing);
        }
    }

    /**
     * Creates and starts `count` additional threads.
     * @param {number} count number of threads to add
     */
    createThread(count) {
        for (let index = 0; index < count; index++) {
            // A running counter keeps names unique across calls.
            const worker = new Thread(this._task, "fork_task_" + this._created);
            this._created += 1;
            const runner = new TaskThread(worker, this._queue);
            this._threads.push({ worker, runner });
            runner.start();
        }
    }

    /**
     * Queues one task.
     * @param {{data: *}} message task wrapper; `data` is what the worker's
     *   task function receives
     */
    addTask(message) {
        this._queue.offer(message);
    }

    /**
     * Cancels the supervisor timer and stops every thread the pool created.
     * Until this is called the timer keeps the process alive.
     */
    shutdown() {
        clearInterval(this._supervisor);
        for (const { worker } of this._threads) {
            worker.stop();
        }
        this._threads = [];
    }
}

module.exports = ThreadPool;
测试用例:
test.js
// Smoke test: build a pool with the default size and queue three tasks.
const ThreadPool = require("./ThreadPool.js");

const pool = new ThreadPool();

["99999", "88888", "77777"].forEach((payload) => {
    pool.addTask({ data: payload });
});
运行test.js
node test.js
到此这篇关于JavaScript中worker实现线程池的文章就介绍到这了,更多相关JavaScript worker线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!