Redis在多线程情况下写入失败的问题及解决
作者:我惠依旧
出现场景
同一时间多次调用jedis的set方法,出现:
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Socket is not connected: socket write error
at redis.clients.jedis.Protocol.sendCommand(Protocol.java:98)
at redis.clients.jedis.Protocol.sendCommand(Protocol.java:78)
at redis.clients.jedis.Connection.sendCommand(Connection.java:101)
at redis.clients.jedis.BinaryClient.set(BinaryClient.java:99)
at redis.clients.jedis.Client.set(Client.java:29)
at redis.clients.jedis.Jedis.set(Jedis.java:72)
at com.castle.cache.JedisUtils.setSingle(JedisUtils.java:21)
at com.castle.cache.JedisUtils$1.run(JedisUtils.java:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Socket is not connected: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at redis.clients.util.RedisOutputStream.flushBuffer(RedisOutputStream.java:31)
at redis.clients.util.RedisOutputStream.write(RedisOutputStream.java:38)
at redis.clients.jedis.Protocol.sendCommand(Protocol.java:84)
... 10 more
问题重现代码
环境:jre7 32bit
jedis:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.7.3</version> <type>jar</type> <scope>compile</scope> </dependency>
// Single Jedis connection, created once when the class loads and used by every
// static helper below (and therefore by every thread that calls them).
private static Jedis jedis = new Jedis("localhost",6379);;
// Expiration time / time to live
// protected static int expireTime = 60 * 60 *24;
/**
 * Exposes the class-wide Jedis connection.
 *
 * @return the static {@code jedis} instance; every caller receives the same object
 */
public static Jedis getInstance() {
    final Jedis shared = jedis;
    return shared;
}
/**
 * Stores {@code value} under {@code key} through the static {@code jedis} connection.
 *
 * @param key   Redis key to write
 * @param value value to store
 */
public static void setSingle(String key, String value) {
    final Jedis connection = jedis;
    // No expiry is applied: the expire call stays disabled.
    connection.set(key, value);
}
/**
 * Reads the value stored under {@code key} through the static {@code jedis} connection.
 *
 * @param key Redis key to read
 * @return whatever {@code jedis.get(key)} returns for that key
 */
public static String getSingle(String key) {
    // No expiry refresh is performed: the expire call stays disabled.
    final String stored = jedis.get(key);
    return stored;
}
/**
 * Reproduction driver: submits 1000 {@code setSingle} calls to a cached thread pool
 * so that many threads write at the same time.
 *
 * @param args unused
 */
public static void main(String[] args) {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    try {
        for (int i = 1; i <= 1000; i++) {
            final String ii = "TEST:TEST_NUM-" + i;
            // The executor only needs a Runnable; wrapping the task in a Thread
            // object just allocates a thread that is never started.
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    setSingle(ii, ii);
                }
            });
        }
    } finally {
        // Stop accepting work so the pool's threads can exit once the submitted writes finish.
        cachedThreadPool.shutdown();
    }
}
运行代码然后就出现:

原因分析
Jedis 实例不是线程安全的。上面的代码让所有线程共用同一个静态 Jedis 对象(也就是同一条连接),多个线程在同一时间通过这一个连接写入,导致连接出错甚至卡死。
解决方案
拟用连接池:
// Shared connection pool, created lazily. Declared volatile because getJedis() reads it
// without holding the lock that poolInit() takes when assigning it; without volatile a
// thread could observe a non-null reference to a pool that is not fully constructed.
private static volatile JedisPool pool;
/**
 * Builds the Redis connection pool and stores it in {@code pool}.
 * Host and port are hard-coded to localhost:6379.
 */
private static void initializePool() {
    final JedisPoolConfig poolConfig = new JedisPoolConfig();

    // Sizing: at most 20 connections in total, at most 5 of them idle.
    poolConfig.setMaxTotal(20);
    poolConfig.setMaxIdle(5);

    // Maximum time to wait for a connection: 50 seconds.
    final long maxWaitMillis = 50 * 1000;
    poolConfig.setMaxWaitMillis(maxWaitMillis);

    // Check that connections are usable when borrowed, when returned, and while idle.
    poolConfig.setTestOnBorrow(true);
    poolConfig.setTestOnReturn(true);
    poolConfig.setTestWhileIdle(true);

    // Create the pool.
    pool = new JedisPool(poolConfig, "localhost", 6379);
}
/**
 * Creates the pool on first use. The method is {@code synchronized} and re-checks
 * {@code pool} under the lock, so concurrent callers end up with a single pool.
 */
