简单地说,我们现在是Redis

了解更多

快速数据摄取

什么是快速数据摄取?

收集、存储和处理大量高多样性、高速的数据带来了一些复杂的设计挑战,特别是在物联网(IoT)、电子商务、安全、通信、娱乐、金融和零售等领域。鉴于响应性、及时性和准确的数据驱动决策是这些业务的核心,实时数据收集和分析至关重要。

交付实时数据分析的重要的第一步是确保有足够的资源可用来有效地捕获快速数据流。虽然物理基础设施(包括高速网络、计算、存储和内存)在这里扮演着重要的角色,但软件堆栈必须与其物理层的性能相匹配,否则组织最终可能会出现大量的数据积压、丢失的数据,或者不完整的、误导性的数据。万博电竞客服

快速数据摄取的挑战和最佳实践

高速数据摄入通常涉及不同类型的复杂性:

  1. 大量数据以突发方式到达:突发数据要求能够以最小延迟处理大量数据的解决方案。理想情况下,它应该能够每秒执行数百万次写操作,延迟在毫秒以下,使用最小的资源。
  2. 数据来自多个来源/格式:数据摄取解决方案还必须足够灵活,以处理许多不同格式的数据,在需要时保留源标识,并实时进行转换或规范化。
  3. 需要过滤、分析或转发的数据:大多数数据摄取解决方案都有一个或多个使用数据的订阅者。这些通常是在相同或不同位置使用不同假设的不同应用程序。在这种情况下,数据库不仅必须转换数据,而且还必须根据使用应用程序的需求过滤或聚合数据。
  4. 在生产者和不同类型的消费者之间管理稳定的数据通道:如果数据到达模式不是连续的,那么生产者和消费者需要一个允许他们异步传输数据的通道。信道还必须具有抗连接丢失和硬件故障的能力。在许多用例中,生产者和消费者的操作速度并不相同。这可能导致数据积压,进一步延迟消费者对数据的操作。
  5. 来自地理分布来源的数据:在此场景中,底层体系结构将数据收集节点分布在源节点附近通常是很方便的。这样,节点本身就成为快速数据摄取解决方案的一部分,以收集、处理、转发或重路由摄取数据。

Redi万博体育彩s Enterprise如何使快速数据获取变得容易

服务器数量最少的高性能
谈到性能,Redis Enterprise一直是万博体育彩基准测试在AWS上只有40个节点的集群,以毫秒以下的延迟处理每秒超过2亿个读/写操作。这使得Redis En万博体育彩terprise成为市场上最节约资源的NoSQL数据库。

灵活的数据结构和模块,实时分析:Redis流,Pub/Sub,列表,排序集,RedisTimeSeries
Redis提供了多种数据结构,如Streams、list、set、Sorted set和hash,提供简单和通用的数据处理,以有效地结合高速数据摄取和实时分析。

Redis的Pub/Sub功能允许它在地理分布的数据摄取节点之间充当有效的消息代理。数据生成应用程序以所需的格式将流数据发布到通道,而消费应用程序订阅与它们相关的通道,在消息发布时异步接收消息。

列表和排序集可以用作连接生产者和消费者的数据通道。还可以使用这些数据结构异步传输数据。与发布/订阅不同,列表和排序集提供了持久性。

流可以做的更多,它提供了生产者和消费者之间持久的数据摄取通道。有了Streams,您可以使用消费者组来扩展消费者的数量。当消费者在消费和处理数据的过程中失败时,消费者组还实现类似事务的数据安全。

最后RedisTimeSeries提供了一个增强的快速数据摄取功能集,包括下采样、对最后摄取值的特殊计数器操作和双增量压缩,并结合了实时分析功能,如数据标签内置搜索、聚合、范围查询、以及与领先的监控和分析工具(如Grafana和Prometheus)的内置连接器。

active - active Geo-Distribution部署
万博体育彩复述,企业的CRDTs-based active - active技术支持跨地理位置的复杂数据摄取和消息传递操作,并支持以完全分布式的方式部署应用程序,从而显著提高可用性和应用程序响应时间。

扩展Redis DRAM与SSD和持久内存
万博体育彩复述,企业的复述在闪光技术使扩展DRAM SSD和持久的记忆,允许存储很大的tb数据集使用相同的基础设施成本的基于磁盘的数据库和数据库延迟,同时保持在毫秒级的水平,即使摄取物品超过1米/秒的复述,企业集群的每个节点上。万博体育彩

如何实现快速的数据摄取与Redis

