001package co.codewizards.cloudstore.local;
002
003import static co.codewizards.cloudstore.core.util.AssertUtil.*;
004
005import java.util.HashMap;
006import java.util.Map;
007import java.util.concurrent.CopyOnWriteArrayList;
008import java.util.concurrent.TimeUnit;
009import java.util.concurrent.locks.Lock;
010
011import javax.jdo.PersistenceManager;
012import javax.jdo.PersistenceManagerFactory;
013import javax.jdo.Transaction;
014
015import org.slf4j.Logger;
016import org.slf4j.LoggerFactory;
017
018import co.codewizards.cloudstore.core.context.ExtensibleContextSupport;
019import co.codewizards.cloudstore.core.io.TimeoutException;
020import co.codewizards.cloudstore.core.repo.local.ContextWithLocalRepoManager;
021import co.codewizards.cloudstore.core.repo.local.LocalRepoManager;
022import co.codewizards.cloudstore.core.repo.local.LocalRepoTransaction;
023import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionListenerRegistry;
024import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPostCloseEvent;
025import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPostCloseListener;
026import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPreCloseEvent;
027import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPreCloseListener;
028import co.codewizards.cloudstore.core.util.AssertUtil;
029import co.codewizards.cloudstore.local.persistence.Dao;
030import co.codewizards.cloudstore.local.persistence.LocalRepository;
031import co.codewizards.cloudstore.local.persistence.LocalRepositoryDao;
032
033public class LocalRepoTransactionImpl implements LocalRepoTransaction, ContextWithLocalRepoManager, ContextWithPersistenceManager {
034        private static final Logger logger = LoggerFactory.getLogger(LocalRepoTransactionImpl.class);
035
036        public static final long LOCK_TIMEOUT = 5 * 60 * 1000;
037
038        private final LocalRepoManager localRepoManager;
039        private final PersistenceManagerFactory persistenceManagerFactory;
040        private final boolean write;
041        private PersistenceManager persistenceManager;
042        private Transaction jdoTransaction;
043        private final Lock lock;
044        private long localRevision = -1;
045        private final Map<Class<?>, Object> daoClass2Dao = new HashMap<>();
046        private final ExtensibleContextSupport extensibleContextSupport = new ExtensibleContextSupport();
047
048        private final LocalRepoTransactionListenerRegistry listenerRegistry = new LocalRepoTransactionListenerRegistry(this);
049
050        private final CopyOnWriteArrayList<LocalRepoTransactionPreCloseListener> preCloseListeners = new CopyOnWriteArrayList<>();
051        private final CopyOnWriteArrayList<LocalRepoTransactionPostCloseListener> postCloseListeners = new CopyOnWriteArrayList<>();
052
053        public LocalRepoTransactionImpl(final LocalRepoManagerImpl localRepoManager, final boolean write) {
054                this.localRepoManager = AssertUtil.assertNotNull(localRepoManager, "localRepoManager");
055                this.persistenceManagerFactory = AssertUtil.assertNotNull(localRepoManager.getPersistenceManagerFactory(), "localRepoManager.persistenceManagerFactory");
056                this.lock = localRepoManager.getLock();
057                this.write = write;
058                begin();
059        }
060
061        private void begin() {
062                boolean locked = false;
063                try {
064                        locked = lock.tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
065                } catch (InterruptedException e) {
066                        // ignore
067                }
068                if (! locked)
069                        throw new TimeoutException(String.format("Starting %s transaction on '%s' within timeout (%s ms) failed! ", write ? "write" : "read", localRepoManager.getLocalRoot(), LOCK_TIMEOUT));
070                try {
071                        if (isActive())
072                                throw new IllegalStateException("Transaction is already active!");
073
074                        lockIfWrite();
075
076                        persistenceManager = persistenceManagerFactory.getPersistenceManager();
077                        jdoTransaction = persistenceManager.currentTransaction();
078                        jdoTransaction.begin();
079                        listenerRegistry.onBegin();
080                } finally {
081                        lock.unlock();
082                }
083        }
084
085        private final void lockIfWrite() {
086                if (write)
087                        lock.lock(); // UNbalance lock to keep it after method returns!
088        }
089
090        private final void unlockIfWrite() {
091                if (write)
092                        lock.unlock(); // UNbalance unlock to counter the unbalanced lock in lockIfWrite().
093        }
094
095        @Override
096        public void commit() {
097                lock.lock();
098                try {
099                        if (!isActive())
100                                throw new IllegalStateException("Transaction is not active!");
101
102                        listenerRegistry.onCommit();
103                        firePreCloseListeners(true);
104                        daoClass2Dao.clear();
105                        jdoTransaction.commit();
106                        persistenceManager.close();
107                        jdoTransaction = null;
108                        persistenceManager = null;
109                        localRevision = -1;
110
111                        unlockIfWrite();
112                } finally {
113                        lock.unlock();
114                }
115                firePostCloseListeners(true);
116        }
117
118        @Override
119        public boolean isActive() {
120                lock.lock();
121                try {
122                        return jdoTransaction != null && jdoTransaction.isActive();
123                } finally {
124                        lock.unlock();
125                }
126        }
127
128        @Override
129        public void rollback() {
130                _rollback();
131                firePostCloseListeners(false);
132        }
133
134        @Override
135        public void rollbackIfActive() {
136                boolean active;
137                lock.lock();
138                try {
139                        active = isActive();
140                        if (active) {
141                                _rollback();
142                        }
143                } finally {
144                        lock.unlock();
145                }
146                if (active) {
147                        firePostCloseListeners(false);
148                }
149        }
150
151        protected void _rollback() {
152                lock.lock();
153                try {
154                        if (!isActive())
155                                throw new IllegalStateException("Transaction is not active!");
156
157                        listenerRegistry.onRollback();
158                        firePreCloseListeners(false);
159                        daoClass2Dao.clear();
160                        jdoTransaction.rollback();
161                        persistenceManager.close();
162                        jdoTransaction = null;
163                        persistenceManager = null;
164                        localRevision = -1;
165
166                        unlockIfWrite();
167                } finally {
168                        lock.unlock();
169                }
170        }
171
172        @Override
173        public void close() {
174                rollbackIfActive();
175        }
176
177        @Override
178        public PersistenceManager getPersistenceManager() {
179                if (!isActive()) {
180                        throw new IllegalStateException("Transaction is not active!");
181                }
182                return persistenceManager;
183        }
184
185        @Override
186        public long getLocalRevision() {
187                if (localRevision < 0) {
188                        if (!write)
189                                throw new IllegalStateException("This is a read-only transaction!");
190
191                        jdoTransaction.setSerializeRead(true);
192                        final LocalRepository lr = getDao(LocalRepositoryDao.class).getLocalRepositoryOrFail();
193                        jdoTransaction.setSerializeRead(null);
194                        localRevision = lr.getRevision() + 1;
195                        lr.setRevision(localRevision);
196                        persistenceManager.flush();
197                }
198                return localRevision;
199        }
200
201        @Override
202        public LocalRepoManager getLocalRepoManager() {
203                return localRepoManager;
204        }
205
206        @Override
207        public <D> D getDao(final Class<D> daoClass) {
208                assertNotNull(daoClass, "daoClass");
209
210                @SuppressWarnings("unchecked")
211                D dao = (D) daoClass2Dao.get(daoClass);
212
213                if (dao == null) {
214                        final PersistenceManager pm = getPersistenceManager();
215                        try {
216                                dao = daoClass.newInstance();
217                        } catch (final InstantiationException e) {
218                                throw new RuntimeException(e);
219                        } catch (final IllegalAccessException e) {
220                                throw new RuntimeException(e);
221                        }
222
223                        if (!(dao instanceof Dao))
224                                throw new IllegalStateException(String.format("dao class %s does not extend Dao!", daoClass.getName()));
225
226                        ((Dao<?, ?>)dao).setPersistenceManager(pm);
227                        ((Dao<?, ?>)dao).setDaoProvider(this);
228
229                        daoClass2Dao.put(daoClass, dao);
230                }
231                return dao;
232        }
233
234        @Override
235        public void flush() {
236                final PersistenceManager pm = getPersistenceManager();
237                pm.flush();
238        }
239
240        @Override
241        public void setContextObject(final Object object) {
242                extensibleContextSupport.setContextObject(object);
243        }
244
245        @Override
246        public <T> T getContextObject(final Class<T> clazz) {
247                return extensibleContextSupport.getContextObject(clazz);
248        }
249
250        @Override
251        public void removeContextObject(Object object) {
252                extensibleContextSupport.removeContextObject(object);
253        }
254
255        @Override
256        public void removeContextObject(Class<?> clazz) {
257                extensibleContextSupport.removeContextObject(clazz);
258        }
259
260        @Override
261        public void addPreCloseListener(LocalRepoTransactionPreCloseListener listener) {
262                preCloseListeners.add(assertNotNull(listener, "listener"));
263        }
264        @Override
265        public void addPostCloseListener(LocalRepoTransactionPostCloseListener listener) {
266                postCloseListeners.add(assertNotNull(listener, "listener"));
267        }
268
269        protected void firePreCloseListeners(final boolean commit) {
270                LocalRepoTransactionPreCloseEvent event = null;
271                for (final LocalRepoTransactionPreCloseListener listener : preCloseListeners) {
272                        try {
273                                if (event == null)
274                                        event = new LocalRepoTransactionPreCloseEvent(this);
275
276                                if (commit)
277                                        listener.preCommit(event);
278                                else
279                                        listener.preRollback(event);
280                        } catch (Exception x) {
281                                logger.error("firePreCloseListeners: " + x, x);
282                        }
283                }
284        }
285        protected void firePostCloseListeners(final boolean commit) {
286                LocalRepoTransactionPostCloseEvent event = null;
287                for (final LocalRepoTransactionPostCloseListener listener : postCloseListeners) {
288                        try {
289                                if (event == null)
290                                        event = new LocalRepoTransactionPostCloseEvent(this);
291
292                                if (commit)
293                                        listener.postCommit(event);
294                                else
295                                        listener.postRollback(event);
296                        } catch (Exception x) {
297                                logger.error("firePostCloseListeners: " + x, x);
298                        }
299                }
300        }
301}