當前位置:網站首頁>Streams:深入剖析Redis5.0全新數據結構

Streams:深入剖析Redis5.0全新數據結構

2022-01-28 06:54:49 程序員社區

Streams:深入剖析Redis5.0全新數據結構
 
原創: 阿飛的博客
 
Redis 5.0 全新的數據類型:streams,官方把它定義為:以更抽象的方式建模日志的數據結構。Redis的streams主要是一個append only的數據結構,至少在概念上它是一種在內存中錶示的抽象數據類型,只不過它們實現了更强大的操作,以克服日志文件本身的限制。
 
如果你了解MQ,那麼可以把streams當做MQ。如果你還了解kafka,那麼甚至可以把streams當做kafka。
 
另外,這個功能有點類似於redis以前的Pub/Sub,但是也有基本的不同:
  • streams支持多個客戶端(消費者)等待數據(Linux環境開多個窗口執行XREAD即可模擬),並且每個客戶端得到的是完全相同的數據。
  • Pub/Sub是發送忘記的方式,並且不存儲任何數據;而streams模式下,所有消息被無限期追加在streams中,除非用於顯示執行删除(XDEL)。
  • streams的Consumer Groups也是Pub/Sub無法實現的控制方式。
streams數據結構
 
streams數據結構本身非常簡單,但是streams依然是Redis到目前為止最複雜的類型,其原因是實現的一些額外的功能:一系列的阻塞操作允許消費者等待生產者加入到streams的新數據。另外還有一個稱為Consumer Groups的概念,這個概念最先由kafka提出,Redis有一個類似實現,和kafka的Consumer Groups的目的是一樣的:允許一組客戶端協調消費相同的信息流!
 
redis源碼中定義streams結構的源碼如下,由源碼可知,stream的核心數據結構是radix tree:
 
typedef struct stream {
 
    rax *rax;               /* The radix tree holding the stream. */
 
    uint64_t length;        /* Number of elements inside this stream. */
 
    streamID last_id;       /* Zero if there are yet no items. */
 
    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
 
} stream;
 
源碼參考:https://github.com/antirez/redis/blob/5.0.0/src/stream.h;
 
至於redis對radix tree的實現,參考源碼:https://github.com/antirez/redis/blob/5.0.0/src/rax.c 和 https://github.com/antirez/redis/blob/5.0.0/src/rax.h 。網上也有很多radix tree的文章,本篇文章就不做過多的介紹了。下面給出一張從官方源碼中的部分截圖:
Streams:深入剖析Redis5.0全新數據結構插圖
radix tree
 
streams基礎
 
為了理解streams的目的,以及如何使用它,我們先忽略掉所有高級特性,只把注意力放在數據結構本身,以及那些操作和訪問streams的命令。這基本上也是大多數其他Redis數據類型共有的部分,例如Lists,Sets,Sorted Sets等。然而需要注意的是,Lists也有一個更複雜的阻塞式的API,例如BLPOP,BRPOP等。streams這方便的API也沒什麼不同,只是更複雜,更强大(更牛逼,哈)!
 
streams命令
 
廢話不多說,先上手玩玩這個全新的數據類型。streams這個數據類型對應有如下13個操作命令,所有命令都以"X"開頭:
 
XADD
 
用法:XADD key ID field string [field string …]
 
正如其名,這個命令就是用來添加的,給streams追加(append,前面提到過:streams主要是一個append only的數據結構)一個新的entry(和Java裏的Map類似,Redis裏的streams中的數據也稱為entry)。
 
key:的含義就是同一類型streams的名稱;
 
ID: streams中entry的唯一標識符,如果執行XADD命令時,傳入星號(*),那麼,ID會自動生成,且自動生成的ID會在執行XADD後返回,默認生成的ID格式為millisecondsTime+sequenceNumber,即當前毫秒級別的時間戳加上一個自增序號值,例如"1540013735401-0"。並且執行XADD時,不接受少於或等於上一次執行XADD的ID,否則會報錯:ERR The ID specified in XADD is equal or smaller than the target stream top item;
 
field&string:接下來就是若幹組field string。可以把它理解為錶示屬性的json中的key-value。例如,某一streams的key命名為userInfo,且某個用戶信息為{"username":"afei", "password":"123456"},那麼執行XADD命令如下:
 
