简单地说,我们现在是Redis
收集、存储和处理大量高多样性、高速的数据带来了一些复杂的设计挑战,特别是在物联网(IoT)、电子商务、安全、通信、娱乐、金融和零售等领域。鉴于响应性、及时性和准确的数据驱动决策是这些业务的核心,实时数据收集和分析至关重要。
交付实时数据分析的重要的第一步是确保有足够的资源可用来有效地捕获快速数据流。虽然物理基础设施(包括高速网络、计算、存储和内存)在这里扮演着重要的角色,但软件堆栈必须与其物理层的性能相匹配,否则组织最终可能会出现大量的数据积压、丢失的数据,或者不完整的、误导性的数据。万博电竞客服
高速数据摄入通常涉及不同类型的复杂性:
服务器数量最少的高性能
谈到性能,Redis Enterprise一直是万博体育彩基准测试在AWS上只有40个节点的集群,以毫秒以下的延迟处理每秒超过2亿个读/写操作。这使得Redis En万博体育彩terprise成为市场上最节约资源的NoSQL数据库。
灵活的数据结构和模块,实时分析:Redis流,Pub/Sub,列表,排序集,RedisTimeSeries
Redis提供了多种数据结构,如Streams、list、set、Sorted set和hash,提供简单和通用的数据处理,以有效地结合高速数据摄取和实时分析。
Redis的Pub/Sub功能允许它在地理分布的数据摄取节点之间充当有效的消息代理。数据生成应用程序以所需的格式将流数据发布到通道,而消费应用程序订阅与它们相关的通道,在消息发布时异步接收消息。
列表和排序集可以用作连接生产者和消费者的数据通道。还可以使用这些数据结构异步传输数据。与发布/订阅不同,列表和排序集提供了持久性。
流可以做的更多,它提供了生产者和消费者之间持久的数据摄取通道。有了Streams,您可以使用消费者组来扩展消费者的数量。当消费者在消费和处理数据的过程中失败时,消费者组还实现类似事务的数据安全。
最后RedisTimeSeries提供了一个增强的快速数据摄取功能集,包括下采样、对最后摄取值的特殊计数器操作和双增量压缩,并结合了实时分析功能,如数据标签内置搜索、聚合、范围查询、以及与领先的监控和分析工具(如Grafana和Prometheus)的内置连接器。
active - active Geo-Distribution部署
万博体育彩复述,企业的CRDTs-based active - active技术支持跨地理位置的复杂数据摄取和消息传递操作,并支持以完全分布式的方式部署应用程序,从而显著提高可用性和应用程序响应时间。
扩展Redis DRAM与SSD和持久内存
万博体育彩复述,企业的复述在闪光技术使扩展DRAM SSD和持久的记忆,允许存储很大的tb数据集使用相同的基础设施成本的基于磁盘的数据库和数据库延迟,同时保持在毫秒级的水平,即使摄取物品超过1米/秒的复述,企业集群的每个节点上。万博体育彩
下面是用Java编写的一些代码片段。他们都使用绝地武士的图书馆。首先,按照绝地武士的指示开始页面下载最新版本的绝地武士。
进口java.util.HashMap;
进口java.util.Map;
进口redis.clients.jedis.Jedis;
public class StreamPublish {
static Jedis Jedis = new Jedis(" localhost ", 6379);
尝试{
Map kv = new HashMap();
千伏。把(“a”,“100”);//键-> a;值- > 100
能。StreamEntryID xadd(“MyStream”。NEW_ENTRY kv);
最后}{
jedis.close ();
}
}
}
进口java.util.AbstractMap.SimpleEntry;
进口java.util.HashMap;
进口并不知道;
进口java.util.Map;
进口java.util.Map.Entry;
进口redis.clients.jedis.Jedis;
进口redis.clients.jedis.StreamEntry;
public class Jedis Jedis = new Jedis(" localhost ", 6379);public static void main(String[] args) throws Exception{//从0开始。对于后续查询,从最后一个id + 1读取
String lastStreamDataId = " 0-0 ";
Int count = 1000;
long waitTimeInMillis = 5000;try {
//在循环中异步读取新数据
而(真){
List next = getNext(" MyStream ", lastStreamDataId,
统计,waitTimeInMillis);
If (next != null) {
List stList = getStreamEntries(next);
if(stList != null) {
//在这里处理数据
for (int j = 0;j < stList.size ();j + +) {
StreamEntry streamData = (StreamEntry)stList.get(j);//读取数据流的字段(键值对)
Map fields = streamData.getFields();//从最后一个id + 1读取后续数据
.getTime lastStreamDataId = streamData.getID () ()
+“-”
+ (streamData.getID () .getSequence () + 1);System.out.println (stList.get (j));
System.out.println (lastStreamDataId);
}
其他}{
system . out。println("流中没有新数据");
}
}
最后}}{
jedis.close ();
}
} //从流中读取下一组数据
private static List getNext(String streamId, String lasttid, int count, long waitTimeInMillis) throws Exception{
HashMap map = new HashMap();
String readFrom = lasttid;
地图。把(streamId新StreamEntryID (readFrom));
列表列表=绝地武士。waitTimeInMillis xread(计数,
(条目<字符串,StreamEntryID >)
.toArray map.entrySet () () [0]);
返回列表;
} //读取流项
//假设streamList只有一个流
private static List getStreamEntries(List streamList) throws Exception{
如果(streamList.size () > 0) {
SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
返回列表(< StreamEntry >) stEntry.getValue ();
}返回null;
}
}
进口并不知道;
进口java.util.Map;
进口redis.clients.jedis.Jedis;
进口redis.clients.jedis.StreamEntry;
public class Jedis Jedis = new Jedis(" localhost ", 6379);public static void main(String[] args) throws Exception{String streamID = " MyStream ";
StreamEntryID start = new StreamEntryID(0,0);
StreamEntryID end = null;// null ->直到流中的最后一项
Int count = 2;try {
stList stList = jedis. stList xrange(stream, start, end, count);
if(stList != null) {
//在这里处理数据
for (int j = 0;j < stList.size ();j + +) {
StreamEntry streamData = (StreamEntry)stList.get(j);System.out.println (streamData);//读取数据流的字段(键值对)
Map fields = streamData.getFields();//从最后一个id + 1读取后续数据
StreamEntryID nextStart =
.getTime新StreamEntryID (streamData.getID () (),
(streamData.getID () .getSequence () + 1));
}
其他}{
system . out。println("流中没有新数据");
}
最后}{
jedis.close ();
}
}
}
进口redis.clients.jedis.Jedis;
公共类PubSubPublish {
static Jedis Jedis = new Jedis(" localhost ", 6379);
public static void main(String[] args) throws Exception {try {
String channel = " MyChannel ";
String message = " Hello there! ";
能。发布(通道、消息);
最后}{
jedis.close ();
}
}
}
进口redis.clients.jedis.Jedis;
进口redis.clients.jedis.JedisPubSub;
public class PubSubSubscribe extends jdispubsub{/ /扩展jdispubsub
static Jedis Jedis = new Jedis(" localhost ", 6379);public static void main(String[] args) throws Exception {try {
PubSubSubscribe mySubscriber = new PubSubSubscribe();
String channel = " MyChannel ";
能。订阅(mySubscriber、通道);
最后}{
jedis.close ();
}
} / /接收消息
@Override
public void onMessage(String channel, String message) {
System.out.println(消息);
}
}
进口redis.clients.jedis.Jedis;
公共类ListPush {
static Jedis Jedis = new Jedis(" localhost ", 6379);
public static void main(String[] args) throws Exception {try {
String list = " MyList ";
String message = " Hello there! ";
能。lpush(列表、消息);
最后}{
jedis.close ();
}
}
}
进口redis.clients.jedis.Jedis;
公共类ListPop {
static Jedis Jedis = new Jedis(" localhost ", 6379);
public static void main(String[] args) throws Exception {try {
String list = " MyList ";
String message = jdis .rpop(list);
System.out.println(消息);
最后}{
jedis.close ();
}
}
}