#include "imgprc_mgr.h"

// NOTE(review): the names of the system headers were lost from this file (only bare
// "#include" tokens remained). The list below covers the standard facilities this
// translation unit visibly uses (memset/strcmp, printf/snprintf, std::sort,
// std::this_thread, std::chrono) - confirm against the original header list.
#include <string.h>
#include <stdio.h>
#include <algorithm>
#include <chrono>
#include <thread>

#include "./algs/rebuild.h"
#include "./algs/image_encoder.h"
#include "./algs/stretch.h"
#include "./algs/auto_crop.h"
#include "./algs/color_correct.h"
#include "./algs/multi_out.h"
#include "./algs/ImageProcess_Public.h"


//////////////////////////////////////////////////////////////////////////////////////////////////////
// imgproc_mgr

// Creates a processor of class 'cls', registers it with the option container and
// appends it to the processor chain.
#define ADD_IMG_PROCESSOR(cls)      \
    {                               \
        cls *obj = new cls();       \
        opts_->add(obj);            \
        processors_.push_back(obj); \
    }

// Option description (JSON) of the "dump-img" switch owned by this class.
static std::string device_opt_json[] = {
    "{\"dump-img\":{\"cat\":\"base\",\"group\":\"advance\",\"title\":\"\\u8f93\\u51fa\\u4e2d\\u95f4\\u56fe\\u50cf\",\"desc\":\"\\u8f93\\u51fa\\u5404\\u7b97\\u6cd5\\u4e2d\\u95f4\\u7ed3\\u679c\\u56fe\\u50cf\",\"type\":\"bool\",\"ui-pos\":20,\"auth\":10,\"affect\":2,\"size\":4,\"cur\":false,\"default\":false}}"
};

//////////////////////////////////////////////////////////////////////////////////////////////////////
// imgproc_mgr

// Builds the manager: takes (or creates) the option container, loads the processor
// chain and creates the rebuild (first) and encoder (last) stages.
//
// devopts - option container to share; a new one is created when null
// sender  - callback used to deliver packets, called with 'sp'
// res     - resource-check callback, called with 'rp'
imgproc_mgr::imgproc_mgr(device_option* devopts
    , void(*sender)(SENDER_PROTO), void* sp
    , bool(*res)(RES_CHK_PROTO), void* rp
    )
    : opts_(devopts), img_sender_(sender), sender_param_(sp)
    , res_(res), res_param_(rp)
#ifndef USE_THREAD_PAGED_DATA
    , raw_("cis-img")
#endif
{
    ADD_THIS_JSON();

#ifndef USE_THREAD_PAGED_DATA
    raw_.enable_wait_log(false);
#endif

    if (opts_)
        opts_->add_ref();
    else
        opts_ = new device_option(true);

    load_processor(nullptr);

    first_.reset(new rebuild());
    last_.reset(new img_encoder());
    opts_->add(first_.get());
    opts_->add(last_.get());
}

// Stops the workers before releasing what they use: the worker lambdas capture
// 'this', so they must not be running while the object is torn down.
imgproc_mgr::~imgproc_mgr()
{
    stop();
    clear();
    opts_->release();
}

// Releases everything a per-thread parameter block holds and resets its pointers,
// so the block can be refilled by start_workers(). Safe to call repeatedly.
void imgproc_mgr::PAGEDPARAM::free(void)
{
    // the processor array is filled from the front; the first null entry ends it
    for(auto& p: mean.processors)
    {
        if(!p)
            break;

        p->release();
    }
    memset(mean.processors, 0, sizeof(mean.processors));

    if(mean.que)
        delete mean.que;
    mean.que = nullptr;

    if(mean.rebld)
        mean.rebld->release();
    mean.rebld = nullptr;

    if(mean.encoder)
        mean.encoder->release();
    mean.encoder = nullptr;

    mean.res = nullptr;
    mean.dump = nullptr;
    mean.sender = nullptr;
    mean.res_param = mean.dump_param = mean.sender_param = nullptr;
}

// Ordering predicate: processors are sorted by ascending get_position().
bool imgproc_mgr::sort_processor_by_pos(image_processor* l, image_processor* r)
{
    return l->get_position() < r->get_position();
}

// Ordering predicate: image packets are sorted by ascending paper index.
bool imgproc_mgr::sort_image_packet(image_packet_ptr l, image_packet_ptr r)
{
    return l->get_paper_index() < r->get_paper_index();
}

