成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

像Flink一樣使用Redis

數(shù)據(jù)庫 Redis
Redis 是一種功能強大的 NoSQL 內存數(shù)據(jù)結構存儲,已成為開發(fā)人員的首選工具。雖然它通常被認為只是一個緩存,但 Redis 遠不止于此。它可以作為數(shù)據(jù)庫、消息代理和緩存三者合一。

Apache Flink和 Redis 是兩個強大的工具,可以一起使用來構建可以處理大量數(shù)據(jù)的實時數(shù)據(jù)處理管道。Flink 為處理數(shù)據(jù)流提供了一個高度可擴展和容錯的平臺,而 Redis 提供了一個高性能的內存數(shù)據(jù)庫,可用于存儲和查詢數(shù)據(jù)。在本文中,將探討如何使用 Flink 來使用異步函數(shù)調用 Redis,并展示如何使用它以非阻塞方式將數(shù)據(jù)推送到 Redis。

Redis的故事

圖片

“Redis:不僅僅是一個緩存

Redis 是一種功能強大的 NoSQL 內存數(shù)據(jù)結構存儲,已成為開發(fā)人員的首選工具。雖然它通常被認為只是一個緩存,但 Redis 遠不止于此。它可以作為數(shù)據(jù)庫、消息代理和緩存三者合一。

Redis 的優(yōu)勢之一是它的多功能性。它支持各種數(shù)據(jù)類型,包括字符串、列表、集合、有序集合、哈希、流、HyperLogLogs 和位圖。Redis 還提供地理空間索引和半徑查詢,使其成為基于位置的應用程序的寶貴工具。

Redis 的功能超出了它的數(shù)據(jù)模型。它具有內置的復制、Lua 腳本和事務,并且可以使用 Redis Cluster 自動分區(qū)數(shù)據(jù)。此外,Redis 通過 Redis Sentinel 提供高可用性。

注意:在本文中,將更多地關注Redis集群模式

圖片

Redis 集群使用帶哈希槽的算法分片來確定哪個分片擁有給定的鍵并簡化添加新實例的過程。同時,它使用 Gossiping 來確定集群的健康狀況,如果主節(jié)點沒有響應,可以提升輔助節(jié)點以保持集群健康。必須有奇數(shù)個主節(jié)點和兩個副本才能進行穩(wěn)健設置,以避免腦裂現(xiàn)象(集群無法決定提升誰并最終做出分裂決定)

為了與 Redis 集群對話,將使用lettuce和 Redis Async Java 客戶端。

Flink 的故事

圖片

Apache Flink 是一個開源、統(tǒng)一的流處理和批處理框架,旨在處理實時、高吞吐量和容錯數(shù)據(jù)處理。它建立在 Apache Gelly 框架之上,旨在支持有界和無界流上的復雜事件處理和有狀態(tài)計算,它的快速之處在于其利用內存中性能和異步檢查本地狀態(tài)。

故事的主人公

圖片

與數(shù)據(jù)庫的異步交互是流處理應用程序的游戲規(guī)則改變者。通過這種方法,單個函數(shù)實例可以同時處理多個請求,從而允許并發(fā)響應并顯著提高吞吐量。通過將等待時間與其他請求和響應重疊,處理管道變得更加高效。

我們將以電商數(shù)據(jù)為例,計算24小時滑動窗口中每個品類的銷售額,滑動時間為30秒,下沉到Redis,以便更快地查找下游服務。

充足的數(shù)據(jù)集

Category, TimeStamp
Electronics,1679832334
Furniture,1679832336
Fashion,1679832378
Food,16798323536

Flink Kafka 消費者類

package Aysnc_kafka_redis;

import AsyncIO.RedisSink;
import akka.japi.tuple.Tuple3;
import deserializer.Ecommdeserialize;
import model.Ecomm;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;

public class FlinkAsyncRedis {

public static void main(String[] args) throws Exception {


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Ecommdeserialize jsonde = new Ecommdeserialize();

KafkaSource<Ecomm> source = KafkaSource.<Ecomm>builder()
.setTopics("{dummytopic}")
.setBootstrapServers("{dummybootstrap}")
.setGroupId("test_flink")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(jsonde)
.build();


DataStream<Ecomm> orderData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");


orderData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Ecomm>(Time.seconds(10)) {
@Override
public long extractTimestamp(Ecomm element) {
return element.getEventTimestamp(); // extract watermark column from stream
}
});

SingleOutputStreamOperator<Tuple3<String, Long, Long>> aggregatedData = orderData.keyBy(Ecomm::getCategory)
.window(SlidingEventTimeWindows.of(Time.hours(24),Time.seconds(30)))
.apply((WindowFunction<Ecomm, Tuple3<String, Long, Long>, String, TimeWindow>) (key, window, input, out) -> {
long count = 0;
for (Ecomm event : input) {
count++; // increment the count for each event in the window
}
out.collect(new Tuple3<>(key, window.getEnd(), count)); // output the category, window end time, and count
});


// calling async I/0 operator to sink data to redis in UnOrdered way
SingleOutputStreamOperator<String> sinkResults = AsyncDataStream.unorderedWait(aggregatedData,new RedisSink(
"{redisClusterUrl}"),
1000, // the timeout defines how long an asynchronous operation take before it is finally considered failed
TimeUnit.MILLISECONDS,
100); //capacity This parameter defines how many asynchronous requests may be in progress at the same time.

sinkResults.print(); // print out the redis set response stored in the future for every key

env.execute("RedisAsyncSink"); // you will be able to see your job running on cluster by this name


}

}

