Antimatroid, The

thoughts on computer science, electronics, mathematics

Posts Tagged ‘Parallelization

GPU Accelerated Expectation Maximization for Gaussian Mixture Models using CUDA

leave a comment »

C, CUDA, and Python source code available on GitHub

Introduction

Gaussian Mixture Models [1, 435-439] offer a simple way to capture complex densities by employing a linear combination of K multivariate normal distributions, each with their own mean, covariance, and mixture coefficient, \pi_{k}, s.t. \sum_{k} \pi_{k} = 1.

\displaystyle p( x ) = \sum_{k = 1}^{K} \pi_{k} p(x \lvert \mu_k, \Sigma_k)


Of practical interest is the learning of the number of components and the values of the parameters. Evaluation criteria, such as Akaike and Bayesian, can be used to identify the number of components, or non-parametric models like Dirichlet processes can be used to avoid the matter all together. We won’t cover these techniques here, but will instead focus on finding the values of the parameters given sufficient training data using the Expectation-Maximization algorithm [3], and doing so efficiently on the GPU. Technical considerations will be discussed and the work will conclude with an empirical evaluation of sequential and parallel implementations for the CPU, and a massively parallel implementation for the GPU for varying numbers of components, points, and point dimensions.

Multivariate Normal Distribution

The multivariate normal distribution With mean, \mu \in \mathbb{R}^d, d \in \mathbb{N}_1, and symmetric, positive definite covariance, \Sigma \in \mathbb{R}^{d \times d}, is given by:

\displaystyle 	p( x \lvert \mu, \Sigma ) = \frac{1}{\sqrt{(2\pi)^d \lvert \Sigma \rvert }} \exp{\left( - (x - \mu)^{T} \Sigma^{-} (x - \mu) / 2 \right)}


From a computational perspective, we will be interested in evaluating the density for N values. Thus, a naive implementation would be bounded by \mathcal{O}\left(N d^4\right) due to the matrix determinate in the normalization term. We can improve upon this by computing the Cholesky factorization, \Sigma = L L^T, where L is a lower triangular matrix [6, 157-158]. The factorization requires \mathcal{O} \left ( d^3 \right ) time and computing the determinate becomes \mathcal{O} \left (d \right) by taking advantage of the fact that \det\left(L L^T\right) = \det(L)^2 = \prod_i L_{i,i}^2. Further, we can precompute the factorization and normalization factor for a given parameterization which leaves us with complexity of the Mahalanobis distance given by the quadratic form in the exponential. Naive computation requires one perform two vector matrix operations and find the inverse of the covariance matrix with worst case behavior \mathcal{O} \left (d^3\right). Leveraging the Cholesky factorization, we’ll end up solving a series of triangular systems by forward and backward substitution in \mathcal{O} \left (d^2\right) and completing an inner product in \mathcal{O} \left (d\right) as given by L z = x - \mu, L^T z = y, and (x-\mu)^T y. Thus, our pre-initialization time is \mathcal{O} \left (d^3 \right) and density determination given by \mathcal{O} \left (N d^2 \right). Further optimizations are possible by considering special diagonal cases of the covariance matrix, such as the isotropic, \Sigma = \sigma I, and non-isotropic, \Sigma_{k,k} = \sigma_k, configurations. For robustness, we’ll stick with the full covariance.

\displaystyle \log p( x \lvert \mu, \Sigma ) = - \frac{1}{2} \left( d \log 2\pi + \log \lvert \Sigma \rvert \right ) - \frac{1}{2} (x - \mu)^{T} \Sigma^{-} (x - \mu)


To avoid numerical issues such as overflow and underflow, we’re going to consider \log p(x \lvert \mu, \Sigma) throughout the remainder of the work. For estimates of the covariance matrix, we will want more samples than the dimension of the data to avoid a singular covariance matrix [4]. Even with this criteria satisfied, it may still be possible to produce a singular matrix if some of the data are collinear and span a subspace of \mathbb{R}^d.

Expectation Maximization

From an unsupervised learning point of view, GMMs can be seen as a generalization of k-means allowing for partial assignment of points to multiple classes. A possible classifier is given by k^{*} = \arg\max_k \, \log \pi_{k} + \log p(x \lvert \mu_k, \Sigma_k). Alternatively, multiple components can be used to represent a single class and we argmax over the corresponding subset sums. The utility of of GMMs goes beyond classification, and can be used for regression as well. The Expectation-Maximization (EM) algorithm will be used to find the parameters of of the model by starting with an initial guess for the parameters given by uniform mixing coefficients, means determined by the k-means algorithm, and spherical covariances for each component. Then, the algorithm iteratively computes probabilities given a fixed set of parameters, then updating those parameters by maximizing the log-likelihood of the data:

\displaystyle \mathcal{L} \left( \mathcal{D} \lvert \mu, \Sigma \right) = \sum_{n = 1}^{N} \log p(x_n) = \sum_{n=1}^{N} \log{ \left [ \sum_{k = 1}^{K} \pi_{k} p \left( x_n \lvert \mu_k, \Sigma_k \right ) \right ] }


Because we are dealing with exponents and logarithms, it’s very easy to end up with underflow and overflow situations, so we’ll continue the trend of working in log-space and also make use of the “log-sum-exp trick” to avoid these complications:

\displaystyle \log p( x ) = a + \log \left[ \sum_{k = 1}^{K} \exp{ \left( \log \pi_{k} + \log p(x \lvert \mu_k, \Sigma_k) - a \right ) } \right ]


Where the a term is the maximum exponential argument within a stated sum. Within the expectation stage of the algorithm we will compute the posterior distributions of the components conditioned on the training data (we omit the mixing coefficient since it cancels out in the maximization steps of \mu_k and \Sigma_k, and account for it explicitly in the update of \pi_k):

\displaystyle \gamma_{k, n} = \frac{ p \left ( x_n \lvert \mu_k, \Sigma_k \right ) }{ p(x) } \qquad \Gamma_k = \sum_{n=1}^{N} \gamma_{k, n}


\displaystyle \log \gamma_{k, n} =  \log p \left ( x_n \lvert \mu_k, \Sigma_k \right )  - \log p(x) \qquad \log \Gamma_k = a + \log \left [ \sum_{n=1}^{N} \exp{ \left( \log \gamma_{k, n} - a \right )} \right ]


The new parameters are resolved within the maximization step:

\displaystyle \pi_{k}^{(t+1)} = \frac{ \pi_{k}^{(t)} \Gamma_k }{ \sum_{i=1}^{K} \pi_{i}^{(t)} \Gamma_i } \qquad \log \pi_{k}^{(t+1)} = \log \pi_{k}^{(t)} + \log \Gamma_k - a - \log \left [ \sum_{i=1}^{K} \exp{ \left( \log \pi_{i}^{(t)} + \log \Gamma_i - a \right )} \right ]


\displaystyle \mu_k^{(t+1)} = \frac{ \sum_{n=1}^{N} x_n \gamma_{n, k} }{ \Gamma_k  } \qquad \mu_k^{(t+1)} = \frac{ \sum_{n=1}^{N} x_n \exp{ \log \gamma_{n, k} } }{ \exp{ \log \Gamma_k }  }


\displaystyle \Sigma_k^{(t+1)} = \frac{ \sum_{n=1}^{N} (x_n - \mu_k^{(t+1)}) (x_n - \mu_k^{(t+1)})^T \gamma_{n, k} }{ \Gamma_k  }


\displaystyle \Sigma_k^{(t+1)} = \frac{ \sum_{n=1}^{N} (x_n - \mu_k^{(t+1)}) (x_n - \mu_k^{(t+1)})^T \exp \log \gamma_{n, k} }{ \exp \log \Gamma_k  }


The algorithm continues back and forth between expectation and maximization stages until the change in log likelihood is less than some epsilon, or a maximum number of user specified iterations has elapsed.

Implementations

Sequential Per iteration complexity given by \mathcal{O}\left(2 K N d^2 + K N d + 2K + N + K d^3\right). We expect d \le K < N because too many dimensions leads to a lot of dead space and too many components results in overfitting of the data. Thus, the dominating term for sequential execution is given by \mathcal{O}\left(2 K N d^2 \right).

Parallel There are two natural data parallelisms that appear in the algorithm. The calculation of the \mathcal{L} and \gamma across points, while the probability densities and parameter updates have natural parallelisms across components. Each POSIX thread runs the full iterative algorithm with individual stages coordinated by barrier synchronization. Resulting complexity is given by \mathcal{O}\left(\frac{2}{P} d^2 K N \right) for work coordinated across P processors.

cudaFlow

