-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNomConsumeAll.php
executable file
·112 lines (74 loc) · 2.53 KB
/
NomConsumeAll.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/php
<?php
// NOTE(review): at file scope `global` only has an effect if this script is included from
// inside a function; when the script is run directly it is a no-op. nomnom() below pulls
// $CDB into its own scope with a separate `global` statement.
global $CDB;
// Project-local dependencies. Presumably these supply the CouchDB and Hippy classes and
// parse_sip_vqr(), all of which are used further down — confirm against those files.
require_once("CouchDB.php");
require_once("Hippy.php");
require_once("parse_function.php");
// Connection options handed to the CouchDB constructor below.
$CBOPTS['host'] = "127.0.0.1";
$CBOPTS['port'] = 5984;
// Name of the AMQP exchange that the consumer declares and binds its queue to.
$EXCHANGE_NAME = "vqmonitor";
$CDB = new CouchDB($CBOPTS); // See if we can make a connection
// Send a PUT for each of the two paths nomnom() later POSTs documents to.
// NOTE(review): in CouchDB's HTTP API a PUT on /<name> creates that database and is
// rejected when it already exists; the result of send() is not inspected here — confirm
// that CouchDB::send() tolerates the "already exists" response on restarts.
$CDB->send("PUT","/vqmon");
$CDB->send("PUT","/vqr");
/**
 * Split a raw message body into a flat key/value map.
 *
 * The first line (index 0) is stored verbatim under 'sip_header', minus any
 * trailing CR/LF left over from CRLF line endings. Every following non-blank
 * line is split at its first colon into key (text before the colon) and value
 * (trimmed text after it). Lines that contain no colon are skipped: without
 * that guard strpos() returns false, which used to file them under the key ""
 * with their first character cut off.
 *
 * @param string $body Raw message body.
 * @return array Parsed fields; 'sip_header' is absent when the first line is blank.
 */
function nomnom_parse_body( $body ) {
    $cdata = array();
    foreach( explode("\n", $body) as $key => $line ) {
        if( strlen(trim($line)) == 0 ) continue;
        if( $key === 0 ) {
            $cdata['sip_header'] = rtrim($line, "\r\n");
            continue;
        }
        $colon = strpos($line, ":");
        if( $colon === false ) {
            // Not a "key: value" line; nothing sensible to store.
            continue;
        }
        $cdata[substr($line, 0, $colon)] = trim(substr($line, $colon + 1));
    }
    return $cdata;
}

/**
 * Escape a value taken from a received message for use inside the HTML alert.
 *
 * @param mixed $value Scalar value to embed.
 * @return string HTML-safe text.
 */
function nomnom_html( $value ) {
    return htmlspecialchars((string) $value, ENT_QUOTES, 'UTF-8');
}

/**
 * Queue consumer callback: store one message in CouchDB and alert on poor quality.
 *
 * Steps, in order:
 *  1. Parse the body into key/value pairs and POST them to /vqmon/.
 *  2. Run parse_sip_vqr() on the pairs; when it yields a report, POST that to
 *     /vqr/ and build a one-line summary. The summary goes out through Hippy as
 *     an "Alert" when MOSLQ is below 4.3 and the call lasted more than 10
 *     seconds; otherwise it is printed to stdout as plain text.
 *  3. Acknowledge the message. This happens for unparseable reports as well,
 *     which are only logged through $CDB->log().
 *
 * @param object $envelope Message envelope; must provide getBody() and getDeliveryTag().
 * @param object $queue    Queue the message came from; must provide ack().
 * @return bool Always true.
 */
function nomnom( $envelope, $queue ) {
    global $CDB;
    echo __FILE__." ".__FUNCTION__." consuming...\n";

    $cdata = nomnom_parse_body($envelope->getBody());

    // The header is missing when the body starts with a blank line; avoid an
    // undefined-index notice in that case.
    $header = isset($cdata['sip_header']) ? $cdata['sip_header'] : '';
    echo "Got {$header}\n";

    $CDB->send("POST","/vqmon/",json_encode($cdata));
    echo "Sent to couch vqmon\n";

    $pd = parse_sip_vqr($cdata);
    if( $pd != false ) {
        $CDB->send("POST","/vqr/",json_encode($pd));
        echo "Sent to couch vqr\n";

        $duration = strtotime($pd['STOP']) - strtotime($pd['START']);

        // Values come from the received message, so they are HTML-escaped for
        // the alert text and URL-encoded where they become part of the link.
        $link = "http://localhost:5984/vqr/_design/generic/_view/by_callid?key=%22"
              . rawurlencode($pd['CallID']) . "%22";
        $msg = "Duration: <b>$duration</b>"
             . " MOSLQ=<b>" . nomnom_html($pd['QualityEst']['MOSLQ']) . "</b>"
             . " MOSCQ=<b>" . nomnom_html($pd['QualityEst']['MOSCQ']) . "</b>"
             . " CallID=<b>" . nomnom_html($pd['CallID']) . "</b>"
             . " From=<b>" . nomnom_html($pd['From']['sip_address']) . "</b>"
             . " To=<b>" . nomnom_html($pd['To']['sip_address']) . "</b>"
             . " <a href='" . $link . "'>vqr link</a>";

        if( $pd['QualityEst']['MOSLQ'] < 4.3 && $duration > 10 ) {
            Hippy::add("Alert ".$msg);
            Hippy::go();
        } else {
            // Console output is plain text: drop the markup, then undo the escaping.
            echo html_entity_decode(strip_tags($msg), ENT_QUOTES, 'UTF-8')."\n";
        }
    } else {
        $CDB->log("received invalid data!!!!!!!!!!!!!!!!!!!!!!!!!!");
    }

    $queue->ack($envelope->getDeliveryTag());
    return(true);
}
// ---------------------------------------------------------------------------
// Consumer bootstrap: connect to the local AMQP broker, declare the durable
// fanout exchange named by $EXCHANGE_NAME, bind a queue to it and hand every
// delivered message to nomnom().
// ---------------------------------------------------------------------------
$cnn = new AMQPConnection();
$cnn->setHost('127.0.0.1');
Hippy::speak('NomNom Consumer Started '.basename(__FILE__));

// connect() is documented as throwing AMQPConnectionException when the broker
// cannot be reached. Catch it so the failure branch below still runs and the
// "Cannot connect" notice is sent instead of the script dying silently.
$connected = false;
try {
    $connected = $cnn->connect();
} catch (AMQPConnectionException $e) {
    echo "AMQP connection failed: ".$e->getMessage()."\n";
}

if ($connected) {
    echo "Established a connection to AMQP\n";
    Hippy::add("Connected to AMQP $EXCHANGE_NAME exchange.");

    $ch = new AMQPChannel($cnn);

    $ex = new AMQPExchange($ch);
    $ex->setName($EXCHANGE_NAME);
    $ex->setType(AMQP_EX_TYPE_FANOUT);
    $ex->setFlags(AMQP_DURABLE);
    $ex->declare();

    // No name is set on the queue before it is declared.
    $q = new AMQPQueue($ch);
    $q->declare();
    $q->bind($EXCHANGE_NAME,'*'); //USE * FOR ROUTING KEY SO YOU GET EVERYTHING bind("EXCHANGE NAME","ROUTING KEY")

    // Flush the queued "Connected" notice before starting to consume.
    Hippy::go();
    $q->consume('nomnom');
} else {
    Hippy::add("Cannot connect to AMQP $EXCHANGE_NAME exchange.");
    Hippy::go();
    // Report the failure to whatever launched the script.
    exit(1);
}
?>