// Builds the PACK_CMD_SCAN_FINISHED_ROGER reply packet for scan 'scanid' carrying 'err'.
data_source_ptr imgproc_mgr::scan_finished_packet(uint32_t scanid, uint32_t err)
{
    dyn_mem_ptr reply = dyn_mem::memory(sizeof(PACK_BASE));

    BASE_PACKET_REPLY(*((LPPACK_BASE)reply->ptr()), PACK_CMD_SCAN_FINISHED_ROGER, scanid, err);
    reply->set_len(sizeof(PACK_BASE));

    return reply;
}

// Runs one processor over 'in', producing 'out'. Any exception is re-thrown as an
// exception_ex whose message is prefixed with the processor's from() name.
void imgproc_mgr::process(image_processor* prc, std::vector<PROCIMGINFO>* in, std::vector<PROCIMGINFO>* out)
{
    try
    {
        prc->process(*in, *out);
    }
    catch(const exception_ex& e)
    {
        std::string msg(std::string("image process '") + prc->from() + "': " + e.what());
        throw(exception_ex(msg.c_str()));
    }
    catch(const std::exception& e)
    {
        std::string msg(std::string("image process '") + prc->from() + "': " + e.what());
        throw(exception_ex(msg.c_str()));
    }
}

// Encodes a single image and hands it to the sender callback.
//
// obj    - per-thread parameter block (only used with USE_THREAD_PAGED_DATA)
// head   - image header; a copy is modified and sent, '*head' itself is not written
// mat    - image to encode
// info   - extra information bytes attached to the packet, length 'info_l'
// last   - true for the final result of a paper, false for an intermediate dump
void imgproc_mgr::send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, void* info, size_t info_l, bool last)
{
    PACKIMAGE h(*head);
    // NOTE(review): the template argument of this shared_ptr was lost from the file;
    // std::vector<uchar> is assumed as the encoded-buffer type - confirm against
    // img_encoder::encode() and the image_packet constructor.
#ifdef USE_THREAD_PAGED_DATA
    std::shared_ptr<std::vector<uchar>> compd(obj->mean.encoder->encode(&h, mat));
#else
    std::shared_ptr<std::vector<uchar>> compd(last_->encode(&h, mat));
#endif
    image_packet_ptr ptr = nullptr;

    if(last)
    {
        // 'life' held the birth stamp; turn it into the elapsed time
        h.life = chronograph::from_process_born() - h.life;
    }
    else
    {
        // restore the stage fields from the caller's header after encode() saw 'h'
        h.prc_stage = head->prc_stage;
        h.prc_time = head->prc_time;
    }
    h.prc_last = last;

#ifdef USE_THREAD_PAGED_DATA
    ptr = new image_packet(&h, compd, obj->mean.scan_id, info, info_l);
    ptr->set_session_id(obj->mean.session);
    obj->mean.sender(ptr, obj->mean.sender_param);
#else
    ptr = new image_packet(&h, compd, scan_id_, info, info_l);
    ptr->set_session_id(session_id_);
    img_sender_(ptr, sender_param_);
#endif
    ptr->release();
}

// Dump callback used when intermediate images are wanted: sends either the whole
// list 'arr' or, when 'arr' is null, the single image described by info/mat.
void imgproc_mgr::real_dump_image(DUMP_PROTO)
{
#ifdef USE_THREAD_PAGED_DATA
    if(arr)
    {
        for(auto& v: *arr)
            imgproc_mgr::send_image(obj, &v.info, v.img, &v.ext_info[0], v.ext_info.length(), last);
    }
    else
    {
        imgproc_mgr::send_image(obj, info, *mat, infoex, infexl, last);
    }
#else
    if(arr)
    {
        ((imgproc_mgr*)param)->send_image(nullptr, *arr, last);
    }
    else
    {
        ((imgproc_mgr*)param)->send_image(obj, info, *mat, infoex, infexl, last);
    }
#endif
}

// Dump callback used when intermediate images are not wanted: does nothing.
void imgproc_mgr::empty_dump_image(DUMP_PROTO)
{}

