像Flink一样使用Redis

人工智能2025-11-04 14:11:434

Apache Flink和 Redis 是样使用两个强大的工具,可以一起使用来构建可以处理大量数据的样使用实时数据处理管道。Flink 为处理数据流提供了一个高度可扩展和容错的样使用平台,而 Redis 提供了一个高性能的样使用内存数据库,可用于存储和查询数据。样使用在本文中,样使用将探讨如何使用 Flink 来使用异步函数调用 Redis,样使用并展示如何使用它以非阻塞方式将数据推送到 Redis。样使用

Redis的样使用故事

“Redis:不仅仅是一个缓存

Redis 是一种功能强大的 NoSQL 内存数据结构存储,已成为开发人员的样使用首选工具。虽然它通常被认为只是样使用一个缓存,但 Redis 远不止于此。样使用它可以作为数据库、样使用消息代理和缓存三者合一。样使用

Redis 的样使用优势之一是它的多功能性。它支持各种数据类型,包括字符串、列表、集合、有序集合、免费源码下载哈希、流、HyperLogLogs 和位图。Redis 还提供地理空间索引和半径查询,使其成为基于位置的应用程序的宝贵工具。

Redis 的功能超出了它的数据模型。它具有内置的复制、Lua 脚本和事务,并且可以使用 Redis Cluster 自动分区数据。此外,Redis 通过 Redis Sentinel 提供高可用性。

注意:在本文中,将更多地关注Redis集群模式

Redis 集群使用带哈希槽的算法分片来确定哪个分片拥有给定的键并简化添加新实例的过程。同时,它使用 Gossiping 来确定集群的健康状况,如果主节点没有响应,可以提升辅助节点以保持集群健康。必须有奇数个主节点和两个副本才能进行稳健设置,以避免脑裂现象(集群无法决定提升谁并最终做出分裂决定)

为了与 Redis 集群对话,将使用lettuce和 Redis Async Java 客户端。

Flink 的故事

Apache Flink 是一个开源、统一的免费信息发布网流处理和批处理框架,旨在处理实时、高吞吐量和容错数据处理。它建立在 Apache Gelly 框架之上,旨在支持有界和无界流上的复杂事件处理和有状态计算,它的快速之处在于其利用内存中性能和异步检查本地状态。

故事的主人公

与数据库的异步交互是流处理应用程序的游戏规则改变者。通过这种方法,单个函数实例可以同时处理多个请求,从而允许并发响应并显着提高吞吐量。通过将等待时间与其他请求和响应重叠,处理管道变得更加高效。

我们将以电商数据为例,计算24小时滑动窗口中每个品类的销售额,滑动时间为30秒,下沉到Redis,以便更快地查找下游服务。

充足的数据集

复制Category, TimeStampElectronics,1679832334Furniture,1679832336Fashion,1679832378Food,167983235361.2.3.4.5.

Flink Kafka 消费者类

复制package Aysnc_kafka_redis;import AsyncIO.RedisSink;import akka.japi.tuple.Tuple3;import deserializer.Ecommdeserialize;import model.Ecomm;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.streaming.api.datastream.AsyncDataStream;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;public class FlinkAsyncRedis { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Ecommdeserialize jsonde = new Ecommdeserialize(); KafkaSource<Ecomm> source = KafkaSource.<Ecomm>builder() .setTopics("{dummytopic}") .setBootstrapServers("{dummybootstrap}") .setGroupId("test_flink") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(jsonde) .build(); DataStream<Ecomm> orderData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); orderData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Ecomm>(Time.seconds(10)) {

@Override

public long extractTimestamp(Ecomm element) { return element.getEventTimestamp(); // extract watermark column from

stream

} }); SingleOutputStreamOperator<Tuple3<String, Long, Long>> aggregatedData = orderData.keyBy(Ecomm::getCategory) .window(SlidingEventTimeWindows.of(Time.hours(24),Time.seconds(30))) .apply((WindowFunction<Ecomm, Tuple3<String, Long, Long>, String, TimeWindow>) (key, window, input, out) -> { long count = 0; for (Ecomm event : input) { count++; // increment the count for each event in

the window

} out.collect(new Tuple3<>(key, window.getEnd(), count)); // output the category, window end time, and count }); // calling async I/0 operator to sink data to redis in

UnOrdered way

SingleOutputStreamOperator<String> sinkResults = AsyncDataStream.unorderedWait(aggregatedData,new RedisSink( "{redisClusterUrl}"), 1000, // the timeout defines how long an asynchronous operation take before it is

finally considered failed

TimeUnit.MILLISECONDS, 100); //capacity This parameter defines how many asynchronous requests may be in progress at the same time

.

sinkResults.print(); // print out the redis set response stored in

the future for every key

env.execute("RedisAsyncSink"); // you will be able to see your job running on cluster by

this name

}}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.

