-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtwitterbot.php
164 lines (148 loc) · 4.82 KB
/
twitterbot.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
<?php
// Inclusion guard: abort unless the including script has defined TWITTERBOT.
defined('TWITTERBOT') or die('Restricted.');
// Configuration; presumably defines ACTIONS and the OAUTH_* constants
// referenced in this file -- TODO confirm.
require_once('config.php');
// Presumably provides the Storage class used for logging -- TODO confirm.
require_once('storage.php');
// Loaded from the ACTIONS directory; presumably declares DataAggregator.
require_once(ACTIONS . 'aggregator.php');
/**
 * This class handles the lion's share of creating the action processes,
 * forking them off into child processes, and monitoring them to ensure they
 * execute correctly. This class is based heavily off the ServiceRunnerCheck
 * class in George Schlossnagle's "Advanced PHP Programming" book (2004), chpt 5.
 *
 * @author Shannon Quinn
 */
class Twitterbot {
    private $actions = array(); // actions we're interested in running
    private $current = array(); // pid => action for every running child process
    private $aggregator;        // the phirehose aggregator
    private $exit = false;      // a flag to indicate when we're exiting

    /**
     * Initializes the engine: instantiates every configured action, keeps
     * the ones reporting themselves as active, and builds the aggregator.
     *
     * Terminates the script (die) if a configured class is not instantiable.
     */
    public function __construct() {
        global $actions; // pull in the actions array from the configuration

        // first, initialize all the user-specified actions
        foreach ($actions as $action) {
            require_once(ACTIONS . $action['file']);
            $class = new ReflectionClass($action['class']);
            if (!$class->isInstantiable()) {
                die('Twitterbot: ERROR: ' . $action['name'] . ' is not instantiable!');
            }
            $args = isset($action['args']) ? $action['args'] : array();
            $item = $class->newInstance($action['name'], $action['active'], $args);
            if ($item->isActive()) {
                $this->actions[] = $item;
            }
        }

        $this->aggregator = new DataAggregator(OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
            Phirehose::METHOD_SAMPLE);
        $this->aggregator->setLang('en'); // Set language to filter specific tweets.
    }

    /**
     * Comparison callback ordering actions by their next attempt time.
     *
     * @param Action $a
     * @param Action $b
     * @return int -1 if $a fires before $b, 1 if after, 0 if at the same time
     */
    private function nextAttemptSort($a, $b) {
        $left = $a->getNextAttempt();
        $right = $b->getNextAttempt();
        if ($left == $right) {
            return 0;
        }
        return ($left < $right ? -1 : 1);
    }

    /**
     * Returns the next action to fire. Sorts $this->actions in place.
     *
     * @return Action|null The action with the earliest next attempt, or null
     *                     when no actions are registered.
     */
    private function next() {
        usort($this->actions, array($this, 'nextAttemptSort'));
        return (count($this->actions) > 0 ? $this->actions[0] : null);
    }

    /**
     * Writes a message to the database log under the 'Twitterbot' source.
     *
     * @param string $message
     */
    private function logMessage($message) {
        $db = Storage::getDatabase();
        $db->log('Twitterbot', $message);
        unset($db);
    }

    /**
     * This is the main function of this class. It forks the aggregator,
     * registers the signal handlers, and then loops, forking a child for
     * each action when it is due, until the exit flag is set.
     */
    public function loop() {
        declare(ticks = 1);

        // Spin off the aggregator. The fork stays inside the condition on
        // purpose: condition expressions are not tickable, so no signal
        // handler can run between the fork and the pid being recorded.
        if (($pid = pcntl_fork()) > 0) {
            $this->current[$pid] = $this->aggregator;
        } elseif ($pid === 0) {
            exit($this->aggregator->consume());
        } else {
            // pcntl_fork() returned -1: nothing was started, so give up
            // instead of tracking a bogus pid.
            $this->logMessage('Unable to fork the aggregator! Exiting.');
            return;
        }

        // Set up signal handlers.
        // NOTE(review): the handlers are private; this appears to rely on
        // tick-based dispatch happening while code of this class is
        // executing -- confirm before switching to pcntl_async_signals().
        pcntl_signal(SIGCHLD, array($this, "sig_child"));
        pcntl_signal(SIGTERM, array($this, "sig_kill"));
        pcntl_signal(SIGINT, array($this, "sig_kill"));

        // now start all the other actions
        while (!$this->exit) {
            // determine the next action that should fire
            $now = time();
            $action = $this->next();
            if ($action === null) {
                // in this case, just the aggregator is running,
                // so we can in fact safely quit!
                $this->exit = true;
                continue;
            }

            if ($now < $action->getNextAttempt()) {
                // Sleep until the next action has to fire. An incoming
                // signal interrupts sleep(), after which we re-evaluate.
                sleep($action->getNextAttempt() - $now);
                continue;
            }

            $action->setNextAttempt();
            if (($pid = pcntl_fork()) > 0) {
                // parent process
                $this->current[$pid] = $action;
            } elseif ($pid === 0) {
                // Child process: drop the handlers inherited from the
                // parent so a SIGTERM/SIGINT terminates this child instead
                // of being captured by the parent's shutdown logic.
                pcntl_signal(SIGCHLD, SIG_DFL);
                pcntl_signal(SIGTERM, SIG_DFL);
                pcntl_signal(SIGINT, SIG_DFL);
                pcntl_alarm($action->getTimeout());
                exit($action->run());
            } else {
                // fork failed; the action stays scheduled for its next attempt
                $this->logMessage('Unable to fork a child process for an action; skipping this attempt.');
            }
        }
    }

    /**
     * Signal handler for SIGCHLD: reaps every exited child, reports the
     * outcome to the owning action via post_run(), and terminates the
     * script if the exited child was the aggregator.
     *
     * @param int $signal
     */
    private function sig_child($signal) {
        $waitStatus = 0;
        while (($pid = pcntl_wait($waitStatus, WNOHANG)) > 0) {
            if (!isset($this->current[$pid])) {
                // not a child we are tracking; nothing to report
                continue;
            }
            $action = $this->current[$pid];
            unset($this->current[$pid]);

            // Keep the raw wait status separate from the reported result so
            // a failed child always yields Action::FAILURE.
            $result = Action::FAILURE;
            if (pcntl_wifexited($waitStatus) &&
                    pcntl_wexitstatus($waitStatus) == Action::SUCCESS) {
                $result = Action::SUCCESS;
            }

            if ($action !== $this->aggregator) {
                $action->post_run($result);
            } else {
                // the aggregator failed! this is a problem
                $this->logMessage('Aggregator crashed! Exiting.');
                exit;
            }
        }
    }

    /**
     * Signal handler for SIGTERM and SIGINT, conducts a graceful shutdown if
     * the user forcefully quits the parent process for the twitterbot:
     * forwards SIGTERM to every tracked child and sets the exit flag.
     *
     * @param int $signal
     */
    private function sig_kill($signal) {
        // send a kill signal to all the processes still running
        foreach (array_keys($this->current) as $pid) {
            posix_kill($pid, SIGTERM);
        }
        // set the flag to kill the parent process
        $this->exit = true;
    }
}
?>