pool_of_threads.cc

00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  * Copyright (C) 2006 MySQL AB
00005  * Copyright (C) 2009 Sun Microsystems
00006  *
00007  * This program is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; version 2 of the License.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License
00017  * along with this program; if not, write to the Free Software
00018  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
00019  */
00020 
#include "config.h"
#include <errno.h>
#include <fcntl.h>
#include <new>
#include <plugin/pool_of_threads/pool_of_threads.h>
#include "drizzled/pthread_globals.h"
#include "drizzled/internal/my_pthread.h"
00026 
00027 using namespace std;
00028 using namespace drizzled;
00029 
00030 /* Global's (TBR) */
00031 static PoolOfThreadsScheduler *scheduler= NULL;
00032 
00036 static volatile bool kill_pool_threads= false;
00037 
00038 static volatile uint32_t created_threads= 0;
00039 static int deinit(drizzled::plugin::Registry &registry);
00040 
00041 static struct event session_add_event;
00042 static struct event session_kill_event;
00043 
00044 
00045 static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
00046 static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
00047 
00048 
00049 static bool libevent_needs_immediate_processing(Session *session);
00050 static void libevent_connection_close(Session *session);
00051 void libevent_session_add(Session* session);
00052 bool libevent_should_close_connection(Session* session);
00053 extern "C" {
00054   void *libevent_thread_proc(void *arg);
00055   void libevent_io_callback(int Fd, short Operation, void *ctx);
00056   void libevent_add_session_callback(int Fd, short Operation, void *ctx);
00057   void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
00058 }
00059 
00060 static uint32_t size= 0;
00061 
/**
 * Create a pipe and switch both of its ends to non-blocking mode.
 *
 * @param pipe_fds  Receives the read end in [0] and the write end in [1].
 *                  On failure both entries are set to -1.
 *
 * @retval false  The pipe was created and both ends are non-blocking.
 * @retval true   pipe() or fcntl() failed. Any descriptor that had been
 *                opened is closed again, so a failed call leaves nothing
 *                behind for the caller to clean up.
 */