Redis 设置键异步 I/0 运算符

复制package AsyncIO;import akka.japi.tuple.Tuple3;import io.lettuce.core.RedisFuture;import io.lettuce.core.cluster.RedisClusterClient;import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;import lombok.AllArgsConstructor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.async.ResultFuture;import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import scala.collection.immutable.List;import java.util.ArrayList;import java.util.Collections;

@AllArgsConstructor

public class RedisSink extends RichAsyncFunction<Tuple3<String, Long, Long>, String> { String redisUrl; public RedisSink(String redisUrl){ this.redisUrl=redisUrl; } private transient RedisClusterClient client = null; private transient StatefulRedisClusterConnection<String, String> clusterConnection = null; private transient RedisAdvancedClusterAsyncCommands<String, String> asyncCall = null; // method executes any operator-

specific initialization

@Override

public void open(Configuration parameters) { if (client == null ) { client = RedisClusterClient.create(redisUrl); } if (clusterConnection == null) { clusterConnection = client.connect(); } if (asyncCall == null) { asyncCall = clusterConnection.async(); } } // core logic to set key in redis using async connection and

return result of the call via ResultFuture

@Override

public void asyncInvoke(Tuple3<String, Long, Long> stream, ResultFuture<String> resultFuture) { String productKey = stream.t1(); System.out.println("RedisKey:" + productKey); //

for logging

String count = stream.t3().toString(); System.out.println("Redisvalue:" + count); //

for logging

RedisFuture<String> setResult = asyncCall.set(productKey,count); setResult.whenComplete((result, throwable) -> {if(throwable!=null){ System.out.println("Callback from redis failed:" + throwable); resultFuture.complete(new ArrayList<>()); } else{ resultFuture.complete(new ArrayList(Collections.singleton(result))); }}); } //

method closes what was opened during initialization to free any resources

// held by the operator (e.g. open network connections, io streams)

@Override

public void close() throws Exception { client.close(); }}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.

用例:

数据科学模型可以使用流式传输到 Redis 的数据来查找和生成更多在销售季节经常销售的类别的产品。它可用于在网页上展示图表和数字作为销售统计数据,以在用户中产生积极购买的动力。

要点:

Flink 为处理数据流提供了一个高度可扩展和容错的亿华云计算平台,而 Redis 提供了一个高性能的内存数据库,可用于存储和查询数据。异步编程可用于通过允许对外部系统(如 Redis)进行非阻塞调用来提高数据处理管道的性能。两者的结合可能有助于带来实时数据决策文化。
本文地址:http://www.bhae.cn/html/941b8598973.html
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

全站热门

利用无peu盘装系统的教程及技巧(无peu盘装系统,实现简单高效,方便快捷安装)

让我们一起聊聊 Django 框架

十分钟带你入门 Web Components

美国大厂新员工薪资曝光! 微软最高近30万美元,TikTok低至时薪30美元

Mac装系统教程(从准备工作到系统安装,全方位指导助你顺利完成)

探讨AR和IoT的用例

教你一招!用Python和Flask创建REST API!

注意:雪花算法并不是ID的唯一选择!

友情链接

滇ICP备2023000592号-9