/*
* JBoss, Home of Professional Open Source.
* See the COPYRIGHT.txt file distributed with this work for information
* regarding copyright ownership. Some portions may be licensed
* to Red Hat, Inc. under one or more contributor license agreements.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA.
*/
package org.teiid.common.buffer.impl;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.teiid.client.BatchSerializer;
import org.teiid.client.ResizingArrayList;
import org.teiid.client.util.ExceptionUtil;
import org.teiid.common.buffer.*;
import org.teiid.common.buffer.AutoCleanupUtil.Removable;
import org.teiid.common.buffer.LobManager.ReferenceMode;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
import org.teiid.core.types.Streamable;
import org.teiid.core.util.Assertion;
import org.teiid.dqp.internal.process.DQPConfiguration;
import org.teiid.dqp.internal.process.RequestWorkItem;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.query.QueryPlugin;
import org.teiid.query.ReplicatedObject;
import org.teiid.query.processor.relational.ListNestedSortComparator;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.Expression;
import org.teiid.query.util.CommandContext;
/**
* Default implementation of BufferManager.
* Responsible for creating/tracking TupleBuffers and providing access to the StorageManager.
*
* TODO: add detection of pinned batches to prevent unnecessary purging of non-persistent batches
* - this is not necessary for already persistent batches, since we hold a weak reference
*
* TODO: add a pre-fetch for tuplebuffers or some built-in correlation logic with the queue.
*/
public class BufferManagerImpl implements BufferManager, ReplicatedObject {
private static final int SYSTEM_OVERHEAD_MEGS = 150;

/**
 * Asynch cleaner attempts to age out old entries and to reduce the memory size when
 * little is reserved.
 */
private static final int MAX_READ_AGE = 1<<17;

/**
 * Background TimerTask that repeatedly evicts aged entries from the initial
 * eviction queue.  Holds only a weak reference to the owning manager so the
 * shared static timer cannot keep a discarded BufferManagerImpl alive; the
 * task cancels itself once the manager has been garbage collected.
 */
private static final class Cleaner extends TimerTask {
    WeakReference bufferRef;

    public Cleaner(BufferManagerImpl bufferManagerImpl) {
        this.bufferRef = new WeakReference(bufferManagerImpl);
    }