Massively Parallel The parallel implementation can be taken and mapped over to the GPU with parallelism taken across points and components depending on the terms being computed. There are several types of parallelism that we will leverage under the CUDA programming model. For the calculation of \log p\left(x | \mu_k, \Sigma_k \right) we compute each point in parallel by forming a grid of one dimensional blocks, and use streams with event synchronization to carry out each component in parallel across the streaming multiprocessors. Calculation of the loglikelihood and \log \gamma_{n,k} is done by computing and storing \log p(x), then updating the storage for \log p\left(x|\mu_k,\Sigma_k\right), and then performing a parallel reduction over \log p(x) to produce the loglikelihood. Parallel reductions are a core tasks are implemented by first standardizing the input array of points to an supremum power of two, then reducing each block using shared memory, and applying a linear map to the memory so that successive block reductions can be applied. Several additional approaches are discussed in [5]. Once the loglikelihood is computed, the streams are synchronized with the host and the result is copied from the device back to the host. To compute \log \Gamma_k, \log \gamma_{n,k} is copied to a working memory and a maximum parallel reduction is performed. The resulting maximum is used in a separate exponential map for numerical stability when computing the parallel reduction of each component to yield \log \Gamma_k. Updates to the mean and covariances are performed by mapping each term to a working memory allocated for each component’s stream and executing a parallel reduction to yield the updated mean and covariance. Once all component streams have been synchronized, the mixture coefficients and Cholesky decompositions of the covariances is computed with a single kernel invocation parallel in the number of components.

The main design consideration was whether or not use streams. For larger numbers of components, this will result in improved runtime performance, however, it comes at the cost of increased memory usage which limits the size of problems an end user can study with the implementation. Because the primary design goal is performance, the increase in memory was favorable to using less memory and executing each component sequentially.

To optimize the runtime of the implementation nvprof along with the NVIDIA Visual Profiler was used to identify performance bottlenecks. The original implementation was a naive port of the parallel C code which required frequent memory transfers between host and device resulting in significant CUDA API overhead that dominated the runtime. By transferring and allocating memory on the device beforehand, this allowed the implementation to execute primarily on the GPU and eliminate the API overhead. The second primary optimization was using streams and events for parallelization of the component probability densities and parameter updates in the maximization step. In doing so, this allowed for a K fold reduction since the components calculations would be performed in parallel. The next optimization step was to streamline the parallel reductions by using block reductions against fast shared block memory minimizing the number of global memory writes instead of performing iterated reductions against sequential addressing that preformed global memory reads and writes for each point. The final optimization step was to used pinned host memory to enable zero-copy transfers from DRAM to the GPU over DMA.

Evaluation

To evaluate the implementations we need a way of generating GMMs and sampling data from the resulting distributions. To sample from a standard univariate normal distribution one can use The Box-Muller transform, Zigguart method, or Ratio-of-uniforms method [7]. The latter is used here due to its simplicity and efficiency. Sampling from the multivariate normal distribution can by done by sampling a standard normal vector \eta \sim \mathcal{N}(0 ,I_d) and computing \mu + \Sigma^{1/2} \eta where \Sigma^{1/2} can be computed by Eigendecomposition, \Sigma^{1/2} = Q \Delta^{1/2} Q^{-}, or Cholesky factorization, \Sigma = L L^T, \Sigma^{1/2} = L. The latter is used since it is more efficient. The GMM describes a generative process whereby we pick a component at random with probability given by its mixture coefficient and then sample the underlying \mathcal{N}(\mu_k, \Sigma_k) distribution, and perform this process for the desired number of points.

The matter of generating GMMs it more interesting. Here we draw \pi_i = X_i / \sum_{j} X_j for X_i \sim \mathcal{U}(0, 1), alternatively, one could draw \pi \sim \text{Dir}(\alpha). Means are drawn by \mu \sim \mathcal{N}(0, a I_d) with a > 1 so that means are relatively spread out in \mathbb{R}^{d}. The more exciting prospect is how to sample the covariance matrix. This is where the Wishart distribution, \Sigma \sim W(I_d, d, n) for n > d - 1, comes in handy. The Wishart distribution is a model of what the sample covariance matrix should look like given a series of n x_i \sim \mathcal{N}(0, I_d) vectors. Based on a \mathcal{O}\left(d^2\right) method by [8], [9] gives an equally efficient method for sampling \Sigma^{1/2} = L by letting L_{i,i} \sim \chi^2(n - i) and L_{i,j} \sim \mathcal{N}(0, 1) for 0 \le i < d and 0 \le j < i.

To evaluate the performance of the different implementations, the wall clock time taken to run the algorithm on a synthetic instance was measured by varying each of the N, K, and d parameters while holding the other two fixed. From an end user perspective wall clock time is preferable to the time the operating system actually devoted to the problem since wall clock time is more valuable. There will be variability in the results since each instance requires a different number of iterations for the log likelihood to converge. Tests were conducted on a Xeon 1245 v5 3.5 Ghz system with 32GB of memory and an NVIDIA GTX 1060 6GB graphics card with 1280 cores.

