简介
Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project Reactor提供天然的反应式编程,通信框架集成了Netty使用了非阻塞IO,5.x版本之后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API。
引入
maven
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
gradle
dependencies {
compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
}
基本使用
Lettuce使用的时候依赖于四个主要组件:
- RedisURI:连接信息。
- RedisClient:Redis客户端,特殊地,集群连接有一个定制的RedisClusterClient。
- Connection:Redis连接,主要是StatefulConnection或者StatefulRedisConnection的子类,连接的类型主要由连接的具体方式(单机、哨兵、集群、订阅发布等等)选定,比较重要。
- RedisCommands:Redis命令API接口,基本上覆盖了Redis发行版本的所有命令,提供了同步(sync)、异步(async)、反应式(reative)的调用方式,对于使用者而言,会经常跟RedisCommands系列接口打交道。
public void test() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> redisCommands = connection.sync();
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
String result = redisCommands.set("name", "throwable", setArgs);
Assertions.assertThat(result).isEqualToIgnoringCase("OK");
result = redisCommands.get("name");
connection.close();
redisClient.shutdown();
}
API
Lettuce主要提供三种API:
- 同步(sync):RedisCommands。
- 异步(async):RedisAsyncCommands。
- 反应式(reactive):RedisReactiveCommands。
public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {
boolean isMulti();
RedisCommands<K, V> sync();
RedisAsyncCommands<K, V> async();
RedisReactiveCommands<K, V> reactive();
}
同步API
public void testSyncSetAndGet() throws Exception {
RedisCommands<String, String> COMMAND = CONNECTION.sync();
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
COMMAND.set("name", "throwable", setArgs);
String value = COMMAND.get("name");
log.info("Get value: {}", value);
}
异步API
public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {
String getError();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
}
public void testAsyncSetAndGet() throws Exception {
RedisAsyncCommands<String, String> ASYNC_COMMAND = CONNECTION.async();
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);
future.thenAccept(value -> log.info("Set命令返回:{}", value));
future.get();
}
反应式API
public void testReactiveSetAndGet() throws Exception {
RedisReactiveCommands<String, String> REACTIVE_COMMAND = CONNECTION.reactive();
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
REACTIVE_COMMAND.set("name", "throwable", setArgs).block();
REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));
Thread.sleep(1000);
}
发布和订阅
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
// 同步命令
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// 异步命令
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");
// 反应式命令
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();
reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
事务和批量命令执行
事务相关的命令就是WATCH、UNWATCH、EXEC、MULTI和DISCARD,在RedisCommands系列接口中有对应的方法。
public void testSyncMulti() throws Exception {
COMMAND.multi();
COMMAND.setex("name-1", 2, "throwable");
COMMAND.setex("name-2", 2, "doge");
TransactionResult result = COMMAND.exec();
int index = 0;
for (Object r : result) {
log.info("Result-{}:{}", index, r);
index++;
}
}
Redis的Pipeline(管道机制)可以理解为把多个命令打包在一次请求发送到Redis服务端,然后Redis服务端把所有的响应结果打包好一次性返回,从而节省不必要的网络资源(最主要是减少网络请求次数)。
Redis对于Pipeline机制如何实现并没有明确的规定,也没有提供特殊的命令支持Pipeline机制。Pipeline在Lettuce中对使用者是透明的,由于底层的通讯框架是Netty,所以网络通讯层面的优化Lettuce不需要过多干预,Netty帮Lettuce从底层实现了Redis的Pipeline机制。
在SpringBoot中使用Lettuce
先引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
配置
spring:
redis:
host: 127.0.0.1 # IP
port: 6379 # 端口号
password: 123456 # 密码
timeout: 10000
lettuce:
pool:
max-active: 8 # 连接池最大连接数
max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制)
min-idle: 0 # 连接池中的最小空闲连接
max-idle: 8 # 连接池中的最大空闲连接
@Configuration
@EnableCaching //启用缓存
public class CacheConfig extends CachingConfigurerSupport {
/**
* 自定义缓存key的生成策略。默认的生成策略是看不懂的(乱码内容) 通过Spring 的依赖注入特性进行自定义的配置注入并且此类是一个配置类可以更多程度的自定义配置
*
* @return
*/
@Bean
@Override
public KeyGenerator keyGenerator() {
return new KeyGenerator() {
@Override
public Object generate(Object target, Method method, Object... params) {
StringBuilder sb = new StringBuilder();
sb.append(target.getClass().getName());
sb.append(method.getName());
for (Object obj : params) {
sb.append(obj.toString());
}
return sb.toString();
}
};
}
/**
* 缓存配置管理器
*/
@Bean
public CacheManager cacheManager(LettuceConnectionFactory factory) {
//创建默认缓存配置对象
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
RedisCacheManager cacheManager = new RedisCacheManager(writer, config);
return cacheManager;
}
/**
* 获取缓存操作助手对象
*
* @return
*/
@Bean
public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory factory) {
//创建Redis缓存操作助手RedisTemplate对象
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(factory);
//以下代码为将RedisTemplate的Value序列化方式由JdkSerializationRedisSerializer更换为Jackson2JsonRedisSerializer
//此种序列化方式结果清晰、容易阅读、存储字节少、速度快,所以推荐更换
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
/**
* 缓存提供类
*/
public class CacheUtils {
//由于当前class不在spring boot框架内(不在web项目中)所以无法使用autowired,使用此种方法进行注入
private static RedisTemplate<String, String> template = (RedisTemplate<String, String>) SpringBeanUtil.getBean("redisTemplate");
public static <T> boolean set(String key, T value) {
Gson gson = new Gson();
return set(key, gson.toJson(value));
}
public static boolean set(String key, String value, long validTime) {
boolean result = template.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
RedisSerializer<String> serializer = template.getStringSerializer();
connection.set(serializer.serialize(key), serializer.serialize(value));
connection.expire(serializer.serialize(key), validTime);
return true;
}
});
return result;
}
public static <T> T get(String key, Class<T> clazz) {
Gson gson = new Gson();
return gson.fromJson(get(key), clazz);
}
public static String get(String key) {
String result = template.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
RedisSerializer<String> serializer = template.getStringSerializer();
byte[] value = connection.get(serializer.serialize(key));
return serializer.deserialize(value);
}
});
return result;
}
public static boolean del(String key) {
return template.delete(key);
}
}
本文暂时没有评论,来添加一个吧(●'◡'●)