    @Override
    public void run() {
        while (true) {
            BufferManagerImpl impl = this.bufferRef.get();
            if (impl == null) {
                //the manager is gone - stop this task permanently
                this.cancel();
                return;
            }
            impl.cleaning.set(true);
            try {
                long evicted = impl.doEvictions(impl.maxProcessingBytes, false, impl.initialEvictionQueue);
                if (evicted != 0) {
                    if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
                        LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", evicted, impl.reserveBatchBytes.get(), impl.maxReserveBytes, impl.activeBatchBytes.get()); //$NON-NLS-1$
                    }
                    //something was evicted - immediately attempt another pass
                    continue;
                }
            } catch (Throwable t) {
                LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, t, "Exception during cleaning run"); //$NON-NLS-1$
            }
            synchronized (this) {
                impl.cleaning.set(false);
                try {
                    //sleep until notified of memory pressure or the periodic timeout elapses
                    this.wait(30000);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
}
/**
 * Removable callback registered with AutoCleanupUtil; drops the cache group
 * for a BatchManagerImpl once that manager becomes unreachable.
 */
private final class Remover implements Removable {

    /** Id of the cache group to drop. */
    private final Long groupId;
    /** Live view of the group's memory preference at removal time. */
    private final AtomicBoolean memoryPreference;

    public Remover(Long groupId, AtomicBoolean memoryPreference) {
        this.groupId = groupId;
        this.memoryPreference = memoryPreference;
    }

    @Override
    public void remove() {
        removeCacheGroup(groupId, memoryPreference.get());
    }
}
/**
 * This estimate is based upon adding the value to 2/3 maps and having CacheEntry/PhysicalInfo keys
 */
private static final long BATCH_OVERHEAD = 128;
/**
* Batch manager for a single tuple source / cache group.  Serializes batches
* to and from the backing Cache, tracks size estimates, and registers a
* phantom-reference based Remover so the cache group is removed when this
* manager is garbage collected.
*/
final class BatchManagerImpl implements BatchManager, Serializer>> {
final Long id;
SizeUtility sizeUtility;
//weak self reference handed to cache entries so they do not pin this manager
private WeakReference ref = new WeakReference(this);
//phantom cleanup reference; non-null once the cache group has been created
private PhantomReference cleanup;
AtomicBoolean prefersMemory = new AtomicBoolean();
String[] types;
private LobManager lobManager;
//running totals backing getRowSizeEstimate
private long totalSize;
private long rowsSampled;
private BatchManagerImpl(Long newID, Class>[] types) {
this.id = newID;
this.sizeUtility = new SizeUtility(types);
this.types = new String[types.length];
for (int i = 0; i < types.length; i++) {
this.types[i] = DataTypeManager.getDataTypeName(types[i]);
}
}
@Override
public Long getId() {
return id;
}
public void setLobManager(LobManager lobManager) {
this.lobManager = lobManager;
}
@Override
public String[] getTypes() {
return types;
}
@Override
public boolean prefersMemory() {
return prefersMemory.get();
}
@Override
public void setPrefersMemory(boolean prefers) {
//TODO: it's only expected to move from not preferring to preferring
this.prefersMemory.set(prefers);
}
@Override
public boolean useSoftCache() {
return prefersMemory.get();
}
@Override
public Reference extends BatchManager> getBatchManagerReference() {
return ref;
}
/**
* Adds a batch to this group, optionally replacing a previous batch.
* The cache group and cleanup reference are created lazily on first use.
* @return the id assigned to the new batch
*/
@Override
public Long createManagedBatch(List extends List>> batch,
Long previous, boolean removeOld)
throws TeiidComponentException {
if (cleanup == null) {
cache.createCacheGroup(id);
cleanup = AutoCleanupUtil.setCleanupReference(this, new Remover(id, prefersMemory));
}
int sizeEstimate = getSizeEstimate(batch);
Long oid = batchAdded.getAndIncrement();
CacheEntry old = null;
if (previous != null) {
if (removeOld) {
old = BufferManagerImpl.this.remove(id, previous, prefersMemory.get());
} else {
old = fastGet(previous, prefersMemory.get(), true);
}
} else {
//only brand new (non-replacement) batches contribute to the row size sample
totalSize += sizeEstimate;
rowsSampled += batch.size();
}
CacheKey key = new CacheKey(oid, (int)readAttempts.get(), old!=null?old.getKey().getOrderingValue():0);
CacheEntry ce = new CacheEntry(key, sizeEstimate, batch, this.ref, false);
if (!cache.addToCacheGroup(id, ce.getId())) {
//the group was concurrently removed - clean up and fail
this.remove();
throw new TeiidComponentException(QueryPlugin.Event.TEIID31138, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID31138, id));
}
overheadBytes.addAndGet(BATCH_OVERHEAD);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", this.id, ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
}
addMemoryEntry(ce, true);
return oid;
}
/**
* Reads a batch from the stream and re-attaches any lob references to this
* group's LobManager.
*/
@Override
public List extends List>> deserialize(ObjectInput ois)
throws IOException, ClassNotFoundException {
List extends List>> batch = BatchSerializer.readBatch(ois, types);
if (lobManager != null) {
for (int i = batch.size() - 1; i >= 0; i--) {
try {
lobManager.updateReferences(batch.get(i), ReferenceMode.ATTACH);
} catch (TeiidComponentException e) {
throw new TeiidRuntimeException(QueryPlugin.Event.TEIID30052, e);
}
}
}
return batch;
}
/**
* Writes a batch to the stream.  A RuntimeException against a
* ResizingArrayList (other than one caused by a ClassCastException) is
* assumed to be a concurrent modification during persist and is swallowed.
*/
@Override
public void serialize(List extends List>> obj,
ObjectOutput oos) throws IOException {
ResizingArrayList> list = null;
if (obj instanceof ResizingArrayList>) {
list = (ResizingArrayList>)obj;
}
try {
//it's expected that the containing structure has updated the lob manager
BatchSerializer.writeBatch(oos, types, obj);
} catch (RuntimeException e) {
if (ExceptionUtil.getExceptionOfType(e, ClassCastException.class) != null) {
throw e;
}
//there is a chance of a concurrent persist while modifying
//in which case we want to swallow this exception
if (list == null) {
throw e;
}
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Possible Concurrent Modification", id); //$NON-NLS-1$
}
}
//estimated memory footprint of the batch in bytes; never less than 1
public int getSizeEstimate(List extends List>> obj) {
return (int) Math.max(1, sizeUtility.getBatchSize(DataTypeManager.isValueCacheEnabled(), obj));
}
/**
* Gets a batch: first from memory/reference caches, then from storage under
* a granular load lock that prevents double memory loading.
* @param retain true to keep the batch tracked in memory after this call
*/
@SuppressWarnings("unchecked")
@Override
public List> getBatch(Long batch, boolean retain)
throws TeiidComponentException {
cleanSoftReferences();
long reads = readAttempts.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, id, "getting batch", batch, "total reads", reads, "reference hits", referenceHit.get()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
CacheEntry ce = fastGet(batch, prefersMemory.get(), retain);
if (ce != null) {
return (List>)(!retain?ce.nullOut():ce.getObject());
}
//obtain a granular lock to prevent double memory loading
Object o = cache.lockForLoad(batch, this);
try {
//recheck - another thread may have loaded the batch while we waited
ce = fastGet(batch, prefersMemory.get(), retain);
if (ce != null) {
return (List>)(!retain?ce.nullOut():ce.getObject());
}
long count = readCount.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, id, "reading batch", batch, "from storage, total reads:", count); //$NON-NLS-1$ //$NON-NLS-2$
}
ce = cache.get(o, batch, this.ref);
if (ce == null) {
throw new AssertionError("Batch not found in storage " + batch); //$NON-NLS-1$
}
if (!retain) {
removeFromCache(this.id, batch);
persistBatchReferences(ce.getSizeEstimate());
} else {
addMemoryEntry(ce, false);
}
} finally {
cache.unlockForLoad(o);
}
return (List>)ce.getObject();
}
@Override
public void remove(Long batch) {
BufferManagerImpl.this.remove(id, batch, prefersMemory.get());
}
/**
* Removes the entire cache group and deregisters the cleanup reference.
*/
@Override
public void remove() {
if (cleanup != null) {
removeCacheGroup(id, prefersMemory.get());
AutoCleanupUtil.removeCleanupReference(cleanup);
cleanup = null;
}
}
@Override
public String toString() {
return id.toString();
}
//average bytes per row observed so far, or 0 if nothing has been sampled
@Override
public int getRowSizeEstimate() {
if (rowsSampled == 0) {
return 0;
}
return (int)(totalSize/rowsSampled);
}
}
/**
* SoftReference to a CacheEntry that remembers the entry id and the overhead
* charged for holding it, so both can be reclaimed after the referent is gone.
*/
private static class BatchSoftReference extends SoftReference {
//remaining overhead charged for this reference; zeroed by clearSoftReference
private int sizeEstimate;
//entry id - needed to remove the softCache mapping after the referent is collected
private Long key;
public BatchSoftReference(CacheEntry referent,
ReferenceQueue super CacheEntry> q, int sizeEstimate) {
super(referent, q);
this.sizeEstimate = sizeEstimate;
this.key = referent.getId();
}
}
static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable since it is roughly the same as max active plans
private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
//collects BatchSoftReferences whose referents have been collected
private static ReferenceQueue SOFT_QUEUE = new ReferenceQueue();

// Configuration
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
//set to acceptable defaults for testing
private int maxProcessingBytes = 1 << 21;
//configured value preserved so initialize() can be re-run
private Integer maxProcessingBytesOrig;
long maxReserveBytes = 1 << 28; //fixed: removed stray duplicate semicolon
AtomicLong reserveBatchBytes = new AtomicLong();
AtomicLong overheadBytes = new AtomicLong();
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
private int maxSoftReferences;
private int nominalProcessingMemoryMax = maxProcessingBytes;

//guards reserve/release accounting and the batchesFreed condition
private ReentrantLock lock = new ReentrantLock();
private Condition batchesFreed = lock.newCondition();