static bool init_pipe(int pipe_fds[])
{
  if (pipe(pipe_fds) < 0)
  {
    pipe_fds[0]= pipe_fds[1]= -1;
    return true;
  }

  for (int end= 0; end < 2; end++)
  {
    const int flags= fcntl(pipe_fds[end], F_GETFL);
    if (flags == -1 ||
        fcntl(pipe_fds[end], F_SETFL, flags | O_NONBLOCK) == -1)
    {
      /* Do not leak the half-configured pipe. */
      close(pipe_fds[0]);
      close(pipe_fds[1]);
      pipe_fds[0]= pipe_fds[1]= -1;
      return true;
    }
  }

  return false;
}
00077 
00078 
00079 
00080 
00081 
00094 void libevent_io_callback(int, short, void *ctx)
00095 {
00096   Session *session= reinterpret_cast<Session*>(ctx);
00097   session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
00098   assert(sched);
00099   PoolOfThreadsScheduler *pot_scheduler= static_cast<PoolOfThreadsScheduler *>(session->scheduler);
00100   pot_scheduler->doIO(sched);
00101 }
00102 
00103 void PoolOfThreadsScheduler::doIO(session_scheduler *sched)
00104 {
00105   safe_mutex_assert_owner(&LOCK_event_loop);
00106   sessions_waiting_for_io.erase(sched->session);
00107   sessions_need_processing.push(sched->session);
00108 }
00116 void libevent_kill_session_callback(int Fd, short, void *ctx)
00117 {
00118   PoolOfThreadsScheduler *pot_scheduler=
00119     reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
00120 
00121   pot_scheduler->killSession(Fd);
00122 }
00123 
00124 void PoolOfThreadsScheduler::killSession(int Fd)
00125 {
00126   safe_mutex_assert_owner(&LOCK_event_loop);
00127   /*
00128    For pending events clearing
00129   */
00130   char c;
00131   int count= 0;
00132 
00133   pthread_mutex_lock(&LOCK_session_kill);
00134   while (! sessions_to_be_killed.empty())
00135   {
00136 
00137     /*
00138      Fetch a session from the queue
00139     */
00140     Session* session= sessions_to_be_killed.front();
00141     pthread_mutex_unlock(&LOCK_session_kill);
00142 
00143     session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
00144     assert(sched);
00145 
00146     /*
00147      Delete from libevent and add to the processing queue.
00148     */
00149     event_del(&sched->io_event);
00150     /*
00151      Remove from the sessions_waiting_for_io set
00152     */
00153     sessions_waiting_for_io.erase(session);
00154     /*
00155      Push into the sessions_need_processing; the kill action will be
00156      performed out of the event loop
00157     */
00158     sessions_need_processing.push(sched->session);
00159 
00160     pthread_mutex_lock(&LOCK_session_kill);
00161     /*
00162      Pop until this session is already processed
00163     */
00164     sessions_to_be_killed.pop();
00165   }
00166   
00167   /*
00168    Clear the pending events 
00169    One and only one charactor should be in the pipe
00170   */
00171   while (read(Fd, &c, sizeof(c)) == sizeof(c))
00172   {
00173     count++;
00174   }
00175   assert(count == 1);
00176   pthread_mutex_unlock(&LOCK_session_kill);
00177 }
00178 
00179 
00189 void libevent_add_session_callback(int Fd, short, void *ctx)
00190 {
00191   PoolOfThreadsScheduler *pot_scheduler=
00192     reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
00193   pot_scheduler->addSession(Fd);
00194 }
00195 
00196 void PoolOfThreadsScheduler::addSession(int Fd)
00197 {
00198   safe_mutex_assert_owner(&LOCK_event_loop);
00199   /*
00200    For pending events clearing
00201   */
00202   char c;
00203   int count= 0;
00204 
00205   pthread_mutex_lock(&LOCK_session_add);
00206   while (! sessions_need_adding.empty())
00207   {
00208     /*
00209      Pop the first session off the queue 
00210     */
00211     Session* session= sessions_need_adding.front();
00212     pthread_mutex_unlock(&LOCK_session_add);
00213 
00214     session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
00215     assert(sched);
00216 
00217 
00218     if (!sched->logged_in || libevent_should_close_connection(session))
00219     {
00220       /*
00221        Add session to sessions_need_processing queue. If it needs closing
00222        we'll close it outside of event_loop().
00223       */
00224       sessions_need_processing.push(sched->session);
00225     }
00226     else
00227     {
00228       /* Add to libevent */
00229       if (event_add(&sched->io_event, NULL))
00230       {
00231         errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
00232         libevent_connection_close(session);
00233       }
00234       else
00235       {
00236         sessions_waiting_for_io.insert(sched->session);
00237       }
00238     }
00239 
00240     pthread_mutex_lock(&LOCK_session_add);
00241     /*
00242      Pop until this session is already processed
00243     */
00244     sessions_need_adding.pop();
00245   }
00246 
00247   /*
00248    Clear the pending events 
00249    One and only one charactor should be in the pipe
00250   */
00251   while (read(Fd, &c, sizeof(c)) == sizeof(c))
00252   {
00253     count++;
00254   }
00255   assert(count == 1);
00256   pthread_mutex_unlock(&LOCK_session_add);
00257 }
00258 
00263 static void libevent_connection_close(Session *session)
00264 {
00265   session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
00266   assert(sched);
00267   session->killed= Session::KILL_CONNECTION;    /* Avoid error messages */
00268 
00269   if (session->client->getFileDescriptor() >= 0) /* not already closed */
00270   {
00271     session->disconnect(0, true);
00272   }
00273   sched->thread_detach();
00274   
00275   delete sched;
00276   session->scheduler_arg= NULL;
00277 
00278   Session::unlink(session);   /* locks LOCK_thread_count and deletes session */
00279 
00280   return;
00281 }
00282 
00283 
00291 bool libevent_should_close_connection(Session* session)
00292 {
00293   return session->client->haveError() ||
00294          session->killed == Session::KILL_CONNECTION;
00295 }
00296 
00297 
00304 void *libevent_thread_proc(void *ctx)
00305 {
00306   if (my_thread_init())
00307   {
00308     my_thread_global_end();
00309     errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
00310     exit(1);
00311   }
00312 
00313   PoolOfThreadsScheduler *pot_scheduler=
00314     reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
00315   return pot_scheduler->mainLoop();
00316 }
00317 
00318 void *PoolOfThreadsScheduler::mainLoop()
00319 {
00320   /*
00321    Signal libevent_init() when all threads has been created and are ready
00322    to receive events.
00323   */
00324   (void) pthread_mutex_lock(&LOCK_thread_count);
00325   created_threads++;
00326   if (created_threads == size)
00327     (void) pthread_cond_signal(&COND_thread_count);
00328   (void) pthread_mutex_unlock(&LOCK_thread_count);
00329 
00330   for (;;)
00331   {
00332     Session *session= NULL;
00333     (void) pthread_mutex_lock(&LOCK_event_loop);
00334 
00335     /* get session(s) to process */
00336     while (sessions_need_processing.empty())
00337     {
00338       if (kill_pool_threads)
00339       {
00340         /* the flag that we should die has been set */
00341         (void) pthread_mutex_unlock(&LOCK_event_loop);
00342         goto thread_exit;
00343       }
00344       event_loop(EVLOOP_ONCE);
00345     }
00346 
00347     /* pop the first session off the queue */
00348     session= sessions_need_processing.front();
00349     sessions_need_processing.pop();
00350     session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
00351 
00352     (void) pthread_mutex_unlock(&LOCK_event_loop);
00353 
00354     /* now we process the connection (session) */
00355 
00356     /* set up the session<->thread links. */
00357     session->thread_stack= (char*) &session;
00358 
00359     if (sched->thread_attach())
00360     {
00361       libevent_connection_close(session);
00362       continue;
00363     }
00364 
00365     /* is the connection logged in yet? */
00366     if (!sched->logged_in)
00367     {
00368       if (session->authenticate())
00369       {
00370         /* Failed to log in */
00371         libevent_connection_close(session);
00372         continue;
00373       }
00374       else
00375       {
00376         /* login successful */
00377         sched->logged_in= true;
00378         session->prepareForQueries();
00379         if (!libevent_needs_immediate_processing(session))
00380           continue; /* New connection is now waiting for data in libevent*/
00381       }
00382     }
00383 
00384     do
00385     {
00386       /* Process a query */
00387       if (! session->executeStatement())
00388       {
00389         libevent_connection_close(session);
00390         break;
00391       }
00392     } while (libevent_needs_immediate_processing(session));
00393 
00394     if (kill_pool_threads) /* the flag that we should die has been set */
00395       goto thread_exit;
00396   }
00397 
00398 thread_exit:
00399   (void) pthread_mutex_lock(&LOCK_thread_count);
00400   created_threads--;
00401   pthread_cond_broadcast(&COND_thread_count);
00402   (void) pthread_mutex_unlock(&LOCK_thread_count);
00403   my_thread_end();
00404   pthread_exit(0);
00405 
00406   return NULL;                               /* purify: deadcode */
00407 }
00408 
00409 
00419 static bool libevent_needs_immediate_processing(Session *session)
00420 {
00421   session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
00422 
00423   if (libevent_should_close_connection(session))
00424   {
00425     libevent_connection_close(session);
00426     return false;
00427   }
00428   /*
00429    If more data in the socket buffer, return true to process another command.
00430   
00431    Note: we cannot add for event processing because the whole request
00432    might already be buffered and we wouldn't receive an event. This is
00433    indeed the root of the reason of low performace. Need to be changed
00434    when nonblocking Protocol is finished.
00435   */
00436   if (session->client->haveMoreData())
00437     return true;
00438 
00439   sched->thread_detach();
00440   libevent_session_add(session);
00441 
00442   return false;
00443 }
00444 
00445 
00456 void libevent_session_add(Session* session)
00457 {
00458   session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
00459   assert(sched);
00460   PoolOfThreadsScheduler *pot_scheduler=
00461     static_cast<PoolOfThreadsScheduler *>(session->scheduler);
00462   pot_scheduler->sessionAddToQueue(sched);
00463 }
00464 
00465 void PoolOfThreadsScheduler::sessionAddToQueue(session_scheduler *sched)
00466 {
00467   char c= 0;
00468   pthread_mutex_lock(&LOCK_session_add);
00469   if (sessions_need_adding.empty())
00470   {
00471     /* notify libevent */
00472     size_t written= write(session_add_pipe[1], &c, sizeof(c));
00473     assert(written == sizeof(c));
00474   }
00475   /* queue for libevent */
00476   sessions_need_adding.push(sched->session);
00477   pthread_mutex_unlock(&LOCK_session_add);
00478 }
00479 
00480 
00481 PoolOfThreadsScheduler::PoolOfThreadsScheduler(const char *name_arg)
00482   : Scheduler(name_arg), sessions_need_adding(), sessions_to_be_killed(),
00483     sessions_need_processing(), sessions_waiting_for_io()
00484 {
00485   struct sched_param tmp_sched_param;
00486 
00487   memset(&tmp_sched_param, 0, sizeof(struct sched_param));
00488   /* Setup attribute parameter for session threads. */
00489   (void) pthread_attr_init(&attr);
00490   (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00491   pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00492 
00493   tmp_sched_param.sched_priority= WAIT_PRIOR;
00494   (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
00495 
00496   pthread_mutex_init(&LOCK_session_add, NULL);
00497   pthread_mutex_init(&LOCK_session_kill, NULL);
00498   pthread_mutex_init(&LOCK_event_loop, NULL);
00499 
00500 }
00501 
00502 
00503 PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
00504 {
00505   (void) pthread_mutex_lock(&LOCK_thread_count);
00506 
00507   kill_pool_threads= true;
00508   while (created_threads)
00509   {
00510     /*
00511      * Wake up the event loop
00512      */
00513     char c= 0;
00514     size_t written= write(session_add_pipe[1], &c, sizeof(c));
00515     assert(written == sizeof(c));
00516 
00517     pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
00518   }
00519   (void) pthread_mutex_unlock(&LOCK_thread_count);
00520 
00521   event_del(&session_add_event);
00522   close(session_add_pipe[0]);
00523   close(session_add_pipe[1]);
00524   event_del(&session_kill_event);
00525   close(session_kill_pipe[0]);
00526   close(session_kill_pipe[1]);
00527 
00528   (void) pthread_mutex_destroy(&LOCK_event_loop);
00529   (void) pthread_mutex_destroy(&LOCK_session_add);
00530   (void) pthread_mutex_destroy(&LOCK_session_kill);
00531   (void) pthread_attr_destroy(&attr);
00532 }
00533 
00534 
00535 bool PoolOfThreadsScheduler::addSession(Session *session)
00536 {
00537   assert(session->scheduler_arg == NULL);
00538   session_scheduler *sched= new session_scheduler(session);
00539 
00540   if (sched == NULL)
00541     return true;
00542 
00543   session->scheduler_arg= (void *)sched;
00544 
00545   libevent_session_add(session);
00546 
00547   return false;
00548 }
00549 
00550 
00551 void PoolOfThreadsScheduler::killSession(Session *session)
00552 {
00553   char c= 0;
00554 
00555   pthread_mutex_lock(&LOCK_session_kill);
00556 
00557   if (sessions_to_be_killed.empty())
00558   {
00559     /* 
00560       Notify libevent with the killing event if this's the first killing
00561       notification of the batch
00562     */
00563     size_t written= write(session_kill_pipe[1], &c, sizeof(c));
00564     assert(written == sizeof(c));
00565   }
00566 
00567   /*
00568     Push into the sessions_to_be_killed queue
00569   */
00570   sessions_to_be_killed.push(session);
00571   pthread_mutex_unlock(&LOCK_session_kill);
00572 }
00573 
00574 
00575 bool PoolOfThreadsScheduler::libevent_init(void)
00576 {
00577   uint32_t x;
00578 
00579   event_init();
00580 
00581 
00582   /* Set up the pipe used to add new sessions to the event pool */
00583   if (init_pipe(session_add_pipe))
00584   {
00585     errmsg_printf(ERRMSG_LVL_ERROR,
00586                   _("init_pipe(session_add_pipe) error in libevent_init\n"));
00587     return true;
00588   }
00589   /* Set up the pipe used to kill sessions in the event queue */
00590   if (init_pipe(session_kill_pipe))
00591   {
00592     errmsg_printf(ERRMSG_LVL_ERROR,
00593                   _("init_pipe(session_kill_pipe) error in libevent_init\n"));
00594     close(session_add_pipe[0]);
00595     close(session_add_pipe[1]);
00596     return true;
00597   }
00598   event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
00599             libevent_add_session_callback, this);
00600   event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
00601             libevent_kill_session_callback, this);
00602 
00603   if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
00604   {
00605     errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
00606     return true;
00607 
00608   }
00609   /* Set up the thread pool */
00610   pthread_mutex_lock(&LOCK_thread_count);
00611 
00612   for (x= 0; x < size; x++)
00613   {
00614     pthread_t thread;
00615     int error;
00616     if ((error= pthread_create(&thread, &attr, libevent_thread_proc, this)))
00617     {
00618       errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
00619                     error);
00620       pthread_mutex_unlock(&LOCK_thread_count);
00621       return true;
00622     }
00623   }
00624 
00625   /* Wait until all threads are created */
00626   while (created_threads != size)
00627     pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
00628   pthread_mutex_unlock(&LOCK_thread_count);
00629 
00630   return false;
00631 }
00632 
00633 
00640 static int init(drizzled::plugin::Registry &registry)
00641 {
00642   assert(size != 0);
00643 
00644   scheduler= new PoolOfThreadsScheduler("pool_of_threads");
00645   registry.add(scheduler);
00646 
00647   return 0;
00648 }
00649 
00654 static int deinit(drizzled::plugin::Registry &registry)
00655 {
00656   registry.remove(scheduler);
00657   delete scheduler;
00658 
00659   return 0;
00660 }
00661 
/*
 The defaults here were picked based on what I see (aka Brian). They should
 be vetted across a larger audience.

 Declares the "size" system variable, backed by the file-scope variable
 `size` (the number of pool threads).
 NOTE(review): the trailing numbers are presumably default 8, minimum 1,
 maximum 1024 and block size 0; confirm against DRIZZLE_SYSVAR_UINT.
*/
static DRIZZLE_SYSVAR_UINT(size, size,
                           PLUGIN_VAR_RQCMDARG,
                           N_("Size of Pool."),
                           NULL, NULL, 8, 1, 1024, 0);

/* NULL-terminated list of the system variables exported by this plugin. */
static drizzle_sys_var* system_variables[]= {
  DRIZZLE_SYSVAR(size),
  NULL,
};

/* Plugin descriptor; init() and deinit() above are its entry points. */
DRIZZLE_DECLARE_PLUGIN
{
  DRIZZLE_VERSION_ID,
  "pool_of_threads",
  "0.1",
  "Brian Aker",
  "Pool of Threads Scheduler",
  PLUGIN_LICENSE_GPL,
  init, /* Plugin Init */
  deinit, /* Plugin Deinit */
  NULL,   /* status variables */
  system_variables,   /* system variables */
  NULL    /* config options */
}
DRIZZLE_DECLARE_PLUGIN_END;

Generated on Tue Jan 5 10:49:17 2010 for drizzle by  doxygen 1.5.8