runtime

Since the parameter space is relatively large Figures 2-5 look at varying one parameter will fixing the others to demonstrate the relative merits of each approach. When the number of points dominates the CUDA approach tends to be 18x faster; the Parallel approach tends to be 3x faster when the dimension is high; and CUDA is suitable when the num of components is high giving a 20x improvement relative to the sequential approach. Thus, when dealing with suitably large datasets, the CUDA based implementation is preferable delivering superior runtime performance without sacrificing quality.

It is important to note that the results obtained from the CUDA solution may differ to those the sequential and parallel approaches. This is due to nondeterministic round off errors associated with executing parallel reductions compared to sequential reductions [2], and differences in the handling of floating point values on the GPU [10], notably, the presence of fused multiple add on NVIDIA GPUs which are more accurate than what is frequently implemented in CPU architectures. The following two synthetic data sets illustrate typical results of the three schemes:

example

Conclusion

This work demonstrated the utility of using NVIDIA GPUs to train Gaussian mixture models by the Expectation Maximization algorithm. Speedups as high as 20x were observed on synthetic datasets by varying the number of points, components, and data dimension while leaving the others fixed. It is believed that further speedups should be possible with additional passes, and the inclusion of metric data structures to limit which data is considered during calculations. Future work would pursue more memory efficient solutions on the GPU to allow for larger problem instance, and focus on providing higher level language bindings so that it can be better utilized in traditional data science toolchains.

References

  1. Bishop, C. M. Pattern recognition and machine learning. Springer, 2006.
  2. Collange, S., Defour, D., Graillat, S., and Lakymhuk, R. Numerical reproducibility for the parallel reduction on multi- and many-core architectures. Parallel Computing 49 (2015), 83-97.
  3. Dempster, A. P., Laird, N. M., and Rubin, D. B. Maximum likelihood from incomplete data via the eme algorithm. Journal of the royal statistical society. Series B (methodological) (1977), 1-38.
  4. Fan, J., Liao, Y., and Liu, H. An overview of the estimation of large covariance and precision matrices. The Econometrics Journal 19, (2016) C1-C32.
  5. Harris, M. Optimizing cuda. SC07: High Performance Computing with CUDA (2007).
  6. Kincaid, D., and Cheney, W. Numerical analysis: mathematics of scientific computing. 3 ed. Brooks/Cole, 2002.
  7. Kinderman, A. J., and Monahan, J. F. Computer generation of random variables using the ratio of uniform deviates. ACM Transactions on Mathematical Software (TOMS) 3, 3 (1977), 257-260.
  8. Odell, P., and Feiveson, A. A Numerical procedure to generate a sample covariance matrix. Journal of the American Statistical Association 61, 313 (1966), 199-203.
  9. Sawyer, S. Wishart distributions and inverse-wishart sampling. URL: http://www.math.wustl.edu/~sawyer/hmhandouts/Wishart.pdf (2007).
  10. Whitehead, N., and Fit-Florea, A. Precision and performance: Floating point and ieee 754 compliance for nvidia gpus. rn(A + B) 21., 1 (2011), 18749-19424.
Advertisements

Parallel Merge Sort in Java

with one comment

Introduction

This past November I was a pretty busy getting settled into a new job and trying to balance life’s other priorities. With a new job also came a new technology stack and while I’ll continue to do C# development in my free time, I’m going to be going back to doing Java development after a seven year hiatus. Before starting the new job, I decided to refresh my memory of the language’s finer details when it comes to generics and threading. So, I decided to implement something simple and settled on a parallel implementation of merge sort. This article is going to focus on making use of Java’s various features and evaluating the theoretical and empirical run time performance of the sequential and parallel versions of the algorithm.

Sequential Approach

Specification

Given a list of values, the list is sorted by employing a divide and conquer method that partitions the list into two (roughly) equal sized partitions, followed by recursively sorting each partition and then merging the two resulting sorted partitions into the final sorted list.

Pseudocode

