Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis在多线程情况下写入失败

Redis在多线程情况下写入失败的问题及解决

作者:我惠依旧

文章描述了在JRE 7 32位环境下使用Jedis客户端并发调用`set`方法时出现的问题,怀疑是多线程环境下获取连接导致的卡死,通过使用连接池和在JDK 8 64位环境下进行测试,问题得到解决

出现场景

同一时间多次调用jedis的set方法,出现:

redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Socket is not connected: socket write error
    at redis.clients.jedis.Protocol.sendCommand(Protocol.java:98)
    at redis.clients.jedis.Protocol.sendCommand(Protocol.java:78)
    at redis.clients.jedis.Connection.sendCommand(Connection.java:101)
    at redis.clients.jedis.BinaryClient.set(BinaryClient.java:99)
    at redis.clients.jedis.Client.set(Client.java:29)
    at redis.clients.jedis.Jedis.set(Jedis.java:72)
    at com.castle.cache.JedisUtils.setSingle(JedisUtils.java:21)
    at com.castle.cache.JedisUtils$1.run(JedisUtils.java:36)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Socket is not connected: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at redis.clients.util.RedisOutputStream.flushBuffer(RedisOutputStream.java:31)
    at redis.clients.util.RedisOutputStream.write(RedisOutputStream.java:38)
    at redis.clients.jedis.Protocol.sendCommand(Protocol.java:84)
    ... 10 more

问题重现代码

环境:jre7 32bit

jedis:

        <dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.7.3</version>
			<type>jar</type>
			<scope>compile</scope>
		</dependency>
private static Jedis jedis = new Jedis("localhost",6379);;
	// 过期时间/生存时间
//    protected static int  expireTime = 60 * 60 *24;
	public static Jedis getInstance(){
		return jedis;
	}
	
	public static void setSingle(String key,String value){
		jedis.set(key, value);
//		jedis.expire(key, expireTime);
	}
	
	public static String getSingle(String key){
//		jedis.expire(key, expireTime);
		return jedis.get(key);
	}
	
	public static void main(String[] args) {
		ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 1; i <= 1000; i++) {
            final String ii = "TEST:TEST_NUM-"+i;
            cachedThreadPool.execute(new Thread(){
            	public void run(){
            		setSingle(ii, ii);
            	}
            });
        }
	}

运行代码然后就出现:

原因分析

猜测多线程获取连接,同一时间获取同一个连接导致卡死

解决方案

拟用连接池:

private static JedisPool pool;
	/**
     * 初始化Redis连接池
     */
    private static void initializePool() {
		//redisURL 与 redisPort 的配置文件
        JedisPoolConfig config = new JedisPoolConfig();
		//设置最大连接数(100个足够用了,没必要设置太大)
		config.setMaxTotal(20);
		//最大空闲连接数
		config.setMaxIdle(5);
		//获取Jedis连接的最大等待时间(50秒) 
		config.setMaxWaitMillis(50 * 1000);
		//在获取Jedis连接时,自动检验连接是否可用
		config.setTestOnBorrow(true);
		//在将连接放回池中前,自动检验连接是否有效
		config.setTestOnReturn(true);
		//自动测试池中的空闲连接是否都是可用连接
		config.setTestWhileIdle(true);
		//创建连接池
		pool = new JedisPool(config, "localhost",6379);
    }
 
    /**
     * 多线程环境同步初始化(保证项目中有且仅有一个连接池)
     */
    private static synchronized void poolInit() {
        if (null == pool) {
            initializePool();
        }
    }
	
	public static Jedis getJedis() {

	       if (pool == null) {
	           poolInit();
	       }
	       //如果没有以下代码会造成初始化的jedis拿不到 jedis对象
	       Jedis jedis = null;
	       try {
	           if (pool != null) {
	               jedis = pool.getResource();
	           }
	       }
	       catch (Exception e) {
	           e.printStackTrace();
	       }
	       return jedis;
	   }
	
	/**
     * 释放Jedis资源
     *
     * @param jedis
     */
    public static void returnResource(Jedis jedis) {
        if (null != jedis) {
            pool.returnResourceObject(jedis);
        }
    }
	
	//private static Jedis jedis = new Jedis("localhost",6379);;
//	public static Jedis getInstance(){
//		return jedis;
//	}
	// 过期时间/生存时间
    protected static int  expireTime = 60 * 60 *24;
	
	public static void put(String key,String value){
//		jedis.set(key, value);
		Jedis jedis = getJedis();
        while (true) {
            if (null != jedis) {
                break;
            } else {
                jedis = getJedis();
            }
        }
        jedis.set(key, value);
        returnResource(jedis);
//		jedis.expire(key, expireTime);
	}
	
	public static String get(String key){
//		jedis.expire(key, expireTime);
//		return jedis.get(key);
		Jedis jedis = getJedis();
        while (true) {
            if (null != jedis) {
                break;
            } else {
                jedis = getJedis();
            }
        }
        String value = jedis.get(key);
        returnResource(jedis);
        return value;
	}
	
	public static Set<String> keys(String keyMatch){
//		jedis.expire(key, expireTime);
//		
		Jedis jedis = getJedis();
        while (true) {
            if (null != jedis) {
                break;
            } else {
                jedis = getJedis();
            }
        }
        Set<String> res = jedis.keys(keyMatch);
        returnResource(jedis);
        return res;
	}
	
	public static void remove(String key){
//		jedis.del(key);
		Jedis jedis = getJedis();
        while (true) {
            if (null != jedis) {
                break;
            } else {
                jedis = getJedis();
            }
        }
        jedis.del(key);
        returnResource(jedis);
	}

测试代码:

public static void main(String[] args) {
		ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 1; i <= 1000; i++) {
            final String ii = "TEST:TEST_NUM-"+i;
            cachedThreadPool.execute(new Thread(){
            	public void run(){
            		put(ii, ii);
            	}
            });
        }
	}

插入成功:

这是在jdk7 32位的情况下,我在jdk8 64位运行会报异常的代码仍然能插入。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文