127.0.0.1:6379> xadd userInfo * username afei password 123456
 
"1540014082060-0"
 
由於命令中ID字段的值是星號,所以自定生成ID,1540014082060-0就是自動生成的ID。 XADD命令也支持顯示指定ID,例如:XADD streamname 0-2 foo bar。
  • 時鐘回撥
需要注意的是,ID的時間戳部分是部署Redis服務器的本地時間,如果發生時鐘回撥會怎麼樣?如果發生始終回撥,生成的ID的時間戳部分就是回撥後的時間,然後加上這個時間的遞增序列號。例如當前時間戳1540014082060,然後這時候發生了時鐘回撥,且回撥5ms,那麼時間戳就是1540014082055。假設以前已經生成了1540014082055-0,1540014082055-1,那麼這次由於時鐘回撥,生成的ID就是1540014082055-2。所以允許自動生成的ID在發生時鐘回撥時少於上次的ID,但是不允許顯示指定一個少於上次的ID。
 
XDEL
 
用法:XDEL key ID [ID …]
 
和XADD相反,這是命令用來從streams中删除若幹個entry,並且會返回實際删除數,這個删除數可能和參數ID個數不等,因為某些ID錶示的消息可能不存在。執行命令如下,第二個參數ID是不存在的,所以XDEL的返回結果是1:
 
127.0.0.1:6379> XDEL userInfo "1540014379642-0" "1540014379642-1"
 
(integer) 1
 
XLEN
 
用法:XLEN key
 
很好理解,這個命令就是用來返回streams中有多少個entry。執行如下:
 
127.0.0.1:6379> XLEN userInfo
 
(integer) 2
 
streams三種查詢模式
 
redis提供了三種查詢streams數據的模式:
  1. 範圍查詢:因為streams的每個entry,其默認生成的ID是基於時間且遞增的;
  2. 監聽模式:類比linux中的tailf命令,實時接收新增加到streams中的entry(也有點像一個消息系統,事實上筆者認為它就是借鑒了kafka);
  3. 消費者組:即Consumer Groups,特殊的監聽模式。從一個消費者的角度來看streams,一個streams能被分區到多個處理消息的消費者,對於任意一條消息,同一個消費者組中只有一個消費者可以處理(和kafka的消費者組完全一樣)。這樣還能够橫向擴容消費者,從而提昇處理消息的能力,而不需要只讓把讓一個消費者處理所有消息。
接下裏分別介紹這三種模式。
 
XRANGE
 
用法:XRANGE key start end [COUNT count]
 
這個命令屬於第1種模式,即基於範圍查詢。這個命令用來返回streams某個順序範圍下的元素,start參數是更小的ID,end參數是更大的ID。有兩個特殊的ID用符號"-"和"+"錶示,符號"-"錶示最小的ID,符號"+"錶示最大的ID:
 
127.0.0.1:6379> XRANGE userInfo "1540014096298-0" "1540014477236-0"
 
1) 1) "1540014096298-0"
 
   2) 1) "username"
 
      2) "root"
 
      3) "password"
 
      4) "666666"
 
2) 1) "1540014477236-0"
 
   2) 1) "username"
 
      2) "test"
 
      3) "password"
 
      4) "111111"
 
127.0.0.1:6379> 
 
127.0.0.1:6379> XRANGE userInfo - +
 
1) 1) "1540014082060-0"
 
   2) 1) "username"
 
      2) "afei"
 
      3) "password"
 
      4) "123456"
 
2) 1) "1540014096298-0"
 
   2) 1) "username"
 
      2) "root"
 
      3) "password"
 
      4) "666666"
 
3) 1) "1540014477236-0"
 
   2) 1) "username"
 
      2) "test"
 
      3) "password"
 
      4) "111111"
 
4) 1) "1540014493402-0"
 
   2) 1) "username"
 
      2) "u1"
 
      3) "password"
 
      4) "111111"
 
XRANGE還能實現遍曆某個範圍區間的功能,例如我想遍曆2018-10-20號新增的用戶信息。首先得到2018-10-20 00:00:00對應的時間戳為1539964800000,再得到2018-10-20 23:59:59對應的時間戳為1540051199000,然後執行如下命令:
 
