 |
Guide to DECthreads
Example 6-2 implements a simple, text-based, asynchronous user
interface. It allows you to use the terminal to start multiple commands
that run concurrently and that report their results at the terminal
when complete. You can monitor the status of, or cancel, commands that
are already running.
This C program utilizes DECthreads pthread interface
routines but also uses the DECthreads exception package to capture and
cleanup from thread cancelations (and other synchronous fatal errors)
as exceptions.
Asynchronous Commands
The asynchronous commands are date and time.
The asynchronous commands are as follows:
- date delay_number_of_seconds
Waits the
specified number of seconds before displaying today's date.
- time delay_number_of_seconds
Waits the
specified number of seconds before displaying the time of day.
For example, issuing the following command causes the program to wait
10 seconds before reporting the time:
Housekeeping Commands
The housekeeping commands are as follows:
- status command_number
Displays the state of
a command.
- wait command_number
Waits for a command to
finish.
- cancel command_number
Stops a command.
The argument command_number is the number of the command that
assigned and displayed when the asynchronous command starts.
This program is limited to four outstanding commands.
Here is a sample of the output that the program produces:
Info> help
Commands are formed by a verb and an optional numeric argument.
The following commands are available:
Cancel <COMMAND> Cancel running command
Date <DELAY> Print the date
Help Print this text
Quit Quit (same as EOF)
Status [<COMMAND>] Report on running command
Time <DELAY> Print the time
Wait <COMMAND> Wait for command to finish
<COMMAND> refers to the command number.
<DELAY> delays the command execution for some number of seconds.
This delay simulates a command task that actually takes some
period of time to execute. During this delay, commands may be
initiated, queried, and/or canceled.
Info> time 5
This is command #0.
Info> date 15
This is command #1.
(0) At the tone the time will be, 11:19:46.
Info> status 1
Command #1: "date", 8 seconds remaining.
Info> status 1
Command #1: "date", 5 seconds remaining.
Info> time 10
This is command #0.
Info> status 0
Command #0: "time", 8 seconds remaining.
Info> status 1
Command #1: "date", waiting to print.
(1) Today is Tue, 6 Oct 1992.
Info> time 3
This is command #0.
Info> wait 0
(0) At the tone the time will be, 11:21:26.
Info> date 10
This is command #0.
Info> cancel 0
(0) Canceled.
Info> quit
|
The following pthread routines are used in Example 6-2:
- pthread_cancel()
- pthread_cond_signal()
- pthread_cond_wait()
- pthread_create()
- pthread_delay_np()
- pthread_detach()
- pthread_exc_report_np()
- pthread_join()
- pthread_mutex_init()
- pthread_mutex_lock()
- pthread_mutex_unlock()
- pthread_once()
- sched_yield()
In the program source, notice that:
- The main() routine uses pthread_once() to
perform one-time initialization.
- The do_delay() routine specifies the preset delay
interval. For a timespec structure, initializing tv.sec =
1 and tv.nsec = 0 results in a delay of one second.
- The do_cleanup() and find_free_thread() routines
must lock two mutexes at the same time. To avoid deadlock, each routine
in the program must lock the two mutexes in the same order.
- The find_free_thread() routine uses
pthread_detach() to detach the free thread found because no
other threads will join with it.
Example 6-2 C Program Example (Asynchronous
User Interface) |
/*
*
* DECthreads example program featuring an asynchronous user interface
*
*/
/*
* Include files
*/
#include <pthread.h>
#include <pthread_exception.h>
#include <stdio.h>
#include <time.h>
#define check(status,string) if (status != 0) { \
errno = status; \
fprintf (stderr, "%s status %d: %s\n", status, string, strerror (status)); \
}
/*
* Local definitions
*/
#define PROMPT "Info> " /* Prompt string */
#define MAXLINSIZ 81 /* Command line size */
#define THDNUM 5 /* Number of server threads */
/*
* Server thread "states"
*/
#define ST_INIT 0 /* "Initial" state (no thread) */
#define ST_FINISHED 1 /* Command completed */
#define ST_CANCELED 2 /* Command was canceled */
#define ST_ERROR 3 /* Command was terminated by an error */
#define ST_RUNNING 4 /* Command is running */
#ifndef FALSE /* Just in case these are not defined */
# define FALSE 0
# define TRUE (!FALSE)
#endif
#ifndef NULL /* Just in case this is not defined */
# define NULL ((void*)0)
#endif
/*
* Global variables
*/
struct THREAD_DATA {
pthread_t thread; /* Server thread handle */
pthread_mutex_t mutex; /* Mutex to protect fields below */
int time; /* Amount of delay remaining */
char task; /* Task being performed ('t' or 'd') */
int state; /* State of the server thread */
} thread_data[THDNUM];
pthread_mutex_t free_thread_mutex = /* Mutex to protect "free_thread" */
PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t free_thread_cv = /* Condition variable for same */
PTHREAD_COND_INITIALIZER;
int free_thread; /* Flag indicating a free thread */
/*
* Local Routines
*/
static void
dispatch_task (void *(*routine)(void*), char task, int time);
static void
do_cancel (int index);
static void
do_cleanup (int index, int final_state);
static void*
do_date (void* arg);
static void
do_delay (int index);
static void
do_status (int index);
static void*
do_time (void* arg);
static void
do_wait (int index);
static int
find_free_thread (int *index);
static char *
get_cmd (char *buffer, int size);
static int
get_y_or_n (char *query, char defans);
static void
init_routine (void);
static void
print_help (void);
/*
* The main program:
*/
main()
{
int done = FALSE; /* Flag indicating user is "done" */
char cmdline[MAXLINSIZ]; /* Command line */
char cmd_wd[MAXLINSIZ]; /* Command word */
int cmd_arg; /* Command argument */
int cmd_cnt; /* Number of items on command line */
int status;
void *(*routine)(void*); /* Routine to execute in a thread */
static pthread_once_t once_block = PTHREAD_ONCE_INIT;
/*
* Perform program initialization.
*/
status = pthread_once (&once_block, init_routine);
check (status, "Pthread_once");
/*
* Main command loop
*/
do {
/*
* Get and parse a command. Yield first so that any threads waiting
* to execute get a chance to before we take out the global lock
* and block for I/O.
*/
sched_yield ();
if (get_cmd(cmdline, sizeof (cmdline))) {
cmd_cnt = sscanf (cmdline, "%s %d", cmd_wd, &cmd_arg);
routine = NULL; /* No routine yet */
if ((cmd_cnt == 1) || (cmd_cnt == 2)) { /* Normal result */
cmd_wd[0] = tolower(cmd_wd[0]); /* Map to lower case */
switch (cmd_wd[0]) {
case 'h': /* "Help" */
case '?':
print_help();
break;
case 'q': /* "Quit" */
done = TRUE;
break;
case 's': /* "Status" */
do_status ((cmd_cnt == 2 ? cmd_arg : -1));
break;
/*
* These commands require an argument
*/
case 'c': /* "Cancel" */
case 'd': /* "Date" */
case 't': /* "Time" */
case 'w': /* "Wait" */
if (cmd_cnt != 2)
printf ("Missing command argument.\n");
else {
switch (cmd_wd[0]) {
case 'c': /* "Cancel" */
do_cancel (cmd_arg);
break;
case 'd': /* "Date" */
routine = do_date;
break;
case 't': /* "Time" */
routine = do_time;
break;
case 'w': /* "Wait" */
do_wait (cmd_arg);
break;
}
}
break;
default:
printf ("Unrecognized command.\n");
break;
}
}
else if (cmd_cnt != EOF) /* Ignore blank command line */
printf ("Unexpected parse error.\n");
/*
* If there is a routine to be executed in a server thread,
* create the thread.
*/
if (routine) dispatch_task (routine, cmd_wd[0], cmd_arg);
}
else
done = TRUE;
} while (!done);
}
/*
* Create a thread to handle the user's request.
*/
static void
dispatch_task (void *(*routine)(void*), char task, int time)
{
int i; /* Index of free thread slot */
int status;
if (find_free_thread (&i)) {
/*
* Record the data for this thread where both the main thread and the
* server thread can share it. Lock the mutex to ensure exclusive
* access to the storage.
*/
status = pthread_mutex_lock (&thread_data[i].mutex);
check (status, "Mutex_lock");
thread_data[i].time = time;
thread_data[i].task = task;
thread_data[i].state = ST_RUNNING;
status = pthread_mutex_unlock (&thread_data[i].mutex);
check (status, "Mutex_unlock");
/*
* Create the thread, using the default attributes. The thread will
* execute the specified routine and get its data from array slot 'i'.
*/
status = pthread_create (
&thread_data[i].thread,
NULL,
routine,
(void*)i);
check (status, "Pthread_create");
printf ("This is command #%d.\n\n", i);
}
}
/*
* Wait for the completion of the specified command.
*/
static void
do_cancel (int index)
{
int cancelable;
int status;
if ((index < 0) || (index >= THDNUM))
printf ("Bad command number %d.\n", index);
else {
status = pthread_mutex_lock (&thread_data[index].mutex);
check (status, "Mutex_lock");
cancelable = (thread_data[index].state == ST_RUNNING);
status = pthread_mutex_unlock (&thread_data[index].mutex);
check (status, "Mutex_unlock");
if (cancelable) {
status = pthread_cancel (thread_data[index].thread);
check (status, "Pthread_cancel");
}
else
printf ("Command %d is not active.\n", index);
}
}
/*
* Post-task clean-up routine.
*/
static void
do_cleanup (int index, int final_state)
{
int status;
/*
* This thread is about to make the change from "running" to "finished",
* so lock a mutex to prevent a race condition in which the main thread
* sees this thread as finished before it is actually done cleaning up.
*
* Note that when attempting to lock more than one mutex at a time,
* always lock the mutexes in the same order everywhere in the code.
* The ordering here is the same as in "find_free_thread".
*/
status = pthread_mutex_lock (&free_thread_mutex);
check (status, "Mutex_lock");
/*
* Mark the thread as finished with its task.
*/
status = pthread_mutex_lock (&thread_data[index].mutex);
check (status, "Mutex_lock");
thread_data[index].state = final_state;
status = pthread_mutex_unlock (&thread_data[index].mutex);
check (status, "Mutex_unlock");
/*
* Set the flag indicating that there is a free thread, and signal the
* main thread, in case it is waiting.
*/
free_thread = TRUE;
status = pthread_cond_signal (&free_thread_cv);
check (status, "Cond_signal");
status = pthread_mutex_unlock (&free_thread_mutex);
check (status, "Mutex_unlock");
}
/*
* Thread routine that prints out the date.
*
* Synchronize access to ctime as it is not thread-safe (it returns the address
* of a static string). Also synchronize access to stdio routines.
*/
static void*
do_date (void* arg)
{
time_t clock_time; /* Julian time */
char *date_str; /* Pointer to string returned from ctime */
char day[4], month[4], date[3], year[5]; /* Pieces of ctime string */
TRY {
/*
* Pretend that this task actually takes a long time to perform.
*/
do_delay ((int)arg);
clock_time = time ((time_t *)0);
date_str = ctime (&clock_time);
sscanf (date_str, "%s %s %s %*s %s", day, month, date, year);
printf ("%d) Today is %s, %s %s %s.\n\n", arg, day, date, month, year);
}
CATCH (pthread_cancel_e) {
printf ("%d) Canceled.\n", arg);
/*
* Perform exit actions
*/
do_cleanup ((int)arg, ST_CANCELED);
RERAISE;
}
CATCH_ALL {
printf ("%d) ", arg);
pthread_exc_report_np (THIS_CATCH);
/*
* Perform exit actions
*/
do_cleanup ((int)arg, ST_ERROR);
RERAISE;
}
ENDTRY;
/*
* Perform exit actions (thread was not canceled).
*/
do_cleanup ((int)arg, ST_FINISHED);
/*
* All thread routines return a value. This program doesn't check the
* value, however.
*/
return arg;
}
/*
* Delay routine
*
* Since the actual tasks that threads do in this program take so little time
* to perform, execute a delay to make it seem like they are taking a long
* time. Also, this will give the user something to query the progress of.
*/
static void
do_delay (int index)
{
static struct timespec interval = {1, 0};
int done; /* Loop exit condition */
int status;
while (TRUE) {
/*
* Decrement the global count, so the main thread can see how much
* progress we've made. Keep decrementing as long as the remaining
* time is greater than zero.
*
* Lock the mutex to ensure no conflict with the main thread that
* might be reading the time remaining while we're decrementing it.
*/
status = pthread_mutex_lock (&thread_data[index].mutex);
check (status, "Mutex_lock");
done = ((thread_data[index].time--) <= 0);
status = pthread_mutex_unlock (&thread_data[index].mutex);
check (status, "Mutex_unlock");
/*
* Quit if the time is up.
*/
if (done) break;
/*
* Wait for one second.
*/
pthread_delay_np (&interval);
}
}
/*
* Print the status of the specified thread.
*/
static void
do_status (int index)
{
int start, end; /* Range of commands queried */
int i; /* Loop index */
int output = FALSE; /* Flag: produced output */
int status;
if ((index < -1) || (index >= THDNUM))
printf ("Bad command number %d.\n", index);
else {
if (index == -1)
start = 0, end = THDNUM;
else
start = index, end = start + 1;
for (i = start; i < end; i++) {
status = pthread_mutex_lock (&thread_data[i].mutex);
check (status, "Mutex_lock");
if (thread_data[i].state != ST_INIT) {
printf ("Command #%d: ", i);
switch (thread_data[i].task) {
case 't':
printf ("\"time\", ");
break;
case 'd':
printf ("\"date\", ");
break;
default:
printf ("[unknown] ");
break;
}
switch (thread_data[i].state) {
case ST_FINISHED:
printf ("completed");
break;
case ST_CANCELED:
printf ("canceled");
break;
case ST_ERROR:
printf ("terminated by error");
break;
case ST_RUNNING:
if (thread_data[i].time < 0)
printf ("waiting to print");
else
printf (
"%d seconds remaining",
thread_data[i].time);
break;
default:
printf ("Bad thread state.\n");
break;
}
printf (".\n");
output = TRUE;
}
status = pthread_mutex_unlock (&thread_data[i].mutex);
check (status, "Mutex_unlock");
}
if (!output) printf ("No such command.\n");
printf ("\n");
}
}
/*
* Thread routine that prints out the date.
*/
static void*
do_time (void* arg)
{
time_t clock_time; /* Julian time */
char *date_str; /* Pointer to string returned from ctime */
char time_str[8]; /* Piece of ctime string */
TRY {
/*
* Pretend that this task actually takes a long time to perform.
*/
do_delay ((int)arg);
clock_time = time ((time_t *)0);
date_str = ctime (&clock_time);
sscanf (date_str, "%*s %*s %*s %s", time_str);
printf ("%d) At the tone the time will be, %s.%c\n\n",
arg,
time_str,
'\007');
}
CATCH (pthread_cancel_e) {
printf ("%d) Canceled.\n", arg);
do_cleanup ((int)arg, ST_CANCELED);
RERAISE;
}
CATCH_ALL {
printf ("%d) ", arg);
pthread_exc_report_np (THIS_CATCH);
do_cleanup ((int)arg, ST_ERROR);
RERAISE;
}
ENDTRY;
/*
* Perform exit actions (thread was not canceled).
*/
do_cleanup ((int)arg, ST_FINISHED);
/*
* All thread routines return a value. This program doesn't check the
* value, however.
*/
return arg;
}
/*
* Wait for the completion of the specified command.
*/
static void
do_wait (int index)
{
int status;
void *value;
if ((index < 0) || (index >= THDNUM))
printf ("Bad command number %d.\n", index);
else {
status = pthread_join (thread_data[index].thread, &value);
check (status, "Pthread_join");
if (value == (void*)index)
printf ("Command %d terminated successfully.\n", index);
else if (value == PTHREAD_CANCELED)
printf ("Command %d was cancelled.\n", index);
else
printf ("Command %d terminated with unexpected value %#lx",
index, value);
}
}
/*
* Find a free server thread to handle the user's request.
*
* If a free thread is found, its index is written at the supplied address
* and the function returns true.
*/
static int
find_free_thread (int *index)
{
int i; /* Loop index */
int found; /* Free thread found */
int retry = FALSE; /* Look again for finished threads */
int status;
do {
/*
* We're about to look for a free thread, so prevent the data state
* from changing while we are looking.
*
* Note that when attempting to lock more than one mutex at a time,
* always lock the mutexes in the same order everywhere in the code.
* The ordering here is the same as in "do_cleanup".
*/
status = pthread_mutex_lock (&free_thread_mutex);
check (status, "Mutex_lock");
/*
* Find a slot that doesn't have a running thread in it.
*
* Before checking, lock the mutex to prevent conflict with the thread
* if it is running.
*/
for (i = 0, found = FALSE; i < THDNUM; i++) {
status = pthread_mutex_lock (&thread_data[i].mutex);
check (status, "Mutex_lock");
found = (thread_data[i].state != ST_RUNNING);
status = pthread_mutex_unlock (&thread_data[i].mutex);
check (status, "Mutex_unlock");
/*
* Now that the mutex is unlocked, break out of the loop if the
* thread is free.
*/
if (found) break;
}
if (found)
retry = FALSE;
else {
retry = get_y_or_n (
"All threads are currently busy, do you want to wait?",
'Y');
if (retry) {
/*
* All threads were busy when we started looking, so clear
* the "free thread" flag.
*/
free_thread = FALSE;
/*
* Now wait until some thread finishes and sets the flag
*/
while (!free_thread)
pthread_cond_wait (&free_thread_cv, &free_thread_mutex);
}
}
pthread_mutex_unlock (&free_thread_mutex);
} while (retry);
if (found) {
/*
* Request DECthreads reclaim its internal storage for this old thread
* before we use the handle to create a new one.
*/
status = pthread_detach (thread_data[i].thread);
check (status, "Pthread_detach");
*index = i;
}
return (found);
}
/*
* Get the next user command.
*
* Synchronize I/O with other threads to prevent conflicts if the stdio
* routines are not thread-safe.
*/
static char *
get_cmd (char *buffer, int size)
{
printf (PROMPT);
return fgets (buffer, size, stdin);
}
/*
* Get a yes or no answer to a query. A "blank" answer uses default answer.
*
* Returns TRUE for "yes" and FALSE for "no".
*/
static int
get_y_or_n (char *query, char defans)
{
char buffer[MAXLINSIZ]; /* User's answer */
int answer; /* Boolean equivalent */
int retry = TRUE; /* Ask again? */
do {
buffer[0] = '\0'; /* Initialize the buffer */
flockfile (stdout);
flockfile (stdin);
printf ("%s [%c] ", query, defans);
fgets (buffer, sizeof (buffer), stdin);
funlockfile (stdin);
funlockfile (stdout);
if (buffer[0] == '\0') buffer[0] = defans; /* Apply default */
switch (buffer[0]) {
case 'y':
case 'Y':
answer = TRUE;
retry = FALSE;
break;
case 'n':
case 'N':
answer = FALSE;
retry = FALSE;
break;
default:
printf ("Please enter \"Y\" or \"N\".\n");
retry = TRUE;
break;
}
} while (retry);
return answer;
}
/*
* Initialization routine;
*
* Called as a one-time initialization action.
*/
static void
init_routine (void)
{
int i;
for (i = 0; i < THDNUM; i++) {
pthread_mutex_init (&thread_data[i].mutex, NULL);
thread_data[i].time = 0;
thread_data[i].task = '\0';
thread_data[i].state = ST_INIT;
}
}
/*
* Print help text.
*/
static void
print_help (void)
{
printf ("Commands are formed by a verb and optional numeric argument.\n");
printf ("The following commands are available:\n");
printf ("\tCancel\t[command]\tCancel running command\n");
printf ("\tDate\t[delay]\t\tPrint the date\n");
printf ("\tHelp\t\t\tPrint this text\n");
printf ("\tQuit\t\t\tQuit (same as EOF)\n");
printf ("\tStatus\t[command]\tReport on running command\n");
printf ("\tTime\t[delay]\t\tPrint the time\n");
printf ("\tWait\t[command]\tWait for command to finish\n");
printf ("\n[command] refers to the command number.\n");;
printf ("[delay] delays command execution for some number of seconds.\n");
printf ("This delay simulates a command task that actually takes some\n");
printf ("period of time to execute. During this delay, commands may be\n");
printf ("initiated, queried, and/or canceled.\n");
}
|
|