AtomicLong activeBatchBytes = new AtomicLong();
private AtomicLong readAttempts = new AtomicLong();
//TODO: consider the size estimate in the weighting function
LrfuEvictionQueue evictionQueue = new LrfuEvictionQueue(readAttempts);
LrfuEvictionQueue initialEvictionQueue = new LrfuEvictionQueue(readAttempts);
ConcurrentHashMap memoryEntries = new ConcurrentHashMap(16, .75f, CONCURRENCY_LEVEL);

//limited size reference caches based upon the memory settings
private WeakReferenceHashedValueCache weakReferenceCache;
private Map softCache = Collections.synchronizedMap(new LinkedHashMap(16, .75f, false) {
    private static final long serialVersionUID = 1L;

    //bound the soft cache, clearing the eldest reference as entries roll off
    protected boolean removeEldestEntry(Map.Entry eldest) {
        if (size() > maxSoftReferences) {
            BatchSoftReference bsr = eldest.getValue();
            clearSoftReference(bsr);
            return true;
        }
        return false;
    }
});

private Cache cache;

private Map tupleBufferMap = new ConcurrentHashMap();
private ReferenceQueue tupleBufferQueue = new ReferenceQueue();

//monotonic id source for tuple sources and statistics counters
private AtomicLong tsId = new AtomicLong();
private AtomicLong batchAdded = new AtomicLong();
private AtomicLong readCount = new AtomicLong();
private AtomicLong writeCount = new AtomicLong();
private AtomicLong referenceHit = new AtomicLong();

//TODO: this does not scale well with multiple embedded instances
private static final Timer timer = new Timer("BufferManager Cleaner", true); //$NON-NLS-1$
private Cleaner cleaner;
private AtomicBoolean cleaning = new AtomicBoolean();
/**
* Creates the manager and schedules its background Cleaner on the shared
* daemon timer (first run after 100ms; the task then loops/waits itself).
*/
public BufferManagerImpl() {
this.cleaner = new Cleaner(this);
timer.schedule(cleaner, 100);
}
/**
* Clears a soft reference and credits its overhead estimate back.
* Synchronized on the reference so the estimate is subtracted only once.
*/
void clearSoftReference(BatchSoftReference bsr) {
synchronized (bsr) {
overheadBytes.addAndGet(-bsr.sizeEstimate);
bsr.sizeEstimate = 0;
}
bsr.clear();
}
/**
* Removes a batch from the backing cache, crediting the fixed per-batch
* tracking overhead when the batch was actually present.
*/
void removeFromCache(Long gid, Long batch) {
if (cache.remove(gid, batch)) {
overheadBytes.addAndGet(-BATCH_OVERHEAD);
}
}
//--- statistics and configuration accessors ---

public long getBatchesAdded() {
return batchAdded.get();
}
public long getReadCount() {
return readCount.get();
}
public long getWriteCount() {
return writeCount.get();
}
public long getReadAttempts() {
return readAttempts.get();
}
@Override
public int getMaxProcessingSize() {
return maxProcessingBytes;
}
public long getReserveBatchBytes() {
return reserveBatchBytes.get();
}
/**
 * Get processor batch size
 * @return Number of rows in a processor batch
 */
@Override
public int getProcessorBatchSize() {
return this.processorBatchSize;
}
//nominal per-row byte target used when sizing batches
public void setTargetBytesPerRow(int targetBytesPerRow) {
this.targetBytesPerRow = targetBytesPerRow;
}
public void setProcessorBatchSize(int processorBatchSize) {
this.processorBatchSize = processorBatchSize;
}
/**
* Creates a TupleBuffer backed by a new batch manager.  Lob handling (with a
* dedicated lob FileStore) is added only when the schema contains lob columns.
*/
@Override
public TupleBuffer createTupleBuffer(final List elements, String groupName,
TupleSourceType tupleSourceType) {
final Long newID = this.tsId.getAndIncrement();
int[] lobIndexes = LobManager.getLobIndexes(elements);
Class>[] types = getTypeClasses(elements);
BatchManagerImpl batchManager = createBatchManager(newID, types);
LobManager lobManager = null;
if (lobIndexes != null) {
FileStore lobStore = createFileStore(newID + "_lobs"); //$NON-NLS-1$
lobManager = new LobManager(lobIndexes, lobStore);
batchManager.setLobManager(lobManager);
}
TupleBuffer tupleBuffer = new TupleBuffer(batchManager, String.valueOf(newID), elements, lobManager, getProcessorBatchSize(elements));
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(types), "batch size", tupleBuffer.getBatchSize(), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
tupleBuffer.setInlineLobs(inlineLobs);
return tupleBuffer;
}
/**
* Creates an STree (index) over the given schema, using separate batch
* managers for the full tuples and for the key-only pages.
* @param keyLength number of leading columns that form the key
*/
public STree createSTree(final List extends Expression> elements, String groupName, int keyLength) {
Long newID = this.tsId.getAndIncrement();
int[] lobIndexes = LobManager.getLobIndexes(elements);
Class>[] types = getTypeClasses(elements);
BatchManagerImpl bm = createBatchManager(newID, types);
LobManager lobManager = null;
if (lobIndexes != null) {
lobManager = new LobManager(lobIndexes, null); //persistence is not expected yet - later we might utilize storage for out-of-line lob values
bm.setLobManager(lobManager);
}
BatchManager keyManager = createBatchManager(this.tsId.getAndIncrement(), Arrays.copyOf(types, keyLength));
//identity mapping 0..keyLength-1; index 0 is already 0 from array initialization
int[] compareIndexes = new int[keyLength];
for (int i = 1; i < compareIndexes.length; i++) {
compareIndexes[i] = i;
}
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating STree:", newID); //$NON-NLS-1$
}
return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(elements.subList(0, keyLength)), getProcessorBatchSize(elements), keyLength, lobManager);
}
/**
 * Resolves the runtime type class for each expression in the schema.
 * Asserts (via Assertion) that every expression has a non-null type.
 */
