+------------+ 1 +-------------+ N +------------+ N +-------+ 1 +----------------+ | PostOffice |-------> |PagingManager|-------> |PagingStore | ------> | Page | ------> | SequentialFile | +------------+ +-------------+ +------------+ +-------+ +----------------+ | 1 ^ | | | | | | 1 | N +-------------------+ +--------> | DestinationAdress | +-------------------+
The PagingManager is the class (single object) that will work closely with the PostOffice storing and removing messages out of the pageStore. (We call these operations as page and depage).
The PostOffice will aways inform the PagingManager when a message was placed into memory by routing. We will use a reference count to each message and when the refCount=0 during acknowledgement we inform the PagingManager the message was done. (Of course we need to compensate this on transaction.rollback where we need to place the counters and size back)
The PagingManager will start depaging threads as needed (depending on watermarks configured on the queue). The address will stay in paging mode until all the pages are consumed (What could take a while under high load, so because of that the queue sizes will need to be properly configured).
Depage is a single thread operation, and each destination will have only 0 or 1 depage threads being executed. (Actually they are pooled)
The PagingManager is also responsible to organize (sort and initialize) the PagingStores that are organized by destination.
We aways have one PagingStore per destination address.
If inPageMode (set by startPaging method), the method page on PagingStore will write the message to the current page, or add a new page if the maxSize of a page file was reached.
PagingStore will be in pageMode until depage is called multiple times until you don't have any more pending pages.
A depage method will aways return a full page. We never depage just part of a page as we need to guarantee messages are consumed atomically of Paging.
Also: note that depage will return the page, but you won't actually read the page inside depage. This is done to maximize the throuput of PagingStore where we could release the locks and do the reading in another space.
Locking schema on PagingStore
You could have several threads calling page, and that method has the following threading model:
Allocating sizes from paging is done inside a synchronized block. (controlled by a semaphore).
If the size < maxSize a readLock is acquired and the synchronized block is released. (So you could have multiple thread writing to the file at once).
If the maxSize was achived we wait for all the writes to finish (by acquiring a writeLock), and we move to the next file.
There are testcases validating this locking schema and it currently shows it is possible to have multiple threads writing to a single currentPage file at the same time.
Each page is represented by a file.
PagingStore will write to a Page as long as the size of the Page < MaxConfiguredSize for pages.
The default page-size whould be 10M.
Page has methods to read of the file, and delete the page.
Transactions on Paging
Paging itself is not transactional, so it needs to trust in another transactional store to control transactions.
There is a simple schema currently implemented:
If a page operation is being done transactionally, during the page operations we will:
A- Record the transactionID along with the message
B- When transactionImpl.commit is called we sync the pages on disk and we record the transactionID used on a transactional storage. (Through the PersistentManager. Currently the journal, in the future the DBPersistentManager)
C- Read the messages and TransactionIDs (recorded on B)
D- Verify if the transactionID exists on the storage added on B. If we don't have that record it means that B failed, hence we need to reject the record.
In D, we also delete the information added on B as soon as numberOfReads==numberOfWrites. (For that we keep a numberOfMessages recorded along with the record).
The record in B is called PageTransactionInfo, and it contains the TransactionID (B) and numberOfMessagesWritten.
(There is also an internal field used by the journal called recordID which will be the primary key of the record on the Journal or database).
When depage is called, we need to do it transactionally, i.e. if the server fails during depaging we need to come back at the same place we were before starting the operation.
We record the ID of the latest page being processed, so we will aways reject any pages being reprocessed. (we are able to determine if the server crashed after commit and before deleting the page file)