fix IO endpoint bug; add session ID to control all tasks and IO

This commit is contained in:
gb 2023-12-02 10:15:44 +08:00
parent 575d2919cb
commit 4dc3d7ccc1
6 changed files with 129 additions and 32 deletions

View File

@ -42,8 +42,23 @@ int main()
int err = scanner->last_error();
if(err == 0)
while(1) {std::this_thread::sleep_for(std::chrono::milliseconds(2));}
{
while(1)
{
if(getchar() == 'e')
{
if(getchar() == 'x')
{
if(getchar() == 'i')
{
if(getchar() == 't')
break;
}
}
}
}
}
scanner->stop();
scanner->release();

View File

@ -30,6 +30,10 @@ void packet_data_base::set_packet_param(uint32_t cmd, uint32_t id)
pack_cmd_ = cmd;
pack_id_ = id;
}
void packet_data_base::set_session_id(uint32_t session_id)
{
session_id_ = session_id;
}
int packet_data_base::get_packet_command(void)
{
return pack_cmd_;
@ -38,6 +42,10 @@ int packet_data_base::get_packet_id(void)
{
return pack_id_;
}
uint32_t packet_data_base::get_session_id(void)
{
return session_id_;
}
void packet_data_base::set_progress_notify(PROGRESS_NOTIFYER notify, void* param)
{
@ -52,6 +60,9 @@ data_holder::data_holder()
data_holder::~data_holder()
{}
void data_holder::cancel(void)
{}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -216,7 +227,12 @@ uint32_t file_saver::get_required(void)
{
return size_ - wrote_;
}
void file_saver::cancel(void)
{
utils::to_log(LOG_LEVEL_DEBUG, "Discard receiving file (%u/%u): '%s'.\n", wrote_, size_, path_.c_str());
close();
remove(path_.c_str());
}

View File

@ -27,6 +27,7 @@ class packet_data_base : public refer
protected:
uint32_t pack_cmd_;
uint32_t pack_id_;
uint32_t session_id_ = -1;
public:
packet_data_base();
@ -37,8 +38,11 @@ protected:
public:
void set_packet_param(uint32_t cmd, uint32_t id);
void set_session_id(uint32_t session_id);
int get_packet_command(void);
int get_packet_id(void);
uint32_t get_session_id(void);
void set_progress_notify(PROGRESS_NOTIFYER notify = PROGRESS_NOTIFYER(), void* param = nullptr);
};
@ -58,6 +62,7 @@ public:
virtual int put_data(const void* data, uint32_t* size/*[in] - total bytes of data; [out] - used bytes*/) = 0; // return error code
virtual bool is_complete(void) = 0;
virtual uint32_t get_required(void) = 0;
virtual void cancel(void);
};
class mem_holder : public data_holder
@ -124,6 +129,7 @@ public:
virtual int put_data(const void* data, uint32_t* size/*[in] - total bytes of data; [out] - used bytes*/) override;
virtual bool is_complete(void) override;
virtual uint32_t get_required(void) override;
virtual void cancel(void) override;
};
@ -245,7 +251,7 @@ CLS_PTR(file_reader);
//
// when invalid packet, suggest use the entire data
//
// packet_data_base_ptr* - return data_holder or data_source or nullptr £¨The number of bytes required for this packet, 0 is over for this packet£©
// packet_data_base_ptr* - return data_holder or data_source or nullptr <EFBFBD><EFBFBD>The number of bytes required for this packet, 0 is over for this packet<65><74>
//
// data_holder: the packet/command need more data than dyn_mem_ptr provides to complete the business. such as 'write a large file'
//

View File

