CIS缓冲区增加到6个;削减图像处理线程公共数据访问;增加线程亲缘函数

This commit is contained in:
gb 2024-03-05 17:07:44 +08:00
parent 14e17b61cf
commit fe90a83e5c
22 changed files with 513 additions and 263 deletions

View File

@ -42,7 +42,7 @@ protected:
size_t length;
};
const int v4l_buffer_count = 5;
const int v4l_buffer_count = 6;
int buf_size_ = 0;
int v4l_width = 3100;
int v4l_height = 3100;

View File

@ -373,6 +373,8 @@ void scanner_hw::thread_image_capture(bool paper_ready)
std::function<IMAGE_HANDLER_PROTO> img_callback(img_handler_);
utils::to_log(LOG_LEVEL_DEBUG, "scanning thread working ...\n");
utils::to_log(LOG_LEVEL_DEBUG, "set capture thread to CPU %d = %d\n", CPU_MAJOR_0, utils::set_cpu_affinity(CPU_MAJOR_0));
avail_mem.enable_wait_log(false);
motor_->clear_error();
if(paper_ready)

View File

@ -55,8 +55,11 @@ static void myWarpAffine(cv::InputArray _src, cv::OutputArray _dst, cv::InputArr
//
#define OPTION_FUNC(name) auto name = [this](void* val) -> void
auto_crop::auto_crop() : image_processor("auto_crop")
auto_crop::auto_crop(bool weaker) : image_processor("auto_crop")
{
if(weaker)
return;
init();
ADD_THIS_JSON();
@ -321,6 +324,26 @@ int auto_crop::set_value(const char* name/*nullptr for all options*/, void* val/
return ret;
}
image_processor* auto_crop::copy_weaker(void)
{
auto_crop *weaker = new auto_crop(true);
weaker->pos_ = pos_;
weaker->enabled_ = enabled_;
weaker->crop_ = crop_;
weaker->deskew_ = deskew_;
weaker->fill_bg_ = fill_bg_;
weaker->convex_ = convex_;
weaker->fill_clr_ = fill_clr_;
weaker->threshold_ = threshold_;
weaker->indent_ = indent_;
weaker->noise_ = noise_;
weaker->fixed_paper_ = fixed_paper_;
weaker->lateral_ = lateral_;
return weaker;
}
int auto_crop::process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out)
{
int ret = SCANNER_ERR_OK;

View File

@ -26,7 +26,7 @@ class auto_crop : public image_processor
int work(PROCIMGINFO& in, PROCIMGINFO& out);
public:
auto_crop();
auto_crop(bool weaker = false);
protected:
~auto_crop();
@ -35,5 +35,6 @@ public:
virtual int set_value(const char* name/*nullptr for all options*/, void* val/*nullptr for restore*/) override;
public:
virtual image_processor* copy_weaker(void) override;
virtual int process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out) override;
};

View File

@ -21,8 +21,11 @@ static std::string device_opt_json[] = {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
color_correct::color_correct() : image_processor("color_correct")
color_correct::color_correct(bool weaker) : image_processor("color_correct")
{
if(weaker)
return;
ADD_THIS_JSON();
hg::initLut(lut_path_.c_str(), clr_);
}
@ -57,6 +60,17 @@ int color_correct::set_value(const char* name/*nullptr for all options*/, void*
return ret;
}
image_processor* color_correct::copy_weaker(void)
{
color_correct *weaker = new color_correct(true);
weaker->pos_ = pos_;
weaker->enabled_ = enabled_;
weaker->correct_ = correct_;
weaker->clr_ = clr_;
return weaker;
}
int color_correct::process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out)
{
int ret = SCANNER_ERR_OK;

View File

@ -12,7 +12,7 @@ class color_correct : public image_processor
std::string lut_path_ = "/usr/local/huago/Textlut200clr.bmp";
public:
color_correct();
color_correct(bool weaker = false);
protected:
~color_correct();
@ -21,5 +21,6 @@ public:
virtual int set_value(const char* name/*nullptr for all options*/, void* val/*nullptr for restore*/) override;
public:
virtual image_processor* copy_weaker(void) override;
virtual int process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out) override;
};

View File

@ -12,8 +12,11 @@ static std::string device_opt_json[] = {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// img_encoder
img_encoder::img_encoder() : image_processor("img_encoder")
img_encoder::img_encoder(bool weaker) : image_processor("img_encoder")
{
if(weaker)
return;
ADD_THIS_JSON();
param_.push_back(cv::IMWRITE_JPEG_QUALITY);
@ -62,6 +65,19 @@ int img_encoder::set_value(const char* name, void* val)
return ret;
}
image_processor* img_encoder::copy_weaker(void)
{
img_encoder *weaker = new img_encoder(true);
weaker->pos_ = pos_;
weaker->enabled_ = enabled_;
weaker->fmt_ = fmt_;
weaker->jpeg_quality_ = jpeg_quality_;
weaker->param_ = param_;
return weaker;
}
int img_encoder::process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out)
{
for(auto& v: in)

View File

@ -14,7 +14,7 @@ class img_encoder : public image_processor
std::vector<int> param_;
public:
img_encoder();
img_encoder(bool weaker = false);
protected:
~img_encoder();
@ -23,6 +23,7 @@ public:
virtual int set_value(const char* name/*nullptr for all options*/, void* val/*nullptr for restore*/) override;
public:
virtual image_processor* copy_weaker(void) override;
virtual int process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out) override;
public:

View File

