多任务的调度情况 node ts

  • Post author:
  • Post category:其他


标题:多任务的调度情况 node ts。

简介:在我们开发中有些关于分布式的任务需要部署执行的时候,那么这里就需要到关于这个逻辑的代码,来帮助我们解决这个繁琐的事情。以下的内容就给出了关于这块的实现,结构,只是需要在具体的逻辑中去添加我们需要实现的逻辑即可。

整体的结构中就好比是有一个包工头,他带领了很多的员工,然后包工头接了很多的工作事情,这些需要一些员工来执行,那么包工头就会将这些工作事情告诉员工们执行,员工们就做具体的事情,等待工作完成后告诉包工头自己任务完成即可,或可增加事情的结果等。

关于上面的介绍后,就需要建立一个包工头与员工的通信情况,这里采用了websocket来做。

有如下消息工具:

MsgUitl.js–来控制消息的解码编码与解析,具体的逻辑可以在后面自定义扩展

var iconv_lite = require("iconv-lite");

/**
 * 编码默认使用utf-8
 */
var coding = "utf-8";



/**
 * 解码消息 将消息的格式解析
 * 
 * @param {ArrayBuffer} data 
 * @returns {string}
 */
var decode = function (data) {
    //
    let str = iconv_lite.decode(data, "utf-8");
    //  console.log("decodeMsg : ", str);
    return str;
}

/**
 * 编码消息 将消息的格式编码
 * 
 * @param {string} msg 
 * 
 * @returns {ArrayBuffer}
 */
var encode = function (msg) {
    let data = iconv_lite.encode(msg, "utf-8");
    // console.log("encodeMsg : ", msg);
    return data;
}


/**
 * 解码消息 将消息内容解析
 * 
 * @param {ArrayBuffer} data 
 * @returns {msg:data}
 */
var decodeMsg = function (data) {
    //
    let str = decode(data);
    //  console.log("decodeMsg : ", str);
    return JSON.parse(str);
}

/**
 * 编码消息 将消息内容编码
 * 
 * @param {Object} data 
 * 
 * @returns {ArrayBuffer}
 */
var encodeMsg = function (data) {
    let jsonStr = JSON.stringify({ msg: data });
    let msgData = encode(jsonStr);
    // console.log("encodeMsg : ", msg);
    return msgData;
}

//导出
exports.encodeMsg = encodeMsg;
exports.decodeMsg = decodeMsg;

然后咱们创建任务具体的相关工具类来处理任务相关的事情–JobUilt.js

/**
 * 注册angent的数据
 * 
 * @param {string} angentName 
 * @returns 
 */
var registerAngentData = function (angentName) {
    return { register: angentName };
}

/**
 * 获取任务数据
 * 
 * @param {Object} data //具体任务数据
 * @param {string} angentName 
 * @returns { angentInfo: getAngentInfo(angentName), data: data }
 */
var getJobData = function (data, angentName) {
    let jobData = { angentInfo: getAngentInfo(angentName), data: data };
    return jobData;
}

/**
 * 获取一个任务响应数据
 * 
 * @param {Object} data //具体任务数据
 * @param {string} angentName 
 * @param {status} status 状态 0 成功 1 失败 2 异常 
 * 
 * @returns { angentInfoRev: getAngentInfo(angentName), data: data }
 */
var getJobRevData = function (data, angentName, status) {
    let jobData = { angentInfoRev: getAngentInfo(angentName), data: data, status: status };
    return jobData;
}

/**
 * 获取angentInfo信息
 * 
 * @param {object} angentName 
 * @returns {angentName:string}
 */
var getAngentInfo = function (angentName) {
    return { angentName: angentName };
}

//导出
exports.getJobData = getJobData;
exports.getAngentInfo = getAngentInfo;
exports.getJobRevData = getJobRevData;
exports.registerAngentData = registerAngentData;

有了上述的工具情况,咱们就可以创建 包工头了 Master.js

// 加载node上websocket模块 ws;

var configModel = require("../Config");
var MsgUtil = require("../MsgUtil");
var JobUtil = require("../JobUtil");


/**
 * 所有的可以用的angent
 */
