001package co.codewizards.cloudstore.local; 002 003import static co.codewizards.cloudstore.core.util.AssertUtil.*; 004 005import java.util.HashMap; 006import java.util.Map; 007import java.util.concurrent.CopyOnWriteArrayList; 008import java.util.concurrent.TimeUnit; 009import java.util.concurrent.locks.Lock; 010 011import javax.jdo.PersistenceManager; 012import javax.jdo.PersistenceManagerFactory; 013import javax.jdo.Transaction; 014 015import org.slf4j.Logger; 016import org.slf4j.LoggerFactory; 017 018import co.codewizards.cloudstore.core.context.ExtensibleContextSupport; 019import co.codewizards.cloudstore.core.io.TimeoutException; 020import co.codewizards.cloudstore.core.repo.local.ContextWithLocalRepoManager; 021import co.codewizards.cloudstore.core.repo.local.LocalRepoManager; 022import co.codewizards.cloudstore.core.repo.local.LocalRepoTransaction; 023import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionListenerRegistry; 024import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPostCloseEvent; 025import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPostCloseListener; 026import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPreCloseEvent; 027import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPreCloseListener; 028import co.codewizards.cloudstore.core.util.AssertUtil; 029import co.codewizards.cloudstore.local.persistence.Dao; 030import co.codewizards.cloudstore.local.persistence.LocalRepository; 031import co.codewizards.cloudstore.local.persistence.LocalRepositoryDao; 032 033public class LocalRepoTransactionImpl implements LocalRepoTransaction, ContextWithLocalRepoManager, ContextWithPersistenceManager { 034 private static final Logger logger = LoggerFactory.getLogger(LocalRepoTransactionImpl.class); 035 036 public static final long LOCK_TIMEOUT = 5 * 60 * 1000; 037 038 private final LocalRepoManager localRepoManager; 039 private final PersistenceManagerFactory persistenceManagerFactory; 040 private final boolean write; 041 private PersistenceManager persistenceManager; 042 private Transaction jdoTransaction; 043 private final Lock lock; 044 private long localRevision = -1; 045 private final Map<Class<?>, Object> daoClass2Dao = new HashMap<>(); 046 private final ExtensibleContextSupport extensibleContextSupport = new ExtensibleContextSupport(); 047 048 private final LocalRepoTransactionListenerRegistry listenerRegistry = new LocalRepoTransactionListenerRegistry(this); 049 050 private final CopyOnWriteArrayList<LocalRepoTransactionPreCloseListener> preCloseListeners = new CopyOnWriteArrayList<>(); 051 private final CopyOnWriteArrayList<LocalRepoTransactionPostCloseListener> postCloseListeners = new CopyOnWriteArrayList<>(); 052 053 public LocalRepoTransactionImpl(final LocalRepoManagerImpl localRepoManager, final boolean write) { 054 this.localRepoManager = AssertUtil.assertNotNull(localRepoManager, "localRepoManager"); 055 this.persistenceManagerFactory = AssertUtil.assertNotNull(localRepoManager.getPersistenceManagerFactory(), "localRepoManager.persistenceManagerFactory"); 056 this.lock = localRepoManager.getLock(); 057 this.write = write; 058 begin(); 059 } 060 061 private void begin() { 062 boolean locked = false; 063 try { 064 locked = lock.tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS); 065 } catch (InterruptedException e) { 066 // ignore 067 } 068 if (! locked) 069 throw new TimeoutException(String.format("Starting %s transaction on '%s' within timeout (%s ms) failed! ", write ? "write" : "read", localRepoManager.getLocalRoot(), LOCK_TIMEOUT)); 070 try { 071 if (isActive()) 072 throw new IllegalStateException("Transaction is already active!"); 073 074 lockIfWrite(); 075 076 persistenceManager = persistenceManagerFactory.getPersistenceManager(); 077 jdoTransaction = persistenceManager.currentTransaction(); 078 jdoTransaction.begin(); 079 listenerRegistry.onBegin(); 080 } finally { 081 lock.unlock(); 082 } 083 } 084 085 private final void lockIfWrite() { 086 if (write) 087 lock.lock(); // UNbalance lock to keep it after method returns! 088 } 089 090 private final void unlockIfWrite() { 091 if (write) 092 lock.unlock(); // UNbalance unlock to counter the unbalanced lock in lockIfWrite(). 093 } 094 095 @Override 096 public void commit() { 097 lock.lock(); 098 try { 099 if (!isActive()) 100 throw new IllegalStateException("Transaction is not active!"); 101 102 listenerRegistry.onCommit(); 103 firePreCloseListeners(true); 104 daoClass2Dao.clear(); 105 jdoTransaction.commit(); 106 persistenceManager.close(); 107 jdoTransaction = null; 108 persistenceManager = null; 109 localRevision = -1; 110 111 unlockIfWrite(); 112 } finally { 113 lock.unlock(); 114 } 115 firePostCloseListeners(true); 116 } 117 118 @Override 119 public boolean isActive() { 120 lock.lock(); 121 try { 122 return jdoTransaction != null && jdoTransaction.isActive(); 123 } finally { 124 lock.unlock(); 125 } 126 } 127 128 @Override 129 public void rollback() { 130 _rollback(); 131 firePostCloseListeners(false); 132 } 133 134 @Override 135 public void rollbackIfActive() { 136 boolean active; 137 lock.lock(); 138 try { 139 active = isActive(); 140 if (active) { 141 _rollback(); 142 } 143 } finally { 144 lock.unlock(); 145 } 146 if (active) { 147 firePostCloseListeners(false); 148 } 149 } 150 151 protected void _rollback() { 152 lock.lock(); 153 try { 154 if (!isActive()) 155 throw new IllegalStateException("Transaction is not active!"); 156 157 listenerRegistry.onRollback(); 158 firePreCloseListeners(false); 159 daoClass2Dao.clear(); 160 jdoTransaction.rollback(); 161 persistenceManager.close(); 162 jdoTransaction = null; 163 persistenceManager = null; 164 localRevision = -1; 165 166 unlockIfWrite(); 167 } finally { 168 lock.unlock(); 169 } 170 } 171 172 @Override 173 public void close() { 174 rollbackIfActive(); 175 } 176 177 @Override 178 public PersistenceManager getPersistenceManager() { 179 if (!isActive()) { 180 throw new IllegalStateException("Transaction is not active!"); 181 } 182 return persistenceManager; 183 } 184 185 @Override 186 public long getLocalRevision() { 187 if (localRevision < 0) { 188 if (!write) 189 throw new IllegalStateException("This is a read-only transaction!"); 190 191 jdoTransaction.setSerializeRead(true); 192 final LocalRepository lr = getDao(LocalRepositoryDao.class).getLocalRepositoryOrFail(); 193 jdoTransaction.setSerializeRead(null); 194 localRevision = lr.getRevision() + 1; 195 lr.setRevision(localRevision); 196 persistenceManager.flush(); 197 } 198 return localRevision; 199 } 200 201 @Override 202 public LocalRepoManager getLocalRepoManager() { 203 return localRepoManager; 204 } 205 206 @Override 207 public <D> D getDao(final Class<D> daoClass) { 208 assertNotNull(daoClass, "daoClass"); 209 210 @SuppressWarnings("unchecked") 211 D dao = (D) daoClass2Dao.get(daoClass); 212 213 if (dao == null) { 214 final PersistenceManager pm = getPersistenceManager(); 215 try { 216 dao = daoClass.newInstance(); 217 } catch (final InstantiationException e) { 218 throw new RuntimeException(e); 219 } catch (final IllegalAccessException e) { 220 throw new RuntimeException(e); 221 } 222 223 if (!(dao instanceof Dao)) 224 throw new IllegalStateException(String.format("dao class %s does not extend Dao!", daoClass.getName())); 225 226 ((Dao<?, ?>)dao).setPersistenceManager(pm); 227 ((Dao<?, ?>)dao).setDaoProvider(this); 228 229 daoClass2Dao.put(daoClass, dao); 230 } 231 return dao; 232 } 233 234 @Override 235 public void flush() { 236 final PersistenceManager pm = getPersistenceManager(); 237 pm.flush(); 238 } 239 240 @Override 241 public void setContextObject(final Object object) { 242 extensibleContextSupport.setContextObject(object); 243 } 244 245 @Override 246 public <T> T getContextObject(final Class<T> clazz) { 247 return extensibleContextSupport.getContextObject(clazz); 248 } 249 250 @Override 251 public void removeContextObject(Object object) { 252 extensibleContextSupport.removeContextObject(object); 253 } 254 255 @Override 256 public void removeContextObject(Class<?> clazz) { 257 extensibleContextSupport.removeContextObject(clazz); 258 } 259 260 @Override 261 public void addPreCloseListener(LocalRepoTransactionPreCloseListener listener) { 262 preCloseListeners.add(assertNotNull(listener, "listener")); 263 } 264 @Override 265 public void addPostCloseListener(LocalRepoTransactionPostCloseListener listener) { 266 postCloseListeners.add(assertNotNull(listener, "listener")); 267 } 268 269 protected void firePreCloseListeners(final boolean commit) { 270 LocalRepoTransactionPreCloseEvent event = null; 271 for (final LocalRepoTransactionPreCloseListener listener : preCloseListeners) { 272 try { 273 if (event == null) 274 event = new LocalRepoTransactionPreCloseEvent(this); 275 276 if (commit) 277 listener.preCommit(event); 278 else 279 listener.preRollback(event); 280 } catch (Exception x) { 281 logger.error("firePreCloseListeners: " + x, x); 282 } 283 } 284 } 285 protected void firePostCloseListeners(final boolean commit) { 286 LocalRepoTransactionPostCloseEvent event = null; 287 for (final LocalRepoTransactionPostCloseListener listener : postCloseListeners) { 288 try { 289 if (event == null) 290 event = new LocalRepoTransactionPostCloseEvent(this); 291 292 if (commit) 293 listener.postCommit(event); 294 else 295 listener.postRollback(event); 296 } catch (Exception x) { 297 logger.error("firePostCloseListeners: " + x, x); 298 } 299 } 300 } 301}