java高并发下数据入库

java高并发下数据入库java高并发下数据批量入库该服务利用线程池并结合缓存类来处理高并发下数据入库问题,做到实时数据存入redis和数据批量入库,使用的时候需要修改为自己的业务数据,该服务暂时适合下面两种情况:1、达到设置的超时时间。2、达到最大批次。packageio.renren.service.impl;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONArray;importlombok.extern.slf4j.Slf

大家好,又见面了,我是你们的朋友全栈君。

java高并发下数据入库

该服务利用线程池并结合缓存类来处理高并发下数据入库问题,做到实时数据存入redis和数据批量入库,使用的时候需要修改为自己的业务数据,该模块是根据下面的设置进行高并发处理。
1、达到设置的超时时间。
2、达到最大批次。

package io.jack.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** * <pre> * 数据批量入库服务 * </pre> * Created by RuiXing Hou on 2021-08-05. * * @since 1.0 */
@Component
@Slf4j
public class BatchDataStorageService implements InitializingBean
{ 
   
	/** * 最大批次数量 */
	@Value("${app.db.maxBatchCount:800}")
    private int maxBatchCount;

	/** * 最大线程数 */
    @Value("${app.db.maxBatchThreads:100}")
    private int maxBatchThreads;

	/** * 超时时间 */
	@Value("${app.db.batchTimeout:3000}")
    private int batchTimeout;

	/** * 批次数量 */
    private int batchCount = 0;

	/** * 批次号 */
	private static long batchNo = 0;

	/** * 线程池定义接口 */
    private ExecutorService executorService = null;

	/** * 服务器缓存工具类,下面提供源码 */
	@Resource
	private CacheService cacheService;

	/** * 业务接口 */
	@Resource
	private DeviceRealTimeService deviceRealTimeService;

	/** * redis工具类 */
	@Resource
	private RedisUtils redisUtils;

	@Override
	public void afterPropertiesSet() { 
   
		this.executorService = Executors.newFixedThreadPool(this.maxBatchThreads, r -> { 
   
			Thread thread = new Thread(r);
			if (r instanceof BatchWorker) { 
   
				thread.setName("batch-worker-" + ((BatchWorker) r).batchKey);
			}
			return thread;
		});
	}

	/** * 需要做高并发处理的类只需要调用该方法 (我用的是rabbitMq) * * @param deviceRealTimeDTO */
	public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { 
   
		final String failedCacheKey = "device:real_time:failed_records";

		try { 
   

			String durationKey = "device:real_time:batchDuration" + batchNo;
			String batchKey = "device:real_time:batch" + batchNo;

			if (!cacheService.exists(durationKey)) { 
   
				cacheService.put(durationKey, System.currentTimeMillis());
				new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start();
			}

			cacheService.lPush(batchKey, deviceRealTimeDTO);
			if (++batchCount >= maxBatchCount) { 
   
				// 达到最大批次,执行入库逻辑
				dataStorage(durationKey, batchKey, failedCacheKey);
			}

		} catch (Exception ex) { 
   
			log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: " + ex.getMessage() + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex);
			cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
		} finally { 
   
			updateRealTimeData(deviceRealTimeDTO);
		}
	}

