import redis.clients.jedis.Pipeline; //導入方法依賴的package包/類
/
* Store a trigger in redis
* @param trigger the trigger to be stored
* @param replaceExisting true if an existing trigger with the same identity should be replaced
* @param jedis a thread-safe Redis connection
* @throws JobPersistenceException
* @throws ObjectAlreadyExistsException
*/
@Override
public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, Jedis jedis) throws JobPersistenceException {
final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey());
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(trigger.getKey());
final String jobTriggerSetKey = redisSchema.jobTriggersSetKey(trigger.getJobKey());
if(!(trigger instanceof SimpleTrigger) && !(trigger instanceof CronTrigger)){
throw new UnsupportedOperationException(“Only SimpleTrigger and CronTrigger are supported.”);
}
final boolean exists = jedis.exists(triggerHashKey);
if(exists && !replaceExisting){
throw new ObjectAlreadyExistsException(trigger);
}
Map triggerMap = mapper.convertValue(trigger, new TypeReference>() {});
triggerMap.put(TRIGGER_CLASS, trigger.getClass().getName());
Pipeline pipe = jedis.pipelined();
pipe.hmset(triggerHashKey, triggerMap);
pipe.sadd(redisSchema.triggersSet(), triggerHashKey);
pipe.sadd(redisSchema.triggerGroupsSet(), triggerGroupSetKey);
pipe.sadd(triggerGroupSetKey, triggerHashKey);
pipe.sadd(jobTriggerSetKey, triggerHashKey);
if(trigger.getCalendarName() != null && !trigger.getCalendarName().isEmpty()){
final String calendarTriggersSetKey = redisSchema.calendarTriggersSetKey(trigger.getCalendarName());
pipe.sadd(calendarTriggersSetKey, triggerHashKey);
}
if (trigger.getJobDataMap() != null && !trigger.getJobDataMap().isEmpty()) {
final String triggerDataMapHashKey = redisSchema.triggerDataMapHashKey(trigger.getKey());
pipe.hmset(triggerDataMapHashKey, getStringDataMap(trigger.getJobDataMap()));
}
pipe.sync();
if(exists){
// We’re overwriting a previously stored instance of this trigger, so clear any existing trigger state.
unsetTriggerState(triggerHashKey, jedis);
}
pipe = jedis.pipelined();
Response triggerPausedResponse = pipe.sismember(redisSchema.pausedTriggerGroupsSet(), triggerGroupSetKey);
Response jobPausedResponse = pipe.sismember(redisSchema.pausedJobGroupsSet(), redisSchema.jobGroupSetKey(trigger.getJobKey()));
pipe.sync();
final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey());
final long nextFireTime = trigger.getNextFireTime() != null ? trigger.getNextFireTime().getTime() : -1;
if (triggerPausedResponse.get() || jobPausedResponse.get()){
if (jedis.sismember(redisSchema.blockedJobsSet(), jobHashKey)) {
setTriggerState(RedisTriggerState.PAUSED_BLOCKED, (double) nextFireTime, triggerHashKey, jedis);
} else {
setTriggerState(RedisTriggerState.PAUSED, (double) nextFireTime, triggerHashKey, jedis);
}
} else if(trigger.getNextFireTime() != null){
if (jedis.sismember(redisSchema.blockedJobsSet(), jobHashKey)) {
setTriggerState(RedisTriggerState.BLOCKED, nextFireTime, triggerHashKey, jedis);
} else {
setTriggerState(RedisTriggerState.WAITING, (double) trigger.getNextFireTime().getTime(), triggerHashKey, jedis);
}
}
}
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220803.html原文链接:https://javaforall.net
