I'm working on a project that splits an enormous amount of data from a legacy system text file and queues it in JBoss MQ for a clustered MDB to process. The file is usually over 2 GB, and I'm using a ThreadPoolExecutor to read partitions of the file asynchronously. Under heavy load, once roughly 25,000 records have been queued, I always receive the following error.
Unknown exception occurred
java.io.IOException: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is org.jboss.mq.SpyJMSException: Cannot authenticate user; - nested throwable: (java.net.SocketException: Address already in use); nested exception is java.net.SocketException: Address already in use
    at com.purdueefcu.statements.reader.pstatmnt.ConcurrentFileReaderImpl.readFile(ConcurrentFileReaderImpl.java:109)
    at com.purdueefcu.statements.reader.Reader.readFile(Reader.java:80)
    at com.purdueefcu.statements.reader.Reader.main(Reader.java:97)
I'm not sure why I'm getting this. My guess is that the remote JNDI lookup Spring performs is not handing me a connection pool, and that I'm opening a new connection each time instead. That is inefficient and obviously not the behavior I want. I have two requirements for the connection in my POJO client.
1. I need the connection to be a pool. I know the size of the pool before the program executes. It will need to be equal to the number of partition readers. Can I set the size of the pool after my JNDI lookup, or is that even necessary/possible?
2. I need all transactions to be XA compliant. Several database records are written for auditing purposes during the queuing operation, so all writes to the database and to the JMS queue need to participate in the same transaction.
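To make requirement 1 concrete, here is a minimal sketch of a pool whose size is fixed up front to the number of partition readers. It is plain Java and not part of Spring or JBoss MQ: the class name `FixedSizePool` and its `borrow`/`release` methods are hypothetical, and the pooled type is left generic because I don't know yet what the right thing to pool is (connection, session, or producer). It also says nothing about XA enlistment.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/**
 * Hypothetical fixed-size resource pool (an illustration only, not a
 * Spring or JBoss MQ class). All resources are created once, up front,
 * so the number of underlying resources never exceeds the pool size.
 */
public class FixedSizePool<T> {
    private final BlockingQueue<T> idle;

    public FixedSizePool(int size, Supplier<T> factory) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.idle = new ArrayBlockingQueue<>(size);
        // Create every resource eagerly; nothing is created after this point.
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Blocks until a resource is free, then hands it to the caller. */
    public T borrow() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail the borrow.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a resource", e);
        }
    }

    /** Returns a previously borrowed resource to the pool. */
    public void release(T resource) {
        idle.offer(resource);
    }

    /** Number of resources currently idle in the pool. */
    public int available() {
        return idle.size();
    }
}
```

Each partition reader would call `borrow()` before sending and `release()` in a `finally` block, so with one pooled resource per reader no reader should ever block for long.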
Here is my Spring bean definition for the connection factory. Any assistance would be great.
<bean id="remoteJMSQueue" class="org.springframework.jndi.JndiObjectFactoryBean" lazy-init="true">
    <property name="jndiEnvironment">
        <ref bean="jndiProperties" />
    </property>
    <!-- We need to make sure that we connect to the XA capable JNDI, not the single transaction JNDI-->
    <property name="jndiName">
        <value>XAConnectionFactory</value>
    </property>
    <property name="cache">
        <value>false</value>
    </property>
    <property name="proxyInterface">
        <value>javax.jms.QueueConnectionFactory</value>
    </property>
</bean>
JNDI properties file
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
#production host
java.naming.provider.url=jnp://prodjboss1:1100,prodjboss2:1100