18 Replies · Latest reply on Mar 21, 2014 5:23 AM by ataylor

    scale down and prepared transactions

    ataylor

I've been working on enhancing Justin's scale-down work to also transfer prepared transactions. I have added another method, scaleDownTransactions(), that looks at the resource manager and handles each prepared transaction. What I do is this:

       

For every set of references being routed for a single message:

       

1. add the HDR_ROUTE_TO_IDS for each queue.

2. start an XA session with the same Xid.

3. send the messages.

4. end and prepare the tx.

       

      This all works fine
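
A minimal sketch of the routing flow above, assuming the core client API is used against the target node; xid, sessionFactory, messagesInTx, targetQueueIds and encodeQueueIds() are illustrative placeholders rather than the actual scale-down code:

      // illustrative sketch only: replay one prepared transaction's sends on the target node
      ClientSession session = sessionFactory.createXASession();
      try
      {
         ClientProducer producer = session.createProducer();

         // 2. start an XA session with the same Xid as the prepared transaction being scaled down
         session.start(xid, XAResource.TMNOFLAGS);

         for (ClientMessage message : messagesInTx)
         {
            // 1. tell the target server which queues the message must be routed to
            //    (HDR_ROUTE_TO_IDS as referenced above, assumed here to live on MessageImpl)
            message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, encodeQueueIds(targetQueueIds));
            // 3. send inside the XA session
            producer.send(message.getAddress(), message);
         }

         // 4. end and prepare, leaving the tx in prepared state on the target node
         session.end(xid, XAResource.TMSUCCESS);
         session.prepare(xid);
      }
      finally
      {
         session.close();
      }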

       

For every reference being acknowledged for the same single message, I:

       

1. add a new property, HDR_ROUTE_TO_ACK_IDS, for every queue that has a reference acked.

2. send the message as part of the same send as for routing, so the message carries both a list of HDR_ROUTE_TO_IDS and a list of HDR_ROUTE_TO_ACK_IDS (see the sketch after this list).

3. at the server, when the message is routed through routeFromCluster, I hold a flag on the RoutingContextList for every queue (for an address) where the reference is acked.

4. when the message reference is created in route(), I set a flag on the reference to mark it as already acked and acknowledge it on the queue.

5. at this point the message and all references are written to the journal along with the acks.

6. on prepare the usual stuff happens.
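
A minimal sketch of steps 1 and 2 above: HDR_ROUTE_TO_ACK_IDS is the new header proposed in this thread (name assumed) and encodeQueueIds() is again a hypothetical helper; the point is that one send carries both lists, so the target server can route the message everywhere it belongs and immediately mark the acked references as acknowledged inside the same prepared tx:

      // illustrative sketch only: one send carries both the routing list and the ack list
      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, encodeQueueIds(queuesToRouteTo));
      message.putBytesProperty(HDR_ROUTE_TO_ACK_IDS, encodeQueueIds(queuesWithAckedRefs));
      producer.send(message.getAddress(), message);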

       

Now when the client reconnects and commits, all the records are committed; the only difference is I don't add the references to the queue.

       

If the client rolls back then I will add all the acked refs and the message to the journal (currently I do this non-tx, but I could do it in a new tx).

       

So firstly, is this the best approach? There may be a better way of doing it.

       

Also, if the same message exists on 2 queues and 1 queue is acked in a tx at crash, then the same message will be sent twice: once in Justin's original method and once when we handle the tx's. Would this make any difference, since the IDs will change anyway?

        • 1. Re: scale down and prepared transactions
          ataylor
          • 2. Re: scale down and prepared transactions
            jbertram

Can you outline the client use-case in view here? For example, a client connects to node A in a live/live cluster, starts an XA transaction, consumes 3 messages, "prepares" the tx, node A is shut down... I need some context to fit this logic into.

            • 3. Re: scale down and prepared transactions
              clebert.suconic

I've been working on enhancing Justin's scale-down work to also transfer prepared transactions. I have added another method, scaleDownTransactions(), that looks at the resource manager and handles each prepared transaction. What I do is this:

For every set of references being routed for a single message:

1. add the HDR_ROUTE_TO_IDS for each queue.

2. start an XA session with the same Xid.

3. send the messages.

4. end and prepare the tx.

This all works fine

              sounds like a plan here!

               

For every reference being acknowledged for the same single message, I:

1. add a new property, HDR_ROUTE_TO_ACK_IDS, for every queue that has a reference acked.

2. send the message as part of the same send as for routing, so the message carries both a list of HDR_ROUTE_TO_IDS and a list of HDR_ROUTE_TO_ACK_IDS.

3. at the server, when the message is routed through routeFromCluster, I hold a flag on the RoutingContextList for every queue (for an address) where the reference is acked.

4. when the message reference is created in route(), I set a flag on the reference to mark it as already acked and acknowledge it on the queue.

5. at this point the message and all references are written to the journal along with the acks.

6. on prepare the usual stuff happens.

               

I'm not sure I understand your algorithm here... but we need some sort of way to send a message and make the ACK already part of a prepared TX. We could either use a special protocol packet for that or some way of sending the message, if that's what you mean.

               

               

Also, if the same message exists on 2 queues and 1 queue is acked in a tx at crash, then the same message will be sent twice: once in Justin's original method and once when we handle the tx's. Would this make any difference, since the IDs will change anyway?

               

               

You mean... in that case the message will be split? One message for the prepared ACK TX and one for the already-sent one?

               

I thought the same thing and I don't see any other way around it. I don't think it matters really... we split the message in this edge case, and I think that's acceptable and the viable solution. I don't see an issue with changing the IDs of the message in such a case.

               

As long as the message (from the POV of the user) is only sent once for each queue, it's all fine.

              • 4. Re: scale down and prepared transactions
                clebert.suconic

                BTW: There are brokers that will always split the messages between queues... we do it to save memory and performance. So there's definitely not an issue on making two messages (one for the prepared ACK, and one for the other queue where it's ready for delivery)

                • 5. Re: scale down and prepared transactions
                  ataylor

Can you outline the client use-case in view here? For example, a client connects to node A in a live/live cluster, starts an XA transaction, consumes 3 messages, "prepares" the tx, node A is shut down... I need some context to fit this logic into.

The same as your use case: when the server shuts down, the prepared transactions are transferred to a new node.

I'm not sure I understand your algorithm here... but we need some sort of way to send a message and make the ACK already part of a prepared TX. We could either use a special protocol packet for that or some way of sending the message, if that's what you mean.

I basically send the message with some extra IDs in the header so we know which ones should be put in the acked state.

You mean... in that case the message will be split? One message for the prepared ACK TX and one for the already-sent one?

The same message is sent twice: once transferred normally to the queues it's destined for, and then sent as part of a prepared tx (in fact it will be resent for every tx it appears in).

                  BTW: There are brokers that will always split the messages between queues... we do it to save memory and performance. So there's definitely not an issue on making two messages (one for the prepared ACK, and one for the other queue where it's ready for delivery)

So that means the above is fine.

                  • 6. Re: scale down and prepared transactions
                    ataylor

By the way, the reason I do a send is so that paging etc. is adhered to; the message basically just gets routed but not added to the tail of the queue.

                    • 7. Re: scale down and prepared transactions
                      ataylor

I'm making progress; basically everything works apart from dealing with store-and-forward queues. For this I need to know the node id of the server I'm connected to, which is not currently available. Since we are sharing the work, here is a list of what needs to be done.

                       

1) Change the core client API to receive the node id of the server on connect (and on reconnect, as it will change on live-to-live failover). Clebert, I wasn't sure of the best place to do this because of your API changes.

2) Once 1 is done, change the scale-down to check for store-and-forward queues and make sure they go to the correct destination on send.

3) Delivery count doesn't work with scale-down, as we basically re-send, so it's set to 0; I'm not sure if and how we should deal with this.

4) Live-to-live failover: we need the client to fail over to the other live. This can easily be done by setting the correct connector on the cluster connection, but since the server is already live the client would connect before any transfer had happened. We need to tell the client to wait after the initial reconnect until the transfer is complete.

5) Scheduled delivery doesn't work with the current message transfer. Justin, maybe the best approach would be to make the scheduled delivery handler put all its messages in the queue before starting the transfer, wdyt?

6) We also need a way of informing the cluster that a live has gone permanently. This is so each live node can handle any messages it has in its store-and-forward queue and send them to the correct destination.

                       

Any thoughts, guys, or help if you are free?

                      • 8. Re: scale down and prepared transactions
                        jbertram

1) Change the core client API to receive the node id of the server on connect (and on reconnect, as it will change on live-to-live failover). Clebert, I wasn't sure of the best place to do this because of your API changes.

                        Shouldn't this information already be on the client by way of the cluster topology updates?

                         

2) Once 1 is done, change the scale-down to check for store-and-forward queues and make sure they go to the correct destination on send.

                        I can do this, but I've been waiting for you to commit your changes because I know you've been working directly on the bits of code involved here.

                         

3) Delivery count doesn't work with scale-down, as we basically re-send, so it's set to 0; I'm not sure if and how we should deal with this.

                        I don't have any good ideas on how to handle this right now. 

                         

4) Live-to-live failover: we need the client to fail over to the other live. This can easily be done by setting the correct connector on the cluster connection, but since the server is already live the client would connect before any transfer had happened. We need to tell the client to wait after the initial reconnect until the transfer is complete.

I created org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage_V2 (which informs the client where the current server is scaling down its messages) as part of 0125ced52ae2d4c8c3412b86119af2b334c51cd7, but your approach might be simpler. As you noted, the client will almost certainly connect to the other live server before the scale-down has completed, but is it necessary to wait that long? The scaled-down messages will be going to the back of the queue(s) on the other server, so it's quite possible that the client won't even be able to get the messages for a while (assuming it's not using a filter).

                         

5) Scheduled delivery doesn't work with the current message transfer. Justin, maybe the best approach would be to make the scheduled delivery handler put all its messages in the queue before starting the transfer, wdyt?

                        Will that mean that the messages would lose their schedule when sent to the other server?  If we can keep the schedule then I think we should.

                         

6) We also need a way of informing the cluster that a live has gone permanently. This is so each live node can handle any messages it has in its store-and-forward queue and send them to the correct destination.

                        Couldn't that be accomplished by tuning <reconnect-attempts> on the <cluster-connection>?

                        • 9. Re: scale down and prepared transactions
                          ataylor

1) Change the core client API to receive the node id of the server on connect (and on reconnect, as it will change on live-to-live failover). Clebert, I wasn't sure of the best place to do this because of your API changes.

                          Shouldn't this information already be on the client by way of the cluster topology updates?

                           

The topology knows the ids of all the servers, but it's not exposed to the session yet; we need something like session.getconnection.getserveri

2) Once 1 is done, change the scale-down to check for store-and-forward queues and make sure they go to the correct destination on send.

                          I can do this, but I've been waiting for you to commit your changes because I know you've been working directly on the bits of code involved here.

                          I will tidy up today and send a PR so we can work together

                           

3) Delivery count doesn't work with scale-down, as we basically re-send, so it's set to 0; I'm not sure if and how we should deal with this.

                          I don't have any good ideas on how to handle this right now. 

Me neither. Anyone else have any ideas?

4) Live-to-live failover: we need the client to fail over to the other live. This can easily be done by setting the correct connector on the cluster connection, but since the server is already live the client would connect before any transfer had happened. We need to tell the client to wait after the initial reconnect until the transfer is complete.

I created org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage_V2 (which informs the client where the current server is scaling down its messages) as part of 0125ced52ae2d4c8c3412b86119af2b334c51cd7, but your approach might be simpler. As you noted, the client will almost certainly connect to the other live server before the scale-down has completed, but is it necessary to wait that long? The scaled-down messages will be going to the back of the queue(s) on the other server, so it's quite possible that the client won't even be able to get the messages for a while (assuming it's not using a filter).

For the colocated scenario we just need to use the parent server's InVM connector, so just set that on the cluster connection as, say, <scaledown-connector>, or even have this on the server's configuration itself. As for the client waiting to reconnect, this is mainly for handling transactions, i.e. if the client calls commit then the transaction has to exist.

5) Scheduled delivery doesn't work with the current message transfer. Justin, maybe the best approach would be to make the scheduled delivery handler put all its messages in the queue before starting the transfer, wdyt?

                          Will that mean that the messages would lose their schedule when sent to the other server?  If we can keep the schedule then I think we should.

No, as the header will still be set; it will get picked up when re-routed. I will try to do that today, maybe a method queue.forceScheduledDelivery().

6) We also need a way of informing the cluster that a live has gone permanently. This is so each live node can handle any messages it has in its store-and-forward queue and send them to the correct destination.

                          Couldn't that be accomplished by tuning <reconnect-attempts> on the <cluster-connection>?

I'm just thinking about transient network failures etc.; it would be better to know for sure, if possible, that a live server is stopping permanently.

                          • 10. Re: scale down and prepared transactions
                            ataylor

I've sent a PR (https://github.com/hornetq/hornetq/pull/1589) for scale-down; I've implemented number 5 on my list (scheduled messages) and I am working on delivery count.

                            • 11. Re: scale down and prepared transactions
                              clebert.suconic

We were talking about the NodeID used by the session for what you need for the store-and-forward queues...

We could easily expose the topology used on the ClientSessionFactory:

                               

public ClientSessionFactory createSessionFactory(String nodeID) throws Exception
{
   TopologyMember topologyMember = topology.getMember(nodeID);

   ClientSessionFactoryInternal factory;

   if (topologyMember.getLive() != null)
   {
      // ... either live
      factory = (ClientSessionFactoryInternal) createSessionFactory(topologyMember.getLive());
   }
   else
   {
      // ... or backup
      // This shouldn't happen, however I wanted this to consider all possible cases
      factory = (ClientSessionFactoryInternal) createSessionFactory(topologyMember.getBackup());
   }

   // ...

   return factory;
}

                               

                               

                               

We could get the topologyMember here and send it to the SessionFactoryInternal by passing the topologyMember, and later expose it through ClientSessionFactory.getTopology().

                               

                               

                              Would that work for you?

                              • 12. Re: scale down and prepared transactions
                                clebert.suconic

                                Well that method is only used on a specific occasion...

                                 

                                 

But would ClientSessionFactory::getTopology() be enough? We would have to treat all the cases.

                                • 13. Re: scale down and prepared transactions
                                  ataylor

All I really need is a method on the session (or any object on the session) that will return the node id of the server it is currently connected to.

                                  • 14. Re: Re: scale down and prepared transactions
                                    jbertram

                                    It appears we can do this already, e.g.:

                                     

                                    myClientSessionFactory.getServerLocator().getTopology().getMember(myClientSessionFactory.getConnectorConfiguration()).getNodeId();
                                    

                                     

                                    It's a little ugly, but it appears to work based on some quick tests.
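
For what it's worth, a small convenience wrapper over that chain (a hypothetical helper, not an existing HornetQ method) keeps call sites readable; it only uses the calls shown above:

      // hypothetical helper built purely from the chain shown above
      private static String getConnectedNodeId(ClientSessionFactory factory)
      {
         return factory.getServerLocator()
                       .getTopology()
                       .getMember(factory.getConnectorConfiguration())
                       .getNodeId();
      }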
