add safe_thread

This commit is contained in:
gb 2023-11-24 14:24:11 +08:00
parent 5c87f8c2ad
commit d73e068724
13 changed files with 207 additions and 20 deletions

View File

@ -111,6 +111,7 @@ PACKIMAGE img_receiver::head(void)
hg_scanner::hg_scanner(ONLNSCANNER* dev, imgproc_mgr* imgproc, hguser* user, std::vector<sane_opt_provider*>* constopts)
: dev_(*dev), status_(SCANNER_ERR_OPENED_BY_OTHER_PROCESS)
, msg_(from_default_language("\350\256\276\345\244\207\345\267\262\347\273\217\350\242\253\350\277\233\347\250\213 '%s' \345\215\240\347\224\250"))
, raw_imgs_("img-usb"), final_imgs_("img-final")
{
singleton_ = hg_scanner::create_device_singleton(dev_.vid, dev_.pid, dev_.addr);
if (!singleton_->is_first())
@ -140,7 +141,15 @@ hg_scanner::hg_scanner(ONLNSCANNER* dev, imgproc_mgr* imgproc, hguser* user, std
for(auto& v: *constopts)
dev_opts_->add(v);
}
#ifdef USE_SAFE_THREAD
auto tf = [this](void*) -> void
{
thread_image_processor();
};
imgpr_thread_.start(tf, nullptr, "hg_scanner::thread_image_processor");
#else
imgpr_thread_.reset(new std::thread(&hg_scanner::thread_image_processor, this));
#endif
}
}
}
@ -224,6 +233,43 @@ void hg_scanner::thread_image_processor(void)
//processor->release();
}
int hg_scanner::set_value(const char* name, void* val)
{
int type = DATA_TYPE_BOOL,
val_size = 0,
ret = SCANNER_ERR_OK;
size_t size = 0;
std::string t(dev_opts_->get_option_value_type(name, &size));
uint8_t after = 0;
val_size = size;
if (t == JSON_SANE_TYPE_BOOL)
{
type = DATA_TYPE_BOOL;
}
else if (t == JSON_SANE_TYPE_INT)
{
type = DATA_TYPE_INT4;
}
else if (t == JSON_SANE_TYPE_FIXED)
{
type = DATA_TYPE_FLOAT;
}
else if (t == JSON_SANE_TYPE_STRING)
{
type = DATA_TYPE_STRING;
val_size = strlen((char*)val);
}
ret = scanner_->option_value_set(name, type, val, size, val_size, &after);
if (ret == 0)
{
if (after)
ret = SCANNER_ERR_RELOAD_OPT_PARAM;
}
return ret;
}
int hg_scanner::start(void)
{
int ret = SCANNER_ERR_OK;
@ -272,8 +318,10 @@ int hg_scanner::close(void)
// wait image thread ...
raw_imgs_.trigger();
#ifndef USE_SAFE_THREAD
if (imgpr_thread_.get() && imgpr_thread_->joinable())
imgpr_thread_->join();
#endif
if (imgproc_)
{

View File

@ -85,7 +85,11 @@ class hg_scanner : public sane_opt_provider
volatile bool run_ = true;
safe_fifo<img_receiver*> raw_imgs_;
safe_fifo<img_receiver*> final_imgs_;
#ifdef USE_SAFE_THREAD
safe_thread imgpr_thread_;
#else
std::unique_ptr<std::thread> imgpr_thread_;
#endif
bool online_ = true;
static shared_memory* create_device_singleton(int vid, int pid, int addr);
@ -99,6 +103,10 @@ public:
protected:
virtual ~hg_scanner();
// sane_opt_provider
public:
virtual int set_value(const char* name, void* val) override;
// scanner operation ...
public:
int start(void);

View File

@ -10,7 +10,11 @@
async_usb_host::async_usb_host(std::function<FUNCTION_PROTO_COMMAND_HANDLE> cmd_handler)
: handler_(cmd_handler), usb_dev_(nullptr), usb_handle_(nullptr), run_(true), cancel_write_(false), writing_(false)
, head_enc_type_(ENCRYPT_CMD_NONE), payload_enc_type_(ENCRYPT_NONE), enc_data_(0), buf_coef_(1)
, in_que_("usb-r"), out_que_("usb-w")
{
in_que_.enable_wait_log(false);
out_que_.enable_wait_log(false);
memset(&bulk_in_, -1, sizeof(bulk_in_));
memset(&bulk_out_, -1, sizeof(bulk_out_));
bulk_in_.claimed = bulk_out_.claimed = 0;
@ -145,6 +149,10 @@ int async_usb_host::stop(void)
data_source_ptr out = nullptr;
run_ = false;
if (usb_handle_)
libusb_close(usb_handle_);
stop_worker_threads();
in_que_.trigger();
while (in_que_.take(data))
@ -160,7 +168,6 @@ int async_usb_host::stop(void)
if(bulk_out_.claimed)
libusb_release_interface(usb_handle_, bulk_out_.iface);
libusb_close(usb_handle_);
usb_handle_ = nullptr;
}
if (usb_dev_)
@ -354,9 +361,27 @@ void async_usb_host::thread_pump_task(void)
}
void async_usb_host::create_worker_threads(void)
{
#ifdef USE_SAFE_THREAD
auto thread_w = [this](void*) -> void
{
thread_write_bulk();
};
auto thread_r = [this](void*) -> void
{
thread_read_bulk();
};
auto thread_p = [this](void*) -> void
{
thread_pump_task();
};
thread_w_.start(thread_w, nullptr, "async_usb_host::thread_write_bulk");
thread_r_.start(thread_r, nullptr, "async_usb_host::thread_read_bulk");
thread_p_.start(thread_p, nullptr, "async_usb_host::thread_pump_task");
#else
thread_w_.reset(new std::thread(&async_usb_host::thread_write_bulk, this));
thread_r_.reset(new std::thread(&async_usb_host::thread_read_bulk, this));
thread_p_.reset(new std::thread(&async_usb_host::thread_pump_task, this));
#endif
}
void async_usb_host::stop_worker_threads(void)
{
@ -367,9 +392,11 @@ void async_usb_host::stop_worker_threads(void)
out_que_.trigger();
in_que_.trigger();
#ifndef USE_SAFE_THREAD
WAIT_THREAD(thread_w_);
WAIT_THREAD(thread_r_);
WAIT_THREAD(thread_p_);
#endif
}
int async_usb_host::bulk_write_buf(uint8_t* buf, int* len)

View File

@ -41,9 +41,15 @@ class async_usb_host : public refer
USBEP bulk_out_;
safe_fifo<dyn_mem_ptr> in_que_;
safe_fifo<data_source_ptr> out_que_;
#ifdef USE_SAFE_THREAD
safe_thread thread_w_;
safe_thread thread_r_;
safe_thread thread_p_;
#else
std::unique_ptr<std::thread> thread_w_;
std::unique_ptr<std::thread> thread_r_;
std::unique_ptr<std::thread> thread_p_;
#endif
uint32_t head_enc_type_;
uint32_t payload_enc_type_;

View File

@ -19,7 +19,8 @@ cmd_result::cmd_result(std::function<int(cmd_result*)> call,
void* param, uint32_t id)
: wait_(new platform_event("wait_reply")), id_(id == 0 ? cmd_result::gen_pack_id() : id)
, call_(call), clean_(clean), roger_(roger), param_(param), err_(0), to_(1000), over_(false)
{
{
wait_->enable_log(false);
}
cmd_result::~cmd_result()
{

View File

@ -38,12 +38,9 @@ public:
set_where("language-option");
const char* lang = language_option_descriptor();
if (lang && *lang)
{
std::string t(lang);
std::string t(lang && *lang ? lang : "{}");
set_opt_json_text(&t[0]);
}
set_opt_json_text(&t[0]);
}
protected:

View File

@ -36,23 +36,32 @@ usb_manager::usb_manager() : run_(true)
status_ = SCANNER_ERR_USB_INIT_FAILED;
wait_pnp_.set_debug_info("Waiting PNP");
#ifdef USE_SAFE_THREAD
auto tf = [this](void*) -> void
{
thread_notify_usb_event();
};
usb_notify_thread_.start(tf, nullptr, "usb_manager::thread_notify_usb_event");
#else
if (!usb_notify_thread_.get())
{
run_ = true;
usb_notify_thread_.reset(new std::thread(&usb_manager::thread_notify_usb_event, this));
}
#endif
}
usb_manager::~usb_manager()
{
run_ = false;
wait_pnp_.trigger();
libusb_context* ctx = nullptr;
#if defined(WIN32) || defined(_WIN64)
#if OS_WIN
ctx = context_;
libusb_quit(ctx);
#endif
if(usb_cb_handle_)
libusb_hotplug_deregister_callback(ctx, usb_cb_handle_);
#ifndef USE_SAFE_THREAD
if (usb_monitor_thread_.get() && usb_monitor_thread_->joinable())
{
usb_monitor_thread_->join();
@ -63,6 +72,7 @@ usb_manager::~usb_manager()
usb_notify_thread_->join();
usb_notify_thread_.reset();
}
#endif
libusb_exit(context_);
utils::to_log(LOG_LEVEL_DEBUG, "usb_manager(%p) destroying and free context(%p)\n", this, context_);
}
@ -123,11 +133,18 @@ int usb_manager::register_usb_pnp(void)
}
void usb_manager::init_notify_thread()
{
#ifdef USE_SAFE_THREAD
auto tf = [this](void*) -> void
{
thread_trigger_usb_event();
};
usb_monitor_thread_.start(tf, nullptr, "usb_manager::thread_trigger_usb_event");
#else
if(!usb_monitor_thread_.get())
{
usb_monitor_thread_.reset(new std::thread(&usb_manager::thread_trigger_usb_event, this));
}
#endif
}
void usb_manager::fake_usb_pnp(std::vector<libusb_device*>& devices)
{

View File

@ -66,8 +66,13 @@ class usb_manager
libusb_context* context_; // declare my own context, avoid sharing the default context with other processes
int status_;
void* usb_cb_param_;
#ifdef USE_SAFE_THREAD
safe_thread usb_notify_thread_;
safe_thread usb_monitor_thread_; // some unknown reason, operation is accessible after certain delay
#else
std::shared_ptr<std::thread> usb_notify_thread_;
std::shared_ptr<std::thread> usb_monitor_thread_; // some unknown reason, operation is accessible after certain delay
#endif
libusb_hotplug_callback_handle usb_cb_handle_;
std::chrono::system_clock::time_point born_;

View File

@ -1575,7 +1575,8 @@ bool platform_event::wait(unsigned timeout)
{
bool waited = true;
utils::to_log(LOG_LEVEL_DEBUG, "platform_event(%p - %s) --> waiting...\n", this, dbg_info_.c_str());
if(log_)
utils::to_log(LOG_LEVEL_DEBUG, "platform_event(%p - %s) --> waiting...\n", this, dbg_info_.c_str());
waiting_ = true;
if (timeout == USB_TIMEOUT_INFINITE)
sem_wait(&sem_);
@ -1586,7 +1587,8 @@ bool platform_event::wait(unsigned timeout)
to.tv_nsec = (long)((timeout % 1000) * 1000 * 1000);
waited = sem_timedwait(&sem_, &to) == 0;
}
utils::to_log(LOG_LEVEL_DEBUG, "platform_event(%p - %s) --> %s.\n", this, dbg_info_.c_str(), waited ? "waited" : "wait timeout");
if (log_)
utils::to_log(LOG_LEVEL_DEBUG, "platform_event(%p - %s) --> %s.\n", this, dbg_info_.c_str(), waited ? "waited" : "wait timeout");
waiting_ = false;
return waited;
@ -1603,6 +1605,10 @@ void platform_event::set_debug_info(const char* info)
{
dbg_info_ = info ? info : "";
}
void platform_event::enable_log(bool enable)
{
log_ = enable;
}
@ -1833,3 +1839,44 @@ int shared_memory::write(const char* data, size_t len)
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// safe_thread
safe_thread::safe_thread() : name_("")
{}
safe_thread::~safe_thread()
{
if (thread_.get() && thread_->joinable())
thread_->join();
}
void safe_thread::thread_worker(std::function<void(void*)> func, void* param)
{
try
{
func(param);
}
catch (std::exception e)
{
utils::to_log(LOG_LEVEL_FATAL, "Exception in thread '%s': %s\n", name_.c_str(), e.what());
}
catch (...)
{
utils::to_log(LOG_LEVEL_FATAL, "Unknown exception in thread '%s'!\n", name_.c_str());
}
}
int safe_thread::start(std::function<void(void*)> f, void* param, const char* thread_name)
{
if (thread_.get() && thread_->joinable())
thread_->join();
name_ = thread_name ? thread_name : "";
thread_.reset(new std::thread(&safe_thread::thread_worker, this, f, param));
return 0;
}

View File

@ -15,6 +15,9 @@
#include <deque>
#include <functional>
#define USE_SAFE_THREAD
enum log_type
{
LOG_TYPE_NONE = 0, // no logging
@ -217,6 +220,7 @@ class platform_event : public refer
sem_t sem_;
volatile bool waiting_;
std::string dbg_info_;
bool log_ = true;
public:
platform_event(const char* info = "");
@ -227,6 +231,7 @@ public:
bool wait(unsigned timeout = USB_TIMEOUT_INFINITE/*ms*/); // USB_TIMEOUT_INFINITE is waiting unfinite, true when watied and false for wait timeout
void trigger(void);
bool is_waiting(void);
void enable_log(bool enable);
void set_debug_info(const char* info);
};
@ -271,7 +276,7 @@ class safe_fifo
platform_event* wait_;
public:
safe_fifo() : wait_(new platform_event("fifo"))
safe_fifo(const char* who) : wait_(new platform_event(who && *who ? who : "fifo"))
{}
~safe_fifo()
{
@ -329,6 +334,23 @@ public:
{
wait_->trigger();
}
void enable_wait_log(bool enable)
{
wait_->enable_log(enable);
}
};
class safe_thread
{
std::unique_ptr<std::thread> thread_;
std::string name_;
void thread_worker(std::function<void(void*)> func, void* param);
public:
safe_thread(void);
~safe_thread();
public:
int start(std::function<void(void*)> f, void* param, const char* thread_name);
};

View File

@ -91,10 +91,7 @@ char* sane_opt_provider::get_value(const char* name, void* value, size_t* size,
}
int sane_opt_provider::set_value(const char* name, void* val)
{
if (following_.count(name))
return following_[name]->set_value(name, val);
else
return SCANNER_ERR_DEVICE_NOT_SUPPORT;
return SCANNER_ERR_DEVICE_NOT_SUPPORT;
}
void sane_opt_provider::enable(const char* name, bool able)
{}

View File

@ -1903,7 +1903,7 @@ std::string device_option::get_name_by_sane_id(int sane_ind)
return std::move(value);
}
std::string device_option::get_option_value_type(const char* name)
std::string device_option::get_option_value_type(const char* name, size_t* size)
{
std::string value("");
gb_json* jsn = now_ ? now_ : origin_;
@ -1916,6 +1916,12 @@ std::string device_option::get_option_value_type(const char* name)
if (child)
{
child->get_value("type", value);
if (size)
{
int v = 0;
child->get_value("size", v);
*size = v;
}
child->release();
}
}
@ -1992,7 +1998,7 @@ std::string device_option::get_option_field_string(const char* name, const char*
return std::move(value);
}
std::string device_option::get_option_value_type(int sane_ind)
std::string device_option::get_option_value_type(int sane_ind, size_t* size)
{
std::string value("");
gb_json* jsn = now_ ? now_ : origin_;
@ -2001,6 +2007,12 @@ std::string device_option::get_option_value_type(int sane_ind)
{
gb_json* child = now_->child(sane_ind - 1);
child->get_value("type", value);
if (size)
{
int v = 0;
child->get_value("size", v);
*size = v;
}
child->release();
}

View File

@ -362,8 +362,8 @@ public:
int count(void); // return option count
bool is_auto_restore_default(const char* name);
std::string get_name_by_sane_id(int sane_ind);
std::string get_option_value_type(const char* name);
std::string get_option_value_type(int sane_ind);
std::string get_option_value_type(const char* name, size_t* size = nullptr);
std::string get_option_value_type(int sane_ind, size_t* size = nullptr);
std::string get_option_field_string(const char* name, const char* key);
std::string get_option_value(const char* name, int type/*OPT_VAL_xxx*/, int* size = nullptr, void* in_data = nullptr); // return whole json-text if name was null
std::string get_option_value(int sane_ind, int type/*OPT_VAL_xxx*/, int* size = nullptr, void* in_data = nullptr); // return whole json-text if name was null