As you may know, JBoss AS 7 is incredibly fast and light on memory, and it ships with Hibernate 4.0.0 by default. Since AS 7.0.1, it is easy to support both Hibernate 3.x and Hibernate 4.0.0 under JBoss AS, thanks to its modular classpath. If you want to run Hibernate 3.x under AS7 alongside Hibernate 4, you should check out this blog entry and this documentation.

In RiftSaw 3 development, we are taking this opportunity to upgrade the JPA layer from JPA 1.0 to JPA 2.0. Since JPA 2.0 is backward compatible, this shouldn’t be a big issue. However, because we create the DataSource and TransactionManager on our own, we have implemented the ConnectionProvider and TransactionManagerLookup interfaces to tell Hibernate to use our own instances.

In Hibernate 4.x, the whole code base has been split into three categories: API, Impl and SPI. The ConnectionProvider interface has been repackaged as org.hibernate.service.jdbc.connections.spi.ConnectionProvider. If you look at Hibernate’s internal implementations of it, you will notice that they also implement Configurable and Stoppable. The Configurable interface gives you access to Hibernate’s properties, which are kept as a Map. It is much like Spring’s BeanNameAware interface, which gives you access to the bean name. Below is the code snippet that I used for my ConnectionProvider.

public class DataSourceConnectionProvider implements ConnectionProvider, Configurable, Stoppable  {

  private Properties _props;

  private boolean available = true;

  public DataSourceConnectionProvider() {
  }

  @SuppressWarnings( {"unchecked"})
  public void configure(Map properties) {
	 _props = new Properties();
	 _props.putAll(properties);
  }

  public Connection getConnection() throws SQLException {
	if (!available) {
		throw new HibernateException( "Provider is closed!" );
	}
    Connection c = HibernateUtil.getConnection(_props);
    DbIsolation.setIsolationLevel(c);
    return c;
  }

  public void closeConnection(Connection con) throws SQLException {
    con.close();
  }

  public boolean supportsAggressiveRelease() {
    return true;
  }

  public boolean isUnwrappableAs(Class unwrapType) {
		return ConnectionProvider.class.equals(unwrapType) ||
				DataSourceConnectionProvider.class.isAssignableFrom(unwrapType);
  }

  @SuppressWarnings( {"unchecked"})
  public <T> T unwrap(Class<T> unwrapType) {
		if (ConnectionProvider.class.equals(unwrapType) ||
				DataSourceConnectionProvider.class.isAssignableFrom(unwrapType)) {
			return (T) this;
		} else {
			throw new UnknownUnwrapTypeException( unwrapType );
		}
  }

  public void stop() {
	available = false;
  }
}

For the TransactionManagerLookup interface, you will get a WARN message saying this API has been deprecated (I believe it was deprecated before 4.x, but I didn’t get a chance to upgrade until now). We should implement the org.hibernate.service.jta.platform.spi.JtaPlatform SPI instead, and set it in the ‘hibernate.transaction.jta.platform’ property. Instead of writing a class that implements the JtaPlatform SPI directly, I subclassed AbstractJtaPlatform, so that I only needed to override two methods. Below is the code snippet for my custom JtaPlatform implementation.

/**
 *
 * uses {@link HibernateUtil} to obtain the JTA {@link TransactionManager} object.
 *
 * @author Jeff Yu
 *
 */
public class OdeJtaPlatform extends AbstractJtaPlatform {

	private Properties properties = new Properties();

	public void configure(Map configValues) {
		super.configure(configValues);
		properties.putAll(configValues);
	}

	@Override
	protected TransactionManager locateTransactionManager() {
		return HibernateUtil.getTransactionManager(properties);
	}

	@Override
	protected UserTransaction locateUserTransaction() {
		throw new UnsupportedOperationException("locateUserTransaction is not supported. Use the locateTransactionManager instead.");
	}

} 

And then, when you create your EntityManagerFactory, remember to pass in the following properties.

props.put(Environment.CONNECTION_PROVIDER, DataSourceConnectionProvider.class.getName());
props.put(Environment.JTA_PLATFORM, OdeJtaPlatform.class.getName());

Done, the custom ConnectionProvider and JtaPlatform implementation should be all set now.

I hope this helps you with your own Hibernate upgrade at some point. I’d like to thank Strong Liu for sending me helpful links for this upgrade. :-)


JBoss AS version 5 and above uses the JBoss Microcontainer (MC) project for service management and dependency injection. If we want to write a service that uses a JBoss server service, such as the ‘HAPartitionService’, there are several ways to do it, shown below.

1. Using JBoss MC’s XML syntax
This is the most common and recommended way. Below is the JBoss RiftSaw clustering service, which uses the ‘HAPartitionService’ from JBoss AS clustering.

   <bean name="RiftSawClusteringService" class="org.jboss.soa.bpel.clustering.JBossClusteringService">
   	 <property name="haPartition"><inject bean="HAPartition" /></property>
   	 <property name="bpelEngineName">bpel/Engine</property>
   	 <depends>BPELEngine</depends>
   </bean>

See Ales’ Advanced Dependency Injections article for more about MC’s xml syntax.

 

2. Using JBoss MC’s services programmatically
This is actually the main topic of this blog entry. If you are a Spring framework user, you know that you can use a BeanFactory or ApplicationContext to get a service bean. So in JBoss, you may wonder, what is the equivalent way to do this?

1) org.jboss.kernel.Kernel: this is the service that provides the KernelController and ControllerContext that we need to get a service. So first, define a bean in *-jboss-bean.xml that has the Kernel service injected, like the following:

<!--    Locate the single instance of the kernel    -->
<bean name="org.jboss.soa.bpel.runtime.util:service=KernelLocator"
     class="org.jboss.soa.bpel.runtime.integration.KernelLocator">
    <property name="kernel">
      <inject bean="jboss.kernel:service=Kernel" />
    </property>
</bean>

The class above is defined as follows:

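package org.jboss.soa.bpel.runtime.integration;

import org.jboss.kernel.Kernel;

public class KernelLocator {
  // The MC injects the kernel through the setter; the static field makes it reachable from anywhere.
  private static Kernel kernel;

  public static Kernel getKernel() { return KernelLocator.kernel; }

  public void setKernel(Kernel kernel) { KernelLocator.kernel = kernel; }
}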

2) Use the KernelController and ControllerContext to get the service that we defined in *-jboss-bean.xml.
Below is the code that actually obtains the started service from JBoss AS.

public class KernelAwareSPIFactory {
   @SuppressWarnings("unchecked")
   public <T> T getKernelProvidedSPI(String beanName, Class<T> spiArtifact) {
      KernelController controller = KernelLocator.getKernel().getController();
      // Look up the context of the bean that has reached the installed state.
      ControllerContext ctx = controller.getInstalledContext(beanName);
      return (T) ctx.getTarget();
   }
}
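
The lookup above relies on an unchecked cast and assumes the bean is already installed. As a rough, container-free sketch of the same locate-by-name idea, the following uses a plain Map as a stand-in for the kernel’s controller. BeanLookupSketch, install and lookup are hypothetical names, not JBoss MC API, and Class.cast replaces the unchecked cast:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the MC controller; not part of JBoss MC.
final class BeanLookupSketch {
    private static final Map<String, Object> INSTALLED = new ConcurrentHashMap<>();

    /** Registers a bean under a name, as a container would after starting it. */
    static void install(String beanName, Object bean) {
        INSTALLED.put(beanName, bean);
    }

    /** Looks a bean up by name and verifies its type before returning it. */
    static <T> T lookup(String beanName, Class<T> type) {
        Object target = INSTALLED.get(beanName);
        if (target == null) {
            throw new IllegalStateException("No installed bean named " + beanName);
        }
        // Class.cast fails here with ClassCastException rather than at a distant call site.
        return type.cast(target);
    }

    public static void main(String[] args) {
        install("demo:service=Greeting", "hello");
        String greeting = lookup("demo:service=Greeting", String.class);
        System.out.println(greeting);
    }
}
```

Failing fast on a missing or mistyped bean is a design choice of this sketch; the RiftSaw code shown above simply returns whatever the controller holds.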

Finally, let’s take an example from the RiftSaw code base to show how this is used.

 

In RiftSaw, we’ve defined a ServerConfig interface.

public interface ServerConfig{
  /** The default bean name */
  String BEAN_NAME = "org.jboss.soa.bpel.runtime.util:service=ServerConfig";

  String getImplementationTitle();

  String getImplementationVersion();

  File getServerTempDir();

  File getServerDataDir();

  String getWebServiceHost();

  int getWebServicePort();

  .....
}

And then we have the ServerConfigImpl class.

public class ServerConfigImpl implements ServerConfig{  ....}

With this implementation, we define the ServerConfig service in the *-jboss-bean.xml as follows.

<!--       ServerConfig    -->
 <bean name="org.jboss.soa.bpel.runtime.util:service=ServerConfig"
        class="org.jboss.soa.bpel.runtime.integration.ServerConfigImpl">
    <property name="mbeanServer">
     <inject bean="JMXKernel" property="mbeanServer"/>
    </property>
    <property name="webServiceHost">${jboss.bind.address}</property>
  </bean>

Now, finally let’s see how we get this ServerConfig Service in our code.

ServerConfig serverConfig = new KernelAwareSPIFactory()
               .getKernelProvidedSPI("org.jboss.soa.bpel.runtime.util:service=ServerConfig", ServerConfig.class);

And there you go: you’ve got the ServerConfig service that lives in JBoss AS.

 

Hope this can help people who are doing integration with JBoss AS a little bit.

PS: Thanks to Glen for pointing out some grammar errors that I made earlier. As we used to say, “your patch has been applied, thanks a lot.” ;)

In Ode/RiftSaw, process execution is asynchronous, which means that when you run a BPEL process, more than one thread is used to carry out the invocation.

This is what the scheduler-simple module is for: it takes care of putting tasks into the database, pulling them back out, and running them. Let’s take a simple example: say your BPEL process has an invoke activity that calls an external web service. In Ode, right before entering the invoke activity, we create a job that captures the invoke activity’s information and store it in the database. When the Ode BPEL server starts, it starts a background thread that checks the ode_job table periodically. Once it finds a job that needs to be executed, it loads the job from the database into memory and submits it to the ExecutorService for execution.

In this blogpost, we will examine this module’s architecture and important APIs.

First is the Task API; this is the parent class of Job and SchedulerTask.

class Task {
    /** Scheduled date/time. */
    public long schedDate;

    Task(long schedDate) {
        this.schedDate = schedDate;
    }
}

It is very simple: it just holds the scheduled date for its execution.
Next, let’s look at the Job API. A Job is for invoking an external service and the like. All of the important information is kept in the JobDetails object.

class Job extends Task {
    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
    String jobId;
    boolean transacted;
    JobDetails detail;
    boolean persisted = true;

    public Job(long when, String jobId, boolean transacted, JobDetails jobDetail) {
        super(when);
        this.jobId = jobId;
        this.detail = jobDetail;
        this.transacted = transacted;
    }
....
}

Now, let’s look at another type of Task, which is called SchedulerTask.

private abstract class SchedulerTask extends Task implements Runnable {
    SchedulerTask(long schedDate) {
        super(schedDate);
    }
}

This is an abstract class; its subclasses are LoadImmediateTask, UpgradeJobsTask and CheckStaleNodes.

 

To understand these tasks, it is better to look at what the SimpleScheduler class defines. In Ode, the job design is based around three time horizons: “immediate”, “near future”, and “everything else”.
Immediate jobs (i.e. jobs that are about to be up) are written to the database and kept in an in-memory priority queue. When they execute, they are removed from the database. Near future jobs are placed in the database and assigned to the current node; however, they are not stored in memory. Periodically, jobs are “upgraded” from near-future to immediate status, at which point they get loaded into memory. Jobs that are further out in time are placed in the database without a node identifier; when they are ready to be “upgraded” to near-future jobs, they are assigned to one of the known live nodes. Recovery is straightforward, with stale node identifiers being reassigned to known good nodes.

In terms of time, we define two variables: _immediateInterval and _nearFutureInterval.
If a job’s scheduled date is in [now, now + _immediateInterval], it is an “immediate” job.
If it is in [now + _immediateInterval, now + _nearFutureInterval], it is a “near future” job.
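
The two intervals partition jobs by how far their scheduled date lies from now. Here is a minimal sketch of that classification; the class name JobHorizon, the enum and the interval values are hypothetical and not taken from the ODE source, and treating overdue jobs as immediate is an assumption of this sketch:

```java
// Hypothetical illustration; names and values are not from the ODE source.
final class JobHorizon {
    enum Horizon { IMMEDIATE, NEAR_FUTURE, EVERYTHING_ELSE }

    /**
     * Classifies a scheduled date relative to "now".
     * Both intervals are offsets from now in milliseconds, and
     * nearFutureInterval is assumed to be larger than immediateInterval.
     */
    static Horizon classify(long schedDate, long now,
                            long immediateInterval, long nearFutureInterval) {
        if (schedDate <= now + immediateInterval) {
            return Horizon.IMMEDIATE;          // also covers jobs that are already overdue
        }
        if (schedDate <= now + nearFutureInterval) {
            return Horizon.NEAR_FUTURE;
        }
        return Horizon.EVERYTHING_ELSE;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long immediate = 30_000L;      // assumed value, for illustration only
        long nearFuture = 600_000L;    // assumed value, for illustration only
        System.out.println(classify(now + 5_000L, now, immediate, nearFuture));
        System.out.println(classify(now + 120_000L, now, immediate, nearFuture));
        System.out.println(classify(now + 3_600_000L, now, immediate, nearFuture));
    }
}
```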

You can check SimpleScheduler.doLoadImmediate() and SimpleScheduler.doUpgrade() respectively for the logic.

You may also have noticed the CheckStaleNodes task. This is basically for clustering: it checks whether there are any stale nodes and, if there are, moves the jobs assigned to them over to another node by updating the nodeId.
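
To make the stale-node idea concrete, here is a small in-memory sketch. It assumes a node counts as stale when its last heartbeat is older than a stale interval, and it models the job table as a plain Map from job id to node id. StaleNodeCheck and its methods are hypothetical; the real logic lives in SimpleScheduler and works against the database:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the ODE implementation.
final class StaleNodeCheck {
    /** Returns the node ids whose last heartbeat is older than staleInterval milliseconds. */
    static List<String> findStaleNodes(Map<String, Long> lastHeartBeat, long now, long staleInterval) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartBeat.entrySet()) {
            if (now - e.getValue() > staleInterval) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }

    /** Moves every job owned by a stale node to liveNode; returns how many jobs were moved. */
    static int reassign(Map<String, String> jobToNode, List<String> staleNodes, String liveNode) {
        int moved = 0;
        for (Map.Entry<String, String> e : jobToNode.entrySet()) {
            if (staleNodes.contains(e.getValue())) {
                e.setValue(liveNode);   // the equivalent of updating nodeId on the job row
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Map<String, Long> heartbeats = new HashMap<>();
        heartbeats.put("node-a", now - 60_000L);
        heartbeats.put("node-b", now);

        Map<String, String> jobs = new HashMap<>();
        jobs.put("job-1", "node-a");
        jobs.put("job-2", "node-b");

        List<String> stale = findStaleNodes(heartbeats, now, 10_000L);
        System.out.println("stale nodes: " + stale);
        System.out.println("jobs moved: " + reassign(jobs, stale, "node-b"));
    }
}
```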

So far we’ve seen the different kinds of Task: Job and SchedulerTask. Next, we need an interface to run these Tasks, hence TaskRunner was introduced.

interface TaskRunner {
    public void runTask(Task task);
}

Here is the implementation, from the SimpleScheduler.runTask() method.

public void runTask(final Task task) {
    if (task instanceof Job) {
        Job job = (Job)task;
        if( job.detail.getDetailsExt().get("runnable") != null ) {
            runPolledRunnable(job);
        } else {
            runJob(job);
        }
    } else if (task instanceof SchedulerTask) {
        _exec.submit(new Callable<Void>() {
            public Void call() throws Exception {
                try {
                    ((SchedulerTask) task).run();
                } catch (Exception ex) {
                    __log.error("Error during SchedulerTask execution", ex);
                }
                return null;
            }
        });
    }
}

As I said before, when the BpelServer starts, it starts a thread that keeps running and is stopped only when the BpelServer is stopped.
That thread is called SchedulerThread.

 

This class basically has the following members: a PriorityBlockingQueue, which is the queue of tasks for immediate execution, and a TaskRunner, which is the container for running a Task. The run logic is quite straightforward.

/**
 * Pop items off the todo queue, and send them to the task runner for processing.
 */
public void run() {
    while (!_done) {
        _lock.lock();
        try {
            long nextjob;
            while ((nextjob = nextJobTime()) > 0 && !_done)
                _activity.await(nextjob, TimeUnit.MILLISECONDS);

            if (!_done && nextjob == 0) {
                Task task = _todo.take();
                _taskrunner.runTask(task);

            }
        } catch (InterruptedException ex) {
            ; // ignore
        } finally {
            _lock.unlock();
        }
    }
}
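
The loop above depends on the queue always handing out the task with the earliest scheduled date. Here is a minimal sketch of that ordering, using java.util.concurrent.PriorityBlockingQueue with a comparator on the scheduled date. SimpleTask, newQueue, millisUntilDue and drainInOrder are hypothetical names, and returning Long.MAX_VALUE for an empty queue is an assumption of this sketch rather than something taken from the ODE source:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sketch; SimpleTask and the method names are not ODE classes.
final class TodoQueueSketch {
    static final class SimpleTask {
        final String name;
        final long schedDate;

        SimpleTask(String name, long schedDate) {
            this.name = name;
            this.schedDate = schedDate;
        }
    }

    /** Builds a queue whose head is always the task with the earliest scheduled date. */
    static PriorityBlockingQueue<SimpleTask> newQueue() {
        return new PriorityBlockingQueue<>(
                11, Comparator.comparingLong((SimpleTask t) -> t.schedDate));
    }

    /** Milliseconds to wait before the head task is due; 0 means it can run now. */
    static long millisUntilDue(PriorityBlockingQueue<SimpleTask> todo, long now) {
        SimpleTask head = todo.peek();
        if (head == null) {
            return Long.MAX_VALUE; // nothing queued (assumption made for this sketch)
        }
        return Math.max(0L, head.schedDate - now);
    }

    /** Removes tasks in scheduled order and returns their names. */
    static List<String> drainInOrder(PriorityBlockingQueue<SimpleTask> todo) {
        List<String> order = new ArrayList<>();
        SimpleTask next;
        while ((next = todo.poll()) != null) {
            order.add(next.name);
        }
        return order;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<SimpleTask> todo = newQueue();
        long now = System.currentTimeMillis();
        todo.add(new SimpleTask("upgrade-jobs", now + 5_000L));
        todo.add(new SimpleTask("load-immediate", now));
        todo.add(new SimpleTask("check-stale-nodes", now + 9_000L));
        System.out.println(millisUntilDue(todo, now));
        System.out.println(drainInOrder(todo));
    }
}
```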

Now that we’ve seen all of the important APIs, let’s look at how SimpleScheduler is started when ODEServer starts.
Here is an excerpt from the SimpleScheduler.start() method.

public synchronized void start() {
    if (_running)
        return;

    if (_exec == null)
        _exec = Executors.newCachedThreadPool();

    _todo.clearTasks(UpgradeJobsTask.class);
    _todo.clearTasks(LoadImmediateTask.class);
    _todo.clearTasks(CheckStaleNodes.class);
    _processedSinceLastLoadTask.clear();
    _outstandingJobs.clear();

    _knownNodes.clear();

    try {
        execTransaction(new Callable<Void>() {

            public Void call() throws Exception {
                _knownNodes.addAll(_db.getNodeIds());
                return null;
            }

        });
    } catch (Exception ex) {
        __log.error("Error retrieving node list.", ex);
        throw new ContextException("Error retrieving node list.", ex);
    }

    long now = System.currentTimeMillis();

    // Pretend we got a heartbeat...
    for (String s : _knownNodes) _lastHeartBeat.put(s, now);

    // schedule immediate job loading for now!
    _todo.enqueue(new LoadImmediateTask(now));

    // schedule check for stale nodes, make it random so that the nodes don't overlap.
    _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));

    // do the upgrade sometime (random) in the immediate interval.
    _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));

    _todo.start();
    _running = true;
}
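
The start() method offsets the stale-node check and the upgrade task by a random amount so that cluster nodes do not all run them at the same moment. The body of randomMean is not shown here, so the following is only one plausible way to spread start times around a mean. JitterSketch and jitterAround are hypothetical names, and the range used is an assumption, not ODE’s actual formula:

```java
import java.util.Random;

// Hypothetical sketch of a randomized delay; not the ODE randomMean implementation.
final class JitterSketch {
    /** Returns a delay in [mean/2, mean/2 + mean), so delays are spread around the mean. */
    static long jitterAround(long mean, Random random) {
        if (mean <= 0) {
            throw new IllegalArgumentException("mean must be positive");
        }
        return mean / 2 + (long) (random.nextDouble() * mean);
    }

    public static void main(String[] args) {
        Random random = new Random();
        long staleInterval = 10_000L; // assumed value, for illustration only
        long now = System.currentTimeMillis();
        // Each node would pick a different first check time.
        System.out.println("first stale check in ms: " + jitterAround(staleInterval, random));
        System.out.println("scheduled at: " + (now + jitterAround(staleInterval, random)));
    }
}
```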

Also, please note that there are two different types of JobProcessor: the ordinary JobProcessor, and the PolledRunnableJobProcessor, which is meant for jobs that run periodically.

 


If you look at the ODE source code, the BpelServer API is a very important one. In this entry, we will look at how BpelServer is used; for ODE, let’s look at the ODEServer source code.

__log.debug("Initializing transaction manager");
initTxMgr();
__log.debug("Creating data source.");
initDataSource();
__log.debug("Starting DAO.");
initDAO();
EndpointReferenceContextImpl eprContext = new EndpointReferenceContextImpl(this);
__log.debug("Initializing BPEL process store.");
initProcessStore(eprContext);
__log.debug("Initializing BPEL server.");
initBpelServer(eprContext);
__log.debug("Initializing HTTP connection manager");
initHttpConnectionManager();

// Register BPEL event listeners configured in axis2.properties file.
registerEventListeners();
registerMexInterceptors();
registerContextInterceptors();
.....

As the code above shows, ODEServer does the following:

  • Initializes the TransactionManager, which is used by the data source and the scheduler service.
  • Creates the data source.
  • Creates the DAO connection factory; the extension API is BpelDAOConnectionFactory.
  • Creates the EndpointReferenceContext, which takes care of resolving EndpointReferences.
  • Creates the process store, which takes care of deploying, undeploying and listing processes, etc.
  • Initializes the BpelServer, which includes the following actions: setMessageExchangeContext, setDaoConnectionFactory and setBindingContext. The BindingContext and the MessageExchangeContext are the extension points for communicating with partner services.
  • Registers the EventListeners, MexInterceptors, etc.; these are all APIs in BpelServer.

You can also see how the BpelServer is initialized in the MockBpelServer class. In the Riftsaw project, it is done in the BpelEngineImpl class. In Riftsaw, we are also adding another implementation of the ProcessStore that leverages the JBoss Application Server’s Deployer mechanism, and another implementation of the BindingContext that takes a JAX-WS based approach, using JBossWS.


In this blog entry, we will continue to explore the ODE source code. Normally, ODE’s architecture would be covered in the first part of this series, but I am putting it in the third part because at the time I was just trying to write a blog entry about ODE’s inner model of the BPEL file, and didn’t think I would write a series.

Anyway, let’s look at ODE’s architecture diagram, which I copied from ODE’s architecture wiki page.

In the first part, we looked at the bpel compiler module, and in the second part we looked at the JACOB framework.

In this part, we will give an introduction to ODE’s modules:

ODE core modules
1. bpel-api: it contains the API definitions for ODE. Some important packages are:

  • org.apache.ode.bpel.iapi: the integration interfaces, which a module such as Axis2 implements.
  • org.apache.ode.bpel.rapi: the runtime API interfaces, which are implemented in the bpel-runtime module.
  • org.apache.ode.bpel.pmapi: the process management API.
  • org.apache.ode.bpel.evt: the event API.

2. bpel-runtimes: this module implements the BPEL activities, such as INVOKE, REPLY and WAIT, by extending the JacobRunnable object. It also contains the internal model for compiled BPEL files and the Channel definitions. You will notice that it has v1 and v2 packages; those are for ODE 1.x and ODE 2.x respectively.
3. bpel-dao: this module is the API for the DAO layer. Currently, it doesn’t include the DAO API for the process store.
4. dao-jpa: currently this is the OpenJPA implementation of the DAO.
5. dao-jpa-db: this is the DDL script for the OpenJPA implementation.
6. dao-hibernate, dao-hibernate-db: these two are the Hibernate implementation of the DAO and its DDL scripts.
7. bpel-schemas: this module uses XMLBeans to generate Java objects from XSD schemas: the deployment descriptor (dd.xsd), pmapi.xsd (Process Management API), schedules.xsd and context.xsd.
8. bpel-scripts: this module holds BPEL files that are used in bpel-compiler’s test cases.
9. bpel-compiler: this module converts a BPEL file into ODE’s internal model for compiled BPEL files.
10. il-common: this module is the common integration layer.
11. scheduler-simple: this module is the implementation of the scheduler service.
12. bpel-store: this module takes charge of storing processes from the filesystem; the artifacts include deploy.xml, .bpel and WSDL files.
13. engine: this is the ODE engine, which uses the runtimes, DAO and scheduler services.
14. bpel-ql: the BPEL query language.

JACOB framework module
1. jacob-ap
2. jacob

Integration modules:
1. axis2 integration: axis2, axis2-war
2. jca integration: bpel-api-jca, bpel-connector, jca-ra, jca-server
3. jbi integration: jbi
4. extension: extensions

Some leftover modules are:
1. tools: this is for the bpelc and sendsoap command line tools.
2. utils: utilities for the ODE project.
3. tasks: tasks for the buildr tool.
4. distro: the module for building the distribution.