// (Re)starts 'cnt' worker threads: stops running workers, discards queued raw
// images, rebuilds the per-thread parameter blocks and launches the threads.
void imgproc_mgr::start_workers(int cnt)
{
    // stop() wakes the workers on whichever queue they wait on (shared raw_ queue or
    // the per-thread queues) before joining them; triggering raw_ alone would leave
    // paged workers blocked on their own queue.
    stop();
    run_ = true;

#ifdef REBUILD_IN_CIS_THREAD
    raw_.clear();
#else
    RAWIMG ele;
    while(raw_.take(ele))
    {
        // end-of-work sentinels have img == true and data == nullptr
        if(ele.img && ele.data)
            ele.data->release();
    }
#endif

#ifdef USE_THREAD_PAGED_DATA
    for(auto& v: params_)
        v->free();
#endif

    auto thrd = [this](void* param) -> void
    {
        thread_worker(param);
    };
    auto restart = [this](const char* thread_name, void* param) ->void
    {
        auto thrd = [this](void* param) -> void
        {
            thread_worker(param);
        };

        workers_.stop(thread_name);
        // the failed worker never reached its own add_busy_worker(-1)
        add_busy_worker(-1);
        printf("\nrestart imgproc_mgr::thread_worker\n\n");
        workers_.start(thrd, param, SIZE_MB(0), "imgproc_mgr::thread_worker", (void*)&imgproc_mgr::thread_worker);
    };
    workers_.set_exception_handler(restart);

    for(int i = 0; i < cnt; ++i)
    {
        // without paged data the "parameter" is just the worker index
        // (thread_worker casts it back through long)
        LPPAGEDPARAM param = (LPPAGEDPARAM)(long)i;

#ifdef USE_THREAD_PAGED_DATA
        if(i < params_.size())
            param = params_[i];
        else
        {
            param = new PAGEDPARAM();
            params_.push_back(param);
        }
        param->mean.run = run_;
        param->mean.ind = i;
        param->mean.que = new safe_fifo<RAWIMG>("prcimg");
        param->mean.que->enable_wait_log(false);
        param->mean.scan_id = scan_id_;
        param->mean.session = session_id_;
        param->mean.rebld = first_->is_enable() ? dynamic_cast<rebuild*>(first_->copy_weaker()) : nullptr;
        param->mean.encoder = dynamic_cast<img_encoder*>(last_->copy_weaker());
        param->mean.dumpi = dump_img_;
        if(param->mean.dumpi)
        {
            param->mean.dump = &imgproc_mgr::real_dump_image;
            param->mean.dump_param = this;
        }
        else
        {
            param->mean.dump = &imgproc_mgr::empty_dump_image;
            param->mean.dump_param = nullptr;
        }
        param->mean.sender = img_sender_;
        param->mean.sender_param = sender_param_;

        int ind = 0;
        for(auto& v: processors_)
        {
            if(ind >= _countof(param->mean.processors))
                break;

            image_processor *prc = v->copy_weaker();
            param->mean.processors[ind++] = prc;
            // terminate the list only while there is room; a completely filled
            // array is bounded by its size in the loops that walk it
            if(ind < _countof(param->mean.processors))
                param->mean.processors[ind] = nullptr;
        }
#endif

        char n[40] = {0};
        snprintf(n, sizeof(n), "thread_worker%d", i + 1);
        workers_.start(thrd, param, SIZE_MB(0), n, (void*)&imgproc_mgr::thread_worker);
    }
}

// Adds 'inc' to the busy-worker counter under its lock and returns the new value
// (call with 0 to read the counter).
uint32_t imgproc_mgr::add_busy_worker(int inc)
{
    SIMPLE_LOCK(working_cnt_lock_);

    working_cnt_ += inc;

    return working_cnt_;
}