127.0.0.1:6379> XRANGE userInfo 1539964800000-0  1540051199000-0 COUNT 5
 
1) 1) "1540014082060-0"
 
   2) 1) "username"
 
      2) "afei"
 
      3) "password"
 
      4) "123456"
 
... ...
 
5) 1) "1540014496505-0"
 
   2) 1) "username"
 
      2) "u2"
 
      3) "password"
 
      4) "111111"
 
127.0.0.1:6379> 
 
# 需要注意的是,接下來再遍曆的start參數是上一次遍曆結果最大的ID加1,即"1540014496505-0"加1就是"1540014496505-1"。
 
127.0.0.1:6379> XRANGE userInfo 1540014496505-1  1540051199000-0 COUNT 5
 
1) 1) "1540014499863-0"
 
   2) 1) "username"
 
      2) "u3"
 
      3) "password"
 
      4) "111111"
 
XREVRANGE
 
用法:XREVRANGE key end start [COUNT count]
 
這個命令也屬於第1種模式,且和XRANGE相反,返回一個逆序範圍。end參數是更大的ID,start參數是更小的ID。執行示例如下:
 
XREVRANGE userInfo "1540014477236-0" "1540014096298-0"
 
XREAD
 
用法:XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
 
很明顯,這個命令就是用來實現第2個模式,即監聽模式。其作用是返回streams中從來沒有讀取的,且比參數ID更大的元素。
 
這個命令的使用方式如下:
 
127.0.0.1:6379> XREAD COUNT 10 BLOCK 60000 STREAMS userInfo "1540041139268-0"
 
1) 1) "userInfo"
 
   2) 1) 1) "1540041264182-0"
 
         2) 1) "u2"
 
            2) "p2"
 
(9.26s)
 
# "1540041264182-0"這條消息時通過XADD添加的然後被XREAD監聽到的消息。
 
127.0.0.1:6379> XREAD COUNT 2 STREAMS userInfo 0
 
1) 1) "userInfo"
 
   2) 1) 1) "1540014082060-0"
 
         2) 1) "username"
 
            2) "afei"
 
            3) "password"
 
            4) "123456"
 
      2) 1) "1540014096298-0"
 
         2) 1) "username"
 
            2) "root"
 
            3) "password"
 
            4) "666666"
 
# 這條命令實現類似XRANGE的功能。
 
127.0.0.1:6379> XREAD BLOCK 0 STREAMS userInfo $
 
1) 1) "userInfo"
 
   2) 1) 1) "1540042613437-0"
 
         2) 1) "u7"
 
            2) "p7"
 
# 說明BLOCK為0錶示一致等待知道有新的數據,否則永遠不會超時。並且ID的值我們用特殊字符`$`錶示,這個特殊字符錶示我們只獲取最新添加的消息。
 
此外,XREAD還支持同時監聽多個streams,用法如下所示:
 
127.0.0.1:6379> XREAD BLOCK 0 STREAMS userInfo_01 userInfo_02 userInfo_03 userInfo_04  $ $ $ $
 
1) 1) "userInfo_03"
 
   2) 1) 1) "1540043348287-0"
 
         2) 1) "u1"
 
            2) "p1"
 
(3.49s)
 
# 監聽userInfo_01~userInfo_04這4個streams的新的消息。
 
XREAD除了COUNT和BLOCK,沒有其他選項了。所有XREAD是一個非常基本的命令。更多高級特性可以往下看接下來要介紹的XREADGROUP。
 
XREADGROUP
 
用法:XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
 
很明顯,這就是第三種模式:消費者組模式。
 
如果你了解kafka的消費者組,那麼你就也了解了streams的消費者組。如果不了解也沒關系,筆者簡單解釋一下,假設有三個消費者C1,C2,C3。在streams中總計有7條消息:1, 2, 3, 4, 5, 6, 7,那麼消費關系如下所示:
 
1 -> C1
 
2 -> C2
 
3 -> C3
 
4 -> C1
 
5 -> C2
 
6 -> C3
 
7 -> C1
 