	/** * 更新实时数据 * @param deviceRealTimeDTO 业务POJO */
	private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { 
   
		redisUtils.set("real_time:"+deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO));
	}

	/** * * @param durationKey 持续时间标识 * @param batchKey 批次标识 * @param failedCacheKey 错误标识 */
	private void dataStorage(String durationKey, String batchKey, String failedCacheKey) { 
   
		batchNo++;
		batchCount = 0;
		cacheService.del(durationKey);
		if (batchNo >= Long.MAX_VALUE) { 
   
			batchNo = 0;
		}
		executorService.execute(new BatchWorker(batchKey, failedCacheKey));
	}

	private class BatchWorker implements Runnable
	{ 
   

		private final String failedCacheKey;
		private final String batchKey;

		public BatchWorker(String batchKey, String failedCacheKey) { 
   
			this.batchKey = batchKey;
			this.failedCacheKey = failedCacheKey;
		}
		
		@Override
		public void run() { 
   
			final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>();
			try { 
   
				DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey);
				while(deviceRealTimeDTO != null) { 
   
					deviceRealTimeDTOList.add(deviceRealTimeDTO);
					deviceRealTimeDTO = cacheService.lPop(batchKey);
				}

				long timeMillis = System.currentTimeMillis();

				try { 
   
					List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class);
					deviceRealTimeService.insertBatch(deviceRealTimeEntityList);
				} finally { 
   
					cacheService.del(batchKey);
					log.info("[DB:BATCH_WORKER] 批次:" + batchKey + ",保存设备上报记录数:" + deviceRealTimeDTOList.size() + ", 耗时:" + (System.currentTimeMillis() - timeMillis) + "ms");
				}
			} catch (Exception e) { 
   
				log.warn("[DB:FAILED] 设备上报记录批量入库失败:" + e.getMessage() + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e);
				for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) { 
   
					cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
				}
			}
		}
    }

	class BatchTimeoutCommitThread extends Thread { 
   

		private final String batchKey;
		private final String durationKey;
		private final String failedCacheKey;

		public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) { 
   
			this.batchKey = batchKey;
			this.durationKey = durationKey;
			this.failedCacheKey = failedCacheKey;
			this.setName("batch-thread-" + batchKey);
		}

		public void run() { 
   
			try { 
   
				Thread.sleep(batchTimeout);
			} catch (InterruptedException e) { 
   
				log.error("[DB] 内部错误,直接提交:" + e.getMessage());
			}

			if (cacheService.exists(durationKey)) { 
   
				// 达到最大批次的超时间,执行入库逻辑
				dataStorage(durationKey, batchKey, failedCacheKey);
			}
		}

	}

}

package io.jack.service;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

@Component
@Scope("singleton")
public class CacheService implements InitializingBean { 
   

    private Map<String, Object> objectCache = new ConcurrentHashMap<>();

    private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();

    @Override
    public void afterPropertiesSet() { 
   
        statCache.put("terminals", new AtomicLong(0));
        statCache.put("connections", new AtomicLong(0));
    }