// Worker thread body: takes raw images from its queue and processes them until
// it is told to stop or receives an end-of-work sentinel.
void imgproc_mgr::thread_worker(void* param)
{
    RAWIMG img;
#ifdef USE_THREAD_PAGED_DATA
    LPPAGEDPARAM para = (LPPAGEDPARAM)param;
    int ind = para->mean.ind;
#else
    int ind = (int)(long)param;
    void(*dump)(DUMP_PROTO) = dump_img_ ? &imgproc_mgr::real_dump_image : &imgproc_mgr::empty_dump_image;
    // the dump callback travels to process() disguised as the parameter pointer
    LPPAGEDPARAM para = (LPPAGEDPARAM)(void*)dump;
#endif

#ifdef BIND_CPU
    // NOTE(review): element type of this vector was lost from the file; int is
    // assumed - confirm against utils::set_cpu_affinity().
    std::vector<int> cpu;
    for(int i = 0; i < CPU_CORES - CPU_MAJOR_CNT; ++i)
        cpu.push_back(CPU_MINOR_0 + i);
    if(!cpu.empty())
    {
        // log the first CPU of the set; the vector itself cannot be passed for '%d'
        utils::to_log(LOG_LEVEL_DEBUG, "set image process thread %d to CPU %d = %d\n"
            , ind + 1, cpu[0], utils::set_cpu_affinity(&cpu[0], cpu.size()));
    }
#endif

    add_busy_worker();
#ifdef USE_THREAD_PAGED_DATA
    while(para->mean.run)
    {
        if(para->mean.res)
        {
            // wait until the resource check passes, giving up when stopped
            while(!para->mean.res(TASK_IMG_PROCESSOR, true, 3000, para->mean.res_param))
            {
                if(!para->mean.run)
                    break;
            }
            if(!para->mean.run)
                break;
        }

        if(para->mean.que->take(img, true))
        {
            // an "image" without payload is the end-of-work sentinel
#ifdef REBUILD_IN_CIS_THREAD
            if(img.img && img.imgs.size() == 0)
#else
            if(img.img && !img.data)
#endif
                break;

            process(&img, para, ind);
        }
    }
#else
    while(run_)
    {
        // worker 0 never waits for the resource check
        if(ind && res_)
        {
            while(!res_(TASK_IMG_PROCESSOR, true, 3000, res_param_))
            {
                if(!run_)
                    break;
            }
            if(!run_)
                break;
        }

        if(raw_.take(img, true))
        {
            // an "image" without payload is the end-of-work sentinel
#ifdef REBUILD_IN_CIS_THREAD
            if(img.img && img.imgs.size() == 0)
#else
            if(img.img && !img.data)
#endif
                break;

            process(&img, para, ind);
        }
    }
#endif
    add_busy_worker(-1);
}

// Handles one queue entry in a worker thread.
//  - image entry: optional raw dump, rebuild, every enabled processor in order,
//    then the final result is sent;
//  - scan-over entry: sentinels are queued for the workers, the call waits until
//    this is the only busy worker, then the scan-finished packet is sent.
//
// param   - per-thread parameter block; without USE_THREAD_PAGED_DATA it carries
//           the dump callback chosen in thread_worker()
// thrd_sn - zero-based worker index, used for logging
void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param, int thrd_sn)
{
    if(img->img)
    {
        std::vector<PROCIMGINFO> in, out, *src = &in, *dst = &out, *swp = nullptr;
        chronograph watch;

#ifndef USE_THREAD_PAGED_DATA
        // declared for every non-paged configuration, since both rebuild modes call it below
        void(*dump)(DUMP_PROTO) = (void(*)(DUMP_PROTO))param;
#endif

#ifdef REBUILD_IN_CIS_THREAD
        src = &img->imgs;
        utils::to_log(LOG_LEVEL_ALL, "Rebuild paper %d spend %u milliseconds.\n", img->imgs[0].info.pos.paper_ind, img->imgs[0].info.prc_time);
#else
#ifdef USE_THREAD_PAGED_DATA
        if(param->mean.dumpi)
#else
        if(dump_img_)
#endif
        {
            // NOTE(review): cv::Mat takes (rows, cols); width is passed as rows here
            // and below - presumably the raw CIS layout, confirm.
            cv::Mat mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr());
            send_image(param, &img->info, mat, nullptr, 0, false);
        }

#ifdef USE_THREAD_PAGED_DATA
        if(param->mean.rebld)
        {
            param->mean.rebld->do_rebuild(&img->info, img->data->ptr(), in);
            utils::to_log(LOG_LEVEL_ALL, "Thread %d Rebuild paper %d spend %llu milliseconds.\n", thrd_sn + 1, img->info.pos.paper_ind, watch.elapse_ms());
            param->mean.dump(param, &in, nullptr, nullptr, nullptr, 0, false, param->mean.dump_param);
        }
#else
        if(first_->is_enable())
        {
            first_->do_rebuild(&img->info, img->data->ptr(), in);
            utils::to_log(LOG_LEVEL_ALL, "Thread %d Rebuild paper %d spend %llu milliseconds.\n", thrd_sn + 1, img->info.pos.paper_ind, watch.elapse_ms());
            dump(nullptr, &in, nullptr, nullptr, nullptr, 0, false, this);
        }
#endif
        else
        {
            // rebuild disabled: pass the raw buffer through as a single image
            PROCIMGINFO i;

            i.info = img->info;
            i.img = cv::Mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr());
            in.push_back(i);
        }
        img->data->release(); // page fault
#endif

        // run the chain; 'src' and 'dst' swap roles after every enabled stage