@ -1876,9 +1876,9 @@ void safe_thread::thread_worker(std::function<void(void)> func, std::string name
{
try
{
utils::to_log(LOG_LEVEL_DEBUG, "safe_thread of '%s' is running ...\n", name.c_str());
utils::to_log(LOG_LEVEL_DEBUG, "+++ safe_thread of '%s' is running ...\n", name.c_str());
func();
utils::to_log(LOG_LEVEL_DEBUG, "safe_thread of '%s' exited.\n", name.c_str());
utils::to_log(LOG_LEVEL_DEBUG, "--- safe_thread of '%s' exited.\n", name.c_str());
return;
}
catch (std::exception e)

View File

@ -19,6 +19,7 @@ async_usb_gadget::async_usb_gadget(std::function<FUNCTION_PROTO_COMMAND_HANDLE>
: handle_cmd_(cmd_handler), dev_connect_(dev_conn)
, enc_head_(ENCRYPT_CMD_NONE), enc_payload_(ENCRYPT_NONE), enc_data_(0)
, cmd_que_("in-queue"), sent_que_("out-queue")
, wait_in_("wait_usb_enable_in"), wait_out_("wait_usb_enable_out")
{
memset((void*)&status_, 0, sizeof(status_));
@ -43,24 +44,38 @@ async_usb_gadget::async_usb_gadget(std::function<FUNCTION_PROTO_COMMAND_HANDLE>
};
auto bulkw = [this](void) -> void
{
int fd = -1,
err = dev_->open_endpoint(EP_IND_BULK_IN, &fd);
if(err == 0)
while(run_)
{
thread_read_bulk(fd);
dev_->close_endpoint(EP_IND_BULK_IN);
wait_in_.wait();
if(!run_)
break;
int fd = -1,
err = dev_->open_endpoint(EP_IND_BULK_IN, &fd);
if(err == 0)
{
thread_write_bulk(fd);
dev_->close_endpoint(EP_IND_BULK_IN);
}
}
};
auto bulkr = [this](void) -> void
{
int fd = -1,
err = dev_->open_endpoint(EP_IND_BULK_OUT, &fd);
if(err == 0)
while(run_)
{
thread_write_bulk(fd);
dev_->close_endpoint(EP_IND_BULK_OUT);
wait_out_.wait();
if(!run_)
break;
int fd = -1,
err = dev_->open_endpoint(EP_IND_BULK_OUT, &fd);
if(err == 0)
{
thread_read_bulk(fd);
dev_->close_endpoint(EP_IND_BULK_OUT);
}
}
};
@ -135,11 +150,16 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_message(dyn_mem_ptr data)
{
case FUNCTIONFS_ENABLE:
utils::to_log(LOG_LEVEL_ALL, "EP0 FFS ENABLE\n");
online_ = true;
session_id_++;
wait_in_.trigger();
wait_out_.trigger();
if(dev_connect_)
dev_connect_(true);
break;
case FUNCTIONFS_DISABLE:
utils::to_log(LOG_LEVEL_ALL, "EP0 FFS DISABLE\n");
online_ = false;
if(dev_connect_)
dev_connect_(false);
break;
@ -405,13 +425,14 @@ void async_usb_gadget::thread_read_bulk(int fd)
int l = 0;
status_.out_status = BULK_STATUS_IO;
buf->set_session_id(session_id_);
l = read(fd, buf->ptr(), bulk_size);
if(l <= 0)
{
buf->release();
if(errno)
{
utils::to_log(LOG_LEVEL_ALL, "read bulk failed: %d(%s)\n", errno, strerror(errno));
utils::to_log(LOG_LEVEL_ALL, "read bulk(%d) failed: %d(%s)\n", l, errno, strerror(errno));
status_.out_status = BULK_STATUS_ERROR;
status_.out_err = errno;
break;
@ -485,19 +506,36 @@ void async_usb_gadget::thread_pump_task(void)
if(max_que < status_.task_cnt)
max_que = status_.task_cnt;
if (in_err_statu)
{
}
if(prev)
{
utils::to_log(LOG_LEVEL_ALL, "Combine partial packet with length %u and %u ...\n", prev->get_rest(), data->get_rest());
*prev += *data;
data->release();
data = prev;
if(prev->get_session_id() != data->get_session_id())
{
utils::to_log(LOG_LEVEL_DEBUG, "Discard previous packet for the session ID(%u) is not equal to now(%u).\n", prev->get_session_id(), data->get_session_id());
prev->release();
}
else
{
utils::to_log(LOG_LEVEL_ALL, "Combine partial packet with length %u and %u ...\n", prev->get_rest(), data->get_rest());
*prev += *data;
data->release();
data = prev;
}
prev = nullptr;
}
if(dh && dh->get_session_id() != data->get_session_id())
{
dh->cancel();
dh->release();
dh = nullptr;
}
if (!online_ || data->get_session_id() != session_id_)
{
utils::to_log(LOG_LEVEL_DEBUG, "Discard task for session ID(%u) is not equal now(%u) or is offline.\n", data->get_session_id(), session_id_);
data->release();
continue;
}
do
{
packet_data_base_ptr pack_data = nullptr;
@ -506,7 +544,7 @@ void async_usb_gadget::thread_pump_task(void)
if(dh == nullptr)
{
if (data->get_rest() < sizeof(PACK_BASE))
if (!online_ || data->get_rest() < sizeof(PACK_BASE))
break;
else
{
@ -519,6 +557,7 @@ void async_usb_gadget::thread_pump_task(void)
used &= INT32_MAX;
if(pack_data)
{
pack_data->set_session_id(data->get_session_id());
dh = dynamic_cast<data_holder_ptr>(pack_data);
if(!dh)
{
@ -544,6 +583,7 @@ void async_usb_gadget::thread_pump_task(void)
if(dh->is_complete() || err)
{
reply = dyn_mem::memory(sizeof(PACK_BASE));
reply->set_session_id(dh->get_session_id());
pack = (LPPACK_BASE)reply->ptr();
BASE_PACKET_REPLY(*pack, dh->get_packet_command() + 1, dh->get_packet_id(), err);
reply->set_len(sizeof(PACK_BASE));
@ -615,6 +655,11 @@ int async_usb_gadget::stop(void)
{
run_ = false;
wait_in_.trigger();
wait_out_.trigger();
cmd_que_.trigger();
sent_que_.trigger();
if(dev_)
{
dev_->pull_down();
@ -627,12 +672,22 @@ int async_usb_gadget::stop(void)
}
int async_usb_gadget::write_bulk(data_source_ptr data)
{
int quel = 0;
if(data->get_session_id() != session_id_ || !online_)
{
utils::to_log(LOG_LEVEL_DEBUG, "Discard packet for the session ID(%u) is not equal current session(%u) or is offline.\n", data->get_session_id(), session_id_);
data->release();
data->add_ref();
quel = sent_que_.save(data, true);
return sent_que_.size();
}
else
{
int quel = 0;
return quel;
data->add_ref();
quel = sent_que_.save(data, true);
return quel;
}
}
int async_usb_gadget::last_error(void)
{

View File

@ -49,6 +49,11 @@ class async_usb_gadget : public refer
uint16_t buf_coef_ = 1;
MUTEX buf_coef_lock_;
volatile bool online_ = false;
volatile uint32_t session_id_ = 0;
platform_event wait_in_;
platform_event wait_out_;
uint32_t enc_head_;
uint32_t enc_payload_;
uint8_t enc_data_;