    public long incr(String statName) { 
   
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).incrementAndGet();
    }

    public long decr(String statName) { 
   
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).decrementAndGet();
    }

    public long stat(String statName) { 
   
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).get();
    }

    public <T> void put(String key, T object) { 
   
        objectCache.put(key, object);
    }

    public <T> T get(String key) { 
   
        return (T) objectCache.get(key);
    }

    public void remove(String key) { 
   
        objectCache.remove(key);
    }

    public void hSet(String key, String subkey, Object value) { 
   
        synchronized (objectCache) { 
   
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap == null) { 
   
                submap = new HashMap<>();
                objectCache.put(key, submap);
            }
            submap.put(subkey, value);
        }
    }

    public <T> T hGet(String key, String subkey) { 
   
        synchronized (objectCache) { 
   
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap != null) { 
   
                return (T) submap.get(subkey);
            }
            return null;
        }
    }

    public boolean hExists(String key, String subkey) { 
   
        synchronized (objectCache) { 
   
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap != null) { 
   
                return submap.containsKey(subkey);
            }
            return false;
        }
    }

    public void lPush(String key, Object value) { 
   
        synchronized (objectCache) { 
   
            LinkedList queue = (LinkedList) objectCache.get (key);
            if (queue == null) { 
   
                queue = new LinkedList();
                objectCache.put(key, queue);
            }
            queue.addLast(value);
        }
    }

    public <T> T lPop(String key) { 
   
        synchronized (objectCache) { 
   
            LinkedList queue = (LinkedList) objectCache.get (key);
            if (queue != null) { 
   
                if (!queue.isEmpty()) { 
   
                    return (T)queue.removeLast();
                }
                objectCache.remove(key);
            }
            return null;
        }
    }

    public void del(String key) { 
   
        objectCache.remove(key);
    }

    public boolean exists(String key) { 
   
        return objectCache.containsKey(key);
    }

    public void dump() { 
   

    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/144596.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • pycharm提示no python interpreter_pycharm failed to create virtual

    pycharm提示no python interpreter_pycharm failed to create virtual学习笔记遇到问题在学习时,用pycharm打开浏览器的时候(我用的是谷歌浏览器,别的浏览器也用这个思路解决)发现提示说找不到这个文件解决方法:打开设置(File→Settings)点击工具,浏览器(Tools→WebBrowsers)在工具那一项中,点击浏览器,再在右边的浏览器中,找到自己要用的浏览器在浏览器对应的路径中修改路径(找到该浏览器下载在本机的位置)这一步老师…

    2022年8月27日
    8
  • 黑客暴力激活成功教程必备的12大逆向工具!设置再复杂的密码也没用!

    黑客暴力激活成功教程必备的12大逆向工具!设置再复杂的密码也没用!暴力激活成功教程攻击是最流行的密码激活成功教程方法之一,然而,它不仅仅是密码激活成功教程。暴力攻击还可用于发现Web应用程序中的隐藏页面和内容,在你成功之前,这种攻击基本上是“攻击一次尝试一次”。暴力激活成功教程是最流行的密码激活成功教程方法之一,然而,它不仅仅是密码激活成功教程。暴力激活成功教程还可用于发现Web应用程序中的隐藏页面和内容,在你成功之前,这种激活成功教程基本上是“激活成功教程一次尝试一次”。这种激活成功教程有时需要更长的时间,但其成功率也会更高。在本文中…

    2022年8月22日
    8
  • SQL Server 2008 R2 详细安装图文教程

    SQL Server 2008 R2 详细安装图文教程SQLServer2008R2安装教程1、打开安装包,点击setup.exe2、选择左侧列表中的“安装”。3、点击“全新安装或向现有安装添加功能”。4、检测完成点击确定。5、选择版本和密钥,然后下一步。(因为有版权要求,在这不能给安装密钥,需要安装密钥的请私信博主)6、选择“我接受许可条款”。点击下一步7、点击“安装”。8、…

    2022年6月23日
    47
  • [c/c++]——最长回文子串「建议收藏」

    [c/c++]——最长回文子串「建议收藏」最长回文子串已经很久没有更新关于leetcode的题解了,一个是觉得太费时间,二一个目前网上也有很全面的解答,但是在写leetcode的最长回文子串时,发现很多同学的代码都很长(实际上几行就可以解决的事情),并且c++解答的代码不够完整。最关键的是在一种“马拉车”的算法卡了很久很久,今天把几种求解的方法全部都整理出来,方便大家也便于自己以后复习。ps:讲解很少,都是整理出可看性很高的源码方法…

    2022年5月2日
    53
  • C++中this指针的本质

    C++中this指针的本质一直以来对C++中的this不理解,只知道在构造函数中,如果构造函数的参数和类成员的名字一样的话,就可以用this指针来区分,如:this->a=a;一直以来都有这个疑问:this究竟是什么?我们明明没有定义这个this,但是我们可以直接用而编译器不会报错。今天来解决这个疑问。从刚才的代码中,我们用”this->”而不是”this.”就说明this是一个指针,而我们知道,在C、C++中,指针

    2022年5月16日
    47
  • torchvision – ImportError: No module named torchvision

    torchvision – ImportError: No module named torchvisiontorchvision-ImportError:NomodulenamedtorchvisionimageandvideodatasetsandmodelsfortorchdeeplearningThetorchvisionpackageconsistsofpopulardatasets,modelarchitectures,andcommonimagetransformationsforcomputervision.1.Installat

    2022年6月24日
    36

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号