简单地说,我们现在是Redis

了解更多

消息传递

什么是消息代理?

现代软件应用程万博电竞客服序已经从单个单片单元转变为松散耦合的服务集合。虽然这种新体系结构带来了许多好处,但这些服务仍然需要相互交互,从而产生了对健壮和高效消息传递解决方案的需求。

复述,流作为构建流架构的通信通道和持久化数据的类似日志的数据结构,Streams成为事件源的完美解决方案。

复述,Pub / Sub是一种非常轻量级的消息传递协议,用于在系统中广播实时通知。当低延迟和巨大的吞吐量至关重要时,它是传播短时间消息的理想选择。

Redis列表和Redis排序集是实现消息队列的基础。它们既可以直接用于构建定制的解决方案,也可以通过框架使消息处理更适合您选择的编程语言。

构建消息代理解决方案中的挑战

1.服务之间的通信必须是可靠的

当一个服务想要与另一个服务通信时,它不能总是立即这样做。发生故障,独立部署可能会使服务在一段时间内不可用。对于规模较大的应用程序来说,这不是服务是否不可用或何时不可用的问题,而是服务不可用的频率。为了缓解这个问题,最佳实践是限制服务之间的同步通信数量(例如,直接调用服务的api,例如通过发送HTTP(S)请求),而在实际情况下选择持久通道,这样服务就可以方便地使用消息。这类异步通信的两个主要范例是事件流和消息队列。

消息队列

  1. 消息队列基于可变列表,有时通过帮助实现公共模式的工具使用。消息队列和事件流之间有两个主要区别:消息队列使用“推送”类型的通信——当有新消息需要关注时,服务将新消息推送到另一个服务的收件箱。流以相反的方式运行。
  2. 消息包含可变状态(例如,重试次数),当成功处理时,它们将从系统中删除。流事件是不可变的,而历史记录在经过修剪后通常保存在冷藏库中。

Redis列表和排序集这两种数据类型是否实现了这种类型的行为,并且都可以用于构建定制的解决方案,以及特定于生态系统的框架的后端,如芹菜(Python),(JavaScript),Sidekiq(Ruby),机械(走),还有许多其他的。

事件流

事件流基于日志数据类型,这在查找其历史记录和将新项追加到其末尾方面非常有效。这两个属性使得不可变日志既是一种很好的通信原语,也是一种存储数据的有效方法。

通过流进行通信与使用消息队列不同。如前所述,消息队列是“推”,而流是“拉”。在实践中,这意味着每个服务写入自己的流,而其他服务将选择性地观察(即“拉”)它。这使得一对多通信比消息队列更高效。

当一个服务需要另一个服务执行操作时,消息队列工作得最好。在这种情况下,第二个服务的消息队列充当“请求收件箱”。当一个服务需要发布一个事件(例如,多个服务感兴趣的消息)时,发布服务将需要将消息推送到与该事件相关的每个服务的队列中。在实践中,大多数工具(例如企业服务总线)都可以透明地做到这一点,但是为每个收件人生成和存储单独的消息副本仍然效率低下。

在一对多通信模式下,事件流的性能优于消息队列,方法是颠倒协议:只有原始事件的一个副本存在,任何想要访问它的服务都可以按照自己的节奏通过事件流(即发布服务的流)查找。事件流相对于消息队列还有另一个实际优势:您不需要预先指定事件订阅者。在消息队列中,系统需要知道向哪个队列交付事件的副本,因此如果稍后添加新服务,它将只接收新事件。有了事件流,这个问题就不存在了——一个新的服务甚至可以浏览整个事件历史,这对于添加新的分析和仍然能够追溯计算它们是非常好的。这意味着你不必马上想出你将来可能需要的每一个指标。你可以只跟踪你现在需要的,然后随着你的前进添加更多,因为你知道你仍然可以看到完整的历史,即使是在以后添加的。

2.存储必须有效利用空间

对于持久化消息的所有通信通道来说,空间效率是一个受欢迎的属性。然而,对于事件流来说,这是基本的,因为它们通常用于长期的信息存储。(我们在前面提到过,不可变日志在追加新条目和查找历史记录方面速度很快。)

复述,流是使用基数树作为底层数据结构的不可变日志的实现。每个流条目都由一个时间戳标识,可以包含任意一组字段-值对。同一流的条目可以有不同的字段,但Redis能够压缩一行中共享同一模式的多个事件。这意味着,如果您的事件有稳定的字段集,那么您就不必为每个字段名支付存储费用,这样就可以使用更长的、更具描述性的键名,而不会有任何缺点。

如上所述,可以对流进行修剪以删除较旧的条目,而删除的历史记录通常会以归档格式保存。Redis Streams的另一个特性是能够将任何流中的条目标记为“已删除”,以帮助遵守GDPR等法规。

缩放处理吞吐量

事件流和消息队列有助于应对通信突发。但直接API调用的另一个问题是,当流量峰值时,服务可能会不堪重负。异步通信通道可以充当缓冲区,这有助于平滑峰值,但处理吞吐量必须足够健壮,以维持正常的流量,否则系统将崩溃,缓冲区将需要无限增长。

在Redis Streams中,可以通过通过消费者组读取流来增加处理吞吐量。属于同一消费者组的阅读器以相互排斥的方式查看消息。当然,一个流可以有多个消费者组。在实践中,您可能希望为每个服务创建一个单独的消费者组,这样每个服务就可以根据需要启动多个读取器实例来增加并行性。

3.消息传递语义必须清晰