private static Class>[] getTypeClasses(final List extends Expression> elements) {
    Class>[] types = new Class[elements.size()];
    for (int index = 0; index < elements.size(); index++) {
        Class> exprType = elements.get(index).getType();
        Assertion.isNotNull(exprType);
        types[index] = exprType;
    }
    return types;
}
//single construction point for batch managers
private BatchManagerImpl createBatchManager(final Long newID, Class>[] types) {
return new BatchManagerImpl(newID, types);
}
/**
* Creates a named FileStore in the backing cache's storage.
*/
@Override
public FileStore createFileStore(String name) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating FileStore:", name); //$NON-NLS-1$
}
return this.cache.createFileStore(name);
}
public Cache getCache() {
return cache;
}
public void setMaxActivePlans(int maxActivePlans) {
this.maxActivePlans = maxActivePlans;
}
/**
* Sets the max processing space in KB; -1 means auto-size during initialize().
*/
public void setMaxProcessingKB(int maxProcessingKB) {
if (maxProcessingKB > -1) {
this.maxProcessingBytes = maxProcessingKB<<10;
} else {
this.maxProcessingBytes = -1;
}
}
/**
* Sets the max reserve space in KB; -1 means auto-size during initialize().
*/
public void setMaxReserveKB(int maxReserveBatchKB) {
if (maxReserveBatchKB > -1) {
int maxReserve = maxReserveBatchKB<<10;
this.maxReserveBytes = maxReserve;
this.reserveBatchBytes.set(maxReserve);
} else {
this.maxReserveBytes = -1;
}
}
/**
* Computes the memory-sensitive settings.  When not explicitly configured,
* the reserve is auto-sized from the vm max memory (half of the first gig
* plus 70% of the remainder, after a fixed system overhead allowance) and
* the processing maximum is derived from batch size and active plan count.
*/
@Override
public void initialize() throws TeiidComponentException {
long maxMemory = Runtime.getRuntime().maxMemory();
maxMemory = Math.max(0, maxMemory - (SYSTEM_OVERHEAD_MEGS << 20)); //assume an overhead for the AS/system stuff
if (getMaxReserveKB() < 0) {
this.maxReserveBytes = 0;
int one_gig = 1 << 30;
if (maxMemory > one_gig) {
//assume 70% of the memory over the first gig
this.maxReserveBytes = (long)Math.max(0, (maxMemory - one_gig) * .7);
}
this.maxReserveBytes += Math.max(0, Math.min(one_gig, maxMemory) >> 1);
}
this.reserveBatchBytes.set(maxReserveBytes);
if (this.maxProcessingBytesOrig == null) {
//store the config value so that we can be reinitialized (this is not a clean approach)
this.maxProcessingBytesOrig = this.maxProcessingBytes;
}
if (this.maxProcessingBytesOrig < 0) {
//auto-size: at least 16 full batches, at most a 20% share of memory per plan
this.maxProcessingBytes = (int)Math.min(Math.max(processorBatchSize * targetBytesPerRow * 16l, (.2 * maxMemory)/maxActivePlans), Integer.MAX_VALUE);
}
//make a guess at the max number of batches
long memoryBatches = maxMemory / (processorBatchSize * targetBytesPerRow);
//memoryBatches represents a full batch, so assume that most will be smaller
//NOTE(review): 67 - numberOfLeadingZeros is bit-length + 3, i.e. an ~8x oversize of the caches - confirm the intent of the +3
int logSize = 67 - Long.numberOfLeadingZeros(memoryBatches);
if (useWeakReferences) {
weakReferenceCache = new WeakReferenceHashedValueCache(Math.min(30, logSize));
}
this.maxSoftReferences = 1 << Math.min(30, logSize);
this.nominalProcessingMemoryMax = (int)Math.max(Math.min(this.maxReserveBytes, 2*this.maxProcessingBytes), Math.min(Integer.MAX_VALUE, 2*this.maxReserveBytes/maxActivePlans));
}
//test hook for overriding the computed per-plan nominal maximum
void setNominalProcessingMemoryMax(int nominalProcessingMemoryMax) {
this.nominalProcessingMemoryMax = nominalProcessingMemoryMax;
}
/**
* Releases buffers that were not released by their owning context; performs
* no thread-local context accounting.
*/
@Override
public void releaseOrphanedBuffers(long count) {
releaseBuffers(count, false);
}
/**
* Releases buffers held by the calling thread's context.
*/
@Override
public void releaseBuffers(int count) {
releaseBuffers(count, true);
}
/**
 * Releases reserved buffer space back to the pool and signals any threads
 * blocked waiting for memory.
 * @param count bytes to release; values less than 1 are ignored
 * @param updateContext true to also decrement the calling context's reserved count
 */
private void releaseBuffers(long count, boolean updateContext) {
    if (count < 1) {
        return;
    }
    if (updateContext) {
        if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
            LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer space", count); //$NON-NLS-1$
        }
        CommandContext context = CommandContext.getThreadLocalContext();
        if (context != null) {
            context.addAndGetReservedBuffers((int)-count);
        }
    } else {
        //level check aligned to TRACE to match the logTrace call (was inconsistently INFO)
        if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
            LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing orphaned buffer space", count); //$NON-NLS-1$
        }
    }
    lock.lock();
    try {
        this.reserveBatchBytes.addAndGet(count);
        batchesFreed.signalAll();
    } finally {
        lock.unlock();
    }
}
/**
* Reserves buffer space.  FORCE mode always grants the full amount; otherwise
* only what is available up to the per-plan nominal maximum is granted.
* @return the number of bytes actually reserved
*/
@Override
public int reserveBuffers(int count, BufferReserveMode mode) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, mode); //$NON-NLS-1$
}
CommandContext context = CommandContext.getThreadLocalContext();
int existing = 0;
if (context != null) {
existing = (int)Math.min(Integer.MAX_VALUE, context.addAndGetReservedBuffers(0));
}
int result = count;
if (mode == BufferReserveMode.FORCE) {
reserve(count, context);
} else {
lock.lock();
try {
//cap by what this context may still hold
count = Math.min(count, nominalProcessingMemoryMax - existing);
result = noWaitReserve(count, false, context);
} finally {
lock.unlock();
}
}
persistBatchReferences(result);
return result;
}
/**
* Unconditionally takes count bytes from the reserve and charges the context.
*/
private void reserve(int count, CommandContext context) {
this.reserveBatchBytes.addAndGet(-count);
if (context != null) {
context.addAndGetReservedBuffers(count);
}
}
/**
 * Reserves up to count bytes of buffer space, throwing BlockedException when
 * memory is exhausted so the caller can re-enter later.
 * @param count requested bytes; must be non-negative
 * @param val caller-held scratch array carried across re-entries:
 *        val[0] is the attempt count, val[1] the last attempt timestamp
 * @param force when true the full amount is eventually granted even under pressure
 * @return the number of bytes reserved (count when force is true)
 * @throws BlockedException when the reservation cannot yet be satisfied
 */