@ -11,8 +11,11 @@ static std::string device_opt_json[] = {
"{\"rebuild\":{\"cat\":\"imgp\",\"group\":\"imgp\",\"title\":\"CIS\\u56fe\\u50cf\\u8fd8\\u539f\",\"desc\":\"\\u5c06\\u4eceCIS\\u8f93\\u51fa\\u7684\\u539f\\u59cb\\u6570\\u636e\\u6d41\\uff0c\\u8fd8\\u539f\\u4e3aBMP\\u56fe\\u7247\\uff0c\\u5e76\\u62c6\\u5206\\u6b63\\u53cd\\u9762\",\"type\":\"bool\",\"pos\":10,\"ui-pos\":1,\"auth\":0,\"visible\":0,\"size\":4,\"auto\":false,\"cur\":true,\"default\":true}}"
};
rebuild::rebuild() : image_processor("rebuild")
rebuild::rebuild(bool weaker) : image_processor("rebuild")
{
if(weaker)
return;
ADD_THIS_JSON();
}
rebuild::~rebuild()
@ -34,6 +37,17 @@ void rebuild::enable(const char* name, bool able)
enabled_ = able;
}
image_processor* rebuild::copy_weaker(void)
{
rebuild *weaker = new rebuild(true);
weaker->pos_ = pos_;
weaker->enabled_ = enabled_;
weaker->rebuild_ = rebuild_;
return weaker;
}
int rebuild::process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out)
{
int ret = SCANNER_ERR_OK;
@ -98,8 +112,6 @@ void rebuild::do_rebuild(LPPACKIMAGE info, uint8_t* stream, std::vector<PROCIMGI
o.info.width = out[0].info.width;
o.info.channels = out[0].info.channels;
printf("rebuild %03d - (%d * %d * 8) to (%d * %d * 24), front = %p, back = %p, size = %u ...\n", info->pos.paper_ind, info->width, info->height
, o.info.width, o.info.height, dstf, dstb, size);
for(int h = 0; h < info->height; ++h)
{
for(int w = 0; w < o.info.width; ++w)

View File

@ -10,7 +10,7 @@ class rebuild : public image_processor
bool rebuild_ = true;
public:
rebuild();
rebuild(bool weaker = false);
protected:
~rebuild();
@ -20,6 +20,7 @@ public:
virtual void enable(const char* name, bool able) override;
public:
virtual image_processor* copy_weaker(void) override;
virtual int process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out) override;
public:

View File

@ -17,8 +17,11 @@ static std::string device_opt_json[] = {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
stretch::stretch() : image_processor("stretch")
stretch::stretch(bool weaker) : image_processor("stretch")
{
if(weaker)
return;
ADD_THIS_JSON();
}
stretch::~stretch()
@ -35,6 +38,17 @@ int stretch::set_value(const char* name/*nullptr for all options*/, void* val/*n
return ret;
}
image_processor* stretch::copy_weaker(void)
{
stretch *weaker = new stretch(true);
weaker->pos_ = pos_;
weaker->enabled_ = enabled_;
weaker->dpi_ = dpi_;
return weaker;
}
int stretch::process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out)
{
for(auto& v: in)

View File

@ -10,7 +10,7 @@ class stretch : public image_processor
uint32_t dpi_ = 200;
public:
stretch();
stretch(bool weaker = false);
protected:
~stretch();
@ -19,5 +19,6 @@ public:
virtual int set_value(const char* name/*nullptr for all options*/, void* val/*nullptr for restore*/) override;
public:
virtual image_processor* copy_weaker(void) override;
virtual int process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out) override;
};

View File

@ -6,11 +6,11 @@
#include <base/packet.h>
#include "./algs/rebuild.h"
#include "./algs/image_encoder.h"
#include "./algs/stretch.h"
#include "./algs/auto_crop.h"
#include "./algs/color_correct.h"
#include "./algs/ImageProcess_Public.h"
#include "./algs/image_encoder.h"
@ -31,25 +31,14 @@ static std::string device_opt_json[] = {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// imgproc_mgr
imgproc_mgr::imgproc_mgr(std::function<void(data_source_ptr)> sender
, device_option* devopts
, CHK_RES_FUNC res
imgproc_mgr::imgproc_mgr(device_option* devopts, void(*sender)(SENDER_PROTO), void* sp
, bool(*res)(RES_CHK_PROTO), void* rp
)
: img_sender_(sender), opts_(devopts), prc_que_("prcimg")
, res_(res)
: opts_(devopts), img_sender_(sender), sender_param_(sp)
, res_(res), res_param_(rp)
{
prc_que_.enable_wait_log(false);
ADD_THIS_JSON();
if(!res_)
{
DECL_CHK_RES_FUNC(&, r)
{
return true;
};
res_ = r;
}
if (opts_)
opts_->add_ref();
else
@ -60,27 +49,6 @@ imgproc_mgr::imgproc_mgr(std::function<void(data_source_ptr)> sender
opts_->add(first_.get());
opts_->add(last_.get());
auto thrd = [&](void) -> void
{
thread_worker();
};
auto restart = [this](const char* thread_name) ->void
{
auto thrd = [this](void) -> void
{
thread_worker();
};
workers_.stop(thread_name);
add_busy_worker(-1);
printf("\nrestart imgproc_mgr::thread_worker\n\n");
workers_.start(thrd, SIZE_MB(0), "imgproc_mgr::thread_worker", (void*)&imgproc_mgr::thread_worker);
};
workers_.set_exception_handler(restart);
workers_.start(thrd, SIZE_MB(0), "thread_worker1", (void*)&imgproc_mgr::thread_worker);
workers_.start(thrd, SIZE_MB(0), "thread_worker2", (void*)&imgproc_mgr::thread_worker);
workers_.start(thrd, SIZE_MB(0), "thread_worker3", (void*)&imgproc_mgr::thread_worker);
workers_.start(thrd, SIZE_MB(0), "thread_worker4", (void*)&imgproc_mgr::thread_worker);
}
imgproc_mgr::~imgproc_mgr()
{
@ -88,6 +56,31 @@ imgproc_mgr::~imgproc_mgr()
opts_->release();
}
void imgproc_mgr::PAGEDPARAM::free(void)
{
for(auto& p: mean.processors)
{
if(!p)
break;
p->release();
}
memset(mean.processors, 0, sizeof(mean.processors));
if(mean.que)
delete mean.que;
mean.que = nullptr;
if(mean.rebld)
mean.rebld->release();
mean.rebld = nullptr;
if(mean.encoder)
mean.encoder->release();
mean.encoder = nullptr;
mean.res = nullptr;
mean.dump = nullptr;
mean.sender = nullptr;
mean.res_param = mean.dump_param = mean.sender_param = nullptr;
}
bool imgproc_mgr::sort_processor_by_pos(image_processor* l, image_processor* r)
{
return l->get_position() < r->get_position();
@ -104,115 +97,6 @@ data_source_ptr imgproc_mgr::scan_finished_packet(uint32_t scanid, uint32_t err)
return reply;
}
uint32_t imgproc_mgr::add_busy_worker(int inc)
{
SIMPLE_LOCK(working_cnt_lock_);
working_cnt_ += inc;
return working_cnt_;
}
void imgproc_mgr::thread_worker(void)
{
RAWIMG img;
while(run_)
{
while(!res_(TASK_IMG_PROCESSOR, true, 3000))
{
if(!run_)
break;
}
if(!run_)
break;
if(prc_que_.take(img, true))
{
add_busy_worker();
// try
// {
process(&img);
// }
// catch(const std::exception& e)
// {
// printf("exception occurs when process paper %d: %s!\n", img.info.pos.paper_ind, e.what());
// }
add_busy_worker(-1);
}
}
}
void imgproc_mgr::process(RAWIMG* img)
{
if(img->img)
{
std::vector<PROCIMGINFO> in, out, *src = &in, *dst = &out, *swp = nullptr;
chronograph watch;
auto realdump = [this](std::vector<PROCIMGINFO>* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last) -> void
{
if(arr)
send_image(*arr, last);
else
send_image(info, *mat, infoex, infexl, last);
};
auto emptydump = [this](std::vector<PROCIMGINFO>* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last) -> void
{};
std::function<void(std::vector<PROCIMGINFO>* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last)> dump = realdump;
if(dump_img_)
{
cv::Mat mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr());
send_image(&img->info, mat, nullptr, 0, false);
}
else
dump = emptydump;
if(do_rebuild_)
{
first_->do_rebuild(&img->info, img->data->ptr(), in);
utils::to_log(LOG_LEVEL_ALL, "Rebuild paper %d spend %llu milliseconds.\n", img->info.pos.paper_ind, watch.elapse_ms());
dump(&in, nullptr, nullptr, nullptr, 0, false);
}
else
{
PROCIMGINFO i;
i.info = img->info;
i.img = cv::Mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr());
in.push_back(i);
}
img->data->release();
for(auto& v: processors_)
{
if(v->is_enable())
{
process(v, src, dst);
src->clear();
swp = src;
src = dst;
dst = swp;
dump(src, nullptr, nullptr, nullptr, 0, false);
}
}
send_image(*src, true);
}
else
{
data_source_ptr ptr = imgproc_mgr::scan_finished_packet(scan_id_, img->info.data_size);
uint32_t wait = 0, que = 0;
ptr->set_session_id(session_id_);
while((que = add_busy_worker(0)) > 1)
{
if(wait++ == 0)
utils::to_log(LOG_LEVEL_DEBUG, "Received scan completed event while processing %u paper(s), wait ...\n", que - 1);
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
img_sender_(ptr);
ptr->release();
}
}
void imgproc_mgr::process(image_processor* prc, std::vector<PROCIMGINFO>* in, std::vector<PROCIMGINFO>* out)
{
try
@ -232,18 +116,10 @@ void imgproc_mgr::process(image_processor* prc, std::vector<PROCIMGINFO>* in, st
throw(exception_ex(msg.c_str()));
}
}
void imgproc_mgr::send_image(LPPACKIMAGE head, cv::Mat& mat, void* info, size_t info_l, bool last)
void imgproc_mgr::send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, void* info, size_t info_l, bool last)
{
auto ovr = [&](uint64_t total, uint64_t cur_size, uint32_t err, void* user_data) -> int
{
if(total == FINAL_NOTIFY && cur_size == FINAL_NOTIFY)
printf("~%p\n", user_data);
return 0;
};
PACKIMAGE h(*head);
std::shared_ptr<std::vector<uchar>> compd(last_->encode(&h, mat));
std::shared_ptr<std::vector<uchar>> compd(obj->mean.encoder->encode(&h, mat));
image_packet_ptr ptr = nullptr;
if(last)
@ -255,13 +131,209 @@ void imgproc_mgr::send_image(LPPACKIMAGE head, cv::Mat& mat, void* info, size_t
h.prc_stage = head->prc_stage;
h.prc_time = head->prc_time;
}
ptr = new image_packet(&h, compd, scan_id_, info, info_l);
ptr = new image_packet(&h, compd, obj->mean.scan_id, info, info_l);
ptr->set_session_id(session_id_);
img_sender_(ptr);
ptr->set_session_id(obj->mean.session);
obj->mean.sender(ptr, obj->mean.sender_param);
ptr->release();
}
void imgproc_mgr::send_image(std::vector<PROCIMGINFO>& imgs, bool last)
void imgproc_mgr::real_dump_image(DUMP_PROTO)
{
if(arr)
{
for(auto& v: *arr)
imgproc_mgr::send_image(obj, &v.info, v.img, &v.ext_info[0], v.ext_info.length(), last);
}
else
{
imgproc_mgr::send_image(obj, info, *mat, infoex, infexl, last);
}
}
void imgproc_mgr::empty_dump_image(DUMP_PROTO)
{}
void imgproc_mgr::start_workers(int cnt)
{
workers_.stop(nullptr);
for(auto& v: params_)
v->free();
auto thrd = [this](void* param) -> void
{
thread_worker(param);
};
auto restart = [this](const char* thread_name, void* param) ->void
{
auto thrd = [this](void* param) -> void
{
thread_worker(param);
};
workers_.stop(thread_name);
add_busy_worker(-1);
printf("\nrestart imgproc_mgr::thread_worker\n\n");
workers_.start(thrd, param, SIZE_MB(0), "imgproc_mgr::thread_worker", (void*)&imgproc_mgr::thread_worker);
};
workers_.set_exception_handler(restart);
for(int i = 0; i < cnt; ++i)
{
LPPAGEDPARAM param = nullptr;
if(i < params_.size())
param = params_[i];
else
{
param = new PAGEDPARAM();
params_.push_back(param);
}
param->mean.run = run_;
param->mean.ind = i;
param->mean.que = new safe_fifo<RAWIMG>("prcimg");
param->mean.que->enable_wait_log(false);
param->mean.scan_id = scan_id_;
param->mean.session = session_id_;
param->mean.rebld = do_rebuild_ ? dynamic_cast<rebuild*>(first_->copy_weaker()) : nullptr;
param->mean.encoder = dynamic_cast<img_encoder*>(last_->copy_weaker());
param->mean.dumpi = dump_img_;
if(param->mean.dumpi)
{
param->mean.dump = &imgproc_mgr::real_dump_image;
param->mean.dump_param = this;
}
else
{
param->mean.dump = &imgproc_mgr::empty_dump_image;
param->mean.dump_param = nullptr;
}
param->mean.sender = img_sender_;
param->mean.sender_param = sender_param_;
int ind = 0;
for(auto& v: processors_)
{
if(ind >= _countof(param->mean.processors))
break;
image_processor *prc = v->copy_weaker();
param->mean.processors[ind++] = prc;
param->mean.processors[ind] = nullptr;
}
char n[40] = {0};
sprintf(n, "thread_worker%d", i + 1);
workers_.start(thrd, param, SIZE_MB(0), n, (void*)&imgproc_mgr::thread_worker);
}
}
uint32_t imgproc_mgr::add_busy_worker(int inc)
{
SIMPLE_LOCK(working_cnt_lock_);
working_cnt_ += inc;
return working_cnt_;
}
void imgproc_mgr::thread_worker(void* param)
{
LPPAGEDPARAM para = (LPPAGEDPARAM)param;
RAWIMG img;
int cpu = (para->mean.ind % (CPU_CORES - CPU_MAJOR_CNT)) + CPU_MINOR_0;
utils::to_log(LOG_LEVEL_DEBUG, "set image process thread %d to CPU %d = %d\n"
, para->mean.ind, cpu, utils::set_cpu_affinity(cpu));
add_busy_worker();
while(para->mean.run)
{
if(para->mean.res)
{
while(!para->mean.res(TASK_IMG_PROCESSOR, true, 3000, para->mean.res_param))
{
if(!para->mean.run)
break;
}
if(!para->mean.run)
break;
}
if(para->mean.que->take(img, true))
{
if(img.img && !img.data)
break;
process(&img, para);
}
}
add_busy_worker(-1);
}
void imgproc_mgr::process(RAWIMG* img, LPPAGEDPARAM param)
{
if(img->img)
{
std::vector<PROCIMGINFO> in, out, *src = &in, *dst = &out, *swp = nullptr;
chronograph watch;
if(param->mean.dumpi)
{
cv::Mat mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr());
send_image(param, &img->info, mat, nullptr, 0, false);
}
if(param->mean.rebld)
{
param->mean.rebld->do_rebuild(&img->info, img->data->ptr(), in);
utils::to_log(LOG_LEVEL_ALL, "Rebuild paper %d spend %llu milliseconds.\n", img->info.pos.paper_ind, watch.elapse_ms());
param->mean.dump(param, &in, nullptr, nullptr, nullptr, 0, false, param->mean.dump_param);
}
else
{
PROCIMGINFO i;
i.info = img->info;
i.img = cv::Mat(img->info.width, img->info.height, CV_8UC1, img->data->ptr());
in.push_back(i);
}
img->data->release();
for(auto& v: param->mean.processors)
{
if(!v)
{
break;
}
if(v->is_enable())
{
process(v, src, dst);
src->clear();
swp = src;
src = dst;
dst = swp;
param->mean.dump(param, src, nullptr, nullptr, nullptr, 0, false, param->mean.dump_param);
}
}
send_image(param, *src, true);
}
else
{
data_source_ptr ptr = imgproc_mgr::scan_finished_packet(scan_id_, img->info.data_size);
uint32_t wait = 0, que = 0;
RAWIMG over;
over.data = nullptr;
over.img = true;
for(auto& v: params_)
v->mean.que->save(over, true);
ptr->set_session_id(session_id_);
while((que = add_busy_worker(0)) > 1)
{
if(wait++ == 0)
utils::to_log(LOG_LEVEL_DEBUG, "Received scan completed event while processing %u paper(s), wait ...\n", que - 1);
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
img_sender_(ptr, sender_param_);
ptr->release();
}
}
void imgproc_mgr::send_image(LPPAGEDPARAM obj, std::vector<PROCIMGINFO>& imgs, bool last)
{
// if(last && imgs.size())
// {
@ -278,10 +350,10 @@ void imgproc_mgr::send_image(std::vector<PROCIMGINFO>& imgs, bool last)
// }
for(auto& v: imgs)
send_image(&v.info, v.img, v.ext_info.empty() ? nullptr : &v.ext_info[0], v.ext_info.length(), last);
imgproc_mgr::send_image(obj, &v.info, v.img, v.ext_info.empty() ? nullptr : &v.ext_info[0], v.ext_info.length(), last);
if(last)
sent_ind_++;
// if(last)
// sent_ind_++;
}
int imgproc_mgr::set_value(const char* name, void* val)
@ -294,25 +366,12 @@ int imgproc_mgr::set_value(const char* name, void* val)
do_rebuild_ = *(bool*)val;
else if(strcmp(name, SANE_OPT_NAME(CIS_STRETCH)) == 0)
{
if(*(bool*)val)
for(auto& v: processors_)
{
if(stretcher_)
if(strcmp(v->from(), name) == 0)
{
processors_.push_back(stretcher_);
stretcher_ = nullptr;
std::sort(processors_.begin(), processors_.end(), &imgproc_mgr::sort_processor_by_pos);
}
}
else
{
for(size_t i = 0; i < processors_.size(); ++i)
{
if(strcmp(processors_[i]->from(), "stretch") == 0)
{
stretcher_ = processors_[i];
processors_.erase(processors_.begin() + i);
break;
}
v->enable(name, *(bool*)val);
break;
}
}
}
@ -342,9 +401,10 @@ int imgproc_mgr::clear(void)
for (auto& v : processors_)
v->release();
processors_.clear();
if(stretcher_)
stretcher_->release();
stretcher_ = nullptr;
for(auto& v: params_)
delete v;
params_.clear();
return 0;
}
@ -353,6 +413,7 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img)
{
RAWIMG ri;
int ret = SCANNER_ERR_OK;
int ind = put_ind_ % working_cnt_;
ri.data = data;
if(img)
@ -362,7 +423,8 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img)
else
ri.info.data_size = (uint32_t)(long)info;
ri.img = img;
prc_que_.save(ri, true);
params_[ind]->mean.que->save(ri, true);
++put_ind_;
return ret;
}
@ -370,7 +432,12 @@ int imgproc_mgr::process(LPPACKIMAGE info, dyn_mem_ptr data, bool img)
void imgproc_mgr::stop(void)
{
run_ = false;
prc_que_.trigger();
for(auto& v: params_)
{
v->mean.run = false;
if(v->mean.que)
v->mean.que->trigger();
}
workers_.stop(nullptr);
}
@ -385,4 +452,6 @@ void imgproc_mgr::start_new_turn(uint32_t scanid, uint32_t sessionid)
scan_id_ = scanid;
sent_ind_ = 1;
session_id_ = sessionid;
put_ind_ = 0;
start_workers(3);
}

