node连接kafka2.0实现方法示例
作者:他强任他强03
这篇文章主要介绍了node连接kafka2.0,nodejs连接kafka2.0的实现方法,结合实例形式分析了kafka2.0的功能、原理、以及node.js连接kafka2.0的具体实现技巧,需要的朋友可以参考下
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
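node.js使用Kafka需要安装的npm包:kafka-node、uuid 以及 wisrtoni40-confluent-schema(下面的示例代码直接引用了这三个包)。wisrtoni40-confluent-schema 的文档见:https://www.npmjs.com/package/wisrtoni40-confluent-schema#Quickstart
npm i kafka-node uuid wisrtoni40-confluent-schema --save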
producer.ts文件
import { HighLevelProducer, KafkaClient } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentPubResolveStrategy,
} from 'wisrtoni40-confluent-schema';

/**
 * -----------------------------------------------------------------------------
 * Config
 *
 * Placeholders: replace with the broker address, topic name and schema
 * registry address of your own environment.
 * -----------------------------------------------------------------------------
 */
const kafkaHost = '你的kafka host';
const topic = '你的topic';
const registryHost = '你的kafka注册host';

/**
 * -----------------------------------------------------------------------------
 * Kafka Client and Producer
 * -----------------------------------------------------------------------------
 */
const kafkaClient = new KafkaClient({
  kafkaHost,
  // A new random client id is generated on every start of the script.
  clientId: uuidv4(),
  connectTimeout: 60000,
  requestTimeout: 60000,
  // NOTE(review): factor 0 with minTimeout === maxTimeout looks intended to
  // give a fixed retry delay — confirm against the backoff formula used by
  // kafka-node's retry implementation.
  connectRetryOptions: {
    retries: 5,
    factor: 0,
    minTimeout: 1000,
    maxTimeout: 1000,
    randomize: false,
  },
  // SASL/PLAIN credentials (placeholders).
  sasl: {
    mechanism: 'plain',
    username: '你的kafka用户名',
    password: '你的kafka密码',
  },
});

const producer = new HighLevelProducer(kafkaClient, {
  requireAcks: 1,
  ackTimeoutMs: 100,
});

// An 'error' event emitted with no listener attached is thrown by Node.js and
// would crash the process, so connection/broker errors are logged here.
producer.on('error', error => {
  console.error(error);
});

/**
 * -----------------------------------------------------------------------------
 * Confluent Resolver
 *
 * Encodes a plain object into the payload published to `topic`, using the
 * schema registry at `registryHost`.
 * -----------------------------------------------------------------------------
 */
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentPubResolveStrategy(schemaRegistry, avro, topic);

/**
 * -----------------------------------------------------------------------------
 * Produce
 *
 * Sends one sample event once the producer reports that it is ready.
 * -----------------------------------------------------------------------------
 */
producer.on('ready', async () => {
  const data = {
    evt_dt: 1664446229425,
    evt_type: 'tower_unload',
    plant: 'F110',
    machineName: 'TOWER_01',
    errorCode: '',
    description: '',
    result: 'OK',
    evt_ns: 'wmy.dx',
    evt_tp: 'tower.error',
    evt_pid: 'TOWER_01',
    evt_pubBy: 'nifi.11142',
  };

  try {
    const processedData = await resolver.resolve(data);
    producer.send([{ topic, messages: processedData }], (error, result) => {
      if (error) {
        console.error(error);
      } else {
        console.log(result);
      }
    });
  } catch (error) {
    // A failed schema lookup/encode must not become an unhandled rejection.
    console.error(error);
  }
});
producer.js文件
const { HighLevelProducer, KafkaClient } = require('kafka-node');
const { v4: uuidv4 } = require('uuid');
const {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentPubResolveStrategy,
} = require('wisrtoni40-confluent-schema');

// Config placeholders: replace with the broker address, topic name and schema
// registry address of your own environment.
const kafkaHost = '你的kafka host';
const topic = '你的topic';
const registryHost = '你的kafka注册host';

// Kafka client and producer.
const kafkaClient = new KafkaClient({
  kafkaHost,
  // A new random client id is generated on every start of the script.
  clientId: uuidv4(),
  connectTimeout: 60000,
  requestTimeout: 60000,
  // NOTE(review): factor 0 with minTimeout === maxTimeout looks intended to
  // give a fixed retry delay — confirm against the backoff formula used by
  // kafka-node's retry implementation.
  connectRetryOptions: {
    retries: 5,
    factor: 0,
    minTimeout: 1000,
    maxTimeout: 1000,
    randomize: false,
  },
  // SASL/PLAIN credentials (placeholders).
  sasl: {
    mechanism: 'plain',
    username: '你的kafka用户名',
    password: '你的kafka密码',
  },
});

const producer = new HighLevelProducer(kafkaClient, {
  requireAcks: 1,
  ackTimeoutMs: 100,
});

// An 'error' event emitted with no listener attached is thrown by Node.js and
// would crash the process, so connection/broker errors are logged here.
producer.on('error', (error) => {
  console.error(error);
});

// Confluent resolver: encodes a plain object into the payload published to
// `topic`, using the schema registry at `registryHost`.
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentPubResolveStrategy(schemaRegistry, avro, topic);

// Produce: send one sample event once the producer reports that it is ready.
producer.on('ready', async () => {
  const data = {
    evt_dt: 1664446229425,
    evt_type: 'tower_unload',
    plant: 'F110',
    machineName: 'TOWER_01',
    errorCode: '',
    description: '',
    result: 'OK',
    evt_ns: 'wmy.dx',
    evt_tp: 'tower.error',
    evt_pid: 'TOWER_01',
    evt_pubBy: 'nifi.11142',
  };

  try {
    const processedData = await resolver.resolve(data);
    producer.send([{ topic, messages: processedData }], (error, result) => {
      if (error) {
        console.error(error);
      } else {
        console.log(result);
      }
    });
  } catch (error) {
    // A failed schema lookup/encode must not become an unhandled rejection.
    console.error(error);
  }
});
consumer.ts文件
import { ConsumerGroup } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentSubResolveStrategy,
} from 'wisrtoni40-confluent-schema';

/**
 * -----------------------------------------------------------------------------
 * Config
 *
 * Placeholders: replace with the broker address, topic name and schema
 * registry address of your own environment.
 * -----------------------------------------------------------------------------
 */
const kafkaHost = '你的kafka host';
const topic = '你的topic';
const registryHost = '你的kafka注册host';

/**
 * -----------------------------------------------------------------------------
 * Kafka Consumer
 * -----------------------------------------------------------------------------
 */
const consumer = new ConsumerGroup(
  {
    kafkaHost,
    // A new random group id is generated on every start, so each run joins
    // as a brand-new consumer group.
    groupId: uuidv4(),
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    // Message values are delivered as Buffers and decoded by the resolver.
    encoding: 'buffer',
    fromOffset: 'latest',
    outOfRangeOffset: 'latest',
    // SASL/PLAIN credentials (placeholders).
    sasl: {
      mechanism: 'plain',
      username: '你的kafka用户名',
      password: '你的kafka密码',
    },
  },
  topic,
);

// An 'error' event emitted with no listener attached is thrown by Node.js and
// would crash the process, so connection/broker errors are logged here.
consumer.on('error', error => {
  console.error(error);
});

/**
 * -----------------------------------------------------------------------------
 * Confluent Resolver
 *
 * Decodes a received message value using the schema registry at
 * `registryHost`.
 * -----------------------------------------------------------------------------
 */
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentSubResolveStrategy(schemaRegistry, avro);

/**
 * -----------------------------------------------------------------------------
 * Consume
 *
 * Logs the offset and the decoded payload of every received message.
 * -----------------------------------------------------------------------------
 */
consumer.on('message', async msg => {
  try {
    const result = await resolver.resolve(msg.value);
    console.log(msg.offset);
    console.log(result);
  } catch (error) {
    // One undecodable message must not become an unhandled rejection.
    console.error(error);
  }
});
consumer.js文件
const { ConsumerGroup } = require('kafka-node');
const { v4: uuidv4 } = require('uuid');
const {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentSubResolveStrategy,
} = require('wisrtoni40-confluent-schema');

// Config placeholders: replace with the broker address, topic name and schema
// registry address of your own environment.
const kafkaHost = '你的kafka host';
const topic = '你的topic';
const registryHost = '你的kafka注册host';

// Kafka consumer.
const consumer = new ConsumerGroup(
  {
    kafkaHost,
    // A new random group id is generated on every start, so each run joins
    // as a brand-new consumer group.
    groupId: uuidv4(),
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    // Message values are delivered as Buffers and decoded by the resolver.
    encoding: 'buffer',
    fromOffset: 'latest',
    outOfRangeOffset: 'latest',
    // SASL/PLAIN credentials (placeholders).
    sasl: {
      mechanism: 'plain',
      username: '你的kafka用户名',
      password: '你的kafka密码',
    },
  },
  topic,
);

// An 'error' event emitted with no listener attached is thrown by Node.js and
// would crash the process, so connection/broker errors are logged here.
consumer.on('error', (error) => {
  console.error(error);
});

// Confluent resolver: decodes a received message value using the schema
// registry at `registryHost`.
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentSubResolveStrategy(schemaRegistry, avro);

// Consume: log the offset and the decoded payload of every received message.
consumer.on('message', async (msg) => {
  try {
    const result = await resolver.resolve(msg.value);
    console.log(msg.offset);
    console.log(result);
  } catch (error) {
    // One undecodable message must not become an unhandled rejection.
    console.error(error);
  }
});
附:kafka官网: https://kafka.apache.org/