\displaystyle   \textbf{MERGE}(X, Y)  \newline \indent L_X \leftarrow \textbf{LENGTH}(X)  \newline \indent L_Y \leftarrow \textbf{LENGTH}(Y)  \newline \indent L_Z \leftarrow L_X + L_Y  \newline \indent Z \leftarrow [L_Z]  \newline \indent i, j, k \leftarrow 0, 0, 0  \newline  \newline \indent \textbf{while} \quad k < L_Y    \newline \indent \indent \textbf{if} \quad i < L_X \land j \ge L_Y  \newline \indent \indent \indent \indent Z[k] \leftarrow X[i]  \newline \indent \indent \indent \indent i \leftarrow i + 1  \newline \indent \indent \textbf{else-if} \quad i \ge L_X \land j < L_Y  \newline \indent \indent \indent \indent Z[k] \leftarrow Y[j]  \newline \indent \indent \indent \indent j \leftarrow j + 1  \newline \indent \indent \textbf{else-if} \quad i < L_X \land j < L_Y  \newline \indent \indent \indent \textbf{if} \quad X[i] \le Y[j]  \newline \indent \indent \indent \indent Z[k] \leftarrow X[i]  \newline \indent \indent \indent \indent i \leftarrow i + 1  \newline \indent \indent \indent \textbf{else}   \newline \indent \indent \indent \indent Z[k] \leftarrow Y[j]  \newline \indent \indent \indent \indent j \leftarrow j + 1  \newline \indent \indent k \leftarrow k + 1  \newline  \newline \indent \textbf{return} \quad Z  \displaystyle   \textbf{MERGE-SORT}(X)  \newline \indent L \leftarrow \textbf{LENGTH}(X)  \newline \indent \textbf{if} \quad L \le 1  \newline \indent \indent \textbf{return} \quad X  \newline  \newline \indent \textbf{return} \quad \textbf{MERGE} (  \newline \indent \indent \textbf{MERGE-SORT} (   \newline \indent \indent \indent \textbf{PARTITION}(X, 0, \lfloor\ L / 2 \rfloor + L \mod 2)  \newline \indent \indent ),  \newline \indent \indent \textbf{MERGE-SORT}(   \newline \indent \indent \indent \textbf{PARTITION}(X, \lfloor\ L / 2 \rfloor + L \mod 2, \lfloor\ L / 2 \rfloor)  \newline \indent \indent )  \newline \indent )

\displaystyle   \textbf{PARTITION}(X, s, L)  \newline \indent Y \leftarrow [L]  \newline \indent k \leftarrow 0  \newline  \newline \indent \textbf{while} \quad k < L  \newline \indent \indent Y[k] \leftarrow X[s + k]  \newline \indent \indent k \leftarrow k + 1  \newline  \newline \indent \textbf{return} \quad Y

Time Complexity

In terms of time complexity, the algorithm is on the order of \mathcal{O}(n \log_2(n)). To show this, observe that the input size, n, is divided into to two equal parts, 2 T(n/2), followed by a merge operation, f(n). This leads to the recurrence relation given by \displaystyle T(n) =   \begin{cases}   1 & n \le 1 \\   2 T(n/2) + f(n) & n > 1   \end{cases}  . By induction, the recurrence relation is reduced to \displaystyle T(n) = 2^k T(n/2^k) + \sum_{m = 0}^{k-1} 2^n f \left ( \frac{n}{2^m} \right ). Observing that the merge function is on the order \mathcal{O}(n), i.e., f(n) = c n, then the expression reduces further to \displaystyle T(n) = 2^k T \left ( \frac{n}{2^k} \right ) + \sum_{m = 0}^{k-1} c n and \displaystyle T(n) = 2^k T \left ( \frac{n}{2^k} \right ) + c n k. As the number of subdivisions increases, eventually n will be reduced to 1. As such, let 1 = n/2^k which implies 2^k = n which implies k = \log_2(n), and thus T(n) = n T(1) + c n \log_2(n). Therefore, T(n) \subset \mathcal{O}(n \log_2 n) \quad \square

Implementation

In attempting to implement a generic version of merge sort there were a few matters that needed to be addressed. First, the type being sorted required an order relation to be specified so that the merge operation could take place. This is facilitated by restricting the type parameter T to Comparable<T>. Secondly, I had forgotten that you can’t initialize arrays of generics in Java like you can in C# [1]. To workaround this limitation, I settled on specifying the desired operations over implementations of the List<T> interface. Finally, since the List<T> interface makes no guarantees that its implementations provide (near) constant time reading or writing of elements from the list, an additional generic parameter, L, was added so that only those implementations of the List<T> and RandomAccess [2] interfaces could use this implementation of merge sort. The rest of the implementation is a near facsimile of the pseudocode.

package com.wordpress.antimatroid;

import java.util.List;
import java.util.RandomAccess;

public interface IListOperation
    <T, L extends List<T> & RandomAccess> {
    
	L execute();
}
package com.wordpress.antimatroid;

import java.util.ArrayList;
import java.util.List;
import java.util.RandomAccess;