@Override
public int reserveBuffersBlocking(int count, long[] val, boolean force) throws BlockedException {
    if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
        LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, force); //$NON-NLS-1$
    }
    assert count >= 0;
    if (count == 0) {
        return 0;
    }
    int result = 0;
    int count_orig = count;
    CommandContext context = CommandContext.getThreadLocalContext();
    long reserved = 0;
    if (context != null) {
        reserved = context.addAndGetReservedBuffers(0);
        //TODO: in theory we have to check the whole stack as we could be
        //issuing embedded queries back to ourselves
    }
    count = Math.min(count, (int)Math.min(Integer.MAX_VALUE, nominalProcessingMemoryMax - reserved));
    if (count_orig != count && !force) {
        return 0; //is not possible to reserve the desired amount
    }
    result = noWaitReserve(count, true, context);
    if (result == 0) {
        if (val[0]++ == 0) {
            val[1] = System.currentTimeMillis();
        }
        if (val[1] > 1) {
            long last = val[1];
            val[1] = System.currentTimeMillis();
            //acquire the lock before entering try so a failed lock() cannot
            //cause unlock() to run on a lock this thread does not hold
            lock.lock();
            try {
                if (val[1] - last < 10) {
                    //if the time difference is too close, then wait to prevent tight spins
                    //but we can't wait too long as we don't want to thread starve the system
                    batchesFreed.await(20, TimeUnit.MILLISECONDS);
                }
                if ((val[0] << (force?16:18)) > count) {
                    //aging out
                    //TODO: ideally we should be using a priority queue and better scheduling
                    if (!force) {
                        return 0;
                    }
                    reserve(count_orig, context);
                    result = count_orig;
                } else {
                    int min = 0;
                    if (force) {
                        min = 2*count/3;
                    } else {
                        min = 4*count/5;
                    }
                    //if a sample looks good proceed
                    if (reserveBatchBytes.get() > min){
                        reserve(count_orig, context);
                        result = count_orig;
                    }
                }
            } catch (InterruptedException e) {
                throw new TeiidRuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
        if (result == 0) {
            if (context != null) {
                RequestWorkItem workItem = context.getWorkItem();
                if (workItem != null) {
                    //if we have a workitem (non-test scenario) then before
                    //throwing blocked on memory to indicate there's more work
                    workItem.moreWork();
                }
            }
            throw BlockedException.BLOCKED_ON_MEMORY_EXCEPTION;
        }
    }
    if (force && result < count_orig) {
        reserve(count_orig - result, context);
        result = count_orig;
    }
    val[0] = 0;
    persistBatchReferences(result);
    return result;
}
/**
* Attempts to reserve count bytes without waiting, using at most two CAS
* attempts before falling back to an unconditional decrement.
* @param allOrNothing when true either the full count is reserved or 0 returned
* @return the number of bytes reserved
*/
private int noWaitReserve(int count, boolean allOrNothing, CommandContext context) {
boolean success = false;
for (int i = 0; !success && i < 2; i++) {
long reserveBatch = this.reserveBatchBytes.get();
long overhead = this.overheadBytes.get();
//tracking overhead counts against what is available to reserve
long current = reserveBatch - overhead;
if (allOrNothing) {
if (count > current) {
return 0;
}
} else if (count > current) {
count = (int)Math.max(0, current);
}
if (count == 0) {
return 0;
}
if (this.reserveBatchBytes.compareAndSet(reserveBatch, reserveBatch - count)) {
success = true;
}
}
//the value is changing rapidly, but we've already potentially adjusted the value twice, so just proceed
if (!success) {
this.reserveBatchBytes.addAndGet(-count);
}
if (context != null) {
context.addAndGetReservedBuffers(count);
}
return count;
}
/**
* Ensures there is capacity for max additional bytes in memory, evicting
* batches as needed and toggling the value cache based on memory pressure.
* Wakes the asynch cleaner so background eviction can take over if possible.
*/
void persistBatchReferences(int max) {
if (max <= 0) {
return;
}
if (!cleaning.get()) {
synchronized (cleaner) {
cleaner.notify();
}
}
long activeBatch = activeBatchBytes.get() + overheadBytes.get();
long reserveBatch = reserveBatchBytes.get();
//approximate in-memory bytes: active plus everything handed out of the reserve
long memoryCount = activeBatch + maxReserveBytes - reserveBatch;
if (memoryCount <= maxReserveBytes) {
//low pressure - optionally disable the value cache when usage is very light
if (DataTypeManager.USE_VALUE_CACHE && DataTypeManager.isValueCacheEnabled() && memoryCount < maxReserveBytes / 8) {
DataTypeManager.setValueCacheEnabled(false);
}
return;
} else if (DataTypeManager.USE_VALUE_CACHE) {
DataTypeManager.setValueCacheEnabled(true);
}
//we delay work here as there should be excess vm space, we are using an overestimate, and we want the cleaner to do the work if possible
//TODO: track sizes held by each queue independently
long maxToFree = Math.min(max, memoryCount - maxReserveBytes);
LrfuEvictionQueue first = initialEvictionQueue;
LrfuEvictionQueue second = evictionQueue;
if (evictionQueue.getSize() > 2*initialEvictionQueue.getSize()) {
//attempt to evict from the non-initial queue first as these should essentially be cost "free" and hopefully the reference cache can mitigate
//the cost of rereading
first = evictionQueue;
second = initialEvictionQueue;
}
maxToFree -= doEvictions(maxToFree, true, first);
if (maxToFree > 0) {
maxToFree = Math.min(maxToFree, activeBatchBytes.get() + overheadBytes.get() - reserveBatchBytes.get());
if (maxToFree > 0) {
doEvictions(maxToFree, true, second);
}
}
}
/**
 * Evicts batches from the given queue until maxToFree bytes have been freed
 * or the eviction criteria no longer hold.  Removed per-iteration logInfo
 * debug statements that bypassed the level guard and concatenated strings on
 * every pass of this hot loop.
 * @param checkActiveBatch false for age-based cleaning (asynch cleaner),
 *        true for pressure-based cleaning
 * @return the number of bytes freed
 */
long doEvictions(long maxToFree, boolean checkActiveBatch, LrfuEvictionQueue queue) {
    if (queue == evictionQueue) {
        maxToFree = Math.min(maxToFree, this.maxProcessingBytes);
    }
    long freed = 0;
    while (freed <= maxToFree && (
            !checkActiveBatch //age out
            || (queue == evictionQueue && activeBatchBytes.get() + overheadBytes.get() + this.maxReserveBytes/2 > reserveBatchBytes.get()) //nominal cleaning criterion
            || (queue != evictionQueue && activeBatchBytes.get() + overheadBytes.get() + 3*this.maxReserveBytes/4 > reserveBatchBytes.get()))) { //assume that basically all initial batches will need to be written out at some point
        CacheEntry ce = queue.firstEntry(checkActiveBatch);
        if (ce == null) {
            break;
        }
        synchronized (ce) {
            if (!memoryEntries.containsKey(ce.getId())) {
                if (!checkActiveBatch) {
                    queue.remove(ce);
                }
                continue; //not currently a valid eviction
            }
        }
        if (!checkActiveBatch) {
            //aging out - if the entry was read recently, stop age-based cleaning
            long lastAccess = ce.getKey().getLastAccess();
            long currentTime = readAttempts.get();
            long age = currentTime - lastAccess;
            if (age < MAX_READ_AGE) {
                checkActiveBatch = true;
                continue;
            }
        }
        boolean evicted = true;
        try {
            evicted = evict(ce);
        } catch (Throwable e) {
            LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30017, ce.getId() ));
        } finally {
            synchronized (ce) {
                if (evicted && memoryEntries.remove(ce.getId()) != null) {
                    freed += ce.getSizeEstimate();
                    activeBatchBytes.addAndGet(-ce.getSizeEstimate());
                    queue.remove(ce); //ensures that an intervening get will still be cleaned
                }
            }
        }
    }
    return freed;
}
/**
* Persists a cache entry to storage if not already persistent, then seeds the
* appropriate reference cache (soft or weak) for cheap re-reads.
* @return the result of the cache add (true when the entry can leave memory)
*/
boolean evict(CacheEntry ce) throws Exception {
Serializer> s = ce.getSerializer();
if (s == null) {
return true;
}
boolean persist = false;
synchronized (ce) {
//only one thread transitions the entry to persistent
if (!ce.isPersistent()) {
persist = true;
ce.setPersistent(true);
}
}
if (persist) {
long count = writeCount.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, s.getId(), ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
}
}
boolean result = cache.add(ce, s);
if (s.useSoftCache()) {
createSoftReference(ce);
} else if (useWeakReferences) {
weakReferenceCache.getValue(ce); //a get will set the value
}
return result;
}
/**
* Caches a soft reference to the entry, charging half the entry size as
* overhead so soft references are not flushed from memory too quickly.
*/
private void createSoftReference(CacheEntry ce) {
//if we don't set aside some reserve, we
//will push the soft ref out of memory potentially too quickly
int sizeEstimate = ce.getSizeEstimate()/2;
BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, sizeEstimate);
softCache.put(ce.getId(), ref);
overheadBytes.addAndGet(sizeEstimate);
}
/**
* Get a CacheEntry without hitting storage.  Checks the memory entries map,
* then the soft cache, then the weak reference cache.
* @param prefersMemory null to check both reference caches; true soft only; false weak only
* @param retain true to keep/touch the entry in memory, false to remove it
* @return the entry, or null when it is not held in memory or a reference cache
*/
CacheEntry fastGet(Long batch, Boolean prefersMemory, boolean retain) {
CacheEntry ce = null;
if (retain) {
ce = memoryEntries.get(batch);
} else {
ce = memoryEntries.remove(batch);
}
if (ce != null) {
synchronized (ce) {
if (retain) {
//there is a minute chance the batch was evicted
//this call ensures that we won't leak
if (memoryEntries.containsKey(batch)) {
if (ce.isPersistent()) {
evictionQueue.touch(ce);
} else {
initialEvictionQueue.touch(ce);
}
}
} else {
evictionQueue.remove(ce);
if (!ce.isPersistent()) {
initialEvictionQueue.remove(ce);
}
}
}
if (!retain) {
//adjust the active byte count and purge from the backing cache
BufferManagerImpl.this.remove(ce, true);
}
return ce;
}
if (prefersMemory == null || prefersMemory) {
BatchSoftReference bsr = softCache.remove(batch);
if (bsr != null) {
ce = bsr.get();
if (ce != null) {
//credit the soft reference overhead now that we hold the entry
clearSoftReference(bsr);
}
}
}
if (ce == null && (prefersMemory == null || !prefersMemory) && useWeakReferences) {
ce = weakReferenceCache.getByHash(batch);
if (ce == null || !ce.getId().equals(batch)) {
//hash collision with a different batch - treat as a miss
return null;
}
}
if (ce != null && ce.getObject() != null) {
referenceHit.getAndIncrement();
if (retain) {
addMemoryEntry(ce, false);
} else {
BufferManagerImpl.this.remove(ce, false);
}
return ce;
}
return null;
}
//NOTE(review): appears unused in the visible code - confirm before removing
AtomicInteger removed = new AtomicInteger();
/**
* Removes a single batch, pulling it out of memory/reference caches and the
* backing cache.
* @return the removed entry if it was held in memory, otherwise null
*/
CacheEntry remove(Long gid, Long batch, boolean prefersMemory) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Removing batch from BufferManager", gid, batch); //$NON-NLS-1$
}
cleanSoftReferences();
CacheEntry ce = fastGet(batch, prefersMemory, false);
if (ce == null) {
removeFromCache(gid, batch);
} else {
ce.nullOut();
}
return ce;
}
/**
* Finishes removal of an entry: adjusts active bytes when it was in memory
* and removes it from the backing cache.
*/
private void remove(CacheEntry ce, boolean inMemory) {
if (inMemory) {
activeBatchBytes.addAndGet(-ce.getSizeEstimate());
}
Serializer> s = ce.getSerializer();
if (s != null) {
removeFromCache(s.getId(), ce.getId());
}
}
/**
* Makes an entry eligible for eviction tracking and charges its size against
* the active byte count, first ensuring capacity exists.
* @param initial true when this is the first add of a brand new batch
*/
void addMemoryEntry(CacheEntry ce, boolean initial) {
persistBatchReferences(ce.getSizeEstimate());
synchronized (ce) {
boolean added = memoryEntries.put(ce.getId(), ce) == null;
if (initial) {
initialEvictionQueue.add(ce);
} else if (added) {
evictionQueue.recordAccess(ce);
evictionQueue.add(ce);
} else {
//already tracked - just refresh its queue position
evictionQueue.touch(ce);
}
}
activeBatchBytes.getAndAdd(ce.getSizeEstimate());
}
/**
 * Drops an entire cache group: removes the group from the storage layer,
 * releases the per-batch overhead accounting, and purges each batch from
 * the in-memory/reference caches.
 *
 * @param id the cache group (tuple source) id
 * @param prefersMemory memory preference forwarded to fastGet; null means unknown
 */
