base, shared memory IPC

This commit is contained in:
gb 2022-12-05 16:03:17 +08:00
commit 6d1f12e55a
14 changed files with 2428 additions and 0 deletions

255
common/event_monitor.cpp Normal file
View File

@ -0,0 +1,255 @@
#include "event_monitor.h"
#include <fcntl.h> /* nonblocking */
#include <sys/resource.h> /*setrlimit */
#include <sys/eventfd.h>
#include <sys/sysinfo.h>
#include <sys/ipc.h>
#include <sys/ioctl.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <unistd.h>
#include <vector>
#include <string.h>
#include "log_util.h"
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// event_handler
event_handler::event_handler()
{}
event_handler::~event_handler()
{}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// parent_holder
parent_holder::parent_holder()
{}
parent_holder::~parent_holder()
{}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// linux - epoll wrapper ...
class epoll_wrapper : public event_monitor
{
int32_t epoll_fd_;
int32_t quit_fd_[2];
volatile bool run_;
std::vector<THREAD_PTR> threads_;
static int32_t epoll_max;
void clear_threads(void)
{
if(threads_.size())
{
for (size_t i = 0; i < threads_.size(); ++i)
{
if (threads_[i]->joinable())
threads_[i]->join();
threads_[i].reset();
}
threads_.clear();
}
}
void close_monitor_fd(void)
{
if (epoll_fd_ != -1)
close(epoll_fd_);
epoll_fd_ = -1;
}
void clear(void)
{
clear_threads();
close_monitor_fd();
}
void monitor_thread(void)
{
log_cls::log(LOG_LEVEL_DEBUG, "monitor thread(%p) of object(%p) is working ...\n", gettid(), this);
while (run_)
{
struct epoll_event evs;
memset(&evs, 0, sizeof(evs));
if (epoll_wait(epoll_fd_, &evs, 1, -1) == -1)
continue;
if (evs.events == EPOLLOUT && evs.data.fd == quit_fd_[1])
break;
if (evs.events == EPOLLIN)
((event_handler*)evs.data.ptr)->on_event(event_handler::EVENT_READ, nullptr, 0);
else if (evs.events == EPOLLOUT)
((event_handler*)evs.data.ptr)->on_event(event_handler::EVENT_WRITE, nullptr, 0);
else
;
}
log_cls::log(LOG_LEVEL_DEBUG, "monitor thread(%p) of object(%p) finished working.\n", gettid(), this);
}
protected:
virtual ~epoll_wrapper()
{
stop();
}
public:
epoll_wrapper(const char* desc) : event_monitor(desc), epoll_fd_(-1), run_(true)
{
memset(quit_fd_, -1, sizeof(quit_fd_));
}
public:
virtual int32_t start(int32_t threads = 1) override
{
int32_t ret = stop();
struct rlimit rt;
rt.rlim_max = rt.rlim_cur = epoll_wrapper::epoll_max;
if (ret == 0 && setrlimit(RLIMIT_NOFILE, &rt) == 0)
{
epoll_fd_ = epoll_create(epoll_wrapper::epoll_max);
if (epoll_fd_ == -1)
{
ret = errno;
log_cls::log(LOG_LEVEL_FATAL, "epoll_create for '%s' failed: %s\n", desc_.c_str(), strerror(ret));
}
else
{
run_ = true;
for (size_t i = 0; i < (size_t)threads; ++i)
{
THREAD_PTR t;//(new std::thread(&epoll_wrapper::monitor_thread, this));
t.reset(new std::thread(&epoll_wrapper::monitor_thread, this));
threads_.push_back(t);
}
}
}
else
{
log_cls::log(LOG_LEVEL_FATAL, "setrlimit for '%s-epoll' failed: %s\n", desc_.c_str(), strerror(ret));
}
return ret;
}
virtual int32_t stop(void) override
{
if(threads_.size())
{
struct epoll_event ev;
#ifdef __USE_GNU
pipe2(quit_fd_, O_NONBLOCK);
#else
pipe(quit_fd_);
#endif
log_cls::log(LOG_LEVEL_DEBUG, "quit fd[0] = %p, fd[1] = %p\n", quit_fd_[0], quit_fd_[1]);
run_ = false;
ev.data.fd = quit_fd_[1];
ev.events = EPOLLOUT | EPOLLET | EPOLLONESHOT;
for(size_t i = 0; i < threads_.size(); ++i)
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, ev.data.fd, &ev);
clear_threads();
close(quit_fd_[0]);
close(quit_fd_[1]);
memset(quit_fd_, -1, sizeof(quit_fd_));
}
close_monitor_fd();
return 0;
}
virtual int32_t add_fd(event_handler* handler) override
{
struct epoll_event ev;
int32_t ret = -1;
if (!handler || handler->get_fd() == -1)
return EINVAL;
if (epoll_fd_ == -1)
return EFAULT;
handler->add_ref(); // add ref for epoll_event holder ...
ev.data.ptr = handler;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET; // EPOLLONESHOT | EPOLLHUP
ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, handler->get_fd(), &ev);
if (ret == -1)
{
ret = errno;
log_cls::log(LOG_LEVEL_FATAL, "add fd(%d) to %s-epoll failed: %s\n", handler->get_fd(), desc_.c_str(), strerror(ret));
handler->release();
}
return ret;
}
virtual int32_t remove_fd(event_handler* handler) override
{
struct epoll_event ev;
int32_t ret = -1;
if (!handler || handler->get_fd() == -1)
return EINVAL;
if (epoll_fd_ == -1)
return EFAULT;
ev.data.ptr = handler;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLHUP; // EPOLLONESHOT
ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, handler->get_fd(), &ev);
if (ret == 0)
{
// ENOENT returned if object 'handler' has not registered, so we can free it here when success in EPOLL_CTL_DEL ...
handler->release();
}
else
{
ret = errno;
log_cls::log(LOG_LEVEL_FATAL, "remove fd(%d) from %s-epoll failed: %s\n", handler->get_fd(), desc_.c_str(), strerror(ret));
}
return ret;
}
};
int32_t epoll_wrapper::epoll_max = 10;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// event_handler
event_monitor::event_monitor(const char* desc) : desc_(desc ? desc : "")
{
log_cls::log(LOG_LEVEL_DEBUG, "+event_monitor(%p) of '%s' contructing ...\n", this, desc_.c_str());
}
event_monitor::~event_monitor()
{
log_cls::log(LOG_LEVEL_DEBUG, "-event_monitor(%p) of '%s' destroyed\n", this, desc_.c_str());
}
event_monitor* event_monitor::create(const char* desc, int32_t type)
{
if (type == EV_TYPE_EPOLL)
return dynamic_cast<event_monitor*>(new epoll_wrapper(desc));
else
return nullptr;
}

70
common/event_monitor.h Normal file
View File

@ -0,0 +1,70 @@
#pragma once
// event monitor
//
// created on 2022-11-29
//
#include "referer.h"
#include <thread>
#include <memory>
typedef std::shared_ptr<std::thread> THREAD_PTR;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// object event_handler
//
// derived from 'event_handler' if your class will handle some events of drived by events
//
class event_handler : public refer
{
protected:
event_handler();
virtual ~event_handler();
public:
enum
{
EVENT_NONE = 0,
EVENT_READ, // indicates that the 'fd' is readable, you should perform 'read' if data was null
EVENT_WRITE, // indicates that the 'fd' is writeable, data in 'data' has sent 'data_len' bytes, in async-write only
};
virtual int32_t on_event(int32_t ev, void* data, size_t data_len) = 0;
virtual int32_t get_fd(void) = 0;
};
class parent_holder : public refer
{
protected:
parent_holder();
virtual ~parent_holder();
public:
virtual int32_t stop(void) = 0; // stop work and release parent ptr
};
// event_monitor to manage an event-driven model, this will trigger EVENT_READ/EVENT_WRITE events to 'handler'
//
class event_monitor : public refer
{
protected:
std::string desc_;
protected:
event_monitor(const char* desc);
virtual ~event_monitor();
public:
virtual int32_t start(int32_t threads = 1) = 0;
virtual int32_t stop(void) = 0;
virtual int32_t add_fd(event_handler* handler) = 0;
virtual int32_t remove_fd(event_handler* handler) = 0;
enum
{
EV_TYPE_POLL = 1,
EV_TYPE_EPOLL,
};
static event_monitor* create(const char* desc, int32_t type = EV_TYPE_EPOLL);
};