public class CopyListOperation
    <T, L extends List<T> & RandomAccess>
    implements IListOperation<T, L> {
    
	private final L source;
	private final int length, initialIndex;

	public CopyListOperation(L source, int length, int initialIndex) {
		if(source == null)
			throw new IllegalArgumentException("source must be non-null.");

		if(length < 0)
			throw new IllegalArgumentException(String.format(
                "length, %d, must be greater than or equal to zero.", length
            ));

		if(initialIndex < 0)
			throw new IllegalArgumentException(String.format(
                "initialIndex, %d, must be greater than or equal to zero.", initialIndex
            ));

		if(initialIndex + length > source.size())
			throw new IllegalArgumentException(String.format(
                "initialIndex, %d, + length, %d, must be less than or equal to source.size(), %d.",
                initialIndex, length, source.size()
            ));

		this.source = source;
		this.length = length;
		this.initialIndex = initialIndex;
	}

	@Override
	public L execute() {
		L destination = (L) new ArrayList<T>(length);
		for(int i = 0; i < length; i++)
			destination.add(i, source.get(initialIndex + i));
		return destination;
	}
}
package com.wordpress.antimatroid;

import java.util.ArrayList;
import java.util.List;
import java.util.RandomAccess;

public class MergeListOperation
    <T extends Comparable<T>, L extends List<T> & RandomAccess>
    implements IListOperation<T, L> {

	private final L a, b;
	
	public MergeListOperation(L a, L b) {
		if(a == null)
			throw new IllegalArgumentException("a must not be null.");
		
		if(b == null)
			throw new IllegalArgumentException("b must not be null.");
		
		this.a = a;
		this.b = b;
	}

	@Override
	public L execute() {
		int length = a.size() + b.size();
		L c = (L) new ArrayList<T>(length);
		
		int i = 0, j = 0;
		for(int k = 0; k < length; k++) {
			if(i < a.size() && j < b.size()) {
				if(a.get(i).compareTo(b.get(j)) <= 0) {
					c.add(k, a.get(i++));
				} else { 
					c.add(k, b.get(j++));
				}
			} else if (i < a.size() && j >= b.size()) {
				c.add(k, a.get(i++));
			} else if (i >= a.size() && j < b.size()) {
				c.add(k, b.get(j++));
			} else {
				break;
			}
		}
		
		return c;
	}
}
package com.wordpress.antimatroid;

import java.util.List;
import java.util.RandomAccess;

public class MergeSortListOperation <
    T extends Comparable<T>,
    L extends List<T> & RandomAccess
    > implements IListOperation<T, L> {

	private final L a;

	public MergeSortListOperation(L a) {
		if(a == null)
			throw new IllegalArgumentException("a must not be null.");

		this.a = a;
	}

	@Override
	public L execute() {
		if(a.size() <= 1)
			return a;
		
		CopyListOperation<T, L> leftPartition
                = new CopyListOperation<T, L>(a, (a.size() / 2) +  a.size() % 2, 0);
		CopyListOperation<T, L> rightPartition
                = new CopyListOperation<T, L>(a, (a.size() / 2), (a.size() / 2) +  a.size() % 2);
		
		MergeSortListOperation<T, L> leftSort
                = new MergeSortListOperation<T, L>(leftPartition.execute());
		MergeSortListOperation<T, L> rightSort
                = new MergeSortListOperation<T, L>(rightPartition.execute());
		
		MergeListOperation<T, L> merge
                = new MergeListOperation<T, L>(leftSort.execute(), rightSort.execute());

		return merge.execute();
	}
}

Run Time Analysis

Noting that the theoretical time complexity is \mathcal{O}(n \log_2 n), inputs of the form 2^k will yield a k 2^k curve. Taking the logarithm of which will give \log(k) + k. Observing that as k increases the linear term will dominate the expression. As a result, the curve should look near linear in logarithmic space with the exception of small values of k. Which means that conducting a linear least squares regression of the empirical run times in logarithmic space will yield a satisfactory approximation to the theoretical time complexity.

To verify that the implementation follows the theoretical time complexity, increasing values of k were used to generate lists containing 2^k random values. These lists were then sorted and the System.nanoTime() before and after values were used to determine the elapsed time. These values were collected and a total of 50 identical trails were conducted on an Intel Core i7-2630QM CPU @ 2.00 GHz based machine with 6.00 GB RAM.

As presented in the plot, the regressed linear model in logarithmic space yields a satisfactory theoretical curve whose relative error to the empirical curve diminishes to zero as the input size increases.

Parallel Approach

Specification

