Thursday, August 23, 2012

A server job dispatcher using SQL in C

OK, imagine this: you would like to execute some jobs on your server, but you don't want a fancy job dispatcher. You just want to put some parameters into a SQL table and have the table updated when the job is done. Perhaps you have trouble imagining that. However, that was my fantasy while writing a webapp in CodeIgniter, a lovely PHP framework. I had some statistics to calculate and I didn't want to calculate them in CodeIgniter, so I wrote some R scripts. The scripts are time consuming, so you can't just call them from the server process. If you do, the webpage will stop loading and wait for the scripts to finish. Well, just run them in the background and update the page with JavaScript when the scripts are finished, right? Not if there are one hundred people using the server at once. You need to control the number of simultaneous jobs. Hence the need for a job dispatcher.



I thought I came up with a clever solution, so I thought I'd share it. Let me first say that part of the reason I chose this solution is that I was interested in practicing my C. The particular features of C I'm playing with are signalling, forking, and SQL. So here is my list of what this program does:

  1. Creates a daemon
  2. Daemon periodically monitors SQL table
  3. When something changes, it executes a command
Let's look at part 1, which I learned from here. The idea in part 1 is to create a daemon that runs in its own session, so that when you close the terminal window the program won't exit. Here's the minimum code to accomplish this:

/* Our process ID and Session ID */
pid_t pid, sid;
/* Fork off the parent process */
pid = fork();
/* If we got a good PID, then
we can exit the parent process. */
if (pid > 0) {
exit(0); /* No problems, just the parent leaving.*/
}
/* Create a new SID for the child process */
sid = setsid();
This first bit of code forks the program. We basically cloned our process, and now two copies are running from the same point in the code. The only difference between the two is that pid is 0 in the new copy and greater than 0 in the old one. We use this to eliminate the original program: when the parent exits, the child is orphaned and adopted by the init process, so the shell that launched the program gets its prompt back and nothing is left waiting on the child. The child still belongs to the terminal's session, however, and could be sent a hangup signal when that session ends. So we move it to its own session, which is what the last line of code accomplishes. In the full program below, there are some extra bits to close stdin/stdout/stderr and open a connection to the system log.
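To see the two facts above in action (fork() returns 0 in the child, and setsid() makes the child the leader of a fresh session), here is a small sketch. It is not part of the daemon: the helper name child_gets_own_session is made up for illustration, and unlike the daemon the parent stays alive so it can read the child's report through a pipe.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: forks, has the child call setsid(), and returns
   1 if the child ended up leading a new session, 0 if it did not,
   and -1 if something went wrong along the way. */
int child_gets_own_session(void) {
    int fds[2];
    if (pipe(fds) != 0)
        return -1;

    pid_t pid = fork();
    if (pid < 0) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    if (pid == 0) {
        /* Child: fork() returned 0 here. */
        close(fds[0]);
        pid_t sid = setsid();
        /* A session leader's session ID equals its own process ID. */
        char ok = (sid != (pid_t)-1 && sid == getpid() && getsid(0) == getpid());
        if (write(fds[1], &ok, 1) != 1)
            _exit(1);
        _exit(0);
    }

    /* Parent: fork() returned the child's PID. Read the child's report. */
    close(fds[1]);
    char ok = 0;
    ssize_t n = read(fds[0], &ok, 1);
    close(fds[0]);
    waitpid(pid, NULL, 0);
    return (n == 1) ? ok : -1;
}
```

setsid() succeeds here because a freshly forked child is never a process group leader, which is the reason daemons fork first and call setsid() second.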

Step 2 means we need to monitor the SQL table for changes. For now, assume our table has two columns: a filename column and a dispatch column. When we wish to execute a job, we make the dispatch column NULL. That is the signal to the daemon that it should execute its job on that row, where the filename is a parameter to the job. Here is the minimum code to accomplish this:
/*Structure used for SQL queries*/
typedef struct database_s {
char server[100];
char user[25];
char password[50];
char database[100];
char table[100];
} database;
database* db; /* ..... set up your database here */
void check_for_change() {
MYSQL *conn;
MYSQL_RES *res;
MYSQL_ROW row;
unsigned int do_dispatch = 0;
char sql_query[100];
/* Connect to the sql server*/
conn = mysql_init(NULL);
/* Connect to database */
if (!mysql_real_connect(conn, db->server,
db->user, db->password, db->database, 0, NULL, 0)) {
mysql_close(conn);
exit(EXIT_FAILURE);
}
/* Count the rows that are waiting to be dispatched */
snprintf(sql_query, sizeof(sql_query), "SELECT count(job_id) FROM %s WHERE dispatch IS NULL", db->table);
if (mysql_query(conn, sql_query)) {
mysql_close(conn);
exit(EXIT_FAILURE);
}
res = mysql_use_result(conn);
while ((row = mysql_fetch_row(res)) != NULL) {
/*was the count greater than 0?*/
if(atoi(row[0]) > 0) {
do_dispatch = 1;
}
}
/* close connection */
mysql_free_result(res);
mysql_close(conn);
/* Do dispatches if necessary */
if(do_dispatch) {
//Execute job
// ...
}
}
The first section of code sets up the variables we need to interact with a MySQL database in C, including things like username, database name, etc. The next section of code connects to the database, with some error checking. Next, we perform a query to count the rows with a NULL in the dispatch column. If the count is greater than 0, we know it's time to execute a job.
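One detail worth sketching separately is how the query string gets built. The table name is allowed to be up to 99 characters, and the fixed part of the query is about 50 more, so a 100-byte buffer can be too small. A minimal sketch of a bounds-checked version follows; the helper name build_count_query is hypothetical, and it assumes (as the program does) that the table name comes from a trusted command line.

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper: writes the polling query for `table` into buf.
   Returns the length of the query on success, or -1 if it did not fit
   in buflen bytes (including the terminating NUL). */
int build_count_query(char *buf, size_t buflen, const char *table) {
    int n = snprintf(buf, buflen,
                     "SELECT count(job_id) FROM %s WHERE dispatch IS NULL",
                     table);
    /* snprintf reports the length it wanted to write, so a value
       of buflen or more means the output was truncated. */
    if (n < 0 || (size_t)n >= buflen)
        return -1;
    return n;
}
```

A caller would refuse to run the query when this returns -1, instead of sending a truncated statement to the server.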

The next thing we need to do is call this code from our daemon. The easiest way is to check every so many seconds. This may be accomplished with this code:
void wait_for_change() {
while (1) {
check_for_change();
sleep(10); /* sleep() takes seconds, so this sleeps for 10 seconds */
}
}
However, we may wish to instead signal our program. This is why I wrote the program in C, in fact, to play with signals. In Linux, one program may signal another. If data is uploaded to my server, for example, I want to tell my daemon right away and can have my server signal it. We must simply tell the operating system what to do when our program is signaled. It is much simpler than it sounds:
signal(SIGALRM, wait_for_change);


This code says that when we are signalled, we call the wait_for_change function. Notice that if we were sleeping, the handler starts the loop over from the top and calls check_for_change(). Thus, we will immediately check for a change. To signal the program, we may use this command from either the terminal or another program:
pkill -ALRM [program name]
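A common variation, sketched here as an alternative and not as what this program does, is to keep the handler tiny: it only sets a flag, and the signal itself cuts sleep() short so the main loop comes around again on its own. The names wake_requested, on_alarm, install_wake_handler and nap are all made up for this sketch.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <unistd.h>

/* Set by the handler. volatile sig_atomic_t is the type that can be
   written safely from inside a signal handler. */
static volatile sig_atomic_t wake_requested = 0;

static void on_alarm(int signo) {
    (void)signo;
    wake_requested = 1;
}

/* Hypothetical helper: installs the handler for SIGALRM.
   Returns 0 on success, -1 on failure. */
int install_wake_handler(void) {
    struct sigaction sa;
    sa.sa_handler = on_alarm;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(SIGALRM, &sa, NULL);
}

/* Sleeps for up to `seconds`. Returns the number of seconds left
   unslept, which is non-zero when a signal cut the nap short. */
unsigned int nap(unsigned int seconds) {
    wake_requested = 0;
    return sleep(seconds);
}
```

With this shape the loop would read: check for changes, nap, repeat. A pkill -ALRM during the nap simply ends it early.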
There is one slight problem, however. Signals can arrive at any point in our code. You can imagine that it would be bad if we were in the middle of querying the SQL database or executing a job and we jumped straight to the check_for_change code. We would leave open files, open connections, and other badness. So we "lock" and "unlock" our code when it is important that we aren't interrupted by a signal. Here are these two functions:
void lock() {
sigset_t st;
sigemptyset (&st);
sigaddset(&st, SIGALRM);
sigprocmask(SIG_BLOCK, &st, NULL);
}
void unlock() {
sigset_t st;
sigemptyset (&st);
sigaddset(&st, SIGALRM);
sigprocmask(SIG_UNBLOCK, &st, NULL);
}
They are relatively simple; they just say that we're blocking the SIGALRM signal when locked and unblocking it when unlocked. So, when we are about to do something delicate in the code, we call lock() and when finished we call unlock().
Finally, we need to set up a job manager. Basically, we want to allow only a finite number of jobs to execute at any given time. I won't go into too much detail of the code, but here's the simple explanation of how it works:
  1. If the counter for the number of jobs is greater than 0, fork the program like we did for the daemon but without creating a new session. We decrement the number of jobs we can execute.
  2. When a child finishes, it will signal the daemon. The daemon records that a child finished.
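The two steps above can be sketched in a stripped-down form. This is only an illustration of the counting idea, not the program's implementation: it blocks in waitpid() when the pool is full instead of relying on signals, the children do no real work, and the name run_bounded is hypothetical.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical sketch: runs `total` placeholder jobs with never more
   than `max_running` alive at once. Stores the highest concurrency
   reached in *peak. Returns the number of jobs that exited cleanly,
   or -1 if fork() or waitpid() fails. */
int run_bounded(int total, int max_running, int *peak) {
    int started = 0, running = 0, finished_ok = 0;
    int status;

    *peak = 0;
    while (started < total || running > 0) {
        if (started < total && running < max_running) {
            pid_t pid = fork();
            if (pid < 0)
                return -1;
            if (pid == 0) {
                /* Child: a real job would run the command here. */
                _exit(0);
            }
            started++;
            running++;
            if (running > *peak)
                *peak = running;
        } else {
            /* Pool is full, or nothing is left to start:
               block until some child exits and free its slot. */
            if (waitpid(-1, &status, 0) <= 0)
                return -1;
            running--;
            if (WIFEXITED(status) && WEXITSTATUS(status) == 0)
                finished_ok++;
        }
    }
    return finished_ok;
}
```

Calling run_bounded(5, 2, &peak) would start five jobs while keeping at most two alive at any moment.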

In theory, the child signals (step 2) could be used to count the number of child processes that have finished. However, I've found practically that this doesn't always work. If a child crashes, somehow the parent isn't always signaled. Maybe someone more knowledgeable can leave a comment about that. I instead have the children construct empty files when they start and delete them when finished. Then the parent program just counts the number of these files to get the number of running jobs. These files are also useful for debugging information.
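One possible explanation for the missing signals, offered as a guess and not verified against this program: ordinary signals such as SIGCHLD are not queued. If several children exit while SIGCHLD is blocked (for example between lock() and unlock()), the parent may get a single delivery for all of them. The usual remedy is to loop over waitpid() with WNOHANG inside the handler until nothing is left. The sketch below sets up that situation on purpose; every name in it is made up for the demonstration.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

static volatile sig_atomic_t deliveries = 0; /* times the handler ran */
static volatile sig_atomic_t reaped = 0;     /* children collected */

static void on_sigchld(int signo) {
    int saved_errno = errno; /* waitpid may change errno */
    (void)signo;
    deliveries++;
    /* One delivery may stand for several exits, so drain them all. */
    while (waitpid(-1, NULL, WNOHANG) > 0)
        reaped++;
    errno = saved_errno;
}

/* Hypothetical demo: lets n children exit while SIGCHLD is blocked,
   then unblocks it. Returns the number reaped, or -1 on error. */
int exit_many_while_blocked(int n) {
    struct sigaction sa;
    sigset_t st;
    siginfo_t info;
    pid_t pids[16];
    int i;

    if (n < 0 || n > 16)
        return -1;
    sa.sa_handler = on_sigchld;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    if (sigaction(SIGCHLD, &sa, NULL) != 0)
        return -1;

    sigemptyset(&st);
    sigaddset(&st, SIGCHLD);
    deliveries = 0;
    reaped = 0;
    sigprocmask(SIG_BLOCK, &st, NULL);

    for (i = 0; i < n; i++) {
        pids[i] = fork();
        if (pids[i] < 0)
            return -1;
        if (pids[i] == 0)
            _exit(0);
    }
    /* Wait until each child has exited, but leave it unreaped (WNOWAIT). */
    for (i = 0; i < n; i++) {
        if (waitid(P_PID, (id_t)pids[i], &info, WEXITED | WNOWAIT) != 0)
            return -1;
    }

    sigprocmask(SIG_UNBLOCK, &st, NULL); /* pending SIGCHLD is delivered here */
    return reaped;
}
```

After a call with n set to 3, comparing deliveries with reaped shows whether the handler ran fewer times than there were children, which is why counting signals alone can undercount.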
That's basically it. Here now is the complete program. It contains locks and unlocks, much more error handling, and logging compared to what we've seen before.
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <syslog.h>
#include <string.h>
#include <mysql.h>
#include <getopt.h>
#include <signal.h>
#define SLEEP 500
/*
* See the help for more information on what this does.
*
*
*/
/*forks and executes the job*/
int doDispatch(const unsigned int job_id, const char* cmd);
/*Prints help information*/
void print_help();
/*Is called once it is determined a job must be dispatched. Gathers the information from the SQL table to pass to the new job*/
void prepareJob();
/*Check for database changes*/
void check_for_change();
/*Waiting method, allowing the program to sleep*/
void wait_for_change();
/*Lock the daemon so that it may not be interrupted by signals.*/
void lock(); //do not accept sigalrms
/*Unlocks it*/
void unlock();
/*Add up the number of spawned processes.*/
int tally_children();
/*Add a new child pid to the list of spawned processes*/
void add_cpid(pid_t cpid);
/*Wait for a process to finish to enable cleaning up child*/
void clean_up_child();
/*Structure used for SQL queries*/
typedef struct database_s {
char server[100];
char user[25];
char password[50];
char database[100];
char table[100];
} database;
/* Declare global variables so when the process is signalled, it has access to its state*/
database* db;
char workdir[100];
pid_t* cpids;
int running_cpids;
unsigned int max_proc;
char cmd[150];
void print_help() {
printf("Usage: rdaemon --workdir=[workdir] --server=[localhost] --user=[root] --database=[mysql] --table=[table name] --cmd=[command to execute] --pool=[max number of processes]\n");
printf("This daemon monitors the given table for changes. When it detects a change, by monitoring\n");
printf("the last row, it will execute [cmd] on all new rows. It determines\n");
printf("if a job has been dispatched (a row is new) by getting and setting the column \n");
printf("\"dispatch\". \"dispatch\" should be a TIME field and \"job_id\", should be an \n");
printf("INTEGER. [cmd] should take a single argument, the job_id.\n");
}
int main(int argc, char* argv[]) {
//process the command line arguments
if(argc != 8) {
print_help();
exit(EXIT_FAILURE);
}
db = (database*) malloc(sizeof(database));
extern char *optarg;
int c; /* getopt_long_only returns an int; a plain char may never compare equal to -1 */
//build up required arguments array. Just a bunch of random code to process the daemon call
static struct option opts[] = {
{"server", 1, NULL, 0},
{"user", 1, NULL, 1},
{"database", 1, NULL, 2},
{"table", 1, NULL, 3},
{"pool", 1, NULL, 4},
{"cmd", 1, NULL, 5},
{"workdir", 1, NULL, 6},
{0, 0, 0, 0}
};
while((c = getopt_long_only(argc, argv, "", opts, NULL)) != -1) {
switch(c) {
case 0:
if(strlen(optarg) >= 100) {
fprintf(stderr, "arg too big\n");
exit(EXIT_FAILURE); /* do not copy an oversized argument */
}
sscanf(optarg, "%s", db->server);
break;
case 1:
if(strlen(optarg) >= 25) {
fprintf(stderr, "arg too big\n");
exit(EXIT_FAILURE);
}
sscanf(optarg, "%s", db->user);
break;
case 2:
if(strlen(optarg) >= 100) {
fprintf(stderr, "arg too big\n");
exit(EXIT_FAILURE);
}
sscanf(optarg, "%s", db->database);
printf("%s\n", db->database);
break;
case 3:
if(strlen(optarg) >= 100) {
fprintf(stderr, "arg too big\n");
exit(EXIT_FAILURE);
}
sscanf(optarg, "%s", db->table);
break;
case 4:
sscanf(optarg, "%u", &max_proc);
break;
case 5:
if(strlen(optarg) >= 150) {
fprintf(stderr, "arg too big\n");
exit(EXIT_FAILURE);
}
strcpy(cmd, optarg);
break;
case 6:
if(strlen(optarg) >= 100) {
fprintf(stderr, "arg too big\n");
exit(EXIT_FAILURE);
}
strcpy(workdir, optarg);
break;
}
}
/*Change to working directory*/
if(chdir(workdir) != 0) {
perror("Could not change directory");
exit(EXIT_FAILURE);
}
/*Get password*/
printf("Password:");
scanf("%49s", db->password); /* password holds at most 49 characters plus the terminator */
MYSQL *conn;
MYSQL_RES *res;
MYSQL_ROW row;
mysql_library_init(0, NULL, NULL);
/* Test our connection to the sql server before forking*/
conn = mysql_init(NULL);
/* Connect to database */
if (!mysql_real_connect(conn, db->server,
db->user, db->password, db->database, 0, NULL, 0)) {
fprintf(stderr, "Could not connect to mysql. %s\n", mysql_error(conn));
exit(EXIT_FAILURE);
}
/* Find tables */
if (mysql_query(conn, "show tables")) {
fprintf(stderr, "Could not retrieve table list. %s\n", mysql_error(conn));
exit(EXIT_FAILURE);
}
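res = mysql_use_result(conn);
/* Scan the whole table list for the requested table */
int table_found = 0;
while ((row = mysql_fetch_row(res)) != NULL) {
if(strcmp(row[0], db->table) == 0) {
table_found = 1;
}
}
/* close connection */
mysql_free_result(res);
mysql_close(conn);
/* Only give up if no table in the list matched */
if(!table_found) {
fprintf(stderr, "Could not find table %s\n", db->table);
exit(EXIT_FAILURE);
}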
/* Our process ID and Session ID */
pid_t pid, sid;
/* Fork off the parent process */
pid = fork();
if (pid < 0) {
exit(EXIT_FAILURE);
}
/* If we got a good PID, then
we can exit the parent process. */
if (pid > 0) {
exit(EXIT_SUCCESS);
}
/* Change the file mode mask */
umask(0);
/* Open a connection to the syslog server */
setlogmask(LOG_UPTO(LOG_DEBUG));
openlog("[peplib Rdaemon]: ",LOG_NOWAIT|LOG_PID,LOG_USER);
/*begin logging*/
syslog(LOG_DEBUG, "rdaemon launched in directory %s", get_current_dir_name());
syslog(LOG_NOTICE, "rdaemon has begun with user [%s], database[%s], and host [%s].", db->user, db->database, db->server);
syslog(LOG_NOTICE, "rdaemon will fork at most %u processes to execute [%sJOB_ID].", max_proc, cmd);
/* Create a new SID for the child process */
sid = setsid();
if (sid < 0) {
/* Log the failure */
exit(EXIT_FAILURE);
}
/* Close out the standard file descriptors */
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
/*Setup process pool*/
cpids = (pid_t*) malloc(sizeof(pid_t) * max_proc);
running_cpids = 0;
unsigned int i;
for(i = 0; i < max_proc; i++)
cpids[i] = -1;
syslog(LOG_NOTICE, "Constructed process pool");
/*Register with signal now that we are in daemon mode*/
signal(SIGALRM, wait_for_change);
signal(SIGCHLD, clean_up_child);
syslog(LOG_NOTICE, "registered signal, beginning main loop");
wait_for_change();
closelog();
mysql_library_end();
free(db);
}
int doDispatch(const unsigned int job_id, const char* cmd) {
/* Our process ID and Session ID */
pid_t pid, ppid;
/* Fork off the parent process */
pid = fork();
if (pid < 0) {
return 1;
}
/* If we got a good PID, then
we return to the parent process. */
if (pid > 0) {
add_cpid(pid);
return 0;
}
/*set up signalling stuff*/
ppid = getppid();
pid = getpid();
/*Create a file to notify parent when we're finished*/
char filename[20];
sprintf(filename, "pid_%d", pid);
int fd = open(filename, O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR);
if(fd == -1) {
syslog(LOG_ERR, "Failed to create PID file! %m");
exit(EXIT_FAILURE);
}
/*Lock the file with a non-blocking lock*/
if(lockf(fd, F_TLOCK, 0) != 0) {
syslog(LOG_ERR, "Failed to lock PID file! %m");
}
/*Execute the command */
syslog(LOG_INFO, "Executing [%s%d] in directory %s", cmd, job_id, get_current_dir_name());
/* Room for cmd (up to 149 chars), two job ids, and the redirection */
char exestr[256];
snprintf(exestr, sizeof(exestr), "%s%u > log_%u 2>&1", cmd, job_id, job_id);
system(exestr);
/*Remove lock so the parent knows it is complete*/
lockf(fd, F_ULOCK, 0);
/*Delete file*/
unlink(filename);
/*Signal success to the parent*/
syslog(LOG_INFO, "Exiting and signalling parent [%d]", ppid);
exit(EXIT_SUCCESS);
}
void check_for_change() {
MYSQL *conn;
MYSQL_RES *res;
MYSQL_ROW row;
unsigned int do_dispatch = 0;
char sql_query[256]; /* the table name alone may be up to 99 chars */
lock();
syslog(LOG_DEBUG, "Beginning SQL Query to monitor rows");
/* Connect to the sql server*/
conn = mysql_init(NULL);
/* Connect to database */
if (!mysql_real_connect(conn, db->server,
db->user, db->password, db->database, 0, NULL, 0)) {
syslog(LOG_ERR, "%s\n", mysql_error(conn));
mysql_close(conn);
exit(EXIT_FAILURE);
}
/* Count the rows that are waiting to be dispatched */
snprintf(sql_query, sizeof(sql_query), "SELECT count(job_id) FROM %s WHERE dispatch IS NULL", db->table);
if (mysql_query(conn, sql_query)) {
syslog(LOG_ERR, "Query failed with \"%s\"", mysql_error(conn));
mysql_close(conn);
exit(EXIT_FAILURE);
}
res = mysql_use_result(conn);
while ((row = mysql_fetch_row(res)) != NULL) {
if(atoi(row[0]) > 0) {
do_dispatch = 1;
syslog(LOG_DEBUG, "Found new row");
}
}
/* close connection */
mysql_free_result(res);
mysql_close(conn);
/* Do dispatches if necessary */
unlock();
if(do_dispatch) {
prepareJob();
}
}
void prepareJob() {
lock();
syslog(LOG_DEBUG, "Dispatching jobs");
MYSQL *conn;
MYSQL_RES *res;
MYSQL_ROW row;
unsigned int* jobs_to_dispatch;
unsigned int result_number;
unsigned int i;
unsigned int jobs_dispatched;
char sql_query[256]; /* the table name alone may be up to 99 chars */
conn = mysql_init(NULL);
/* Connect to database */
if (!mysql_real_connect(conn, db->server,
db->user, db->password, db->database, 0, NULL, 0)) {
syslog(LOG_ERR, "SQL connection failed with \"%s\"", mysql_error(conn));
exit(EXIT_FAILURE);
}
/* Fetch the ids of the rows that are waiting to be dispatched */
snprintf(sql_query, sizeof(sql_query), "SELECT job_id FROM %s WHERE dispatch IS NULL", db->table);
if (mysql_query(conn, sql_query)) {
syslog(LOG_ERR, "Query failed with \"%s\"", mysql_error(conn));
mysql_close(conn);
exit(EXIT_FAILURE);
}
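/* mysql_store_result() buffers the whole result set, so mysql_num_rows() is valid right away */
res = mysql_store_result(conn);
if (res == NULL) {
syslog(LOG_ERR, "Could not fetch results: \"%s\"", mysql_error(conn));
mysql_close(conn);
exit(EXIT_FAILURE);
}
/*Create an array to store the job ids in, so that we can refer to them later*/
result_number = mysql_num_rows(res);
jobs_to_dispatch = (unsigned int*) malloc(result_number * sizeof(unsigned int));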
/*Loop through the rows for dispatching but do not go over the process pool number*/
tally_children();
for(i = 0; (running_cpids < max_proc) && (row = mysql_fetch_row(res)) != NULL; i++) {
jobs_to_dispatch[i] = atoi(row[0]);
if(doDispatch(jobs_to_dispatch[i], cmd) != 0) {
syslog(LOG_ERR, "Failed to dispatch %u. Exiting", jobs_to_dispatch[i]);
mysql_close(conn);
exit(EXIT_FAILURE);
}
syslog(LOG_NOTICE, "Dispatching job %u", jobs_to_dispatch[i]);
}
/*record the number of jobs actually dispatched*/
jobs_dispatched = i;
/* free result */
mysql_free_result(res);
/*Update rows which were dispatched*/
for(i = 0;i < jobs_dispatched; i++) {
snprintf(sql_query, sizeof(sql_query), "UPDATE %s SET dispatch=CURTIME() WHERE job_id=%u", db->table, jobs_to_dispatch[i]);
if (mysql_query(conn, sql_query)) {
syslog(LOG_ERR, "UPDATE failed with \"%s\"", mysql_error(conn));
mysql_close(conn);
exit(EXIT_FAILURE);
}
}
mysql_close(conn);
free(jobs_to_dispatch);
unlock();
}
void wait_for_change() {
while (1) {
check_for_change();
syslog(LOG_NOTICE, "Sleeping");
sleep(SLEEP);
wait(NULL); //wait for child processes that somehow were uncaught (zombies); wait() needs a status pointer, NULL discards it
}
}
void lock() {
sigset_t st;
sigemptyset (&st);
sigaddset(&st, SIGALRM);
sigaddset(&st, SIGCHLD);
sigprocmask(SIG_BLOCK, &st, NULL);
syslog(LOG_DEBUG, "Locked");
}
void unlock() {
sigset_t st;
sigemptyset (&st);
sigaddset(&st, SIGALRM);
sigaddset(&st, SIGCHLD);
sigprocmask(SIG_UNBLOCK, &st, NULL);
syslog(LOG_DEBUG, "Unlocked");
}
void add_cpid(pid_t cpid) {
unsigned int i;
unsigned short int success = 0;
for(i = 0; i < max_proc; i++) {
if(cpids[i] == -1) {
cpids[i] = cpid;
running_cpids++;
success = 1;
break;
}
}
if(!success) {
syslog(LOG_ERR, "Too many children processes running! Aborting");
exit(EXIT_FAILURE);
}
}
int tally_children() {
unsigned int i;
int fd;
/* errno is provided by <errno.h>; redeclaring it here is not portable */
unsigned short int finished;
unsigned short int possible_error;
char filename[20];
for(i = 0; i < max_proc; i++) {
if(cpids[i] == -1)
continue;
finished = 0;
possible_error = 0;
sprintf(filename, "pid_%d", cpids[i]);
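fd = open(filename, O_WRONLY);
if(fd == -1) {
/*File is gone: the child deleted it on a normal exit*/
finished = 1;
} else if(lockf(fd, F_TLOCK, 0) != 0) {
/*Check if the file is locked*/
if(errno == EAGAIN || errno == EACCES) {
/*The child still holds the lock, so it is still running*/
close(fd);
continue;
} else {
syslog(LOG_ERR, "Very strange bad locking condition for file %s! %m", filename);
exit(EXIT_FAILURE);
}
} else {
/*File exists but is unlocked: the child died without cleaning up*/
finished = 1;
possible_error = 1;
close(fd);
}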
if(finished) {
if(possible_error) {
syslog(LOG_ERR, "Child process did not exit gracefully");
unlink(filename);
}
cpids[i] = -1;
running_cpids--;
syslog(LOG_DEBUG, "Detected completed process, %d left running", running_cpids);
}
}
return running_cpids;
}
void clean_up_child() {
syslog(LOG_NOTICE, "Registered child signal");
syslog(LOG_INFO, "Attempting to reap dead children.");
waitpid(-1, NULL, 0 | WNOHANG);
wait_for_change();
}
