diff --git a/hardware/cis/gvideo.h b/hardware/cis/gvideo.h index 972a85e..bd7d9ee 100644 --- a/hardware/cis/gvideo.h +++ b/hardware/cis/gvideo.h @@ -42,7 +42,7 @@ protected: size_t length; }; - const int v4l_buffer_count = 5; + const int v4l_buffer_count = 6; int buf_size_ = 0; int v4l_width = 3100; int v4l_height = 3100; diff --git a/hardware/hardware.cpp b/hardware/hardware.cpp index ebc3d2b..0bae714 100644 --- a/hardware/hardware.cpp +++ b/hardware/hardware.cpp @@ -373,6 +373,8 @@ void scanner_hw::thread_image_capture(bool paper_ready) std::function img_callback(img_handler_); utils::to_log(LOG_LEVEL_DEBUG, "scanning thread working ...\n"); + utils::to_log(LOG_LEVEL_DEBUG, "set capture thread to CPU %d = %d\n", CPU_MAJOR_0, utils::set_cpu_affinity(CPU_MAJOR_0)); + avail_mem.enable_wait_log(false); motor_->clear_error(); if(paper_ready) diff --git a/imgproc/algs/auto_crop.cpp b/imgproc/algs/auto_crop.cpp index fa06a08..8f57f89 100644 --- a/imgproc/algs/auto_crop.cpp +++ b/imgproc/algs/auto_crop.cpp @@ -55,8 +55,11 @@ static void myWarpAffine(cv::InputArray _src, cv::OutputArray _dst, cv::InputArr // #define OPTION_FUNC(name) auto name = [this](void* val) -> void -auto_crop::auto_crop() : image_processor("auto_crop") +auto_crop::auto_crop(bool weaker) : image_processor("auto_crop") { + if(weaker) + return; + init(); ADD_THIS_JSON(); @@ -321,6 +324,26 @@ int auto_crop::set_value(const char* name/*nullptr for all options*/, void* val/ return ret; } +image_processor* auto_crop::copy_weaker(void) +{ + auto_crop *weaker = new auto_crop(true); + + weaker->pos_ = pos_; + weaker->enabled_ = enabled_; + + weaker->crop_ = crop_; + weaker->deskew_ = deskew_; + weaker->fill_bg_ = fill_bg_; + weaker->convex_ = convex_; + weaker->fill_clr_ = fill_clr_; + weaker->threshold_ = threshold_; + weaker->indent_ = indent_; + weaker->noise_ = noise_; + weaker->fixed_paper_ = fixed_paper_; + weaker->lateral_ = lateral_; + + return weaker; +} int auto_crop::process(std::vector& in, std::vector& out) { int ret = SCANNER_ERR_OK; diff --git a/imgproc/algs/auto_crop.h b/imgproc/algs/auto_crop.h index 9272c93..2e799a6 100644 --- a/imgproc/algs/auto_crop.h +++ b/imgproc/algs/auto_crop.h @@ -26,7 +26,7 @@ class auto_crop : public image_processor int work(PROCIMGINFO& in, PROCIMGINFO& out); public: - auto_crop(); + auto_crop(bool weaker = false); protected: ~auto_crop(); @@ -35,5 +35,6 @@ public: virtual int set_value(const char* name/*nullptr for all options*/, void* val/*nullptr for restore*/) override; public: + virtual image_processor* copy_weaker(void) override; virtual int process(std::vector& in, std::vector& out) override; }; diff --git a/imgproc/algs/color_correct.cpp b/imgproc/algs/color_correct.cpp index e163a79..8b11288 100644 --- a/imgproc/algs/color_correct.cpp +++ b/imgproc/algs/color_correct.cpp @@ -21,8 +21,11 @@ static std::string device_opt_json[] = { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // -color_correct::color_correct() : image_processor("color_correct") +color_correct::color_correct(bool weaker) : image_processor("color_correct") { + if(weaker) + return; + ADD_THIS_JSON(); hg::initLut(lut_path_.c_str(), clr_); } @@ -57,6 +60,17 @@ int color_correct::set_value(const char* name/*nullptr for all options*/, void* return ret; } +image_processor* color_correct::copy_weaker(void) +{ + color_correct *weaker = new color_correct(true); + + weaker->pos_ = pos_; + weaker->enabled_ = enabled_; + weaker->correct_ = correct_; + weaker->clr_ = clr_; + + return weaker; +} int color_correct::process(std::vector& in, std::vector& out) { int ret = SCANNER_ERR_OK; diff --git a/imgproc/algs/color_correct.h b/imgproc/algs/color_correct.h index d9d30c9..0fb7217 100644 --- a/imgproc/algs/color_correct.h +++ b/imgproc/algs/color_correct.h @@ -12,7 +12,7 @@ class color_correct : public image_processor std::string lut_path_ = "/usr/local/huago/Textlut200clr.bmp"; public: - color_correct(); + color_correct(bool weaker = false); protected: ~color_correct(); @@ -21,5 +21,6 @@ public: virtual int set_value(const char* name/*nullptr for all options*/, void* val/*nullptr for restore*/) override; public: + virtual image_processor* copy_weaker(void) override; virtual int process(std::vector& in, std::vector& out) override; }; diff --git a/imgproc/algs/image_encoder.cpp b/imgproc/algs/image_encoder.cpp index f63db26..9ea7eee 100644 --- a/imgproc/algs/image_encoder.cpp +++ b/imgproc/algs/image_encoder.cpp @@ -12,8 +12,11 @@ static std::string device_opt_json[] = { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // img_encoder -img_encoder::img_encoder() : image_processor("img_encoder") +img_encoder::img_encoder(bool weaker) : image_processor("img_encoder") { + if(weaker) + return; + ADD_THIS_JSON(); param_.push_back(cv::IMWRITE_JPEG_QUALITY); @@ -62,6 +65,19 @@ int img_encoder::set_value(const char* name, void* val) return ret; } +image_processor* img_encoder::copy_weaker(void) +{ + img_encoder *weaker = new img_encoder(true); + + weaker->pos_ = pos_; + weaker->enabled_ = enabled_; + + weaker->fmt_ = fmt_; + weaker->jpeg_quality_ = jpeg_quality_; + weaker->param_ = param_; + + return weaker; +} int img_encoder::process(std::vector& in, std::vector& out) { for(auto& v: in) diff --git a/imgproc/algs/image_encoder.h b/imgproc/algs/image_encoder.h index ae0f024..24c1090 100644 --- a/imgproc/algs/image_encoder.h +++ b/imgproc/algs/image_encoder.h @@ -14,7 +14,7 @@ class img_encoder : public image_processor std::vector param_; public: - img_encoder(); + img_encoder(bool weaker = false); protected: ~img_encoder(); @@ -23,6 +23,7 @@ public: virtual int set_value(const char* name/*nullptr for all options*/, void* val/*nullptr for restore*/) override; public: + virtual image_processor* copy_weaker(void) override; virtual int process(std::vector& in, std::vector& out) override; public: diff --git a/imgproc/algs/rebuild.cpp b/imgproc/algs/rebuild.cpp index 641c3da..8232570 100644 --- a/imgproc/algs/rebuild.cpp +++ b/imgproc/algs/rebuild.cpp @@ -11,8 +11,11 @@ static std::string device_opt_json[] = { "{\"rebuild\":{\"cat\":\"imgp\",\"group\":\"imgp\",\"title\":\"CIS\\u56fe\\u50cf\\u8fd8\\u539f\",\"desc\":\"\\u5c06\\u4eceCIS\\u8f93\\u51fa\\u7684\\u539f\\u59cb\\u6570\\u636e\\u6d41\\uff0c\\u8fd8\\u539f\\u4e3aBMP\\u56fe\\u7247\\uff0c\\u5e76\\u62c6\\u5206\\u6b63\\u53cd\\u9762\",\"type\":\"bool\",\"pos\":10,\"ui-pos\":1,\"auth\":0,\"visible\":0,\"size\":4,\"auto\":false,\"cur\":true,\"default\":true}}" }; -rebuild::rebuild() : image_processor("rebuild") +rebuild::rebuild(bool weaker) : image_processor("rebuild") { + if(weaker) + return; + ADD_THIS_JSON(); } rebuild::~rebuild() @@ -34,6 +37,17 @@ void rebuild::enable(const char* name, bool able) enabled_ = able; } +image_processor* rebuild::copy_weaker(void) +{ + rebuild *weaker = new rebuild(true); + + weaker->pos_ = pos_; + weaker->enabled_ = enabled_; + + weaker->rebuild_ = rebuild_; + + return weaker; +} int rebuild::process(std::vector& in, std::vector& out) { int ret = SCANNER_ERR_OK; @@ -98,8 +112,6 @@ void rebuild::do_rebuild(LPPACKIMAGE info, uint8_t* stream, std::vectorpos.paper_ind, info->width, info->height - , o.info.width, o.info.height, dstf, dstb, size); for(int h = 0; h < info->height; ++h) { for(int w = 0; w < o.info.width; ++w) diff --git a/imgproc/algs/rebuild.h b/imgproc/algs/rebuild.h index 23c0107..d14f758 100644 --- a/imgproc/algs/rebuild.h +++ b/imgproc/algs/rebuild.h @@ -10,7 +10,7 @@ class rebuild : public image_processor bool rebuild_ = true; public: - rebuild(); + rebuild(bool weaker = false); protected: ~rebuild(); @@ -20,6 +20,7 @@ public: virtual void enable(const char* name, bool able) override; public: + virtual image_processor* copy_weaker(void) override; virtual int process(std::vector& in, std::vector& out) override; public: diff --git a/imgproc/algs/stretch.cpp b/imgproc/algs/stretch.cpp index 16ea5de..315b961 100644 --- a/imgproc/algs/stretch.cpp +++ b/imgproc/algs/stretch.cpp @@ -17,8 +17,11 @@ static std::string device_opt_json[] = { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // -stretch::stretch() : image_processor("stretch") +stretch::stretch(bool weaker) : image_processor("stretch") { + if(weaker) + return; + ADD_THIS_JSON(); } stretch::~stretch() @@ -35,6 +38,17 @@ int stretch::set_value(const char* name/*nullptr for all options*/, void* val/*n return ret; } +image_processor* stretch::copy_weaker(void) +{ + stretch *weaker = new stretch(true); + + weaker->pos_ = pos_; + weaker->enabled_ = enabled_; + + weaker->dpi_ = dpi_; + + return weaker; +} int stretch::process(std::vector& in, std::vector& out) { for(auto& v: in) diff --git a/imgproc/algs/stretch.h b/imgproc/algs/stretch.h index 4490f5a..595f504 100644 --- a/imgproc/algs/stretch.h +++ b/imgproc/algs/stretch.h @@ -10,7 +10,7 @@ class stretch : public image_processor uint32_t dpi_ = 200; public: - stretch(); + stretch(bool weaker = false); protected: ~stretch(); @@ -19,5 +19,6 @@ public: virtual int set_value(const char* name/*nullptr for all options*/, void* val/*nullptr for restore*/) override; public: + virtual image_processor* copy_weaker(void) override; virtual int process(std::vector& in, std::vector& out) override; }; diff --git a/imgproc/imgprc_mgr.cpp b/imgproc/imgprc_mgr.cpp index 768c75f..92be0b9 100644 --- a/imgproc/imgprc_mgr.cpp +++ b/imgproc/imgprc_mgr.cpp @@ -6,11 +6,11 @@ #include #include "./algs/rebuild.h" +#include "./algs/image_encoder.h" #include "./algs/stretch.h" #include "./algs/auto_crop.h" #include "./algs/color_correct.h" #include "./algs/ImageProcess_Public.h" -#include "./algs/image_encoder.h" @@ -31,25 +31,14 @@ static std::string device_opt_json[] = { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // imgproc_mgr -imgproc_mgr::imgproc_mgr(std::function sender - , device_option* devopts - , CHK_RES_FUNC res +imgproc_mgr::imgproc_mgr(device_option* devopts, void(*sender)(SENDER_PROTO), void* sp + , bool(*res)(RES_CHK_PROTO), void* rp ) - : img_sender_(sender), opts_(devopts), prc_que_("prcimg") - , res_(res) + : opts_(devopts), img_sender_(sender), sender_param_(sp) + , res_(res), res_param_(rp) { - prc_que_.enable_wait_log(false); ADD_THIS_JSON(); - if(!res_) - { - DECL_CHK_RES_FUNC(&, r) - { - return true; - }; - res_ = r; - } - if (opts_) opts_->add_ref(); else @@ -60,27 +49,6 @@ imgproc_mgr::imgproc_mgr(std::function sender opts_->add(first_.get()); opts_->add(last_.get()); - - auto thrd = [&](void) -> void - { - thread_worker(); - }; - auto restart = [this](const char* thread_name) ->void - { - auto thrd = [this](void) -> void - { - thread_worker(); - }; - workers_.stop(thread_name); - add_busy_worker(-1); - printf("\nrestart imgproc_mgr::thread_worker\n\n"); - workers_.start(thrd, SIZE_MB(0), "imgproc_mgr::thread_worker", (void*)&imgproc_mgr::thread_worker); - }; - workers_.set_exception_handler(restart); - workers_.start(thrd, SIZE_MB(0), "thread_worker1", (void*)&imgproc_mgr::thread_worker); - workers_.start(thrd, SIZE_MB(0), "thread_worker2", (void*)&imgproc_mgr::thread_worker); - workers_.start(thrd, SIZE_MB(0), "thread_worker3", (void*)&imgproc_mgr::thread_worker); - workers_.start(thrd, SIZE_MB(0), "thread_worker4", (void*)&imgproc_mgr::thread_worker); } imgproc_mgr::~imgproc_mgr() { @@ -88,6 +56,31 @@ imgproc_mgr::~imgproc_mgr() opts_->release(); } +void imgproc_mgr::PAGEDPARAM::free(void) +{ + for(auto& p: mean.processors) + { + if(!p) + break; + p->release(); + } + memset(mean.processors, 0, sizeof(mean.processors)); + if(mean.que) + delete mean.que; + mean.que = nullptr; + if(mean.rebld) + mean.rebld->release(); + mean.rebld = nullptr; + if(mean.encoder) + mean.encoder->release(); + mean.encoder = nullptr; + + mean.res = nullptr; + mean.dump = nullptr; + mean.sender = nullptr; + mean.res_param = mean.dump_param = mean.sender_param = nullptr; +} + bool imgproc_mgr::sort_processor_by_pos(image_processor* l, image_processor* r) { return l->get_position() < r->get_position(); @@ -104,115 +97,6 @@ data_source_ptr imgproc_mgr::scan_finished_packet(uint32_t scanid, uint32_t err) return reply; } - -uint32_t imgproc_mgr::add_busy_worker(int inc) -{ - SIMPLE_LOCK(working_cnt_lock_); - working_cnt_ += inc; - - return working_cnt_; -} -void imgproc_mgr::thread_worker(void) -{ - RAWIMG img; - while(run_) - { - while(!res_(TASK_IMG_PROCESSOR, true, 3000)) - { - if(!run_) - break; - } - if(!run_) - break; - if(prc_que_.take(img, true)) - { - add_busy_worker(); - - // try - // { - process(&img); - // } - // catch(const std::exception& e) - // { - // printf("exception occurs when process paper %d: %s!\n", img.info.pos.paper_ind, e.what()); - // } - - add_busy_worker(-1); - } - } -} -void imgproc_mgr::process(RAWIMG* img) -{ - if(img->img) - { - std::vector in, out, *src = &in, *dst = &out, *swp = nullptr; - chronograph watch; - auto realdump = [this](std::vector* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last) -> void - { - if(arr) - send_image(*arr, last); - else - send_image(info, *mat, infoex, infexl, last); - }; - auto emptydump = [this](std::vector* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last) -> void - {}; - std::function* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last)> dump = realdump; - - if(dump_img_) - { - cv::Mat mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr()); - send_image(&img->info, mat, nullptr, 0, false); - } - else - dump = emptydump; - - if(do_rebuild_) - { - first_->do_rebuild(&img->info, img->data->ptr(), in); - utils::to_log(LOG_LEVEL_ALL, "Rebuild paper %d spend %llu milliseconds.\n", img->info.pos.paper_ind, watch.elapse_ms()); - dump(&in, nullptr, nullptr, nullptr, 0, false); - } - else - { - PROCIMGINFO i; - i.info = img->info; - i.img = cv::Mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr()); - in.push_back(i); - } - img->data->release(); - - for(auto& v: processors_) - { - if(v->is_enable()) - { - process(v, src, dst); - src->clear(); - swp = src; - src = dst; - dst = swp; - dump(src, nullptr, nullptr, nullptr, 0, false); - } - } - - send_image(*src, true); - } - else - { - data_source_ptr ptr = imgproc_mgr::scan_finished_packet(scan_id_, img->info.data_size); - uint32_t wait = 0, que = 0; - - ptr->set_session_id(session_id_); - while((que = add_busy_worker(0)) > 1) - { - if(wait++ == 0) - utils::to_log(LOG_LEVEL_DEBUG, "Received scan completed event while processing %u paper(s), wait ...\n", que - 1); - - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } - img_sender_(ptr); - ptr->release(); - } -} void imgproc_mgr::process(image_processor* prc, std::vector* in, std::vector* out) { try @@ -232,18 +116,10 @@ void imgproc_mgr::process(image_processor* prc, std::vector* in, st throw(exception_ex(msg.c_str())); } } -void imgproc_mgr::send_image(LPPACKIMAGE head, cv::Mat& mat, void* info, size_t info_l, bool last) +void imgproc_mgr::send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, void* info, size_t info_l, bool last) { - auto ovr = [&](uint64_t total, uint64_t cur_size, uint32_t err, void* user_data) -> int - { - if(total == FINAL_NOTIFY && cur_size == FINAL_NOTIFY) - printf("~%p\n", user_data); - - return 0; - }; - PACKIMAGE h(*head); - std::shared_ptr> compd(last_->encode(&h, mat)); + std::shared_ptr> compd(obj->mean.encoder->encode(&h, mat)); image_packet_ptr ptr = nullptr; if(last) @@ -255,13 +131,209 @@ void imgproc_mgr::send_image(LPPACKIMAGE head, cv::Mat& mat, void* info, size_t h.prc_stage = head->prc_stage; h.prc_time = head->prc_time; } - ptr = new image_packet(&h, compd, scan_id_, info, info_l); + ptr = new image_packet(&h, compd, obj->mean.scan_id, info, info_l); - ptr->set_session_id(session_id_); - img_sender_(ptr); + ptr->set_session_id(obj->mean.session); + obj->mean.sender(ptr, obj->mean.sender_param); ptr->release(); } -void imgproc_mgr::send_image(std::vector& imgs, bool last) +void imgproc_mgr::real_dump_image(DUMP_PROTO) +{ + if(arr) + { + for(auto& v: *arr) + imgproc_mgr::send_image(obj, &v.info, v.img, &v.ext_info[0], v.ext_info.length(), last); + } + else + { + imgproc_mgr::send_image(obj, info, *mat, infoex, infexl, last); + } +} +void imgproc_mgr::empty_dump_image(DUMP_PROTO) +{} + +void imgproc_mgr::start_workers(int cnt) +{ + workers_.stop(nullptr); + + for(auto& v: params_) + v->free(); + + auto thrd = [this](void* param) -> void + { + thread_worker(param); + }; + auto restart = [this](const char* thread_name, void* param) ->void + { + auto thrd = [this](void* param) -> void + { + thread_worker(param); + }; + workers_.stop(thread_name); + add_busy_worker(-1); + printf("\nrestart imgproc_mgr::thread_worker\n\n"); + workers_.start(thrd, param, SIZE_MB(0), "imgproc_mgr::thread_worker", (void*)&imgproc_mgr::thread_worker); + }; + workers_.set_exception_handler(restart); + + for(int i = 0; i < cnt; ++i) + { + LPPAGEDPARAM param = nullptr; + if(i < params_.size()) + param = params_[i]; + else + { + param = new PAGEDPARAM(); + params_.push_back(param); + } + + param->mean.run = run_; + param->mean.ind = i; + param->mean.que = new safe_fifo("prcimg"); + param->mean.que->enable_wait_log(false); + param->mean.scan_id = scan_id_; + param->mean.session = session_id_; + param->mean.rebld = do_rebuild_ ? dynamic_cast(first_->copy_weaker()) : nullptr; + param->mean.encoder = dynamic_cast(last_->copy_weaker()); + param->mean.dumpi = dump_img_; + if(param->mean.dumpi) + { + param->mean.dump = &imgproc_mgr::real_dump_image; + param->mean.dump_param = this; + } + else + { + param->mean.dump = &imgproc_mgr::empty_dump_image; + param->mean.dump_param = nullptr; + } + param->mean.sender = img_sender_; + param->mean.sender_param = sender_param_; + + int ind = 0; + for(auto& v: processors_) + { + if(ind >= _countof(param->mean.processors)) + break; + + image_processor *prc = v->copy_weaker(); + param->mean.processors[ind++] = prc; + param->mean.processors[ind] = nullptr; + } + char n[40] = {0}; + + sprintf(n, "thread_worker%d", i + 1); + workers_.start(thrd, param, SIZE_MB(0), n, (void*)&imgproc_mgr::thread_worker); + } +} +uint32_t imgproc_mgr::add_busy_worker(int inc) +{ + SIMPLE_LOCK(working_cnt_lock_); + working_cnt_ += inc; + + return working_cnt_; +} +void imgproc_mgr::thread_worker(void* param) +{ + LPPAGEDPARAM para = (LPPAGEDPARAM)param; + RAWIMG img; + int cpu = (para->mean.ind % (CPU_CORES - CPU_MAJOR_CNT)) + CPU_MINOR_0; + + utils::to_log(LOG_LEVEL_DEBUG, "set image process thread %d to CPU %d = %d\n" + , para->mean.ind, cpu, utils::set_cpu_affinity(cpu)); + + add_busy_worker(); + while(para->mean.run) + { + if(para->mean.res) + { + while(!para->mean.res(TASK_IMG_PROCESSOR, true, 3000, para->mean.res_param)) + { + if(!para->mean.run) + break; + } + if(!para->mean.run) + break; + } + if(para->mean.que->take(img, true)) + { + if(img.img && !img.data) + break; + process(&img, para); + } + } + add_busy_worker(-1); +} +void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param) +{ + if(img->img) + { + std::vector in, out, *src = &in, *dst = &out, *swp = nullptr; + chronograph watch; + + if(param->mean.dumpi) + { + cv::Mat mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr()); + send_image(param, &img->info, mat, nullptr, 0, false); + } + + if(param->mean.rebld) + { + param->mean.rebld->do_rebuild(&img->info, img->data->ptr(), in); + utils::to_log(LOG_LEVEL_ALL, "Rebuild paper %d spend %llu milliseconds.\n", img->info.pos.paper_ind, watch.elapse_ms()); + param->mean.dump(param, &in, nullptr, nullptr, nullptr, 0, false, param->mean.dump_param); + } + else + { + PROCIMGINFO i; + i.info = img->info; + i.img = cv::Mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr()); + in.push_back(i); + } + img->data->release(); + + for(auto& v: param->mean.processors) + { + if(!v) + { + break; + } + if(v->is_enable()) + { + process(v, src, dst); + src->clear(); + swp = src; + src = dst; + dst = swp; + param->mean.dump(param, src, nullptr, nullptr, nullptr, 0, false, param->mean.dump_param); + } + } + + send_image(param, *src, true); + } + else + { + data_source_ptr ptr = imgproc_mgr::scan_finished_packet(scan_id_, img->info.data_size); + uint32_t wait = 0, que = 0; + RAWIMG over; + + over.data = nullptr; + over.img = true; + for(auto& v: params_) + v->mean.que->save(over, true); + + ptr->set_session_id(session_id_); + while((que = add_busy_worker(0)) > 1) + { + if(wait++ == 0) + utils::to_log(LOG_LEVEL_DEBUG, "Received scan completed event while processing %u paper(s), wait ...\n", que - 1); + + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + img_sender_(ptr, sender_param_); + ptr->release(); + } +} +void imgproc_mgr::send_image(LPPAGEDPARAM obj, std::vector& imgs, bool last) { // if(last && imgs.size()) // { @@ -278,10 +350,10 @@ void imgproc_mgr::send_image(std::vector& imgs, bool last) // } for(auto& v: imgs) - send_image(&v.info, v.img, v.ext_info.empty() ? nullptr : &v.ext_info[0], v.ext_info.length(), last); + imgproc_mgr::send_image(obj, &v.info, v.img, v.ext_info.empty() ? nullptr : &v.ext_info[0], v.ext_info.length(), last); - if(last) - sent_ind_++; + // if(last) + // sent_ind_++; } int imgproc_mgr::set_value(const char* name, void* val) @@ -294,25 +366,12 @@ int imgproc_mgr::set_value(const char* name, void* val) do_rebuild_ = *(bool*)val; else if(strcmp(name, SANE_OPT_NAME(CIS_STRETCH)) == 0) { - if(*(bool*)val) + for(auto& v: processors_) { - if(stretcher_) + if(strcmp(v->from(), name) == 0) { - processors_.push_back(stretcher_); - stretcher_ = nullptr; - std::sort(processors_.begin(), processors_.end(), &imgproc_mgr::sort_processor_by_pos); - } - } - else - { - for(size_t i = 0; i < processors_.size(); ++i) - { - if(strcmp(processors_[i]->from(), "stretch") == 0) - { - stretcher_ = processors_[i]; - processors_.erase(processors_.begin() + i); - break; - } + v->enable(name, *(bool*)val); + break; } } } @@ -342,9 +401,10 @@ int imgproc_mgr::clear(void) for (auto& v : processors_) v->release(); processors_.clear(); - if(stretcher_) - stretcher_->release(); - stretcher_ = nullptr; + + for(auto& v: params_) + delete v; + params_.clear(); return 0; } @@ -353,6 +413,7 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img) { RAWIMG ri; int ret = SCANNER_ERR_OK; + int ind = put_ind_ % working_cnt_; ri.data = data; if(img) @@ -362,7 +423,8 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img) else ri.info.data_size = (uint32_t)(long)info; ri.img = img; - prc_que_.save(ri, true); + params_[ind]->mean.que->save(ri, true); + ++put_ind_; return ret; } @@ -370,7 +432,12 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img) void imgproc_mgr::stop(void) { run_ = false; - prc_que_.trigger(); + for(auto& v: params_) + { + v->mean.run = false; + if(v->mean.que) + v->mean.que->trigger(); + } workers_.stop(nullptr); } @@ -385,4 +452,6 @@ void imgproc_mgr::start_new_turn(uint32_t scanid, uint32_t sessionid) scan_id_ = scanid; sent_ind_ = 1; session_id_ = sessionid; + put_ind_ = 0; + start_workers(3); } diff --git a/imgproc/imgprc_mgr.h b/imgproc/imgprc_mgr.h index e6f1e4f..ec7056d 100644 --- a/imgproc/imgprc_mgr.h +++ b/imgproc/imgprc_mgr.h @@ -13,24 +13,67 @@ #include #include -typedef std::shared_ptr> dcptr; +#define DUMP_PROTO union _page_thrd_data_4k* obj, std::vector* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last, void* param class device_option; -class rebuild; class img_encoder; +class rebuild; + class imgproc_mgr : public sane_opt_provider { typedef struct _raw_img { PACKIMAGE info; - dyn_mem_ptr data; + dyn_mem_ptr data; // nullptr and img is 'true' to exit worker bool img; }RAWIMG; + // allocate a page to store the vars for every worker thread + typedef union _page_thrd_data_4k + { + struct + { + bool run; + bool dumpi; + int ind; + int session; + int scan_id; + rebuild* rebld; + img_encoder* encoder; + safe_fifo *que; + image_processor* processors[20]; + bool(*res)(RES_CHK_PROTO); + void *res_param; + void(*dump)(DUMP_PROTO); + void *dump_param; + void(*sender)(SENDER_PROTO); + void *sender_param; + }mean; + uint8_t page[SIZE_KB(4)]; + + _page_thrd_data_4k() + { + mean.rebld = nullptr; + mean.encoder = nullptr; + mean.que = nullptr; + memset(mean.processors, 0, sizeof(mean.processors)); + + mean.res = nullptr; + mean.dump = nullptr; + mean.sender = nullptr; + mean.res_param = mean.dump_param = mean.sender_param = nullptr; + } + ~_page_thrd_data_4k() + { + this->free(); + } + void free(void); + }PAGEDPARAM, *LPPAGEDPARAM; + std::vector params_; + refer_guard first_; refer_guard last_; - image_processor* stretcher_ = nullptr; bool do_rebuild_ = true; volatile bool run_ = true; volatile uint32_t sent_ind_ = 1; @@ -39,27 +82,34 @@ class imgproc_mgr : public sane_opt_provider uint32_t session_id_ = 0; MUTEX working_cnt_lock_; uint32_t working_cnt_ = 0; + uint32_t put_ind_ = 0; std::vector processors_; device_option* opts_; safe_thread workers_; - safe_fifo prc_que_; - CHK_RES_FUNC res_ = CHK_RES_FUNC(); - std::function img_sender_; + bool(*res_)(RES_CHK_PROTO) = nullptr; + void* res_param_ = nullptr; + + void(*img_sender_)(SENDER_PROTO) = nullptr; + void* sender_param_ = nullptr; static bool sort_processor_by_pos(image_processor* l, image_processor* r); static bool sort_image_packet(image_packet_ptr l, image_packet_ptr r); static data_source_ptr scan_finished_packet(uint32_t scanid, uint32_t err = 0); + static void process(image_processor* prc, std::vector* in, std::vector* out); + static void send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, void* info = nullptr, size_t info_l = 0, bool last = true); + static void real_dump_image(DUMP_PROTO); + static void empty_dump_image(DUMP_PROTO); + void start_workers(int cnt); uint32_t add_busy_worker(int inc = 1); - void thread_worker(void); - void process(RAWIMG* img); - void process(image_processor* prc, std::vector* in, std::vector* out); - void send_image(LPPACKIMAGE head, cv::Mat& mat, void* info = nullptr, size_t info_l = 0, bool last = true); - void send_image(std::vector& imgs, bool last); + void thread_worker(void* param); + void process(RAWIMG* img, LPPAGEDPARAM param); + void send_image(LPPAGEDPARAM obj, std::vector& imgs, bool last); public: - imgproc_mgr(std::function sender, device_option* devopts, CHK_RES_FUNC res = CHK_RES_FUNC()); + imgproc_mgr(device_option* devopts, void(*sender)(SENDER_PROTO), void* sp, + bool(*res)(RES_CHK_PROTO) = nullptr, void* rp = nullptr); protected: virtual ~imgproc_mgr(); diff --git a/scanner/async_scanner.cpp b/scanner/async_scanner.cpp index eb9a162..bbdf185 100644 --- a/scanner/async_scanner.cpp +++ b/scanner/async_scanner.cpp @@ -152,23 +152,6 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0) utils::log_info(msg, LOG_LEVEL_DEBUG); }; - auto sender = [this](data_source_ptr img) -> void - { - if(img->ptr() && ((LPPACK_BASE)img->ptr())->cmd == PACK_CMD_SCAN_FINISHED_ROGER) - { - cis_->close(); - res_mgr_->stop(); - utils::print_memory_usage("Memory usage when finished scanning", true); - system("sudo cpufreq-set -g ondemand"); - } - - usb_->write_bulk(img); - }; - DECL_CHK_RES_FUNC(this, res) - { - return res_mgr_->is_resource_enable((enum _task)task, wait, to_ms); - }; - cfg_mgr_ = new device_option(true, user, on_log); utils::to_log(LOG_LEVEL_DEBUG, "OPT - initializing ...\n"); const_opts_ = new scanner_const_opts(); @@ -178,7 +161,7 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0) cis_->set_value(SANE_OPT_NAME(DEVICE_MODEL), &cfg_mgr_->get_option_value(SANE_OPT_NAME(DEVICE_MODEL), SANE_ACTION_GET_VALUE)[0]); cfg_mgr_->add(cis_); cfg_mgr_->add(user_); - img_prcr_ = new imgproc_mgr(sender, cfg_mgr_, res); + img_prcr_ = new imgproc_mgr(cfg_mgr_, &async_scanner::image_sender, (void*)this, &async_scanner::resource_check, (void*)res_mgr_.get()); cfg_mgr_->add(img_prcr_); utils::to_log(LOG_LEVEL_DEBUG, "OPT - initialized %u options.\n", cfg_mgr_->count()); @@ -227,6 +210,24 @@ async_scanner::~async_scanner() devui::uninit_ui(); } +bool async_scanner::resource_check(RES_CHK_PROTO) +{ + return ((resource_mgr*)param)->is_resource_enable((enum _task)type, wait, to_ms); +} +void async_scanner::image_sender(SENDER_PROTO) +{ + async_scanner* obj = (async_scanner*)param; + if(ptr->ptr() && ((LPPACK_BASE)ptr->ptr())->cmd == PACK_CMD_SCAN_FINISHED_ROGER) + { + obj->cis_->close(); + obj->res_mgr_->stop(); + utils::print_memory_usage("Memory usage when finished scanning", true); + system("sudo cpufreq-set -g ondemand"); + } + + obj->usb_->write_bulk(ptr); +} + dyn_mem_ptr async_scanner::handle_bulk_cmd(LPPACK_BASE pack, uint32_t* used, packet_data_base_ptr* required, uint32_t session) { dyn_mem_ptr reply = nullptr; diff --git a/scanner/async_scanner.h b/scanner/async_scanner.h index 7747d0b..c31f0b0 100644 --- a/scanner/async_scanner.h +++ b/scanner/async_scanner.h @@ -42,6 +42,10 @@ class async_scanner : public refer MUTEX fsender_; std::vector send_files_; + + static bool resource_check(RES_CHK_PROTO); + static void image_sender(SENDER_PROTO); + dyn_mem_ptr handle_bulk_cmd(LPPACK_BASE pack, uint32_t* used, packet_data_base_ptr* required, uint32_t session); void init(void); bool on_energy_conservation(bool normal/*true - working status; false - go to sleep*/); // return true to enable get into 'normal' status diff --git a/sdk/base/data.h b/sdk/base/data.h index db6fd79..15c61e6 100644 --- a/sdk/base/data.h +++ b/sdk/base/data.h @@ -402,3 +402,5 @@ public: #define FUNCTION_PROTO_PARAMETERS dyn_mem_ptr, uint32_t*, packet_data_base_ptr* #define FUNCTION_PROTO_COMMAND_HANDLE dyn_mem_ptr(FUNCTION_PROTO_PARAMETERS) +#define SENDER_PROTO data_source_ptr ptr, void* param +#define RES_CHK_PROTO int type, bool wait, int to_ms, void* param diff --git a/sdk/base/ui.cpp b/sdk/base/ui.cpp index 7ebd4b0..0f82f64 100644 --- a/sdk/base/ui.cpp +++ b/sdk/base/ui.cpp @@ -37,7 +37,7 @@ namespace devui public: pipe_reader(const char* fifo, std::function h) : pipe_path_(fifo), handler_(h) { - auto r = [this](void) -> void + auto r = [this](void*) -> void { int cycle = 0; while(run_) @@ -56,7 +56,7 @@ namespace devui } }; - worker_.start(r, SIZE_MB(1), "worker", (void*)&pipe_reader::worker); + worker_.start(r, nullptr, SIZE_MB(1), "worker", (void*)&pipe_reader::worker); } protected: virtual ~pipe_reader() @@ -117,7 +117,7 @@ namespace devui { sent_que_.enable_wait_log(false); - auto r = [this](void) -> void + auto r = [this](void*) -> void { int cycle = 0; while(run_) @@ -136,7 +136,7 @@ namespace devui } }; - worker_.start(r, SIZE_MB(1), "worker", (void*)&pipe_sender::worker); + worker_.start(r, nullptr, SIZE_MB(1), "worker", (void*)&pipe_sender::worker); } protected: virtual ~pipe_sender() @@ -205,16 +205,16 @@ namespace devui } else { - auto r = [this](void) -> void + auto r = [this](void*) -> void { receiver(); }; - auto s = [this](void) -> void + auto s = [this](void*) -> void { sender(); }; - workers_.start(r, SIZE_MB(1), "receiver"); - workers_.start(s, SIZE_MB(1), "sender"); + workers_.start(r, nullptr, SIZE_MB(1), "receiver"); + workers_.start(s, nullptr, SIZE_MB(1), "sender"); } } void close(void) diff --git a/sdk/base/utils.cpp b/sdk/base/utils.cpp index 7e2a034..ad22246 100644 --- a/sdk/base/utils.cpp +++ b/sdk/base/utils.cpp @@ -1529,6 +1529,21 @@ namespace utils return ps; } + int set_cpu_affinity(int cpuind, pthread_t thread) + { + int ret = 0; + + if(thread == (pthread_t)-1) + thread = GetCurrentThreadId(); + + cpu_set_t cpu; + + CPU_ZERO(&cpu); + CPU_SET(cpuind, &cpu); + ret = pthread_setaffinity_np(thread, sizeof(cpu), &cpu); + + return ret; + } std::string init_log(log_type type, log_level level, const char* fn_appendix) { @@ -2501,13 +2516,18 @@ safe_thread::~safe_thread() threads_.clear(); } -void safe_thread::thread_worker(std::function func, std::string name, void* addr) +void safe_thread::thread_worker(std::function func, void* param, std::string name, void* addr) { + EXCEPARAM ep; + + ep.name = name; + ep.param = param; + printf("stack size of thread '%s': %u\n", name.c_str(), utils::get_stack_size()); try { utils::to_log(LOG_LEVEL_DEBUG, "+++ safe_thread of '%s(addr: %p) - id: %p' is running ...\n", name.c_str(), addr, GetCurrentThreadId()); - func(); + func(param); utils::to_log(LOG_LEVEL_DEBUG, "--- safe_thread of '%s - %p' exited.\n", name.c_str(), GetCurrentThreadId()); return; } @@ -2523,17 +2543,17 @@ void safe_thread::thread_worker(std::function func, std::string name { utils::to_log(LOG_LEVEL_FATAL, "Unknown exception in thread '%p - %s'!\n", GetCurrentThreadId(), name.c_str()); } - excep_que_.save(name, true); + excep_que_.save(ep, true); } void safe_thread::thread_notify_exception(void) { while(run_) { - std::string name(""); - if(excep_que_.take(name, true)) + EXCEPARAM ep; + if(excep_que_.take(ep, true)) { if(excep_handler_) - excep_handler_(name.c_str()); + excep_handler_(ep.name.c_str(), ep.param); } } } @@ -2545,22 +2565,23 @@ void* safe_thread::raw_thread(void* lp) { safe_thread *obj = ((LPCRAWTHRD)lp)->obj; - std::function func = ((LPCRAWTHRD)lp)->func; + std::function func = ((LPCRAWTHRD)lp)->func; std::string name(((LPCRAWTHRD)lp)->name); void* addr = ((LPCRAWTHRD)lp)->addr; + void* param = ((LPCRAWTHRD)lp)->param; delete lp; - obj->thread_worker(func, name, addr); + obj->thread_worker(func, param, name, addr); return 0; } -void safe_thread::set_exception_handler(std::function on_exception) +void safe_thread::set_exception_handler(std::function on_exception) { excep_handler_ = on_exception; } -int safe_thread::start(std::function f, std::size_t stack, const char* thread_name, void* addr) +int safe_thread::start(std::function f, void* param, std::size_t stack, const char* thread_name, void* addr) { SAFETHRD st; @@ -2568,18 +2589,19 @@ int safe_thread::start(std::function f, std::size_t stack, const cha if(stack == 0) { st.raw = false; - st.thread.reset(new std::thread(&safe_thread::thread_worker, this, f, thread_name, addr)); + st.thread.reset(new std::thread(&safe_thread::thread_worker, this, f, param, st.name, addr)); } else { // st.thread.reset(new std::thread(std::thread(&safe_thread::thread_worker, this, f, thread_name, addr), std::move(stack))); - LPCRAWTHRD param = new CRAWTHRD; + LPCRAWTHRD para = new CRAWTHRD; int err = 0; - param->obj = this; - param->func = f; - param->name = thread_name; - param->addr = addr; + para->obj = this; + para->func = f; + para->name = thread_name; + para->addr = addr; + para->param = param; #if OS_WIN st.thread_raw = CreateThread(NULL, stack, &safe_thread::raw_thread, param, 0, nullptr); @@ -2591,14 +2613,14 @@ int safe_thread::start(std::function f, std::size_t stack, const cha pthread_attr_setstacksize(&attr, stack); st.raw = true; - err = pthread_create(&st.thread_raw, &attr, &safe_thread::raw_thread, param); + err = pthread_create(&st.thread_raw, &attr, &safe_thread::raw_thread, para); if(err) err = errno; pthread_attr_destroy(&attr); #endif if(err) { - delete param; + delete para; return err; } @@ -2646,6 +2668,7 @@ int safe_thread::stop(const char* thread_name) } + //////////////////////////////////////////////////////////////////////////////////////////////// // safe_file safe_file::safe_file(const char* path) diff --git a/sdk/base/utils.h b/sdk/base/utils.h index bc9bc3f..0b74ff1 100644 --- a/sdk/base/utils.h +++ b/sdk/base/utils.h @@ -17,6 +17,10 @@ #define USE_SAFE_THREAD +#define CPU_CORES 6 +#define CPU_MAJOR_0 4 +#define CPU_MAJOR_CNT 2 +#define CPU_MINOR_0 0 enum log_type { @@ -110,6 +114,7 @@ namespace utils void print_memory_usage(const char* tips, bool to_log_file); int get_disk_space(const char* path, unsigned long long* total, unsigned long long* avail, unsigned long long* block); unsigned int get_page_size(unsigned int* map_unit = nullptr); + int set_cpu_affinity(int cpuind, pthread_t thread = -1); // return logging file path if 'type' was LOG_TYPE_FILE std::string init_log(log_type type, log_level level = LOG_LEVEL_ALL, const char* fn_appendix = nullptr/*appendix to default log-file-name*/); @@ -489,22 +494,28 @@ class safe_thread std::shared_ptr thread; // valid when !raw pthread_t thread_raw; // valid when raw }SAFETHRD; + typedef struct _excep_param + { + std::string name; + void* param; + }EXCEPARAM, *LPEXCEPARAM; volatile bool run_ = true; MUTEX lock_; std::unique_ptr notify_thread_; std::vector threads_; - safe_fifo excep_que_; - std::function excep_handler_ = std::function(); + safe_fifo excep_que_; + std::function excep_handler_ = std::function(); - void thread_worker(std::function func, std::string name, void* addr); + void thread_worker(std::function func, void* param, std::string name, void* addr); void thread_notify_exception(void); typedef struct _cr_raw_thread { safe_thread* obj; - std::function func; + std::function func; std::string name; void* addr; + void* param; }CRAWTHRD, *LPCRAWTHRD; static #if OS_WIN @@ -519,8 +530,8 @@ public: ~safe_thread(); public: - void set_exception_handler(std::function on_exception = std::function()); - int start(std::function f, std::size_t stack = 0, const char* thread_name = nullptr, void* addr = nullptr); + void set_exception_handler(std::function on_exception = std::function()); + int start(std::function f, void* param = nullptr, std::size_t stack = 0, const char* thread_name = nullptr, void* addr = nullptr); int stop(const char* thread_name); }; diff --git a/sdk/imgprc/img_processor.h b/sdk/imgprc/img_processor.h index 5d17465..1b39ae0 100644 --- a/sdk/imgprc/img_processor.h +++ b/sdk/imgprc/img_processor.h @@ -29,6 +29,8 @@ typedef struct _proc_img_info_modules // 跨模块参数 uint8_t* data; // size = bytes_per_line * height }PROCIIM, *LPPROCIIM; #pragma pack(pop) +typedef std::shared_ptr> dcptr; + class image_processor : public sane_opt_provider { @@ -49,6 +51,8 @@ public: bool is_enable(void) { return enabled_; } int get_version(void) { return ver_; } int get_position(void) { return pos_; } + + virtual image_processor* copy_weaker(void) = 0; virtual int process(std::vector& in, std::vector& out) = 0; diff --git a/ui/dev_menu.cpp b/ui/dev_menu.cpp index b5961e3..700c618 100644 --- a/ui/dev_menu.cpp +++ b/ui/dev_menu.cpp @@ -446,7 +446,7 @@ ui_mgr::ui_mgr() : disp_data_("lcd-msg") keyboard_.reset(new KeyMonitor(ke)); - auto display = [this](void) -> void + auto display = [this](void*) -> void { #ifdef TEST_PLATFORM_EVENT printf("\tdisplay thread starting, sleep 3000ms ...\n"); @@ -459,7 +459,7 @@ ui_mgr::ui_mgr() : disp_data_("lcd-msg") #ifdef TEST_PLATFORM_EVENT to__.enable_log(false); #endif - disp_thrd_.start(display, SIZE_MB(4), "thread_display"); + disp_thrd_.start(display, nullptr, SIZE_MB(4), "thread_display"); #ifdef TEST_PLATFORM_EVENT // display thread starting, sleep 3000ms ...