private static synchronized void poolInit() {
    if (pool != null) {
        // Another caller already built the pool.
        return;
    }
    initializePool();
}
/**
 * Borrows a connection from the pool, creating the pool first if necessary.
 *
 * @return a pooled {@code Jedis}, or {@code null} when the pool is unavailable or
 *         borrowing throws (the exception's stack trace is printed)
 */
public static Jedis getJedis() {
    if (pool == null) {
        poolInit();
    }
    // Without this guard a failed initialization would surface as a NullPointerException.
    if (pool == null) {
        return null;
    }
    try {
        return pool.getResource();
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}
/**
 * Releases a Jedis connection obtained from {@link #getJedis()}.
 *
 * @param jedis the connection to release; {@code null} is ignored
 */
public static void returnResource(Jedis jedis) {
    if (null != jedis) {
        // close() hands a pooled connection back to the pool it came from and reports a
        // broken connection as broken, instead of pushing it back unconditionally the way
        // pool.returnResourceObject(jedis) did.
        jedis.close();
    }
}
//private static Jedis jedis = new Jedis("localhost",6379);;
// public static Jedis getInstance(){
// return jedis;
// }
// Expiration time / time to live: 60 * 60 * 24 (presumably seconds, i.e. one day — confirm).
// NOTE(review): in the code shown it is only referenced from commented-out expire(...) calls.
protected static int expireTime = 60 * 60 *24;
/**
 * Stores {@code value} under {@code key} using a pooled connection.
 * Keeps calling {@code getJedis()} until it returns a non-null connection.
 *
 * @param key   Redis key to write
 * @param value value to store
 */
public static void put(String key, String value) {
    Jedis jedis = getJedis();
    while (jedis == null) {
        jedis = getJedis();
    }
    try {
        jedis.set(key, value);
    } finally {
        // Release the connection even if the command throws; otherwise it is never
        // returned and the pool loses one of its connections.
        returnResource(jedis);
    }
}
/**
 * Reads the value stored under {@code key} using a pooled connection.
 * Keeps calling {@code getJedis()} until it returns a non-null connection.
 *
 * @param key Redis key to read
 * @return whatever {@code jedis.get(key)} returns for that key
 */
public static String get(String key) {
    Jedis jedis = getJedis();
    while (jedis == null) {
        jedis = getJedis();
    }
    try {
        return jedis.get(key);
    } finally {
        // Release the connection even if the command throws; otherwise it is never
        // returned and the pool loses one of its connections.
        returnResource(jedis);
    }
}
/**
 * Returns the keys matching {@code keyMatch} using a pooled connection.
 * Keeps calling {@code getJedis()} until it returns a non-null connection.
 *
 * @param keyMatch pattern passed unchanged to {@code jedis.keys(...)}
 * @return the set returned by {@code jedis.keys(keyMatch)}
 */
public static Set<String> keys(String keyMatch) {
    Jedis jedis = getJedis();
    while (jedis == null) {
        jedis = getJedis();
    }
    try {
        return jedis.keys(keyMatch);
    } finally {
        // Release the connection even if the command throws; otherwise it is never
        // returned and the pool loses one of its connections.
        returnResource(jedis);
    }
}
/**
 * Deletes {@code key} using a pooled connection.
 * Keeps calling {@code getJedis()} until it returns a non-null connection.
 *
 * @param key Redis key to delete
 */
public static void remove(String key) {
    Jedis jedis = getJedis();
    while (jedis == null) {
        jedis = getJedis();
    }
    try {
        jedis.del(key);
    } finally {
        // Release the connection even if the command throws; otherwise it is never
        // returned and the pool loses one of its connections.
        returnResource(jedis);
    }
}
测试代码:
/**
 * Test driver: submits 1000 {@code put} calls to a cached thread pool so that many
 * threads write at the same time.
 *
 * @param args unused
 */
public static void main(String[] args) {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    try {
        for (int i = 1; i <= 1000; i++) {
            final String ii = "TEST:TEST_NUM-" + i;
            // The executor only needs a Runnable; wrapping the task in a Thread
            // object just allocates a thread that is never started.
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    put(ii, ii);
                }
            });
        }
    } finally {
        // Stop accepting work so the pool's threads can exit once the submitted writes finish.
        cachedThreadPool.shutdown();
    }
}
插入成功:

以上结果是在 JDK 7(32 位)环境下得到的;而在 JDK 8(64 位)环境下,前面那段会报异常的代码仍然能够插入成功。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