消費者組具備如下幾個特點:
  1. 同一個消息不會被投遞到一個消費者組下的多個消費者,只可能是一個消費者。
  2. 同一個消費者組下,每個消費者都是唯一的,通過大小寫敏感的名字區分。
  3. 消費者組中的消費者請求的消息,一定是新的,從來沒有投遞過的消息。
  4. 消費一個消息後,需要用命令(XACK)確認,意思是說:這條消息已經給成功處理。正因為如此,當訪問streams的曆史消息時,每個消費者只能看到投遞給它自己的消息。
消費者組抽象的想象成如下這個樣子:
 
+----------------------------------------+
 
| consumer_group_name: afeigroup         |
 
| consumer_group_stream: somekey         |
 
| last_delivered_id: 1292309234234-92    |
 
|                                        |
 
| consumers:                             |
 
|    "consumer-1" with pending messages  |
 
|       1292309234234-4                  |
 
|       1292309234232-8                  |
 
|    "consumer-42" with pending messages |
 
|       ... (and so forth)               |
 
+----------------------------------------+
 
XACK
 
用法:XACK key group ID [ID …]
 
這是消費者組相關的另一個重要的命令。標記一個處理中的消息為已被正確處理,如此一來,這條消息就會被從消費者組的pending消息集合中删除,類似MQ中的ack。
 
XGROUP
 
用法:XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
 
這也是消費者組的一個重要命令,這個命令用來管理消費者組,例如創建,删除等。
 
XREADGROUP,XACK,XGROUP三種命令構成了消費者組相關的操作命令,下面是消費者組一些操作示例:
 
# 創建一個消費者組
 
127.0.0.1:6379> XGROUP CREATE userInfo GRP-AFEI $
 
OK
 
# 需要注意的是,目前XGROUP CREATE的streams必須是一個存在的streams,否則會報錯:
 
127.0.0.1:6379> XGROUP CREATE userinfo GRP-AFEI $
 
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
 
# 名為zhangsan的消費者,需要注意的是streams名稱userInfo後面的特殊符號`>`錶示這個消費者只接收從來沒有被投遞給其他消費者的消息,即新的消息。當然我們也可以指定具體的ID,例如指定0錶示訪問所有投遞給該消費者的曆史消息,指定1540081890919-1錶示投遞給該消費者且大於這個ID的曆史消息:
 
127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 1 BLOCK 0 STREAMS userInfo >
 
# 名為lisi的消費者:
 
127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 1 BLOCK 0 STREAMS userInfo >
 
# 接下來分別添加兩條信息,一條就會被zhangsan消費,另一條被lisi消費:
 
127.0.0.1:6379> XADD userInfo * username u102102 password p102102
 
"1540081873370-0"
 
127.0.0.1:6379> XADD userInfo * username u102103 password p102103
 
"1540081890919-0"
 
#現在消費者lisi有一條消息:
 
127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 5 BLOCK 0 STREAMS userInfo 0
 
1) 1) "userInfo"
 
   2) 1) 1) "1540081890919-0"
 
         2) 1) "username"
 
            2) "u102103"
 
            3) "password"
 
            4) "p102103"
 
#然後通過命令ack這條消息:
 
127.0.0.1:6379> XACK userInfo mygroup 1540081890919-0
 
(integer) 1
 
# 再看消費者lisi的pending隊列,已經為空:
 
127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 5 BLOCK 0 STREAMS userInfo 0
 
1) 1) "userInfo"
 
   2) (empty list or set)
 
XPENDING
 
用法:XPENDING key group [start end count] [consumer]
 
返回streams中消費者組的pending消息,即消費者接收到但是還沒有ack的消息,用法參考:
 
# 查看消費者組下總計最多10條pending消息
 
127.0.0.1:6379> XPENDING userInfo mygroup - + 10
 
1) 1) "1540083260408-0"
 
   2) "zhangsan"
 
   3) (integer) 183551
 
   4) (integer) 1
 
2) 1) "1540083266293-0"
 
   2) "lisi"
 
   3) (integer) 177666
 
   4) (integer) 1
 
# 查看消費者組下zhangsan這個消費者總計最多10條pending消息
 
127.0.0.1:6379> XPENDING userInfo mygroup - + 10 zhangsan
 
1) 1) "1540083260408-0"
 
   2) "zhangsan"
 
   3) (integer) 187006
 
   4) (integer) 1
 
XCLAIM
 
用法:XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
 
作用是改變消費者組中消息的所有權,用法參考:
 
