12 Replies Latest reply on Jun 20, 2012 5:18 PM by rokhmanov

    Proper ways to close Continuous execution

    rokhmanov

      Hi,

       

      I am looking for a proper way to initiate a stop for Continuous Execution (from both, the client and translator side). For the client, I use the examples from documnetation for "Non-blocking Statement Execution" and some teiid unit tests, extended with ResultsFuture. When at some moment I explicitly close the TeiidPreparedStatement from inside of StatementCallBack.onRow() - querying stops well. On a translator side, to stop the continuous execution, I have to explicitly throw new TranslatorException from inside of next() method. Is this the proper way of doing things, or anything better can be used?

      I've heard something about returning null or empty ResultSet at some moment inside my overriden execute() translator method, but cannot find anything related to that in docs or unit tests.

       

      I use Teiid 8.1.0.Alpha2 on JBoss AS 7.1.1

       

      Thanks,

      Andriy

        • 1. Re: Proper ways to close Continuous execution
          shawkins

          Andriy,

           

          Yes, closing the statement from within your callback will stop execution.  From a translator side there isn't a real mechanism yet.  When a translator returns a null it is simply indicating that the current result window has ended and it will just wait until it is reexecuted for the next iterator.  Throwing an exception will work just fine as that will kick the query out of the processing loop.  I'm not sure what would be a better way given that the client and processing logic currently expects to just keep executing.

           

          Steve

          • 2. Re: Proper ways to close Continuous execution
            markaddleman

            Ultimately, I'd like all of our queries to use the continuous executions and some queries will have a definite end of results that only the translator/data source know.  We just need some way of indicating that. 

             

            I can see a few options:

            1. Use a special exception like you say
            2. Use/abuse statement warnings

             

            Both of these approaches require the client to coordinate but that's not a big deal.  An alternative is for the execution to return null twice in a row to indicate end of results to the query engine.  Or, some other sentinel (perhaps a special, constant java.util.List object). 

             

            For now, I think we can go with a special exception since exception semantics are very similar (ie, close all JDBC resources).

             

            Curious, what is the overhead (both memory and CPU) of non-operating queries?  I seem to recall you saying that its very low.

            • 3. Re: Proper ways to close Continuous execution
              shawkins

              Mark,

               

              Perhaps you're looking for a boolean flag to be returned by the reset method on ReusableExecution to indicate that results are no longer available.  The only grey area is when multiple sources are involved.  Would you expect them all to indicate that their finished?

               

              Yes the overhead is low.  The RequestWorkItem and associated ProcessorPlan should typically have a memory overhead in the 25-250k range (the higher side typically being large procedure plans that execute dozens to hundreds of teiid queries, which in turn each have a ProcessorPlan).  Prepared plans share much of their metadata state and will have a lower overhead in total when reused.

               

              Steve

              • 4. Re: Proper ways to close Continuous execution
                markaddleman

                The boolean flag is the cleanest solution but I'm fine with other mechanisms, too.

                 

                I *think* the multiple sources semantics are the same as any regular query where one source runs out of data:  If an inner join, the entire execution ends; if an outer join, the execution may continue depending on the particular query construction.

                • 5. Re: Proper ways to close Continuous execution
                  shawkins

                  Mark Addleman wrote:

                   

                  The boolean flag is the cleanest solution but I'm fine with other mechanisms, too.

                   

                  I *think* the multiple sources semantics are the same as any regular query where one source runs out of data:  If an inner join, the entire execution ends; if an outer join, the execution may continue depending on the particular query construction.

                   

                  Yes, that is exactly what we'll do within a query window.  However we're talking about across windows.  If one source has indicated it is done producing results for good, does that mean that the user query is done?  Or should we just assume that additional windows will have no results for that source and wait until there is a concenus of all reusable executions indicating their done for good?  Or should we just recommend the usage of an exception, which clearly indicates that processing is over?

                  • 6. Re: Proper ways to close Continuous execution
                    markaddleman

                    Duh, of course.

                     

                    Perhaps the semantics extend directly to the cross-window case?  The point of this flag is to indicate that there cannot be more results for the execution, so stop asking.  That is the moral equivalent of reaching the end of results in the non-continuous case and that implies all executions stop in an inner join and possibly on an outter join. 

                     

                    Am I still missing something?

                    • 7. Re: Proper ways to close Continuous execution
                      shawkins

                      It's not that you're missing something.  It's that we're now talking about something that is slightly different than what Andriy was asking about.  An exception clearly stops all processing of a continuous query.  No further iterations will be initiated.  With the source just indicating that no more results will ever be available you're now expecting the engine to introspect the plan to determine what that means for the continuous execution, whether to keep going and just not ask for more results from that source (in the case of a union) or to stop.  That is a more complicated mechanism than just full stop.

                       

                      At the very least there should be enough here to create a JIRA should you want something other than an exception to indicate that the continous query has stopped from a server perspective.

                       

                      Steve

                      • 8. Re: Proper ways to close Continuous execution
                        rokhmanov

                        I am afraid I would need additional clarification. As a reminder: my task is to stop continuous execution from client by close() the statement from inside the StatementCallBack.onRow(s, rs) after 1-2 minutes of processing. I want to have the retrieved results stored in ResultsFuture (which I populate with every row retrieved during execution).

                        I just found that to properly cancel the continuous execution, the statement should be closed at the moment when StatementCallBack.onRow method fired on the last row of resultset for this round of executuon. If statement closed somewhere in the middle of resultset, the execution gets in infinite loop, throwing TeiidSQLException: Error trying to operate on closed ResultSet.

                         

                        Here is the sample code I have (below). the dynamicvdb-portfolio example from teiid has 25 rows in product table. This SQL was used in sample: "select * from Accounts.PRODUCT". Any value of rowCount other than 49 gives the exception dump (a very bottom).

                        Any advice what would be the best approach here? To stop execution properly - there is no good way to know how many records retrieved in advance. I've tried to modify close condition to "if (rowCount > 49 && rs.isLast())..." but was not successful on that - seems isLast() is not supported properly here.

                             private void processAsync(Connection connection, String sql)
                             {
                                  Statement statement = null;
                                  try {
                                       statement = connection.createStatement();
                                       final TeiidStatement ts = statement.unwrap(TeiidStatement.class);
                                            final ResultsFuture> result = new ResultsFuture>(); 
                                            ts.submitExecute(sql, new StatementCallback() {
                                                 int rowCount = 0;
                                                 List rowStore = new ArrayList();
                                                 public void onRow(Statement s, ResultSet rs) throws SQLException {
                                                      rowCount++;
                                                      System.out.println("### " + rowCount);
                                                      String row = rs.getString(1) + " - " + rs.getString(2) + " - " + rs.getString(3);
                                                      System.out.println(row);
                                                      rowStore.add(row);
                                                      if (rowCount > 49)
                                                      {
                                                           s.close();
                                                      }
                                                 }                         
                                                 public void onException(Statement s, Exception e) {
                                                      System.out.println("### EXCEPTION:");
                                                      e.printStackTrace();
                                                      result.getResultsReceiver().receiveResults(rowStore);
                                                      try {
                                                           s.close();
                                                      } catch (SQLException e1) {
                                                           // TODO Auto-generated catch block
                                                           System.out.println("!!! Execution completed with: " + e1.getMessage());
                                                      }
                                                 }                         
                                                 public void onComplete(Statement s) throws Exception {
                                                      System.out.println("### COMPLETE");
                                                      result.getResultsReceiver().receiveResults(rowStore);
                                                      s.close();
                                                 }
                        //                    }, new RequestOptions());
                                            }, new RequestOptions().continuous(true));
                                            
                                            System.out.println("### Processed:" + result.get().size() );                    
                                            System.out.println("### Values:" + Arrays.toString(result.get().toArray()) );                    
                                  } catch (SQLException e) {
                                       e.printStackTrace();
                                  } catch (InterruptedException e) {
                                       // TODO Auto-generated catch block
                                       e.printStackTrace();
                                  } catch (ExecutionException e) {
                                       // TODO Auto-generated catch block
                                       e.printStackTrace();
                                  }
                             }
                        
                        

                         

                         

                        11:07:27,695 INFO  [stdout] (Worker3_QueryProcessorQueue24) ### EXCEPTION:
                        11:07:27,695 ERROR [stderr] (Worker3_QueryProcessorQueue24) org.teiid.jdbc.TeiidSQLException: Error trying to operate on a closed ResultSet object.
                        11:07:27,696 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.jdbc.ResultSetImpl.checkClosed(ResultSetImpl.java:175)
                        11:07:27,697 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.jdbc.ResultSetImpl.next(ResultSetImpl.java:245)
                        11:07:27,698 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.jdbc.ResultSetImpl.submitNext(ResultSetImpl.java:226)
                        11:07:27,698 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.jdbc.NonBlockingRowProcessor$1.run(NonBlockingRowProcessor.java:59)
                        11:07:27,700 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.jdbc.NonBlockingRowProcessor$1$1.onCompletion(NonBlockingRowProcessor.java:68)
                        11:07:27,703 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.client.util.ResultsFuture.done(ResultsFuture.java:130)
                        11:07:27,704 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.client.util.ResultsFuture.access$200(ResultsFuture.java:37)
                        11:07:27,704 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.client.util.ResultsFuture$1.receiveResults(ResultsFuture.java:75)
                        11:07:27,706 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.jdbc.ResultSetImpl$1.onCompletion(ResultSetImpl.java:235)
                        11:07:27,707 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.client.util.ResultsFuture.done(ResultsFuture.java:130)
                        11:07:27,710 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.client.util.ResultsFuture.access$200(ResultsFuture.java:37)
                        11:07:27,713 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.client.util.ResultsFuture$1.receiveResults(ResultsFuture.java:75)
                        11:07:27,714 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.RequestWorkItem.sendResultsIfNeeded(RequestWorkItem.java:713)
                        11:07:27,717 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.RequestWorkItem$1.flushBatchDirect(RequestWorkItem.java:561)
                        11:07:27,720 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.query.processor.BatchCollector.flushBatch(BatchCollector.java:174)
                        11:07:27,721 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.query.processor.BatchCollector.collectTuples(BatchCollector.java:149)
                        11:07:27,724 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.RequestWorkItem.processMore(RequestWorkItem.java:380)
                        11:07:27,727 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.RequestWorkItem.process(RequestWorkItem.java:289)
                        11:07:27,729 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.AbstractWorkItem.run(AbstractWorkItem.java:49)
                        11:07:27,735 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.RequestWorkItem.run(RequestWorkItem.java:217)
                        11:07:27,736 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.DQPWorkContext.runInContext(DQPWorkContext.java:245)
                        11:07:27,736 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.ThreadReuseExecutor$RunnableWrapper.run(ThreadReuseExecutor.java:123)
                        11:07:27,738 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at org.teiid.dqp.internal.process.ThreadReuseExecutor$3.run(ThreadReuseExecutor.java:298)
                        11:07:27,742 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
                        11:07:27,743 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
                        11:07:27,747 ERROR [stderr] (Worker3_QueryProcessorQueue24)      at java.lang.Thread.run(Thread.java:722)
                        11:07:27,747 WARNING [org.teiid.jdbc.NonBlockingRowProcessor] (Worker3_QueryProcessorQueue24) Unhandled exception from StatementCallback: org.teiid.jdbc.TeiidSQLException: Error trying to operate on a closed ResultSet object.
                             at org.teiid.jdbc.ResultSetImpl.checkClosed(ResultSetImpl.java:175) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.jdbc.ResultSetImpl.next(ResultSetImpl.java:245) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.jdbc.ResultSetImpl.submitNext(ResultSetImpl.java:226) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.jdbc.NonBlockingRowProcessor$1.run(NonBlockingRowProcessor.java:59) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.jdbc.NonBlockingRowProcessor$1$1.onCompletion(NonBlockingRowProcessor.java:68) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.client.util.ResultsFuture.done(ResultsFuture.java:130) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.client.util.ResultsFuture.access$200(ResultsFuture.java:37) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.client.util.ResultsFuture$1.receiveResults(ResultsFuture.java:75) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.jdbc.ResultSetImpl$1.onCompletion(ResultSetImpl.java:235) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.client.util.ResultsFuture.done(ResultsFuture.java:130) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.client.util.ResultsFuture.access$200(ResultsFuture.java:37) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.client.util.ResultsFuture$1.receiveResults(ResultsFuture.java:75) [teiid-client-8.1.0.Alpha2-SNAPSHOT.jar:8.1.0.Alpha2-SNAPSHOT]
                             at org.teiid.dqp.internal.process.RequestWorkItem.sendResultsIfNeeded(RequestWorkItem.java:713)
                             at org.teiid.dqp.internal.process.RequestWorkItem$1.flushBatchDirect(RequestWorkItem.java:561)
                             at org.teiid.query.processor.BatchCollector.flushBatch(BatchCollector.java:174)
                             at org.teiid.query.processor.BatchCollector.collectTuples(BatchCollector.java:149)
                             at org.teiid.dqp.internal.process.RequestWorkItem.processMore(RequestWorkItem.java:380)
                             at org.teiid.dqp.internal.process.RequestWorkItem.process(RequestWorkItem.java:289)
                             at org.teiid.dqp.internal.process.AbstractWorkItem.run(AbstractWorkItem.java:49)
                             at org.teiid.dqp.internal.process.RequestWorkItem.run(RequestWorkItem.java:217)
                             at org.teiid.dqp.internal.process.DQPWorkContext.runInContext(DQPWorkContext.java:245)
                             at org.teiid.dqp.internal.process.ThreadReuseExecutor$RunnableWrapper.run(ThreadReuseExecutor.java:123)
                             at org.teiid.dqp.internal.process.ThreadReuseExecutor$3.run(ThreadReuseExecutor.java:298)
                             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) [rt.jar:1.7.0_05]
                             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) [rt.jar:1.7.0_05]
                             at java.lang.Thread.run(Thread.java:722) [rt.jar:1.7.0_05]
                        
                        
                        • 9. Re: Proper ways to close Continuous execution
                          shawkins

                          Andriy,

                           

                          You should not be required to known when the end of a batch/window results is occurring.  There are several things here.  The first is that there is an issue with the NonBlockingRowProcessor, when it calls onException in the while loop, it needs to break.  Instead it keeps looping, which is the behavior you are seeing.  The next issue is that you don't want to see exceptions related to closure as hard errors.  That's also easily addressed.  Can you log an issue to cover these?

                           

                          Also isLast should return false for continuous queries since we keep appending results to the same resultset with each execution and never consider there being a last row.

                           

                          Steve

                          • 10. Re: Proper ways to close Continuous execution
                            rokhmanov

                            Thanks Steve, I opened TEIID-2081 for the issues above.

                            • 11. Re: Proper ways to close Continuous execution
                              shawkins
                              1 of 1 people found this helpful
                              • 12. Re: Proper ways to close Continuous execution
                                rokhmanov

                                I tested fix for TEIID-2081 - the issue is resolved.