Extending Parallelism in a Collaborative Filtering Prediction System with OpenMPI
Bryan Mesich, Matthew Piehl, William Perrizo, Greg Wettstein, Prakash Ranganathan
Department of Computer Science
North Dakota State University
Abstract
The Collaborative Filtering Prediction System (CFPS), developed at North Dakota State University by Dr. William Perrizo with assistance from Dr. Greg Wettstein, was designed as a response to the Netflix Prize announcement in October of 2006. The goal of the competition was to develop a collaborative filtering algorithm that could be used to predict user ratings for movies based on a user's previous ratings. A $1,000,000 prize would then be awarded to the team who could surpass Netflix's own Cinematch algorithm by a margin of 10%. As the title suggests, an implementation with OpenMPI was developed with the notion that we could increase performance by making more processors available to the application in a distributed memory environment.
This paper is divided into the following sections:
Section 1: overall architectural overview of the original Collaborative Filtering Prediction System (CFPS); Section 2: area of improvement; Section 3: architecture overview of the modified CFPS; Section 4: performance benchmarks; Section 5: reflection.
1 INTRODUCTION
Driven by data and by the demands of emerging applications, computing architectures continue to improve with the scaling of silicon technology. A central challenge in high performance computing today is how to approach the theoretical peak performance of powerful multi-core processors. Exposing and utilizing parallelism at different levels depends on the efforts of both the architecture designer and the programmer. Although considerable effort has gone into modeling the underlying hardware architecture in specialized software platforms, the gap between the parallelism the hardware offers and the ability of software to exploit it continues to widen. We focus on extending parallelism in a recommender system application. Recommender systems provide users with personalized suggestions for products or services. These systems often rely on Collaborative Filtering (CF), where past transactions are analyzed in order to establish connections between users and products. The most common approach to CF is based on neighborhood models, which rely on similarities between products or users [1][2][3][4]. Two major problems that most CF approaches have to contend with are scalability and sparseness of the user profiles. To deal with user and movie data that expand continuously in a prediction system, an effective algorithm should be designed to 1) run on thousands of parallel machines to share storage and speed up computation, 2) perform incremental retraining and updates to attain online performance, and 3) fuse information from multiple sources to alleviate information sparseness. The goal of our research is to focus on the first part, which allows and encourages the execution of parallel-distributed programs that adapt to changes and thereby increases the performance of a collaborative filtering recommendation system.
1.1 Architecture Overview
The CFPS developed at NDSU ran on a single SMP system, although it was later used in a distributed memory environment. Before making a prediction run, a configuration file and an input file are required. The configuration file contains settings the application will use to predict user ratings. The input file contains the numeric movie and user ids that the application will make predictions on. A sample input file might look like the one shown in figure 1. Movie ids are denoted with a trailing colon and all other entries are user ids. User ids directly following a movie id are associated with that movie, and the association ends when a new movie id is defined.
For example, in figure 1, movie 12641 has 4 users associated with it and movie 12502 has 2 users associated with it. Multiple movies and their corresponding user associations can be defined in a single input file.
Figure 1: Movie and user associations
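For illustration, an input file containing the two movies mentioned above might look like the following (the user ids shown here are hypothetical placeholders, not values from the actual data set):

12641:
305344
387418
2439493
1664010
12502:
941866
1197611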
After a configuration file and input file have been created, the program can make a prediction run. The prediction application takes advantage of parallelism by forking n processes, each one predicting user ratings for a given movie. The variable n can be defined by the user at run time, but it is usually set to the number of processors (or cores) available to the system. In our case, we were running on a dual quad-core Penryn-based system, so n would be set to 8 to run with 8 processors. The forked processes also share the user and movie Ptrees that get loaded on start-up, which consume about 8 GB of memory once loaded. The prediction system's reliance on this shared memory is the detail we initially overlooked.
When a child process has finished making predictions, it returns control to the parent process and another child process is forked. The new process will then predict user ratings for a new movie. This continues until the end of the input file has been reached, with a maximum of n movies being predicted upon at any given time. Shown in figure 2 is pseudo code that depicts the basic flow of mpp-mpred.C, which is also the main entry point for the prediction application. The distributed memory environment used to run the prediction application consisted of 32 Penryn-based nodes with a total of 256 processors (32 nodes * 8 processors). In order to run the prediction application, a submit script called mpp-submit was used to allocate resources on the cluster using the Simple Linux Utility for Resource Management (SLURM). SLURM is an open source tool for Linux clusters that performs cluster management and job scheduling. As a cluster resource manager, SLURM has three key functions. First, it allocates exclusive and/or non-exclusive access to resources (compute nodes) to users for some duration of time so they can perform work. Second, it provides a framework for starting, executing, and monitoring work (normally a parallel job) on the set of allocated nodes. Finally, it arbitrates conflicting requests for resources by managing a queue of pending work. The submit script would allocate one node per job. Each time a job is allocated, a different input or configuration file (or both) can be provided to the prediction application.
An output file is generated for each movie, containing the predicted user ratings for that movie. The individual output files are written to a scratch directory on local disk. After all the predictions have been made, the output files are combined, the RMSE is calculated, and a single file containing the results is written to the user-specified output directory. Figure 3 shows a simplified view of the overall architecture. Note that each job is associated with one node and each job can take any number of configuration and input file variations. A total of 30 jobs can run simultaneously, since there are 30 nodes available in the SLURM production partition.
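For reference, the RMSE reported here is the standard root mean squared error used by the Netflix Prize: for N predicted ratings, with predicted value \hat{r}_i and actual rating r_i,

\mathrm{RMSE} = \sqrt{\frac{1}{N}\sum_{i=1}^{N}\left(\hat{r}_i - r_i\right)^2}

A lower RMSE indicates more accurate predictions.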
main {
    load_ptrees()                      /* shared user and movie Ptrees (~8 GB) */
    loop (input_file) {
        user_list = get_users(movie)
        if ( processes < n ) {
            if ( fork() == 0 ) {       /* child process */
                Predict(movie, user_list, ...)
                exit()
            }
        } else {
            wait()                     /* block until a child finishes */
        }
    }
}
Figure 2: Pseudo code for mpp-mpred.C
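The same bounded-fork pattern can be written out in ordinary C. The sketch below is illustrative only; the movie list, the process limit n, and the predict() stub are stand-ins rather than the actual mpp-mpred code:

#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

/* stand-in for the real prediction routine */
static void predict(int movie) { printf("predicting movie %d\n", movie); }

int main(void) {
    int movies[] = { 12641, 12502 };   /* movie ids as read from an input file */
    int nmovies = 2;
    int n = 8;                         /* maximum simultaneous children */
    int running = 0;

    for (int i = 0; i < nmovies; i++) {
        if (running == n) {            /* at the limit: wait for one child to exit */
            wait(NULL);
            running--;
        }
        if (fork() == 0) {             /* child: predict one movie, then exit */
            predict(movies[i]);
            _exit(0);
        }
        running++;                     /* parent keeps dispatching */
    }
    while (running-- > 0)              /* reap the remaining children */
        wait(NULL);
    return 0;
}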
Figure 3: CFPS architecture
2. Area of Improvement
After reviewing the existing code base, we noticed the benefits that OpenMPI could bring when large input files were used. By using MPI, we could remove the 8-processor limitation by utilizing processors from other nodes in the cluster (see figure 8). We decided to approach the problem with the commonly used master-slave configuration, as shown in figure 4. When using MPI, each node (or process) is assigned a rank. Rank 0 is considered the root/master process and all others are considered subordinate/slave processes. The master controls a job queue and sends jobs off to the slaves as needed. Depending on how a user allocates resources on the cluster, rank n can correspond to a physical node or to a single process on a physical node. Typically the -n or -N switch is used to distinguish this when allocating resources with SLURM. Our initial redesign allocated resources at the process level, which in the end did not work for us due to the shared memory requirements. To better illustrate this point, figure 5 contains pseudo code for main() in mpp-mpred.C. In our case we allocate resources with the -n switch, which requests n processes. The 'mpirun' command is then called to execute mpp-mpred. An important concept to understand here is that main() gets executed by all of the participating processes. Since we have 8 processors per physical node, main() can be executed up to 8 times on each node. Each participating process is automatically assigned a unique rank when the MPI_Init() function is called. We then use the if statement shown in figure 5 to decide whether the do_master() or do_slave() function is called. This decision is only made once per execution.
Figure 4: Master Slave Configuration
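To illustrate the difference between the two allocation styles (the exact submit commands are not reproduced in this paper, so the lines below are only a sketch), a per-process allocation versus a per-node allocation under SLURM might look like:

# allocate 32 MPI processes, placed wherever SLURM finds free slots
salloc -n 32 mpirun ./mpp-mpred ...

# allocate 4 whole nodes with a single MPI process per node
salloc -N 4 --ntasks-per-node=1 mpirun ./mpp-mpred ...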
main {
    MPI_Init()
    MPI_Comm_rank(MPI_COMM_WORLD, &rank)
    if ( rank == 0 ) {
        do_master()
    } else {
        do_slave()
    }
    MPI_Finalize()
}
Figure 5: Pseudo code for main() in mpp-mpred.C
while ( slaves_active > 0 ) {
    MPI_Recv(JOB_TAG, MPI_ANY_SOURCE, &status)    /* a slave asks for a job */
    slave_rank = status.MPI_SOURCE
    if ( i++ < total_work ) {
        MPI_Send(work, WORK_TAG, slave_rank, ...)
    } else {
        MPI_Send(DONE_TAG, slave_rank, ...)       /* no work left for this slave */
        slaves_active--
    }
}
return
Figure 6: Pseudo code for the do_master() function
Figure 6 shows pseudo code for a typical do_master() function. A slave executes main() and falls into the else branch shown in figure 5. The rank 0 process receives a slave's request for work and then decides either to send work or, if no work remains, to tell the slave to finish. The corresponding slave function is shown in figure 7.
while (1) {
    MPI_Send(JOB_TAG, rank_0)              /* ask the master for a job */
    MPI_Probe(&status)                     /* peek at the reply's tag */
    if ( TAG == WORK_TAG ) {
        MPI_Recv(work, WORK_TAG)
        do_work(work)
    } else if ( TAG == DONE_TAG ) {
        MPI_Recv(DONE_TAG)
        break                              /* no more work: leave the loop */
    }
}
return
Figure 7: Pseudo code for the slave function
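Figures 5 through 7 together describe a standard MPI master/slave work queue. A minimal, self-contained C sketch of that pattern is shown below; it is not the actual mpp-mpred code, and the tag values, the integer work payload, and the job count are placeholders:

#include <mpi.h>
#include <stdio.h>

#define JOB_TAG  1   /* slave -> master: "send me work" */
#define WORK_TAG 2   /* master -> slave: payload is a work item */
#define DONE_TAG 3   /* master -> slave: no work left */

static void do_master(int nslaves, int total_work) {
    int sent = 0, done = 0;
    while (done < nslaves) {
        int req;
        MPI_Status status;
        /* wait for any slave to ask for a job */
        MPI_Recv(&req, 1, MPI_INT, MPI_ANY_SOURCE, JOB_TAG, MPI_COMM_WORLD, &status);
        if (sent < total_work) {
            MPI_Send(&sent, 1, MPI_INT, status.MPI_SOURCE, WORK_TAG, MPI_COMM_WORLD);
            sent++;
        } else {
            int dummy = 0;
            MPI_Send(&dummy, 1, MPI_INT, status.MPI_SOURCE, DONE_TAG, MPI_COMM_WORLD);
            done++;
        }
    }
}

static void do_slave(int rank) {
    for (;;) {
        int req = 0, work;
        MPI_Status status;
        MPI_Send(&req, 1, MPI_INT, 0, JOB_TAG, MPI_COMM_WORLD);   /* request a job */
        MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);       /* peek at the reply's tag */
        MPI_Recv(&work, 1, MPI_INT, 0, status.MPI_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if (status.MPI_TAG == DONE_TAG)
            break;
        printf("rank %d received work item %d\n", rank, work);    /* stand-in for do_work() */
    }
}

int main(int argc, char **argv) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (rank == 0)
        do_master(size - 1, 100);   /* 100 is a placeholder job count */
    else
        do_slave(rank);
    MPI_Finalize();
    return 0;
}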
As the pseudo code shows, the slave process loops as it processes work sent to it by the root node. The loop is broken once the root node indicates that there is no more work to be done. We had initially planned on removing the process forking that the original code base implemented in favor of MPI. However, by allocating resources on a per-process basis, we could not take advantage of shared memory: each process we allocated would need to load its own copy of the user and movie Ptrees before any predicting could take place. This wasted memory limited us to running only 3 to 4 processes per physical node. The solution to our problem was a hybrid model in which MPI and process forking were used in conjunction. MPI gave us access to more processors, and process forking on each physical node allowed us to keep the shared memory.
3. Implementation
Even though our first implementation attempt failed, it was relatively easy to rebound from, since the MPI architecture did not really change much. Instead of allocating resources at the process level, we allocate resources at the node level, as shown in figure 8. This way each node executes main() (mpp-mpred) only once.
Figure 8: Architecture allocating resources at node level
Processes are then forked on each slave node, just as was done in the original version (we actually reused the old code). We still use MPI for job allocation between the master and slave nodes. Figure 9 shows the modified pseudo code for the do_slave() function; the major difference is the process forking. The slaves continue to request jobs until the maximum number of simultaneous processes has been reached.
load_ptrees()                              /* shared Ptrees, loaded once per node */
while ( have_work ) {
    if ( n >= max_processes ) {
        wait_for_exit()                    /* a child must finish before forking again */
    } else {
        MPI_Probe(&status)
        if ( TAG == JOB_TAG ) {
            MPI_Recv(work, JOB_TAG)
            if ( fork() == 0 ) {           /* child process */
                do_work(work)
                exit()
            }
        } else {
            MPI_Recv(DONE_TAG)
            wait_for_everyone_to_finish()  /* reap remaining children */
            break
        }
    }
}
return
Figure 9: Modified pseudo code for the do_slave() function with process forking
When a process finishes, the node requests a new job from the master node. If work is available, a new process is forked. If the master node indicates it is time to finish, the slave waits for the remaining processes to finish before exiting. This is similar to the pseudo code shown in figure 2. Also note the load_ptrees() call above the while loop. In our implementation, the master node does all the command line parsing and then passes this information to the slaves via MPI. The slaves then use this information to do some required initialization before the Ptrees can be loaded. There are a few downsides to this architecture, the most important being the unused processors on the master node. Only one process is utilized on the master node, and it is used for job management between the master and the slaves. Ideally we would want to use the remaining processors, 7 in our case, to do slave work. Our initial approach to this problem was to fork processes on the master node, but the application would then need to check 2 separate job queues. This is a problem because the master process does a blocking MPI_Probe() when waiting for job requests to come in from the slaves.
We could have implemented a non-blocking probe with MPI_Iprobe(), but doing this elegantly while also watching the local job queue was beyond our skill set. We decided not to implement slave processing on the master node due to the limited time we had remaining in the semester to finish the project. We were, however, interested in how a solution to this problem could be implemented. Further research pointed us to threading in conjunction with MPI_Init_thread(). This variant of MPI_Init() allows a user to request a desired level of threading support, allowing threads to interface directly with the MPI_COMM_WORLD communicator. The MPI_Send() and MPI_Recv() functions work just as if they were called from the parent thread. This is a big win for us, since we can dispatch jobs from the same queue over MPI. This architecture is depicted in figure 10. Note the addition of the green rectangles representing forked processes on the slaves, and the master node sharing resources with a slave. It is important to use standardized message passing tags in this environment, since the slave threads on the master node will be able to receive messages sent from other slaves that are intended for the master process.
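A minimal sketch of the MPI_Init_thread() initialization described above might look like the following; requesting MPI_THREAD_MULTIPLE is what would allow worker threads on the master node to call MPI_Send()/MPI_Recv() themselves. This is a sketch of the idea, not code from the project:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    int provided;

    /* ask the MPI library for full thread support so that, in addition to a
     * dispatcher thread, local worker threads could talk to MPI_COMM_WORLD */
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided < MPI_THREAD_MULTIPLE)
        fprintf(stderr, "warning: MPI only provides thread level %d\n", provided);

    /* ... create dispatcher and worker threads here ... */

    MPI_Finalize();
    return 0;
}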
In order to compile an OpenMPI program, compiler wrappers called mpicc and mpiCC are used in place of a standard compiler. The mpicc/mpiCC wrappers are really just pgcc/pgCC under the hood; the MPI-specific wrappers set library paths to the appropriate MPI header files and set a few compiler switches. To get the prediction application to compile with MPI support, the top-level makefile in the project directory was modified to use mpicc/mpiCC.
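The project makefile itself is not reproduced here, but the change amounts to pointing the compiler variables at the MPI wrappers, roughly (variable names assumed):

# before: CC = pgcc, CXX = pgCC
CC  = mpicc
CXX = mpiCC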
Figure 10: Job dispatch over MPI
The submission script, mpp-submit, was heavily modified to work with our hybrid model. Additional shell scripts were also created for pre- and post-processing tasks, which include:
• mpp-scratch – Create scratch directories
• mpp-destroy – Delete scratch directories when finished
• mpp-gather – Send output files to a common NFS directory
The mpp-gather script is an important addition because mpp-glue cannot combine the output files while they are spread across the slave nodes' scratch directories.
3.1 Benchmarking
We measured total run time with mpp-submit by taking the time difference from start to end (including post-processing). Figure 11 shows run times for the different configurations we submitted. The red and blue bars represent our modified prediction code with 8 and 12 processes forked on the slaves. The green and yellow bars show the original non-MPI code with the same parameters.
We seem to be getting diminishing returns after 3 nodes are allocated to a job. We had assumed we would see better performance with each additional node, since each added node can run 8 additional predictions. To investigate, we ran 'top' on the slave nodes while they were running a job. We were surprised to see only 4 to 5 processes being forked simultaneously. This is obviously inefficient, but we are not sure where the contention is located. Our guess is that MPI has a certain amount of overhead we have not accounted for, or that the master node cannot distribute jobs fast enough to the slaves.
Figure 11: Run times, hybrid MPI code vs. original non-MPI code
The next step in finding the contention would be to profile the program in hopes of finding a bottleneck. The additional overhead of MPI is apparent when looking at the benchmarks for a single allocated node: the non-MPI code is clearly the winner when only one node is utilized. We also noted the effect that kernel page caching can have when loading Ptrees. If the Ptrees had been loaded by a previous job, the time to load them on a subsequent job dropped from 3 to 4 minutes to 15 to 20 seconds.
4. Conclusions
As we look back at our project, we noticed strange race conditions appearing when running with large node allocations (6 or more). The problem appears to be with the master node sending INIT_TAG messages to the slave nodes after they have finished their initialization. We could remedy the problem by having the slaves parse the command line arguments themselves rather than having the master do this for them. We would then remove the INIT_TAG messages being sent from the master to the slaves in favor of a simpler architecture. The reason we went down this path in the first place was to remove unneeded Ptree loading on the master node. However, after investigating the possibility of threading slave jobs on the master node, the Ptrees would need to be loaded there anyway.
References
[1] G. Adomavicius and A. Tuzhilin, "Toward the Next Generation of Recommender Systems: A Survey of the State-of-the-Art and Possible Extensions", IEEE Transactions on Knowledge and Data Engineering, vol. 17, no. 6, pp. 734–749, 2005.