var allAngents = new Set();

/**
 * 根据名来指定的angent客户
 */
var allMapAngents = {};

/**
 * 记录在执行任务的angent
 */
var doJobAngents = new Set();


/**
 * 根据客户来找AngentName名字
 * 
 * @param {ws} client_sock 
 * @returns 
 */
var getAngentNameByClientSock = function (client_sock) {
    for (let key in allMapAngents) {
        if (allMapAngents[key] == client_sock) {
            return key;
        }
    }
    return null;
}

/**
 * 获取一个空闲的angent
 * 
 * @returns 
 */
var getOneFreeAngent = function () {
    for (let key in allMapAngents) {
        let client_sock = allMapAngents[key];
        if (doJobAngents.has(client_sock)) {
            continue
        }
        return client_sock;
    }
    return null;
}

/**
 * 增加一个可以用的angent
 * 
 * @param {string} angentName 
 * @param {Object ws} client_sock 
 */
var addAngent = function (angentName, client_sock) {
    //
    let isOk = true;//是否注册成功
    //
    allAngents.add(client_sock);
    allMapAngents[angentName] = client_sock;

    //发送注册结果
    let msg = JobUtil.registerAngentData(isOk);
    let dataMsg = MsgUtil.encodeMsg(msg);
    client_sock.send(dataMsg);

    //打印可以的angent
    console.log(allAngents.size, " 个angent 可用");
}

/**
 * 增加一个可以用的angent
 * 
 * @param {string} angentName 
 * @param {Object ws} client_sock 
 */
var removeAngent = function (angentName, client_sock) {
    allMapAngents[angentName] = null;
    allAngents.delete(client_sock);
    //
    doJobAngents.delete(client_sock);//将做任务的他删除
    //打印可以的angent
    console.log(allAngents.size, " 个angent 可用");
}


/**
 * 模拟派发任务
 */
var distributeJob = function (jobInfo) {
    if (allAngents.size > 0) {
        let client_sock = getOneFreeAngent();
        if (client_sock) {
            //
            doJobAngents.add(client_sock);//做任务记录
            //
            let angentName = getAngentNameByClientSock(client_sock);
            //发送任务
            let msg = JobUtil.getJobData(jobInfo, angentName);
            let dataMsg = MsgUtil.encodeMsg(msg);
            client_sock.send(dataMsg);
            //
            console.log("可以派发任务 给 ", angentName);
            return true;
        }
    }
    console.warn("没有可用的angent---");
    return false;
}

/**
 * 开启master
 */
var start = function () {
    // 加载node上websocket模块 ws;
    var ws = require("ws");

    // 启动基于websocket的服务器,监听我们的客户端接入进来。
    var server = new ws.Server({
        host: configModel.config.ip,
        port: configModel.config.port,
    });

    // 监听接入进来的客户端事件
    function websocket_add_listener(client_sock) {
        // close事件
        client_sock.on("close", function () {
            let angentName = getAngentNameByClientSock(client_sock);
            removeAngent(angentName, client_sock);
            console.log("client close angentName:", angentName);
        });

        // error事件
        client_sock.on("error", function (err) {
            let angentName = getAngentNameByClientSock(client_sock);
            removeAngent(angentName, client_sock);
            console.log("client error angentName:", angentName, " err", err);
        });
        // end 

        // message 事件, data已经是根据websocket协议解码开来的原始数据;
        // websocket底层有数据包的封包协议,所以,绝对不会出现粘包的情况。
        // 每解一个数据包,就会触发一个message事件;
        // 不会出现粘包的情况,send一次,就会把send的数据独立封包。
        // 如果我们是直接基于TCP,我们要自己实现类似于websocket封包协议就可以完全达到一样的效果;
        client_sock.on("message", function (data) {
            // console.log(data);
            //收到
            let msgData = MsgUtil.decodeMsg(data);
            console.log("msg rev : ", msgData);
            //
            if (msgData.msg.register) {
                //存在注册
                console.log("注册 angent Name:", msgData.msg.register);
                addAngent(msgData.msg.register, client_sock);
                //
            } else if (msgData.msg.angentInfoRev) {
                //任务响应
                switch (msgData.msg.status) {
                    case 0://成功
                        console.log("任务 ", JSON.stringify(msgData.msg.data), " 成功");
                        break
                    case 1://失败
                        console.log("任务 ", JSON.stringify(msgData.msg.data), " 失败");
                        break
                    case 2://异常
                        console.log("任务 ", JSON.stringify(msgData.msg.data), " 异常");
                        break
                }
                //将任务执行的angent移除
                doJobAngents.delete(client_sock);
            }
        });
        // end 
    }

    // connection 事件, 有客户端接入进来;
    function on_server_client_comming(client_sock) {
        console.log("client comming");
        websocket_add_listener(client_sock);
    }

    server.on("connection", on_server_client_comming);

    // error事件,表示的我们监听错误;
    function on_server_listen_error(err) {
        console.error(err);
    }

    server.on("error", on_server_listen_error);

    // headers事件, 回给客户端的字符。
    function on_server_headers(data) {
        // console.log(data);
    }

    server.on("headers", on_server_headers);
}