#ifdef USE_THREAD_PAGED_DATA
        for(auto& v: param->mean.processors)
        {
            if(!v)
            {
                break;
            }

            if(v->is_enable())
            {
                process(v, src, dst);
                src->clear();
                swp = src;
                src = dst;
                dst = swp;
                param->mean.dump(param, src, nullptr, nullptr, nullptr, 0, false, param->mean.dump_param);
            }
        }
#else
        for(auto& v: processors_)
        {
            if(v->is_enable())
            {
                process(v, src, dst);
                src->clear();
                swp = src;
                src = dst;
                dst = swp;
                dump(param, src, nullptr, nullptr, nullptr, 0, false, this);
            }
        }
#endif

        send_image(param, *src, true);
    }
    else
    {
        uint32_t wait = 0, que = 0;
        RAWIMG over;

#ifdef REBUILD_IN_CIS_THREAD
        data_source_ptr ptr = imgproc_mgr::scan_finished_packet(scan_id_, img->imgs[0].info.data_size);
#else
        data_source_ptr ptr = imgproc_mgr::scan_finished_packet(scan_id_, img->info.data_size);
        over.data = nullptr;
#endif
        // img == true with no payload is what thread_worker() treats as "exit"
        over.img = true;
#ifdef USE_THREAD_PAGED_DATA
        for(auto& v: params_)
        {
            // blocks beyond the started worker count have no queue
            if(v->mean.que)
                v->mean.que->save(over, true);
        }
#else
        for(int i = 0; i < working_cnt_; ++i)
            raw_.save(over, true);
#endif

        ptr->set_session_id(session_id_);
        // wait until this thread is the only busy worker
        while((que = add_busy_worker(0)) > 1)
        {
            if(wait++ == 0)
                utils::to_log(LOG_LEVEL_DEBUG, "Received scan completed (in thread %d) event while processing %u paper(s), wait ...\n", thrd_sn + 1, que - 1);
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
        }
        img_sender_(ptr, sender_param_);
        ptr->release();
    }
}

// Sends every image of one paper, stamping each with the total image count of the
// paper and its index within the paper.
void imgproc_mgr::send_image(LPPAGEDPARAM obj, std::vector<PROCIMGINFO>& imgs, bool last)
{
    // Disabled: ordered delivery (wait until paper 'sent_ind_' has been sent).
    // if(last && imgs.size())
    // {
    //     bool first = true;
    //     while(imgs[0].info.pos.paper_ind != sent_ind_)
    //     {
    //         if(first)
    //         {
    //             first = false;
    //             utils::to_log(LOG_LEVEL_DEBUG, "Wait paper %d sent before sending paper %d ...\n", sent_ind_, imgs[0].info.pos.paper_ind);
    //         }
    //         std::this_thread::sleep_for(std::chrono::milliseconds(3));
    //     }
    // }

    int total = imgs.size(), ind = 0;

    for(auto& v: imgs)
    {
        v.info.pos.paper_all = total;
        v.info.pos.ind_in_paper = ind++;
        imgproc_mgr::send_image(obj, &v.info, v.img, v.ext_info.empty() ? nullptr : &v.ext_info[0], v.ext_info.length(), last);
    }

    // if(last)
    //     sent_ind_++;
}

// Applies an option value. Only the dump-image switch is handled here; any other
// name yields SCANNER_ERR_DEVICE_NOT_SUPPORT.
int imgproc_mgr::set_value(const char* name, void* val)
{
    int ret = SCANNER_ERR_OK;

    if(strcmp(name, SANE_OPT_NAME(DUMP_IMG)) == 0)
        dump_img_ = *(bool*)val;
    else
        ret = SCANNER_ERR_DEVICE_NOT_SUPPORT;

    return ret;
}

// Creates the built-in processors and sorts the chain by position.
// 'path' is not used by this implementation.
int imgproc_mgr::load_processor(const char* path)
{
    int ret = SCANNER_ERR_OK;

    // ADD_IMG_PROCESSOR(rebuild);
    ADD_IMG_PROCESSOR(stretch);
    ADD_IMG_PROCESSOR(auto_crop);
    ADD_IMG_PROCESSOR(color_correct);
    ADD_IMG_PROCESSOR(multi_out);
    // ADD_IMG_PROCESSOR(img_encoder);

    std::sort(processors_.begin(), processors_.end(), &imgproc_mgr::sort_processor_by_pos);

    return ret;
}

