1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
while (run) {
//std::cout << "ESTOU EM WHILE(RUN)" << std::endl;
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
/*switch (msg->err())
{
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:*/
std::cerr << "Read msg at offset " << msg->offset() << std::endl;
printf("%.*s\n",
static_cast<int>(msg->len()),
static_cast<const char *>(msg->payload()));
std::cout << "Estou lendo as messagens agora" << std::endl;
sleep(5);
/*break;
case RdKafka::ERR__PARTITION_EOF:
if (exit_eof)
run = false;
break;
default:
std::cerr << "Consume failed: " << msg->errstr() << std::endl;
run = false;
}*/
// execute only once, if there is no I/O on the socket
// go back to the caller
int retc;
// we have retc + offset byte in the storage, process it
int jj, jjword = 0;
char *word = NULL;
for(jj = 0; jj < retc + tmpstorageoffset; jj++) {
if(isspace(tmpstorage[jj])) {
tmpstorage[jj] = 0;
if(word != NULL) {
// we isolated a word, now match it in the map
m_word_counted ++;
#define DISPLAY_THROUGHPUT_INFO_EVERY 1000000
if((m_word_counted % DISPLAY_THROUGHPUT_INFO_EVERY) ==
0) {
struct timeval tnow;
gettimeofday(&tnow, NULL);
long deltas = tnow.tv_sec - ts.tv_sec;
long deltau = tnow.tv_usec - ts.tv_usec;
long delta = deltas * 1000000 + deltau;
long double wt = m_word_counted * 1000000.0 / delta;
if((delta - cumtime) >= 60000000) {
std::cout << "\nWordSource LOG FOR PAPER " << wt << " at " << delta / 1000000.0 << "\n";
cumtime += 60000000;
}
std::cout << "WordsSource processed " << wt <<
"word/sec\n";
}
std::shared_ptr<std::unordered_map<std::string, uint32_t>> map =
outWordsMap->map();
std::unordered_map<std::string, uint32_t>::iterator it =
map->find(word);
if(it != map->end()) {
it->second ++;
} else {
map->insert(std::pair<std::string, uint32_t>
(word, 1)
);
}
if((m_word_counted % send_limit) == 0) {
std::cout << "Sending map to aggregator\n";
auto outWordsMap_copy = outWordsMap;
if(map->find("xxxxx11111") == map->end())
map->insert(std::pair<std::string, uint32_t>
("xxxxx11111", node_id)
);
send_out_through(move(outWordsMap_copy),
m_outgate_id);
m_map_sent ++;
}
|