当前位置:网站首页>利用线程通信、解决缓存穿透数据库雪崩

利用线程通信、解决缓存穿透数据库雪崩

2020-11-07 18:55:56 lis1314

业务场景:

有一个面向C端的查询接口,访问量很大,假设我们使用缓存技术进行了传统的优化,第一次查询数据时,查缓存->缓存没有->查数据库->写入缓存

但是可能会面临一个问题、在同一时刻有很多用户(假设1W)查询同一条数据(假设商品ID一致)、此时数据并没有在缓存、可能会造成数据库雪崩,原因是这个时刻可能因为同一条数据对数据库进行了1W次的SQL查询(这里不讨论预先写入缓存方案,同时这里说的1W次也是极端的情况)。

我们怎么进行优化,这是本篇文章讨论的重点:

思路效果:

无论有多少用户访问查询、假设他们查询商品数据携带的ID一致、那么只产生1条查询、其他用户线程共用同一个查询结果,那么也就从n(10000)变成了1。

这个方案适合任何多线程工作去除重复效果提高性能的场景,这里只是其中一个场景。

下面讨论实现:

A、B、C 3个线程并发查询商品ID为1的数据,传统的缓存优化缓存中没有数据时,会发起3个SQL查询访问数据库,例sql:select * from products where id = 1;

那么我们如何利用行级锁(用户ID维度)来达到A、B、C,3个线程只查询一次数据库得到对应的结果呢?

思路:首先我们需要一个通信维度(用户ID一致认为是同一行为),如果访问入参的用户ID一样,我们可以实现如下的一个效果:

第一个进入的线程进行查询操作(假设是A),那么B、C线程我们让他进入等待的状态,等待线程A的查询结果,最终达到只查询一次数据库的操作

代码灵感来自阿里开源缓存框架jetcache,下面是改良后的公共实现,可以通用

package com.xxx.utils.sync;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.slf4j.Logger;

import com.alibaba.fastjson.JSON;

/**
 * 同步工具类<br/>
 * 主要预防优化缓存雪崩、数据雪崩</br>
 * 多线程访问的情况下、认为是同一操作的重复行为
 *
 */
public class SyncUtil {
	

	private static final Logger logger = org.slf4j.LoggerFactory.getLogger(SyncUtil.class);


	private ConcurrentHashMap<Object, LoaderLock> loaderMap = new ConcurrentHashMap<>();

	private Function<Object, Object> KeyConvertor;
	
	public SyncUtil() {
		this(null);
	}
	
	/**
	 * @param KeyConvertor 设置key转换规则<br/>
	 * 可以为空,默认使用fastJson
	 * 
	 */
	public SyncUtil(Function<Object, Object> KeyConvertor) {
		if(KeyConvertor == null) {
			this.KeyConvertor = new Function<Object, Object>() {
				@Override
				public Object apply(Object originalKey) {
					 if (originalKey == null) {
						 return null;
					 }
					 if (originalKey instanceof String) {
						 return originalKey;
					 }
					 return JSON.toJSONString(originalKey);
				}
			};
		}else {
			this.KeyConvertor = KeyConvertor;
		}
	}
	

	/**
	 * 同步加载函数<br/>
	 * 例如:A、B、C三个线程同一时刻查询执行某一个操作(查询数据库等),实际上查询的参数条件一样<br/>
	 * 这个时候我们可以对线程进行优化、防止数据雪崩缓存穿透,让其中的一个线程进行查库操作,其他线程等待具体工作线程的返回结果<br/>
	 * 如:A线程进行查库、B、C线程进行等待A线程的返回结果。
	 * @param <K>
	 * @param <V>
	 * @param timeout 等待时间(毫秒)(假设A线程查询超过等待时间、那么B、C线程放弃等待自己去执行业务查询)
	 * @param key 辨别是否是同一操作的key
	 * @param loader 加载数据的函数<br/>
	 * Function<String, String> loader = new Function<String, String>() {
			@Override
			public String apply(String t) {
				return "dbData";
			}
		};
	 * 
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <K, V> V synchronizedLoad(Integer timeout, K key, Function<K, V> loader) {
		Object lockKey = buildLoaderLockKey(key);
		while (true) {
			//有没有线程去做具体的获取业务数据的逻辑
			boolean create[] = new boolean[1];
			LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
				create[0] = true;
				LoaderLock loaderLock = new LoaderLock();
				loaderLock.signal = new CountDownLatch(1);
				loaderLock.loaderThread = Thread.currentThread();
				return loaderLock;
			});
			if (create[0] || ll.loaderThread == Thread.currentThread()) {
				try {
					//第一个进入的线程进行真实的访问操作
					V loadedValue = loader.apply(key);
					ll.success = true;
					ll.value = loadedValue;
					return loadedValue;
				} finally {
					if (create[0]) {
						ll.signal.countDown();
						loaderMap.remove(lockKey);
					}
				}
			} else {
				//其他线程进行等待操作
				try {
					if (timeout == null) {
						ll.signal.await();
					} else {
						boolean ok = ll.signal.await(timeout, TimeUnit.MILLISECONDS);
						if (!ok) {
							//如果等待超时,放弃等待,当前线程直接进行访问
							logger.info("loader wait timeout:" + timeout);
							return loader.apply(key);
						}
					}
				} catch (InterruptedException e) {
					logger.warn("loader wait interrupted");
					return loader.apply(key);
				}
				//共用同一个返回结果
				if (ll.success) {
					return (V) ll.value;
				} else {
					continue;
				}

			}
		}
	}

	private Object buildLoaderLockKey(Object key) {
		return KeyConvertor.apply(key);
	}

	static class LoaderLock {
		CountDownLatch signal;
		Thread loaderThread;
		volatile boolean success;
		volatile Object value;
	}

}

测试代码

package test;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.xxx.MessageServiceApplication;
import com.xxx.mapper.ActivityInfoMapper;
import com.xxx.model.ActivityInfo;
import com.xxx.utils.sync.SyncUtil;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { MessageServiceApplication.class })
public class QueryTest {
	
	@Autowired
	private ActivityInfoMapper activityInfoMapper;
	
	SyncUtil syncUtil = new SyncUtil();
	
	private AtomicInteger count = new AtomicInteger();
	private CyclicBarrier barrier = new CyclicBarrier(100);
	ExecutorService pool = Executors.newFixedThreadPool(100);
	
	public void synSelect(Long id) throws Exception {
		for (int i = 0; i < 100; i++) {
			pool.execute(()->{
				try {
					barrier.await();//线程等待,触发多线程同步的效果
//					realSelect1(id);//传统查询
					realSelect2(id);//优化后
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
		pool.shutdown();
		while(!pool.isTerminated()){
		}
		System.out.println();
	}
	
	public ActivityInfo realSelect2(Long id) {
		//线程通信的访问
		Function<Long,ActivityInfo> loader = (param)->{
			//记录真实的访问数据库次数(1)
			count.incrementAndGet();
			return activityInfoMapper.selectById(param);
		};
		return syncUtil.synchronizedLoad(3000, id, loader);
	}
	
	public ActivityInfo realSelect1(Long id) {
		//记录真实的访问数据库次数(100)
		count.incrementAndGet();
		return activityInfoMapper.selectById(id);
	}
	
	@Test
	public void testSelect() throws Exception {
		synSelect(1L);
		System.out.println("真实调用次数="+count.get());
	}
}

传统的查询结果

优化后的查询结果,只执行了1次SQL

版权声明
本文为[lis1314]所创,转载请带上原文链接,感谢
https://my.oschina.net/lis1314/blog/4707646