// Releases every processor and, with paged data, every per-thread parameter block.
int imgproc_mgr::clear(void)
{
    for (auto& v : processors_)
        v->release();
    processors_.clear();

#ifdef USE_THREAD_PAGED_DATA
    for(auto& v: params_)
    {
        // free() releases the queue and the cloned stages the block still holds;
        // it nulls what it releases, so it is harmless if already called
        v->free();
        delete v;
    }
    params_.clear();
#endif

    return 0;
}

// Statistics of rebuilding inside the CIS callback; reset by start_new_turn().
static uint64_t rebuild_cis = 0;
static uint32_t scan_count = 0;

// CIS callback entry: turns a CIS event into a queue entry for the workers.
// Status events are forwarded to the sender directly and never queued.
int imgproc_mgr::process(CIS_IMAGE_PROTO)
{
    RAWIMG ri;
    int ret = SCANNER_ERR_OK;
    // read the worker count once, under its lock; it is zero until the first
    // worker has registered, which must not reach the modulo
    const uint32_t workers = add_busy_worker(0);
    int ind = workers ? (int)(put_ind_ % workers) : 0;

#ifdef REBUILD_IN_CIS_THREAD
    if(type == CIS_CB_IMAGE)
    {
        LPPAGEDPARAM paged = nullptr;
#ifdef USE_THREAD_PAGED_DATA
        paged = params_[ind];
#endif
        if(dump_img_)
        {
            cv::Mat mat(lpinfo->width, lpinfo->height, CV_8UC1, data->ptr());
            send_image(paged, lpinfo, mat, nullptr, 0, false);
        }

        if(first_->is_enable())
        {
            first_->do_rebuild(lpinfo, data->ptr(), ri.imgs);
            if(!ri.imgs.empty())
                rebuild_cis += ri.imgs[0].info.prc_time;
            scan_count++;
            if(dump_img_)
                send_image(paged, ri.imgs, false);
        }
        else
        {
            PROCIMGINFO i;

            i.info = *lpinfo;
            i.img = cv::Mat(lpinfo->width, lpinfo->height, CV_8UC1, data->ptr());
            ri.imgs.push_back(i);
        }
    }
    else if(type == CIS_CB_SCAN_OVER)
    {
        PROCIMGINFO i;

        // for scan-over the 'lpinfo' argument carries a number, stored in data_size
        i.info.data_size = (uint32_t)(long)lpinfo;
        ri.imgs.push_back(i);

        if(scan_count)
        {
            printf("--> Rebuild %u papers in %llums, average is %.2fms\n", scan_count
                , (unsigned long long)rebuild_cis, rebuild_cis * 1.0f / scan_count);
        }
    }
#else
    ri.data = data;
    if(type == CIS_CB_IMAGE)
    {
        // the worker releases this reference after processing
        data->add_ref();
        ri.info = *lpinfo;
    }
    else if(type == CIS_CB_SCAN_OVER)
    {
        // for scan-over the 'lpinfo' argument carries a number, stored in data_size
        ri.info.data_size = (uint32_t)(long)lpinfo;
    }
#endif
    else if(type == CIS_CB_STATUS)
    {
        data->set_session_id(session_id_);
        img_sender_(data, sender_param_);

        return SCANNER_ERR_OK;
    }

    ri.img = type == CIS_CB_IMAGE;
#ifdef USE_THREAD_PAGED_DATA
    params_[ind]->mean.que->save(ri, true);
#else
    raw_.save(ri, true);
#endif
    ++put_ind_;

    return ret;
}

// Asks every worker to stop, wakes them from their queue wait and stops the threads.
void imgproc_mgr::stop(void)
{
    run_ = false;
#ifdef USE_THREAD_PAGED_DATA
    for(auto& v: params_)
    {
        v->mean.run = false;
        if(v->mean.que)
            v->mean.que->trigger();
    }
#else
    for(int i = 0; i < working_cnt_; ++i)
        raw_.trigger();
#endif
    workers_.stop(nullptr);
}

// True while at least one worker is registered as busy.
bool imgproc_mgr::is_busy(void)
{
    SIMPLE_LOCK(working_cnt_lock_);

    return working_cnt_;
}

// Prepares a new scan: stores the scan/session ids, resets the counters and
// restarts three workers.
void imgproc_mgr::start_new_turn(uint32_t scanid, uint32_t sessionid)
{
    scan_id_ = scanid;
    sent_ind_ = 1;
    session_id_ = sessionid;
    put_ind_ = 0;
    start_workers(3);

    rebuild_cis = 0;
    scan_count = 0;
}