View File

@ -13,24 +13,67 @@
#include <base/data.h>
#include <vector>
typedef std::shared_ptr<std::vector<char>> dcptr;
#define DUMP_PROTO union _page_thrd_data_4k* obj, std::vector<PROCIMGINFO>* arr, LPPACKIMAGE info, cv::Mat* mat, char* infoex, size_t infexl, bool last, void* param
class device_option;
class rebuild;
class img_encoder;
class rebuild;
class imgproc_mgr : public sane_opt_provider
{
typedef struct _raw_img
{
PACKIMAGE info;
dyn_mem_ptr data;
dyn_mem_ptr data; // nullptr and img is 'true' to exit worker
bool img;
}RAWIMG;
// allocate a page to store the vars for every worker thread
typedef union _page_thrd_data_4k
{
struct
{
bool run;
bool dumpi;
int ind;
int session;
int scan_id;
rebuild* rebld;
img_encoder* encoder;
safe_fifo<RAWIMG> *que;
image_processor* processors[20];
bool(*res)(RES_CHK_PROTO);
void *res_param;
void(*dump)(DUMP_PROTO);
void *dump_param;
void(*sender)(SENDER_PROTO);
void *sender_param;
}mean;
uint8_t page[SIZE_KB(4)];
_page_thrd_data_4k()
{
mean.rebld = nullptr;
mean.encoder = nullptr;
mean.que = nullptr;
memset(mean.processors, 0, sizeof(mean.processors));
mean.res = nullptr;
mean.dump = nullptr;
mean.sender = nullptr;
mean.res_param = mean.dump_param = mean.sender_param = nullptr;
}
~_page_thrd_data_4k()
{
this->free();
}
void free(void);
}PAGEDPARAM, *LPPAGEDPARAM;
std::vector<LPPAGEDPARAM> params_;
refer_guard<rebuild> first_;
refer_guard<img_encoder> last_;
image_processor* stretcher_ = nullptr;
bool do_rebuild_ = true;
volatile bool run_ = true;
volatile uint32_t sent_ind_ = 1;
@ -39,27 +82,34 @@ class imgproc_mgr : public sane_opt_provider
uint32_t session_id_ = 0;
MUTEX working_cnt_lock_;
uint32_t working_cnt_ = 0;
uint32_t put_ind_ = 0;
std::vector<image_processor*> processors_;
device_option* opts_;
safe_thread workers_;
safe_fifo<RAWIMG> prc_que_;
CHK_RES_FUNC res_ = CHK_RES_FUNC();
std::function<void(data_source_ptr)> img_sender_;
bool(*res_)(RES_CHK_PROTO) = nullptr;
void* res_param_ = nullptr;
void(*img_sender_)(SENDER_PROTO) = nullptr;
void* sender_param_ = nullptr;
static bool sort_processor_by_pos(image_processor* l, image_processor* r);
static bool sort_image_packet(image_packet_ptr l, image_packet_ptr r);
static data_source_ptr scan_finished_packet(uint32_t scanid, uint32_t err = 0);
static void process(image_processor* prc, std::vector<PROCIMGINFO>* in, std::vector<PROCIMGINFO>* out);
static void send_image(LPPAGEDPARAM obj, LPPACKIMAGE head, cv::Mat& mat, void* info = nullptr, size_t info_l = 0, bool last = true);
static void real_dump_image(DUMP_PROTO);
static void empty_dump_image(DUMP_PROTO);
void start_workers(int cnt);
uint32_t add_busy_worker(int inc = 1);
void thread_worker(void);
void process(RAWIMG* img);
void process(image_processor* prc, std::vector<PROCIMGINFO>* in, std::vector<PROCIMGINFO>* out);
void send_image(LPPACKIMAGE head, cv::Mat& mat, void* info = nullptr, size_t info_l = 0, bool last = true);
void send_image(std::vector<PROCIMGINFO>& imgs, bool last);
void thread_worker(void* param);
void process(RAWIMG* img, LPPAGEDPARAM param);
void send_image(LPPAGEDPARAM obj, std::vector<PROCIMGINFO>& imgs, bool last);
public:
imgproc_mgr(std::function<void(data_source_ptr)> sender, device_option* devopts, CHK_RES_FUNC res = CHK_RES_FUNC());
imgproc_mgr(device_option* devopts, void(*sender)(SENDER_PROTO), void* sp,
bool(*res)(RES_CHK_PROTO) = nullptr, void* rp = nullptr);
protected:
virtual ~imgproc_mgr();

View File

@ -152,23 +152,6 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0)
utils::log_info(msg, LOG_LEVEL_DEBUG);
};
auto sender = [this](data_source_ptr img) -> void
{
if(img->ptr() && ((LPPACK_BASE)img->ptr())->cmd == PACK_CMD_SCAN_FINISHED_ROGER)
{
cis_->close();
res_mgr_->stop();
utils::print_memory_usage("Memory usage when finished scanning", true);
system("sudo cpufreq-set -g ondemand");
}
usb_->write_bulk(img);
};
DECL_CHK_RES_FUNC(this, res)
{
return res_mgr_->is_resource_enable((enum _task)task, wait, to_ms);
};
cfg_mgr_ = new device_option(true, user, on_log);
utils::to_log(LOG_LEVEL_DEBUG, "OPT - initializing ...\n");
const_opts_ = new scanner_const_opts();
@ -178,7 +161,7 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0)
cis_->set_value(SANE_OPT_NAME(DEVICE_MODEL), &cfg_mgr_->get_option_value(SANE_OPT_NAME(DEVICE_MODEL), SANE_ACTION_GET_VALUE)[0]);
cfg_mgr_->add(cis_);
cfg_mgr_->add(user_);
img_prcr_ = new imgproc_mgr(sender, cfg_mgr_, res);
img_prcr_ = new imgproc_mgr(cfg_mgr_, &async_scanner::image_sender, (void*)this, &async_scanner::resource_check, (void*)res_mgr_.get());
cfg_mgr_->add(img_prcr_);
utils::to_log(LOG_LEVEL_DEBUG, "OPT - initialized %u options.\n", cfg_mgr_->count());
@ -227,6 +210,24 @@ async_scanner::~async_scanner()
devui::uninit_ui();
}
bool async_scanner::resource_check(RES_CHK_PROTO)
{
return ((resource_mgr*)param)->is_resource_enable((enum _task)type, wait, to_ms);
}
void async_scanner::image_sender(SENDER_PROTO)
{
async_scanner* obj = (async_scanner*)param;
if(ptr->ptr() && ((LPPACK_BASE)ptr->ptr())->cmd == PACK_CMD_SCAN_FINISHED_ROGER)
{
obj->cis_->close();
obj->res_mgr_->stop();
utils::print_memory_usage("Memory usage when finished scanning", true);
system("sudo cpufreq-set -g ondemand");
}
obj->usb_->write_bulk(ptr);
}
dyn_mem_ptr async_scanner::handle_bulk_cmd(LPPACK_BASE pack, uint32_t* used, packet_data_base_ptr* required, uint32_t session)
{
dyn_mem_ptr reply = nullptr;

View File

@ -42,6 +42,10 @@ class async_scanner : public refer
MUTEX fsender_;
std::vector<file_reader*> send_files_;
static bool resource_check(RES_CHK_PROTO);
static void image_sender(SENDER_PROTO);
dyn_mem_ptr handle_bulk_cmd(LPPACK_BASE pack, uint32_t* used, packet_data_base_ptr* required, uint32_t session);
void init(void);
bool on_energy_conservation(bool normal/*true - working status; false - go to sleep*/); // return true to enable get into 'normal' status

View File

@ -402,3 +402,5 @@ public:
#define FUNCTION_PROTO_PARAMETERS dyn_mem_ptr, uint32_t*, packet_data_base_ptr*
#define FUNCTION_PROTO_COMMAND_HANDLE dyn_mem_ptr(FUNCTION_PROTO_PARAMETERS)
#define SENDER_PROTO data_source_ptr ptr, void* param
#define RES_CHK_PROTO int type, bool wait, int to_ms, void* param

View File

@ -37,7 +37,7 @@ namespace devui
public:
pipe_reader(const char* fifo, std::function<void(uint8_t*, size_t)> h) : pipe_path_(fifo), handler_(h)
{
auto r = [this](void) -> void
auto r = [this](void*) -> void
{
int cycle = 0;
while(run_)
@ -56,7 +56,7 @@ namespace devui
}
};
worker_.start(r, SIZE_MB(1), "worker", (void*)&pipe_reader::worker);
worker_.start(r, nullptr, SIZE_MB(1), "worker", (void*)&pipe_reader::worker);
}
protected:
virtual ~pipe_reader()
@ -117,7 +117,7 @@ namespace devui
{
sent_que_.enable_wait_log(false);
auto r = [this](void) -> void
auto r = [this](void*) -> void
{
int cycle = 0;
while(run_)
@ -136,7 +136,7 @@ namespace devui
}
};
worker_.start(r, SIZE_MB(1), "worker", (void*)&pipe_sender::worker);
worker_.start(r, nullptr, SIZE_MB(1), "worker", (void*)&pipe_sender::worker);
}
protected:
virtual ~pipe_sender()
@ -205,16 +205,16 @@ namespace devui
}
else
{
auto r = [this](void) -> void
auto r = [this](void*) -> void
{
receiver();
};
auto s = [this](void) -> void
auto s = [this](void*) -> void
{
sender();
};
workers_.start(r, SIZE_MB(1), "receiver");
workers_.start(s, SIZE_MB(1), "sender");
workers_.start(r, nullptr, SIZE_MB(1), "receiver");
workers_.start(s, nullptr, SIZE_MB(1), "sender");
}
}
void close(void)

