
Example of connecting Node.js to Kafka 2.0

Author: 他强任他强03

This article covers connecting Node.js to Kafka 2.0. Using complete examples, it walks through the relevant Kafka 2.0 features and the concrete steps for producing and consuming messages from Node.js; readers who need this can use it as a reference.

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput, distributed publish/subscribe messaging system capable of handling all of a website's user action stream data.

To use Kafka from Node.js, install the following npm package: https://www.npmjs.com/package/wisrtoni40-confluent-schema#Quickstart

npm i wisrtoni40-confluent-schema --save
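
The examples below also import kafka-node (for the Kafka client, producer, and consumer group) and uuid (to generate unique client and group IDs), so install those packages as well if they are not already in your project:

npm i kafka-node uuid --save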

producer.ts file:

import { HighLevelProducer, KafkaClient } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentPubResolveStrategy,
} from 'wisrtoni40-confluent-schema';
/**
 * -----------------------------------------------------------------------------
 * Config
 * -----------------------------------------------------------------------------
 */
const kafkaHost = 'your kafka host';
const topic = 'your topic';
const registryHost = 'your schema registry host';
/**
 * -----------------------------------------------------------------------------
 * Kafka Client and Producer
 * -----------------------------------------------------------------------------
 */
const kafkaClient = new KafkaClient({
  kafkaHost,
  clientId: uuidv4(),
  connectTimeout: 60000,
  requestTimeout: 60000,
  connectRetryOptions: {
    retries: 5,
    factor: 0,
    minTimeout: 1000,
    maxTimeout: 1000,
    randomize: false,
  },
  sasl: {
    mechanism: 'plain',
    username: 'your kafka username',
    password: 'your kafka password',
  },
});
const producer = new HighLevelProducer(kafkaClient, {
  requireAcks: 1,
  ackTimeoutMs: 100,
});
/**
 * -----------------------------------------------------------------------------
 * Confluent Resolver
 * -----------------------------------------------------------------------------
 */
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentPubResolveStrategy(schemaRegistry, avro, topic);
/**
 * -----------------------------------------------------------------------------
 * Produce
 * -----------------------------------------------------------------------------
 */
(async () => {
  const data = {
    evt_dt: 1664446229425,
    evt_type: 'tower_unload',
    plant: 'F110',
    machineName: 'TOWER_01',
    errorCode: '',
    description: '',
    result: 'OK',
    evt_ns: 'wmy.dx',
    evt_tp: 'tower.error',
    evt_pid: 'TOWER_01',
    evt_pubBy: 'nifi.11142'
  };
  const processedData = await resolver.resolve(data);
  producer.send([{ topic, messages: processedData }], (error, result) => {
    if (error) {
      console.error(error);
    } else {
      console.log(result);
    }
  });
})();
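
The producer connects to the broker asynchronously, so in practice it helps to know when the client is actually ready and to surface connection-level failures. kafka-node producers emit ready and error events; the following is a minimal sketch that reuses the producer defined above:

producer.on('ready', () => {
  // The client has connected to the broker and the producer can send.
  console.log('Kafka producer is connected and ready');
});
producer.on('error', (err) => {
  // Connection or protocol errors that happen outside of send callbacks.
  console.error('Kafka producer error:', err);
});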

producer.js file (the same code compiled to plain JavaScript):

var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");
var kafkaHost = 'your kafka host';
var topic = 'your topic';
var registryHost = 'your schema registry host';
const kafkaClient = new kafka_node_1.KafkaClient({
  kafkaHost,
  clientId: (0, uuid_1.v4)(),
  connectTimeout: 60000,
  requestTimeout: 60000,
  connectRetryOptions: {
    retries: 5,
    factor: 0,
    minTimeout: 1000,
    maxTimeout: 1000,
    randomize: false,
  },
  sasl: {
    mechanism: 'plain',
    username: 'your kafka username',
    password: 'your kafka password',
  },
});
const producer = new kafka_node_1.HighLevelProducer(kafkaClient, {
  requireAcks: 1,
  ackTimeoutMs: 100,
});
const schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
const avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
const resolver = new wisrtoni40_confluent_schema_1.ConfluentPubResolveStrategy(schemaRegistry, avro, topic);
(async () => {
  const data = {
    evt_dt: 1664446229425,
    evt_type: 'tower_unload',
    plant: 'F110',
    machineName: 'TOWER_01',
    errorCode: '',
    description: '',
    result: 'OK',
    evt_ns: 'wmy.dx',
    evt_tp: 'tower.error',
    evt_pid: 'TOWER_01',
    evt_pubBy: 'nifi.11142'
  };
  const processedData = await resolver.resolve(data);
  producer.send([{ topic, messages: processedData }], (error, result) => {
    if (error) {
      console.error(error);
    } else {
      console.log(result);
    }
  });
})();
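
On a successful send, the callback's result is typically an object keyed by topic and partition, mapping to the offset that was written (for example { my_topic: { '0': 42 } }), which is a quick way to confirm that the message actually reached the broker.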

consumer.ts file:

import { ConsumerGroup } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentSubResolveStrategy,
} from 'wisrtoni40-confluent-schema';
/**
 * -----------------------------------------------------------------------------
 * Config
 * -----------------------------------------------------------------------------
 */
const kafkaHost = 'your kafka host';
const topic = 'your topic';
const registryHost = 'your schema registry host';
/**
 * -----------------------------------------------------------------------------
 * Kafka Consumer
 * -----------------------------------------------------------------------------
 */
const consumer = new ConsumerGroup(
  {
    kafkaHost,
    groupId: uuidv4(),
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    encoding: 'buffer',
    fromOffset: 'latest',
    outOfRangeOffset: 'latest',
    sasl: {
      mechanism: 'plain',
      username: 'your kafka username',
      password: 'your kafka password',
    },
  },
  topic,
);
/**
 * -----------------------------------------------------------------------------
 * Confluent Resolver
 * -----------------------------------------------------------------------------
 */
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentSubResolveStrategy(schemaRegistry, avro);
/**
 * -----------------------------------------------------------------------------
 * Consume
 * -----------------------------------------------------------------------------
 */
consumer.on('message', async msg => {
  const result = await resolver.resolve(msg.value);
  console.log(msg.offset);
  console.log(result);
});
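
Besides message, the consumer group also emits an error event for asynchronous failures (unreachable brokers, rejected SASL credentials, rebalance problems). A small sketch that reuses the consumer defined above keeps those failures from being silent:

consumer.on('error', (err) => {
  // Errors raised outside of individual message handling.
  console.error('Kafka consumer error:', err);
});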

consumer.js file (the same code compiled to plain JavaScript):

var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");
var kafkaHost = 'your kafka host';
var topic = 'your topic';
var registryHost = 'your schema registry host';
var consumer = new kafka_node_1.ConsumerGroup({
  kafkaHost: kafkaHost,
  groupId: (0, uuid_1.v4)(),
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  encoding: 'buffer',
  fromOffset: 'latest',
  outOfRangeOffset: 'latest',
  sasl: {
    mechanism: 'plain',
    username: 'your kafka username',
    password: 'your kafka password'
  }
}, topic);
var schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
var avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
var resolver = new wisrtoni40_confluent_schema_1.ConfluentSubResolveStrategy(schemaRegistry, avro);
consumer.on('message', async function (msg) {
  const result = await resolver.resolve(msg.value);
  console.log(msg.offset);
  console.log(result);
});
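
When shutting the process down, it is worth leaving the consumer group cleanly so offsets are committed and partitions are rebalanced promptly. A minimal sketch, assuming kafka-node's close(force, callback) signature, where passing true commits the current offsets before closing:

process.on('SIGINT', function () {
  // Commit current offsets and leave the group before exiting.
  consumer.close(true, function () {
    process.exit();
  });
});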

Appendix: the official Kafka website: https://kafka.apache.org/