下面是用Java编写的一些代码片段。他们都使用绝地武士的图书馆。首先,按照绝地武士的指示开始页面下载最新版本的绝地武士。

  1. 使用Redis Streams快速数据摄取
    1. 将消息发布到流数据结构。该程序使用XADD向流中添加新项。文件名称:StreamPublish.java。进口java.util.HashMap;
      进口java.util.Map;

      进口redis.clients.jedis.Jedis;
      public class StreamPublish {
      static Jedis Jedis = new Jedis(" localhost ", 6379);
      尝试{
      Map kv = new HashMap();
      千伏。把(“a”,“100”);//键-> a;值- > 100
      能。StreamEntryID xadd(“MyStream”。NEW_ENTRY kv);
      最后}{
      jedis.close ();


    2. 异步地使用流中的数据。如果流为空,则等待消息。这个程序使用XREAD命令。文件名称:StreamConsumeAsync.java。进口java.util.AbstractMap.SimpleEntry;
      进口java.util.HashMap;
      进口并不知道;
      进口java.util.Map;
      进口java.util.Map.Entry;

      进口redis.clients.jedis.Jedis;
      进口redis.clients.jedis.StreamEntry;
      public class Jedis Jedis = new Jedis(" localhost ", 6379);public static void main(String[] args) throws Exception{//从0开始。对于后续查询,从最后一个id + 1读取
      String lastStreamDataId = " 0-0 ";
      Int count = 1000;
      long waitTimeInMillis = 5000;try {
      //在循环中异步读取新数据
      而(真){
      List next = getNext(" MyStream ", lastStreamDataId,
      统计,waitTimeInMillis);
      If (next != null) {
      List stList = getStreamEntries(next);
      if(stList != null) {
      //在这里处理数据
      for (int j = 0;j < stList.size ();j + +) {
      StreamEntry streamData = (StreamEntry)stList.get(j);//读取数据流的字段(键值对)
      Map fields = streamData.getFields();//从最后一个id + 1读取后续数据
      .getTime lastStreamDataId = streamData.getID () ()
      +“-”
      + (streamData.getID () .getSequence () + 1);System.out.println (stList.get (j));
      System.out.println (lastStreamDataId);

      其他}{
      system . out。println("流中没有新数据");


      最后}}{
      jedis.close ();

      } //从流中读取下一组数据
      private static List getNext(String streamId, String lasttid, int count, long waitTimeInMillis) throws Exception{
      HashMap map = new HashMap();
      String readFrom = lasttid;
      地图。把(streamId新StreamEntryID (readFrom));
      列表列表=绝地武士。waitTimeInMillis xread(计数,
      (条目<字符串,StreamEntryID >)
      .toArray map.entrySet () () [0]);
      返回列表;
      } //读取流项
      //假设streamList只有一个流
      private static List getStreamEntries(List streamList) throws Exception{
      如果(streamList.size () > 0) {
      SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
      返回列表(< StreamEntry >) stEntry.getValue ();
      }返回null;

    3. 使用XRANGE命令查询流。文件名称:StreamQuery.java进口并不知道;
      进口java.util.Map;

      进口redis.clients.jedis.Jedis;
      进口redis.clients.jedis.StreamEntry;
      public class Jedis Jedis = new Jedis(" localhost ", 6379);public static void main(String[] args) throws Exception{String streamID = " MyStream ";
      StreamEntryID start = new StreamEntryID(0,0);
      StreamEntryID end = null;// null ->直到流中的最后一项
      Int count = 2;try {
      stList stList = jedis. stList xrange(stream, start, end, count);
      if(stList != null) {
      //在这里处理数据
      for (int j = 0;j < stList.size ();j + +) {
      StreamEntry streamData = (StreamEntry)stList.get(j);System.out.println (streamData);//读取数据流的字段(键值对)
      Map fields = streamData.getFields();//从最后一个id + 1读取后续数据
      StreamEntryID nextStart =
      .getTime新StreamEntryID (streamData.getID () (),
      (streamData.getID () .getSequence () + 1));

      其他}{
      system . out。println("流中没有新数据");

      最后}{
      jedis.close ();


  2. 使用Pub/Sub快速数据摄取
    1. 发布到通道。文件名称:PubSubPublish.java进口redis.clients.jedis.Jedis;

      公共类PubSubPublish {
      static Jedis Jedis = new Jedis(" localhost ", 6379);
      public static void main(String[] args) throws Exception {try {
      String channel = " MyChannel ";
      String message = " Hello there! ";
      能。发布(通道、消息);
      最后}{
      jedis.close ();


    2. 订阅一个频道。文件名称:PubSubPublish.java进口redis.clients.jedis.Jedis;
      进口redis.clients.jedis.JedisPubSub;

      public class PubSubSubscribe extends jdispubsub{/ /扩展jdispubsub
      static Jedis Jedis = new Jedis(" localhost ", 6379);public static void main(String[] args) throws Exception {try {
      PubSubSubscribe mySubscriber = new PubSubSubscribe();
      String channel = " MyChannel ";
      能。订阅(mySubscriber、通道);
      最后}{
      jedis.close ();

      } / /接收消息
      @Override
      public void onMessage(String channel, String message) {
      System.out.println(消息);

  3. 使用列表快速数据摄取
    1. 将数据推入列表。文件名称:ListPush.java进口redis.clients.jedis.Jedis;

      公共类ListPush {
      static Jedis Jedis = new Jedis(" localhost ", 6379);
      public static void main(String[] args) throws Exception {try {
      String list = " MyList ";
      String message = " Hello there! ";
      能。lpush(列表、消息);
      最后}{
      jedis.close ();


    2. 从列表中弹出数据。文件名称:ListPop.java进口redis.clients.jedis.Jedis;

      公共类ListPop {
      static Jedis Jedis = new Jedis(" localhost ", 6379);
      public static void main(String[] args) throws Exception {try {
      String list = " MyList ";
      String message = jdis .rpop(list);
      System.out.println(消息);
      最后}{
      jedis.close ();



探索更多的


下一个步骤

Baidu