当进行异步通信时,考虑可能的失败场景是很重要的。例如,在处理消息时,服务实例可能会崩溃或失去连接。由于通信失败是不可避免的,消息传递系统将自己分为两类:至多一次“至少一次”交付。(一些消息传递系统声称只提供一次传递,但这并不是全部。在任何可靠的消息传递系统中,为了克服故障,消息偶尔需要传递不止一次。这是在不可靠网络上通信的一个不可避免的特点。)

要正确处理故障,参与系统的所有服务必须能够执行幂等消息处理。“等幂”表示在重复消息传递的情况下,系统的状态不会改变。幂等性通常通过应用任何必要的状态更改并保存原子处理的最后一条消息(例如,在事务中)来实现。这样,在失败的情况下,失败将永远不会处于不一致的状态,并且通过检查新消息标识符是否位于最后处理的消息之前,读者将能够判断给定消息是否已经被处理。

复述,流,作为一个可靠的流通信信道,是一个“至少一次”系统。当通过消费者组读取流时,Redis会记住哪个事件被分派给了哪个消费者。然后,消费者有责任正确地确认消息已被成功处理。当一个消费者死亡时,一个事件可能会被卡住。为了解决这个问题,使用者组提供了一种方法来检查挂起消息的状态,并在必要时将事件重新分配给另一个使用者。

我们在上面提到,事务(和原子操作)是实现等幂的主要方法。为了解决这个问题,Redis Transactions和Lua脚本允许使用全或全无事务语义组合多个命令。

复述,Pub / Sub是一个至多一次允许发布者向一个或多个频道广播消息的消息传递系统。更准确地说,Redis Pub/Sub是为低延迟最重要的实例之间的实时通信而设计的,因此不具有任何形式的持久性或确认。其结果是最精简的实时消息系统,完美的金融和游戏应用,在这些应用中每一毫秒都很重要。

为什么R万博体育彩edis Enterprise用于消息传递?

万博体育彩Redis Enterprise是基于无共享、对称的建筑这使得数据集的大小可以线性无缝地增长,而不需要更改应用程序代码。

万博体育彩Redis Enterprise提供了多种高可用性和地理分布模型,可以在需要时为用户提供本地延迟。

多个持久性选项(每次写入或每秒的AOF和快照)对性能没有影响,确保您不必在故障后重新构建数据库服务器。

支持非常大的数据集通过对内存(RAM、持久内存或Flash)的智能分层访问,可以确保您可以扩展数据集,以满足用户的需求,而不会显著影响性能。

如何使用一个Pub/Sub与Redis企业万博体育彩

Redis Streams和Pub/Sub有跨不同编程语言的稳定api,所以下面的Python示例可以很容易地翻译成你选择的语言。

连接到复述:

进口复述,
#连接到本地redis实例
r =复述。复述,(host='localhost', port=6379, db=0)

写入流:

event = {"eventType": "purchase", "amount": 5, "item_id": "XXX"}
r.xadd(“stream_key”、‘*’事件)
# ' * '表示redis自动生成和事件id

直接读取流:

Last_id = '$' # '$'只表示新消息
而真正的:
事件= r.xread({"stream_key": last_id}, block=0, count=10)
对于事件中的_,e:
Print (f"new event, amount: {e['amount']}")
last_id = e(“id”)

通过消费者组读取流:

#从读取任何潜在的挂起事件开始
#以前不知道的(例如:
#因为崩溃)。“0”表示挂起事件。
pending = r.xreadgroup("service-1", "consumer-A", {"stream_key": "0"})
pending_ids = []
For _, e in pending:
Print (f"old event found, amount: {e['amount']}")
pending_ids.append (e [' id '])
#标记挂起事件为已处理
r.xack(“stream_key”、“service-1”* pending_ids)

#现在我们处理了所有以前的事件,
#开始要求新的。“>”表示“仅新增事件”。
而真正的:
events = r.xreadgroup(" service-1 ", " consumer-A ", {" stream_key ": " > "}, count=10)
event_ids = []
对于事件中的_,e:
Print (f " new event, amount: {e[' amount ']} ")
event_ids.append (e [' id '])
r.xack(“stream_key”、“service-1”* event_ids)
#如果我们在r之前崩溃。xack”,在重新加载,
#我们将重试此消息批处理。

处理一些事件,以原子的方式确认和应用更改:

而真正的:
events = r.xreadgroup("service-1", "consumer-A", {"stream_key": ">"}, count=10)
event_ids = []

#启动一个redis事务
事务= r.multi ()
对于事件中的_,e:
事务。incrby (f”项目:{e[‘item_id}:总”,e[‘量’])
event_ids.append (e [' id '])
事务。xack(“stream_key”、“service-1”* event_ids)
transaction.exec ()
#如果我们在提交事务之前崩溃,则没有
#将发生其他操作,以确保一致性。

在发布/订阅发布:

发布一条消息到redis频道
r.publish(“复述”,“hello world”)

在Pub/Sub订阅频道:

子= r.pubsub ()
sub.subscribe(“复述”)
而真正的:
味精= sub.get_message ()
打印(f”新消息:{味精(“数据”)}”)

订阅发布/订阅模式:

子= r.pubsub ()
#这个订阅将返回消息
#从所有以' red '开头的渠道。
sub.psubscribe(“红色*”)
而真正的:
味精= sub.get_message ()
Print (f"new message in channel {msg['channel']}: {msg['data']}")


探索更多的


下一个步骤

Baidu