
Query by Slice, Parallel Execute, and Join: A Thread Pool Pattern in Java
Pagination is a technique by which you can present large data sets in small chunks with forward and backward navigability. Pagination can be done with custom code or with commercial, off-the-shelf (COTS) libraries. Nevertheless, many of these frameworks first bring the full dataset to the business, presentation, or client tier and then page them into small batches. This may not be the best possible solution; for one thing, such approaches consume huge amounts of memory.
This article will first show you how to effectively utilize ROWNUM at the database level itself, so that we implement "true pagination": querying data in slices. Of course, you may also want to do some business processing to the fetched data. If you have millions of rows to be processed, you may want to process them in parallel to fully utilize the available processing power. In Java we use threads to do this, but with the advent of Java SE 5's java.util.concurrent.ThreadPoolExecutor, we also have a means to reuse the threads created. This means our paged data can be sent in batches to available threads in a thread pool. The JDK also provides us a mechanism to "join" back or aggregate the processed results from multiple threads.
By effectively combining all the above concepts, it is possible to abstract out a Thread Pool pattern in the JDK, which can be reused across your daily parallel processing solutions. This article will showcase code that can be built and run using the JDK along with your favorite database.
When you search for a product on an online site, you might get back a list of items instead of a single item. If the list is too big, you may also need some form of pagination. It is customary that in such scenarios you would also require a mechanism to navigate across pages, just like the one shown in Figure 1.
Figure 1. Navigation with pagination
As I have already mentioned in the introduction, many pagination frameworks first bring the full dataset to the business, presentation, or client tier, and then page them into small batches. Deviating from this traditional approach, let us first look into a schema where we fetch pages from the datastore in batches.
If you can somehow estimate the total fetch size for a query, you could then decide on the size of each page or batch to be queried. From this information, you can then work out the first index and the last index for each query. By limiting the query so that it returns no more rows than are required, we can conserve system resources. Each database vendor has its own way to do this, and most of these solutions revolve around some form of row numbering or row limiting.
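The index arithmetic described above can be sketched in a few lines of Java. This is only an illustration: the class and method names (PageRanges, slices) are hypothetical and are not part of the article's sample code, and the indexes are assumed to be 1-based and inclusive, as they are in the ROWNUM query shown later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the article's sample): splits a result set
// of totalRows rows into 1-based, inclusive {first, last} index pairs.
final class PageRanges {

    static List<int[]> slices(int totalRows, int pageSize) {
        if (totalRows < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and pageSize > 0");
        }
        List<int[]> ranges = new ArrayList<>();
        // long offset avoids int overflow when totalRows is close to Integer.MAX_VALUE
        for (long offset = 0; offset < totalRows; offset += pageSize) {
            int first = (int) offset + 1;                          // 1-based first index
            int last = (int) Math.min(offset + pageSize, totalRows); // last page may be short
            ranges.add(new int[] {first, last});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints "1 to 40", "41 to 80", "81 to 95"
        for (int[] range : slices(95, 40)) {
            System.out.println(range[0] + " to " + range[1]);
        }
    }
}
```

Each pair can then be bound to the first-index and last-index parameters of one sliced query.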
Let us now look at one of the common ways this is implemented by enterprise databases like Oracle. In Oracle, we can use ROWNUM. The ROWNUM pseudocolumn is covered in the SQL Reference, Basic Elements of Oracle SQL, Chapter 2 (PDF). ROWNUM in Oracle is a pseudo-column, meaning it is not a "real" column that will show up when you describe a table using the DESC command. It is not stored anywhere in the database, but it exists for each row retrieved by a query, and represents the sequential order in which Oracle has retrieved the row. The value of ROWNUM is just an integer that is assigned to each row of data fetched, and Oracle assigns the ROWNUM "on the fly" just after the data is retrieved but before the ORDER BY clause is processed.
To explain this, and much more, let me introduce a single, simple table, customers. The customers table schema is described in Figure 2.
Figure 2. DESC customers
This table can be created and pumped with data easily, by executing the script data.sql.
Let us now look at applying ROWNUM for pagination. To retrieve rows X through Y of a result set, the general form is as follows:
select *
  from ( select /*+ FIRST_ROWS(n) */
                a.*, ROWNUM rnum
           from ( select customerid, customername, age
                    from customers
                   order by customerid ) a
          where ROWNUM <= :LAST_INDEX_TO_FETCH )
 where rnum >= :FIRST_INDEX_TO_FETCH;
To make ROWNUM work reliably, I have used an ORDER BY clause to order by customerid. Since customerid is the primary key in my table, this will work, but what if the column you are ordering by is not unique? In that case, you have to add something to the end of the ORDER BY (for example, the primary key) to make the ordering unique; otherwise rows with equal sort values may appear on different pages from one query to the next. The following additional points are to be noted in the above query statement:
The FIRST_ROWS hint chooses the cost-based approach to optimize a statement block in Oracle with a goal of best response time (minimum resource usage to return first row).
LAST_INDEX_TO_FETCH is set to the last row of the result set to fetch; that is, if you wanted rows 71 to 80 of the result set, you would set LAST_INDEX_TO_FETCH to 80.
FIRST_INDEX_TO_FETCH is set to the first row of the result set to fetch; that is, to get rows 71 to 80, you would set this to 71.
Figure 3 shows typical query results.
Figure 3. A paginated query
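From Java, a sliced query of this shape could be issued through plain JDBC roughly as follows. This is a minimal sketch under stated assumptions, not the DAO from the article's sample: the class name CustomerPageQuery and the method fetchNames are hypothetical, the optimizer hint is omitted for brevity, and the named bind variables are replaced by JDBC positional parameters. Note that the last index is bound first, because the ROWNUM filter appears first in the statement text.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Sketch only: class and method names are hypothetical, not the article's DAO.
final class CustomerPageQuery {

    // Same shape as the ROWNUM query, with positional parameters.
    // Parameter 1 is the LAST index (inner ROWNUM filter), parameter 2 the FIRST.
    static final String PAGE_SQL =
          "select * from ("
        + " select a.*, ROWNUM rnum from ("
        + "  select customerid, customername, age from customers order by customerid"
        + " ) a where ROWNUM <= ?"
        + ") where rnum >= ?";

    static List<String> fetchNames(Connection con, int firstIndex, int lastIndex)
            throws SQLException {
        if (firstIndex < 1 || lastIndex < firstIndex) {
            throw new IllegalArgumentException("expected 1 <= firstIndex <= lastIndex");
        }
        List<String> names = new ArrayList<>();
        try (PreparedStatement ps = con.prepareStatement(PAGE_SQL)) {
            ps.setInt(1, lastIndex);   // bound to "ROWNUM <= ?"
            ps.setInt(2, firstIndex);  // bound to "rnum >= ?"
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    names.add(rs.getString("customername"));
                }
            }
        }
        return names;
    }
}
```

Calling fetchNames(connection, 71, 80) with an open Oracle connection would then fetch only rows 71 to 80 of the ordered result.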
Did you like that? Rather, how many of you are now thinking of the numerous other optimizations and workarounds you can build around this data pagination using your Java tools? I am going to show you at least one such possibility, which I would abstract out as a parallel execution pattern.
Now assume that your data table is a very big table, with more than a million records, and each record is "fat," too--say, 100 or more bytes. You may want to apply some business rules to the data in these rows and then execute some time-consuming business processing. How can you do this in the least possible amount of time?
To solve our problem of processing a million rows in Java, it is easy to create a million threads and try to get them processed concurrently. But if you try to do that, your process is going to crash due to lack of resources. A better approach is to create a limited number of threads and try to execute the tasks with them. If we apply this approach to our million row problem, we may have to batch or paginate the rows, and send each batch or page to a thread. A thread will work on one page at a time, and the size of the page can be adjusted taking into consideration many aspects, a few of which are listed below:
Once a thread is started, the Java Virtual Machine calls the run() method of this thread. Any processing may be executed within the run() method, and when the processing is completed the run() method exits. It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution. Still, threads are heavyweight objects, and if we can reuse them for further processing we can improve efficiency. Instead of letting run() exit, we can make a thread wait (for example, by blocking in wait() or on a blocking queue) once it has completed its current processing, until more tasks are available. Such idling threads can be placed in a pool, and when more tasks are available, we can take any idle thread from the pool and repeat the process execution.
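To make the idea of idling and reusing threads concrete, here is a deliberately tiny hand-rolled pool. It is a teaching sketch with hypothetical names (TinyWorkerPool, demo), not the mechanism used by the article's sample. Each worker is started exactly once and then blocks on a queue whenever it has nothing to do, which is one common way to idle a thread until more work arrives.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Teaching sketch with hypothetical names. Real code should prefer
// java.util.concurrent.ThreadPoolExecutor. A task that throws will end its worker.
final class TinyWorkerPool {
    private static final Runnable STOP = () -> { };  // "poison pill" marker
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    TinyWorkerPool(int size) {
        workers = new Thread[size];
        for (int i = 0; i < size; i++) {
            workers[i] = new Thread(this::workLoop, "tiny-worker-" + (i + 1));
            workers[i].start();  // each thread is started exactly once
        }
    }

    private void workLoop() {
        try {
            while (true) {
                Runnable task = queue.take();  // idles (blocks) until a task arrives
                if (task == STOP) {
                    return;                    // run() exits; the thread cannot be reused
                }
                task.run();                    // same thread, next task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void submit(Runnable task) {
        queue.add(task);
    }

    void shutdownAndWait() throws InterruptedException {
        for (int i = 0; i < workers.length; i++) {
            queue.add(STOP);                   // one pill per worker
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }

    // Runs taskCount trivial tasks and returns the names of the threads that ran them.
    static Set<String> demo(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        TinyWorkerPool pool = new TinyWorkerPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.submit(() -> {
                    names.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            done.await();
            pool.shutdownAndWait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }
}
```

Calling TinyWorkerPool.demo(2, 10) runs ten tasks but reports at most two distinct thread names, which is the reuse the paragraph above describes.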
ThreadPoolExecutor--The Smarter Way
If we are to create a new thread for each task, we would be spending more time and consuming more system resources by creating and destroying more and more threads than by doing actual business processing. A Thread Pool helps us here by providing a solution to both the problem of thread lifecycle overhead and the problem of resource thrashing. A thread pool improves efficiency by following any or all of the following strategies:
The PooledExecutor class from Doug Lea's open source library of concurrency utilities, util.concurrent, is a widely used, efficient, and correct implementation of a thread pool. Now we have the concurrency utilities in the java.util.concurrent package in the core JDK, and java.util.concurrent.ThreadPoolExecutor is the main thread pool class; it executes each submitted task using one of possibly several pooled threads. Let us look at a few methods of interest in the ThreadPoolExecutor class, which we will see in detail in the code sample.
package java.util.concurrent;

// Abridged listing: signatures only. AbstractExecutorService implements
// the ExecutorService interface.
public class ThreadPoolExecutor extends AbstractExecutorService {

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler);
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory);
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory, RejectedExecutionHandler handler);

    public void execute(Runnable command);
    public int getCorePoolSize();
    public int getLargestPoolSize();
    public int getMaximumPoolSize();
    public int getPoolSize();
    public BlockingQueue<Runnable> getQueue();
    public long getTaskCount();
    public ThreadFactory getThreadFactory();
    public boolean remove(Runnable task);
    public void setCorePoolSize(int corePoolSize);
    public void setKeepAliveTime(long time, TimeUnit unit);
    public void setMaximumPoolSize(int maximumPoolSize);
    // other methods go here...
}
First, you need to instantiate an appropriately configured ThreadPoolExecutor. This class provides many adjustable parameters and extensibility hooks. We can either adjust these parameters to fine-tune the pool, or use the more convenient Executors factory methods, which are listed below:
Executors.newCachedThreadPool(): Unbounded thread pool, with automatic thread reclamation
Executors.newFixedThreadPool(int): Fixed size thread pool
Executors.newSingleThreadExecutor(): Single background thread
The Executors factory methods preconfigure settings for common usage scenarios. To manually configure and fine-tune the pool, we need to know how to configure the thread pool size and the backing queue. This is explained next.
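Before turning to manual configuration, here is a short sketch of the convenient route through the Executors factory. The class and method names (FixedPoolSketch, squares) and the pool size of four are illustrative assumptions; the Executors, ExecutorService, and Future calls are the standard JDK ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch: squares the numbers 1..n on a fixed pool of four threads.
final class FixedPoolSketch {

    static List<Integer> squares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                // The lambda returns a value, so it is submitted as a Callable<Integer>
                futures.add(pool.submit(() -> value * value));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get());  // blocks until that task has finished
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();  // lets the worker threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(5));  // prints [1, 4, 9, 16, 25]
    }
}
```

The results come back in submission order because the futures are read in the order they were created, regardless of which thread finished first.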
corePoolSize: If you submit a new task when fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle.
maximumPoolSize: If you submit a new task when at least corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only if the queue is full.
For the backing queue, there are three general strategies:
Direct handoffs. Example: SynchronousQueue--Hands off tasks to threads without otherwise holding them. If you submit a new task when no threads are immediately available, a new thread will be constructed (as long as maximumPoolSize has not been reached).
Unbounded queues. Example: LinkedBlockingQueue--If you submit a new task when all corePoolSize threads are busy, the new tasks will wait. Here the number of threads will not exceed corePoolSize.
Bounded queues. Example: ArrayBlockingQueue--Used together with a finite maximumPoolSize, a bounded queue helps prevent resource exhaustion.
Since there are numerous books available on threads (such as Java Threads, Third Edition), let us now jump into some practical usage patterns with code.
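The interplay between corePoolSize, maximumPoolSize, and a bounded queue can be observed directly. The following sketch uses small, arbitrarily chosen numbers (core 2, maximum 4, queue capacity 2) and hypothetical names (PoolGrowthSketch, observe); every task simply blocks on a latch so that the pool's state can be inspected while all threads are busy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch with arbitrary sizes: core 2, maximum 4, queue capacity 2.
final class PoolGrowthSketch {

    // Returns {pool size, queued tasks, 1 if a seventh task was rejected else 0}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();  // keeps the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // Tasks 1-2 start core threads, 3-4 fill the queue,
            // 5-6 find the queue full and start extra threads.
            for (int i = 0; i < 6; i++) {
                pool.execute(blocked);
            }
            int poolSize = pool.getPoolSize();
            int queued = pool.getQueue().size();
            int rejected = 0;
            try {
                pool.execute(blocked);  // queue full and maximumPoolSize reached
            } catch (RejectedExecutionException e) {
                rejected = 1;           // default handler aborts the submission
            }
            return new int[] {poolSize, queued, rejected};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        // prints "threads=4 queued=2 rejected=1"
        System.out.println("threads=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
    }
}
```

The rejection comes from the default handler, which throws RejectedExecutionException; a different RejectedExecutionHandler can be passed to the constructor if another policy is wanted.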
In the sample table you created, you have about 450 customer entities, each identified by a different customerid value. Let us assume that you need to apply some business rules to, or otherwise process, these customer entities. Going with strict design principles, you will normally have these rules or business processing code in the middle tier. It is not wise to process a million rows one by one; nor can we attach a separate thread to each of these entities. This is where you can batch the business entities in small chunks, and use a thread pool with a predefined number of threads to process them in parallel. We will now look into that in code:
private void testGetCustomersSortByIdInBatch() throws Exception {
    Date start = new Date();
    logger.info("TX-ID : " + ThreadLocalTxCounter.get() + " | Start ");
    int numBatch = 0;
    CustomerDao customerDao = new CustomerDaoJdbc();
    int count = customerDao.getCustomerCount();

    // Pool that grows from CORE_POOL_SIZE to MAXIMUM_POOL_SIZE threads,
    // backed by a bounded queue.
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
        KEEP_ALIVE_TIME, TimeUnit.MINUTES,
        new LinkedBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY));

    // Shared, synchronized list into which every task adds its results.
    List<Customer> bucketToCollect =
        Collections.synchronizedList(new ArrayList<Customer>());
    Collection<Callable<List<Customer>>> collection =
        new ArrayList<Callable<List<Customer>>>();

    logger.info("Batch #");
    for (int i = 0; i < count; i += BATCH_SIZE) {
        if (logger.isInfoEnabled()) {
            System.out.print(" " + (++numBatch));
        }
        // The last batch may hold fewer than BATCH_SIZE rows.
        int batch = Math.min(BATCH_SIZE, count - i);
        // Row indexes passed to the task are 1-based and inclusive.
        collection.add(new ObjectRelationalQueryTask(
            bucketToCollect, (i + 1), (i + batch)));
    }
    if (logger.isInfoEnabled()) {
        System.out.print("\n");
    }

    try {
        // Blocks until every task has completed.
        threadPoolExecutor.invokeAll(collection);
    } finally {
        // Allow the pool's worker threads to exit once the work is done.
        threadPoolExecutor.shutdown();
    }

    logger.debug("-----------------------------------------");
    logger.debug("bucketToCollect.size() : " + bucketToCollect.size());
    // All tasks have finished, so the list is no longer modified concurrently.
    for (Customer customer : bucketToCollect) {
        logger.debug("customer : " + customer);
    }
    logger.debug("-----------------------------------------");
    Date end = new Date();
    logger.info("TX-ID : " + ThreadLocalTxCounter.get()
        + " | End | TimeElapsed(ms) : "
        + (end.getTime() - start.getTime()));
}
This is what we are doing in the above code:
We create a ThreadPoolExecutor with the following settings:
CORE_POOL_SIZE = 4: The number of threads to keep in the pool, even if they are idle.
MAXIMUM_POOL_SIZE = 7: The maximum number of threads to allow in the pool.
KEEP_ALIVE_TIME = 1: If the number of threads is greater than the core pool size, this is the maximum time (in the given time unit, here minutes) that excess idle threads will wait for new tasks before terminating.
BLOCKING_QUEUE_CAPACITY = 5: The fixed capacity of the LinkedBlockingQueue.
We create a synchronized ArrayList that can act as a shared memory for all the threads to put results into.
Next, we need to create tasks for the thread pool to work on. Normally such tasks are objects that implement the Runnable interface. But the java.util.concurrent.Callable interface provides more control by allowing you to return a result or throw a checked exception (even though we will not demonstrate this here, since we already use another mechanism, the shared ArrayList described above, for collecting the response).
We will use a BATCH_SIZE of 40 here. This means all our 450 entities will be split into 12 batches: 11 batches of 40 items each, and a final batch of 10. Corresponding to each batch, we create an instance of ObjectRelationalQueryTask, which is an implementation of the Callable interface.
import java.util.List;
import java.util.concurrent.Callable;

public class ObjectRelationalQueryTask implements Callable<List<Customer>> {

    private final List<Customer> list;  // shared, thread-safe result list
    private final int startIndex;       // first row to fetch (1-based, inclusive)
    private final int endIndex;         // last row to fetch (inclusive)

    public ObjectRelationalQueryTask(List<Customer> list,
            int startIndex, int endIndex) {
        this.list = list;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }

    public List<Customer> call() {
        ThreadLocalTxCounter.resetTxId();
        CustomerDao customerDao = new CustomerDaoJdbc();
        // Fetch only this task's slice of the table.
        List<Customer> customers =
            customerDao.getCustomersSortById(startIndex, endIndex);
        //We will do business processing here :)
        list.addAll(customers);
        //We have our own mechanism to return results.
        //Hence returning null.
        return null;
    }
}
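The task above deliberately returns null and collects its results through the shared list. As an alternative, here is a sketch of the mechanism that is not demonstrated in the sample: letting each Callable return its slice and "joining" the results through the Futures that invokeAll hands back. All names are hypothetical, and fetchSlice is only a stand-in for a DAO call such as getCustomersSortById, so the sketch runs without a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch with hypothetical names: joins per-batch results through Futures.
final class FutureJoinSketch {

    // Stand-in for a sliced DAO query: "fetches" the ids first..last.
    static List<Integer> fetchSlice(int first, int last) {
        List<Integer> ids = new ArrayList<>();
        for (int id = first; id <= last; id++) {
            ids.add(id);
        }
        return ids;
    }

    static List<Integer> queryAll(int count, int batchSize) {
        List<Callable<List<Integer>>> tasks = new ArrayList<>();
        for (int i = 0; i < count; i += batchSize) {
            final int first = i + 1;                          // 1-based, inclusive
            final int last = Math.min(i + batchSize, count);  // last batch may be short
            tasks.add(() -> fetchSlice(first, last));
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Integer> joined = new ArrayList<>();
            // invokeAll blocks until every task is done; the futures are
            // returned in the same order as the tasks were supplied.
            for (Future<List<Integer>> future : pool.invokeAll(tasks)) {
                joined.addAll(future.get());
            }
            return joined;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

One practical difference between the two designs: with a shared list, results arrive in whatever order the threads finish, whereas reading the Futures in order keeps the batches in their original sequence.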
All such tasks are then put in a collection and supplied to the thread pool for execution. The invokeAll method executes the given tasks and, when they have all completed, returns a list of Futures holding their status and results, in the same sequential order as produced by the iterator for the given task collection. Last but not least, the sample prints the results of the processing to the console. You may need to enable logging at the DEBUG level to view the results in the console.
To build and run the sample, first you need to download QueryBySliceParallelExecuteAndJoinSrc.zip, available in the Resources section, and follow these steps:
Change to the extracted folder and execute ant there, which will build the sample:
cd QueryBySliceParallelExecuteAndJoinSrc
ant
To run the sample, execute ant run.
Keep watching the console. It will look like the one shown in Figure 4.
Figure 4. Run the sample
To understand the dynamics of the sample run, let us go over the major notable aspects with reference to Figure 4:
The threads in the pool are named pool-1-thread-1 to pool-1-thread-7.
Each batch is processed fully by a single thread, in a single transaction, correlated with a TX ID.
In every transaction we log once every time we complete processing 15 items. Hence in processing 40 items in a batch, we log three times.
Once a thread has processed a batch fully (40 items), the thread is made available again by returning it to the pool; the thread will be re-allocated if any more batches are waiting to be processed.
A single thread may thus be reused for many batches. This means a single thread may log more than one TX ID, in multiple processing or transaction contexts. (By "transactions" we don't mean ACID transactions, but instead just mean a processing unit of work.)
To mock some "heavy processing" happening on the tasks, each thread sleeps for some random amount of time after processing every 15 items. This helps us to visualize many threads working in parallel in the console, executing transactions.
If you want to make sure that all items are processed and the responses are available, enable logging at the DEBUG level and re-run the sample. This will demonstrate that we can in fact "join" the results of multiple threads.
You may also re-run the sample using the no-argument constructor of LinkedBlockingQueue. This constructor creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE (i.e., unlimited capacity for most practical cases). If you do so, the console will look like that shown in Figure 5 instead.
Figure 5. Run the sample with unlimited capacity queue
What has happened here? When you use the unlimited capacity queue, you are in fact using unbounded queues. In this case, if you submit a new task when all corePoolSize threads are busy, the new tasks will wait. Here the number of threads will not exceed corePoolSize.
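The difference between the two runs can be reproduced without a database. The sketch below reuses the sample's pool settings (core 4, maximum 7, queue capacity 5) and its 12 batches, but the class name QueueBoundSketch and the latch-blocked dummy tasks are assumptions made for illustration; the tasks do no real work and only hold their threads until the largest pool size has been read.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: same pool settings as the sample, dummy blocking tasks.
final class QueueBoundSketch {

    // Submits taskCount blocked tasks and reports how many threads the pool created.
    static int largestPoolSize(BlockingQueue<Runnable> queue, int taskCount) {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(4, 7, 1, TimeUnit.MINUTES, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await();  // hold the thread until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown();  // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Bounded: 4 core threads + 5 queued tasks + 3 extra threads = 12 tasks
        System.out.println("capacity 5: "
                + largestPoolSize(new LinkedBlockingQueue<>(5), 12));  // capacity 5: 7
        // Unbounded: the queue never fills, so the pool stays at corePoolSize
        System.out.println("unbounded:  "
                + largestPoolSize(new LinkedBlockingQueue<>(), 12));   // unbounded:  4
    }
}
```

With the bounded queue, twelve tasks are exactly what this configuration can absorb; a thirteenth submission while all seven threads are busy would be rejected by the default handler.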
Again, as an exercise for the reader, you can change the various configuration parameters of the thread pool and understand the effect by re-running the sample.
Threads are powerful constructs available for software programming, which we have been leveraging even from the earliest versions of Java. But many times programs may not run efficiently due to non-optimized usage of system resources, including threads. Thread pools are yet another powerful tool, which even a less-experienced programmer can use to write optimized parallel programming code. This article showed you how you can apply parallelism in middle-tier Java programming and at the same time apply split-and-query mechanisms at the database level, so that we extend optimization patterns across multiple tiers in our application.
Resources
"ROWNUM and Limiting Results in Oracle"
"Distribute, Detach, and Parallelize in Tomcat"
"The Secret of Java Thread Pools"
"Programming Java Threads in the Real World, Part 1"
"Thread Pools and Work Queues"
JSR 166: Concurrency Utilities
"Develop High Performance J2EE Threads with WebSphere Application Server"
Java Threads, Third Edition (O'Reilly)

Binildas Christudas currently works as a Principal Architect for Infosys Technologies, where he heads the J2EE Architects group servicing Communications Service Provider clients.