The parallel implementation operates under the premise that the divide portion of merge sort can be easily parallelized by sorting one partition on the present thread and sorting the other partition on a secondary thread. Once the secondary thread has completed, then the two threads join, and consequently, the two sorted lists are merged. To avoid copious thread creation, whenever the input size is less than a threshold, \tau, the sequential version of the algorithm is used.

This process can be easily visualized below where each left-hand branch is the originating thread processing the first partition, each right-hand branch is the secondary thread processing the second partition and the junction of those edges represents the consequent merge operation after the secondary thread as joined back in with the originating thread.

Time Complexity

The introduction of parallelism changes the original recurrence relation to the following:

T(N) = \begin{cases} 1 & n \le 1 \\ 2T(n/2) + f(n) & n \le \tau \\ \max{\left (T(n/2),T(n/2)\right )} + f(n) & n > \tau \end{cases}

Assuming, \tau = 1, and that there is no asymptotic difference in sorting the first and second partition, then the time complexity is on the order of \mathcal{O}(n). To see this, observe that the recurrence relation becomes T(N) = \begin{cases} 1 & n \le 1 \\ T(n/2) + f(n) & n > 1 \end{cases} under the presented assumtions. Following the same process of induction as in the sequential case, the recurrence relation reduces to \displaystyle T(n) = T \left ( \frac{n}{2^k} \right ) + \sum_{m=0}^{k-1} f \left ( \frac{n}{2^m} \right ) and is simplified further under the assumption f(n) = c n to \displaystyle T(n) = T \left ( \frac{n}{2^k} \right ) + c n \sum_{m=0}^{k-1} \frac{1}{2^m}. Observing that the sum is a finite geometric series leads to \displaystyle T(n) = T \left ( \frac{n}{2^k} \right ) + c n 2 (1 - \frac{1}{2^{k-1}}) and under the same reduction argument as before to T(n) = T(1) + c n 2 (1 - 2/n). Thus, the time complexity of the parallel merge sort specified is T(n) \subset \mathcal{O}(n) \quad \square

Assuming \tau = \infty, then the time complexity of the algorithm is still on the order \mathcal{O}(n \log_2 n). Thus, for various values of \tau \in [0, \infty) and n \ge 2, the time complexity is between \mathcal{O}(n \log_2 n) \le T(n) \le \mathcal{O}(n).

Implementation

In terms of parallelizing the sequential implementation, an addition interface, IThreadedListOperation was added to provide a BeginOperation, EndOperation asynchronous programming model found in the .net world. After looking around the Java world, I didn’t encounter a preferred idiom, so I went with what I knew.

As I mentioned in the sequential approach, the original data structures were going to be arrays which have a guarantee of providing thread safe reads, but not necessarily thread safe writes. To avoid the issue all together, I decided that the IListOperations should always return a new List<T> instance so that only one thread at a time would be reading or manipulating that memory. Since I knew my implementation would not be sharing IListOperations between threads, I decided not to gold plate the implementation with synchronization constructs. If in the future such ability were required, I would go back and modify the code accordingly.

For the parallel implementation I took advantage of the fact that method arguments are evaluated left-to-right [3] to save one some space, but if the specification ever changed, then it would be more appropriate to move the out the leftSort.execute() and rightSort.executeEnd() methods up a line to form a more explicit operation.

package com.wordpress.antimatroid;

import java.util.List;
import java.util.RandomAccess;

abstract public class IThreadedListOperation
    <T, L extends List<T> & RandomAccess>
    implements Runnable, IListOperation<T, L> {

	private Thread thread;
	
	public void executeBegin() {
		if(thread != null)
			throw new IllegalStateException();
		
		thread = new Thread(this);
		thread.start();
	}
	
	public L executeEnd() {
		if(thread == null)
			throw new IllegalStateException();
		
		try {
			thread.join();
		} catch (InterruptedException e) {

		}
		
		return getResult();
	}

	public L execute() {
		if(thread != null)
			throw new IllegalStateException();

		run();
		return getResult();
	}
	
	abstract protected L getResult();
}
package com.wordpress.antimatroid;

import java.util.List;
import java.util.RandomAccess;