127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 5 BLOCK 0 STREAMS userInfo 0
 
1) 1) "userInfo"
 
   2) 1) 1) "1540083260408-0"
 
         2) 1) "username"
 
            2) "u102106"
 
            3) "password"
 
            4) "p102106"
 
# zhangsan本來有1條消息,現在將另一條本來屬於lisi的消息的所有權轉給它:
 
127.0.0.1:6379> XCLAIM userInfo mygroup zhangsan 360 1540083266293-0
 
1) 1) "1540083266293-0"
 
   2) 1) "username"
 
      2) "u102107"
 
      3) "password"
 
      4) "p102107"
 
# 現在zhangsan有兩條消息了
 
127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 5 BLOCK 0 STREAMS userInfo 0
 
1) 1) "userInfo"
 
   2) 1) 1) "1540083260408-0"
 
         2) 1) "username"
 
            2) "u102106"
 
            3) "password"
 
            4) "p102106"
 
      2) 1) "1540083266293-0"
 
         2) 1) "username"
 
            2) "u102107"
 
            3) "password"
 
            4) "p102107"
 
XINFO
 
用法:XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
 
其作用是得到streams和消費者組的一些信息,使用參考:
 
127.0.0.1:6379> XINFO CONSUMERS userInfo mygroup 
 
1) 1) "name"
 
   2) "lisi"
 
   3) "pending"
 
   4) (integer) 0
 
   5) "idle"
 
   6) (integer) 201086
 
2) 1) "name"
 
   2) "zhangsan"
 
   3) "pending"
 
   4) (integer) 2
 
   5) "idle"
 
   6) (integer) 701954
 
127.0.0.1:6379> XINFO STREAM userInfo
 
 1) "length"
 
 2) (integer) 22
 
 3) "radix-tree-keys"
 
 4) (integer) 1
 
 5) "radix-tree-nodes"
 
 6) (integer) 2
 
 7) "groups"
 
 8) (integer) 2
 
 9) "last-generated-id"
 
10) "1540082298051-0"
 
11) "first-entry"
 
12) 1) "1540014082060-0"
 
    2) 1) "username"
 
       2) "afei"
 
       3) "password"
 
       4) "123456"
 
13) "last-entry"
 
14) 1) "1540082298051-0"
 
    2) 1) "username"
 
       2) "u102105"
 
       3) "password"
 
       4) "p102105"
 
XTRIM
 
用法:XTRIM key MAXLEN [~] count
 
修剪streams到一個確定的size。Trims the stream to (approximately if '~' is passed) a certain size,用法參考:
 
# streams只保留10條消息,其返回結果錶示被剪去多少條消息:
 
127.0.0.1:6379> XTRIM userInfo MAXLEN 10
 
(integer) 14
 
說明:streams目前的修剪策略比較簡單,比如連根據ID範圍修剪都沒有實現。根據具體某一個ID删除,可以通過XDEL實現。
 
持久化,複制以及消息安全性
 
和其他數據類型一樣,streams也會异步複制到slave,並也會持久化到AOF和RDB文件中。然而,消費者組的全部狀態是被傳播(propagated )到AOF,RDB和slave中。
 
需要注意的是,Redis的streams和消費者組使用Redis默認複制進行持久化和複制,因此:如果消息的持久性在您的應用程序中很重要,則必須將AOF與强fsync策略一起使用。
 
默認情况下,异步複制不保證能複制每一個數據添加或使用者組狀態更改:在故障轉移之後,可能會丟失某些內容,具體取决於slave從master接收數據的能力。
  • 長度為0的streams
這是streams和其他redis數據類型的不同,其他數據類型,例如Lists,Sets等,如果所有元素都被删除,那麼key也不存在。而streams允許所有entry都被删除。
 
存在這種不對稱性的原因是因為streams可能具有關聯的消費者組,並且我們不希望由於streams中不再有任何entry而丟失消費者組定義的狀態。 目前,即使沒有關聯的消費者群體,也不會删除該streams。
 
 
Streams:深入剖析Redis5.0全新數據結構插圖1
 

版權聲明
本文為[程序員社區]所創,轉載請帶上原文鏈接,感謝
https://cht.chowdera.com/2022/01/202201280654492840.html

隨機推薦