STREAMING_STATE_TRANSFER fails with lots of data.
nanreh Jun 11, 2007 8:55 PMSetup: I'm using Java 5 and JBossCache CR2.0 on Linux.
I have two cache instances with their own BDBJE cache loaders configured to "fetchPersistentState" on startup. This works with a standard STATE_TRANSFER and also works for STREAMING_STATE_TRANSFERs except when I have lots of cache state to replicate (about 500MB in this case).
I start up the first cache and it loads the state from its cache loader, everything looks fine. Then I start the second cache and I can see it contact the first cache to request an initial state transfer. Here's what the first cache sees when the new cache calls up asking for the initial state:
DEBUG 17:26.49 [UpHandler (GMS)] STREAMING_STATE_TRANSFER - GET_DIGEST_STATE_OK: digest is 162.49.100.122:32917: [0 : 5], 162.49.100.121:33261: [0 : 0] DEBUG 17:26.49 [StateProviderThreadSpawner] STREAMING_STATE_TRANSFER$StateProviderThreadSpawner - StateProviderThreadSpawner listening at 162.49.100.122:58357... DEBUG 17:26.49 [StateProviderThreadSpawner] STREAMING_STATE_TRANSFER$StateProviderThreadSpawner - StateProviderThreadSpawner listening at 162.49.100.122:58357... DEBUG 17:26.49 [STREAMING_STATE_TRANSFER.poolid=1] STREAMING_STATE_TRANSFER$StateProviderThreadSpawner$1 - Accepted request for state transfer from /162.49.100.121:43102 handing of to PooledExecutor thread DEBUG 17:26.49 [STREAMING_STATE_TRANSFER.poolid=1] STREAMING_STATE_TRANSFER$StateProviderHandler - Running on Thread[STREAMING_STATE_TRANSFER.poolid=1,5,JGroups threads]. Accepted request for state transfer from /162.49.100.121:43102, original buffer size was 8192 and was reset to 8192, passing outputstream up... INFO 17:26.49 [STREAMING_STATE_TRANSFER.poolid=1] StateTransferManager - locking the / subtree to return the in-memory (transient) state DEBUG 17:26.52 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.121:33261 (own address=162.49.100.122:32917) DEBUG 17:26.52 [UpHandler (FD)] FD - received ack from 162.49.100.121:33261 DEBUG 17:26.55 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.121:33261 (own address=162.49.100.122:32917) DEBUG 17:26.55 [UpHandler (FD)] FD - received ack from 162.49.100.121:33261 DEBUG 17:26.58 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.121:33261 (own address=162.49.100.122:32917) DEBUG 17:26.58 [UpHandler (FD)] FD - received ack from 162.49.100.121:33261 INFO 17:26.59 [STREAMING_STATE_TRANSFER.poolid=1] StateTransferManager - Successfully generated state in 9929 msec DEBUG 17:26.59 [STREAMING_STATE_TRANSFER.poolid=1] STREAMING_STATE_TRANSFER$StreamingOutputStreamWrapper - State writer Socket[addr=/162.49.100.121,port=43102,localport=58357] is closing the socket DEBUG 17:27.00 [MERGE2.FindSubgroups thread (channel=ACS_CLUSTER)] MERGE2$FindSubgroups - initial_mbrs=[[own_addr=162.49.100.121:33261, coord_addr=162.49.100.122:32917, is_server=true], [own_addr=162.49.100.122:32917, coord_addr=162.49.100.122:32917, is_server=true]] DEBUG 17:27.01 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.121:33261 (own address=162.49.100.122:32917) DEBUG 17:27.01 [UpHandler (FD)] FD - received ack from 162.49.100.121:33261 DEBUG 17:27.04 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.121:33261 (own address=162.49.100.122:32917) DEBUG 17:27.04 [UpHandler (FD)] FD - received ack from 162.49.100.121:33261 DEBUG 17:27.07 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.121:33261 (own address=162.49.100.122:32917) DEBUG 17:27.07 [UpHandler (FD)] FD - received ack from 162.49.100.121:33261 DEBUG 17:27.09 [UpHandler (GMS)] GMS - received LEAVE_REQ for 162.49.100.121:33261 from 162.49.100.121:33261 DEBUG 17:27.09 [ViewHandler] CoordGmsImpl - new=[], suspected=[], leaving=[162.49.100.121:33261], new view: [162.49.100.122:32917|2] [162.49.100.122:32917] DEBUG 17:27.09 [UpHandler (GMS)] CoordGmsImpl - view=[162.49.100.122:32917|2] [162.49.100.122:32917] DEBUG 17:27.09 [UpHandler (GMS)] GMS - [local_addr=162.49.100.122:32917] view is [162.49.100.122:32917|2] [162.49.100.122:32917] INFO 17:27.09 [UpHandler (GMS)] CacheImpl$MembershipListenerAdaptor - viewAccepted(): [162.49.100.122:32917|2] [162.49.100.122:32917] DEBUG 17:27.09 [UpHandler (GMS)] NAKACK - removing 162.49.100.121:33261 from received_msgs (not member anymore) DEBUG 17:27.09 [DownHandler (FD)] FD$Broadcaster - suspected_mbrs: [], after adjustment: []
The new cache's log appears below, you can see it start up and eventually fail during the initial state transfer:
DEBUG 17:26.42 [WrapperSimpleAppMain] XmlConfigurationParser - Attribute size: 18 DEBUG 17:26.42 [WrapperSimpleAppMain] XmlConfigurationParser - Attribute size: 3 DEBUG 17:26.42 [WrapperSimpleAppMain] CacheImpl - cache mode is REPL_SYNC INFO 17:26.42 [WrapperSimpleAppMain] JChannel - JGroups version: 2.4.1 SP-3 DEBUG 17:26.42 [WrapperSimpleAppMain] ClassConfigurator - mapping is: 1: class org.jgroups.stack.IpAddress 2: class org.jgroups.protocols.CAUSAL$CausalHeader 3: class org.jgroups.protocols.FD$FdHeader 4: class org.jgroups.protocols.FD_PID$FdHeader 5: class org.jgroups.protocols.FD_PROB$FdHeader 6: class org.jgroups.protocols.FD_SOCK$FdHeader 7: class org.jgroups.protocols.FragHeader 8: class org.jgroups.protocols.MERGE$MergeHeader 9: class org.jgroups.protocols.NakAckHeader 10: class org.jgroups.protocols.PARTITIONER$PartitionerHeader 11: class org.jgroups.protocols.PerfHeader 12: class org.jgroups.protocols.PIGGYBACK$PiggybackHeader 13: class org.jgroups.protocols.PingHeader 14: class org.jgroups.protocols.TcpHeader 15: class org.jgroups.protocols.TOTAL$Header 16: class org.jgroups.protocols.TOTAL_OLD$TotalHeader 17: class org.jgroups.protocols.TOTAL_TOKEN$TotalTokenHeader 18: class org.jgroups.protocols.TOTAL_TOKEN$RingTokenHeader 19: class org.jgroups.protocols.TunnelHeader 20: class org.jgroups.protocols.UdpHeader 21: class org.jgroups.protocols.UNICAST$UnicastHeader 22: class org.jgroups.protocols.VERIFY_SUSPECT$VerifyHeader 23: class org.jgroups.protocols.WANPIPE$WanPipeHeader 24: class org.jgroups.protocols.pbcast.GMS$GmsHeader 25: class org.jgroups.protocols.pbcast.NakAckHeader 26: class org.jgroups.protocols.pbcast.PbcastHeader 27: class org.jgroups.protocols.pbcast.STABLE$StableHeader 28: class org.jgroups.protocols.pbcast.STATE_TRANSFER$StateHeader 29: class org.jgroups.protocols.SMACK$SmackHeader 30: class org.jgroups.Message 31: class org.jgroups.View 32: class org.jgroups.ViewId 33: class org.jgroups.util.List 34: interface org.jgroups.Address 35: class org.jgroups.blocks.RequestCorrelator$Header 36: class org.jgroups.protocols.PingRsp 38: class java.util.Vector 39: class org.jgroups.protocols.pbcast.JoinRsp 40: class org.jgroups.protocols.pbcast.Digest 41: class java.util.Hashtable 53: class org.jgroups.protocols.COMPRESS$CompressHeader 54: class org.jgroups.protocols.FC$FcHeader 55: class org.jgroups.protocols.WanPipeAddress 56: class org.jgroups.protocols.TpHeader 57: class org.jgroups.protocols.ENCRYPT$EncryptHeader 58: class org.jgroups.protocols.SEQUENCER$SequencerHeader 59: class org.jgroups.protocols.FD_SIMPLE$FdHeader 60: class org.jgroups.protocols.VIEW_SYNC$ViewSyncHeader DEBUG 17:26.42 [WrapperSimpleAppMain] AUTOCONF - frag_size=64000 DEBUG 17:26.42 [WrapperSimpleAppMain] GMS - changed role to org.jgroups.protocols.pbcast.ClientGmsImpl DEBUG 17:26.42 [WrapperSimpleAppMain] VersionAwareMarshaller - Initialised with version 2.0.0 and versionInt 20 DEBUG 17:26.42 [WrapperSimpleAppMain] VersionAwareMarshaller - Using default marshaller class org.jboss.cache.marshall.CacheMarshaller200 INFO 17:26.42 [WrapperSimpleAppMain] CacheImpl - Using marshaller org.jboss.cache.marshall.VersionAwareMarshaller INFO 17:26.42 [WrapperSimpleAppMain] InterceptorChainFactory - interceptor chain is: class org.jboss.cache.interceptors.CallInterceptor class org.jboss.cache.interceptors.EvictionInterceptor class org.jboss.cache.interceptors.CacheStoreInterceptor class org.jboss.cache.interceptors.CacheLoaderInterceptor class org.jboss.cache.interceptors.UnlockInterceptor class org.jboss.cache.interceptors.PessimisticLockInterceptor class org.jboss.cache.interceptors.ReplicationInterceptor class org.jboss.cache.interceptors.NotificationInterceptor class org.jboss.cache.interceptors.TxInterceptor class org.jboss.cache.interceptors.CacheMgmtInterceptor class org.jboss.cache.interceptors.InvocationContextInterceptor DEBUG 17:26.47 [WrapperSimpleAppMain] BdbjeCacheLoader - Created JE environment com.sleepycat.je.Environment@44f787 for cache loader org.jboss.cache.loader.bdbje.BdbjeCacheLoader@1d6fbb3 DEBUG 17:26.47 [DownHandler (UDP)] UDP - creating sockets and starting threads INFO 17:26.47 [DownHandler (UDP)] UDP - sockets will use interface 162.49.100.121 INFO 17:26.47 [DownHandler (UDP)] UDP - socket information: local_addr=162.49.100.121:33261, mcast_addr=228.1.2.3:48866, bind_addr=/162.49.100.121, ttl=64 sock: bound to 162.49.100.121:33261, receive buffer size=80000, send buffer size=131071 mcast_recv_sock: bound to 162.49.100.121:48866, send buffer size=131071, receive buffer size=80000 mcast_send_sock: bound to 162.49.100.121:33262, send buffer size=131071, receive buffer size=80000 DEBUG 17:26.47 [DownHandler (UDP)] UDP - created unicast receiver thread DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - initial_mbrs are [[own_addr=162.49.100.122:32917, coord_addr=162.49.100.122:32917, is_server=true]] DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - election results: {162.49.100.122:32917=1} DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - sending handleJoin(162.49.100.121:33261) to 162.49.100.122:32917 DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - [162.49.100.121:33261]: JoinRsp=[162.49.100.122:32917|1] [162.49.100.122:32917, 162.49.100.121:33261] DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - new_view=[162.49.100.122:32917|1] [162.49.100.122:32917, 162.49.100.121:33261] DEBUG 17:26.49 [DownHandler (GMS)] GMS - [local_addr=162.49.100.121:33261] view is [162.49.100.122:32917|1] [162.49.100.122:32917, 162.49.100.121:33261] INFO 17:26.49 [DownHandler (GMS)] CacheImpl$MembershipListenerAdaptor - viewAccepted(): [162.49.100.122:32917|1] [162.49.100.122:32917, 162.49.100.121:33261] DEBUG 17:26.49 [DownHandler (FD)] FD$Broadcaster - suspected_mbrs: [], after adjustment: [] DEBUG 17:26.49 [DownHandler (GMS)] GMS - 162.49.100.121:33261 changed role to org.jgroups.protocols.pbcast.ParticipantGmsImpl INFO 17:26.49 [WrapperSimpleAppMain] CacheImpl - CacheImpl local address is 162.49.100.121:33261 DEBUG 17:26.49 [WrapperSimpleAppMain] STREAMING_STATE_TRANSFER - Member 162.49.100.121:33261 asking 162.49.100.122:32917 for full state DEBUG 17:26.49 [DownHandler (GMS)] STABLE - suspending message garbage collection DEBUG 17:26.49 [DownHandler (GMS)] STABLE - resume task started, max_suspend_time=22000 DEBUG 17:26.49 [UpHandler (GMS)] STREAMING_STATE_TRANSFER - Connecting to state provider /162.49.100.122:58357, original buffer size was 43690 and was reset to 8192 DEBUG 17:26.49 [UpHandler (GMS)] STREAMING_STATE_TRANSFER - Connected to state provider, my end of the socket is /162.49.100.121:43102 passing inputstream up... DEBUG 17:26.52 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.122:32917 (own address=162.49.100.121:33261) DEBUG 17:26.52 [UpHandler (FD)] FD - received ack from 162.49.100.122:32917 DEBUG 17:26.55 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.122:32917 (own address=162.49.100.121:33261) DEBUG 17:26.55 [UpHandler (FD)] FD - received ack from 162.49.100.122:32917 INFO 17:26.56 [STREAMING_STATE_TRANSFER.reader] StateTransferManager - starting state integration at node UnversionedNode[ / data=[] RL] DEBUG 17:26.58 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.122:32917 (own address=162.49.100.121:33261) DEBUG 17:26.58 [UpHandler (FD)] FD - received ack from 162.49.100.122:32917 DEBUG 17:27.01 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.122:32917 (own address=162.49.100.121:33261) DEBUG 17:27.01 [UpHandler (FD)] FD - received ack from 162.49.100.122:32917 DEBUG 17:27.04 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.122:32917 (own address=162.49.100.121:33261) DEBUG 17:27.04 [UpHandler (FD)] FD - received ack from 162.49.100.122:32917 DEBUG 17:27.07 [Timer-1] FD$Monitor - sending are-you-alive msg to 162.49.100.122:32917 (own address=162.49.100.121:33261) DEBUG 17:27.07 [UpHandler (FD)] FD - received ack from 162.49.100.122:32917 DEBUG 17:27.09 [DownHandler (GMS)] STABLE - resuming message garbage collection DEBUG 17:27.09 [DownHandler (GMS)] ParticipantGmsImpl - sending LEAVE request to 162.49.100.122:32917 (local_addr=162.49.100.121:33261) DEBUG 17:27.09 [DownHandler (GMS)] GMS - 162.49.100.121:33261 changed role to org.jgroups.protocols.pbcast.ClientGmsImpl DEBUG 17:27.09 [DownHandler (UDP)] UDP - closing sockets and stopping threads DEBUG 17:27.09 [UDP mcast receiver] UDP - multicast thread terminated DEBUG 17:27.09 [DownHandler (UDP)] UDP - multicast receive socket closed DEBUG 17:27.09 [DownHandler (UDP)] UDP - multicast send socket closed DEBUG 17:27.09 [DownHandler (UDP)] UDP - socket closed DEBUG 17:27.09 [UDP.UcastReceiverThread] UDP$UcastReceiver - unicast receiver socket is closed, exception=java.net.SocketException: Socket closed DEBUG 17:27.09 [UDP.UcastReceiverThread] UDP$UcastReceiver - unicast receiver thread terminated ERROR 17:27.09 [WrapperSimpleAppMain] Main - Error starting ShardServer. org.jboss.cache.CacheException: Unable to fetch state on startup at org.jboss.cache.CacheImpl.start(CacheImpl.java:791) at org.jboss.cache.DefaultCacheFactory.createCache(DefaultCacheFactory.java:87) at org.jboss.cache.DefaultCacheFactory.createCache(DefaultCacheFactory.java:58) at org.jboss.cache.DefaultCacheFactory.createCache(DefaultCacheFactory.java:51) at com.scea.scne.acs.shard.cache.JBossCacheShardStore.initialize(JBossCacheShardStore.java:44) at com.scea.scne.acs.shard.cache.JBossCacheShardStore.<init>(JBossCacheShardStore.java:35) at com.scea.scne.acs.shard.server.Main.buildShardStore(Main.java:65) at com.scea.scne.acs.shard.server.Main.main(Main.java:28) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:585) at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240) at java.lang.Thread.run(Thread.java:595) Caused by: org.jboss.cache.CacheException: Initial state transfer failed: Channel.getState() returned false at org.jboss.cache.CacheImpl.fetchStateOnStartup(CacheImpl.java:1190) at org.jboss.cache.CacheImpl.start(CacheImpl.java:783) ... 13 more
I've tried changing lots of my settings based on other forum posts like this one and a reading of the JGroups documentation but nothing seems to make a difference--I see the same sequence of events where the new cache decides it's time to leave the cluster:
DEBUG 17:27.09 [DownHandler (GMS)] STABLE - resuming message garbage collection DEBUG 17:27.09 [DownHandler (GMS)] ParticipantGmsImpl - sending LEAVE request to 162.49.100.122:32917 (local_addr=162.49.100.121:33261)
I've tried changing my transaction isolation level to make sure there aren't any lock-related issues here and I'm running this test without any cache clients reading or writing to either cache so they should be unencumbered and free to exchange state.
I have tested quite a bit with these two machines. They replicate changes properly and the STREAMING_STATE_TRANSFER works until the data gets too big which makes me think that something is timing out but I've had no luck understanding it.
Here's my cache configuration:
<?xml version="1.0" encoding="UTF-8"?> <!-- ===================================================================== --> <!-- --> <!-- Sample JBoss Cache Service Configuration --> <!-- --> <!-- ===================================================================== --> <server> <classpath codebase="./lib" archives="jboss-cache.jar, jgroups.jar"/> <!-- ==================================================================== --> <!-- Defines JBoss Cache configuration --> <!-- ==================================================================== --> <mbean code="org.jboss.cache.CacheImpl" name="jboss.cache:service=Cache"> <depends>jboss:service=Naming</depends> <depends>jboss:service=TransactionManager</depends> <!-- Configure the TransactionManager --> <!-- <attribute name="TransactionManagerLookupClass">org.jboss.cache.transaction.GenericTransactionManagerLookup</attribute> --> <!-- Use our custom TxManagerLookup class --> <attribute name="TransactionManagerLookupClass">com.scea.scne.acs.shard.tx.CustomTxManagerLookup</attribute> <!-- Node locking level : SERIALIZABLE REPEATABLE_READ (default) READ_COMMITTED READ_UNCOMMITTED NONE --> <attribute name="IsolationLevel">READ_COMMITTED</attribute> <!-- Lock parent before doing node additions/removes --> <attribute name="LockParentForChildInsertRemove">false</attribute> <!-- Valid modes are LOCAL REPL_ASYNC REPL_SYNC INVALIDATION_ASYNC INVALIDATION_SYNC --> <attribute name="CacheMode">REPL_SYNC</attribute> <!-- Name of cluster. Needs to be the same for all JBoss Cache nodes in a cluster in order to find each other. --> <attribute name="ClusterName">ACS_CLUSTER</attribute> <!--Uncomment next three statements to enable JGroups multiplexer. This configuration is dependent on the JGroups multiplexer being registered in an MBean server such as JBossAS. --> <!-- <depends>jgroups.mux:name=Multiplexer</depends> <attribute name="MultiplexerService">jgroups.mux:name=Multiplexer</attribute> <attribute name="MultiplexerStack">fc-fast-minimalthreads</attribute> --> <!-- JGroups protocol stack properties. ClusterConfig isn't used if the multiplexer is enabled and successfully initialized. --> <attribute name="ClusterConfig"> <config> <!-- UDP: if you have a multihomed machine, set the bind_addr attribute to the appropriate NIC IP address --> <!-- UDP: On Windows machines, because of the media sense feature being broken with multicast (even after disabling media sense) set the loopback attribute to true --> <UDP mcast_addr="228.1.2.3" mcast_port="48866" ip_ttl="64" ip_mcast="true" mcast_send_buf_size="150000" mcast_recv_buf_size="80000" ucast_send_buf_size="150000" ucast_recv_buf_size="80000" loopback="true"/> <PING timeout="2000" num_initial_members="3" up_thread="false" down_thread="false"/> <MERGE2 min_interval="10000" max_interval="20000"/> <FD shun="true" up_thread="true" down_thread="true"/> <VERIFY_SUSPECT timeout="1500" up_thread="false" down_thread="false"/> <pbcast.NAKACK gc_lag="50" retransmit_timeout="600,1200,2400,4800" max_xmit_size="8192" up_thread="false" down_thread="false"/> <UNICAST timeout="600,1200,2400" down_thread="false"/> <pbcast.STABLE desired_avg_gossip="20000" up_thread="false" down_thread="false"/> <FRAG frag_size="8192" down_thread="false" up_thread="false"/> <pbcast.GMS join_timeout="5000" join_retry_timeout="2000" shun="true" print_local_addr="true"/> <!-- <pbcast.STATE_TRANSFER up_thread="false" down_thread="false"/> --> <pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false" use_flush="true" use_reading_thread="true"/> </config> </attribute> <!-- The max amount of time (in milliseconds) we wait until the initial state (ie. the contents of the cache) are retrieved from existing members in a clustered environment --> <attribute name="InitialStateRetrievalTimeout">30000</attribute> <!-- Number of milliseconds to wait until all responses for a synchronous call have been received. --> <attribute name="SyncReplTimeout">20000</attribute> <!-- Max number of milliseconds to wait for a lock acquisition --> <attribute name="LockAcquisitionTimeout">15000</attribute> <!-- Specific eviction policy configurations.--> <attribute name="EvictionPolicyConfig"> <config> <attribute name="wakeUpIntervalSeconds">5</attribute> <!-- This defaults to 200000 if not specified --> <attribute name="eventQueueSize">200000</attribute> <attribute name="policyClass">org.jboss.cache.eviction.FIFOPolicy</attribute> <!-- Cache wide default --> <region name="/_default_"> <attribute name="maxNodes">10000</attribute> <attribute name="timeToLiveSeconds">0</attribute> <attribute name="maxAgeSeconds">0</attribute> </region> </config> </attribute> <attribute name="CacheLoaderConfiguration"> <config> <passivation>false</passivation> <!--<preload>/</preload>--> <shared>false</shared> <cacheloader> <class>org.jboss.cache.loader.bdbje.BdbjeCacheLoader</class> <properties> location=/tmp/acs/filestore </properties> <fetchPersistentState>true</fetchPersistentState> <async>false</async> <ignoreModifications>false</ignoreModifications> <purgeOnStartup>false</purgeOnStartup> </cacheloader> </config> </attribute> </mbean> </server>
Thanks for reading this far, I appreciate any insights you may have on this issue.
frank