void removeCacheGroup(Long id, Boolean prefersMemory) {
    cleanSoftReferences();
    // restored element type - the batch ids are iterated as Long below
    Collection<Long> vals = cache.removeCacheGroup(id);
    long overhead = vals.size() * BATCH_OVERHEAD;
    overheadBytes.addAndGet(-overhead);
    for (Long val : vals) {
        //TODO: we will unnecessarily call remove on the cache, but that should be low cost
        fastGet(val, prefersMemory, false);
    }
}
/**
 * Drains up to ten cleared soft references from the reference queue,
 * removing each from the soft cache and releasing its accounting.  The cap
 * bounds the work done on any single call.
 */
void cleanSoftReferences() {
    int drained = 0;
    BatchSoftReference ref;
    while (drained++ < 10
            && (ref = (BatchSoftReference)SOFT_QUEUE.poll()) != null) {
        softCache.remove(ref.key);
        clearSoftReference(ref);
    }
}
@Override
public int getProcessorBatchSize(List extends Expression> schema) {
return getSizeEstimates(schema)[0];
}
private int[] getSizeEstimates(List extends Expression> elements) {
int total = 0;
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
for (int i = elements.size() - 1; i >= 0; i--) {
Class> type = elements.get(i).getType();
total += SizeUtility.getSize(isValueCacheEnabled, type);
}
//assume 64-bit
total += 8*elements.size() + 36; // column list / row overhead
//nominal targetBytesPerRow but can scale up or down
int totalCopy = total;
boolean less = totalCopy < targetBytesPerRow;
int rowCount = processorBatchSize;
for (int i = 0; i < 3; i++) {
if (less) {
totalCopy <<= 1;
} else {
totalCopy >>= 2;
}
if (less && totalCopy > targetBytesPerRow
|| !less && totalCopy < targetBytesPerRow) {
break;
}
if (less) {
rowCount <<= 1;
} else {
rowCount >>= 1;
}
}
rowCount = Math.max(1, rowCount);
total *= rowCount;
return new int[]{rowCount, Math.max(1, total)};
}
@Override
public int getSchemaSize(List extends Expression> elements) {
return getSizeEstimates(elements)[1];
}
/**
 * Releases all buffered state: shuts down the storage cache, clears the
 * in-memory tracking structures, and cancels the background cleaner task.
 * The manager is unusable afterwards (cache is nulled).
 */
