diff --git a/sdk/base/packet.h b/sdk/base/packet.h index 36b38b4..50ea2d0 100644 --- a/sdk/base/packet.h +++ b/sdk/base/packet.h @@ -40,16 +40,17 @@ // NOTE: All text transmitted by pack cmd is in UTF-8 format !!! - +enum cancel_io +{ + CANCEL_IO_CANCEL = 0x0ca0cel, +}; enum ep0_req { - USB_REQ_EP0_GET_PROTO_VER = 100, // get protocol version (PROTOCOL_VER), req = me, ind = 0, val = 0, len = 2 - USB_REQ_EP0_GET_IO_SIZE, // get bulk size of IO buffer, req = me, ind = 0, val = 0, len = sizeof(uint32_t) + USB_REQ_EP0_GET_PEER_CONFIG = 100, // get protocol version (PROTOCOL_VER), req = me, ind = 0, val = 0, len = sizeof(PEERCFG) USB_REQ_EP0_GET_STATUS, // 获取各工作线程状态, return EP0REPLYSTATUS. req = me, ind = 0, val = 0, len = sizeof(EP0REPLYSTATUS) - USB_REQ_EP0_RESET_BULK, // 关闭并重新打开BULK端点, return error number (uint32_t). req = me, ind = 0, val = 0, len = sizeof(uint32_t) - USB_REQ_EP0_CANCEL_CMD, // 取消当前指令的继续执行(一般用于中止大数据的传输). req = me, ind = 0, val = 0, len = sizeof(uint32_t) * 2 [(uint32_t)cmd + (uint32_t)pack-id] + USB_REQ_EP0_CANCEL_IO, // 设置当前IO数据的有效性. req = me, ind = 0, val = 0, len = sizeof(uint32_t), discard IO data when data is CANCEL_IO_CANCEL + // work-flow: write control with 'CANCEL_IO_CANCEL', write bulk with 1 byte, write control with not 'CANCEL_IO_CANCEL' to restore USB_REQ_EP0_SET_ENCRYPT, // 设置加密方式, req = me, ind = 0, val = 0, len = sizeof(PACK_BASE) - USB_REQ_EP0_SET_BULK_BUFFER, // 设置bulk缓冲区大小系数, req = me, ind = coef, val = 0, len = 0 }; enum woker_status { @@ -255,6 +256,12 @@ typedef struct _ep0_reply uint32_t bytes_to_sent; // how many bytes data is waiting for be sent in one replying packet }EP0REPLYSTATUS, *LPEP0REPLYSTATUS; +typedef struct _peer_config +{ + uint16_t ver; // protocol version + uint32_t io_size; // IO buffer size +}PEERCFG, *LPPEERCFG; + typedef struct _pack_base // A piece of data has only one header { uint32_t enc_cmd : 2; // encrypting type, for 'cmd' diff --git a/usb/usb_io.cpp b/usb/usb_io.cpp index 126ea3f..9ff0b60 100644 --- a/usb/usb_io.cpp +++ b/usb/usb_io.cpp @@ -266,10 +266,11 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data) { switch (pev->u.setup.bRequest) { - case USB_REQ_EP0_GET_PROTO_VER: - reply = dyn_mem::memory(sizeof(short)); - *(short*)reply->ptr() = PROTOCOL_VER; - reply->set_len(sizeof(short)); + case USB_REQ_EP0_GET_PEER_CONFIG: + reply = dyn_mem::memory(sizeof(PEERCFG)); + ((LPPEERCFG)reply->ptr())->ver = PROTOCOL_VER; + ((LPPEERCFG)reply->ptr())->io_size = unit_out_; + reply->set_len(sizeof(PEERCFG)); break; case USB_REQ_EP0_GET_STATUS: reply = dyn_mem::memory(sizeof(struct _ep0_reply)); @@ -278,6 +279,7 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data) LPEP0REPLYSTATUS lps = (LPEP0REPLYSTATUS)reply->ptr(); lps->in_status = statu_in_; lps->out_status = statu_out_; + lps->task_cnt = cmd_que_.size(); lps->task_cmd = task_cmd_; lps->task_pack_id = task_packet_id_; lps->task_required_bytes = want_bytes_task_; @@ -286,13 +288,12 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data) utils::log_mem_info("threads status:", lps, sizeof(*lps), LOG_LEVEL_DEBUG); } break; - case USB_REQ_EP0_GET_IO_SIZE: - reply = dyn_mem::memory(sizeof(unit_out_)); - reply->put(&unit_out_, sizeof(unit_out_)); - break; - case USB_REQ_EP0_RESET_BULK: - reply = dyn_mem::memory(sizeof(uint32_t)); - reply->put(&err, sizeof(err)); + case USB_REQ_EP0_CANCEL_IO: + if (pev->u.setup.wLength == sizeof(uint32_t)) + { + uint32_t val = *(uint32_t*)&pev[1]; + cancel_io_ = (val == CANCEL_IO_CANCEL); + } break; case USB_REQ_EP0_SET_ENCRYPT: if (pev->u.setup.wLength == sizeof(PACK_BASE)) @@ -304,15 +305,6 @@ dyn_mem_ptr async_usb_gadget::handle_ctrl_setup(dyn_mem_ptr data) utils::to_log(LOG_LEVEL_DEBUG, "Set encrypting method: command - %d; payload - %d\n", enc_head_, enc_payload_); } break; - case USB_REQ_EP0_SET_BULK_BUFFER: - // if(pev->u.setup.wLength == sizeof(short)) - { - uint16_t pre = buf_coef_; - - set_buf_coefficient(pev->u.setup.wIndex); - utils::to_log(LOG_LEVEL_DEBUG, "Set bulk buffer size coefficient from %d to %d\n", pre, buf_coef_); - } - break; default: handled = false; } @@ -332,7 +324,7 @@ int async_usb_gadget::inner_write_bulk_memory(int fd, uint8_t* buf, uint32_t* le int w = 0, to = 0, err = 0, total = *len, off = 0, size = *len; - while(1) + while(!cancel_io_) { w = write(fd, buf + off, size); if(w == -1) @@ -491,6 +483,7 @@ void async_usb_gadget::thread_read_bulk(int fd) while(run_) { + statu_out_ = WORKER_STATUS_WAIT_RESOURCE; dyn_mem_ptr buf = get_io_buffer(); int l = 0; @@ -508,7 +501,12 @@ void async_usb_gadget::thread_read_bulk(int fd) buf->set_session_id(session_id_); l = read(fd, buf->ptr(), bulk_size); statu_out_ = WORKER_STATUS_IDLE; - if(l <= 0) + if (!run_) + { + buf->release(); + break; + } + else if(l <= 0) { free_io_buffer(buf); if(errno) @@ -522,16 +520,18 @@ void async_usb_gadget::thread_read_bulk(int fd) cnt_0++; } } - else if (!run_) + else { - utils::to_log(LOG_LEVEL_ALL, "thread_read_bulk do reset-bulk ...\n"); - buf->release(); - break; - } - else - { - buf->set_len(l); - cmd_que_.save(buf, true); + // if(!cancel_io_) // ensure to trigger task thread clean it's work + { + buf->set_len(l); + cmd_que_.save(buf, true); + } + // else + // { + // free_io_buffer(buf); + // utils::to_log(LOG_LEVEL_DEBUG, "Cancel read with %u bytes.\n", l); + // } } } statu_out_ = WORKER_STATUS_NOT_START; @@ -550,14 +550,21 @@ void async_usb_gadget::thread_write_bulk(int fd) if(sent_que_.take(data, true)) { want_bytes_in_ = data->get_rest(); - err = inner_write_bulk(fd, data, mem, bulk_size); + if(!cancel_io_) { - file_reader_ptr fr = dynamic_cast(data); - if(fr) + err = inner_write_bulk(fd, data, mem, bulk_size); { - utils::to_log(LOG_LEVEL_DEBUG, "Sent file(%s) with error: %d.\n", fr->path_file(), err); + file_reader_ptr fr = dynamic_cast(data); + if(fr) + { + utils::to_log(LOG_LEVEL_DEBUG, "Sent file(%s) with error: %d.\n", fr->path_file(), err); + } } } + else + { + utils::to_log(LOG_LEVEL_DEBUG, "Cancel write with %u bytes.\n", data->get_rest()); + } data->release(); if(err) { @@ -583,6 +590,7 @@ void async_usb_gadget::thread_pump_task(void) { data = nullptr; statu_task_ = WORKER_STATUS_IDLE; + task_cmd_ = 0; if(cmd_que_.take(data, true) && data) { bool pool = true; @@ -590,6 +598,32 @@ void async_usb_gadget::thread_pump_task(void) if(max_que < cmd_que_.size()) max_que = cmd_que_.size(); + if(cancel_io_) + { + if(prev) + { + utils::to_log(LOG_LEVEL_DEBUG, "Cancel task previous packet with %u bytes.\n", prev->get_rest()); + prev->release(); + prev = nullptr; + } + if(dh) + { + utils::to_log(LOG_LEVEL_DEBUG, "Cancel task holder need %u bytes.\n", dh->get_required()); + dh->cancel(); + dh->release(); + dh = nullptr; + } + if(reply) + { + reply->release(); + reply = nullptr; + } + want_bytes_task_ = 0; + utils::to_log(LOG_LEVEL_DEBUG, "Cancel task with %u bytes.\n", data->get_rest()); + data->release(); + continue; + } + if(prev) { if(prev->get_session_id() != data->get_session_id()) diff --git a/usb/usb_io.h b/usb/usb_io.h index 7ff2f30..b80f223 100644 --- a/usb/usb_io.h +++ b/usb/usb_io.h @@ -42,6 +42,7 @@ class usb_device; class async_usb_gadget : public refer { volatile bool run_ = true; + volatile bool cancel_io_ = false; usb_device *dev_ = nullptr; safe_thread threads_; size_t unit_in_ = 0x200;