start();

//模拟定时去派发任务
let jobId = 1;
let testDoJob = function () {
    setInterval(() => {
        if (distributeJob({ jobId: jobId })) {
            jobId++;
        }
    }, 3000);
}

testDoJob();

console.log("========启动Master========");

有了包工头也需要有员工 – 咱们创建员工 Angent.js

var minimist = require('minimist');


var configModel = require("../Config");
var MsgUtil = require("../MsgUtil");
var JObUtil = require("../JobUtil");



var argv = minimist(process.argv.slice(2));
console.log("命令行的参数情况:", argv);

//自己名
let usAngentName = argv.n;

var ws = require("ws");


/**
 * 做任务
 * 
 * @param {Object} jobData 
 * @param {ws} sock 
 */
var doJob = function (jobData, sock) {
    console.log("做任务:", JSON.stringify(jobData));
    //模拟做任务成功
    setTimeout(() => {
        //告诉做任务成功了
        let msg = JObUtil.getJobRevData(jobData, usAngentName, 0);
        let data = MsgUtil.encodeMsg(msg);
        console.log("===>告诉做任务成功了***");
        console.log("send msg : ", msg);
        //console.log("send data : ", data);
        sock.send(data);
    }, 5000);
}

/**
 * 开启
 */
var start = function () {
    // 创建了一个客户端的socket,然后让这个客户端去连接服务器的socket
    var sock = new ws("ws://" + configModel.config.ip + ":" + configModel.config.port);

    sock.on("open", function () {
        console.log("connect success !!!!");
        //发送注册消息
        let data = JObUtil.registerAngentData(usAngentName);
        let msgData = MsgUtil.encodeMsg(data);
        sock.send(msgData);
    });

    sock.on("error", function (err) {
        console.error("error: ", err);
    });

    sock.on("close", function () {
        console.error("master close");
    });

    sock.on("message", function (data) {
        // console.log(data);
        let msgData = MsgUtil.decodeMsg(data);
        console.log("client ", usAngentName, " rev msg:", msgData);
        if (msgData.msg.register) {
            //注册消息
            console.log("注册 ", msgData.msg.register == true ? "成功" : "失败!");
        } else if (msgData.msg.angentInfo) {
            //任务消息
            doJob(msgData.msg.data, sock);
        }
    });
}

//开启
start();

然后咱们就再给出个配置文件 Config.js

/**
 * 配置master的情况
 */
class Config {
    ip = "127.0.0.1";
    port = 6080;
}
exports.config = new Config();

然后咱们给出简单的开启的批处理脚本

开启Angent-001.bat

这里如果要多开几个就可以直接修改后面的-n参数即可

CHCP 65001
echo off
call node src/angents/Angent.js -n angent-001
pause

开启Master.bat

CHCP 65001
echo off
call node src/master/Master.js
pause

下面给出整体的目录结构方便参考

在这里插入图片描述

备注:上诉内容是自己整理起来的处理方式,如果有借鉴之处请谅解,如有不当之处请见谅。



版权声明:本文为qq_25095105原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。