public void shutdown() {
this.cache.shutdown();
this.cache = null;
this.memoryEntries.clear();
this.evictionQueue.getEvictionQueue().clear();
this.initialEvictionQueue.getEvictionQueue().clear();
this.cleaner.cancel();
}
/**
 * Registers a tuple buffer under its id via a weak reference, pruning any
 * references whose buffers have already been collected.
 */
@Override
public void addTupleBuffer(TupleBuffer tb) {
    cleanDefunctTupleBuffers();
    TupleReference ref = new TupleReference(tb, this.tupleBufferQueue);
    this.tupleBufferMap.put(tb.getId(), ref);
}
/**
 * Registers a buffer under a caller-supplied id (used for distributed /
 * replicated state): adopts the uuid, then registers locally.
 */
@Override
public void distributeTupleBuffer(String uuid, TupleBuffer tb) {
    tb.setId(uuid);
    this.addTupleBuffer(tb);
}
/**
 * Looks up a registered tuple buffer by id.  Restored the parameterized
 * Reference type that was garbled to a raw type in the extracted source.
 *
 * @return the buffer, or null if never registered or already collected
 */
@Override
public TupleBuffer getTupleBuffer(String id) {
    cleanDefunctTupleBuffers();
    Reference<TupleBuffer> r = this.tupleBufferMap.get(id);
    if (r != null) {
        // may still be null if the buffer was collected after lookup
        return r.get();
    }
    return null;
}
/**
 * Drains the reference queue of collected tuple buffers and removes their
 * map entries (TupleReference keeps the id for exactly this purpose).
 * Restored the ReferenceQueue.poll wildcard return type garbled in the
 * extracted source.
 */
private void cleanDefunctTupleBuffers() {
    while (true) {
        Reference<? extends TupleBuffer> r = this.tupleBufferQueue.poll();
        if (r == null) {
            break;
        }
        this.tupleBufferMap.remove(((TupleReference)r).id);
    }
}
static class TupleReference extends WeakReference{
String id;
public TupleReference(TupleBuffer referent, ReferenceQueue super TupleBuffer> q) {
super(referent, q);
id = referent.getId();
}
}
/**
 * Enables/disables the weak-reference batch cache consulted by fastGet.
 */
public void setUseWeakReferences(boolean useWeakReferences) {
this.useWeakReferences = useWeakReferences;
}
/**
 * Intentionally a no-op: full (un-keyed) state transfer is not implemented;
 * only per-buffer state via getState(String, OutputStream) is supported.
 */
@Override
public void getState(OutputStream ostream) {
}
/**
 * Serializes the tuple buffer registered under state_id to the stream.
 * Silently does nothing when no buffer is known by that id.
 *
 * @throws TeiidRuntimeException wrapping any component or I/O failure
 */
