-
1. Re: Sending and Receiving Messages Concurrently
ataylor May 1, 2014 4:57 AM (in response to yairogen)
Without seeing any code, it's impossible to say.
-
2. Re: Re: Sending and Receiving Messages Concurrently
yairogen May 1, 2014 6:51 AM (in response to ataylor)
Here is the code:
int NUM_ITER = 100;
ExecutorService threadPool = Executors.newFixedThreadPool(NUM_ITER + 10);
final CountDownLatch countDownLatch = new CountDownLatch(NUM_ITER);
long start = System.currentTimeMillis();
final LongAdder longAdder = new LongAdder();
for (int i = 0; i < NUM_ITER; i++) {
    threadPool.execute(new Runnable() {
        @Override
        public void run() {
            ClientMessage clientMessage = session.createMessage(true);
            clientMessage.getBodyBuffer().writeString("hi there");
        }
    });
    threadPool.execute(new Runnable() {
        @Override
        public void run() {
            ClientMessage clientMessage = consumer.receive();
            clientMessage.acknowledge();
            countDownLatch.countDown();
            longAdder.add(1);
            long sum = longAdder.sum();
            if (sum % 500 == 0) {
                System.out.println("read/write index: " + sum);
            }
        }
    });
}
countDownLatch.await();
long end = System.currentTimeMillis();
double total = (end - start) / 1000D;
double tps = NUM_ITER / total;
System.out.println("tps read and write: " + tps);
threadPool.shutdown();
-
3. Re: Re: Sending and Receiving Messages Concurrently
ataylor May 1, 2014 6:55 AM (in response to yairogen)
Your code formatting seems to have removed some of the code, so I can't see it properly. However, make sure you are:
a) starting the session for the consumer
b) using a separate session for each thread, as sessions are not thread safe
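The two points above (one started session per thread) can be sketched roughly as follows. This is only an illustration under stated assumptions: `StubSession` is a hypothetical stand-in class, not the HornetQ `ClientSession` API, and `PerThreadSessionSketch` and `countDistinctStartedSessions` are made-up names. It assumes Java 8 or later.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a non-thread-safe client session (NOT the HornetQ API).
class StubSession {
    private volatile boolean started;
    void start() { started = true; }
    boolean isStarted() { return started; }
}

class PerThreadSessionSketch {
    // Every session ever created, so a shutdown step could close them all.
    // A concurrent set is used because many threads register sessions at once.
    static final Set<StubSession> ALL_SESSIONS = ConcurrentHashMap.newKeySet();

    // Each thread lazily creates, starts, and caches its own session.
    static final ThreadLocal<StubSession> SESSION = ThreadLocal.withInitial(() -> {
        StubSession s = new StubSession();
        s.start();
        ALL_SESSIONS.add(s);
        return s;
    });

    // Runs one task on each of `threads` pool threads and returns how many
    // distinct, started sessions those tasks observed.
    static int countDistinctStartedSessions(int threads) {
        Set<StubSession> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch allRunning = new CountDownLatch(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    // Hold every task until all are running, so each one
                    // occupies a different pool thread.
                    allRunning.countDown();
                    allRunning.await();
                    StubSession s = SESSION.get();
                    if (s.isStarted()) {
                        seen.add(s);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct started sessions: " + countDistinctStartedSessions(4));
    }
}
```

With a real client, the `ThreadLocal` initializer would create and start the actual session instead of the stub; the concurrent set matters because a plain `HashSet` is not safe to modify from several threads.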
-
4. Re: Re: Sending and Receiving Messages Concurrently
yairogen May 1, 2014 8:49 AM (in response to ataylor)
I don't know what's going on with the formatting. Either way, I have a wrapper over the core API, and it uses a ThreadLocal to ensure each thread has its own session, consumer, and producer instance.
-
5. Re: Re: Sending and Receiving Messages Concurrently
ataylor May 1, 2014 9:02 AM (in response to yairogen)
So you are starting the session?
-
6. Re: Re: Sending and Receiving Messages Concurrently
yairogen May 1, 2014 10:44 AM (in response to ataylor)
Yes. If the current thread has no session in its ThreadLocal, I create a new session and start it.
-
7. Re: Re: Sending and Receiving Messages Concurrently
jbertram May 1, 2014 10:49 AM (in response to yairogen)
I'd need a test-case to reproduce the issue to investigate further.
-
8. Re: Re: Re: Sending and Receiving Messages Concurrently
yairogen May 4, 2014 1:58 AM (in response to jbertram)
See attached. Notice that when the shutdown hook runs, some messages do get consumed. I can't understand what I'm doing wrong.
/*
 * Copyright 2014 Cisco Systems, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.twitter.jsr166e.LongAdder;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Yair Ogen on 04/05/2014.
 */
public class ParallelReadWriteTest {

    private static final ThreadLocal<ClientProducer> producer = new ThreadLocal<ClientProducer>();
    public static ThreadLocal<ClientSession> sessionThreadLocal = new ThreadLocal<ClientSession>();
    private static ClientSessionFactory nettyFactory = null;
    private static final ThreadLocal<ClientConsumer> consumer = new ThreadLocal<ClientConsumer>();
    private static Logger LOGGER = LoggerFactory.getLogger("hornetq-test");
    private static final Set<ClientProducer> producers = new HashSet<ClientProducer>();
    private static final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();

    static {
        init();
    }

    private static void init() {
        try {
            nettyFactory = HornetQClient.createServerLocatorWithoutHA(
                    new TransportConfiguration(NettyConnectorFactory.class.getName())).createSessionFactory();
        } catch (Exception e) {
            LOGGER.error("can't create hornetq session: {}", e, e);
            throw new RuntimeException(e);
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    for (ClientProducer clientProducer : producers) {
                        clientProducer.close();
                    }
                    for (ClientConsumer clientConsumer : consumers) {
                        clientConsumer.close();
                    }
                } catch (HornetQException e) {
                    LOGGER.error("can't close producer, error: {}", e, e);
                }
            }
        });
    }

    private ClientProducer getProducer(String queueName) {
        try {
            if (producer.get() == null) {
                ClientProducer clientProducer = getSession().createProducer(queueName);
                producers.add(clientProducer);
                producer.set(clientProducer);
            }
            return producer.get();
        } catch (Exception e) {
            LOGGER.error("can't create queue consumer: {}", e, e);
            throw new RuntimeException(e);
        }
    }

    private ClientConsumer getConsumer(String queueName) {
        try {
            if (consumer.get() == null) {
                ClientConsumer clientConsumer = getSession().createConsumer(queueName);
                consumers.add(clientConsumer);
                consumer.set(clientConsumer);
            }
            return consumer.get();
        } catch (Exception e) {
            LOGGER.error("can't create queue consumer: {}", e, e);
            throw new RuntimeException(e);
        }
    }

    public static ClientSession getSession() {
        if (sessionThreadLocal.get() == null) {
            try {
                ClientSession hornetQSession = nettyFactory.createSession(true, true);
                hornetQSession.start();
                sessionThreadLocal.set(hornetQSession);
            } catch (Exception e) {
                LOGGER.error("can't create hornetq session: {}", e, e);
                throw new RuntimeException(e);
            }
        }
        return sessionThreadLocal.get();
    }

    @Test
    public void testParralelSendAndSyncRecieveUsingCore() throws Exception {
        int NUM_ITER = 100;
        ExecutorService threadPool = Executors.newFixedThreadPool(NUM_ITER + 10);
        final CountDownLatch countDownLatch = new CountDownLatch(NUM_ITER);
        long start = System.currentTimeMillis();
        final LongAdder longAdder = new LongAdder();

        for (int i = 0; i < NUM_ITER; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    ClientMessage clientMessage = getSession().createMessage(true);
                    clientMessage.getBodyBuffer().writeString("hi there");
                    try {
                        getProducer("myExampleDirect").send(clientMessage);
                    } catch (HornetQException e) {
                        e.printStackTrace();
                    }
                }
            });

            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        ClientMessage clientMessage = getConsumer("myExampleDirect").receive();
                        clientMessage.acknowledge();
                    } catch (HornetQException e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                    longAdder.add(1);
                    long sum = longAdder.sum();
                    if (sum % 500 == 0) {
                        System.out.println("read/write index: " + sum);
                    }
                }
            });
        }

        countDownLatch.await();
        long end = System.currentTimeMillis();
        double total = (end - start) / 1000D;
        double tps = NUM_ITER / total;
        System.out.println("tps read and write: " + tps);
        threadPool.shutdown();
    }
}
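The test above waits on `countDownLatch.await()` with no timeout, so if any `receive()` never returns, the run simply hangs. One way to make that visible is to bound the wait and report how many receives are still outstanding. The following is a minimal sketch of that idea only: it assumes an in-memory `BlockingQueue` as a stand-in for the broker queue (no HornetQ calls), and `BoundedAwaitSketch` and `missingReceives` are hypothetical names. It does not diagnose why messages go unconsumed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedAwaitSketch {
    // Submits `sends` producer tasks and `receives` consumer tasks, waits at
    // most `timeoutMillis`, and returns how many expected receives had NOT
    // completed by then (0 means every receive completed).
    // Assumes sends + receives > 0.
    static long missingReceives(int sends, int receives, long timeoutMillis) {
        // In-memory stand-in for the broker queue (assumption, not HornetQ).
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(sends + receives);
        CountDownLatch received = new CountDownLatch(receives);

        for (int i = 0; i < sends; i++) {
            pool.execute(() -> queue.add("hi there"));
        }
        for (int i = 0; i < receives; i++) {
            pool.execute(() -> {
                try {
                    queue.take(); // blocks until a message arrives
                    received.countDown();
                } catch (InterruptedException e) {
                    // Interrupted by shutdownNow() below while still waiting.
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            // Bounded wait: returns early once all receives are done.
            received.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdownNow(); // release receivers that are still blocked
        return received.getCount();
    }

    public static void main(String[] args) {
        System.out.println("missing receives: " + missingReceives(3, 5, 500));
    }
}
```

In the real test, the same pattern would mean replacing the unbounded `await()` with the timed overload and logging `getCount()` when it returns `false`, which shows how many consumers were still blocked.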
-
9. Re: Re: Re: Sending and Receiving Messages Concurrently
jbertram May 12, 2014 11:19 AM (in response to yairogen)
I was looking for something I could actually run with minimal set-up. Could you modify one of the existing examples we ship to reproduce the problem you're seeing and attach the whole thing to the thread? That way I can just grab it and run it.