1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
|
[code]
size_t num_rx_samps = rx_stream->recv(&buff.front(), buff.size(), md, 3.0, enable_size_map);
[/code]
Here is the segment where I try to push the packets onto a vector:
[code]
std::vector<size_t> processing_queue; //HERE IS WHERE I'M TRYING TO PUSH THE PACKETS ONTO A VECTOR
processing_queue.push_back(num_rx_samps);
[/code]
Previously my program saved the samples to a .dat file (see the commented-out lines), but I need to change it to push the values onto a vector so that the data packets arriving over the Ethernet cable can be streamed for processing. However, the packets aren't being pushed onto the vector as I expected. What do I need to change to push the data packets onto a vector?
The following is the code:
[code]
#include <uhd/types/tune_request.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/safe_main.hpp>
#include <uhd/usrp/multi_usrp.hpp>
#include <uhd/exception.hpp>
#include <boost/program_options.hpp>
#include <boost/format.hpp>
#include <boost/thread.hpp>
#include <algorithm>
#include <complex>
#include <csignal>
#include <cstddef>
#include <fstream>
#include <iostream>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>
namespace po = boost::program_options;
// Flag set by the signal handler and polled by the receive loop.
// volatile std::sig_atomic_t is the object type the standard permits a
// signal handler to write; a plain bool carries no such guarantee.
static volatile std::sig_atomic_t stop_signal_called = 0;

// Signal handler: only records that a stop was requested.
void sig_int_handler(int){stop_signal_called = 1;}
template<typename samp_type> void recv_to_file(
uhd::usrp::multi_usrp::sptr usrp,
const std::string &cpu_format,
const std::string &wire_format,
const std::string &file,
size_t samps_per_buff,
unsigned long long num_requested_samples,
double time_requested = 0.0,
bool bw_summary = false,
bool stats = false,
bool null = false,
bool enable_size_map = false,
bool continue_on_bad_packet = false
){
unsigned long long num_total_samps = 0;
//create a receive streamer
uhd::stream_args_t stream_args(cpu_format,wire_format);
uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args);
uhd::rx_metadata_t md;
std::vector<samp_type> buff(samps_per_buff);
//std::ofstream outfile;
//if (not null)
// outfile.open(file.c_str(), std::ofstream::binary);
bool overflow_message = true;
//setup streaming
uhd::stream_cmd_t stream_cmd((num_requested_samples == 0)?
uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS:
uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE
);
stream_cmd.num_samps = num_requested_samples;
stream_cmd.stream_now = true;
stream_cmd.time_spec = uhd::time_spec_t();
rx_stream->issue_stream_cmd(stream_cmd);
boost::system_time start = boost::get_system_time();
unsigned long long ticks_requested = (long)(time_requested * (double)boost::posix_time::time_duration::ticks_per_second());
boost::posix_time::time_duration ticks_diff;
boost::system_time last_update = start;
unsigned long long last_update_samps = 0;
typedef std::map<size_t,size_t> SizeMap;
SizeMap mapSizes;
while(not stop_signal_called and (num_requested_samples != num_total_samps or num_requested_samples == 0)){
boost::system_time now = boost::get_system_time();
size_t num_rx_samps = rx_stream->recv(&buff.front(), buff.size(), md, 3.0, enable_size_map); ///////////////////////
if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_TIMEOUT) {
std::cout << boost::format("Timeout while streaming") << std::endl;
break;
}
if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW){
if (overflow_message){
overflow_message = false;
std::cerr << boost::format(
"Got an overflow indication. Please consider the following:\n"
" Your write medium must sustain a rate of %fMB/s.\n"
" Dropped samples will not be written to the file.\n"
" Please modify this example for your purposes.\n"
" This message will not appear again.\n"
) % (usrp->get_rx_rate()*sizeof(samp_type)/1e6);
}
continue;
}
if (md.error_code != uhd::rx_metadata_t::ERROR_CODE_NONE){
std::string error = str(boost::format("Receiver error: %s") % md.strerror());
if (continue_on_bad_packet){
std::cerr << error << std::endl;
continue;
}
else
throw std::runtime_error(error);
}
if (enable_size_map){
SizeMap::iterator it = mapSizes.find(num_rx_samps);
if (it == mapSizes.end())
mapSizes[num_rx_samps] = 0;
mapSizes[num_rx_samps] += 1;
}
num_total_samps += num_rx_samps;
//if (outfile.is_open())//////
////outfile.open(file.c_str(), std::ofstream::binary);
// outfile.write((const char*)&buff.front(), num_rx_samps*sizeof(samp_type));
//streaming line //size_t num_rx_samps = rx_stream->recv(&buff.front(), buff.size(), md, 3.0, enable_size_map);
std::vector<size_t> processing_queue;
processing_queue.push_back(num_rx_samps);
//bool processing_done = false;
//if(processing_done ==true){
//pop first element using a queue (see 235 lab)
//}
if (bw_summary){
last_update_samps += num_rx_samps;
boost::posix_time::time_duration update_diff = now - last_update;
if (update_diff.ticks() > boost::posix_time::time_duration::ticks_per_second()) {
double t = (double)update_diff.ticks() / (double)boost::posix_time::time_duration::ticks_per_second();
double r = (double)last_update_samps / t;
std::cout << boost::format("\t%f Msps") % (r/1e6) << std::endl;
last_update_samps = 0;
last_update = now;
}
}
ticks_diff = now - start;
if (ticks_requested > 0){
if ((unsigned long long)ticks_diff.ticks() > ticks_requested)
break;
}
}
//if (outfile.is_open())
// outfile.close();
if (stats){
std::cout << std::endl;
double t = (double)ticks_diff.ticks() / (double)boost::posix_time::time_duration::ticks_per_second();
std::cout << boost::format("Received %d samples in %f seconds") % num_total_samps % t << std::endl;
double r = (double)num_total_samps / t;
std::cout << boost::format("%f Msps") % (r/1e6) << std::endl;
if (enable_size_map) {
std::cout << std::endl;
std::cout << "Packet size map (bytes: count)" << std::endl;
for (SizeMap::iterator it = mapSizes.begin(); it != mapSizes.end(); it++)
std::cout << it->first << ":\t" << it->second << std::endl;
}
}
}
typedef boost::function<uhd::sensor_value_t (const std::string&)> get_sensor_fn_t;
bool check_locked_sensor(std::vector<std::string> sensor_names, const char* sensor_name, get_sensor_fn_t get_sensor_fn, double setup_time){
if (std::find(sensor_names.begin(), sensor_names.end(), sensor_name) == sensor_names.end())
return false;
boost::system_time start = boost::get_system_time();
boost::system_time first_lock_time;
std::cout << boost::format("Waiting for \"%s\": ") % sensor_name;
std::cout.flush();
while (true){
if ((not first_lock_time.is_not_a_date_time()) and
(boost::get_system_time() > (first_lock_time + boost::posix_time::seconds(setup_time))))
{
std::cout << " locked." << std::endl;
break;
}
if (get_sensor_fn(sensor_name).to_bool()){
if (first_lock_time.is_not_a_date_time())
first_lock_time = boost::get_system_time();
std::cout << "+";
std::cout.flush();
}
else{
first_lock_time = boost::system_time(); //reset to 'not a date time'
if (boost::get_system_time() > (start + boost::posix_time::seconds(setup_time))){
std::cout << std::endl;
throw std::runtime_error(str(boost::format("timed out waiting for consecutive locks on sensor \"%s\"") % sensor_name));
}
std::cout << "_";
std::cout.flush();
}
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}
std::cout << std::endl;
return true;
}
|