15 Replies Latest reply on Sep 17, 2013 10:41 AM by ybxiang.china

    TCP client on EJB with JBOSS

    romk

      Hi All!

       

      I'm building a distributed system based on JBoss (AS 7.1) that will:

       

      1. Monitor some hardware (a PLC or similar).

      2. Transform the received monitoring results:

          - into entities for saving to the database (according to the database model)

          - into beans for GUI clients (standalone remote applications and web) (according to the client view model)
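A minimal sketch of the two transformations in step 2, assuming a reading is just a device id, a temperature, and a timestamp. Every class and field name here (`Reading`, `ReadingRecord`, `ReadingView`, `ReadingMapper`) is hypothetical; in the real application the record would be a JPA entity and the view bean would typically be serializable.

```java
import java.util.Locale;

// Hypothetical raw reading as received from the hardware.
final class Reading {
    final String deviceId;
    final double temperature;
    final long timestampMillis;

    Reading(String deviceId, double temperature, long timestampMillis) {
        this.deviceId = deviceId;
        this.temperature = temperature;
        this.timestampMillis = timestampMillis;
    }
}

// Hypothetical persistence-side shape (a JPA @Entity in the real application).
final class ReadingRecord {
    final String deviceId;
    final double temperature;
    final long timestampMillis;

    ReadingRecord(String deviceId, double temperature, long timestampMillis) {
        this.deviceId = deviceId;
        this.temperature = temperature;
        this.timestampMillis = timestampMillis;
    }
}

// Hypothetical client-side view bean: only what the GUI displays.
final class ReadingView {
    final String deviceId;
    final String temperatureLabel;

    ReadingView(String deviceId, String temperatureLabel) {
        this.deviceId = deviceId;
        this.temperatureLabel = temperatureLabel;
    }
}

final class ReadingMapper {
    // Keeps the full-precision values for storage.
    static ReadingRecord toRecord(Reading r) {
        return new ReadingRecord(r.deviceId, r.temperature, r.timestampMillis);
    }

    // Formats the values for display; the timestamp is dropped on purpose.
    static ReadingView toView(Reading r) {
        return new ReadingView(r.deviceId, String.format(Locale.ROOT, "%.1f C", r.temperature));
    }
}
```

Keeping the two mappings in one place means the monitoring code only has to produce a `Reading` and does not need to know about either target model.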

       

      Now I am thinking about the first problem: what is the best way to create a monitoring client for the hardware using EJB?

       

      For example, take simple TCP monitoring: I need to receive a message containing some hardware parameters every second.

      I use @Schedule(second="*/1", minute="*",hour="*", persistent=false)

      The question is: how do I do the TCP exchange from an EJB correctly?
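A minimal sketch of the TCP exchange itself, assuming the hardware sends one text line per connection. That protocol is an assumption, and the names `TcpPollSketch`, `readOnce`, and `demo` are hypothetical. The method is plain Java so it can be called from a `@Schedule` method; it opens the socket, reads with a timeout, and always closes the socket, so a timer firing every second never blocks indefinitely or leaks connections.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class TcpPollSketch {

    /** Connects, reads one line, and closes. Returns null on timeout or I/O failure. */
    static String readOnce(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Bound both the connect and the read so a dead device cannot hang the caller.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            return in.readLine();
        } catch (IOException e) {
            return null;
        }
    }

    /** Loopback demo: a throwaway local server stands in for the hardware. */
    static String demo() {
        try (ServerSocket server = new ServerSocket(0)) {
            Thread fakeDevice = new Thread(() -> {
                try (Socket s = server.accept();
                     PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                    out.println("temp=21.5");
                } catch (IOException ignored) {
                    // Demo only: nothing useful to do if the fake device fails.
                }
            });
            fakeDevice.start();
            String line = readOnce("127.0.0.1", server.getLocalPort(), 1000);
            fakeDevice.join(2000);
            return line;
        } catch (IOException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With a one-second schedule, the timeout should stay well below the period; otherwise timer invocations can pile up behind a slow device.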

        • 1. Re: TCP client on EJB with JBOSS
          wdfink

          I'm not sure whether I understand this correctly.

          Are you creating an EJB application to monitor the hardware?

           

          To check whether a system is alive, you would open a TCP connection?

          According to the EJB specification this is not allowed; nevertheless, IO will work and you can do it.

          • 2. Re: TCP client on EJB with JBOSS
            romk

            Yes, that's right: I'm thinking about monitoring hardware from an EJB application.


            I need the EJB application to provide data storage and to support distributed clients.

            I guess it would be more correct to create a simple TCP server layer that monitors the hardware over TCP and then calls remote EJBs to exchange data. But that way I have one more application, so I'm checking whether there are ways to do it from an EJB.

            • 3. Re: TCP client on EJB with JBOSS
              ybxiang.china

              I developed a hardware monitoring system similar to yours.

              I think you can do it like my code:

               

              public interface ITesterMonitoringTimer {

              }

               

              package com.asb.tms.ejb.timer;

              import java.util.Iterator;

              import java.util.logging.Logger;

              import javax.annotation.PostConstruct;

              import javax.annotation.Resource;

              import javax.ejb.EJB;

              import javax.ejb.Local;

              import javax.ejb.ScheduleExpression;

              import javax.ejb.SessionContext;

              import javax.ejb.Singleton;

              import javax.ejb.Startup;

              import javax.ejb.Timeout;

              import javax.ejb.TimerConfig;

              import javax.ejb.TimerService;

              import javax.persistence.EntityManager;

              import javax.persistence.PersistenceContext;

              import com.asb.tms.common.TestManager;

              import com.asb.tms.ejb.entity.TesterProduct;

              import com.asb.tms.ejb.session.ITesterProductSession;

              import com.asb.tms.ejb.session.core.ICoreService;

              import com.asb.tms.query.QueryResult;

              @Local(ITesterMonitoringTimer.class)

              @Singleton

              @Startup

              public class TesterMonitoringTimer implements ITesterMonitoringTimer{

                  Logger log = Logger.getLogger(TesterMonitoringTimer.class.getName());

               

                  @EJB

                  ICoreService coreService;

                  @EJB

                  ITesterProductSession testerProductSession;

                

                

                  @Resource

                  private SessionContext sessionContext;

                

                  @Resource

                  TimerService timerService;

                

                  @PersistenceContext

                  protected EntityManager em;

                

                  private String getMinuteExpression(){

                      if(TestManager.isTesting_TesterMonitoringTimer){

                          return "*";//"0,5,10,15,20,25,30,35,40,45,50,55";

                      }

                    

                      String s = "0,15,30,45";

                      int i = coreService.getTesterMonitoringPeriodMinutes();

                      if(i==10){

                          s = "0,10,20,30,40,50";

                      }else if(i==15){

                          //s = "0,15,30,45"

                      }else if(i==20){

                          s = "0,20,40";

                      }else if(i==30){

                          s = "0,30";

                      }else if(i==60){

                          s = "0";

                      }

                      return s;

                  }

                  @PostConstruct

                  public void createTimer() {

                      log.info("==>starting TesterMonitoringTimer timer...");

               

                      ScheduleExpression scheduleExpression = new ScheduleExpression();

                      //

                      scheduleExpression.second(0);//[0,59]

                      scheduleExpression.minute(getMinuteExpression());//[0,59]//[0,15,30,45]

                      scheduleExpression.hour("*");//[0,23]

                      timerService.createCalendarTimer(scheduleExpression, new TimerConfig(null,false));

                      log.info("==>timer TesterMonitoringTimer started!!!");

                  }

                

                  @Timeout

                  public void timeout( ){

                      try{

                          //log.info("TesterMonitoringTimer...");

                          QueryResult queryResult = testerProductSession.listMonitored();

                          @SuppressWarnings("unchecked")

                          Iterator<TesterProduct> it = queryResult.records.iterator();

                          while(it.hasNext()){

                              TesterProduct tp = it.next();

                              //log.info("Scheduling for product:"+tp.getId()+" ......");

                              coreService.getTesterMonitoringThreadPool().execute(new MonitoringTask(tp,coreService.getMonitoringShellWorkingDirectoryFullPath()));

                              //log.info("Scheduled for product:"+tp.getId());

                          }

                      }catch(Exception e){

                          log.severe(e.getMessage());

                      }

                  }

              }
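The minute list built by getMinuteExpression() above can also be computed from the period instead of hard-coded per case. This is a sketch, not the poster's code: the class name `MinuteExpressionSketch` is hypothetical, and unlike the original it accepts any period that divides 60 evenly (for example 5 or 12), falling back to 15 minutes otherwise.

```java
final class MinuteExpressionSketch {

    /** Builds "0,p,2p,..." for a period p that divides 60; falls back to 15 otherwise. */
    static String minuteExpression(int periodMinutes) {
        if (periodMinutes <= 0 || 60 % periodMinutes != 0) {
            periodMinutes = 15; // same default as the original "0,15,30,45"
        }
        StringBuilder sb = new StringBuilder();
        for (int minute = 0; minute < 60; minute += periodMinutes) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(minute);
        }
        return sb.toString();
    }
}
```

The result is a plain comma-separated list, the same form that is passed to `scheduleExpression.minute(...)` in the timer above.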

              • 4. Re: TCP client on EJB with JBOSS
                ybxiang.china

                package com.asb.tms.ejb.session.core;

                 

                import java.util.concurrent.Executor;

                 

                public interface ICoreService {

                    public String getJaasSecurityDomain();

                    public String getServerBindAddress();

                    public int getTesterMonitoringPeriodMinutes();

                    public String getMonitoringShellWorkingDirectoryFullPath();

                    //

                    public Executor getTesterMonitoringThreadPool();

                    //

                    public String getSSH2ServerHostName();

                    public String getSSH2ServerUserName();

                    public String getSSH2ServerPassword();

                    //

                    public int getMinitoringShellRunningTimeoutMinutes();

                }

                 

                 

                 

                 

                 

                package com.asb.tms.ejb.session.core;

                 

                import java.io.File;

                import java.util.concurrent.Executor;

                import java.util.concurrent.Executors;

                import java.util.logging.Logger;

                 

                import javax.annotation.PostConstruct;

                import javax.annotation.PreDestroy;

                import javax.annotation.security.PermitAll;

                import javax.annotation.security.RolesAllowed;

                import javax.ejb.Local;

                import javax.ejb.Singleton;

                import javax.ejb.Startup;

                import javax.persistence.EntityManager;

                import javax.persistence.PersistenceContext;

                 

                import com.asb.tms.common.KnownJaasRoles;

                import com.asb.tms.common.ServerConstants;

                 

                @Local(ICoreService.class)

                @Singleton

                @Startup

                public class CoreService implements ICoreService{

                    Logger log = Logger.getLogger(CoreService.class.getName());

                  

                    //*********************************[member fields]*********************************//

                    //Initialization of System Properties configured in standalone.xml.

                    private String jaasSecurityDomain;

                    private String serverBindAddress;

                    //private String contextPath;

                    private int testerMonitoringPeriodMinutes;

                    private String monitoringShellWorkingDirectoryFullPath;

                    //

                    private Executor testerMonitoringThreadPool;

                    //

                    private String ssh2ServerHostName;

                    private String ssh2ServerUserName;

                    private String ssh2ServerPassword;

                    //

                    private int minitoringShellRunningTimeoutMinutes;

                  

                    @PersistenceContext

                    private EntityManager em;

                  

                    @PostConstruct

                    public void start() throws Exception {

                        log.info(ServerConstants.SERVICE_STARTING_FLAG);

                        initSystemProperties();

                        log.info(ServerConstants.SERVICE_STARTED_FLAG);

                    }

                 

                    @PreDestroy

                    public void destroy(){

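                        log.info(ServerConstants.SERVICE_DESTROYING_FLAG);

                        //Shut down the monitoring thread pool; otherwise its worker threads leak on undeploy.

                        if(testerMonitoringThreadPool instanceof java.util.concurrent.ExecutorService){

                            ((java.util.concurrent.ExecutorService)testerMonitoringThreadPool).shutdown();

                        }

                        log.info(ServerConstants.SERVICE_DESTROYED_FLAG);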

                    }

                  

                    //*********************************[interface methods]*********************************//

                    @Override

                    @RolesAllowed({KnownJaasRoles.ADMINISTRATOR})

                    public String getJaasSecurityDomain(){

                        return jaasSecurityDomain;

                    }

                  

                    @Override

                    @PermitAll()

                    public String getServerBindAddress(){

                        return serverBindAddress;

                    }

                  

                    @Override

                    @PermitAll()

                    public int getTesterMonitoringPeriodMinutes(){

                        return testerMonitoringPeriodMinutes;

                    }

                  

                    @Override

                    @PermitAll()

                    public String getMonitoringShellWorkingDirectoryFullPath(){

                        return monitoringShellWorkingDirectoryFullPath;

                    }

                  

                    @Override

                    @PermitAll()

                    public Executor getTesterMonitoringThreadPool(){

                        return testerMonitoringThreadPool;

                    }

                  

                    @Override

                    @PermitAll()

                    public String getSSH2ServerHostName(){

                        return ssh2ServerHostName;

                    }

                    @Override

                    @PermitAll()

                    public String getSSH2ServerUserName(){

                        return ssh2ServerUserName;

                    }

                    @Override

                    @PermitAll()

                    public String getSSH2ServerPassword(){

                        return ssh2ServerPassword;

                    }

                    @Override

                    @PermitAll()

                    public int getMinitoringShellRunningTimeoutMinutes(){

                        return minitoringShellRunningTimeoutMinutes;

                    }

                  

                    //*********************************[helper methods]*********************************//

                    /**

                     * Initialize some properties for use by other services.

                     */

                    private void initSystemProperties(){

                        try{

                            jaasSecurityDomain = System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_ASB_TMS_JAAS_SECURITY_DOMAIN,ServerConstants.DEFAULT_ASB_TMS_JAAS_SECURITY_DOMAIN);

                            log.info("jaasSecurityDomain="+jaasSecurityDomain);

                            //

                            serverBindAddress = System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_JBOSS_BIND_ADDRESS,ServerConstants.DEFAULT_JBOSS_BIND_ADDRESS);

                            //

                            testerMonitoringPeriodMinutes = Integer.parseInt(System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_TIMER_testerMonitoringPeriodMinutes,ServerConstants.DEFAULT_TIMER_testerMonitoringPeriodMinutes));

                            //

                            monitoringShellWorkingDirectoryFullPath = System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_JBOSS_SERVER_TEMP_DIR)+File.separator+"ASB-WORK";

                            File f = new File(monitoringShellWorkingDirectoryFullPath);

                            if(!f.exists()){

                                f.mkdirs();

                            }

                            if(!f.exists()){

                                log.severe("Can NOT create working directory:"+monitoringShellWorkingDirectoryFullPath);

                            }

                            //

                             int corePoolSize = Integer.parseInt(System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_TIMER_testerMonitoringThreadPoolCorePoolSize, ServerConstants.DEFAULT_TIMER_testerMonitoringThreadPoolCorePoolSize));

                            testerMonitoringThreadPool = Executors.newScheduledThreadPool(corePoolSize);

                            //

                            ssh2ServerHostName = System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_SSH2Server_hostName,ServerConstants.DEFAULT_SSH2Server_hostName);

                            ssh2ServerUserName = System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_SSH2Server_userName,ServerConstants.DEFAULT_SSH2Server_userName);

                            ssh2ServerPassword = System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_SSH2Server_password,ServerConstants.DEFAULT_SSH2Server_password);

                            //

                            minitoringShellRunningTimeoutMinutes = Integer.parseInt(System.getProperty(ServerConstants.SYSTEM_PROPERTY_KEY_MONITORINGSHELL_runningTimeoutMinutes, ServerConstants.DEFAULT_MONITORINGSHELL_runningTimeoutMinutes));

                          

                            //

                          

                            log.info("serverBindAddress="+serverBindAddress);

                            log.info("Monitoring Shell Working Directory="+monitoringShellWorkingDirectoryFullPath);

                        }catch(Exception e){

                            log.severe(e.getMessage());

                        }

                    }

                    private void printSystemProperty(String key){

                        log.info(key+": "+System.getProperty(key));

                    }

                  

                //    @PermitAll()

                //    public String getContextPath(){

                //        return contextPath;

                //    }

                }
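One detail in initSystemProperties() above: each Integer.parseInt(System.getProperty(...)) throws on a malformed value, and the surrounding catch then skips everything after it, including the creation of the thread pool. A possible alternative, sketched here with a hypothetical helper `IntPropertySketch.intProperty` and made-up property keys, is the JDK's `Integer.getInteger(String, int)`, which returns the default when the property is missing or not a valid integer.

```java
final class IntPropertySketch {

    /**
     * Reads an integer system property. Returns defaultValue when the property
     * is unset, malformed, or below the given minimum.
     */
    static int intProperty(String key, int defaultValue, int minimum) {
        // Integer.getInteger never throws on a malformed value; it returns the default instead.
        int value = Integer.getInteger(key, defaultValue);
        return value < minimum ? defaultValue : value;
    }
}
```

Note that `Integer.getInteger` parses with `Integer.decode`, so a value with a leading zero such as "010" is read as octal.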

                • 5. Re: TCP client on EJB with JBOSS
                  ybxiang.china

                  package com.asb.tms.ejb.timer;

                   

                  import java.io.File;

                  import java.util.logging.Logger;

                   

                   

                   

                  import com.asb.tms.common.FileHelper;

                  import com.asb.tms.common.ServerConstants;

                  import com.asb.tms.common.ServerNamingContextHelper;

                  import com.asb.tms.common.TestManager;

                  import com.asb.tms.ejb.entity.Status;

                  import com.asb.tms.ejb.entity.TesterMonitoring;

                  import com.asb.tms.ejb.entity.TesterProduct;

                  import com.asb.tms.ejb.session.core.ICoreService;

                  import com.ybxiang.ssh2.ra.api.SSHCommandResult;

                   

                  public class MonitoringTask implements Runnable {

                      Logger log = Logger.getLogger(MonitoringTask.class.getName());

                    

                  //    @EJB

                  //    ICoreService coreService;

                    

                      private final TesterProduct product;

                      private final String monitoringShellWorkingDirectoryFullPath;

                      private final int minitoringShellRunningTimeoutMinutes;

                    

                      public MonitoringTask(TesterProduct product, String monitoringShellWorkingDirectoryFullPath){

                          this.product = product;

                          this.monitoringShellWorkingDirectoryFullPath = monitoringShellWorkingDirectoryFullPath;

                          //

                          ICoreService coreService = null;

                          try{

                              coreService = ServerNamingContextHelper.getCoreService();

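                          }catch(Exception e){

                              //Lookup failed: fall back to the default timeout below.

                              log.warning("CoreService lookup failed, using default timeout: "+e);

                          }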

                          minitoringShellRunningTimeoutMinutes = coreService==null

                                  ?Integer.parseInt(ServerConstants.DEFAULT_MONITORINGSHELL_runningTimeoutMinutes)

                                  :coreService.getMinitoringShellRunningTimeoutMinutes();

                      }

                    

                      @Override

                      public void run() {

                        

                          TesterMonitoring testerMonitoring = new TesterMonitoring();

                          String errorMessage = null;

                          try{

                              //1. prepare shell file

                              prepareShellFile();

                            

                              //2. run shell file: perl shellFileName

                              SSHCommandResult sshCommandResult = null;

                              String command = null;

                              String directory = null;

                              long timeout = 6000;

                              //

                            

                              if(TestManager.isTesting_MonitoringTask){

                                  //command = "echo 'online=true;busy=true'";//example: "ls -al" or "uname -a"

                                  int r = new java.util.Random().nextInt(4);

                                  if(r==0){

                                      command = "sleep 15; echo '["+product.getId()+"]:online=true;busy=true'";

                                  }else if(r==1){

                                      command = "sleep 15; echo '["+product.getId()+"]:online=true;busy=false'";

                                  }else if(r==2){

                                      command = "sleep 15; echo '["+product.getId()+"]:online=false'";

                                  }else if(r==3){

                                      command = "sleep 15; echo '["+product.getId()+"]:online=false'";

                                  }

                                

                                  directory = "~";//example: "~"

                                  timeout = 6000;//6 seconds

                              }else{

                                  command = product.getTesterType().getMonitoringShell().getShellLanguage().getCommandName()+" "+getShellFileName() +" "+ product.getParameterValues();

                                  directory = monitoringShellWorkingDirectoryFullPath;

                                  timeout = minitoringShellRunningTimeoutMinutes * 60L * 1000L;//minutes -> milliseconds (the test branch above treats timeout as milliseconds)

                              }

                            

                              if(TestManager.isTesting_MonitoringTask){//(before)

                                  log.info("Running command for product (id="+product.getId()+"):"+command);

                              }

                              sshCommandResult = ServerNamingContextHelper.getSSH2Executor().execute(command, directory, timeout);

                              if(sshCommandResult==null){

                                  //Fail fast: dereferencing a null result below would throw a NullPointerException.

                                  log.warning("sshCommandResult is null. Please check if SSH2 Connections are exhausted.");

                                  throw new IllegalStateException("sshCommandResult is null");

                              }

                              if(TestManager.isTesting_MonitoringTask){//(after)

                                  log.info("Result of Running command for product (id="+product.getId()+"):"+sshCommandResult.result);

                              }

                            

                              //3. parse result

                              if(sshCommandResult.status==Status.EXIT_CODE_NORMAL){//normal exit

                                  String log = sshCommandResult.result;

                                  if(log==null){

                                      testerMonitoring.setStatus(Status.SHELL_EXCEPTION);

                                      errorMessage = "Monitoring shell doesn't print any log.";

                                  }else{

                                      log = log.trim().toLowerCase();

                                      if(log.indexOf("online=true")>=0){

                                          if(log.indexOf("busy=true")>=0){

                                              testerMonitoring.setStatus(Status.ONLINE_OCCUPIED);

                                          }else if(log.indexOf("busy=false")>=0){

                                              testerMonitoring.setStatus(Status.ONLINE_IDLE);

                                          }else{

                                              testerMonitoring.setStatus(Status.UNREACHABLE);

                                              errorMessage = "'online' state is true, but 'busy' state isn't set.";

                                          }

                                      }else if(log.indexOf("online=false")>=0){

                                          testerMonitoring.setStatus(Status.UNREACHABLE);

                                          errorMessage = null;

                                      }else{

                                          testerMonitoring.setStatus(Status.SHELL_EXCEPTION);

                                          errorMessage = "Result log doesn't contain 'online' state[true|false].";

                                      }

                                  }

                              }else{

                                  testerMonitoring.setStatus(Status.SHELL_EXCEPTION);

                                  errorMessage = sshCommandResult.error;

                              }

                              //

                          }catch(Exception e){

                              log.severe("com.asb.tms.ejb.timer.MonitoringTask.run():"+e);

                              testerMonitoring.setStatus(Status.TMS_EXCEPTION);

                              errorMessage = "Unknown exception: "+e.getMessage();

                          }

                        

                          //save result

                          try{

                              if(errorMessage!=null){

                                  if(errorMessage.length() > TesterMonitoring.MAX_ERROR_MSG_LENGTH){

                                      errorMessage = errorMessage.substring(0,TesterMonitoring.MAX_ERROR_MSG_LENGTH-3)+"...";

                                  }

                              }

                              //

                              testerMonitoring.setError(errorMessage);

                              ServerNamingContextHelper.getTesterMonitoringSession().create(testerMonitoring,product.getId());

                          }catch(Exception e){

                              log.severe(e.getMessage());

                          }

                      }

                    

                      private void prepareShellFile(){

                          //Override the old content if the file exists.

                          FileHelper.writeFile(getShellFilePath(),product.getTesterType().getMonitoringShell().getContent());

                      }

                      private String getShellFileName(){

                          return String.valueOf(product.getId());

                      }

                      private String getShellFilePath(){

                          return monitoringShellWorkingDirectoryFullPath+File.separator+getShellFileName();

                      }

                   

                  }
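The result-parsing step in run() above can be isolated into a small pure function, which makes it easy to test without SSH or a database. This is a sketch: `MonitoringOutputParser` and its nested `Status` enum are hypothetical stand-ins for the poster's `Status` type, and the mapping mirrors the branches above, including treating "online=true" without a busy flag as unreachable.

```java
import java.util.Locale;

final class MonitoringOutputParser {

    // Stand-in for the Status constants used in the original code.
    enum Status { ONLINE_OCCUPIED, ONLINE_IDLE, UNREACHABLE, SHELL_EXCEPTION }

    /** Maps the shell output (e.g. "[7]:online=true;busy=false") to a status. */
    static Status parse(String output) {
        if (output == null) {
            return Status.SHELL_EXCEPTION; // the shell printed nothing
        }
        String text = output.trim().toLowerCase(Locale.ROOT);
        if (text.contains("online=true")) {
            if (text.contains("busy=true")) {
                return Status.ONLINE_OCCUPIED;
            }
            if (text.contains("busy=false")) {
                return Status.ONLINE_IDLE;
            }
            return Status.UNREACHABLE; // online, but the busy state is missing
        }
        if (text.contains("online=false")) {
            return Status.UNREACHABLE;
        }
        return Status.SHELL_EXCEPTION; // no recognizable online state
    }
}
```

For example, `MonitoringOutputParser.parse("[7]:online=true;busy=false")` yields `ONLINE_IDLE`, matching the second test command in the task above.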

                  • 6. Re: TCP client on EJB with JBOSS
                    ybxiang.china

                    For the SSH2Executor, please refer to "JBoss5.1.0 resource adapter(JCA) - SSH2" and "JBoss AS 7 resource adapter(JCA)".

                     

                    I hope my code is understandable.

                     

                    • 7. Re: Re: TCP client on EJB with JBOSS
                      romk

                      Thanks, Xiang!


                      I have a question:
                      You are using an executor in TesterMonitoringTimer:

                      coreService.getTesterMonitoringThreadPool().execute(new MonitoringTask(tp,coreService.getMonitoringShellWorkingDirectoryFullPath()));

                       

                      As I understand EJB spec, it says nothing about client sockets (and I think it allows this), but doesn't greet server sockets listening and any threads in EJBs. (http://stackoverflow.com/questions/4587673/ejbs-and-threading). So the question is what is the goal of using Executor?
                      I am new to EJB development and just want to understand.

                       

                      Another interesting thing I found is a JCA connector project for connecting to a remote TCP server:
                      http://www.mastertheboss.com/ironjacamar/create-your-first-jca-connector-tutorial

                      But now I have some problems when I try to deploy my own connector to JBOSS in a rar-archive. Maybe it would be better to start a new topic for this,
                      but I want to ask: did you try this in your project?

                       

                      So far I have created a simple EJB TCP client based on an EJB timer.

                      • 8. Re: TCP client on EJB with JBOSS
                        ybxiang.china

                        but doesn't greet server sockets listening and any threads in EJBs. (http://stackoverflow.com/questions/4587673/ejbs-and-threading). So the question is what is the goal of using Executor?

                         

                        ~~~~~~~~~~

                        (1) "doesn't greet" just means "NOT recommended", NOT "forbidden".

                        If you control the life cycle of the server sockets/Executor well in the EJB and do NOT forget to release resources, I think you can do anything in an EJB.

                        Actually, any EJB is a service. Do you agree?

                         

                        (2) My Executor: it is a thread pool with some worker threads that execute the tasks.
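A minimal sketch of what "controlling the life cycle" of such a pool could look like, assuming the owner calls start() from its @PostConstruct method and stop() from its @PreDestroy method. The class `PoolLifecycleSketch` and its method names are hypothetical; only `java.util.concurrent` is used.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PoolLifecycleSketch {

    private ExecutorService pool;

    /** Creates the worker threads; call once when the owning service starts. */
    void start(int threads) {
        pool = Executors.newFixedThreadPool(threads);
    }

    /** Hands a task to the pool. */
    void submit(Runnable task) {
        pool.execute(task);
    }

    /**
     * Stops accepting tasks and waits for running ones to finish.
     * Returns true if everything completed within the grace period;
     * otherwise the remaining tasks are interrupted and false is returned.
     */
    boolean stop(long graceMillis) {
        pool.shutdown();
        try {
            if (pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            pool.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Without the stop() step, the worker threads outlive the deployment and accumulate on every redeploy.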

                        • 9. Re: TCP client on EJB with JBOSS
                          ybxiang.china

                          My "SSH2Executor" is implemented by my Resource Adapter project.

                          The SSH server and the SSH clients are on the same host.
                          The SSH clients are managed by "SSH2Executor".

                           

                          What I posted here is from my project, which works well.

                          • 10. Re: TCP client on EJB with JBOSS
                            ybxiang.china

                            As I understand EJB spec, it says nothing about client sockets (and I think it allows this), but doesn't greet server sockets listening and any threads in EJBs.

                             

                            ~~~~~~~ The EJB specs and many forums do NOT tell us how to open a server socket in a J2EE container. They just tell us "had better not...".

                            • 11. Re: TCP client on EJB with JBOSS
                              ybxiang.china

                              But now I have some problems when I try to deploy my own connector to JBOSS in a rar-archive.

                              ~~~~~~~~~ I package my RAR archive inside my EAR archive. It works well. I think a standalone RAR can be deployed successfully too.

                               

                              <?xml version="1.0" encoding="UTF-8"?>

                              <application xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                                  xmlns="http://java.sun.com/xml/ns/javaee"

                                  xmlns:application="http://java.sun.com/xml/ns/javaee"

                                  xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/application_6.xsd"

                                  id="Application_ID" version="6">

                                <display-name>asb-tms</display-name>

                                <module>

                                  <ejb>asb-tms-ejb.jar</ejb>

                                </module>

                                <module>

                                  <web>

                                    <web-uri>asb-tms-war.war</web-uri>

                                    <context-root>/tms</context-root>

                                  </web>

                                </module>

                                <module>

                                  <connector>asb-tms-ssh2-ra-app.rar</connector>

                                </module>

                              </application>

                              • 12. Re: TCP client on EJB with JBOSS
                                ybxiang.china

                                Would you please describe your RAR in detail in a new thread?

                                I did NOT find any RAR-related code in your attachments (an EJB3 timer, a TCP client, and a TCP server).

                                I hope you will describe your problem step by step in detail. Nobody likes a question that is too brief.

                                • 13. Re: TCP client on EJB with JBOSS
                                  wdfink

                                  Hi Xiang,

                                  You should not post the source code inline, as it makes the thread unreadable.

                                  You can attach the files if you use the advanced editor; you need to click 'edit' after you post the comment to add attachments.

                                  • 14. Re: TCP client on EJB with JBOSS
                                    wdfink

                                    According to the Java EJB spec, it is not allowed to create threads or use any kind of IO from an EJB. You should not make any assumptions about the server's environment.

                                    But technically this will work with some restrictions, i.e. dependencies on the environment and on the application server used.

                                    The most common failure is that threads are created but not handled correctly, which results in a thread leak.

                                     

                                    A JCA adapter is the correct approach for outbound or inbound connections. But for your use case it looks like a lot of overhead.

                                     

                                    You might stay with your current approach and accept the restrictions.
