00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "config.h"
00022 #include <fcntl.h>
00023 #include <plugin/pool_of_threads/pool_of_threads.h>
00024 #include "drizzled/pthread_globals.h"
00025 #include "drizzled/internal/my_pthread.h"
00026
00027 using namespace std;
00028 using namespace drizzled;
00029
00030
00031 static PoolOfThreadsScheduler *scheduler= NULL;
00032
00036 static volatile bool kill_pool_threads= false;
00037
00038 static volatile uint32_t created_threads= 0;
00039 static int deinit(drizzled::plugin::Registry ®istry);
00040
00041 static struct event session_add_event;
00042 static struct event session_kill_event;
00043
00044
00045 static int session_add_pipe[2];
00046 static int session_kill_pipe[2];
00047
00048
00049 static bool libevent_needs_immediate_processing(Session *session);
00050 static void libevent_connection_close(Session *session);
00051 void libevent_session_add(Session* session);
00052 bool libevent_should_close_connection(Session* session);
00053 extern "C" {
00054 void *libevent_thread_proc(void *arg);
00055 void libevent_io_callback(int Fd, short Operation, void *ctx);
00056 void libevent_add_session_callback(int Fd, short Operation, void *ctx);
00057 void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
00058 }
00059
00060 static uint32_t size= 0;
00061
00068 static bool init_pipe(int pipe_fds[])
00069 {
00070 int flags;
00071 return pipe(pipe_fds) < 0 ||
00072 (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
00073 fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
00074 (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
00075 fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
00076 }
00077
00078
00079
00080
00081
00094 void libevent_io_callback(int, short, void *ctx)
00095 {
00096 Session *session= reinterpret_cast<Session*>(ctx);
00097 session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
00098 assert(sched);
00099 PoolOfThreadsScheduler *pot_scheduler= static_cast<PoolOfThreadsScheduler *>(session->scheduler);
00100 pot_scheduler->doIO(sched);
00101 }
00102
00103 void PoolOfThreadsScheduler::doIO(session_scheduler *sched)
00104 {
00105 safe_mutex_assert_owner(&LOCK_event_loop);
00106 sessions_waiting_for_io.erase(sched->session);
00107 sessions_need_processing.push(sched->session);
00108 }
00116 void libevent_kill_session_callback(int Fd, short, void *ctx)
00117 {
00118 PoolOfThreadsScheduler *pot_scheduler=
00119 reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
00120
00121 pot_scheduler->killSession(Fd);
00122 }
00123
00124 void PoolOfThreadsScheduler::killSession(int Fd)
00125 {
00126 safe_mutex_assert_owner(&LOCK_event_loop);
00127
00128
00129
00130 char c;
00131 int count= 0;
00132
00133 pthread_mutex_lock(&LOCK_session_kill);
00134 while (! sessions_to_be_killed.empty())
00135 {
00136
00137
00138
00139
00140 Session* session= sessions_to_be_killed.front();
00141 pthread_mutex_unlock(&LOCK_session_kill);
00142
00143 session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
00144 assert(sched);
00145
00146
00147
00148
00149 event_del(&sched->io_event);
00150
00151
00152
00153 sessions_waiting_for_io.erase(session);
00154
00155
00156
00157
00158 sessions_need_processing.push(sched->session);
00159
00160 pthread_mutex_lock(&LOCK_session_kill);
00161
00162
00163
00164 sessions_to_be_killed.pop();
00165 }
00166
00167
00168
00169
00170
00171 while (read(Fd, &c, sizeof(c)) == sizeof(c))
00172 {
00173 count++;
00174 }
00175 assert(count == 1);
00176 pthread_mutex_unlock(&LOCK_session_kill);
00177 }
00178
00179
00189 void libevent_add_session_callback(int Fd, short, void *ctx)
00190 {
00191 PoolOfThreadsScheduler *pot_scheduler=
00192 reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
00193 pot_scheduler->addSession(Fd);
00194 }
00195
00196 void PoolOfThreadsScheduler::addSession(int Fd)
00197 {
00198 safe_mutex_assert_owner(&LOCK_event_loop);
00199
00200
00201
00202 char c;
00203 int count= 0;
00204
00205 pthread_mutex_lock(&LOCK_session_add);
00206 while (! sessions_need_adding.empty())
00207 {
00208
00209
00210
00211 Session* session= sessions_need_adding.front();
00212 pthread_mutex_unlock(&LOCK_session_add);
00213
00214 session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
00215 assert(sched);
00216
00217
00218 if (!sched->logged_in || libevent_should_close_connection(session))
00219 {
00220
00221
00222
00223
00224 sessions_need_processing.push(sched->session);
00225 }
00226 else
00227 {
00228
00229 if (event_add(&sched->io_event, NULL))
00230 {
00231 errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
00232 libevent_connection_close(session);
00233 }
00234 else
00235 {
00236 sessions_waiting_for_io.insert(sched->session);
00237 }
00238 }
00239
00240 pthread_mutex_lock(&LOCK_session_add);
00241
00242
00243
00244 sessions_need_adding.pop();
00245 }
00246
00247
00248
00249
00250
00251 while (read(Fd, &c, sizeof(c)) == sizeof(c))
00252 {
00253 count++;
00254 }
00255 assert(count == 1);
00256 pthread_mutex_unlock(&LOCK_session_add);
00257 }
00258
00263 static void libevent_connection_close(Session *session)
00264 {
00265 session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
00266 assert(sched);
00267 session->killed= Session::KILL_CONNECTION;
00268
00269 if (session->client->getFileDescriptor() >= 0)
00270 {
00271 session->disconnect(0, true);
00272 }
00273 sched->thread_detach();
00274
00275 delete sched;
00276 session->scheduler_arg= NULL;
00277
00278 Session::unlink(session);
00279
00280 return;
00281 }
00282
00283
00291 bool libevent_should_close_connection(Session* session)
00292 {
00293 return session->client->haveError() ||
00294 session->killed == Session::KILL_CONNECTION;
00295 }
00296
00297
00304 void *libevent_thread_proc(void *ctx)
00305 {
00306 if (my_thread_init())
00307 {
00308 my_thread_global_end();
00309 errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
00310 exit(1);
00311 }
00312
00313 PoolOfThreadsScheduler *pot_scheduler=
00314 reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
00315 return pot_scheduler->mainLoop();
00316 }
00317
00318 void *PoolOfThreadsScheduler::mainLoop()
00319 {
00320
00321
00322
00323
00324 (void) pthread_mutex_lock(&LOCK_thread_count);
00325 created_threads++;
00326 if (created_threads == size)
00327 (void) pthread_cond_signal(&COND_thread_count);
00328 (void) pthread_mutex_unlock(&LOCK_thread_count);
00329
00330 for (;;)
00331 {
00332 Session *session= NULL;
00333 (void) pthread_mutex_lock(&LOCK_event_loop);
00334
00335
00336 while (sessions_need_processing.empty())
00337 {
00338 if (kill_pool_threads)
00339 {
00340
00341 (void) pthread_mutex_unlock(&LOCK_event_loop);
00342 goto thread_exit;
00343 }
00344 event_loop(EVLOOP_ONCE);
00345 }
00346
00347
00348 session= sessions_need_processing.front();
00349 sessions_need_processing.pop();
00350 session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
00351
00352 (void) pthread_mutex_unlock(&LOCK_event_loop);
00353
00354
00355
00356
00357 session->thread_stack= (char*) &session;
00358
00359 if (sched->thread_attach())
00360 {
00361 libevent_connection_close(session);
00362 continue;
00363 }
00364
00365
00366 if (!sched->logged_in)
00367 {
00368 if (session->authenticate())
00369 {
00370
00371 libevent_connection_close(session);
00372 continue;
00373 }
00374 else
00375 {
00376
00377 sched->logged_in= true;
00378 session->prepareForQueries();
00379 if (!libevent_needs_immediate_processing(session))
00380 continue;
00381 }
00382 }
00383
00384 do
00385 {
00386
00387 if (! session->executeStatement())
00388 {
00389 libevent_connection_close(session);
00390 break;
00391 }
00392 } while (libevent_needs_immediate_processing(session));
00393
00394 if (kill_pool_threads)
00395 goto thread_exit;
00396 }
00397
00398 thread_exit:
00399 (void) pthread_mutex_lock(&LOCK_thread_count);
00400 created_threads--;
00401 pthread_cond_broadcast(&COND_thread_count);
00402 (void) pthread_mutex_unlock(&LOCK_thread_count);
00403 my_thread_end();
00404 pthread_exit(0);
00405
00406 return NULL;
00407 }
00408
00409
00419 static bool libevent_needs_immediate_processing(Session *session)
00420 {
00421 session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
00422
00423 if (libevent_should_close_connection(session))
00424 {
00425 libevent_connection_close(session);
00426 return false;
00427 }
00428
00429
00430
00431
00432
00433
00434
00435
00436 if (session->client->haveMoreData())
00437 return true;
00438
00439 sched->thread_detach();
00440 libevent_session_add(session);
00441
00442 return false;
00443 }
00444
00445
00456 void libevent_session_add(Session* session)
00457 {
00458 session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
00459 assert(sched);
00460 PoolOfThreadsScheduler *pot_scheduler=
00461 static_cast<PoolOfThreadsScheduler *>(session->scheduler);
00462 pot_scheduler->sessionAddToQueue(sched);
00463 }
00464
00465 void PoolOfThreadsScheduler::sessionAddToQueue(session_scheduler *sched)
00466 {
00467 char c= 0;
00468 pthread_mutex_lock(&LOCK_session_add);
00469 if (sessions_need_adding.empty())
00470 {
00471
00472 size_t written= write(session_add_pipe[1], &c, sizeof(c));
00473 assert(written == sizeof(c));
00474 }
00475
00476 sessions_need_adding.push(sched->session);
00477 pthread_mutex_unlock(&LOCK_session_add);
00478 }
00479
00480
00481 PoolOfThreadsScheduler::PoolOfThreadsScheduler(const char *name_arg)
00482 : Scheduler(name_arg), sessions_need_adding(), sessions_to_be_killed(),
00483 sessions_need_processing(), sessions_waiting_for_io()
00484 {
00485 struct sched_param tmp_sched_param;
00486
00487 memset(&tmp_sched_param, 0, sizeof(struct sched_param));
00488
00489 (void) pthread_attr_init(&attr);
00490 (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00491 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00492
00493 tmp_sched_param.sched_priority= WAIT_PRIOR;
00494 (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
00495
00496 pthread_mutex_init(&LOCK_session_add, NULL);
00497 pthread_mutex_init(&LOCK_session_kill, NULL);
00498 pthread_mutex_init(&LOCK_event_loop, NULL);
00499
00500 }
00501
00502
00503 PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
00504 {
00505 (void) pthread_mutex_lock(&LOCK_thread_count);
00506
00507 kill_pool_threads= true;
00508 while (created_threads)
00509 {
00510
00511
00512
00513 char c= 0;
00514 size_t written= write(session_add_pipe[1], &c, sizeof(c));
00515 assert(written == sizeof(c));
00516
00517 pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
00518 }
00519 (void) pthread_mutex_unlock(&LOCK_thread_count);
00520
00521 event_del(&session_add_event);
00522 close(session_add_pipe[0]);
00523 close(session_add_pipe[1]);
00524 event_del(&session_kill_event);
00525 close(session_kill_pipe[0]);
00526 close(session_kill_pipe[1]);
00527
00528 (void) pthread_mutex_destroy(&LOCK_event_loop);
00529 (void) pthread_mutex_destroy(&LOCK_session_add);
00530 (void) pthread_mutex_destroy(&LOCK_session_kill);
00531 (void) pthread_attr_destroy(&attr);
00532 }
00533
00534
00535 bool PoolOfThreadsScheduler::addSession(Session *session)
00536 {
00537 assert(session->scheduler_arg == NULL);
00538 session_scheduler *sched= new session_scheduler(session);
00539
00540 if (sched == NULL)
00541 return true;
00542
00543 session->scheduler_arg= (void *)sched;
00544
00545 libevent_session_add(session);
00546
00547 return false;
00548 }
00549
00550
00551 void PoolOfThreadsScheduler::killSession(Session *session)
00552 {
00553 char c= 0;
00554
00555 pthread_mutex_lock(&LOCK_session_kill);
00556
00557 if (sessions_to_be_killed.empty())
00558 {
00559
00560
00561
00562
00563 size_t written= write(session_kill_pipe[1], &c, sizeof(c));
00564 assert(written == sizeof(c));
00565 }
00566
00567
00568
00569
00570 sessions_to_be_killed.push(session);
00571 pthread_mutex_unlock(&LOCK_session_kill);
00572 }
00573
00574
00575 bool PoolOfThreadsScheduler::libevent_init(void)
00576 {
00577 uint32_t x;
00578
00579 event_init();
00580
00581
00582
00583 if (init_pipe(session_add_pipe))
00584 {
00585 errmsg_printf(ERRMSG_LVL_ERROR,
00586 _("init_pipe(session_add_pipe) error in libevent_init\n"));
00587 return true;
00588 }
00589
00590 if (init_pipe(session_kill_pipe))
00591 {
00592 errmsg_printf(ERRMSG_LVL_ERROR,
00593 _("init_pipe(session_kill_pipe) error in libevent_init\n"));
00594 close(session_add_pipe[0]);
00595 close(session_add_pipe[1]);
00596 return true;
00597 }
00598 event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
00599 libevent_add_session_callback, this);
00600 event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
00601 libevent_kill_session_callback, this);
00602
00603 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
00604 {
00605 errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
00606 return true;
00607
00608 }
00609
00610 pthread_mutex_lock(&LOCK_thread_count);
00611
00612 for (x= 0; x < size; x++)
00613 {
00614 pthread_t thread;
00615 int error;
00616 if ((error= pthread_create(&thread, &attr, libevent_thread_proc, this)))
00617 {
00618 errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
00619 error);
00620 pthread_mutex_unlock(&LOCK_thread_count);
00621 return true;
00622 }
00623 }
00624
00625
00626 while (created_threads != size)
00627 pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
00628 pthread_mutex_unlock(&LOCK_thread_count);
00629
00630 return false;
00631 }
00632
00633
00640 static int init(drizzled::plugin::Registry ®istry)
00641 {
00642 assert(size != 0);
00643
00644 scheduler= new PoolOfThreadsScheduler("pool_of_threads");
00645 registry.add(scheduler);
00646
00647 return 0;
00648 }
00649
00654 static int deinit(drizzled::plugin::Registry ®istry)
00655 {
00656 registry.remove(scheduler);
00657 delete scheduler;
00658
00659 return 0;
00660 }
00661
00662
00663
00664
00665
00666 static DRIZZLE_SYSVAR_UINT(size, size,
00667 PLUGIN_VAR_RQCMDARG,
00668 N_("Size of Pool."),
00669 NULL, NULL, 8, 1, 1024, 0);
00670
00671 static drizzle_sys_var* system_variables[]= {
00672 DRIZZLE_SYSVAR(size),
00673 NULL,
00674 };
00675
00676 DRIZZLE_DECLARE_PLUGIN
00677 {
00678 DRIZZLE_VERSION_ID,
00679 "pool_of_threads",
00680 "0.1",
00681 "Brian Aker",
00682 "Pool of Threads Scheduler",
00683 PLUGIN_LICENSE_GPL,
00684 init,
00685 deinit,
00686 NULL,
00687 system_variables,
00688 NULL
00689 }
00690 DRIZZLE_DECLARE_PLUGIN_END;