I came up with what I think is a clever solution, so I thought I'd share it. Let me first say that part of the reason I chose this solution is that I wanted to practice my C. The particular things I'm playing with are signalling, forking, and SQL (through the MySQL C API). So here is my list of what this program does:
- Creates a daemon
- Daemon periodically monitors SQL table
- When something changes, it executes a command
/* Our process ID and Session ID */
pid_t pid, sid;

/* Fork off the parent process */
pid = fork();

/* If we got a good PID, then
   we can exit the parent process. */
if (pid > 0) {
    exit(0); /* No problems, just the parent leaving. */
}

/* Create a new SID for the child process */
sid = setsid();
Step 2 means we need to monitor the SQL table for changes. For now, assume our table has two columns: a filename column and a dispatch column. When we wish to execute a job, we make the dispatch column NULL. That is the signal to the daemon that it should execute its job on that row, where the filename is a parameter to the job. Here is the minimum code to accomplish this:
/*Structure used for SQL queries*/
typedef struct database_s {
    char server[100];
    char user[25];
    char password[50];
    char database[100];
    char table[100];
} database;

database* db; //.....Set up your database

void check_for_change() {
    MYSQL *conn;
    MYSQL_RES *res;
    MYSQL_ROW row;
    unsigned int do_dispatch = 0;
    char sql_query[256]; /* room for the query text plus a 99-character table name */

    /* Connect to the sql server*/
    conn = mysql_init(NULL);
    /* Connect to database */
    if (!mysql_real_connect(conn, db->server,
            db->user, db->password, db->database, 0, NULL, 0)) {
        mysql_close(conn);
        exit(EXIT_FAILURE);
    }
    /* Count the rows that have not been dispatched yet */
    snprintf(sql_query, sizeof(sql_query), "SELECT count(job_id) FROM %s WHERE dispatch is NULL", db->table);
    if (mysql_query(conn, sql_query)) {
        mysql_close(conn);
        exit(EXIT_FAILURE);
    }
    res = mysql_use_result(conn);
    while ((row = mysql_fetch_row(res)) != NULL) {
        /*was the count greater than 0?*/
        if(atoi(row[0]) > 0) {
            do_dispatch = 1;
        }
    }
    /* close connection */
    mysql_free_result(res);
    mysql_close(conn);
    /* Do dispatches if necessary */
    if(do_dispatch) {
        //Execute job
        // ...
    }
}
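The query text is built from the table name, so its length depends on what the user passes in. Here is a minimal sketch of building it with a length check, so a truncated statement is never sent to the server. The helper name build_count_query is hypothetical (it is not part of the program in this post), and it does not escape or validate the table name.

```c
#include <stdio.h>
#include <stddef.h>

/*
 * Build the "count undispatched rows" query into buf.
 * Returns the length of the query on success, or -1 if it would not fit
 * in buflen bytes (or snprintf reported an error).
 * NOTE: hypothetical helper for illustration; the table name is used as-is.
 */
int build_count_query(char *buf, size_t buflen, const char *table) {
    int n = snprintf(buf, buflen,
                     "SELECT count(job_id) FROM %s WHERE dispatch IS NULL",
                     table);
    if (n < 0 || (size_t)n >= buflen) {
        return -1; /* truncated or failed: do not run this query */
    }
    return n;
}
```

A caller would check for -1 and log an error instead of calling mysql_query with a half-written statement.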
The next thing we need to do is call this code from our daemon. The easiest way is to check every so many seconds. This may be accomplished with this code:
void wait_for_change() {
    while (1) {
        check_for_change();
        sleep(10); /* sleep() takes seconds, so this sleeps for 10 seconds */
    }
}
We can also have the daemon check on demand by registering a signal handler:
signal(SIGALRM, wait_for_change);
This code says that when we are signalled, we call the wait_for_change function. Notice that if we were sleeping, we go back to the top of the loop and call check_for_change(). Thus, we will immediately check for a change. To signal the program, we may use this command from either the terminal or another program:
pkill -ALRM [program name]
There is one slight problem, however. Signals can arrive at any point in our code. You can imagine that it would be bad if we were in the middle of querying the SQL database or executing a job and jumped straight to the check_for_change code. We would leave open files, open connections, and other badness behind. So we "lock" and "unlock" our code when it is important that we aren't interrupted by a signal. Here are these two functions:
void lock() {
    sigset_t st;
    sigemptyset (&st);
    sigaddset(&st, SIGALRM);
    sigprocmask(SIG_BLOCK, &st, NULL);
}

void unlock() {
    sigset_t st;
    sigemptyset (&st);
    sigaddset(&st, SIGALRM);
    sigprocmask(SIG_UNBLOCK, &st, NULL);
}
Finally, we need to set up a job manager. Basically, we want to allow only a finite number of jobs to execute at any given time. I won't go into too much detail of the code, but here's the simple explanation of how it works:
- If the counter for the number of jobs is greater than 0, fork the program like we did for the daemon but without creating a new session. We decrement the number of jobs we can execute.
- When a child finishes, it will signal the daemon. The daemon records that a child finished.
In theory, the child signals (step 2) could be used to count the number of child processes that have finished. However, I've found in practice that this doesn't always work. If a child crashes, somehow the parent isn't always signalled. Maybe someone more knowledgeable can leave a comment about that. I instead have the children create empty files when they start and delete them when finished. Then the parent program just counts these files to get the number of running jobs. These files are also useful for debugging information.
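One likely contributor to the missed signals: standard signals such as SIGCHLD are not queued, so if several children exit while the signal is blocked or the handler is already running, they can be reported by a single delivery. A common way to cope is to reap in a loop with waitpid and WNOHANG until nothing is left. The sketch below is an illustration under that assumption, not the approach this program takes; reap_finished_children and spawn_and_reap are hypothetical names.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/*
 * Reap every child that has already exited, without blocking.
 * Returns how many children this call reaped.
 */
int reap_finished_children(void) {
    int reaped = 0;
    while (waitpid(-1, NULL, WNOHANG) > 0) {
        reaped++;
    }
    return reaped;
}

/*
 * Demo: fork n children that exit immediately, then poll until all of
 * them have been reaped (giving up after about 5 seconds).
 * Returns the number reaped, or -1 if fork fails.
 */
int spawn_and_reap(int n) {
    int i, tries, total = 0;
    for (i = 0; i < n; i++) {
        pid_t pid = fork();
        if (pid < 0)
            return -1;
        if (pid == 0)
            _exit(0); /* child: finish right away */
    }
    for (tries = 0; total < n && tries < 500; tries++) {
        total += reap_finished_children();
        if (total < n) {
            struct timespec delay = {0, 10 * 1000 * 1000}; /* 10 ms */
            nanosleep(&delay, NULL);
        }
    }
    return total;
}
```

Because the count comes from waitpid rather than from the number of signals received, it stays correct even when several exits collapse into one SIGCHLD.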
That's basically it. Here now is the complete program. It contains locks and unlocks, much more error handling, and logging compared to what we've seen before.
#define _GNU_SOURCE /* needed for get_current_dir_name() */
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h> /* waitpid() and WNOHANG */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <syslog.h>
#include <string.h>
#include <mysql.h>
#include <getopt.h>
#include <signal.h>

#define SLEEP 500 /* seconds between polls */

/*
 * See the help for more information on what this does.
 */

/*forks and executes the job*/
int doDispatch(const unsigned int job_id, const char* cmd);
/*Prints help information*/
void print_help();
/*Is called once it is determined a job must be dispatched. Gathers the information from the SQL table to pass to the new job*/
void prepareJob();
/*Check for database changes*/
void check_for_change();
/*Waiting method, allowing the program to sleep*/
void wait_for_change();
/*Lock the daemon so that it may not be interrupted by signals.*/
void lock(); //do not accept sigalrms
/*Unlocks it*/
void unlock();
/*Add up the number of spawned processes.*/
int tally_children();
/*Add a new child pid to the list of spawned processes*/
void add_cpid(pid_t cpid);
/*Wait for a process to finish to enable cleaning up child*/
void clean_up_child();

/*Structure used for SQL queries*/
typedef struct database_s {
    char server[100];
    char user[25];
    char password[50];
    char database[100];
    char table[100];
} database;

/* Declare global variables so when the process is signalled, it has access to its state*/
database* db;
char workdir[100];
pid_t* cpids;
int running_cpids;
unsigned int max_proc;
char cmd[150];
void print_help() {
    printf("Usage: rdaemon --workdir=[workdir] --server=[localhost] --user=[root] --database=[mysql] --table=[table name] --cmd=[command to execute] --pool=[max number of processes]\n");
    printf("This daemon monitors the given table for changes. When it detects a change, by monitoring\n");
    printf("the last row, it will execute [cmd] on all new rows. It determines\n");
    printf("if a job has been dispatched (a row is new) by getting and setting the column \n");
    printf("\"dispatch\". \"dispatch\" should be a TIME field and \"job_id\" should be an \n");
    printf("INTEGER. [cmd] should take a single argument, the job_id.\n");
}
int main(int argc, char* argv[]) {
    //process the command line arguments
    if(argc != 8) {
        print_help();
        exit(EXIT_FAILURE);
    }
    db = (database*) malloc(sizeof(database));
    if(db == NULL) {
        perror("Could not allocate database settings");
        exit(EXIT_FAILURE);
    }
    extern char *optarg;
    int c; /* getopt_long_only returns an int, and -1 when it is done */
    //build up required arguments array. Just a bunch of random code to process the daemon call
    static struct option opts[] = {
        {"server", 1, NULL, 0},
        {"user", 1, NULL, 1},
        {"database", 1, NULL, 2},
        {"table", 1, NULL, 3},
        {"pool", 1, NULL, 4},
        {"cmd", 1, NULL, 5},
        {"workdir", 1, NULL, 6},
        {0, 0, 0, 0}
    };
    while((c = getopt_long_only(argc, argv, "", opts, NULL)) != -1) {
        switch(c) {
        case 0:
            if(strlen(optarg) >= 100) {
                fprintf(stderr, "arg too big\n");
                exit(EXIT_FAILURE); /* refuse rather than overflow the buffer */
            }
            sscanf(optarg, "%s", db->server);
            break;
        case 1:
            if(strlen(optarg) >= 25) {
                fprintf(stderr, "arg too big\n");
                exit(EXIT_FAILURE);
            }
            sscanf(optarg, "%s", db->user);
            break;
        case 2:
            if(strlen(optarg) >= 100) {
                fprintf(stderr, "arg too big\n");
                exit(EXIT_FAILURE);
            }
            sscanf(optarg, "%s", db->database);
            printf("%s\n", db->database);
            break;
        case 3:
            if(strlen(optarg) >= 100) {
                fprintf(stderr, "arg too big\n");
                exit(EXIT_FAILURE);
            }
            sscanf(optarg, "%s", db->table);
            break;
        case 4:
            sscanf(optarg, "%u", &max_proc);
            break;
        case 5:
            if(strlen(optarg) >= 150) {
                fprintf(stderr, "arg too big\n");
                exit(EXIT_FAILURE);
            }
            strcpy(cmd, optarg);
            break;
        case 6:
            if(strlen(optarg) >= 100) {
                fprintf(stderr, "arg too big\n");
                exit(EXIT_FAILURE);
            }
            strcpy(workdir, optarg);
            break;
        }
    }
    /*Change to working directory*/
    if(chdir(workdir) != 0) {
        perror("Could not change directory");
        exit(EXIT_FAILURE);
    }
    /*Get password (at most 49 characters, to fit db->password)*/
    printf("Password:");
    scanf("%49s", db->password);
    MYSQL *conn;
    MYSQL_RES *res;
    MYSQL_ROW row;
    int table_found = 0;
    mysql_library_init(0, NULL, NULL);
    /* Test our connection to the sql server before forking*/
    conn = mysql_init(NULL);
    /* Connect to database */
    if (!mysql_real_connect(conn, db->server,
            db->user, db->password, db->database, 0, NULL, 0)) {
        fprintf(stderr, "Could not connect to mysql. %s\n", mysql_error(conn));
        exit(EXIT_FAILURE);
    }
    /* Find tables */
    if (mysql_query(conn, "show tables")) {
        fprintf(stderr, "Could not retrieve table list. %s\n", mysql_error(conn));
        exit(EXIT_FAILURE);
    }
    res = mysql_use_result(conn);
    /* Look through every table name; the table only has to match one of them */
    while ((row = mysql_fetch_row(res)) != NULL) {
        if(strcmp(row[0], db->table) == 0) {
            table_found = 1;
        }
    }
    if(!table_found) {
        fprintf(stderr, "Could not find table %s\n", db->table);
        exit(EXIT_FAILURE);
    }
    /* close connection */
    mysql_free_result(res);
    mysql_close(conn);
    /* Our process ID and Session ID */
    pid_t pid, sid;
    /* Fork off the parent process */
    pid = fork();
    if (pid < 0) {
        exit(EXIT_FAILURE);
    }
    /* If we got a good PID, then
       we can exit the parent process. */
    if (pid > 0) {
        exit(EXIT_SUCCESS);
    }
    /* Change the file mode mask */
    umask(0);
    /* Open a connection to the syslog server */
    setlogmask(LOG_UPTO(LOG_DEBUG));
    openlog("[peplib Rdaemon]: ",LOG_NOWAIT|LOG_PID,LOG_USER);
    /*begin logging*/
    syslog(LOG_DEBUG, "rdaemon launched in directory %s", get_current_dir_name());
    syslog(LOG_NOTICE, "rdaemon has begun with user [%s], database[%s], and host [%s].", db->user, db->database, db->server);
    syslog(LOG_NOTICE, "rdaemon will fork at most %u processes to execute [%sJOB_ID].", max_proc, cmd);
    /* Create a new SID for the child process */
    sid = setsid();
    if (sid < 0) {
        /* Log the failure */
        syslog(LOG_ERR, "setsid failed: %m");
        exit(EXIT_FAILURE);
    }
    /* Close out the standard file descriptors */
    close(STDIN_FILENO);
    close(STDOUT_FILENO);
    close(STDERR_FILENO);
    /*Setup process pool*/
    cpids = (pid_t*) malloc(sizeof(pid_t) * max_proc);
    running_cpids = 0;
    unsigned int i;
    for(i = 0; i < max_proc; i++)
        cpids[i] = -1;
    syslog(LOG_NOTICE, "Constructed process pool");
    /*Register with signal now that we are in daemon mode*/
    signal(SIGALRM, wait_for_change);
    signal(SIGCHLD, clean_up_child);
    syslog(LOG_NOTICE, "registered signal, beginning main loop");
    wait_for_change();
    closelog();
    mysql_library_end();
    free(db);
}
int doDispatch(const unsigned int job_id, const char* cmd) {
    /* Our process ID and our parent's process ID */
    pid_t pid, ppid;
    /* Fork off the parent process */
    pid = fork();
    if (pid < 0) {
        return 1;
    }
    /* If we got a good PID, then
       we return to the parent process. */
    if (pid > 0) {
        add_cpid(pid);
        return 0;
    }
    /*set up signalling stuff*/
    ppid = getppid();
    pid = getpid();
    /*Create a file to notify parent when we're finished*/
    char filename[20];
    sprintf(filename, "pid_%d", (int) pid);
    int fd = open(filename, O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR);
    if(fd == -1) {
        syslog(LOG_ERR, "Failed to create PID file! %m");
        exit(EXIT_FAILURE);
    }
    /*Lock the file with a non-blocking lock*/
    if(lockf(fd, F_TLOCK, 0) != 0) {
        syslog(LOG_ERR, "Failed to lock PID file! %m");
    }
    /*Execute the command */
    syslog(LOG_INFO, "Executing [%s%u] in directory %s", cmd, job_id, get_current_dir_name());
    /*cmd may be up to 149 characters, so leave room for it plus the redirection*/
    char exestr[256];
    snprintf(exestr, sizeof(exestr), "%s%u > log_%u 2>&1", cmd, job_id, job_id);
    system(exestr);
    /*Remove lock so the parent knows it is complete*/
    lockf(fd, F_ULOCK, 0);
    /*Delete file*/
    unlink(filename);
    /*Signal success to the parent*/
    syslog(LOG_INFO, "Exiting and signalling parent [%d]", (int) ppid);
    exit(EXIT_SUCCESS);
}
void check_for_change() {
    MYSQL *conn;
    MYSQL_RES *res;
    MYSQL_ROW row;
    unsigned int do_dispatch = 0;
    char sql_query[256]; /* room for the query text plus a 99-character table name */
    lock();
    syslog(LOG_DEBUG, "Beginning SQL Query to monitor rows");
    /* Connect to the sql server*/
    conn = mysql_init(NULL);
    /* Connect to database */
    if (!mysql_real_connect(conn, db->server,
            db->user, db->password, db->database, 0, NULL, 0)) {
        syslog(LOG_ERR, "%s\n", mysql_error(conn));
        mysql_close(conn);
        exit(EXIT_FAILURE);
    }
    /* Count the rows that have not been dispatched yet */
    snprintf(sql_query, sizeof(sql_query), "SELECT count(job_id) FROM %s WHERE dispatch is NULL", db->table);
    if (mysql_query(conn, sql_query)) {
        syslog(LOG_ERR, "Query failed with \"%s\"", mysql_error(conn));
        mysql_close(conn);
        exit(EXIT_FAILURE);
    }
    res = mysql_use_result(conn);
    while ((row = mysql_fetch_row(res)) != NULL) {
        if(atoi(row[0]) > 0) {
            do_dispatch = 1;
            syslog(LOG_DEBUG, "Found new row");
        }
    }
    /* close connection */
    mysql_free_result(res);
    mysql_close(conn);
    /* Do dispatches if necessary */
    unlock();
    if(do_dispatch) {
        prepareJob();
    }
}
void prepareJob() {
    lock();
    syslog(LOG_DEBUG, "Dispatching jobs");
    MYSQL *conn;
    MYSQL_RES *res;
    MYSQL_ROW row;
    unsigned int* jobs_to_dispatch;
    unsigned int result_number;
    unsigned int i;
    unsigned int jobs_dispatched;
    char sql_query[256];
    conn = mysql_init(NULL);
    /* Connect to database */
    if (!mysql_real_connect(conn, db->server,
            db->user, db->password, db->database, 0, NULL, 0)) {
        syslog(LOG_ERR, "SQL connection failed with \"%s\"", mysql_error(conn));
        exit(EXIT_FAILURE);
    }
    /* Query the job_id of every row that has not been dispatched */
    snprintf(sql_query, sizeof(sql_query), "SELECT job_id FROM %s WHERE dispatch IS NULL", db->table);
    if (mysql_query(conn, sql_query)) {
        syslog(LOG_ERR, "Query failed with \"%s\"", mysql_error(conn));
        mysql_close(conn);
        exit(EXIT_FAILURE);
    }
    /* Use mysql_store_result here: mysql_num_rows() is only reliable right away
       on a stored result, not on one opened with mysql_use_result */
    res = mysql_store_result(conn);
    if (res == NULL) {
        syslog(LOG_ERR, "Could not read result with \"%s\"", mysql_error(conn));
        mysql_close(conn);
        exit(EXIT_FAILURE);
    }
    /*Create an array to store the job ids in, so that we can refer to them later*/
    result_number = mysql_num_rows(res);
    jobs_to_dispatch = (unsigned int*) malloc(result_number * sizeof(unsigned int));
    /*Loop through the rows for dispatching but do not go over the process pool number*/
    tally_children();
    for(i = 0; ((unsigned int) running_cpids < max_proc) && (row = mysql_fetch_row(res)) != NULL; i++) {
        jobs_to_dispatch[i] = atoi(row[0]);
        if(doDispatch(jobs_to_dispatch[i], cmd) != 0) {
            syslog(LOG_ERR, "Failed to dispatch %u. Exiting", jobs_to_dispatch[i]);
            mysql_close(conn);
            exit(EXIT_FAILURE);
        }
        syslog(LOG_NOTICE, "Dispatching job %u", jobs_to_dispatch[i]);
    }
    /*record the number of jobs actually dispatched*/
    jobs_dispatched = i;
    /* free result */
    mysql_free_result(res);
    /*Update rows which were dispatched*/
    for(i = 0;i < jobs_dispatched; i++) {
        snprintf(sql_query, sizeof(sql_query), "UPDATE %s SET dispatch=CURTIME() WHERE job_id=%u", db->table, jobs_to_dispatch[i]);
        if (mysql_query(conn, sql_query)) {
            syslog(LOG_ERR, "UPDATE failed with \"%s\"", mysql_error(conn));
            mysql_close(conn);
            exit(EXIT_FAILURE);
        }
    }
    mysql_close(conn);
    free(jobs_to_dispatch);
    unlock();
}
void wait_for_change() {
    while (1) {
        check_for_change();
        syslog(LOG_NOTICE, "Sleeping");
        sleep(SLEEP);
        /*reap child processes that somehow were uncaught (zombies), without blocking*/
        while (waitpid(-1, NULL, WNOHANG) > 0)
            ;
    }
}
void lock() {
    sigset_t st;
    sigemptyset (&st);
    sigaddset(&st, SIGALRM);
    sigaddset(&st, SIGCHLD);
    sigprocmask(SIG_BLOCK, &st, NULL);
    syslog(LOG_DEBUG, "Locked");
}

void unlock() {
    sigset_t st;
    sigemptyset (&st);
    sigaddset(&st, SIGALRM);
    sigaddset(&st, SIGCHLD);
    sigprocmask(SIG_UNBLOCK, &st, NULL);
    syslog(LOG_DEBUG, "Unlocked");
}
void add_cpid(pid_t cpid) {
    unsigned int i;
    unsigned short int success = 0;
    for(i = 0; i < max_proc; i++) {
        if(cpids[i] == -1) {
            cpids[i] = cpid;
            running_cpids++;
            success = 1;
            break;
        }
    }
    if(!success) {
        syslog(LOG_ERR, "Too many children processes running! Aborting");
        exit(EXIT_FAILURE);
    }
}
int tally_children() {
    unsigned int i;
    int fd;
    unsigned short int finished;
    unsigned short int possible_error;
    char filename[20];
    for(i = 0; i < max_proc; i++) {
        if(cpids[i] == -1)
            continue;
        finished = 0;
        possible_error = 0;
        sprintf(filename, "pid_%d", (int) cpids[i]);
        fd = open(filename, O_WRONLY);
        if(fd == -1) {
            /*File is gone: the child deleted it on a clean exit*/
            finished = 1;
        } else if(lockf(fd, F_TLOCK, 0) != 0) {
            /*Check if the file is locked*/
            if(errno == EAGAIN || errno == EACCES) {
                /*Still locked, so the child is still running*/
                close(fd);
                continue;
            } else {
                syslog(LOG_ERR, "Very strange bad locking condition for file %s! %m", filename);
                exit(EXIT_FAILURE);
            }
        } else {
            /*File exists but is unlocked: the child died without cleaning up*/
            finished = 1;
            possible_error = 1;
            close(fd);
        }
        if(finished) {
            if(possible_error) {
                syslog(LOG_ERR, "Child process did not exit gracefully");
                unlink(filename);
            }
            cpids[i] = -1;
            running_cpids--;
            syslog(LOG_DEBUG, "Detected completed process, %d left running", running_cpids);
        }
    }
    return running_cpids;
}
void clean_up_child() {
    syslog(LOG_NOTICE, "Registered child signal");
    syslog(LOG_INFO, "Attempting to reap dead children.");
    waitpid(-1, NULL, WNOHANG);
    wait_for_change();
}