/** * */ package com.subex.spark.common.distributedcaching.data; import net.jcip.annotations.ThreadSafe; import java.io.IOException; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.infinispan.commons.equivalence.Equivalence; import org.infinispan.commons.util.concurrent.ParallelIterableMap.KeyValueAction; import org.infinispan.container.DataContainer; import org.infinispan.container.DefaultDataContainer; import org.infinispan.container.InternalEntryFactory; import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.eviction.ActivationManager; import org.infinispan.eviction.EvictionManager; import org.infinispan.eviction.PassivationManager; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; import org.infinispan.filter.KeyFilter; import org.infinispan.filter.KeyValueFilter; import org.infinispan.metadata.Metadata; import org.infinispan.persistence.manager.PersistenceManager; import org.infinispan.util.CoreImmutables; import org.infinispan.util.TimeService; /** * @author prashant.thakur * */ @ThreadSafe public class ExtendedDataContainer implements DataContainer { DefaultDataContainer defaultDataContainer; EvictionManager evictionManager; PassivationManager passivator; InternalEntryFactory entryFactory; ActivationManager activator; PersistenceManager clm; TimeService timeService; private static final Log log = LogFactory.getLog(ExtendedDataContainer.class); public ExtendedDataContainer() { defaultDataContainer = new DefaultDataContainer<>(25); } public ExtendedDataContainer(int concurrencyLevel) { defaultDataContainer = new DefaultDataContainer<>(concurrencyLevel); } public ExtendedDataContainer(int concurrencyLevel, Equivalence keyEq) { defaultDataContainer = new DefaultDataContainer<>(concurrencyLevel, keyEq); } @Inject public void initialize(EvictionManager evictionManager, PassivationManager passivator, InternalEntryFactory entryFactory, ActivationManager activator, PersistenceManager clm, TimeService timeService) { this.evictionManager = evictionManager; this.passivator = passivator; this.entryFactory = entryFactory; this.activator = activator; this.clm = clm; this.timeService = timeService; } @Start public void start(){ log.debug("Inside start entryFactory is " + entryFactory.toString()); defaultDataContainer.initialize(evictionManager, passivator, entryFactory, activator, clm, timeService); } @Override public Iterator> iterator() { return new EntryIterator(defaultDataContainer.iterator()); } private static CompactObjectSerializable createValueInstance(int classId) { switch (classId) { case 1: return new Subscriber(); default: return null; } } private V parseValue(byte[] valueBytes) throws IOException { CompressedCompactObjectByteInputStream inputStream = new CompressedCompactObjectByteInputStream( valueBytes); int classId = inputStream.readInt(); V value = (V) createValueInstance(classId); value.fromBytes(valueBytes); return value; } @Override public InternalCacheEntry get(Object key) { InternalCacheEntry internalCacheEntry = defaultDataContainer .get(key); return convertToCacheEntry(internalCacheEntry); } private InternalCacheEntry convertToCacheEntry( InternalCacheEntry internalCacheEntry) { CompactObjectSerializable value; if(internalCacheEntry==null) return null; try { value = parseValue(internalCacheEntry.getValue()); InternalCacheEntry cacheEntry = entryFactory.create( internalCacheEntry.getKey(), (V) value, internalCacheEntry.getMetadata()); return cacheEntry; } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } private InternalCacheEntry convertToInternalCacheEntry( InternalCacheEntry internalCacheEntry) { if(internalCacheEntry==null) return null; byte[] value; try { value = ((CompactObjectSerializable) internalCacheEntry.getValue()) .toBytes(); InternalCacheEntry cacheEntry = entryFactory.create( internalCacheEntry.getKey(), value, internalCacheEntry.getMetadata()); return cacheEntry; } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } @Override public InternalCacheEntry peek(Object key) { InternalCacheEntry internalCacheEntry = defaultDataContainer .peek(key); return convertToCacheEntry(internalCacheEntry); } @Override public void put(K k, V v, Metadata metadata) { try { if(v == null) { System.out.println("value is null"); } if (defaultDataContainer == null) { System.out.println("defaultDataContainer is null"); } if (metadata == null) { System.out.println("metadata is null"); } defaultDataContainer.put(k, v.toBytes(), metadata); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public boolean containsKey(Object key) { return defaultDataContainer.containsKey(key); } @Override public InternalCacheEntry remove(Object key) { return convertToCacheEntry(defaultDataContainer.remove(key)); } @Override public int size() { return defaultDataContainer.size(); } @Override public void clear() { defaultDataContainer.clear(); } @Override public Set keySet() { return defaultDataContainer.keySet(); } @Override public Collection values() { return new Values(); } @Override public Set> entrySet() { return new EntrySet(); } @Override public void purgeExpired() { defaultDataContainer.purgeExpired(); } @Override public void evict(K key) { defaultDataContainer.evict(key); } @Override public InternalCacheEntry compute(K key, org.infinispan.container.DataContainer.ComputeAction action) { return convertToCacheEntry(defaultDataContainer.compute(key, new ComputeActionWrapper(action))); } @Override public void executeTask(KeyFilter filter, KeyValueAction> action) throws InterruptedException { defaultDataContainer.executeTask(filter, new KeyValueActionWrapper( action)); } @Override public void executeTask(KeyValueFilter filter, KeyValueAction> action) throws InterruptedException { defaultDataContainer.executeTask(new KeyValueFilterWrapper(filter), new KeyValueActionWrapper(action)); } private class ComputeActionWrapper implements org.infinispan.container.DataContainer.ComputeAction { org.infinispan.container.DataContainer.ComputeAction action; public ComputeActionWrapper( org.infinispan.container.DataContainer.ComputeAction action) { this.action = action; } @Override public InternalCacheEntry compute(K key, InternalCacheEntry oldEntry, InternalEntryFactory factory) { return convertToInternalCacheEntry(action.compute(key, convertToCacheEntry(oldEntry), factory)); } } private class KeyValueActionWrapper implements KeyValueAction> { KeyValueAction> action; public KeyValueActionWrapper( KeyValueAction> action) { this.action = action; } @Override public void apply(K key, InternalCacheEntry entry) { action.apply(key, convertToCacheEntry(entry)); } } private class KeyValueFilterWrapper implements KeyValueFilter { private KeyValueFilter filter; public KeyValueFilterWrapper(KeyValueFilter filter) { this.filter = filter; } @Override public boolean accept(K key, byte[] value, Metadata metadata) { try { return filter.accept(key, parseValue(value), metadata); } catch (IOException e) { e.printStackTrace(); return this.accept(key, value, metadata); } } } public class EntryIterator implements Iterator> { private final Iterator> it; EntryIterator(Iterator> it) { this.it = it; } @Override public InternalCacheEntry next() { return convertToCacheEntry(it.next()); } @Override public boolean hasNext() { return it.hasNext(); } @Override public void remove() { it.remove(); } } /** * Minimal implementation needed for unmodifiable Collection * */ private class Values extends AbstractCollection { @Override public Iterator iterator() { return new ValueIterator(defaultDataContainer.values().iterator()); } @Override public int size() { return defaultDataContainer.size(); } } public class ValueIterator implements Iterator { private final Iterator it; ValueIterator(Iterator it) { this.it = it; } @Override public V next() { try { return parseValue(it.next()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } @Override public boolean hasNext() { return it.hasNext(); } @Override public void remove() { it.remove(); } } /** * Minimal implementation needed for unmodifiable Set * */ private class EntrySet extends AbstractSet> { @Override public boolean contains(Object o) { if (!(o instanceof Map.Entry)) { return false; } @SuppressWarnings("rawtypes") Map.Entry e = (Map.Entry) o; InternalCacheEntry ice = defaultDataContainer.get(e.getKey()); if (ice == null) { return false; } return ice.getValue().equals(e.getValue()); } @Override public Iterator> iterator() { return new ImmutableEntryIterator(defaultDataContainer.iterator()); } @Override public int size() { return defaultDataContainer.size(); } @Override public String toString() { return defaultDataContainer.toString(); } } private class ImmutableEntryIterator extends EntryIterator { ImmutableEntryIterator(Iterator> it) { super(it); } @Override public InternalCacheEntry next() { return CoreImmutables.immutableInternalCacheEntry(super.next()); } } }