热门排行
简介
ksqlDB是Apache Kafka的事件流数据库。它是分布式的,可伸缩的,可靠的和实时的。ksqlDB通过熟悉的轻量级SQL语法将实时流处理的功能与关系数据库的平易近人的感觉结合在一起。ksqlDB提供了以下核心原语:
1、流和表-通过Apache Kafka主题数据在架构上创建关系
2、物化视图-使用SQL在流上定义实时,增量更新的物化视图
3、推送查询-连续查询,可将增量结果实时推送到客户端
4、拉式查询-按需查询物化视图,非常类似于传统数据库
5、连接-完全从ksqlDB内部与任何Kafka Connect数据源或接收器集成
组合这些强大的原语,使您能够仅使用SQL语句来构建完整的流应用程序,从而最大程度地减少了复杂性和运营开销。ksqlDB支持广泛的操作,包括聚合,联接,窗口化,会话化等等。您可以在此处找到更多的ksqlDB教程和资源。
用例和范例
物化视图
ksqlDB允许您在流和表上定义实例化视图。物化视图由所谓的“持久查询”定义。这些查询被称为持久查询,因为它们使用表维护其增量更新的结果。
CREATE TABLE hourly_metrics AS
SELECT url, COUNT(*)
FROM page_views
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY url EMIT CHANGES;
可以通过查询从需求的实现视图中“拉出”结果SELECT。以下查询将返回单行:
SELECT * FROM hourly_metrics
WHERE URL = 'http://myurl.com' AND WINDOWSTART = '2019-11-20T19:00' ;
结果也可以通过流查询连续地“推送”给客户端SELECT。以下流查询将向客户端推送对实例化视图所做的所有增量更改:
SELECT * FROM hourly_metrics EMIT CHANGES ;
流查询将永久运行,直到明确终止它们为止。
流式ETL
Apache Kafka是为数据管道提供动力的流行选择。ksqlDB使在管道内转换数据变得简单,从而使消息可以干净地降落在另一个系统中。
CREATE STREAM vip_actions AS
SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id
WHERE u.level = 'Platinum' EMIT CHANGES;
异常检测
ksqlDB非常适合识别实时数据的模式或异常。通过在数据到达时处理流,您可以识别并适当地以毫秒级延迟掩盖异常事件。
CREATE TABLE possible_fraud AS
SELECT card_number, count(*)
FROM authorization_attempts
WINDOW TUMBLING (SIZE 5 SECONDS)
GROUP BY card_number
HAVING count(*) > 3 EMIT CHANGES;
监控方式
Kafka具有通过流处理提供可伸缩的有序消息的能力,使其成为日志数据监视和警报的通用解决方案。ksqlDB提供了一种熟悉的语法,用于跟踪,理解和管理警报。
CREATE TABLE error_counts AS
SELECT error_code, count(*)
FROM monitoring_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
WHERE type = 'ERROR'
GROUP BY error_code EMIT CHANGES;
与外部数据源和接收器集成
ksqlDB包括与Kafka Connect数据源和接收器的本机集成,可有效地在各种外部系统上提供统一的SQL接口。
以下查询是一个简单的持久性流查询,它将所有输出产生到名为的主题中
CREATE STREAM clicks_transformed AS
SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id EMIT CHANGES;
与其简单地将所有连续查询输出发送到Kafka主题,还不如将输出路由到另一个数据存储区非常有用。ksqlDB的Kafka Connect集成使此模式非常容易。
以下语句将创建一个Kafka Connect接收器连接器,该连接器将上述流式ETL查询的所有输出直接连续发送到Elasticsearch:
CREATE SINK CONNECTOR es_sink WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topics' = 'clicks_transformed',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'type.name' = '',
'connection.url' = 'http://elasticsearch:9200');