417
common/ipc_util.cpp Normal file
View File

@ -0,0 +1,417 @@
#include "ipc_util.h"
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <string.h>
#include "log_util.h"
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// linux_event
unsigned long linux_event::to_abs_time_us = 0;
linux_event::linux_event(const char* desc) : waiting_(false), sem_(nullptr), desc_(desc ? desc : ""), first_(true), multi_proc_(false)
{
log_cls::log(LOG_LEVEL_ALL, "+linux_event(%p) unamed for '%s' constructing ...\n", this, desc_.c_str());
err_ = sem_init(&local_sem_, 0, 0);
if (err_ == -1)
{
err_ = errno;
log_cls::log(LOG_LEVEL_FATAL, " %p: sem_init = %s\n", this, strerror(err_));
}
else
{
sem_ = &local_sem_;
}
}
linux_event::linux_event(const char* name, const char* desc) : waiting_(false), sem_(nullptr), desc_(desc ? desc : ""), first_(true), multi_proc_(true)
{
log_cls::log(LOG_LEVEL_ALL, "+linux_event(%p) of named '%s' for '%s' constructing ...\n", this, name, desc_.c_str());
sem_ = sem_open(name, O_CREAT | O_EXCL, 0777, 0);
if (sem_ == (sem_t*)SEM_FAILED)
{
sem_ = nullptr;
err_ = errno;
log_cls::log(LOG_LEVEL_FATAL, " %p: sem_open(O_CREAT | O_EXCL) = %s\n", this, strerror(err_));
if(err_ = EEXIST)
{
sem_ = sem_open(name, 0666);
if (sem_ == (sem_t*)SEM_FAILED)
{
err_ = errno;
sem_ = nullptr;
log_cls::log(LOG_LEVEL_FATAL, " %p: sem_open = %s\n", this, strerror(err_));
}
else
{
err_ = 0;
first_ = false;
}
}
}
else
{
name_ = name;
log_cls::log(LOG_LEVEL_DEBUG, " %p: created named sem OK.\n", this);
err_ = sem_init(sem_, 1, 0); // this is used to initialize the event count, whether named or unamed
if (err_ == -1)
{
err_ = errno;
log_cls::log(LOG_LEVEL_FATAL, " %p: sem_init = %s\n", this, strerror(err_));
sem_close(sem_);
sem_ = nullptr;
sem_unlink(name);
}
}
}
linux_event::linux_event(sem_t* mem_sem, bool first, const char* desc) : waiting_(false), sem_(mem_sem), desc_(desc ? desc : ""), first_(first), multi_proc_(true)
{
log_cls::log(LOG_LEVEL_ALL, "+linux_event(%p) at mem(%p) for '%s' constructing ...\n", this, mem_sem, desc_.c_str());
if(first)
{
err_ = sem_init(sem_, 1, 0);
if (err_ == -1)
{
err_ = errno;
log_cls::log(LOG_LEVEL_FATAL, " %p: sem_init = %s\n", this, strerror(err_));
}
}
}
linux_event::~linux_event()
{
if (sem_)
{
char ptr[40] = {0};
std::string tips("");
sprintf(ptr, " ~%p: ", this);
tips = ptr;
if(sem_ == &local_sem_ || (first_ && name_.empty()))
{
err_ = log_cls::log_when_err(sem_destroy(sem_), (tips + "sem_destroy").c_str());
}
else
{
err_ = log_cls::log_when_err(sem_close(sem_), (tips + "sem_close").c_str());
// else // why not else ? we should ensure delete the kernel object when unused.
if(!name_.empty()) // i am the named object owner !
{
err_ = log_cls::log_when_err(sem_unlink(name_.c_str()), (tips + "sem_unlink").c_str(), LOG_LEVEL_FATAL); // This will cause previously opened objects to never receive events, even if you reopen it.
}
}
}
log_cls::log(LOG_LEVEL_ALL, "-linux_event(%p) destroyed.\n", this);
}
int32_t linux_event::clear_named_event(const char* name)
{
sem_t* sem = sem_open(name, O_CREAT | O_EXCL, 0777, 0);
int32_t err = 0;
if(sem == (sem_t*)SEM_FAILED)
{
if(errno == EEXIST)
{
err = sem_unlink(name);
if(err == -1)
err = errno;
}
else
{
return 0;
}
}
else
{
sem_close(sem);
sem_unlink(name);
}
return err;
}
void linux_event::reset_calc_abs_time(unsigned us)
{
if(us == -1)
{
struct timeval now = {0}, after = {0};
struct timespec abst = {0};
linux_event::to_abs_time_us = 10;
if(chronograph::now(&now))
{
abst.tv_sec = now.tv_sec;
abst.tv_nsec = USEC_2_NS(now.tv_usec);
abst.tv_nsec += MSEC_2_NS(1) + linux_event::to_abs_time_us;
// overflow ...
abst.tv_sec += abst.tv_nsec / SEC_2_NS(1);
abst.tv_nsec %= SEC_2_NS(1);
if(chronograph::now(&after))
{
if(after.tv_usec > now.tv_usec)
linux_event::to_abs_time_us = after.tv_usec - now.tv_usec;
else
linux_event::to_abs_time_us = SEC_2_US(1) + after.tv_usec - now.tv_usec;
}
}
}
else
{
linux_event::to_abs_time_us = us;
}
}
bool linux_event::abs_time_after(struct timespec* abstm, unsigned ms)
{
struct timeval now = {0};
if(!chronograph::now(&now))
{
log_cls::log(LOG_LEVEL_FATAL, "gettimeofday faied: %s\n!", strerror(errno));
time(&now.tv_sec);
}
abstm->tv_sec = now.tv_sec;
abstm->tv_nsec = USEC_2_NS(now.tv_usec);
abstm->tv_nsec += MSEC_2_NS(ms) + USEC_2_NS(linux_event::to_abs_time_us);
// overflow ...
abstm->tv_sec += abstm->tv_nsec / SEC_2_NS(1);
abstm->tv_nsec %= SEC_2_NS(1);
return true;
}
bool linux_event::is_ready(void)
{
return sem_ != nullptr;
}
bool linux_event::is_named_first(void)
{
return !name_.empty();
}
bool linux_event::wait_try(void)
{
return sem_trywait(sem_) == 0;
}
bool linux_event::wait(unsigned timeout)
{
bool waited = true;
log_cls::log(LOG_LEVEL_ALL, "linux_event(%p): waiting(%u) ...\n", this, timeout);
waiting_ = true;
if (timeout == WAIT_INFINITE)
{
sem_wait(sem_);
}
else
{
struct timespec to = {0};
linux_event::abs_time_after(&to, timeout);
waited = sem_timedwait(sem_, &to) == 0;
}
waiting_ = false;
log_cls::log(LOG_LEVEL_ALL, "linux_event(%p): waited(%u) = %d\n", this, timeout, waited);
return waited;
}
void linux_event::trigger(void)
{
err_ = sem_post(sem_);
if(err_)
err_ = errno;
log_cls::log(LOG_LEVEL_ALL, "linux_event(%p): trigger = %s\n", this, err_ == 0 ? "OK" : strerror(errno));
}
void linux_event::reset(void)
{
err_ = sem_init(sem_, multi_proc_, 0);
if(err_)
err_ = errno;
log_cls::log(LOG_LEVEL_ALL, "linux_event(%p): reset = %s\n", this, err_ == 0 ? "OK" : strerror(errno));
}
bool linux_event::is_waiting(void)
{
return waiting_;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// shared_mem
uint64_t shared_mem::mem_total_ = 0;
uint64_t shared_mem::page_unit_ = 0;
uint64_t shared_mem::huge_page_unit_ = 0;
shared_mem::shared_mem() : name_(""), id_(0), size_(0), shm_buf_(shared_mem::invalid_map_addr()), first_(false), shm_id_(-1)
{
if (shared_mem::page_unit_ == 0)
shared_mem::init_page_info();
log_cls::log(LOG_LEVEL_ALL, "+shared_mem(%p) constructed, page size = %ld\n", this, page_unit_);
}
shared_mem::~shared_mem()
{
close();
log_cls::log(LOG_LEVEL_ALL, "-shared_mem(%p) destroyed.\n", this);
}
void shared_mem::init_page_info(void)
{
shared_mem::page_unit_ = sys_util::get_page_size();
// Hugepagesize: 2048 kB
if(sys_util::get_inf_file_data("/proc/meminfo", 80, "MemTotal: %ld", &shared_mem::mem_total_))
shared_mem::mem_total_ *= 1024;
if(sys_util::get_inf_file_data("/proc/meminfo", 80, "Hugepagesize: %ld", &shared_mem::huge_page_unit_))
shared_mem::huge_page_unit_ *= 1024;
log_cls::log(LOG_LEVEL_DEBUG, "TotalMemory: %s, system page size: %s, huge page size %s\n"
, sys_util::format_readable_bytes(shared_mem::mem_total_).c_str()
, sys_util::format_readable_bytes(shared_mem::page_unit_).c_str()
, sys_util::format_readable_bytes(shared_mem::huge_page_unit_).c_str());
}
char* shared_mem::invalid_map_addr(void)
{
return (char*)-1;
}
int32_t shared_mem::open(int32_t id, size_t* bytes, const char* name)
{
key_t key = (key_t)-1; // ftok(name, id);
int32_t ret = close();
size_t size = bytes ? *bytes : 0;
std::string pe("");
if (ret)
return ret;
if(!name || *name == 0)
{
pe = sys_util::get_module_path();
name = pe.c_str();
}
key = ftok(name, id);
if (key == (key_t)-1)
{
log_cls::log(LOG_LEVEL_FATAL, "shared_memory(%p): ftok('%s', %d) = %s\n", this, name, id, strerror(errno));
return errno;
}
size = ALIGN_INT(size, shared_mem::page_unit_);
if (!bytes || *bytes != size)
{
log_cls::log(LOG_LEVEL_DEBUG, "add %ld upto multiple of page size: %ld\n", bytes ? *bytes : 0, size);
if (bytes)
*bytes = size;
}
shm_id_ = shmget(key, size, IPC_EXCL | IPC_CREAT | 0600);
if (shm_id_ == -1)
{
ret = errno;
log_cls::log(LOG_LEVEL_WARNING, "%p: create shared memory('%s', %d) failed: %s\n", this, name, id, strerror(ret));
if (ret == EEXIST)
{
shm_id_ = shmget(key, size, 0600);
if (shm_id_ == -1)
{
ret = errno;
log_cls::log(LOG_LEVEL_WARNING, "%p: open shared memory('%s', %d) failed: %s\n", this, name, id, strerror(ret));
}
else
{
ret = 0;
first_ = false;
}
}
}
else // i created the shared memory ...
{
first_ = true;
}
if (ret == 0)
{
shm_buf_ = (char*)shmat(shm_id_, nullptr, 0);
if (shm_buf_ == shared_mem::invalid_map_addr())
{
ret = errno;
log_cls::log(LOG_LEVEL_WARNING, "%p: shmat failed: %s\n", this, strerror(ret));
close();
}
else
{
log_cls::log(LOG_LEVEL_DEBUG, "%p: %s shared memory('%s', %d) at %p(+%s) OK.\n", this, first_ ? "create" : "open", name, id, shm_buf_, sys_util::format_readable_bytes(size).c_str());
name_ = name;
id_ = id;
size_ = size;
}
}
return ret;
}
int32_t shared_mem::close(void)
{
int32_t ret = 0;
if (shm_buf_ != shared_mem::invalid_map_addr())
{
ret = log_cls::log_when_err(shmdt(shm_buf_), "shmdt");
if (ret)
return ret;
shm_buf_ = shared_mem::invalid_map_addr();
}
if (first_ && shm_id_ >= 0)
{
ret = log_cls::log_when_err(shmctl(shm_id_, IPC_RMID, nullptr), "shmctrl(IPC_RMID)");
if (ret)
{
// re-map buffer ...
shm_buf_ = (char*)shmat(shm_id_, nullptr, 0);
return ret;
}
shm_id_ = -1;
}
name_ = "";
id_ = 0;
first_ = false;
size_ = 0;
return ret;
}
char* shared_mem::get_mem(size_t* size)
{
if (size)
*size = size_;
return shm_buf_ == shared_mem::invalid_map_addr() ? nullptr : shm_buf_;
}
bool shared_mem::is_first(void)
{
return first_;
}
void shared_mem::clear_kernel_object(void)
{
log_cls::log_when_err(shmctl(shm_id_, IPC_RMID, nullptr), "shmctrl(IPC_RMID)");
}

134
common/ipc_util.h Normal file
View File

@ -0,0 +1,134 @@
#pragma once
// IPC utility
//
// created on 2022-11-29
//
#include "referer.h"
#include <semaphore.h>
#include <deque>
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// object event
//
class linux_event : public refer
{
int32_t err_;
sem_t local_sem_;
sem_t *sem_;
bool first_;
bool multi_proc_;
volatile bool waiting_;
std::string desc_;
std::string name_;
static unsigned long to_abs_time_us; // used in sem_timedwait to calculate absolute time from elapse
public:
linux_event(const char* desc); // to initialize a in-process object
linux_event(const char* name, const char* desc); // to initialize a multi-process object
linux_event(sem_t* mem_sem, bool first/*invoke sem_init when true*/, const char* desc); // to initialize an event at given memory (shared memory)
static int32_t clear_named_event(const char* name);
static void reset_calc_abs_time(unsigned us = -1); // reset 'to_abs_time_us' value, -1 is for re-calc
static bool abs_time_after(struct timespec* abstm, unsigned ms);
protected:
~linux_event();
public:
bool is_ready(void);
bool is_named_first(void); // whether I created the named event
bool wait_try(void);
bool wait(unsigned timeout = WAIT_INFINITE/*ms*/); // WAIT_INFINITE is waiting unfinite, true when watied and false for wait timeout
void trigger(void);
void reset(void); // re-initialize. DANGEROUS !!! all wait operation before will not receive any event after this !!!
bool is_waiting(void);
};
template<class T>
class safe_fifo
{
MUTEX lock_;
std::deque<T> que_;
public:
safe_fifo()
{}
~safe_fifo()
{}
public:
void save(const T& t)
{
LOCKER lock(lock_);
que_.push_back(t);
}
bool take(T& t)
{
LOCKER lock(lock_);
if (que_.size())
{
t = que_.front();
que_.pop_front();
return true;
}
else
{
return false;
}
}
size_t size(void)
{
LOCKER lock(lock_);
return que_.size();
}
void clear(void)
{
LOCKER lock(lock_);
que_.clear();
}
};
class shared_mem : public refer
{
std::string name_;
int32_t id_;
int32_t shm_id_;
char *shm_buf_;
size_t size_;
bool first_;
public:
shared_mem();
static void init_page_info(void);
static char* invalid_map_addr(void);
static uint64_t mem_total_;
static uint64_t page_unit_;
static uint64_t huge_page_unit_;
protected:
~shared_mem();
public:
int32_t open(int32_t id, size_t* bytes, const char* name = nullptr);
int32_t close(void);
char* get_mem(size_t* size = nullptr);
bool is_first(void);
void clear_kernel_object(void);
};

373
common/ipc_wrapper.cpp Normal file
View File

@ -0,0 +1,373 @@
#include "ipc_wrapper.h"
#include <string.h>
#include <vector>
#include "log_util.h"
#include "ipc_util.h"
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ipc_wrapper_shm
class ipc_wrapper_shm : public ipc_wrapper
{
enum data_ind
{
DATA_INTER_CMD = 0,
};
enum internal_cmd
{
INTER_CMD_NONE = 0,
INTER_CMD_EXIT,
};
typedef struct _shm_pack
{
size_t space; // buffer length
size_t bytes; // data length
char data[4];
}SHMPACK, *LPSHMPACK;
typedef struct _sync_shm // 1-lock(write_lock) --> 2-write content --> 3-notify read --> 4-wait(wait_sent) --> 5-return step 2 if data has not sent finished, or else to 6 --> 6-release(write_lock) --> 7-over
{
sem_t wait_arrive; // read event
sem_t wait_sent; // peer has read, i can re-write now
sem_t notify_read; // notify peer to read
sem_t notify_write; // notify peer to write
char data[8]; // internal data
}SYNCSHM, *LPSYNCSHM;
volatile bool run_;
int32_t id_;
shared_mem* shm_;
LPSYNCSHM sync_shm_;
linux_event* wait_arrive_;
linux_event* wait_sent_;
linux_event* notify_write_;
linux_event* notify_read_;
THREAD_PTR thread_;
LPSHMPACK buf_in_;
LPSHMPACK buf_out_;
bool buf_out_used_;
volatile bool cancel_write_;
void create(const char* file, int32_t id, size_t bytes, unsigned sent_percent)
{
int32_t err = 0;
shm_ = new shared_mem();
err = shm_->open(id, &bytes, file);
if (err)
{
log_cls::log(LOG_LEVEL_FATAL, "ipc_wrapper_shm(%p) open shared memory(%s, %d, %s) failed: %s\n"
, this, file, id, sys_util::format_readable_bytes(bytes).c_str(), strerror(err));
shm_->release();
shm_ = nullptr;
return;
}
SHMPACK pack = { 0 };
std::string desc("ipc_wrapper_shm-");
char buf[80] = { 0 }, *ptr = shm_->get_mem(&pack.space);
int32_t pack_head = ALIGN_INT(sizeof(pack), 16), head = ALIGN_INT(sizeof(SYNCSHM), 16);
sprintf(buf, "%d-", id);
desc += buf;
printf("Total size: %ld, Head size: %d, pack-head: %d\n", pack.space, head, pack_head);
sync_shm_ = (LPSYNCSHM)ptr;
ptr += head;
pack.space -= head + pack_head * 2;
head = ALIGN_INT(pack.space * sent_percent / 100, 16);
printf("%ld * %d%% = %d\n", pack.space, sent_percent, head);
if (shm_->is_first())
{
SYNCSHM ss;
memcpy(sync_shm_, &ss, sizeof(ss));
buf_in_ = (LPSHMPACK)ptr;
buf_in_->space = pack.space - head;
ptr += buf_in_->space + pack_head;
buf_out_ = (LPSHMPACK)ptr;
buf_out_->space = head;
//*
wait_arrive_ = new linux_event(&sync_shm_->wait_arrive, true, (desc + "wait_arrive").c_str());
wait_sent_ = new linux_event(&sync_shm_->wait_sent, true, (desc + "wait_sent").c_str());
notify_read_ = new linux_event(&sync_shm_->notify_read, true, (desc + "notify_read").c_str());
notify_write_ = new linux_event(&sync_shm_->notify_write, true, (desc + "notify_write").c_str());
/*/
wait_arrive_ = new linux_event((desc + "wait_arrive").c_str(), "");
wait_sent_ = new linux_event((desc + "wait_sent").c_str(), "");
notify_write_ = new linux_event((desc + "notify_write").c_str(), "");
notify_read_ = new linux_event((desc + "notify_read").c_str(), "");
///*////////////
memset(sync_shm_->data, 0, sizeof(sync_shm_->data));
}
else
{
//*
wait_sent_ = new linux_event(&sync_shm_->notify_write, false, (desc + "wait_sent").c_str());
wait_arrive_ = new linux_event(&sync_shm_->notify_read, false, (desc + "wait_arrive").c_str());
notify_write_ = new linux_event(&sync_shm_->wait_sent, false, (desc + "notify_write").c_str());
notify_read_ = new linux_event(&sync_shm_->wait_arrive, false, (desc + "notify_read").c_str());
/*/
wait_arrive_ = new linux_event((desc + "notify_read").c_str(), "");
wait_sent_ = new linux_event((desc + "notify_write").c_str(), "");
notify_read_ = new linux_event((desc + "wait_arrive").c_str(), "");
notify_write_ = new linux_event((desc + "wait_sent").c_str(), "");
///*////////////
buf_out_ = (LPSHMPACK)ptr;
buf_in_ = (LPSHMPACK)(ptr + pack_head + buf_out_->space);
}
printf("buf in = %p + %ld, buf out = %p + %ld\n", buf_in_, buf_in_->space, buf_out_, buf_out_->space);
thread_.reset(new std::thread(&ipc_wrapper_shm::read_thread, this));
// handler_->on_event(event_handler::EVENT_WRITE, buf_out_->data, buf_size_);
}
void read_thread(void)
{
event_handler* handler = handler_;
handler->add_ref();
while (run_)
{
wait_arrive_->wait();
if (!run_)
break;
if (!shm_->is_first() && sync_shm_->data[DATA_INTER_CMD] == INTER_CMD_EXIT)
break;
handler->on_event(event_handler::EVENT_READ, buf_in_->data, buf_in_->bytes);
notify_write_->trigger();
}
handler->release();
}
public:
ipc_wrapper_shm(const char* file, int32_t id
, size_t bytes
, event_handler* handler
, unsigned sent_percent = 50)
: ipc_wrapper(handler), run_(true), shm_(nullptr), sync_shm_(nullptr)
, wait_arrive_(nullptr), wait_sent_(nullptr), notify_write_(nullptr), notify_read_(nullptr)
, buf_in_(nullptr), buf_out_(nullptr), id_(id), buf_out_used_(false), cancel_write_(false)
{
create(file, id, bytes, sent_percent);
}
protected:
~ipc_wrapper_shm()
{}
public:
virtual int32_t write(const char* pack, size_t * bytes, bool kbuf, unsigned timeout = WAIT_INFINITE) override
{
if (!shm_)
return ENOTCONN;
// invoke in read-thread is not allowed
if(std::this_thread::get_id() == thread_->get_id())
return EDEADLOCK;
size_t rest = *bytes;
int32_t ret = 0;
cancel_write_ = false;
if (kbuf)
{
buf_out_->bytes = rest;
notify_read_->trigger();
if (wait_sent_->wait(timeout))
rest = 0;
else
ret = ETIME;
}
else
{
chronograph timer;
while (rest)
{
buf_out_->bytes = rest > buf_out_->space ? buf_out_->space : rest;
memcpy(buf_out_->data, pack, buf_out_->bytes);
notify_read_->trigger();
pack += buf_out_->space;
if (!wait_sent_->wait(timeout))
break;
if (cancel_write_)
break;
if (rest <= buf_out_->space)
{
rest = 0;
break;
}
rest -= buf_out_->space;
if (timeout)
{
uint64_t t = timer.elapse_ms();
if (t >= timeout)
break;
timeout -= t;
timer.reset();
}
}
if (cancel_write_)
ret = ECANCELED;
else if (rest)
ret = ETIME;
}
*bytes -= rest;
return ret;
}
virtual bool cancel_write(void) override
{
cancel_write_ = true;
if (wait_sent_->is_waiting())
{
wait_sent_->trigger();
wait_sent_->reset(); // sure ?
}
return true;
}
virtual bool is_ok(void) override
{
return shm_ != nullptr;
}
virtual bool is_first(void) override
{
return shm_ && shm_->is_first();
}
virtual void* get_kbuf(size_t* bytes) override
{
if (!bytes)
return nullptr;
if (buf_out_used_)
{
*bytes = 0;
return nullptr;
}
buf_out_used_ = true;
*bytes = buf_out_->space;
return buf_out_->data;
}
virtual void release_kbuf(void* buf) override
{
buf_out_used_ = false;
}
virtual void clear_kernel_objects(void) override
{
std::string desc("ipc_wrapper_shm-");
char id[20] = { 0 };
sprintf(id, "%d-", id_);
desc += id;
linux_event::clear_named_event((desc + "wait_arrive").c_str());
linux_event::clear_named_event((desc + "wait_sent").c_str());
linux_event::clear_named_event((desc + "notify_write").c_str());
linux_event::clear_named_event((desc + "notify_read").c_str());
shm_->clear_kernel_object();
}
virtual int32_t stop(void) override
{
run_ = false;
if (shm_)
{
wait_arrive_->trigger();
if (thread_.get() && thread_->joinable())
thread_->join();
thread_.reset();
if (is_first())
{
sync_shm_->data[DATA_INTER_CMD] = INTER_CMD_EXIT;
notify_read_->trigger();
wait_sent_->wait(100);
}
notify_read_->release();
notify_read_ = nullptr;
notify_write_->release();
notify_write_ = nullptr;
wait_arrive_->release();
wait_arrive_ = nullptr;
wait_sent_->release();
wait_sent_ = nullptr;
sync_shm_ = nullptr;
buf_in_ = buf_out_ = nullptr;
shm_->close();
shm_->release();
}
shm_ = nullptr;
return ipc_wrapper::stop();
}
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ipc_wrapper
ipc_wrapper::ipc_wrapper(event_handler* handler) : handler_(handler)
{
if (handler_)
handler_->add_ref();
}
ipc_wrapper::~ipc_wrapper()
{}
void* ipc_wrapper::get_kbuf(size_t* bytes)
{
if (bytes)
*bytes = 0;
return nullptr;
}
void ipc_wrapper::release_kbuf(void* buf)
{}
void ipc_wrapper::clear_kernel_objects(void)
{}
bool ipc_wrapper::cancel_write(void)
{
return false;
}
int32_t ipc_wrapper::stop(void)
{
if (handler_)
handler_->release();
handler_ = nullptr;
return 0;
}
ipc_wrapper* ipc_wrapper::create_ipc(event_handler* handler, ipc_type type, const char* param)
{
if (type == IPC_SHARED_MEM)
{
// path-file:id:size[:write-ratio-> percent of size of sent buffer, default is 50, only valid in owner]
std::string file(param);
size_t pos = file.rfind(':');
std::vector<size_t> params;
while(pos != std::string::npos && params.size() < 3)
{
params.push_back(std::stold(file.substr(pos + 1)));
file.erase(pos);
pos = file.rfind(':');
}
if(params.size() >= 2)
{
params.insert(params.begin(), 50);
pos = params.size();
return dynamic_cast<ipc_wrapper*>(new ipc_wrapper_shm(file.c_str(), params[pos - 1], params[pos - 2], handler, params[pos - 3]));
}
}
return nullptr;
}

73
common/ipc_wrapper.h Normal file
View File

@ -0,0 +1,73 @@
#pragma once
// IPC utility
//
// created on 2022-12-02
//
#include "event_monitor.h"
typedef struct _pack_base
{
uint64_t total_bytes; // total bytes of payload
uint64_t offset; // offset in total of this part
uint64_t bytes; // bytes in data, not include this head
int32_t cmd; // command
char data[0]; // payload
}PACK_BASE, *LPPACK_BASE;
class ipc_wrapper : public parent_holder
{
protected:
event_handler* handler_;
public:
ipc_wrapper(event_handler* handler);
enum ipc_type
{
IPC_FILE = 0, // param: path file
IPC_PIPE, // param: pipe name
IPC_NET, // param: dot-ip:port
IPC_SHARED_MEM, // param: path-file:id:size[:write-ratio-> percent of size of sent buffer, default is 50, only valid in owner]
IPC_USB, // param: vid:pid
IPC_COM, // param: COM1
};
static ipc_wrapper* create_ipc(event_handler* handler, ipc_type type, const char* param);
protected:
virtual ~ipc_wrapper();
public:
// Function: write content to peer
//
// Parameters: pack - content pack
//
// bytes - [in] bytes of data in 'pack', [out] - bytes of data has sent
//
// kbuf - whether memory 'pack' is from IPC, i.e. return from method get_kbuf()
//
// timeout - time out, in milliseconds
//
// Return: 0 - success
// ENOTCONN - the commuction is not connected, equal to !is_ok()
// EDEADLOCK - calling from current thread is disallowed
// ETIME - time out
// ECANCELED - user cancelled the operation
virtual int32_t write(const char* pack, size_t *bytes, bool kbuf, unsigned timeout = WAIT_INFINITE) = 0; // DON'T call in event_handler::on_event routine, it will be DEAD-LOCK !!!
virtual bool is_ok(void) = 0; // whether the commuction is ready
virtual bool is_first(void) = 0; // whether the communication established by me
// Function: obtain IPC internal buffer to reduce ONE memory copy for sent data
//
// Parameter: bytes - [in] desired size; [out] - real size
//
// Return: memory pointer if success, or nullptr. call release_kbuf(ptr) if no longer used
virtual void* get_kbuf(size_t* bytes);
virtual void release_kbuf(void* buf); // release the internal buffer returned by get_kbuf
virtual void clear_kernel_objects(void); // clear all kernel objects the IPC used, used to clear exception
virtual bool cancel_write(void); // cancel current write operation
virtual int32_t stop(void) override; // close the connection
};

112
common/log_util.cpp Normal file
View File

@ -0,0 +1,112 @@
#include "log_util.h"
#include <sys/stat.h>
#include <fcntl.h>
#include <memory>
#include <unistd.h>
#include <string.h>
#include "referer.h"
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// class<73><73>
log_cls* log_cls::inst_ = nullptr;
log_cls::log_cls(const char* path_file, log_level level, int32_t max_size) : file_(path_file), max_size_(max_size), level_(level), dst_(nullptr)
{
create_log_file();
}
log_cls::~log_cls()
{
if (dst_)
fclose(dst_);
}
void log_cls::create_log_file(void)
{
dst_ = fopen(file_.c_str(), "a+b");
if (dst_)
{
fseek(dst_, 0, SEEK_END);
if (ftell(dst_) == 0)
{
unsigned char bom[] = { 0x0ef, 0x0bb, 0x0bf };
fwrite(bom, sizeof(bom), 1, dst_);
}
else
fwrite("\n\n\n", 1, 3, dst_);
}
}
void log_cls::log_internal(const char* txt)
{
std::string now("[" + chronograph::now() + "] ");
now += txt;
{
LOCKER locker(lock_);
if (dst_)
{
fwrite(now.c_str(), 1, now.length(), dst_);
fflush(dst_);
if (ftell(dst_) >= max_size_)
{
fclose(dst_);
remove(file_.c_str());
create_log_file();
}
}
}
}
void log_cls::initialize(const char* path_file, log_level level, int32_t max_size)
{
if (log_cls::inst_)
delete log_cls::inst_;
std::string path("");
if (!path_file || *path_file == 0)
{
size_t pos = 0;
char strpid[20] = {0};
path = sys_util::get_module_path();
pos = path.rfind('/');
if (pos++ != std::string::npos)
path.erase(0, pos);
path.insert(0, "/tmp/scanner/");
mkdir("/tmp/scanner", S_IREAD | S_IWRITE | S_IEXEC);
sprintf(strpid, "_%p.log", getpid());
path += strpid;
path_file = path.c_str();
}
log_cls::inst_ = new log_cls(path_file, level, max_size);
}
int32_t log_cls::log_when_err(int32_t err, const char* oper_desc, log_level level)
{
if(err == -1)
{
err = errno;
log_cls::log(level, "%s = %s\n", oper_desc, strerror(err));
}
return err;
}
log_level log_cls::get_log_level(void)
{
if (log_cls::inst_)
return log_cls::inst_->level_;
else
return LOG_LEVEL_ALL;
}
std::string log_cls::get_log_file(void)
{
if (log_cls::inst_)
return log_cls::inst_->file_;
else
return "";
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//

67
common/log_util.h Normal file
View File

@ -0,0 +1,67 @@
#pragma once
// log utility
//
// created on 2022-11-30
//
#include "referer.h"
#include <string>
#include <memory>
#define SIZE_KB(n) (n) * 1024
#define SIZE_MB(n) SIZE_KB((n) * 1024)
#define SIZE_GB(n) SIZE_MB((n) * 1024)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// object event
//
enum log_level
{
LOG_LEVEL_ALL = 0,
LOG_LEVEL_DEBUG,
LOG_LEVEL_WARNING,
LOG_LEVEL_FATAL,
};
class log_cls
{
std::string file_;
int32_t max_size_;
log_level level_;
MUTEX lock_;
FILE *dst_;
static log_cls* inst_;
void create_log_file(void);
protected:
log_cls(const char* path_file, log_level level, int32_t max_size);
~log_cls();
void log_internal(const char* txt);
public:
static void initialize(const char* path_file, log_level level = LOG_LEVEL_ALL, int32_t max_size = SIZE_MB(10));
template<typename ... Args>
static void log(log_level level, const char* fmt, Args ... args)
{
if (level >= log_cls::get_log_level() && log_cls::inst_)
{
size_t size = snprintf(nullptr, 0, fmt, args ...) + 1;
std::unique_ptr<char[]> buf(new char[size]);
snprintf(buf.get(), size, fmt, args ...);
log_cls::inst_->log_internal(buf.get());
}
}
static int32_t log_when_err(int32_t err, const char* oper_desc, log_level level = LOG_LEVEL_WARNING); // log as: oper_desc = strerror(errno)\n. return real error number errno
static log_level get_log_level(void);
static std::string get_log_file(void);
};

355
common/referer.cpp Normal file
View File

@ -0,0 +1,355 @@
#include "referer.h"
#include "log_util.h"
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// refer
refer::refer() : ref_(1)
{
on_born();
}
refer::~refer()
{
on_dead();
}
void refer::on_born(void)
{}
void refer::on_dead(void)
{}
int32_t refer::add_ref(void)
{
LOCKER lock(mutex_);
return ++ref_;
}
int32_t refer::release(void)
{
int32_t ref = 0;
{
LOCKER lock(mutex_);
ref = --ref_;
}
if (ref == 0)
delete this;
return ref;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// chronograph
#include <sys/time.h>
chronograph::chronograph()
{
reset();
}
chronograph::~chronograph()
{}
bool chronograph::now(struct timeval* tv)
{
struct timezone tz = { 0 };
return gettimeofday(tv, &tz) == 0;
}
bool chronograph::now(uint64_t* seconds, uint64_t* u_seconds)
{
struct timeval tv = { 0 };
struct timezone tz = { 0 };
if (gettimeofday(&tv, &tz) == 0)
{
if (seconds)
*seconds = tv.tv_sec;
if (u_seconds)
*u_seconds = tv.tv_usec;
return true;
}
else
{
return false;
}
}
std::string chronograph::now(bool with_ms/*whether with milliseconds*/) // return '2022-11-30 10:38:42.123', no '.123' if with_ms was false
{
struct timeval tv = { 0 };
if (!chronograph::now(&tv))
return "";
char buf[40] = { 0 };
time_t t = tv.tv_sec;
struct tm* l = localtime(&t);
if (with_ms)
sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d.%06d", l->tm_year + 1900, l->tm_mon + 1, l->tm_mday
, l->tm_hour, l->tm_min, l->tm_sec, tv.tv_usec);
else
sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d", l->tm_year + 1900, l->tm_mon + 1, l->tm_mday
, l->tm_hour, l->tm_min, l->tm_sec);
return buf;
}
uint64_t chronograph::elapse_s(void)
{
struct timeval tv = { 0 };
chronograph::now(&tv);
return tv.tv_sec - bgn_.tv_sec;
}
uint64_t chronograph::elapse_ms(void)
{
struct timeval tv = { 0 };
uint64_t dif = 0;
chronograph::now(&tv);
dif = SEC_2_MS(tv.tv_sec - bgn_.tv_sec);
dif += tv.tv_usec / MSEC_2_US(1);
dif -= bgn_.tv_usec / MSEC_2_US(1);
return dif;
}
uint64_t chronograph::elapse_us(void)
{
struct timeval tv = { 0 };
uint64_t dif = 0;
chronograph::now(&tv);
dif = SEC_2_US(tv.tv_sec - bgn_.tv_sec);
dif += tv.tv_usec;
dif -= bgn_.tv_usec;
return dif;
}
void chronograph::reset()
{
chronograph::now(&bgn_);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// sys utility
#include <dirent.h>
#include <unistd.h>
#include <sys/stat.h>
#include <string.h>
namespace sys_util
{
static bool find_module(const char* path, void* param)
{
std::string* para = (std::string*)param;
if (para[0].empty())
{
para[1] = path;
return false;
}
else
{
const char* name = strrchr(path, '/');
if (name++ == nullptr)
name = path;
if (strstr(name, para[0].c_str()))
{
para[1] = path;
return false;
}
return true;
}
}
int32_t enum_modules(bool(*on_found)(const char* path_module_name, void* param),// return false to stop enumeratin
void* param, // user defined data, passed into callback on_found
unsigned pid // process id, -1 is self
) // return errno
{
char path[128] = { 0 };
if (pid == -1)
pid = getpid();
sprintf(path, "/proc/%u/map_files/", pid);
return enum_files(path, on_found, param, false);
}
int32_t enum_files(const char* dir, // dir path
bool(*on_found)(const char* path_name, void* param), // return false to stop enumeratin
void* param // user defined data, passed into callback on_found
, bool recursive
) // return errno
{
int32_t ret = 0;
DIR* pdir = nullptr;
struct dirent* ent = nullptr;
pdir = opendir(dir);
if (!pdir)
return errno;
while ((ent = readdir(pdir)))
{
if (ent->d_type & DT_DIR)
{
if (recursive)
{
if (strcmp(ent->d_name, ".") && strcmp(ent->d_name, ".."))
{
std::string sub(dir);
sub += "/";
sub += ent->d_name;
ret = enum_files(sub.c_str(), on_found, param, recursive);
if (ret == 0x5e17)
break;
}
}
}
else
{
std::string file(dir);
file += "/";
file += ent->d_name;
if (!on_found(read_link(file.c_str()).c_str(), param))
{
ret = 0x5e17;
break;
}
}
}
closedir(pdir);
return ret == 0x5e17 ? 0 : ret;
}
std::string get_module_path(const char* module_name, unsigned pid) // get module full path, nullptr is for main-exe
{
std::string param[] = { module_name ? module_name : "", "" };
enum_modules(find_module, param, pid);
return param[1];
}
std::string read_link(const char* lnk)
{
char path[512] = { 0 };
readlink(lnk, path, sizeof(path) - 1);
return path;
}
size_t get_page_size(void)
{
size_t size = sysconf(_SC_PAGESIZE);
if (size < 1024 || (size & 0x0fe0000ff)) // nKB && < 16MB
size = getpagesize();
return size;
}
bool create_folder(const char* dir)
{
return mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == 0 || errno == EEXIST;
}
int32_t get_memory_info(uint64_t* total, uint64_t* available)
{
if(!total && !available)
return 0;
char line[128] = {0};
FILE *src = fopen("/proc/meminfo", "rb");
int32_t count = total && available ? 2 : 1;
unsigned long val = 0;
if(!src)
return log_cls::log_when_err(-1, "fopen('/proc/meminfo', 'rb')", LOG_LEVEL_FATAL);
while(fgets(line, sizeof(line) - 1, src))
{
if(sscanf(line, "MemTotal: %ld", &val))
{
if(total)
{
*total = val * 1024;
if(--count == 0)
break;
}
}
else if(sscanf(line, "MemFree: %ld", &val))
{
if(available)
{
*available = val * 1024;
if(--count == 0)
break;
}
}
}
fclose(src);
return 0;
}
std::string format_readable_bytes(uint64_t bytes)
{
std::string str("\0", 80);
if(bytes >= SIZE_GB(1))
{
double v = bytes * 1.0f / (SIZE_GB(1));
size_t pos = 0;
sprintf(&str[0], "%.2fGB", v);
pos = str.find(".");
while(pos > 3)
{
pos -= 3;
str.insert(pos, ",");
}
}
else if(bytes >= SIZE_MB(1))
{
double v = bytes * 1.0f / (SIZE_MB(1));
sprintf(&str[0], "%.2fMB", v);
}
else if(bytes >= SIZE_KB(1))
{
double v = bytes * 1.0f / (SIZE_KB(1));
sprintf(&str[0], "%.2fKB", v);
}
else
{
sprintf(&str[0], "%uB", (unsigned)bytes);
}
return str;
}
}

120
common/referer.h Normal file
View File

@ -0,0 +1,120 @@
#pragma once
// Objects life management
//
// created on 2022-11-29
//
#include <semaphore.h>
#include <mutex>
#define ALIGN_INT(val, n) ((((val) + (n) - 1) / (n)) * (n))
#define WAIT_INFINITE 0
#define SEC_2_MS(s) ((s) * 1000)
#define MSEC_2_US(ms) ((ms) * 1000)
#define USEC_2_NS(us) ((us) * 1000)
#define SEC_2_US(s) MSEC_2_US(SEC_2_MS(s))
#define SEC_2_NS(s) USEC_2_NS(MSEC_2_US(SEC_2_MS(s)))
#define MSEC_2_NS(ms) USEC_2_NS(MSEC_2_US(ms))
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
typedef std::mutex MUTEX;
typedef std::lock_guard<MUTEX> LOCKER;
// object life referer
//
// derived from 'refer' if your class used in multi-threads
//
class refer
{
volatile int32_t ref_;
MUTEX mutex_;
protected:
refer();
virtual ~refer();
virtual void on_born(void);
virtual void on_dead(void);
public:
virtual int32_t add_ref(void);
virtual int32_t release(void);
};
#include <sys/time.h>
class chronograph
{
struct timeval bgn_;
public:
chronograph();
~chronograph();
static bool now(struct timeval* tv);
static bool now(uint64_t* seconds, uint64_t* u_seconds);
static std::string now(bool with_ms = true/*whether with milliseconds*/); // return '2022-11-30 10:38:42.123', no '.123' if with_ms was false
public:
uint64_t elapse_s(void);
uint64_t elapse_ms(void);
uint64_t elapse_us(void);
void reset(void);
};
#include <string>
namespace sys_util
{
int32_t enum_modules(bool(*on_found)(const char* path_module_name, void* param),// return false to stop enumeratin
void* param, // user defined data, passed into callback on_found
unsigned pid = -1 // process id, -1 is self
); // return errno
int32_t enum_files(const char* dir, // dir path
bool(*on_found)(const char* path_name, void* param), // return false to stop enumeratin
void* param // user defined data, passed into callback on_found
, bool recursive = true // walk recursive
); // return errno
std::string get_module_path(const char* module_name = nullptr
, unsigned pid = -1); // get module full path, nullptr is for main-exe
std::string read_link(const char* lnk);
size_t get_page_size(void);
bool create_folder(const char* dir);
// Function: pick single-line info file data, return count of set-value variable
//
// Parameters: file - full path of local file
//
// line_max - max bytes of a line in file 'file'
//
// fmt - line fromat string, e.g. "model name : %60[\x20-\x7e]", "MemoryTotal: %ld", "address sizes : %d bits physical, %d bits virtual", ...
//
// args - variable list
//
// Return: count of the variable which got the value
template<typename ... Args>
int32_t get_inf_file_data(const char* file, size_t line_max, const char* fmt, Args ... args)
{
std::string buf("\0", line_max + 8);
FILE *src = fopen(file, "rb");
int32_t count = 0;
if(!src)
return 0;
while(fgets(&buf[0], line_max, src))
{
count = sscanf(&buf[0], fmt, args ...);
if(count > 0)
break;
}
fclose(src);
return count;
}
int32_t get_memory_info(uint64_t* total, uint64_t* available);
std::string format_readable_bytes(uint64_t bytes); // convert to readable text: 512B, 1.21KB, 1.10MB, 3.45GB, 1,234.56GB ...
}

22
scanner/CMakeLists.txt Normal file
View File

@ -0,0 +1,22 @@
project(scanner)
add_compile_options(-std=c++11)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
aux_source_directory(${PROJECT_SOURCE_DIR} DIR_SRCS)
file(GLOB DIR_HEADS "${PROJECT_SOURCE_DIR}/*.h")
set(DIR_SRCS ${DIR_SRCS} ${DIR_HEADS})
# add_library(${PROJECT_NAME} SHARED ${DIR_SRCS})
add_executable(scanner ${PROJECT_SOURCE_DIR}/main/scanner.cpp ${PROJECT_SOURCE_DIR}/../common/event_monitor.cpp ${PROJECT_SOURCE_DIR}/../common/ipc_util.cpp ${PROJECT_SOURCE_DIR}/../common/log_util.cpp ${PROJECT_SOURCE_DIR}/../common/referer.cpp ${PROJECT_SOURCE_DIR}/../common/ipc_wrapper.cpp)
link_libraries(libdl libpthread librt)
target_link_libraries(${PROJECT_NAME} PRIVATE
dl
pthread
rt
)
target_include_directories(${PROJECT_NAME} PRIVATE ${PROJECT_SOURCE_DIR}
${PROJECT_SOURCE_DIR}/main
${PROJECT_SOURCE_DIR}/../common
)
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/out)

201
scanner/main/scanner.cpp Normal file
View File

@ -0,0 +1,201 @@
// scanner.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <iostream>
#include <unistd.h>
#include <string.h>
#include <vector>
#include "../../common/ipc_util.h"
#include "../../common/log_util.h"
#include "../../common/event_monitor.h"
#include "../../common/ipc_wrapper.h"
static void test(const char* oper);
int32_t main(int32_t argc, char *argv[])
{
char *oper = argc > 1 ? argv[1] : nullptr;
log_cls::initialize(nullptr);
log_cls::log(LOG_LEVEL_DEBUG, "----starting...----\n");
test(oper);
return 0;
}
class rogger : public event_handler
{
linux_event* quit_;
ipc_wrapper* ipc_;
safe_fifo<std::string> reply_;
volatile bool run_;
int32_t wait(void)
{
while(quit_->wait())
{
if(!run_)
break;
std::string answer("");
if (reply_.take(answer))
{
size_t len = answer.length();
if (len)
ipc_->write(answer.c_str(), &len, false, 100);
}
}
}
int32_t run_client(ipc_wrapper* s)
{
size_t len = SIZE_MB(1);
char* buf = (char*)s->get_kbuf(&len);
printf("--->request 1MB buffer of IPC internal, returned %u\n", len);
do
{
size_t l = 0, count = 0;
chronograph watch;
printf("Input message: ");
while ((buf[l++] = getchar()) != '\n');
buf[--l] = 0;
watch.reset();
s->write(buf, &l, true);
if(strcmp(buf, "exit") == 0)
break;
while(1)
{
if(quit_->wait(50))
{
std::string rcvd("");
if (reply_.take(rcvd))
{
if (rcvd.length())
{
count++;
printf("reply(%d): %s\n", rcvd.length(), rcvd.c_str());
l = rcvd.length();
strcpy(buf, rcvd.c_str());
if(s->write(buf, &l, true, 30) == ETIME)
{
printf("send content timouted(%ld in %ld us)!\n", count, watch.elapse_us());
break;
}
}
}
if(rcvd.empty())
printf("reply(0)\n");
}
else
{
printf("wait reply timeout(%ld in %ld us).\n", count, watch.elapse_us());
break;
}
}
} while (1);
s->release_kbuf(buf);
}
public:
rogger() : quit_(new linux_event("quit")), run_(true)
{
std::string param(sys_util::get_module_path() + ":0:1024:10");
ipc_ = ipc_wrapper::create_ipc(this, ipc_wrapper::IPC_SHARED_MEM, param.c_str());
}
protected:
~rogger()
{
quit_->release();
}
public:
int32_t on_event(int32_t ev, void* data, size_t data_len) override
{
if (ev == EVENT_READ)
{
char* d = (char*)data;
std::string str(d, data_len);
d[data_len] = 0;
if(ipc_->is_first())
{
printf("R(%d): %s\n", data_len, d);
for (size_t i = 0; i < str.length() / 2; ++i)
{
char c = str[i];
str[i] = str[str.length() - i - 1];
str[str.length() - i - 1] = c;
}
}
if (strcmp(d, "exit") == 0)
{
printf("Bye-bye :)\n");
run_ = false;
// quit_->trigger();
}
else
reply_.save(str);
quit_->trigger();
}
return 0;
}
int32_t get_fd(void)
{
return -1;
}
void run(void)
{
if (ipc_->is_first())
{
printf("%p run as server ...\n", getpid());
wait();
}
else
{
printf("%p run as client ...\n", getpid());
run_client(ipc_);
}
ipc_->stop();
ipc_->release();
}
void clear_kobjects(void)
{
ipc_->clear_kernel_objects();
}
void test_timed_wait(unsigned ms)
{
for(int i = 0; i < 1; ++i)
{
printf("%s: wait(%u) ...\n", chronograph::now().c_str(), ms);
printf("%s: wait result %d\n\n", chronograph::now().c_str(), quit_->wait(ms));
}
}
};
static void test(const char* oper)
{
rogger* r = new rogger();
r->test_timed_wait(123);
if(oper && strcmp(oper, "clear") == 0)
r->clear_kobjects();
else
r->run();
r->release();
}

166
sln/scanner.vcxproj Normal file
View File

@ -0,0 +1,166 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\common\event_monitor.cpp" />
<ClCompile Include="..\common\ipc_util.cpp" />
<ClCompile Include="..\common\ipc_wrapper.cpp" />
<ClCompile Include="..\common\log_util.cpp" />
<ClCompile Include="..\common\referer.cpp" />
<ClCompile Include="..\scanner\main\scanner.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\common\event_monitor.h" />
<ClInclude Include="..\common\ipc_util.h" />
<ClInclude Include="..\common\ipc_wrapper.h" />
<ClInclude Include="..\common\log_util.h" />
<ClInclude Include="..\common\referer.h" />
</ItemGroup>
<ItemGroup>
<Text Include="..\scanner\CMakeLists.txt" />
</ItemGroup>
<PropertyGroup Label="Globals">
<VCProjectVersion>16.0</VCProjectVersion>
<Keyword>Win32Proj</Keyword>
<ProjectGuid>{0f166a6e-0f74-4c38-99cd-c74b09e283db}</ProjectGuid>
<RootNamespace>scanner</RootNamespace>
<WindowsTargetPlatformVersion>10.0</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="Shared">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>$(CurrentVsInstallRoot)\..\..\linux\include\;$(CurrentVsInstallRoot)\..\..\linux\include\x86_64-linux-gnu\;$(IncludePath)</IncludePath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>$(CurrentVsInstallRoot)\..\..\linux\include\;$(CurrentVsInstallRoot)\..\..\linux\include\x86_64-linux-gnu\;$(IncludePath)</IncludePath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>$(CurrentVsInstallRoot)\..\..\linux\include\;$(CurrentVsInstallRoot)\..\..\linux\include\x86_64-linux-gnu\;$(IncludePath)</IncludePath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>$(CurrentVsInstallRoot)\..\..\linux\include\;$(CurrentVsInstallRoot)\..\..\linux\include\x86_64-linux-gnu\;$(IncludePath)</IncludePath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>

View File

@ -0,0 +1,63 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<Filter Include="源文件">
<UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
<Extensions>cpp;c;cc;cxx;c++;cppm;ixx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
</Filter>
<Filter Include="头文件">
<UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
<Extensions>h;hh;hpp;hxx;h++;hm;inl;inc;ipp;xsd</Extensions>
</Filter>
<Filter Include="资源文件">
<UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier>
<Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions>
</Filter>
<Filter Include="main">
<UniqueIdentifier>{31dd8340-84a7-434a-9f85-b835fdedf634}</UniqueIdentifier>
</Filter>
<Filter Include="common">
<UniqueIdentifier>{e0ad062f-4e6a-4d9a-b010-83ff181576a4}</UniqueIdentifier>
</Filter>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\scanner\main\scanner.cpp">
<Filter>main</Filter>
</ClCompile>
<ClCompile Include="..\common\referer.cpp">
<Filter>common</Filter>
</ClCompile>
<ClCompile Include="..\common\event_monitor.cpp">
<Filter>common</Filter>
</ClCompile>
<ClCompile Include="..\common\ipc_util.cpp">
<Filter>common</Filter>
</ClCompile>
<ClCompile Include="..\common\log_util.cpp">
<Filter>common</Filter>
</ClCompile>
<ClCompile Include="..\common\ipc_wrapper.cpp">
<Filter>common</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\common\referer.h">
<Filter>common</Filter>
</ClInclude>
<ClInclude Include="..\common\event_monitor.h">
<Filter>common</Filter>
</ClInclude>
<ClInclude Include="..\common\ipc_util.h">
<Filter>common</Filter>
</ClInclude>
<ClInclude Include="..\common\log_util.h">
<Filter>common</Filter>
</ClInclude>
<ClInclude Include="..\common\ipc_wrapper.h">
<Filter>common</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<Text Include="..\scanner\CMakeLists.txt" />
</ItemGroup>
</Project>