@Override
public void getState(String state_id, OutputStream ostream) {
    TupleBuffer buffer = this.getTupleBuffer(state_id);
    if (buffer == null) {
        return; // nothing registered under this id
    }
    try {
        ObjectOutputStream out = new ObjectOutputStream(ostream);
        getTupleBufferState(out, buffer);
        out.flush();
    } catch (TeiidComponentException e) {
        throw new TeiidRuntimeException(QueryPlugin.Event.TEIID30054, e);
    } catch (IOException e) {
        throw new TeiidRuntimeException(QueryPlugin.Event.TEIID30055, e);
    }
}
/**
 * Writes the buffer's state in the format read back by setTupleBufferState:
 * row count, batch size, type names, then each batch via BatchSerializer.
 */
private void getTupleBufferState(ObjectOutputStream out, TupleBuffer buffer) throws TeiidComponentException, IOException {
out.writeInt(buffer.getRowCount());
out.writeInt(buffer.getBatchSize());
out.writeObject(buffer.getTypes());
// rows are 1-based; advance one batch at a time
for (int row = 1; row <= buffer.getRowCount(); row+=buffer.getBatchSize()) {
TupleBatch b = buffer.getBatch(row);
BatchSerializer.writeBatch(out, buffer.getTypes(), b.getTuples());
}
}
/**
 * Intentionally a no-op: full (un-keyed) state transfer is not implemented;
 * only per-buffer state via setState(String, InputStream) is supported.
 */
@Override
public void setState(InputStream istream) {
}
/**
 * Restores a tuple buffer from the stream and registers it under state_id.
 * Does nothing when a buffer with that id is already present locally.
 *
 * @throws TeiidRuntimeException wrapping I/O, class-resolution, or component failures
 */
@Override
public void setState(String state_id, InputStream istream) {
    if (this.getTupleBuffer(state_id) != null) {
        return; // already have this buffer; nothing to restore
    }
    try {
        ObjectInputStream in = new ObjectInputStream(istream);
        setTupleBufferState(state_id, in);
    } catch (IOException e) {
        throw new TeiidRuntimeException(QueryPlugin.Event.TEIID30056, e);
    } catch (ClassNotFoundException e) {
        throw new TeiidRuntimeException(QueryPlugin.Event.TEIID30057, e);
    } catch (TeiidComponentException e) {
        throw new TeiidRuntimeException(QueryPlugin.Event.TEIID30058, e);
    }
}
/**
 * Reconstructs a tuple buffer from the form written by getTupleBufferState
 * and registers it under state_id.  Restored the generic types garbled in
 * the extracted source (raw List, "List>").
 *
 * @throws IOException if the restored row count does not match the recorded one
 */
private void setTupleBufferState(String state_id, ObjectInputStream in) throws IOException, ClassNotFoundException, TeiidComponentException {
    int rowCount = in.readInt();
    int batchSize = in.readInt();
    String[] types = (String[])in.readObject();
    // rebuild a positional schema; column names are not part of the state
    List<ElementSymbol> schema = new ArrayList<ElementSymbol>(types.length);
    for (int i = 0; i < types.length; i++) {
        ElementSymbol es = new ElementSymbol("x"); //$NON-NLS-1$
        es.setType(DataTypeManager.getDataTypeClass(types[i]));
        schema.add(es);
    }
    TupleBuffer buffer = createTupleBuffer(schema, "cached", TupleSourceType.FINAL); //$NON-NLS-1$
    buffer.setBatchSize(batchSize);
    buffer.setId(state_id);
    for (int row = 1; row <= rowCount; row+=batchSize) {
        List<List<Object>> batch = BatchSerializer.readBatch(in, types);
        for (int i = 0; i < batch.size(); i++) {
            buffer.addTuple(batch.get(i));
        }
    }
    if (buffer.getRowCount() != rowCount) {
        // partial/corrupt stream - discard rather than register bad state
        buffer.remove();
        throw new IOException(QueryPlugin.Util.getString("not_found_cache")); //$NON-NLS-1$
    }
    buffer.close();
    addTupleBuffer(buffer);
}
/**
 * Cluster-address callback; this buffer manager keeps no member address
 * state, so this is intentionally a no-op.
 */
@Override
public void setAddress(Serializable address) {
}
/**
 * Cluster-membership callback; no per-member state is kept here, so this is
 * intentionally a no-op.
 * NOTE(review): the raw Collection looks like extraction damage - the
 * element type cannot be confirmed from this view.
 */
@Override
public void droppedMembers(Collection addresses) {
}
/**
 * Controls whether small LOBs are inlined when persisted (see persistLob).
 */
public void setInlineLobs(boolean inlineLobs) {
this.inlineLobs = inlineLobs;
}
/**
 * Returns the maximum reserve size in kilobytes.
 *
 * Bug fix: the cast operator binds tighter than the shift, so the original
 * "(int)maxReserveBytes>>10" truncated the long to 32 bits BEFORE dividing
 * by 1024 - any reserve of 2GB or more produced a meaningless value.  The
 * shift must happen first, then the (now KB-scaled) value is narrowed.
 */
public int getMaxReserveKB() {
    return (int)(maxReserveBytes>>10);
}
/**
 * Injects the storage layer used for persisted batches.
 */
public void setCache(Cache cache) {
this.cache = cache;
}
/**
 * Returns the number of batches currently tracked in memory (statistics).
 */
public int getMemoryCacheEntries() {
return memoryEntries.size();
}
/**
 * Returns the estimated bytes held by in-memory batches (statistics).
 */
public long getActiveBatchBytes() {
return activeBatchBytes.get();
}
/**
 * True if a live tuple buffer is registered under the given id.
 */
@Override
public boolean hasState(String stateId) {
return this.getTupleBuffer(stateId) != null;
}
/**
 * Returns how many batch lookups were satisfied from the soft/weak
 * reference caches (statistics).
 */
public long getReferenceHits() {
return referenceHit.get();
}
@Override
public Streamable> persistLob(Streamable> lob, FileStore store,
byte[] bytes) throws TeiidComponentException {
return LobManager.persistLob(lob, store, bytes, inlineLobs, DataTypeManager.MAX_LOB_MEMORY_BYTES);
}
/**
 * Invalidates an entire cache group; null memory preference means both the
 * memory and storage paths are checked during removal.
 */
public void invalidCacheGroup(Long gid) {
removeCacheGroup(gid, null);
}
}