我们现在,简单地,redis

了解更多

redis Streams和Java入门

了解如何使用Redis Streams Producer和Consumer Groups与Lettuce Java客户端



回到博客

作为一名新的企业技术客户经理复述,,我的第一个任务之一就是了解有关Redis的更多信息。所以我开始挖掘,快速发现复述,流.作为基于流式的应用程序的忠实粉丝,我很高兴地分享我了解到如何使用Redis Streams和Java的了解。

什么是Redis Streams?

Redis Streams是一种代表日志的Redis数据类型,所以你可以添加新的信息和消息在一个只追加模式(注意:这不是100%准确的,因为您可以从日志中删除消息,但它已经足够了。)Redis Streams允许您构建“Kafka类似”的应用程序,可以:

  • 创建发布和消耗消息的应用程序。在这里没有任何非凡的内容,您可以使用Redis Pub / sub这样做。
  • 使用即使客户端应用程序(使用者)未运行时也发布的消息。这与Redis的Pub/Sub有很大的不同。
  • 使用从特定点开始的消息。例如,阅读整个历史记录或只阅读新消息。

此外,Redis Streams有概念消费者团体.Redis流群消费者群体,如类似的概念Apache Kafka.,允许客户端应用程序以分布式方式(多个客户端)使用消息,这使得扩展和创建高可用性系统变得容易。

所以,虽然比较Redis Streams和Redis Pub/Sub可能很诱人,并决定其中一个比另一个更好,但这两个功能的目的是不同的。如果你正在评估Pub/Sub和Redis Streams,但还不能马上弄清楚,你可能需要考虑更多需要解决的问题,或者重新阅读这两方面的文档。

(参加Redis大学:Redis Streams课程以了解更多。)

Java和Redis流

学习如何使用Redis Streams和Java的最佳方法是构建示例应用程序。这Redis-Streams-101-Java GitHub存储库包含示例代码,演示如何将消息发送到Stream和c使用消费者组使用消息。首先,你需要Redis 5.x,Java 8或更高版本,apache maven 3.5.x和git。

Redis有许多由社区开发的Java客户,正如您所看到的那样redis.io..我目前喜欢与Redis Streams一起使用的是生菜,所以我在这个示例应用程序中使用。让我们看看创建示例项目所涉及的步骤:

步骤1:将生菜添加到Maven项目中

将下面的依赖项添加到你的项目文件中:

步骤2:连接到Redis

导入以下类:

然后连接:

当你的应用程序完成连接时,使用以下代码断开连接:

步骤3:向Redis Streams发送消息

一旦你有了连接,你就可以发送信息了。在这个例子中,我让Redis生成消息ID,这是基于时间的,并使用代表物联网天气数据的地图构建身体,实时捕捉风速和方向:

这是代码中发生的事情:

  • 线路3-5连接到Redis
  • 第7-10行使用映射创建消息体,因为Redis Streams消息在Java中是字符串键/值。
  • 第12-14行致电synccommands.xadd()方法使用流键" weather_sensor:wind "和消息体本身。此方法返回消息ID。
  • 第16行打印消息ID和内容。
  • 第18-19行关闭连接和客户端。

(完整的生产商代码可用在这里.)

步骤4:消费消息

Redis Streams提供了几种使用命令消耗和读取消息的方法:Xrange.XREVRANGEXREAD.XREADGROUP..为了关注如何使用Apache Kafka构建应用程序,让我们使用XREADGROUP.来自生菜的命令。

消费者组允许开发人员创建一组客户端,该客户端将协作,ag万博下载万博最新版本下载苹果以便从流中获取消息(用于比例和高可用性)。它也是将客户端关联到特定应用程序角色的方法;例如:

  • 一个名为“数据仓库”的消费者组将消耗消息并将其发送到数据仓库
  • 另一个名为“聚合器”的消费者组将消耗消息并聚合数据并将聚合结果发送到另一个宿区(另一个流或存储)

这些消费者组中的每一个都将独立操作,并且每一个消费者组都可以有多个“消费者”(客户端)。

下面是它在Java中的工作方式:

此代码是一个子集main ()方法。我删除了连接管理部分,使其更具可读性。让我们看一下代码。

  • 第3行至10,使用该方法xgroupCreate (),与之匹配XGroup创建命令:
    • 创建一个名为application_1.
    • 消耗来自流的消息weather_sensor:风
    • 消费者组从流中的第一条消息开始读取使用消息ID0 - 0.(注意:您还可以向组表示,以便在特定消息ID中开始读取,或仅使用新消息$特殊ID(或helper方法)XReadArgs.StreamOffset.latest ()
  • 第15到30行使用一个无限循环(而(真))等待发布到流的任何新邮件。
  • 第17行到第20行使用该方法XREADGROUP()根据组配置返回消息。
    • 第18行定义了命名的消费者消费者_1.与该组相关联application_1..您可以创建一个新组,将读取数据分发给多个客户端。
    • 第19行表示在哪里开始,在这种情况下,StreamOffset.LastConsumed(“Weather_Sensor:Wind”).T.消费者将使用尚未被读取的消息。使用组的当前配置(偏移0 - 0),当消费者第一次启动时,它将读取所有现有消息。
  • 第22至28行,应用程序迭代每条消息,以及:
    • 第24行,处理消息,在本例中是一个简单的打印
    • 行26日发送承认使用xack ()命令。你必须使用ACK.命令,以确认已读取和处理消息。这XACK命令从消费者组的挂起列表中删除邮件。

完整的消费代码可用在这里

构建并运行简单的Java应用程序

现在您对代码有更好的理解,让我们运行生产者和消费者。您可以从IDE运行此功能或使用Maven,但这是它在Maven CLI中的工作原理。首先开设两个终端,一个生成消息和一个要使用它们的消息,然后按照以下步骤操作:

第1步:克隆并构建项目:

第2步:发布新消息:

步骤3:使用消息

打开一个新的终端并运行以下命令:

使用者将启动并使用您刚刚发布的消息,并等待任何新消息。

步骤4:在第一个终端中,发布100条新消息:

消费者将接收并打印所有消息。

第5步:杀死消费者并发布更多信息

让我们做另一个测试:停止消费者使用简单ctrl + c和t母鸡发了五条新消息:

邮件尚未由任何应用程序消耗,但仍然存储在Redis Stream中。因此,当您启动消费者时,它会消耗这些新消息:

这是其中的差异之一复述,流redis pub / sub.生产者应用程序在消费者应用程序未运行时发布了许多消息。因为消费者是与StreamOffset.LastConsumed(),当消费者开始时,它查找最后一个消费的ID,并开始从那里读取流。此方法使用组生成一个XGROUPREAD命令。

结论

这个小型项目旨在向您展示如何使用Lettuce是Redis的Java客户端,将消息发布到流,创建消费者组,并使用消费者组使用消息。

这是一个非常基本的例子,在接下来的文章中,我计划深入探讨如何与多个消费者一起工作,以及如何配置消费者组和消费者来控制您想要阅读的消息。

Baidu