Redis 設置鍵異步 I/0 運算符

package AsyncIO;

import akka.japi.tuple.Tuple3;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import lombok.AllArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import scala.collection.immutable.List;

import java.util.ArrayList;
import java.util.Collections;

@AllArgsConstructor
public class RedisSink extends RichAsyncFunction<Tuple3<String, Long, Long>, String> {

String redisUrl;

public RedisSink(String redisUrl){
this.redisUrl=redisUrl;
}

private transient RedisClusterClient client = null;
private transient StatefulRedisClusterConnection<String, String> clusterConnection = null;
private transient RedisAdvancedClusterAsyncCommands<String, String> asyncCall = null;


// method executes any operator-specific initialization
@Override
public void open(Configuration parameters) {
if (client == null ) {
client = RedisClusterClient.create(redisUrl);
}
if (clusterConnection == null) {
clusterConnection = client.connect();
}
if (asyncCall == null) {
asyncCall = clusterConnection.async();
}
}

// core logic to set key in redis using async connection and return result of the call via ResultFuture
@Override
public void asyncInvoke(Tuple3<String, Long, Long> stream, ResultFuture<String> resultFuture) {

String productKey = stream.t1();
System.out.println("RedisKey:" + productKey); //for logging
String count = stream.t3().toString();
System.out.println("Redisvalue:" + count); //for logging
RedisFuture<String> setResult = asyncCall.set(productKey,count);

setResult.whenComplete((result, throwable) -> {if(throwable!=null){
System.out.println("Callback from redis failed:" + throwable);
resultFuture.complete(new ArrayList<>());
}
else{
resultFuture.complete(new ArrayList(Collections.singleton(result)));
}});
}

// method closes what was opened during initialization to free any resources
// held by the operator (e.g. open network connections, io streams)
@Override
public void close() throws Exception {
client.close();
}

}

用例:

  • 數(shù)據(jù)科學模型可以使用流式傳輸?shù)?Redis 的數(shù)據(jù)來查找和生成更多在銷售季節(jié)經(jīng)常銷售的類別的產(chǎn)品。
  • 它可用于在網(wǎng)頁上展示圖表和數(shù)字作為銷售統(tǒng)計數(shù)據(jù),以在用戶中產(chǎn)生積極購買的動力。

要點:

  • Flink 為處理數(shù)據(jù)流提供了一個高度可擴展和容錯的平臺,而 Redis 提供了一個高性能的內存數(shù)據(jù)庫,可用于存儲和查詢數(shù)據(jù)。
  • 異步編程可用于通過允許對外部系統(tǒng)(如 Redis)進行非阻塞調用來提高數(shù)據(jù)處理管道的性能。
  • 兩者的結合可能有助于帶來實時數(shù)據(jù)決策文化。
責任編輯:武曉燕 來源: Java學研大本營
相關推薦

2022-12-21 15:56:23

代碼文檔工具

2023-05-23 13:59:41

RustPython程序

2013-12-31 09:19:23

Python調試

2013-12-17 09:02:03

Python調試

2021-08-27 06:41:34

Docker ContainerdRun&Exec

2021-12-28 11:23:36

SQLServerExcel數(shù)據(jù)分析

2021-05-20 08:37:32

multiprocesPython線程

2013-08-22 10:17:51

Google大數(shù)據(jù)業(yè)務價值

2015-03-16 12:50:44

2011-01-18 10:45:16

喬布斯

2012-06-08 13:47:32

Wndows 8Vista

2015-02-05 13:27:02

移動開發(fā)模塊SDK

2017-04-26 14:02:18

大數(shù)據(jù)數(shù)據(jù)分析Excel

2012-03-21 10:15:48

RIM越獄

2017-05-22 10:33:14

PythonJuliaCython

2021-09-07 10:29:11

JavaScript模塊CSS

2021-12-14 19:40:07

Node路由Vue

2015-04-09 11:27:34

2012-06-14 09:48:11

OpenStackLinux

2011-10-24 13:07:00

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: av看片| 特黄色一级毛片 | 欧美日韩精品影院 | 天天操妹子 | 国产资源在线播放 | 91精品国产一区二区三区动漫 | 97国产超碰 | 国产超碰人人爽人人做人人爱 | 国产一区二区久久 | 在线成人精品视频 | 91精品国产一二三 | 亚洲福利视频一区二区 | 亚洲综合99| 欧美一卡二卡在线 | 国产一二三区在线 | 天天躁日日躁狠狠很躁 | 成人精品一区 | 欧美视频一区 | 91麻豆精品国产91久久久久久久久 | 欧美一级片在线 | 精品一区二区在线看 | 国产一区二区三区四区 | 午夜免费在线电影 | 成人午夜网站 | 久久国产秒 | 久久久久久久久久久久久久久久久久久久 | 91精品久久 | 国产精品天堂 | 一级片av| 久草在线影 | 日日草夜夜草 | 久久99网 | www狠狠爱com | 日韩av一区二区在线观看 | 国产成人精品一区二 | 91观看 | 日韩精品免费视频 | 999免费网站 | 在线观看国产视频 | av网站免费在线观看 | 日韩欧美网 |