添加资源管理接口

This commit is contained in:
gb 2024-02-25 13:51:52 +08:00
parent 9d9c29aa7d
commit 6c2ba5166c
10 changed files with 226 additions and 10 deletions

View File

@ -458,6 +458,7 @@ void scanner_hw::thread_image_capture(bool paper_ready)
break;
}
img.prc_time = watch.elapse_ms();
mem = new dyn_mem_shared(frame, size, put_v4l2_mem, (void*)ind);
used_v4l2_mem++;
img.pos.status = hg_err_2_image_status(err);
@ -478,8 +479,15 @@ void scanner_hw::thread_image_capture(bool paper_ready)
}
}
watch.reset();
motor_->pick_paper();
if(res_(TASK_CAPTURER, true, 3000) && scanning_)
{
watch.reset();
motor_->pick_paper();
}
else
{
break;
}
}
printf("Scan turn finished with event(%d - 0x%08x).\n", mbev.first, mbev.second);
@ -711,7 +719,7 @@ void scanner_hw::enable(const char* name, bool able)
}
// operation ...
int scanner_hw::open(std::function<IMAGE_HANDLER_PROTO> image_handler, std::string* cfgjson, bool count_mode)
int scanner_hw::open(std::function<IMAGE_HANDLER_PROTO> image_handler, CHK_RES_FUNC res, std::string* cfgjson, bool count_mode)
{
std::string tips("");
refer_guard<gb_json> cfg(new gb_json());
@ -722,6 +730,16 @@ int scanner_hw::open(std::function<IMAGE_HANDLER_PROTO> image_handler, std::stri
if(!image_handler)
return SCANNER_ERR_INVALID_PARAMETER;
res_ = res;
if(!res_)
{
DECL_CHK_RES_FUNC(&, r)
{
return true;
};
res_ = r;
}
count_mode_ = count_mode;
img_handler_ = image_handler;
if(!count_mode_)

View File

@ -36,6 +36,7 @@ class FpgaComm;
class scanner_hw : public sane_opt_provider
{
std::function<IMAGE_HANDLER_PROTO> img_handler_ = std::function<IMAGE_HANDLER_PROTO>();
CHK_RES_FUNC res_ = CHK_RES_FUNC();
volatile bool scanning_ = false;
int time_to_exit_auto_scan_ = 60; // seconds
std::unique_ptr<FpgaComm> img_controller_;
@ -120,7 +121,7 @@ public:
// operation ...
public:
int open(std::function<IMAGE_HANDLER_PROTO> image_handler, std::string* cfgjson = nullptr, bool count_mode = false);
int open(std::function<IMAGE_HANDLER_PROTO> image_handler, CHK_RES_FUNC res, std::string* cfgjson = nullptr, bool count_mode = false);
int start_scan(void);
int stop_scan(bool from_ui = false);
int close(bool from_worker = false);

View File

@ -32,11 +32,22 @@ static std::string device_opt_json[] = {
imgproc_mgr::imgproc_mgr(std::function<void(data_source_ptr)> sender
, device_option* devopts
, CHK_RES_FUNC res
)
: img_sender_(sender), opts_(devopts), prc_que_("prcimg")
, res_(res)
{
ADD_THIS_JSON();
if(!res_)
{
DECL_CHK_RES_FUNC(&, r)
{
return true;
};
res_ = r;
}
if (opts_)
opts_->add_ref();
else
@ -103,6 +114,13 @@ void imgproc_mgr::thread_worker(void)
RAWIMG img;
while(run_)
{
while(!res_(TASK_IMG_PROCESSOR, true, 3000))
{
if(!run_)
break;
}
if(!run_)
break;
if(prc_que_.take(img, true))
{
add_busy_worker();

View File

@ -41,6 +41,8 @@ class imgproc_mgr : public sane_opt_provider
device_option* opts_;
safe_thread workers_;
safe_fifo<RAWIMG> prc_que_;
CHK_RES_FUNC res_ = CHK_RES_FUNC();
std::function<void(data_source_ptr)> img_sender_;
static bool sort_processor_by_pos(image_processor* l, image_processor* r);
@ -56,7 +58,7 @@ class imgproc_mgr : public sane_opt_provider
void send_image(std::vector<PROCIMGINFO>& imgs, bool clear_after_send);
public:
imgproc_mgr(std::function<void(data_source_ptr)> sender, device_option* devopts);
imgproc_mgr(std::function<void(data_source_ptr)> sender, device_option* devopts, CHK_RES_FUNC res = CHK_RES_FUNC());
protected:
virtual ~imgproc_mgr();

View File

@ -17,6 +17,7 @@
#include <sane_opt_json/user.h>
#include <base/ui.h>
#include <imgprc_mgr.h>
#include "res_mgr.h"
@ -122,6 +123,7 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0)
utils::init_log(LOG_TYPE_FILE);
utils::to_log(LOG_LEVEL_DEBUG, "System info: page-size = %u, mapping-page-size = %u, disk-cluster-size = %u, default-stack-size = %u.\n"
, global_info::page_size, global_info::page_map_size, global_info::cluster_size, global_info::stack_size);
res_mgr_.reset(new resource_mgr());
init();
auto bulk_handle = [&](dyn_mem_ptr data, uint32_t* used, packet_data_base_ptr* required) -> dyn_mem_ptr
@ -155,12 +157,17 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0)
if(img->ptr() && ((LPPACK_BASE)img->ptr())->cmd == PACK_CMD_SCAN_FINISHED_ROGER)
{
cis_->close();
res_mgr_->stop();
utils::print_memory_usage("Memory usage when finished scanning", false);
system("sudo cpufreq-set -g ondemand");
}
usb_->write_bulk(img);
};
DECL_CHK_RES_FUNC(this, res)
{
return res_mgr_->is_resource_enable((enum _task)task, wait, to_ms);
};
cfg_mgr_ = new device_option(true, user, on_log);
utils::to_log(LOG_LEVEL_DEBUG, "OPT - initializing ...\n");
@ -171,7 +178,7 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0)
cis_->set_value(SANE_OPT_NAME(DEVICE_MODEL), &cfg_mgr_->get_option_value(SANE_OPT_NAME(DEVICE_MODEL), SANE_ACTION_GET_VALUE)[0]);
cfg_mgr_->add(cis_);
cfg_mgr_->add(user_);
img_prcr_ = new imgproc_mgr(sender, cfg_mgr_);
img_prcr_ = new imgproc_mgr(sender, cfg_mgr_, res);
cfg_mgr_->add(img_prcr_);
utils::to_log(LOG_LEVEL_DEBUG, "OPT - initialized %u options.\n", cfg_mgr_->count());
@ -181,6 +188,9 @@ async_scanner::async_scanner() : usb_(nullptr), cfg_mgr_(nullptr), scan_id_(0)
async_scanner::~async_scanner()
{
res_mgr_->stop();
res_mgr_.reset(nullptr);
if(cis_)
{
cis_->stop_scan();
@ -293,7 +303,7 @@ void async_scanner::init(void)
bool auto_scan = true;
std::string prev(cfg_mgr_->get_option_value(SANE_OPT_NAME(WAIT_TO_SCAN), SANE_ACTION_GET_VALUE));
cis_->set_value(SANE_OPT_NAME(WAIT_TO_SCAN), &auto_scan);
cis_->open(receiver, nullptr, true);
cis_->open(receiver, CHK_RES_FUNC(), nullptr, true);
cis_->start_scan();
}
else if(devui::UI_CMD_STOP_SCAN == pack->msg)
@ -580,16 +590,21 @@ dyn_mem_ptr async_scanner::handle_scan_start(LPPACK_BASE pack, uint32_t* used, p
scan_id_ = pack->pack_id;
scan_err_ = 0;
reply_start_ = false;
res_mgr_->start();
auto receiver = [this](dyn_mem_ptr data, bool img, LPPACKIMAGE lpinfo) -> void
{
img_prcr_->process(lpinfo, data, img);
};
DECL_CHK_RES_FUNC(this, res)
{
return res_mgr_->is_resource_enable((enum _task)task, wait, to_ms);
};
utils::print_memory_usage("Memory usage before scanning", false);
*used = base_head_size;
img_prcr_->start_new_turn(scan_id_, session);
scan_err_ = cis_->open(receiver, &config);
scan_err_ = cis_->open(receiver, res, &config);
reply = dyn_mem::memory(base_head_size + config.length() + 2);
reply->set_len(base_head_size + config.length() + 2);
if(scan_err_ == 0)

View File

@ -19,6 +19,7 @@ class scanner_const_opts;
class scanner_hw;
class user_priv;
class imgproc_mgr;
class resource_mgr;
class async_scanner : public refer
{
@ -36,6 +37,7 @@ class async_scanner : public refer
uint32_t scan_err_;
volatile bool reply_start_;
int last_err_ = 0;
std::unique_ptr<resource_mgr> res_mgr_;
MUTEX fsender_;
std::vector<file_reader*> send_files_;

77
scanner/res_mgr.cpp Normal file
View File

@ -0,0 +1,77 @@
#include "res_mgr.h"
resource_mgr::resource_mgr()
{
thread_.reset(new std::thread(&resource_mgr::thread_monitor, this));
}
resource_mgr::~resource_mgr()
{
run_ = monitor_ = false;
if(thread_.get() && thread_->joinable())
thread_->join();
}
void resource_mgr::thread_monitor(void)
{
while(run_)
{
if(monitor_)
{
do
{
uint64_t now = 0;
if(utils::get_memory_usage(nullptr, &now, nullptr) == 0)
mem_now_ = now;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} while (monitor_);
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
}
std::string resource_mgr::task_type(_task task)
{
RETURN_ENUM_STR(task, TASK_NONE);
RETURN_ENUM_STR(task, TASK_EP0);
RETURN_ENUM_STR(task, TASK_BULK_IN);
RETURN_ENUM_STR(task, TASK_BULK_OUT);
RETURN_ENUM_STR(task, TASK_CAPTURER);
RETURN_ENUM_STR(task, TASK_IMG_PROCESSOR);
return std::move("Unk-task(" + std::to_string((int)task) + ")");
}
void resource_mgr::start(void)
{
monitor_ = true;
}
void resource_mgr::stop(void)
{
monitor_ = false;
}
void resource_mgr::set_memory_limit(uint64_t max_size)
{
mem_limit_ = max_size;
}
bool resource_mgr::is_resource_enable(_task task, bool wait, int to_ms)
{
if(wait && mem_now_ > mem_limit_)
{
chronograph watch;
utils::to_log(LOG_LEVEL_WARNING, "Resources(task '%s') have reached their maximum limit(%lld): %lld, wait now ...\n", resource_mgr::task_type(task).c_str(), mem_limit_, mem_now_);
printf("Resources(task '%s') have reached their maximum limit(%lld): %lld, wait now ...\n", resource_mgr::task_type(task).c_str(), mem_limit_, mem_now_);
while(monitor_ && watch.elapse_ms() < to_ms)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if(mem_now_ < mem_limit_)
break;
}
utils::to_log(LOG_LEVEL_WARNING, "Resources wait result: %lld\n", mem_now_);
}
return mem_now_ < mem_limit_;
}

68
scanner/res_mgr.h Normal file
View File

@ -0,0 +1,68 @@
// resource manager
//
// Date: 2024-02-25
//
// Resources in monitoring: memory
#pragma once
#include <base/utils.h>
#include <base/packet.h>
class resource_mgr
{
volatile bool run_ = true;
volatile bool monitor_ = false;
uint64_t mem_limit_ = SIZE_GB(2);
uint64_t mem_now_ = 0;
std::unique_ptr<std::thread> thread_;
void thread_monitor(void);
public:
resource_mgr();
~resource_mgr();
static std::string task_type(_task task);
public:
void start(void);
void stop(void);
void set_memory_limit(uint64_t max_size = SIZE_GB(2));
bool is_resource_enable(_task task, bool wait = false, int to_ms = 0);
};
// watch.reset();
// bool first = true;
// while(scanning_)
// {
// if(res_(TASK_CAPTURER, true, 3))
// {
// if(!first)
// {
// uint64_t now = 0;
// utils::get_memory_usage(nullptr, &now, nullptr);
// utils::to_log(LOG_LEVEL_DEBUG, "Resources OK: %lld\n", now);
// }
// break;
// }
// if(first)
// {
// uint64_t now = 0;
// first = false;
// utils::get_memory_usage(nullptr, &now, nullptr);
// utils::to_log(LOG_LEVEL_WARNING, "Resources have reached their maximum limit: %lld\n", now);
// }
// if(watch.elapse_s() >= 3)
// {
// uint64_t now = 0;
// utils::get_memory_usage(nullptr, &now, nullptr);
// utils::to_log(LOG_LEVEL_FATAL, "Resources is not enough(memory = %lld) for continue scanning, exit scanning now.\n", now);
// auto_scan_ = scanning_ = false;
// err = SCANNER_ERR_INSUFFICIENT_MEMORY;
// break;
// }
// std::this_thread::sleep_for(std::chrono::milliseconds(3));
// }

View File

@ -40,6 +40,12 @@
// protocol version, The first thing to do after connecting is to check whether the field is compatible !!!
#define PROTOCOL_VER MAKE_WORD(0, 1)
// resource manager callback
#define CHK_RES_FUNC std::function<bool(int, bool, int)>
#define DECL_CHK_RES_FUNC(where, n) \
auto n = [where](int task, bool wait, int to_ms) -> bool
// NOTE: All text transmitted by pack cmd is in UTF-8 format !!!
enum cancel_io
@ -64,6 +70,15 @@ enum woker_status
WORKER_STATUS_RESET, // in reset(close and reopen) process
WORKER_STATUS_WAIT_RESOURCE, // wait resource
};
enum _task
{
TASK_NONE = 0,
TASK_EP0,
TASK_BULK_IN,
TASK_BULK_OUT,
TASK_CAPTURER,
TASK_IMG_PROCESSOR,
};
enum packet_cmd
{

View File

@ -61,7 +61,7 @@ add_defines("BUILD_AS_DEVICE")
add_defines("VER_MAIN=2")
add_defines("VER_FAMILY=200")
add_defines("VER_DATE=20240225")
add_defines("VER_BUILD=9")
add_defines("VER_BUILD=18")
target("conf")
set_kind("phony")