public class MergeSortThreadedListOperation
    <T extends Comparable<T>, L extends List<T> & RandomAccess>
    extends IThreadedListOperation<T, L> {
    
	private final L a;
	private L b;

	private final int threshold;
	
	public MergeSortThreadedListOperation(L a) {
		this(a, 1024);
	}

	public MergeSortThreadedListOperation(L a, int threshold) {
		if(a == null)
			throw new IllegalArgumentException("a must be non-null.");

		if(threshold <= 0)
			throw new IllegalArgumentException("threshold must be greater than zero.");
		
		this.a = a;
		this.threshold = threshold;
	}
	
	@Override
	public void run() {
		if(a.size() <= 1) {
			b = a;
			return;
		}

		if(a.size() <= threshold) {
			MergeSortListOperation<T, L> mergeSort = new MergeSortListOperation<T, L>(a);
			b = mergeSort.execute();
			return;
		}
		
		CopyListOperation<T, L> leftPartition
                = new CopyListOperation<T, L>(a, (a.size() / 2) +  a.size() % 2, 0);
		MergeSortThreadedListOperation<T, L> leftSort
                = new MergeSortThreadedListOperation<T, L>(leftPartition.execute());
		
		CopyListOperation<T, L> rightPartition
                = new CopyListOperation<T, L>(a, (a.size() / 2), (a.size() / 2) +  a.size() % 2);
		MergeSortThreadedListOperation<T, L> rightSort
                = new MergeSortThreadedListOperation<T, L>(rightPartition.execute());

		rightSort.executeBegin();

		MergeListOperation<T, L> merge
                = new MergeListOperation<T, L>(leftSort.execute(), rightSort.executeEnd());

        b = merge.execute();
	}

	@Override
	protected L getResult() {
		return b;
	}
}

Run Time Analysis

Noting that the time complexity for the parallel approach is \mathcal{O}(n), a simple linear least squares regression of the empirical run times in normal space will yield a satisfactory approximation to the theoretical time complexity.

The trial methodology used in the sequential run time analysis is used once again to produce the following plot. Note that it begins at 2048 instead of 1. This was done so that only the parallel implementation was considered and not the sequential implementation when the input size is \le 1024.

As presented in the plot, the regressed linear model in logarithmic space yields a satisfactory theoretical curve whose relative error to the empirical curve diminishes to zero as the input size increases.

Threshold Selection

As a thought experiment, it makes sense that as the threshold approaches infinity, that there is no difference between the sequential implementation and parallel one. Likewise, as the threshold approaches one, then the number of threads being created becomes exceedingly large and as a result, places a higher cost on parallelizing the operation. Someplace in the middle ought to be an optimal threshold that yields better run time performance compared to the sequential implementation and a pure parallel implementation. So a fixed input size should produce a convex curve as a function of the threshold and hence have a global minimum.

Conducting a similar set of trials as the ones conducted under the analysis of the sequential run time give a fully parallel and sequential curve which to evaluate where the optimal threshold resides. As the plot depicts, as the threshold approaches one, there is an increase in the processing taking the form of a convex curve. As the threshold exceeds the input size, then the sequential approach dominates. By conducting a Paired T-Test against the means of the two curves at each input size, 1024 was determined to be the optimal threshold based on the hardware used to conduct the trials. As the input size grows, it is evident that for thresholds less than 1024, the sequential approach requires less time and afterwards, the parallel approach is favorable.

Conclusion

In comparing the sequential and parallel implementations it was observed that the specified parallel implementation produced as much as a 2.65 factor improvement over the specified sequential implementation for megabyte sized lists.

Larger sized lists exhibited a declining improvement factor. It is presumed that as the input size grows that the amount of memory being created is causing excessive paging and as a result increasing the total run time and consequently reducing the improvement factor. To get around this limitation, the algorithm would need to utilize an in-place approach and appropriate synchronization constructs put into place to guarantee thread safety.

From a theoretical point of view, the improvement factor is the ratio of the run time of the sequential implementation to the parallel implementation. Using the time complexities presented, \displaystyle S = \frac{n \log_2 n}{n}. Taking the limit as the input size grows to infinity gives \displaystyle \lim_{n \to \infty} \log_2 n = \infty. So if there is any upper bound to the improvement factor it should be purely technical.

Footnotes

[1] This design decision is discussed in §4.7 of the Java Language Specification (3rd Edition) on reifiable types.

[2] The only two java.util classes providing this guarantee are ArrayList and Vector. Both of which implement the interface RandomAccess which is intended indicate that the class provides the (near) constant reading and writing of elements.

[3] The left-to-right order of operations is specified by §15.7.4 of the Java Language Specification (3rd Edition). Also worth noting the specification recommends against the practice of relying on this convention however in §15.7:

… It is recommended that code not rely crucially on this specification. Code is usually clearer when each expression contains at most one side effect, as its outermost operation, and when code does not depend on exactly which exception arises as a consequence of the left-to-right evaluation of expressions.

Written by lewellen

2012-12-01 at 8:00 am