Tokenize a Kafka Message

Hi Guys,

I am writing code that gets a message from a Kafka queue, and I would like to tokenize this message so that I can then apply a word counter to it. Any ideas? My code is below.

Thanks, guys.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
 // Main consume loop (fragment): on each iteration it fetches one message from
 // Kafka, prints it, and then runs a whitespace tokenizer / word counter over
 // the bytes held in `tmpstorage`. The enclosing function and the end of the
 // loop body are outside this excerpt.
 //
 // NOTE(review): nothing in the visible code copies the Kafka payload into
 // `tmpstorage` or assigns `retc`, so the tokenizer below never sees the
 // message that was just consumed. To tokenize the message, the payload bytes
 // (msg->payload(), msg->len()) presumably have to be copied into `tmpstorage`
 // at `tmpstorageoffset` and `retc` set to the number of bytes copied --
 // confirm against the declaration and capacity of `tmpstorage`.
 while (run) {

                // Disabled debug trace.
                //std::cout << "ESTOU EM WHILE(RUN)" << std::endl;


                // Fetch the next message for (topic, partition); the last
                // argument (1000) is the timeout, in milliseconds per the
                // librdkafka documentation.
                // NOTE(review): `msg` is not deleted anywhere in the visible
                // code; librdkafka documents that the application must delete
                // the returned message -- confirm it is freed further down,
                // otherwise one message leaks per iteration.
                RdKafka::Message *msg = consumer->consume(topic, partition, 1000);


                // NOTE(review): the error dispatch below is commented out, so
                // the print/sleep code runs for every returned message,
                // including ones whose err() is not ERR_NO_ERROR (time-outs,
                // partition EOF, failures).
                /*switch (msg->err())
                  {
                  case RdKafka::ERR__TIMED_OUT:
                    break;

                    case RdKafka::ERR_NO_ERROR:*/

                    // Log the offset of the message to stderr.
                    std::cerr << "Read msg at offset " << msg->offset() << std::endl;

                    // Print exactly msg->len() bytes of the payload; the
                    // "%.*s" precision is used so no terminating NUL is needed.
                    printf("%.*s\n",
                           static_cast<int>(msg->len()),
                           static_cast<const char *>(msg->payload()));

                    std::cout << "Estou lendo as messagens agora" << std::endl;
                    // Pauses 5 seconds after every message.
                    // NOTE(review): looks like a debugging aid -- it caps
                    // throughput at one message per 5 s; confirm it is wanted.
                    sleep(5);
                    /*break;

                  case RdKafka::ERR__PARTITION_EOF:
                    if (exit_eof)
                      run = false;
                    break;

                  default:

                    std::cerr << "Consume failed: " << msg->errstr() << std::endl;
                    run = false;
                    }*/


                // execute only once, if there is no I/O on the socket
                // go back to the caller
                // NOTE(review): `retc` is declared without an initializer and
                // is not assigned anywhere in the visible code before it is
                // read in the loop condition below -- reading it is undefined
                // behaviour. The two comment lines above appear to be left
                // over from a socket-reading version of this code.
                int retc;

                        // we have retc + offset byte in the storage, process it
                        // `tmpstorage` and `tmpstorageoffset` are declared
                        // outside this excerpt. `jjword` is not used in the
                        // visible code.
                        int jj, jjword = 0;
                        // Start of the word currently being scanned. It is
                        // never assigned in the visible code; presumably it is
                        // set in the non-whitespace branch past the end of this
                        // excerpt -- confirm.
                        char *word = NULL;
                        // Scan the first (retc + tmpstorageoffset) entries of
                        // tmpstorage, splitting on whitespace.
                        // NOTE(review): if tmpstorage holds plain `char`,
                        // isspace() on a negative value is undefined; a cast to
                        // unsigned char may be needed -- verify element type.
                        for(jj = 0; jj < retc + tmpstorageoffset; jj++) {
                            if(isspace(tmpstorage[jj])) {
                                // Overwrite the separator with NUL so `word`
                                // becomes a C string ending here.
                                tmpstorage[jj] = 0;
                                if(word != NULL) {
                                    // we isolated a word, now match it in the map
                                    m_word_counted ++;
// Throughput is reported once every DISPLAY_THROUGHPUT_INFO_EVERY words.
// Being a preprocessor macro, this definition is not scoped to the loop.
#define DISPLAY_THROUGHPUT_INFO_EVERY 1000000
                                    if((m_word_counted % DISPLAY_THROUGHPUT_INFO_EVERY) ==
                                       0) {
                                        // delta = microseconds elapsed between
                                        // `ts` (set outside this excerpt,
                                        // presumably the start time) and now.
                                        struct timeval tnow;
                                        gettimeofday(&tnow, NULL);
                                        long deltas = tnow.tv_sec - ts.tv_sec;
                                        long deltau = tnow.tv_usec - ts.tv_usec;
                                        long delta = deltas * 1000000 + deltau;
                                        // Words per second over the whole run.
                                        long double wt = m_word_counted * 1000000.0 / delta;
                                        // Emit the extra "LOG FOR PAPER" line
                                        // once at least 60 s (60000000 us) have
                                        // elapsed beyond `cumtime`, then advance
                                        // `cumtime` by 60 s.
                                        if((delta - cumtime) >= 60000000) {
                                            std::cout << "\nWordSource LOG FOR PAPER " << wt << " at " << delta / 1000000.0 << "\n";
                                            cumtime += 60000000;
                                        }
                                        std::cout << "WordsSource processed " << wt <<
                                            "word/sec\n";

                                    }
                                    // Count the word: increment its entry if it
                                    // is already in the map, otherwise insert
                                    // it with a count of 1.
                                    std::shared_ptr<std::unordered_map<std::string, uint32_t>> map =
                                        outWordsMap->map();
                                    std::unordered_map<std::string, uint32_t>::iterator it =
                                        map->find(word);
                                    if(it != map->end()) {
                                        it->second ++;
                                    } else {
                                        map->insert(std::pair<std::string, uint32_t>
                                                    (word, 1)
                                                    );
                                    }
                                    // Every `send_limit` words, forward the map
                                    // through the gate `m_outgate_id`.
                                    if((m_word_counted % send_limit) == 0) {
                                        std::cout << "Sending map to aggregator\n";
                                        auto outWordsMap_copy = outWordsMap;
                                        // Add the reserved key "xxxxx11111"
                                        // carrying node_id, unless it is
                                        // already present. Presumably this lets
                                        // the receiver identify the sending
                                        // node -- confirm on the aggregator
                                        // side.
                                        if(map->find("xxxxx11111") == map->end())
                                            map->insert(std::pair<std::string, uint32_t>
                                                        ("xxxxx11111", node_id)
                                                        );
                                        // NOTE(review): the map is not cleared
                                        // in the visible code after sending;
                                        // confirm whether counts are meant to
                                        // be cumulative across sends.
                                        send_out_through(move(outWordsMap_copy),
                                                         m_outgate_id);
                                        m_map_sent ++;
                                    }
Topic archived. No new replies allowed.