使用Kafka和KSQL进行实时系统日志处理:过滤
本帖最后由 pig2 于 2018-7-14 11:10 编辑问题导读
1.kafka sql的作用是什么?
2.kafka sql如何实现过滤数据?
3.kafka sql对于过去的,现在接受的,以后接受的数据如何过滤?
static/image/hrline/line7.png
系统日志是现代计算运行的无处不在的标准之一。 Linux,它在IP摄像机等网络和物联网设备中也很常见。它提供了一种流式传输日志消息的方法,以及源主机,消息严重性等元数据。有时目标只是一个本地日志文件,但更常见的是它是一个集中式系统日志服务器,它可以进一步记录或处理消息。
作为一种高性能的分布式流媒体平台,ApacheKafka®是集中提取系统日志数据的绝佳工具。由于Apache Kafka还持久保存数据并支持本地流处理,因此我们无需在使用数据之前将其置于其中。您可以通过各种方式流式传输数据,包括具有专用syslog插件的Kafka Connect。
在这篇文章中,我们将看到KSQL如何在实时到达时处理系统日志消息。 KSQL是Apache Kafka的SQL流引擎。使用SQL,交互式执行或作为应用程序,我们可以在Kafka中过滤,丰富和聚合数据流。来自KSQL的数据处理可以追溯到Kafka,这意味着我们可以轻松地获取Kafka中的数据。
使用KSQL过滤syslog数据
使用KSQL处理syslog和流很简单。 首先,下载并安装Confluent Platform(https://www.confluent.io/download/),然后按照GitHub仓库中的说明安装KSQL(https://github.com/confluentinc/ksql/)。 (从Confluent Platform 4.1开始,KSQL将包含在平台中。)你需要安装和配置Kafka Connect syslog插件(https://github.com/rmoff/kafka-connect-syslog/blob/4eeb310bf159487d78711d9d3c56c76c28651edb/usage.adoc),然后配置syslog source,发送数据。 如何操作取决于客户端syslog实现 - 对于rsyslog,请参见此处(http://www.rsyslog.com/sending-messages-to-a-remote-syslog-server/)。
假设数据使用syslog topic接受,我们可以从KSQL提示(运行ksql local开始)开始检查topic上的数据:
ksql> PRINT 'syslog' FROM BEGINNING;
Format:AVRO
12/03/18 12:21:06 GMT, */192.168.10.250:47823, {"date": null, "facility": 1, "host": "I", "level": 6, "message": "I logs\n", "charset": "UTF-8", "remote_address": "/192.168.10.250:47823", "hostname": "proxmox01.moffatt.me"}
如果让这个运行,将在控制台看到所有的新事件流。按Ctrl-C返回到KSQL提示符。
我们现在可以使用(检测到的格式)在topic上声明KSQL:
ksql> CREATE STREAM SYSLOG WITH (KAFKA_TOPIC='syslog',VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
使用标准SQL命令,我们可以查询和操作事件流。
ksql> SELECT HOSTNAME,MESSAGE FROM SYSLOG;
proxmox01.moffatt.me | Ilogs
proxmox01.moffatt.me | I stilllogs
这是一个连续查询,并将继续显示到达时新数据。 按Ctrl-C取消并返回KSQL提示符。
Apache Kafka数据仍然存在,因此KSQL不仅可以显示当前和未来的入站(接受)消息 - 我们还可以查询过去! 为此,我们将KSQL设置从主题的开头处理消息:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
现在你可以重新运行select,你将会看到syslog topic的很多内容
ksql> SELECT HOSTNAME,MESSAGE FROM SYSLOG;
localhost | X
localhost | foo I love logs
localhost | I logs
proxmox01.moffatt.me | I logs
proxmox01.moffatt.me | I still logs
我们现在接入实时系统日志,这样我们可以看到KSQL的潜力。我的日志流来自家庭网络,包括一些服务器,容器,一些network应用,和一堆移动设备。
ksql> SELECT TIMESTAMPTOSTRING(DATE, 'yyyy-MM-dd HH:mm:ss') AS SYSLOG_TS, HOST, MESSAGE FROM SYSLOG;
2018-03-12 13:30:59 | rpi-03 | rpi-03 sshd: Invalid user oracle from 185.55.218.153
2018-03-12 13:30:59 | rpi-03 | rpi-03 sshd: input_userauth_request: invalid user oracle
2018-03-12 13:30:59 | rpi-03 | rpi-03 sshd: Received disconnect from 185.55.218.153: 11: Bye Bye
2018-03-12 13:31:00 | rpi-03 | rpi-03 sshd: reverse mapping checking getaddrinfo for host3.artegix.info failed - POSSIBLE BREAK-IN ATTEMPT!
2018-03-12 13:31:01 | rpi-03 | rpi-03 sshd: Invalid user test from 185.55.218.153
2018-03-12 13:31:01 | rpi-03 | rpi-03 sshd: input_userauth_request: invalid user test
2018-03-12 13:31:01 | rpi-03 | rpi-03 sshd: Received disconnect from 185.55.218.153: 11: Bye Bye
2018-03-12 13:31:02 | rpi-03 | rpi-03 sshd: reverse mapping checking getaddrinfo for host3.artegix.info failed - POSSIBLE BREAK-IN ATTEMPT!
2018-03-12 13:31:02 | rpi-03 | rpi-03 sshd: Invalid user test from 185.55.218.153
2018-03-12 13:31:05 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") syslog: dpi.dpi_stainfo_notify(): dpi not enable
2018-03-12 13:31:05 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") hostapd: ath1: STA xx:xx:xx:xx:xx:xx IEEE 802.11: associated
2018-03-12 13:31:06 | ("U7PG2,f09fc2000000,v3.7.40.6115") | ("U7PG2,f09fc2000000,v3.7.40.6115") hostapd: ath3: STA xx:xx:xx:xx:xx:xx IEEE 802.11: disassociated
2018-03-12 13:31:06 | ("U7PG2,f09fc2000000,v3.7.40.6115") | ("U7PG2,f09fc2000000,v3.7.40.6115") libubnt: dpi.dpi_stainfo_notify(): dpi not enable
2018-03-12 13:31:06 | ("U7PG2,f09fc2000000,v3.7.40.6115") | ("U7PG2,f09fc2000000,v3.7.40.6115") libubnt: wevent.ubnt_custom_event(): EVENT_STA_LEAVE ath3: xx:xx:xx:xx:xx:xx / 0
2018-03-12 13:31:06 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") hostapd: ath1: STA xx:xx:xx:xx:xx:xx RADIUS: starting accounting session 5A9BFF48-00000286
2018-03-12 13:31:06 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") hostapd: ath1: STA xx:xx:xx:xx:xx:xx WPA: pairwise key handshake completed (RSN)
2018-03-12 13:31:06 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") syslog: wevent.ubnt_custom_event(): EVENT_STA_JOIN ath1: xx:xx:xx:xx:xx:xx / 4
2018-03-12 13:33:35 | proxmox01 | proxmox01 kernel: audit: type=1400 audit(1520861615.501:3062182): apparmor="DENIED" operation="ptrace" profile="docker-default" pid=26854 comm="node" requested_mask="trace" denied_mask="trace" peer="docker-default"
2018-03-12 13:33:38 | proxmox01 | proxmox01 kernel: audit: type=1400 audit(1520861618.081:3062183): apparmor="DENIED" operation="ptrace" profile="docker-default" pid=26854 comm="node" requested_mask="trace" denied_mask="trace" peer="docker-default"----
这只是一个小样本的内容,我们可以发现一些事情:
[*]针对我的面向公共服务器的登录攻击
[*]WiFi接入点客户端连接/断开
[*]Linux安全模块动作日志
现在我们将看到如何在这个案例中提取某些感兴趣的日志,即登录攻击。
要过滤掉我们的数据系统日志(包括:已经存在的,达到的,以后会到达的),我们只使用SQL语句:
ksql> SELECT TIMESTAMPTOSTRING(DATE, 'yyyy-MM-dd HH:mm:ss') AS SYSLOG_TS, HOST, MESSAGE \
FROM SYSLOG \
WHERE HOST='rpi-03' AND MESSAGE LIKE '%Invalid user%'\
LIMIT 5;
2018-03-04 15:14:24 | rpi-03 | rpi-03 sshd: Invalid user mini from 114.130.4.16
2018-03-04 15:21:49 | rpi-03 | rpi-03 sshd: Invalid user admin from 103.99.0.209
2018-03-04 15:21:58 | rpi-03 | rpi-03 sshd: Invalid user support from 103.99.0.209
2018-03-04 15:22:06 | rpi-03 | rpi-03 sshd: Invalid user user from 103.99.0.209
2018-03-04 15:22:23 | rpi-03 | rpi-03 sshd: Invalid user 1234 from 103.99.0.209
LIMIT reached for the partition.
Query terminated
(注意:我这里使用\换行来延续命令,如果sql语句都待在一行也是可以的)
这对于能够快速查询和检查日志非常有用。 但是让我们看看更有用的东西! 我们可以保留这些数据,不止过去的日志过滤,收到的最新的日志也是如此, 为此,只需将CREATE STREAM foo AS(通常称为CSAS)添加到查询的前面:
ksql> CREATE STREAM SYSLOG_INVALID_USERS AS \
SELECT * \
FROM SYSLOG \
WHERE HOST='rpi-03' AND MESSAGE LIKE '%Invalid user%';
Message
----------------------------
Stream created and running
----------------------------
我们创建了一个派生(derived )流,可以像在KSQL中的任何其他对象一样查询:
ksql> SELECT * FROMSYSLOG_INVALID_USERS LIMIT 1;
1520176464386 | //192.168.10.105:38254 | 1520176464000 | 4 | rpi-03 | 6 | rpi-03 sshd: Invalid user mini from 114.130.4.16 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me
LIMIT reached for the partition.
Query terminated
在原理上,KSK实际上创建了一个主题,并使用我们针对源主题定义的任何消息实时填充此主题。 我们可以看到新主题及其中的消息 - 它只是一个Kafka主题:
$ kafka-topics --zookeeper localhost:2181 --list|grep SYSLOG
SYSLOG_INVALID_USERS
$
$ kafka-avro-console-consumer \
--bootstrap-server proxmox01.moffatt.me:9092 \
--property schema.registry.url=http://proxmox01.moffatt.me:8081 \
--topic SYSLOG_INVALID_USERS --max-messages=1 --from-beginning|jq '.'
{
"DATE": {
"long": 1520176464000
},
"FACILITY": {
"int": 4
},
"HOST": {
"string": "rpi-03"
},
"LEVEL": {
"int": 6
},
"MESSAGE": {
"string": "rpi-03 sshd: Invalid user mini from 114.130.4.16"
},
"CHARSET": {
"string": "UTF-8"
},
"REMOTE_ADDRESS": {
"string": "/192.168.10.105:38254"
},
"HOSTNAME": {
"string": "rpi-03.moffatt.me"
}
}
Processed a total of 1 messages
要跟踪新流,列等的吞吐量,使用DESCRIBE EXTENDED命令:
ksql> DESCRIBE EXTENDED SYSLOG_INVALID_USER_LOGIN;
Type : STREAM
Key field :
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : AVRO
Kafka output topic : SYSLOG_INVALID_USER_LOGIN (partitions: 4, replication: 1)
Field | Type
--------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING)(system)
DATE | BIGINT
FACILITY | INTEGER
HOST | VARCHAR(STRING)
LEVEL | INTEGER
MESSAGE | VARCHAR(STRING)
CHARSET | VARCHAR(STRING)
REMOTE_ADDRESS | VARCHAR(STRING)
HOSTNAME | VARCHAR(STRING)
--------------------------------------------
Queries that write into this STREAM
-----------------------------------
id:CSAS_SYSLOG_INVALID_USER_LOGIN - CREATE STREAM SYSLOG_INVALID_USER_LOGIN AS SELECT * FROM SYSLOG WHERE HOST='rpi-03' AND MESSAGE LIKE '%Invalid user%';
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 13.46 total-messages: 1335 last-message: 3/12/18 1:59:35 PM GMT
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic SYSLOG_INVALID_USER_LOGIN)
ksql>
总结
KSQL使用SQL的简单声明性语言为任何人提供编写流处理应用程序的能力。 在本文中,我们可以看到我们如何检查数据syslog的入站流,并轻松创建写入第二个Kafka主题的过滤消息的实时流。
在下一篇文章中,我们将简要介绍一下KSQL本身,并看看使用Python编写一个非常简单的推送通知系统。 我们将看到一些简单的异常检测,它基于KSQL的有状态聚合功能。
########################
关注公众号,查看about云经典文章
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
还有更多资源
关注公众号,获取人工智能20套,区块链资源5阶段
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23842
页:
[1]