View File

@ -1529,6 +1529,21 @@ namespace utils
return ps;
}
int set_cpu_affinity(int cpuind, pthread_t thread)
{
int ret = 0;
if(thread == (pthread_t)-1)
thread = GetCurrentThreadId();
cpu_set_t cpu;
CPU_ZERO(&cpu);
CPU_SET(cpuind, &cpu);
ret = pthread_setaffinity_np(thread, sizeof(cpu), &cpu);
return ret;
}
std::string init_log(log_type type, log_level level, const char* fn_appendix)
{
@ -2501,13 +2516,18 @@ safe_thread::~safe_thread()
threads_.clear();
}
void safe_thread::thread_worker(std::function<void(void)> func, std::string name, void* addr)
void safe_thread::thread_worker(std::function<void(void*)> func, void* param, std::string name, void* addr)
{
EXCEPARAM ep;
ep.name = name;
ep.param = param;
printf("stack size of thread '%s': %u\n", name.c_str(), utils::get_stack_size());
try
{
utils::to_log(LOG_LEVEL_DEBUG, "+++ safe_thread of '%s(addr: %p) - id: %p' is running ...\n", name.c_str(), addr, GetCurrentThreadId());
func();
func(param);
utils::to_log(LOG_LEVEL_DEBUG, "--- safe_thread of '%s - %p' exited.\n", name.c_str(), GetCurrentThreadId());
return;
}
@ -2523,17 +2543,17 @@ void safe_thread::thread_worker(std::function<void(void)> func, std::string name
{
utils::to_log(LOG_LEVEL_FATAL, "Unknown exception in thread '%p - %s'!\n", GetCurrentThreadId(), name.c_str());
}
excep_que_.save(name, true);
excep_que_.save(ep, true);
}
void safe_thread::thread_notify_exception(void)
{
while(run_)
{
std::string name("");
if(excep_que_.take(name, true))
EXCEPARAM ep;
if(excep_que_.take(ep, true))
{
if(excep_handler_)
excep_handler_(name.c_str());
excep_handler_(ep.name.c_str(), ep.param);
}
}
}
@ -2545,22 +2565,23 @@ void*
safe_thread::raw_thread(void* lp)
{
safe_thread *obj = ((LPCRAWTHRD)lp)->obj;
std::function<void(void)> func = ((LPCRAWTHRD)lp)->func;
std::function<void(void*)> func = ((LPCRAWTHRD)lp)->func;
std::string name(((LPCRAWTHRD)lp)->name);
void* addr = ((LPCRAWTHRD)lp)->addr;
void* param = ((LPCRAWTHRD)lp)->param;
delete lp;
obj->thread_worker(func, name, addr);
obj->thread_worker(func, param, name, addr);
return 0;
}
void safe_thread::set_exception_handler(std::function<void(const char*)> on_exception)
void safe_thread::set_exception_handler(std::function<void(const char*, void*)> on_exception)
{
excep_handler_ = on_exception;
}
int safe_thread::start(std::function<void(void)> f, std::size_t stack, const char* thread_name, void* addr)
int safe_thread::start(std::function<void(void*)> f, void* param, std::size_t stack, const char* thread_name, void* addr)
{
SAFETHRD st;
@ -2568,18 +2589,19 @@ int safe_thread::start(std::function<void(void)> f, std::size_t stack, const cha
if(stack == 0)
{
st.raw = false;
st.thread.reset(new std::thread(&safe_thread::thread_worker, this, f, thread_name, addr));
st.thread.reset(new std::thread(&safe_thread::thread_worker, this, f, param, st.name, addr));
}
else
{
// st.thread.reset(new std::thread(std::thread(&safe_thread::thread_worker, this, f, thread_name, addr), std::move(stack)));
LPCRAWTHRD param = new CRAWTHRD;
LPCRAWTHRD para = new CRAWTHRD;
int err = 0;
param->obj = this;
param->func = f;
param->name = thread_name;
param->addr = addr;
para->obj = this;
para->func = f;
para->name = thread_name;
para->addr = addr;
para->param = param;
#if OS_WIN
st.thread_raw = CreateThread(NULL, stack, &safe_thread::raw_thread, param, 0, nullptr);
@ -2591,14 +2613,14 @@ int safe_thread::start(std::function<void(void)> f, std::size_t stack, const cha
pthread_attr_setstacksize(&attr, stack);
st.raw = true;
err = pthread_create(&st.thread_raw, &attr, &safe_thread::raw_thread, param);
err = pthread_create(&st.thread_raw, &attr, &safe_thread::raw_thread, para);
if(err)
err = errno;
pthread_attr_destroy(&attr);
#endif
if(err)
{
delete param;
delete para;
return err;
}
@ -2646,6 +2668,7 @@ int safe_thread::stop(const char* thread_name)
}
////////////////////////////////////////////////////////////////////////////////////////////////
// safe_file
safe_file::safe_file(const char* path)

View File

@ -17,6 +17,10 @@
#define USE_SAFE_THREAD
#define CPU_CORES 6
#define CPU_MAJOR_0 4
#define CPU_MAJOR_CNT 2
#define CPU_MINOR_0 0
enum log_type
{
@ -110,6 +114,7 @@ namespace utils
void print_memory_usage(const char* tips, bool to_log_file);
int get_disk_space(const char* path, unsigned long long* total, unsigned long long* avail, unsigned long long* block);
unsigned int get_page_size(unsigned int* map_unit = nullptr);
int set_cpu_affinity(int cpuind, pthread_t thread = -1);
// return logging file path if 'type' was LOG_TYPE_FILE
std::string init_log(log_type type, log_level level = LOG_LEVEL_ALL, const char* fn_appendix = nullptr/*appendix to default log-file-name*/);
@ -489,22 +494,28 @@ class safe_thread
std::shared_ptr<std::thread> thread; // valid when !raw
pthread_t thread_raw; // valid when raw
}SAFETHRD;
typedef struct _excep_param
{
std::string name;
void* param;
}EXCEPARAM, *LPEXCEPARAM;
volatile bool run_ = true;
MUTEX lock_;
std::unique_ptr<std::thread> notify_thread_;
std::vector<SAFETHRD> threads_;
safe_fifo<std::string> excep_que_;
std::function<void(const char*)> excep_handler_ = std::function<void(const char*)>();
safe_fifo<EXCEPARAM> excep_que_;
std::function<void(const char*, void*)> excep_handler_ = std::function<void(const char*, void*)>();
void thread_worker(std::function<void(void)> func, std::string name, void* addr);
void thread_worker(std::function<void(void*)> func, void* param, std::string name, void* addr);
void thread_notify_exception(void);
typedef struct _cr_raw_thread
{
safe_thread* obj;
std::function<void(void)> func;
std::function<void(void*)> func;
std::string name;
void* addr;
void* param;
}CRAWTHRD, *LPCRAWTHRD;
static
#if OS_WIN
@ -519,8 +530,8 @@ public:
~safe_thread();
public:
void set_exception_handler(std::function<void(const char*)> on_exception = std::function<void(const char*)>());
int start(std::function<void(void)> f, std::size_t stack = 0, const char* thread_name = nullptr, void* addr = nullptr);
void set_exception_handler(std::function<void(const char*, void*)> on_exception = std::function<void(const char*, void*)>());
int start(std::function<void(void*)> f, void* param = nullptr, std::size_t stack = 0, const char* thread_name = nullptr, void* addr = nullptr);
int stop(const char* thread_name);
};

View File

@ -29,6 +29,8 @@ typedef struct _proc_img_info_modules // 跨模块参数
uint8_t* data; // size = bytes_per_line * height
}PROCIIM, *LPPROCIIM;
#pragma pack(pop)
typedef std::shared_ptr<std::vector<char>> dcptr;
class image_processor : public sane_opt_provider
{
@ -50,6 +52,8 @@ public:
int get_version(void) { return ver_; }
int get_position(void) { return pos_; }
virtual image_processor* copy_weaker(void) = 0;
virtual int process(std::vector<PROCIMGINFO>& in, std::vector<PROCIMGINFO>& out) = 0;
// 跨模块图像处理接口。回调函数返回false则停止处理void*参数同 'param'。

View File

@ -446,7 +446,7 @@ ui_mgr::ui_mgr() : disp_data_("lcd-msg")
keyboard_.reset(new KeyMonitor(ke));
auto display = [this](void) -> void
auto display = [this](void*) -> void
{
#ifdef TEST_PLATFORM_EVENT
printf("\tdisplay thread starting, sleep 3000ms ...\n");
@ -459,7 +459,7 @@ ui_mgr::ui_mgr() : disp_data_("lcd-msg")
#ifdef TEST_PLATFORM_EVENT
to__.enable_log(false);
#endif
disp_thrd_.start(display, SIZE_MB(4), "thread_display");
disp_thrd_.start(display, nullptr, SIZE_MB(4), "thread_display");
#ifdef TEST_PLATFORM_EVENT
// display thread starting, sleep 3000ms ...