Linux C/C++ Notebook

Linux C/C++ · 2024-01-31 · 3045 人浏览
Linux C/C++ Notebook

Linux C/C++ Notebook

  • @author Bill
  • @date 2024-01-29
说明
  • _public.h头文件汇集了C++常用操作类和方法

文件写入的正确操作

  • CFile类的OpenForRename()方法打开文件,这个方法会先打开一个临时文件,然后把数据写入到临时文件中(比如最终生成的文件名是city.csv,则临时文件名为city.csv.tmp
  • CFile类的CloseAndRename()方法关闭文件,这个方法会先关闭临时文件,然后把临时文件重命名为正式文件
  • 如果程序异常退出,临时文件会被删除,正式文件不会被生成
  • 如果程序正常退出,临时文件会被重命名为正式文件
  • 如果程序正常退出,但是正式文件已经存在,那么正式文件会被删除,临时文件会被重命名为正式文件

提示:实际操作中不一定是CFile类,根据所用框架和上述操作步骤调整代码。CFile类在头文件_public.h中有定义。

Linux信号

在Linux中,信号是一种异步通知机制,用于在进程间或由内核向进程发送事件通知。信号可以用于处理各种事件,例如错误、中断、外部事件等。在C语言中,可以使用信号处理函数来捕获和处理信号。

以下是一些常见的信号及其含义:

  1. SIGABRT(0):在Unix/Linux系统中,发送信号0的主要作用是检查进程是否仍然存在。发送信号0时,系统不会真正给进程发送一个信号,而是检查进程是否仍然存在。如果进程存在,系统会执行相应的检查并返回成功;如果进程不存在,系统返回失败。

    这一机制通常用于检查进程是否处于活动状态,而无需实际干扰其正常运行。这在编写一些脚本或程序时很有用,以确保进程在继续其他操作之前仍然存在。

    例如,可以使用kill -0命令来检查指定进程是否存在,如下所示:

    kill -0 <pid>

    这里,<pid>是要检查的进程的进程ID。如果进程存在,该命令将返回成功,否则返回失败。

  2. SIGINT (2):由终端键盘中断字符(通常是Ctrl+C)产生,用于中断当前进程。
  3. SIGTERM (15):用于请求进程正常终止,通常由kill命令发送。
  4. SIGKILL (9):用于强制终止进程,进程无法捕获和忽略这个信号
  5. SIGSEGV (11):表示进程访问了无效的内存地址,通常是因为编程错误导致的段错误。
  6. SIGALRM (14):定时器超时信号,可以通过alarmsetitimer设置定时器后产生。
  7. SIGHUP (1):挂起信号,通常在终端关闭时发送给与该终端关联的进程。
  8. SIGCHLD (17):子进程状态变化,通常在子进程终止时由父进程接收。
  9. SIGUSR1 (10)SIGUSR2 (12):用户定义的信号,可以由进程自定义使用。

在C语言中,使用signal函数来注册信号处理函数。signal函数的原型如下:

#include <signal.h>

void (*signal(int signum, void (*handler)(int)))(int);

其中,signum 是要处理的信号编号,handler 是一个指向处理函数的指针。处理函数的原型通常是 void handler(int signum)

以下是一个简单的例子,演示如何使用 signal 函数捕获并处理 SIGINT 信号:

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>

// 信号处理函数
void handle_sigint(int signum) {
    printf("Caught SIGINT, exiting...\n");
    exit(EXIT_SUCCESS);
}

int main() {
    // 注册信号处理函数
    signal(SIGINT, handle_sigint);

    printf("Press Ctrl+C to send SIGINT...\n");

    // 模拟程序运行
    while (1) {
        // 无限循环等待信号
    }

    return 0;
}

在这个例子中,程序注册了对 SIGINT 信号的处理函数 handle_sigint。当用户按下 Ctrl+C 时,程序将收到 SIGINT 信号,然后执行相应的处理函数。请注意,信号处理函数应该是尽可能简短和可靠的,以避免在信号处理期间发生不可预测的行为。

Linux多进程

写入缓冲区的问题

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// 多进程支持
#include <unistd.h>

int main()
{

  FILE *fp = fopen("./test.txt", "w+");
  fprintf(fp, "FILE *fp = fopen(\"./test.txt\", \"w+\")\n");

  int pid = fork();

  if (pid == 0)
  {
    printf("child process, pid=%d\n", getpid());
    fprintf(fp, "child process, pid=%d\n", getpid());
  }

  if (pid > 0)
  {
    printf("parent process, pid=%d\n", getpid());
    fprintf(fp, "parent process, pid=%d\n", getpid());
  }

  fclose(fp);

  return 0;
}

运行上述代码,结果如下:

[bill@localhost c]$ g++ -o process process1.cpp
[bill@localhost c]$ ./process
parent process, pid=104181
child process, pid=104182

打开test.txt文件内容如下:

FILE *fp = fopen("./test.txt", "w+")
parent process, pid=104181
FILE *fp = fopen("./test.txt", "w+")
child process, pid=104182

Q&A

Q:主进程明明只写入了一次FILE *fp = fopen("./test.txt", "w+"),但是文本文件中却有两行?

A:FILE *fp = fopen("./test.txt", "w+")文本长度过短,文本实际上只放到了程序的缓冲区(内存)中,只要在11行代码下一行插入fflush(fp),在fork()之前刷新缓冲区即可解决。

Q:如何避免僵尸进程的出现?

A:

  • 方法一:忽略SIGCHLD信号

    #include <signal.h>
    int main()
    {
        signal(SIGCHLD, SIG_IGN);
        // 业务代码
        return 0;
    }
  • 方法二:父进程等待子进程结束,但是此方法会让父进程一直阻塞等待

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    // 多进程支持
    #include <unistd.h>
    #include <signal.h>
    // wait
    #include <sys/types.h>
    #include <sys/wait.h>
    
    int main()
    {
      int pid = fork();
    
      // 业务代码
    
      if (pid > 0)
      {
        printf("parent process, pid=%d\n", getpid());
        fprintf(fp, "parent process, pid=%d\n", getpid());
        int status;
        wait(&status); // 等待子进程结束
      }
    
      // 业务代码
      return 0;
    }
  • 方法三:结合方法一、二,在捕捉到SIGCHLD信号后调用wait()方法,这样可以解决父进程阻塞等待的问题

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    // 多进程支持
    #include <unistd.h>
    #include <signal.h>
    
    #include <sys/types.h>
    #include <sys/wait.h>
    
    void waitHandler(int sig)
    {
      int status;
      wait(&status); // 等待子进程结束
    }
    
    int main()
    {
      signal(SIGCHLD, waitHandler);
      int pid = fork();
      if (pid == 0)
      {
        printf("child process, pid=%d\n", getpid());
      }
      if (pid > 0)
      {
        printf("parent process, pid=%d\n", getpid());
      }
      return 0;
    }

说明

  • 子进程获得了父进程的数据空间、堆栈的副本,不是共享
  • 父进程中打开的文件描述符也被复制到子进程中
  • 如果父进程先退出,子进程会变成孤儿进程
  • 如果子进程先退出,内核向父进程发送SIGCHLD信号,如果父进程不处理这个信号,子进程会变成僵尸进程
  • 如果子进程在父进程之前终止,内核为每个子进程保留了一个数据结构,包括进程编号、终止状态和使用cpu时间等,父进程如果处理了子进程退出的信息,内核就会释放这个数据结构,如果父进程没处理子进程退出的信息,内核就不会释放这个数据结构,子进程进程编号就会一直被占用,但是系统可用的进程号是有限的,如果大量的产生僵尸死进程,将因为没有可用的进程号而导致系统不能产生新的进程,这就是僵尸进程的危害。
  • 如果父进程先退出,子进程会变成孤儿进程,会被init进程(1号进程)接管,由init进程回收
  • 如果子进程先退出,父进程会收到SIGCHLD信号,可以通过wait函数回收子进程,也可以通过signal函数注册信号处理函数回收子进程,否则子进程会变成僵尸进程

服务程序的调度

  • [x] 周期性的启动后台程序
  • [x] 常驻内存中的服务程序异常终止,在短时间内重启

execl

#include <unistd.h>

int execl(const char *path, const char *arg0, ... /*, (char *) NULL */ );

其中的参数意义如下:

  • path: 要执行的新程序的路径。
  • arg0, arg1, ...: 新程序的命令行参数,以字符串形式传递。arg0 一般用于传递新程序的名称,而后续的参数用于传递命令行参数。参数列表必须以一个空指针 (char *) NULL 结尾。

简单来说,execl 的参数是一个以 path 开始的字符串序列,最后以 NULL 结尾。

例如,如果你想在当前进程中执行 /bin/ls 程序,并传递参数 -l,可以使用如下的调用:

execl("/usr/bin/ls", "/usr/bin/ls", "-l", (char *) NULL);

在这个调用中,"/usr/bin/ls" 是新程序的路径,"/usr/bin/ls" 是新程序的名称(在 argv[0] 中),"-l" 是新程序的参数。最后的 (char *) NULL 表示参数列表的结束。

请注意,execl 函数的参数是一个变长参数列表,因此实际的参数数量是可变的,但是必须以 NULL 结尾。在传递参数时,需要按照参数在命令行中的顺序逐个列出。

  • 为什么execl()前两个参数值相同?

execl 中,argv[0] 通常用于指定新程序的名称。如果你不在参数列表中明确指定 argv[0],那么系统会假设 argv[0] 为新程序的路径(在这个例子中就是 "/usr/bin/ls")。因此,如果你只关心执行的程序是什么而不关心在 argv[0] 中是什么,那么 execl("/usr/bin/ls", "-l", (char *) NULL); 就足够了。

实际上,很多时候,程序员会选择使用 execl("/usr/bin/ls", "/usr/bin/ls", "-l", (char *) NULL); 来显式指定 argv[0],以提高代码的可读性。这样做可以清晰地表达新程序的名称,使代码更易于理解。

总体而言,两者都是合法的,可以根据个人偏好来选择。

exec是用参数中的命令替换当前进程(的正文段、数据段、堆栈),替换成功后,当前进程就变成了新的进程,不会再执行exec后面的代码。如果exec执行失败,当前进程还是原来的进程,会继续执行exec后面的代码。
  • 执行正确的exec命令
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

int main()
{
  printf("before exec\n");
  // 正确的exec命令
  execl("/usr/bin/ls", "ls", "-l", (char *)NULL);
  printf("after exec\n", ret);
  return 0;
}

运行结果

[bill@localhost c]$ ./procctl1
before exec
总用量 20
-rw-rw-r--. 1 bill bill   91 1月  29 10:35 makefile
-rwxrwxr-x. 1 bill bill 8416 1月  29 10:54 procctl1
-rw-rw-r--. 1 bill bill  580 1月  29 10:54 procctl1.cpp
  • 执行错误的exec命令
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

int main()
{
  printf("before exec\n");
  // 错误的exec命令
  int ret = execl("/usr/bin/lss", "ls", "-l", (char *)NULL);
  printf("after exec, ret=%d\n", ret);
  return 0;
}

运行结果

[bill@localhost c]$ ./procctl1
before exec
after exec, ret=-1

使用fork()进行进程调度

  • 1、先执行fork()函数创建子进程,在子进程中调用execl()函数执行新的程序
  • 2、execl()会替换子进程的正文段、数据段、堆栈,子进程会变成新的进程,不会再执行fork()后面的代码
  • 3、在父进程中调用wait()函数等待子进程的运行结果
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>

int main()
{
  while (true)
  {
    // 子进程中调用execl()函数执行新的程序
    if (fork() == 0)
    {
      execl("/usr/bin/ls", "ls", "-l", (char *)NULL);
    }
    else
    {
      // 父进程
      int status;
      wait(&status);
      sleep(10);
    }
  }
  return 0;
}

运行结果

[bill@localhost c]$ ./procctl1
总用量 20
-rw-rw-r--. 1 bill bill   91 1月  29 10:35 makefile
-rwxrwxr-x. 1 bill bill 8512 1月  29 11:09 procctl1
-rw-rw-r--. 1 bill bill 1300 1月  29 11:08 procctl1.cpp
总用量 20
-rw-rw-r--. 1 bill bill   91 1月  29 10:35 makefile
-rwxrwxr-x. 1 bill bill 8512 1月  29 11:09 procctl1
-rw-rw-r--. 1 bill bill 1300 1月  29 11:08 procctl1.cpp
# ...(每10s输出一次)...

execv

#include <unistd.h>

int execv(const char *path, char *const argv[]);

其中:

  • path: 要执行的新程序的路径
  • argv: 一个以空指针 (char *) NULL 结尾的字符串数组,用于传递命令行参数
  • 当传入参数不固定时,可以使用execv()方法,execv支持不确定个数的参数
  • 当使用execl()时,可能遇到如下情况:
if (argc == 3)
    execl(argv[2], argv[2], (char *)NULL);
if (argc == 4)
    execl(argv[2], argv[2], argv[3], (char *)NULL);
if (argc == 5)
    execl(argv[2], argv[2], argv[3], argv[4], (char *)NULL);
if (argc == 6)
    execl(argv[2], argv[2], argv[3], argv[4], argv[5], (char *)NULL);

由于不确定外部传入多少个参数,代码过于冗余,此时可以使用execv

execv(argv[2], argv + 2);

实现定时调度器

  • 使用示例:

    ./procctl 5 /usr/bin/ls -lt /home/bill
  • Usage: ./procctl interval(sec) program [arg1] [arg2] ...
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>

int main(int argc, char *argv[])
{
  if (argc < 3)
  {
    printf("usage: ./procctl interval(sec) program [arg1] [arg2] ...\n");
    printf("example: /home/bill/project/tools1/bin/procctl 5 /usr/bin/ls -lt /home/bill\n\n");

    printf("本程序是服务程序的调度程序,周期性启动服务程序或shell脚本。\n");
    printf("timetvl 运行周期,单位:秒。被调度的程序运行结束后,在timetvl秒后会被procctl重新启动。\n");
    printf("program 被调度的程序名,必须使用全路径。\n");
    printf("argvs   被调度的程序的参数。\n");
    printf("注意,本程序不会被kill杀死,但可以用kill -9强行杀死。\n\n\n");
    return -1;
  }

  // 忽略掉全部信号,关掉IO
  for (int i = 0; i < 64; i++)
  {
    signal(i, SIG_IGN);
    // 关掉IO
    close(i);
  }

  if (fork() > 0)
  {
    // 父进程退出
    exit(0);
  }

  signal(SIGCHLD, SIG_DFL);

  while (true)
  {
    // 子进程中调用execl()函数执行新的程序
    if (fork() == 0)
    {
      execv(argv[2], argv + 2);
      exit(0); // 如果execv执行失败,子进程会变成僵尸进程,所以这里要退出
    }
    else
    {
      // 父进程
      int status;
      wait(&status);
      sleep(atoi(argv[1]));
    }
  }
  return 0;
}

在这个守护进程的设计中,涉及到了三层进程关系:

  1. 父进程(Original Parent): 初始启动的由用户发起的进程。在第一次调用 fork() 后,这个进程退出,但是留下了子进程1(守护进程)。
  2. 子进程1(Daemon Process): 由父进程生成的第一个子进程,在 fork() 之后立即退出。成为守护进程,负责周期性地启动子进程2执行指定的程序。
  3. 子进程2(Executed Program): 由子进程1通过 fork() 生成的进程,执行指定的程序(例如通过 execv 启动的外部程序)。当这个程序执行完成后,子进程2退出,发送 SIGCHLD 信号给父进程(子进程1)。

整个进程关系如下:

  • 初始启动:父进程

    • 第一次fork():生成子进程1(守护进程),并立即退出

      • 循环中的fork():生成子进程2(执行指定的程序),执行完成后退出,发送SIGCHLD信号

        • 父进程(子进程1):捕获 SIGCHLD 信号,等待子进程2终止,然后继续循环

这种三层关系确保了父进程(子进程1)的退出不影响守护进程的运行,而守护进程可以在后台周期性地启动并等待子进程2的终止。

Linux共享内存

实现步骤:

  • 1、使用shmget()函数创建共享内存
  • 2、使用shmat()函数将共享内存映射到进程的地址空间
  • 3、使用shmdt()函数将共享内存从进程的地址空间中分离
  • 4、使用shmctl()函数可以使用cmd=IPC_RMID删除共享内存

shmget

shmget 是一个用于获取共享内存段的系统调用,通常用于进程间通信(IPC)。在Unix/Linux系统中,共享内存是一块被多个进程共享的内存区域,允许它们直接读写其中的数据,从而实现进程之间的协作。

#include <sys/ipc.h>
#include <sys/shm.h>

int shmget(key_t key, size_t size, int shmflg);
  • key:共享内存段的关键字,用于标识共享内存段。
  • size:共享内存段的大小(以字节为单位)。
  • shmflg:标志位,用于指定共享内存的访问权限和行为。

函数返回值:

  • 成功时返回一个正整数,表示共享内存段的标识符(ID)。
  • 失败时返回 -1,并设置 errno 表示具体的错误类型。

在Linux系统中可以使用ipcsipcrm查看和删除共享内存:

bill@205356368e4f:~/project/other-demo/c$ ipcs -m

------ Shared Memory Segments --------
key        shmid      owner      perms      bytes      nattch     status
0x00005005 0          bill       640        56         0

bill@205356368e4f:~/project/other-demo/c$ ipcrm -m 0
bill@205356368e4f:~/project/other-demo/c$ ipcs -m

------ Shared Memory Segments --------
key        shmid      owner      perms      bytes      nattch     status
  • ipcrm -m shmid可以删除对应的共享内存
  • key:共享内存段的键值,这是用于标识 IPC 对象的关键字。
  • shmid:共享内存段的标识符,是一个唯一的整数,由系统分配。
  • owner:拥有该共享内存段的进程或用户的用户名。
  • perms:共享内存段的权限,以八进制表示。例如,640 表示读写权限为 6(读权限)和 4(写权限),权限数字之间按位组合而成。
  • bytes:共享内存段的大小,以字节为单位。
  • nattch:当前附加到共享内存段的进程数(即使用该共享内存的进程数)。
  • status:表示共享内存段的状态。通常是 0 或者其他状态码,表示共享内存段是否被使用等信息。

shmat

shmat 是一个用于将共享内存段附加到进程地址空间的系统调用,通常用于进程间共享内存的操作。它允许进程直接访问共享内存区域中的数据。

#include <sys/shm.h>

void *shmat(int shmid, const void *shmaddr, int shmflg);
  • shmid:共享内存段的标识符,即通过 shmget 获得的标识符。
  • shmaddr:指定共享内存段附加到进程地址空间的地址。通常设置为 NULL,由系统自动选择合适的地址。
  • shmflg:标志位,用于指定共享内存的访问权限和行为。

函数返回值:

  • 成功时返回一个 void 指针,指向共享内存段附加到进程地址空间的起始地址。
  • 失败时返回 (void *) -1,并设置 errno 表示具体的错误类型。

shmdt

shmdt 函数用于将共享内存段从当前进程的地址空间中分离。它接受一个参数,即指向共享内存段附加到进程地址空间的起始地址的指针。

#include <sys/shm.h>

int shmdt(const void *shmaddr);
  • shmaddr:指向共享内存段附加到进程地址空间的起始地址。

函数返回值:

  • 成功时返回 0。
  • 失败时返回 -1,并设置 errno 表示具体的错误类型。

shmctl

shmctl 是一个用于控制共享内存段的系统调用。它可以执行多种操作,例如获取共享内存信息、设置共享内存权限、删除共享内存等。

#include <sys/shm.h>

int shmctl(int shmid, int cmd, struct shmid_ds *buf);
  • shmid:共享内存段的标识符,是通过 shmget 获取的。
  • cmd:控制命令,指定 shmctl 要执行的操作。
  • buf:指向 shmid_ds 结构的指针,用于传递或获取共享内存段的信息。

函数返回值:

  • 成功时返回操作相关的信息或 0。
  • 失败时返回 -1,并设置 errno 表示具体的错误类型。

下面是一些常用的 cmd 值和相应的操作:

  • IPC_STAT:获取共享内存段的状态信息,将信息存储在 buf 中。
  • IPC_SET:设置共享内存段的状态,使用 buf 中的信息。
  • IPC_RMID:删除共享内存段。

Demo

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>

struct st_pid
{
  int pid;       // 进程编号
  char name[51]; // 进程名称
};

int main(int argc, char *argv[])
{
  if (argc < 2)
  {
    printf("Usage: ./share-memory memoryname\n");
    return -1;
  }

  // 共享内存标志
  int shmid;

  if ((shmid = shmget(0x5005, sizeof(struct st_pid), 0640 | IPC_CREAT)) == -1)
  {
    printf("shmget(0x5005) failed\n");
    return -1;
  }

  // 指向共享内存地址的变量
  struct st_pid *stpid;

  if ((stpid = (struct st_pid *)shmat(shmid, 0, 0)) == (void *)-1)
  {
    printf("shmat failed\n");
    return -1;
  }

  // 读取共享内存
  printf("stpid->pid = %d\n", stpid->pid);
  printf("stpid->name = %s\n", stpid->name);

  // 操作共享内存
  stpid->pid = getpid();
  strcpy(stpid->name, argv[1]);

  // 把共享内存从当前进程中分离
  shmdt(stpid);

  // 删除共享内存
  // if (shmctl(shmid, IPC_RMID, NULL) == -1)
  // {
  //   printf("shmctl failed\n");
  // }

  return 0;
}
  • 为了方便展示,把删除共享内存代码做了注释
  • 编译运行,最后手动删掉共享内存
bill@205356368e4f:~/project/other-demo/c$ g++ -o share-memory share-memory.cpp
bill@205356368e4f:~/project/other-demo/c$ ./share-memory aaa
stpid->pid = 0
stpid->name =
bill@205356368e4f:~/project/other-demo/c$ ./share-memory bbb
stpid->pid = 110540
stpid->name = aaa
bill@205356368e4f:~/project/other-demo/c$ ./share-memory ccc
stpid->pid = 110553
stpid->name = bbb
bill@205356368e4f:~/project/other-demo/c$ ./share-memory ddd
stpid->pid = 110584
stpid->name = ccc
bill@205356368e4f:~/project/other-demo/c$ ./share-memory eee
stpid->pid = 110603
stpid->name = ddd
bill@205356368e4f:~/project/other-demo/c$ ipcs -m

------ Shared Memory Segments --------
key        shmid      owner      perms      bytes      nattch     status
0x00005005 2          bill       640        56         0

bill@205356368e4f:~/project/other-demo/c$ ipcrm -m 2
bill@205356368e4f:~/project/other-demo/c$ ipcs -m

------ Shared Memory Segments --------
key        shmid      owner      perms      bytes      nattch     status

共享内存唯一标识

  • int shmid = shmget(key, 1024, IPC_CREAT | 0666),可以看到共享内存的唯一标识符是key,如果两个不相干的程序使用了相同的 key 值创建共享内存区域,就会发生冲突,导致数据错乱或不可预测的行为。为了避免这种情况,需要确保在使用 ftok() 函数生成 key 值时,提供的文件路径和字符是唯一的。通常建议使用程序自己的文件路径和一个唯一的字符来生成 key 值,这样可以确保不同程序生成的 key 值是唯一的。另外,为了进一步确保不同程序之间的 key 值不冲突,可以在程序中定义一个特定的 key 值,而不是依赖于动态生成的 key 值。这样就可以避免由于不同程序生成的 key 值相同而导致的冲突问题。
  • ftok() 函数是用于生成一个 System V IPC(Inter-Process Communication,进程间通信)所需的键值的函数。这个键值通常用于标识共享内存、消息队列和信号量等 IPC 对象。
  • ftok() 函数的原型如下所示:
key_t ftok(const char *pathname, int proj_id);

其中,pathname 是一个指向文件路径名的指针,而 proj_id 是一个用户定义的整数,通常是一个小于或等于 255 的正整数。

ftok() 函数通过将 pathname 参数所指向的文件的索引节点号和 proj_id 参数组合成一个唯一的键值。这个键值用于创建或获取 IPC 对象。

ftok() 函数返回一个键值,如果发生错误,则返回 -1,并设置 errno 来指示错误类型。

值得注意的是,ftok() 函数的生成键值的方法并不是十分安全,特别是在多线程环境或者频繁创建文件的情况下,可能会导致冲突。因此,应该谨慎使用,并确保在生成键值时,pathname 参数指向的文件是唯一的,并且 proj_id 参数是固定的且不易冲突的。

Linux信号量

  • 使用信号量给共享资源加锁,防止出现线程不安全问题
  • 信号量本质上是一个非负数(>0)的计数器
  • 给共享资源建立一个标志,表示该共享内存被占用的情况
  • P操作(资源-1)、V操作(资源+1)

CSEM信号量操作类使用说明

1. 创建 CSEM 对象

CSEM sem;

2. 初始化信号量

// key: 信号量的键值
// value: 信号量的初始值,默认为1
// sem_flg: 信号量的标志,默认为SEM_UNDO
bool initSuccess = sem.init(key, value, sem_flg);
if (!initSuccess) {
    // 处理初始化失败的情况
    perror("Semaphore initialization failed");
    return -1;
}

3. 使用 P 操作(获取信号量)

// sem_op: P 操作的值,通常为-1
bool pSuccess = sem.P(sem_op);
if (!pSuccess) {
    // 处理 P 操作失败的情况
    perror("P operation failed");
    return -1;
}

4. 使用 V 操作(释放信号量)

// sem_op: V 操作的值,通常为1
bool vSuccess = sem.V(sem_op);
if (!vSuccess) {
    // 处理 V 操作失败的情况
    perror("V operation failed");
    return -1;
}

5. 获取信号量的当前值

int semValue = sem.value();
if (semValue == -1) {
    // 处理获取信号量值失败的情况
    perror("Failed to get semaphore value");
    return -1;
}

6. 销毁信号量

bool destroySuccess = sem.destroy();
if (!destroySuccess) {
    // 处理销毁信号量失败的情况
    perror("Semaphore destruction failed");
    return -1;
}

7. 注意事项

  • 信号量的键值 key 应该在进程间共享,并保证唯一性。
  • 在使用信号量前需要初始化,可以使用 init 方法。
  • P 操作用于获取信号量,V 操作用于释放信号量。
  • 使用完信号量后,应该通过 destroy 方法来销毁信号量。

这个类主要用于在多进程之间实现临界区的同步控制,确保在共享资源的访问中只有一个进程可以进行。在使用时要特别注意初始化和销毁的时机,以及合适地使用 P 操作和 V 操作。

CSEM信号量操作类源码

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>

// 信号量。
class CSEM
{
private:
  union semun // 用于信号量操作的共同体。
  {
    int val;
    struct semid_ds *buf;
    unsigned short *arry;
  };

  int m_semid; // 信号量描述符。

  // 如果把sem_flg设置为SEM_UNDO,操作系统将跟踪进程对信号量的修改情况,
  // 在全部修改过信号量的进程(正常或异常)终止后,操作系统将把信号量恢
  // 复为初始值(就像撤消了全部进程对信号的操作)。
  // 如果信号量用于表示可用资源的数量(不变的),设置为SEM_UNDO更合适。
  // 如果信号量用于生产消费者模型,设置为0更合适。
  // 注意,网上查到的关于sem_flg的用法基本上是错的,一定要自己动手多测试。
  short m_sem_flg;

public:
  CSEM();
  // 如果信号量已存在,获取信号量;如果信号量不存在,则创建它并初始化为value。
  bool init(key_t key, unsigned short value = 1, short sem_flg = SEM_UNDO);
  bool P(short sem_op = -1); // 信号量的P操作。
  bool V(short sem_op = 1);  // 信号量的V操作。
  int value();               // 获取信号量的值,成功返回信号量的值,失败返回-1。
  bool destroy();            // 销毁信号量。
  ~CSEM();
};

CSEM::CSEM()
{
  m_semid = -1;
  m_sem_flg = SEM_UNDO;
}

// 如果信号量已存在,获取信号量;如果信号量不存在,则创建它并初始化为value。
bool CSEM::init(key_t key, unsigned short value, short sem_flg)
{
  if (m_semid != -1)
    return false;

  m_sem_flg = sem_flg;

  // 信号量的初始化不能直接用semget(key,1,0666|IPC_CREAT),因为信号量创建后,初始值是0。

  // 信号量的初始化分三个步骤:
  // 1)获取信号量,如果成功,函数返回。
  // 2)如果失败,则创建信号量。
  // 3) 设置信号量的初始值。

  // 获取信号量。
  if ((m_semid = semget(key, 1, 0666)) == -1)
  {
    // 如果信号量不存在,创建它。
    if (errno == 2)
    {
      // 用IPC_EXCL标志确保只有一个进程创建并初始化信号量,其它进程只能获取。
      if ((m_semid = semget(key, 1, 0666 | IPC_CREAT | IPC_EXCL)) == -1)
      {
        if (errno != EEXIST)
        {
          perror("init 1 semget()");
          return false;
        }
        if ((m_semid = semget(key, 1, 0666)) == -1)
        {
          perror("init 2 semget()");
          return false;
        }

        return true;
      }

      // 信号量创建成功后,还需要把它初始化成value。
      union semun sem_union;
      sem_union.val = value; // 设置信号量的初始值。
      if (semctl(m_semid, 0, SETVAL, sem_union) < 0)
      {
        perror("init semctl()");
        return false;
      }
    }
    else
    {
      perror("init 3 semget()");
      return false;
    }
  }

  return true;
}

bool CSEM::P(short sem_op)
{
  if (m_semid == -1)
    return false;

  struct sembuf sem_b;
  sem_b.sem_num = 0;     // 信号量编号,0代表第一个信号量。
  sem_b.sem_op = sem_op; // P操作的sem_op必须小于0。
  sem_b.sem_flg = m_sem_flg;
  if (semop(m_semid, &sem_b, 1) == -1)
  {
    perror("p semop()");
    return false;
  }

  return true;
}

bool CSEM::V(short sem_op)
{
  if (m_semid == -1)
    return false;

  struct sembuf sem_b;
  sem_b.sem_num = 0;     // 信号量编号,0代表第一个信号量。
  sem_b.sem_op = sem_op; // V操作的sem_op必须大于0。
  sem_b.sem_flg = m_sem_flg;
  if (semop(m_semid, &sem_b, 1) == -1)
  {
    perror("V semop()");
    return false;
  }

  return true;
}

// 获取信号量的值,成功返回信号量的值,失败返回-1。
int CSEM::value()
{
  return semctl(m_semid, 0, GETVAL);
}

bool CSEM::destroy()
{
  if (m_semid == -1)
    return false;

  if (semctl(m_semid, 0, IPC_RMID) == -1)
  {
    perror("destroy semctl()");
    return false;
  }

  return true;
}

CSEM::~CSEM()
{
}

关于SEM_UNDO

当在信号量上执行 P(wait)和 V(signal)操作时,如果进程在执行这些操作的过程中由于某些原因(如收到信号、进程异常终止等)被中断,可能会导致信号量的值发生变化,但系统无法自动恢复信号量到操作前的状态。这就是为什么引入 SEM_UNDO 标志的原因。

SEM_UNDO 标志用于启用信号量的撤销机制。当设置了 SEM_UNDO 标志后,在进程对信号量执行 P 操作时,系统将会记住进程的 PID,并在进程终止时自动执行 V 操作,以恢复信号量的值。

具体来说,使用 SEM_UNDO 标志时,在进程执行 P 操作时,系统将会为该进程关联一个撤销值(undo value)。如果该进程异常终止,系统会根据该撤销值自动执行 V 操作,恢复信号量的值。

这样做的目的是确保在进程异常退出的情况下,系统能够自动撤销该进程对信号量的操作,防止因为进程异常退出而导致信号量值的错误。在多进程环境中,信号量的撤销机制有助于维护系统的稳定性和一致性。

使用CMES类对共享内存进行加锁

// share-memory.cpp

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <_public.h>

CSEM sem; // 给共享内存加锁的信号量

struct st_pid
{
  int pid;       // 进程编号
  char name[51]; // 进程名称
};

int main(int argc, char *argv[])
{
  if (argc < 2)
  {
    printf("Usage: ./share-memory memoryname\n");
    return -1;
  }

  // 共享内存标志
  int shmid;

  if ((shmid = shmget(0x5005, sizeof(struct st_pid), 0640 | IPC_CREAT)) == -1)
  {
    printf("shmget(0x5005) failed\n");
    return -1;
  }

  // 如果信号量已存在,获取信号量;信号量不存在,则创建并初始化为value,缺省值为1
  if (sem.init(0x5005) == false)
  {
    printf("sem.init(0x5005) failed\n");
    return -1;
  }

  // 指向共享内存地址的变量
  struct st_pid *stpid;

  if ((stpid = (struct st_pid *)shmat(shmid, 0, 0)) == (void *)-1)
  {
    printf("shmat failed\n");
    return -1;
  }

  printf("Before sem.P(), time=%ld, sem.value=%d\n", time(0), sem.value());
  // P操作,加锁
  sem.P();
  printf("After sem.P(), time=%ld, sem.value=%d\n", time(0), sem.value());

  // 读取共享内存
  printf("stpid->pid = %d\n", stpid->pid);
  printf("stpid->name = %s\n", stpid->name);

  // 操作共享内存
  stpid->pid = getpid();
  sleep(10); // 为了测试加锁
  strcpy(stpid->name, argv[1]);

  printf("Before sem.V(), time=%ld, sem.value=%d\n", time(0), sem.value());
  // V操作
  sem.V();
  printf("After sem.V(), time=%ld, sem.value=%d\n", time(0), sem.value());

  // 把共享内存从当前进程中分离
  shmdt(stpid);

  // 删除共享内存
  if (shmctl(shmid, IPC_RMID, NULL) == -1)
  {
    printf("shmctl failed\n");
  }

  return 0;
}
  • 只运行一个进程
$ ./share-memory mn
Before sem.P(), time=1707071697, sem.value=1
After sem.P(), time=1707071697, sem.value=0
stpid->pid = 24904
stpid->name = mn
Before sem.V(), time=1707071707, sem.value=0
After sem.V(), time=1707071707, sem.value=1
  • 打开两个终端,先后(相差10s以内)运行两个进程

    • 终端1(先运行)

      $ ./share-memory mn
      Before sem.P(), time=1707078282, sem.value=1
      After sem.P(), time=1707078282, sem.value=0
      stpid->pid = 0
      stpid->name =
      Before sem.V(), time=1707078292, sem.value=0
      After sem.V(), time=1707078292, sem.value=0
    • 终端2(后运行)

      ./share-memory mn
      Before sem.P(), time=1707078283, sem.value=0
      After sem.P(), time=1707078292, sem.value=0
      stpid->pid = 64858
      stpid->name = mn
      Before sem.V(), time=1707078302, sem.value=0
      After sem.V(), time=1707078302, sem.value=1
      shmctl failed
    • 先运行的终端1,Before sem.P(), time=1707078282, sem.value=1,,由于终端1正在占用共享内存,因此终端2启动时,Before sem.P(), time=1707078283, sem.value=0
    • 终端1释放锁信号量时,After sem.V(), time=1707078292, sem.value=0,是因为释放的信号量立刻被终端2占用了

进程的心跳机制

在下一章节的守护进程中会用到心跳机制来判定服务程序的状态

// 进程心跳信息的结构体
struct st_procinfo
{
  int pid;        // 进程ID
  char pname[64]; // 进程名
  int timeout;    // 超时时间
  time_t atime;   // 最后一次心跳时间
};
  • 服务程序在共享内存中维护自己的心跳信息
  • 开发守护程序,终止已经死机的服务程序

PAction心跳工具类类使用说明

心跳类(PAction)是一个用于实现守护进程心跳功能的C++类。该类利用共享内存和信号量机制,实现了守护进程之间的心跳信息共享和更新。

公有成员函数

bool AddPinfo(const int timeout, const char *pname)

  • 描述:

    • 添加心跳信息到共享内存中。
  • 参数:

    • timeout: 心跳超时时间。
    • pname: 进程名。
  • 返回值:

    • 成功返回 false,失败返回 true

bool UptATime()

  • 描述:

    • 更新共享内存中当前进程的心跳信息的时间。
  • 返回值:

    • 成功返回 false

调用Demo

#include <unistd.h>

int main(int argc, char *argv[])
{
  PAction action;                        // 创建一个心跳对象
  action.AddPinfo(30, "test-heartbeat"); // 添加心跳信息,超时时间为30秒,进程名为test-heartbeat
  while (true)
  {
    action.UptATime(); // 更新心跳时间
    sleep(2);          // 模拟进程的其他工作
  }

  return 0;
}

PAction类源码

  • 信号量用到了上个章节写的CSEM
#include <stdio.h>
#include <unistd.h>
#include <_public.h>

#define MAXNUMP_ 1000   // 最大守护进程个数,1000个已经足够了
#define SHMKEYP_ 0x1234 // 共享内存的key
#define SEMKEYP_ 0x1234 // 信号量的key

// 进程心跳信息的结构体
struct st_pinfo
{
  int pid;        // 进程ID
  char pname[64]; // 进程名
  int timeout;    // 超时时间
  time_t atime;   // 最后一次心跳时间
};

class PAction
{
private:
  CSEM m_sem;             // 给共享内存加锁的信号量
  int m_shmid;            // 共享内存ID
  struct st_pinfo *m_shm; // 共享内存指针
  int m_pos;              // 当前进程在共享内存中的位置

public:
  PAction();
  bool AddPinfo(const int timeout, const char *pname); // 添加心跳信息
  bool UptATime();                                     // 更新心跳时间
  ~PAction();                                          // 析构函数,删除共享内存中的心跳信息
};

PAction::PAction()
{
  m_shm = nullptr;
  m_pos = -1;
  m_shmid = 0;
}

bool PAction::AddPinfo(const int timeout, const char *pname)
{
  // 创建/获取共享内存,大小为n*sizeof(st_pinfo),n为要守护的进程个数
  if ((m_shmid = shmget(SHMKEYP_, MAXNUMP_ * sizeof(st_pinfo), 0640 | IPC_CREAT)) == -1)
  {
    printf("shmget(%x) failed\n", SHMKEYP_);
    return -1;
  }

  // 初始化共享内存信号量
  CSEM m_sem;
  if (m_sem.init(SEMKEYP_) == false)
  {
    printf("m_sem.init(%x) error\n", SEMKEYP_);
    return -1;
  }

  // 获取共享内存的指针(内存地址)
  m_shm = (struct st_pinfo *)shmat(m_shmid, 0, 0);

  // 创建当前进程心跳信息
  struct st_pinfo *pinfo = new struct st_pinfo;
  memset(pinfo, 0, sizeof(st_pinfo));                     // 初始化心跳信息
  pinfo->pid = getpid();                                  // 当前进程ID
  STRNCPY(pinfo->pname, sizeof(pinfo->pname), pname, 64); // 安全的字符串拷贝
  pinfo->timeout = timeout;                               // 超时时间为30秒
  pinfo->atime = time(NULL);                              // 当前时间

  // 进程pid是循环使用的,如果曾经有一个进程异常退出,没有清理自己的心跳信息,
  // 它的心跳信息会残留在共享内存中。如果不巧,新启动的进程pid恰好与异常退出的进程pid相同,
  // 这样会在共享内存中找到两条相同pid的心跳信息,
  // 当守护进程检查到残留进程的心跳信息后,会kill掉残留进程的pid,这个pid恰好是新启动的进程的pid,
  // 会导致新启动进程会被kill

  // 为了解决这个问题,在共享内存中如果查到相同pid的心跳信息(pid相同),就复用这块地址
  for (int i = 0; i < MAXNUMP_; i++)
  {
    if ((m_shm + i)->pid == pinfo->pid)
    {
      m_pos = i;
      break;
    }
  }

  // 如果没有找到正好与当前进程相同pid的残留心跳信息,就在共享内存中查找空位置,将当前进程心跳信息写入
  if (m_pos == -1)
  {
    for (int i = 0; i < MAXNUMP_; i++)
    {
      if ((m_shm + i)->pid == 0)
      {
        m_pos = i;
        break;
      }
    }
  }

  // 这里加锁比较合适
  m_sem.P();

  if (m_pos == -1)
  {
    m_sem.V(); // 没有空间了也要释放信号量
    printf("out of share mem, too many processes\n");
    return -1;
  }

  memcpy(m_shm + m_pos, pinfo, sizeof(st_pinfo));

  // 这里解锁比较合适
  m_sem.V();
  // 打印当前进程的心跳信息
  printf("Create heartbeat success, pid=%d, pname=%s, timeout=%d, atime=%ld\n", pinfo->pid, pinfo->pname, pinfo->timeout, pinfo->atime);
  // 释放pinfo内存
  delete pinfo;
  return false;
}

bool PAction::UptATime()
{
  // 更新共享内存中当前进程的心跳信息
  // 这里不需要加锁,不会有其他进程抢占这个位置
  (m_shm + m_pos)->atime = time(NULL);
  // 打印当前进程的心跳信息
  printf("Update heartbeat success, pid=%d, pname=%s, timeout=%d, atime=%ld\n", (m_shm + m_pos)->pid, (m_shm + m_pos)->pname, (m_shm + m_pos)->timeout, (m_shm + m_pos)->atime);
  return false;
}

PAction::~PAction()
{
  // 把当前进程的心跳信息从共享内存中删除
  // 这里不需要加锁,释放之后任意进程都可以抢占这个位置
  (m_shm + m_pos)->pid = 0;
  // memset((m_shm + m_pos), 0, sizeof(st_pinfo));
  // 释放共享内存的指针
  shmdt(m_shm);
  // 打印删除心跳信息的日志
  printf("Delete heartbeat success, pid=%d, pname=%s, timeout=%d, atime=%ld\n", (m_shm + m_pos)->pid, (m_shm + m_pos)->pname, (m_shm + m_pos)->timeout, (m_shm + m_pos)->atime);
}

守护进程

  • 服务程序由调度程序启动(procctl)
  • 如果服务程序死机(挂起),守护进程将终止它
  • 服务程序被终止后,调度程序(procctl)将重新启动它
  • 应该使用root用户启动,防止被其它程序误杀

exit函数与析构函数

  • exit不会调用局部对象的析构函数
  • exit会调用全局对象的析构函数
  • return会调用全局+局部对象的析构函数
  • 例如下面的程序,把CPActive active作为局部对象放在了main()函数中:
#include <_public.h>

void EXIT(int sig)
{
  printf("sig = %d \n", sig);
  exit(0);
}

int main(int argc, char *argv[])
{

  signal(SIGINT, EXIT);  // 信号2,ctrl+c
  signal(SIGTERM, EXIT); // 信号15,killall -15(正常结束程序)

  // 定时发送心跳
  CPActive active;
  active.AddPInfo(atoi(argv[2]), argv[1]);

  while (true)
  {
    active.UptATime();
    sleep(10);
  }

  return 0;
}
  • 当键盘输入ctrl+c信号退出程序时,并不会调用active对象的析构函数,导致该进程的心跳信息并没有从共享内存中删除
  • 下面把CPActive active作为全局对象放在了main()函数外:
#include <_public.h>

// 需要定义成全局对象。如果是局部对象,ctrl+c后,exit(0)不会调用析构函数
// return 会调用全局和局部对象的析构函数
CPActive active;

void EXIT(int sig)
{
  printf("sig = %d \n", sig);
  exit(0);
}

int main(int argc, char *argv[])
{

  signal(SIGINT, EXIT);  // 信号2,ctrl+c
  signal(SIGTERM, EXIT); // 信号15,killall -15(正常结束程序)

  // 定时发送心跳
  active.AddPInfo(atoi(argv[2]), argv[1]);

  while (true)
  {
    active.UptATime();
    sleep(10);
  }

  return 0;
}
  • 当把CPActive active设置为全局对象后,退出程序后,会自动调用CPActive 的析构函数,移除共享内存中的进程心跳信息

守护进程代码编写

  • 守护进程业务逻辑

    • 1)遍历共享内存中全部的进程心跳记录
    • 2)找到有效的进程心跳记录(pid!=0),向进程发送信号0,判断进程是否存在
    • 3)若不存在则从共享内存中删除该进程心跳记录,continue
    • 4)如果进程未超时,直接continue
    • 5)如果已超时,向进程发送SIGTERM(15)信号,尝试正常关闭进程
    • 6)等待一段时间后如果进程仍没有退出,再次发送SIGKILL(9)信号,强制终止进程
    • 7)从共享内存中删除该进程心跳记录
    • 8)循环1)
// checkproc.cpp
// Date: 2024/02/06
#include <_public.h>

// 程序运行日志
CLogFile logFile;

int main(int argc, char *argv[])
{
  // 程序使用帮助
  if (argc != 2)
  {
    printf("\n");
    printf("Using:./checkproc logfilename\n");

    printf("Example:/home/bill/project/tools1/bin/procctl 10 /home/bill/project/tools1/bin/checkproc /home/bill/project/runtime/log/checkproc.log\n\n");

    printf("本程序用于检查后台服务程序是否超时,如果已超时,就终止它。\n");
    printf("注意:\n");
    printf("  1)本程序由procctl启动,运行周期建议为10秒。\n");
    printf("  2)为了避免被普通用户误杀,本程序应该用root用户启动。\n");
    printf("  3)如果要停止本程序,只能用killall -9 终止。\n\n\n");

    return 0;
  }

  // 守护进程没有任何关心的信号,所以屏蔽所有信号
  // for (int i = 1; i <= 64; i++)
  // {
  //   signal(i, SIG_IGN);
  // }
  // 关闭全部信号输入输出
  CloseIOAndSignal(true);

  // 打开日志文件
  if (logFile.Open(argv[1], "a+") == false)
  {
    printf("logFile.Open(%s) failed. \n", argv[1]);
    return -1;
  }

  // 获取信号量
  CSEM m_sem;
  int semid;
  if ((semid = m_sem.init(SEMKEYP)) == false)
  {
    logFile.Write("m_sem.init(%d) failed. \n", SEMKEYP);
    return -1;
  }

  // 创建/读取共享内存
  int m_shmid;
  if ((m_shmid = shmget(SHMKEYP, MAXNUMP * sizeof(struct st_procinfo), 0640 | IPC_CREAT)) == -1)
  {
    logFile.Write("shmget(%d) failed. \n", SHMKEYP);
    return -1;
  }

  // 将共享内存连接到当前进程的地址空间
  struct st_procinfo *procinfo = (struct st_procinfo *)shmat(m_shmid, 0, 0);

  // 遍历共享内存中全部的进程心跳记录
  for (int i = 0; i < MAXNUMP; i++)
  {
    // 找到有效的进程心跳记录(pid!=0),向进程发送信号0,判断进程是否存在
    // 若不存在则从共享内存中删除该进程心跳记录,continue
    if ((procinfo + i)->pid == 0)
      continue;

    // 仅仅为了调试方便
    // logFile.Write("checkproc: pos=%d, pid=%d, timeout=%d, atime=%d\n", i, (procinfo + i)->pid, (procinfo + i)->timeout, (procinfo + i)->atime);

    int iret = kill((procinfo + i)->pid, 0);
    if (iret == -1)
    {
      logFile.Write("process pid=%d(%s) had not existed. \n", (procinfo + i)->pid, (procinfo + i)->pname);
      memset(procinfo + i, 0, sizeof(struct st_procinfo));
      continue;
    }

    time_t now = time(NULL);
    // 如果进程未超时,直接continue
    if (now - (procinfo + i)->atime < (procinfo + i)->timeout)
      continue;

    // 如果已超时,向进程发送SIGTERM(15)信号,尝试正常关闭进程
    logFile.Write("process pid=%d(%s) had not existed. \n", (procinfo + i)->pid, (procinfo + i)->pname);
    kill((procinfo + i)->pid, SIGTERM); // 发送信号15,尝试正常关闭进程

    // 等待一段时间后如果进程仍没有退出,再次发送SIGKILL(9)信号,强制终止进程
    for (int i = 0; i < 5; i++)
    {
      sleep(1);
      iret = kill((procinfo + i)->pid, 0); // 向进程发送信号0,判断进程是否存在
      if (iret == -1)                      // 进程已退出
        break;
    }
    if (iret == -1) // 进程未退出
    {
      logFile.Write("process pid=%d(%s) had exited. \n", (procinfo + i)->pid, (procinfo + i)->pname);
    }
    else
    {
      kill((procinfo + i)->pid, SIGKILL); // 发送信号9,强制终止进程
      logFile.Write("process pid=%d(%s) had force killed. \n", (procinfo + i)->pid, (procinfo + i)->pname);
    }

    // 从共享内存中删除该进程心跳记录
    memset(procinfo + i, 0, sizeof(struct st_procinfo));
  }

  // 把共享内存从当前进程的地址空间分离
  shmdt(procinfo);

  return 0;
}

FTP文件传输协议

FTP(File Transfer Protocol)是一种用于在网络上进行文件传输的协议。它允许用户通过网络从一个计算机向另一个计算机传输文件。FTP通常用于将文件上传到服务器或从服务器下载文件。

FTP协议的主要应用场景是内网中不同业务系统之间进行数据交换,效率不高也不安全,但是使用起来简单方便。

常用的FTP命令包括:

  1. connect: 连接到FTP服务器。通常使用 ftp 命令加上目标主机名或IP地址来连接到FTP服务器。

    ftp hostname_or_ip
  2. login: 登录到FTP服务器。在连接到FTP服务器后,您需要使用用户名和密码进行登录。

    user username
    password password
  3. cd: 切换FTP服务器上的当前工作目录。

    cd directory_name
  4. ls: 列出FTP服务器上当前工作目录中的文件和目录。

    ls
  5. get: 从FTP服务器下载文件到本地计算机。

    get filename
  6. put: 将文件从本地计算机上传到FTP服务器。

    put filename
  7. mkdir: 在FTP服务器上创建新的目录。

    mkdir directory_name
  8. delete: 删除FTP服务器上的文件。

    delete filename
  9. quit: 断开与FTP服务器的连接并退出FTP会话。

    quit

增量下载流程

  • 通过4个vector容器来实现增量下载,每次启动同步程序可以根据上次的状态,确定本次需要同步的文件列表,不会重复下载本地已有的文件。容器中存放到是文件信息,包含文件名修改时间两个属性。
文件信息结构体
文件名
文件修改时间
struct st_fileinfo
{
  char filename[512]; // 文件名
  char mtime[32];     // 文件的修改时间
};
  • 创建4个容器,存储不同状态的文件信息列表
  • listfilename文件
SURF_ZH_20240210224640_11597.csv
SURF_ZH_20240210224640_11597.json
SURF_ZH_20240210224640_11597.xml
SURF_ZH_20240210224740_12088.csv
SURF_ZH_20240210224740_12088.json
SURF_ZH_20240210224740_12088.xml
SURF_ZH_20240210224840_12587.csv
SURF_ZH_20240210224840_12587.json
SURF_ZH_20240210224840_12587.xml

用于存储远程目录的文件列表

  • okfilename文件
<filename>SURF_ZH_20240210224640_11597.xml</filename><mtime>20240211064640</mtime>
<filename>SURF_ZH_20240210224740_12088.xml</filename><mtime>20240211064740</mtime>
<filename>SURF_ZH_20240210224840_12587.xml</filename><mtime>20240211064840</mtime>

每一行对应一条文件信息,文件信息包含文件名、修改时间,XML格式。该文件用于存储相对于远程目录下本次不需要下载的文件列表,换句话说,也就是基于上一次从远程目录下载的文件列表,去除远程目录不存在的文件名。这样可以尽量减少okfilename文件的大小。

  • 这里只需要对上面两个文件有印象即可,下面会详细介绍这两个文件和4个vector的关系。

程序第一次运行

vfilelist1vfilelist2vfilelist3vfilelist4
vector <struct st_fileinfo>vector <struct st_fileinfo>vector <struct st_fileinfo>vector <struct st_fileinfo>
已下载1nlist2已下载3待下载4

  • 模拟客户端、服务端文件情况。此时客户端目录为空,远程目录有1.txt2.txt3.txt4.txt5.txt 5个文件

  • 首先从本地(okfilename文件)加载上次已下载的文件列表(由于是第一次运行,vfilelist1为空)
  • 再向FTP发送指令,列出远程目录文件列表,存储到本地文件(listfilename),而后再读取listfilename文件,过滤掉条件不匹配的文件名,加载到vfilelist2中。(此处有一个疑惑为什么要读取远程目录文件列表,然后存储到本地文件,再过滤掉不匹配的文件名加载到vfilelist2中,而不是读取远程目录文件列表,直接加载到vfilelist2中)

  • 通过比对vfilelist1vfilelist2,得出本次无需下载的文件列表(vfilelist3已下载3)和本次待下载(vfilelist4待下载4)
  • vfilelist3(本次无需下载文件列表)中的内容,覆盖写入到okfilename文件中,okfilename文件只需存储上一个状态、相对于远程目录下载成功的文件列表即可,防止okfilename文件数据积累。

  • 遍历vfilelist4调用FTP对象逐个下载文件,每下载成功一个文件,向okfilename文件写入一行

程序第二次运行

  • 从本地的okfilename文件加载vfilelist1容器

  • 此时客户端目录已经放着第一次运行程序下载好的文件,服务端删除了1.txt2.txt,新增了6.txt7.txt

  • 通过FTP远程命令列出远程目录中的文件列表,先存放到listfilename文件中,再读取listfilename文件,过滤掉不匹配的文件名,加载到vfilelist2中。
  • 通过比对vfilelist1(上次已下载)、vfilelist2(远程目录文件列表),得到vfilelist3(本次无需下载)、vfilelist4(本次待下载)。
  • okfilename文件中只需存储相对于远程目录已经下载成功的文件即可,不需要全部的本地已下载全部的文件列表,因此需要计算出vfilelist3容器,再把vfilelist3中的内容覆写到okfilename文件中。
  • 依照vfilelist4从远程目录逐个下载文件,每下载成功一个文件,便向okfilename文件中追加一个文件信息。

  • 第二次运行程序,下载开始前

    • okfilename文件内容如下

      <filename>1.txt</filename><mtime>20240211064640</mtime>
      <filename>2.txt</filename><mtime>20240211064740</mtime>
      <filename>3.txt</filename><mtime>20240211064840</mtime>
      <filename>4.txt</filename><mtime>20240211065840</mtime>
      <filename>5.txt</filename><mtime>20240211066840</mtime>
    • listfilename文件内容如下

      3.txt
      4.txt
      5.txt
      6.txt
      7.txt
    • vfilelist2中的内容是根据远程目录文件列表,过滤掉不匹配的文件名得来的,在这里过滤尽量减少后续判断的性能消耗。
  • 第二次程序运行,下载完成后

    • okfilename文件内容如下

      <filename>3.txt</filename><mtime>20240211064840</mtime>
      <filename>4.txt</filename><mtime>20240211065840</mtime>
      <filename>5.txt</filename><mtime>20240211066840</mtime>
      <filename>6.txt</filename><mtime>20240211067740</mtime>
      <filename>7.txt</filename><mtime>20240211068940</mtime>

程序第三次运行

  • 从本地加载okfilename文件到vfilelist1
  • vfilelist2中存放到的远程目录的所有文件(过滤掉不匹配的文件名)
  • vfilelist3vfilelist4是通过vfilelist1vfilelist2比对而来

  • 远程目录只有6.txt7.txt8.txt3个文件

  • 第三次运行下载前

    • okfilename文件内容

      <filename>3.txt</filename><mtime>20240211064840</mtime>
      <filename>4.txt</filename><mtime>20240211065840</mtime>
      <filename>5.txt</filename><mtime>20240211066840</mtime>
      <filename>6.txt</filename><mtime>20240211067740</mtime>
      <filename>7.txt</filename><mtime>20240211068940</mtime>
    • listfilename文件内容

      6.txt
      7.txt
      8.txt
  • 第三次运行下载后

    • okfilename文件内容

      <filename>6.txt</filename><mtime>20240211067740</mtime>
      <filename>7.txt</filename><mtime>20240211068940</mtime>
      <filename>8.txt</filename><mtime>20240211069940</mtime>

加入修改时间比对

  • 如果文件名相同,但是修改时间发生变化,也可以纳入判断范围,在运行的时候可以指定是否checktime。要修改的地方就是在比对vfilelist1vfilelist2时,除了比对文件名外,还应比对modifyTime,得出vfilelist4

image-20240212185454690

增量上传流程

与增量下载逻辑相同,过程逆转过来即可

程序死机(挂死)

  • 涉及网络通信、数据库操作的程序容易挂死
  • 比较复杂的程序可能会挂死
  • 简单的程序基本上不会挂死

因此需要在可能会超时的代码附近,加上心跳更新代码,例如在列出远程目录、每个文件下载成功等地方更新心跳。

TCP文件传输系统

粘包和分包

TCP(Transmission Control Protocol)是一种可靠的、面向连接的、基于字节流的传输协议,用于在网络上进行数据通信。在TCP通信中,粘包(Packet Pacing)和分包(Packet Splitting)是两个常见的问题,它们可能会影响数据的传输效率和正确性。

  1. 粘包

    • 粘包指的是发送方在发送数据时,多个小数据包被合并成一个大的数据包发送,或者接收方在接收数据时,多个连续的数据包被合并成一个大的数据包接收。这种情况通常发生在发送方连续发送小数据包而没有明确的分隔符或者长度标识,或者接收方没有及时处理接收到的数据,导致多个数据包被合并到一个缓冲区中。
    • 解决粘包的方法包括在数据包之间添加特定的分隔符,或者在数据包中包含长度信息等,以便接收方能够准确地识别和拆分数据包。
  2. 分包

    • 分包指的是发送方在发送数据时,一个大的数据包被拆分成多个小的数据包发送,或者接收方在接收数据时,一个完整的数据包被拆分成多个部分接收。这种情况通常发生在发送方发送的数据大于网络传输的最大单元(MTU),或者网络中存在数据包丢失或者延迟,导致数据包被拆分和重新组合。
    • 解决分包的方法通常是在接收方进行数据重组,等待所有数据到达后再进行处理,或者在传输过程中采用流量控制和拥塞控制等机制,尽量减少数据包的拆分和重组。

在TCP通信中,粘包和分包问题可能会影响数据的可靠性和正确性,因此在设计和实现网络应用时,需要考虑到这些问题,并采取相应的措施来避免或者解决这些问题。

  • TCP协议的保证

    • 报文内容的顺序不变,如果发送方发送“hello”,接收方也一定顺序收到“hello”
    • 分割的包中间不会插入其他数据
  • 解决TCP粘包和分包,可以在项目开发中,采用自定义报文格式

    • 例如:报文长度+报文内容 0010abcdefghi

      image-20240213171705891

网络字节序和主机字节序

网络字节序(Network Byte Order)和主机字节序(Host Byte Order)是两种不同的字节序(字节顺序)表示方式,用于描述数据在计算机内存中的排列顺序。

  1. 网络字节序

    • 网络字节序是一种统一的、规范化的字节序表示方式,用于在网络通信中传输数据。在网络字节序中,数据的高位字节(Most Significant Byte,MSB)在前,低位字节(Least Significant Byte,LSB)在后,即采用大端序(Big Endian)表示。
    • 大多数网络协议(如TCP/IP协议栈)都要求数据使用网络字节序进行传输,以确保在不同计算机体系结构之间的兼容性。
  2. 主机字节序

    • 主机字节序是指数据在主机(计算机)内存中的排列顺序,通常取决于所使用的CPU架构。主机字节序可以是大端序或小端序(Little Endian)。
    • 在大端序中,数据的高位字节存储在低地址处,低位字节存储在高地址处;而在小端序中,数据的低位字节存储在低地址处,高位字节存储在高地址处。

在进行网络通信时,由于不同计算机体系结构可能使用不同的主机字节序,因此发送方需要将数据转换为网络字节序进行传输,而接收方则需要将接收到的数据从网络字节序转换为主机字节序进行处理,以确保数据的正确性和可靠性。常见的转换函数包括htons()htonl()ntohs()ntohl(),用于实现16位和32位整数的主机字节序和网络字节序之间的转换。

关于SIGPIPE信号

当一个进程向一个已经关闭的写端(例如由另一个进程关闭的管道的写端)写数据时,操作系统会发送SIGPIPE信号给该进程。SIGPIPE信号的默认行为是终止进程,这样操作系统就可以防止无限制地写入数据到一个已经关闭的管道,从而导致不必要的资源浪费或者其他不良影响。

然而,在某些情况下,程序员可能希望忽略SIGPIPE信号,而不是使程序终止。这通常发生在以下情况:

  1. 处理管道和套接字: 当进程与另一个进程通过管道或套接字进行通信时,如果另一个进程意外关闭了写端,当前进程可能会收到SIGPIPE信号。在某些情况下,程序员可能选择忽略此信号,以便能够自行处理管道或套接字的关闭情况。
  2. 网络编程: 在网络编程中,当一个进程尝试向已经关闭的套接字写入数据时,也可能会收到SIGPIPE信号。在某些情况下,程序员可能希望继续执行而不是退出,以便能够正确处理网络连接的关闭情况。

要忽略SIGPIPE信号,可以使用signal()函数来设置信号处理函数为SIG_IGN。这将使操作系统忽略SIGPIPE信号,而不是终止进程。例如:

#include <signal.h>

// 在程序初始化阶段将SIGPIPE信号的处理函数设置为SIG_IGN
signal(SIGPIPE, SIG_IGN);

需要注意的是,忽略SIGPIPE信号可能会导致一些意想不到的行为,因为进程将继续尝试写入已经关闭的管道或套接字,而不会收到任何错误提示。因此,在忽略SIGPIPE信号时,程序员需要特别注意处理写入操作的返回值,以确保不会出现不必要的错误或者资源浪费。

在本章节TCP网络编程中,当某个进程向已经关闭的套接字写入数据时,我们并不希望通过系统发出的SIGPIPE信号,导致程序终止运行,而是希望获取sendrecv的返回值(-1),以便于自定义处理接下来的业务流程。

多进程网络服务程序框架

实际场景中往往会有多个客户端连接同一个服务端。

image-20240214115856076

父进程负责接受TCP请求,拿到文件描述符后fork()出新的子进程处理后续业务。

image-20240214120125206

  • 服务端父进程用来监听TCP连接请求,服务端子进程用于维护TCP连接
  • kill掉一个客户端进程,不会对服务端父进程、服务端子进程、其他客户端进程产生影响
  • kill掉服务端父进程,不会影响到服务端子进程、客户端进程
  • kill掉一个服务端子进程,只会影响到与之对应连接的客户端进程,不会对其他进程产生影响

服务端的CloseListen和CloseClient

服务端的父进程负责监听TCP连接请求,一旦有请求进来就接受,接下来fork()出子进程专门接管维护新进来的请求,而父进程继续监听TCP连接请求。此时父进程就需要关掉客户端的文件描述符,子进程就需要关闭监听TCP连接的文件描述符,避免造成文件描述符资源浪费。

父进程、子进程退出函数

这两个函数是处理进程退出逻辑的,通常用于一个由父进程和多个子进程组成的进程族中。父进程和子进程分别有自己的退出处理函数,这是因为它们在接收到退出信号时可能需要执行不同的清理任务。现在,我将详细解释这两个函数及其用途。

父进程退出函数 ParentExit
// 父进程退出函数
void ParentExit(int sig)
{
  signal(SIGINT, SIG_IGN);  // 父进程忽略信号SIGINT,避免重复退出
  signal(SIGTERM, SIG_IGN); // 父进程忽略信号SIGTERM,避免重复退出
  logFile.Write("父进程收到退出信号,sig=%d\n", sig);
  server.CloseListen(); // 父进程关闭监听。
  kill(0, SIGTERM);     // 向全部子进程发送退出信号。
  exit(0);
}
  1. 函数定义void ParentExit(int sig)

    • sig 参数:接收到的信号编号,比如 SIGINTSIGTERM
  2. 函数内容

    • signal(SIGINT, SIG_IGN);signal(SIGTERM, SIG_IGN);:这两行代码的作用是使父进程忽略 SIGINTSIGTERM 信号。这是因为一旦父进程收到退出信号并开始执行退出处理函数,它可能不希望再次因为同样的信号而退出。SIG_IGN 表示忽略该信号。
    • logFile.Write("父进程收到退出信号,sig=%d\n", sig);:记录日志,表明父进程收到了退出信号。
    • server.CloseListen();:关闭父进程正在监听的服务器端口或套接字。这通常是为了确保不再接受新的连接请求。
    • kill(0, SIGTERM);:向所有子进程发送 SIGTERM 信号。kill(0, SIGTERM) 实际上会向调用进程的所有子进程发送 SIGTERM 信号,但调用进程本身不会收到信号。这是因为 kill 函数的第一个参数为 0 时,表示发送信号给调用进程的所有子进程。
    • exit(0);:退出父进程,返回状态码 0,表示正常退出。
  3. 使用场景

    • 当你想优雅地关闭一个由父进程和子进程组成的进程族时,可以使用这个函数。
    • 当父进程收到如 SIGINTSIGTERM 这样的退出信号时,它可以确保所有的子进程都被通知到并有机会进行清理工作。
子进程退出函数 ChildExit
// 子进程退出函数
void ChildExit(int sig)
{
  signal(SIGINT, SIG_IGN);  // 子进程忽略信号SIGINT,避免重复退出
  signal(SIGTERM, SIG_IGN); // 子进程忽略信号SIGTERM,避免重复退出
  logFile.Write("子进程收到退出信号,sig=%d\n", sig);
  server.CloseClient(); // 子进程关闭客户端连接。
  exit(0);
}
  1. 函数定义void ChildExit(int sig)

    • sig 参数:同样是接收到的信号编号。
  2. 函数内容

    • signal(SIGINT, SIG_IGN);signal(SIGTERM, SIG_IGN);:与父进程类似,子进程也忽略这些信号,避免重复处理。
    • logFile.Write("子进程收到退出信号,sig=%d\n", sig);:记录日志,表明子进程收到了退出信号。
    • server.CloseClient();:关闭子进程可能正在处理的客户端连接。这确保了客户端连接得到妥善处理,而不是突然中断。
    • exit(0);:退出子进程,返回状态码 0
  3. 使用场景

    • 当子进程收到退出信号时,它可以确保正在处理的请求得到妥善处理,比如关闭连接或保存数据。
    • 这对于提供稳定服务的进程来说非常有用,比如 Web 服务器或数据库服务器,它们通常有多个子进程来处理并发请求。

这种设计模式在处理需要稳定关闭的复杂进程族时非常有用。父进程负责监听和关闭,而子进程负责处理请求并在退出时关闭连接。通过这种方式,可以确保在进程退出时,所有的资源都被正确地释放,所有的连接都被妥善处理。

TCP短链接与长连接

  1. 短连接

    • 在短连接中,客户端和服务器之间的连接是临时性的,即在数据传输完成后立即关闭连接。
    • 每次通信都需要建立一个新的连接,包括连接的建立和关闭过程,这会增加一定的开销。
    • 适用于一次性传输少量数据的情况,如HTTP请求响应。
  2. 长连接

    • 在长连接中,客户端和服务器之间的连接是持久性的,即在一段时间内保持连接处于打开状态,直到达到设定的超时时间或者明确关闭连接。
    • 一旦建立连接,客户端和服务器可以在多次通信中重复使用该连接,而无需频繁地建立和关闭连接,从而减少了连接建立和关闭的开销。
    • 适用于需要频繁通信或保持实时连接的场景,如实时聊天、实时数据传输等。

选择短连接还是长连接取决于应用的具体需求和性能优化考量。长连接通常可以提高性能和效率,但也可能会增加系统资源的占用,因此需要根据实际情况进行权衡和选择。

模块设计

  • 文件传输的服务端模块(支持上传、下载)
  • 文件上传的客户端模块
  • 文件下载的客户端模块

文件上传客户端模块

image-20240215161027366

异步通信

异步通信的实现

  • 多进程:用不同的进程发送报文和接收报文
  • 多线程:用不同的线程发送报文和接收报文
  • I/O复用:select、poll、epoll函数
  • 可以通过对比试验,更直观的显现异步通信的效果
编写同步通信代码
  • 编写同步通信Demo,并循环发送指定文本十万次,计算每秒发送接收报文的数量(QPS)
  • 下面是服务端代码
/*
 * 程序名:demo32.cpp,此程序用于演示socket通讯的服务端。
 * 作者:Bill
 */
#include <_public.h>

int main(int argc, char *argv[])
{
  if (argc != 2)
  {
    printf("Using:./demo32 port\nExample:./demo32 5005\n\n");
    return -1;
  }

  CTcpServer server;

  // 第2步:把服务端用于通讯的地址和端口绑定到socket上。
  struct sockaddr_in servaddr; // 服务端地址信息的数据结构。
  memset(&servaddr, 0, sizeof(servaddr));
  if (server.InitServer(atoi(argv[1])) == false)
  {
    printf("server.InitServer(%s) failed\n", argv[1]);
    return -1;
  }

  if (server.Accept() == false)
  {
    printf("server.Accept() failed\n");
    return -1;
  }

  char buffer[102400];

  CLogFile logFile;

  if (logFile.Open("/home/bill/project/runtime/log/demo32.log", "a+") == false)
  {
    printf("logFile.Open() failed\n");
    return -1;
  }

  // 与客户端通讯,接收客户端发过来的报文后,回复ok。
  while (1)
  {
    memset(buffer, 0, sizeof(buffer));
    if (server.Read(buffer) == false) // 接收客户端的请求报文。
    {
      break;
    }
    logFile.Write("recv <- %s\n", buffer);

    strcpy(buffer, "ok");
    if (server.Write(buffer) == false) // 向客户端发送响应结果。
    {
      perror("send");
      break;
    }
    logFile.Write("send <- %s\n", buffer);
  }
}
  • 下面是客户端代码
/*
 * 程序名:demo31.cpp,此程序用于演示socket通讯的客户端。
 * 作者:Bill
 */
#include <_public.h>

int main(int argc, char *argv[])
{
  if (argc != 3)
  {
    printf("Using:./demo31 ip port\nExample:./demo31 192.168.2.159 5005\n\n");
    return -1;
  }

  CTcpClient client;

  if (client.ConnectToServer(argv[1], atoi(argv[2])) == false)
  {
    printf("ConnectToServer(%s,%s) failed\n", argv[1], argv[2]);
    return -1;
  }

  char buffer[102400];

  CLogFile logFile;

  if (logFile.Open("/home/bill/project/runtime/log/demo31.log", "a+") == false)
  {
    printf("logFile.Open() failed\n");
    return -1;
  }

  // 第3步:与服务端通讯,发送一个报文后等待回复,然后再发下一个报文。
  for (int ii = 0; ii < 100000; ii++)
  {
    memset(buffer, 0, sizeof(buffer));
    SPRINTF(buffer, sizeof(buffer), "这是第%d个向服务端发送的请求报文,,编号%03d。(后面这里为中文后缀,增加报文长度,提高网络负载)", ii + 1, ii + 1);
    if (client.Write(buffer) == false) // 向服务端发送请求报文。
    {
      perror("send");
      break;
    }
    logFile.Write("send -> %s\n", buffer);

    memset(buffer, 0, sizeof(buffer));
    if (client.Read(buffer) == false) // 接收服务端的回应报文。
    {
      break;
    }
    logFile.Write("recv <- %s\n", buffer);

    // sleep(1); // 每隔一秒后再次发送报文。
  }
}
  • 编译后,先运行服务端,再运行客户端
  • 根据日志文件统计QPS
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:41:57" ../runtime/log/demo31.log | wc
  22224   66672 1389120
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:41:58" ../runtime/log/demo31.log | wc
  29470   88410 1871345
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:41:59" ../runtime/log/demo31.log | wc
  29050   87150 1844675
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:42:00" ../runtime/log/demo31.log | wc
  27229   81687 1729073
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:42:01" ../runtime/log/demo31.log | wc
  28759   86277 1826165
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:42:02" ../runtime/log/demo31.log | wc
  25845   77535 1641189
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:42:03" ../runtime/log/demo31.log | wc
  28840   86520 1831340
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:42:04" ../runtime/log/demo31.log | wc
   8583   25749  544991
bill@DESKTOP-Q27QRF6:~/project/socket1$ grep "2024-05-17 10:42:05" ../runtime/log/demo31.log | wc
      0       0       0
  • grep "2024-05-17 10:41:57" ../runtime/log/demo31.log | wc: 在 ../runtime/log/demo31.log 文件中查找包含时间戳 "2024-05-17 10:41:57" 的行,并计算出现的行数、字数以及字符数。
  • "行数" 表示包含特定时间戳的行的数量,即日志文件中满足搜索条件的行数。"字数" 表示所有匹配行中的单词总数。"字符数" 则是指所有匹配行中的字符总数,包括空格和其他特殊字符。这些统计数据帮助用户了解日志文件中特定时间段内的活动情况和内容密度。
  • 可以看出QPS在28000左右
编写多进程通信代码
  • 在同步通信的基础上,只修改客户端的代码,变为多进程通信,父进程发送,子进程接收
/*
 * 程序名:demo31.cpp,此程序用于演示socket通讯的客户端。
 * 作者:Bill
 * 时间:2024年11月04日
 */
#include <_public.h>

int main(int argc, char *argv[])
{
  if (argc != 3)
  {
    printf("Using:./demo31 ip port\nExample:./demo31 127.0.0.1 5005\n\n");
    return -1;
  }

  CTcpClient client;

  if (client.ConnectToServer(argv[1], atoi(argv[2])) == false)
  {
    printf("ConnectToServer(%s,%s) failed\n", argv[1], argv[2]);
    return -1;
  }

  char buffer[102400];

  CLogFile logfile(1000); // 初始化单个日志大小为1000MB,防止切换日志造成后续统计出错
  logfile.Open("/home/bill/project/runtime/log/demo31.log", "w+");
    
  int pid = fork();

  // 第3步:与服务端通讯,发送一个报文后等待回复,然后再发下一个报文。
  for (int ii = 0; ii < 1000000; ii++)
  {
    if (pid > 0) // 父进程发送报文
    {
      memset(buffer, 0, sizeof(buffer));
      SPRINTF(buffer, sizeof(buffer), "这是第%d个向服务端发送的请求报文,编号%03d。(后面这里为中文后缀,增加报文长度,提高网络负载)", ii + 1, ii + 1);
      if (client.Write(buffer) == false)
        break;
      logfile.Write("发送:%s\n", buffer);
    }
    else
    {
      memset(buffer, 0, sizeof(buffer));
      if (client.Read(buffer) == false)
        break;
      logfile.Write("接收:%s\n", buffer);

      // sleep(1); // 每隔一秒后再次发送报文。
    }
  }
}
  • 29行int pid = fork()分化出父子进程
  • 34行、42行分别为父、子进程执行的代码
  • 根据日志统计出QPS
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 17:53:23" demo31.log | wc
      0       0       0
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 17:53:24" demo31.log | wc
  18037   54111  577184
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 17:53:25" demo31.log | wc
 114332  342996 6621164
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 17:53:26" demo31.log | wc
 208952  626856 21766424
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 17:53:27" demo31.log | wc
 216723  650169 22131296
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 17:53:28" demo31.log | wc
 199916  599748 20387232
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 17:53:29" demo31.log | wc
 122331  366993 11884374
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 17:53:30" demo31.log | wc
      0       0       0
  • 可以看出QPS在200000左右,性能有很大提升
编写I/O复用异步通信(select、poll、epoll)
// 接收socket的对端发送过来的数据。
// sockfd:可用的socket连接。
// buffer:接收数据缓冲区的地址。
// ibuflen:本次成功接收数据的字节数。
// itimeout:接收等待超时的时间,单位:秒,-1-不等待;0-无限等待;>0-等待的秒数。
// 返回值:true-成功;false-失败,失败有两种情况:1)等待超时;2)socket连接已不可用。
bool TcpRead(const int sockfd, char *buffer, int *ibuflen, const int itimeout)
{
  if (sockfd == -1)
    return false;

  // 如果itimeout>0,表示需要等待itimeout秒,如果itimeout秒后还没有数据到达,返回false。
  if (itimeout > 0)
  {
    struct pollfd fds;
    fds.fd = sockfd;
    fds.events = POLLIN;
    if (poll(&fds, 1, itimeout * 1000) <= 0)
      return false;
  }

  // 如果itimeout==-1,表示不等待,立即判断socket的缓冲区中是否有数据,如果没有,返回false。
  if (itimeout == -1)
  {
    struct pollfd fds;
    fds.fd = sockfd;
    fds.events = POLLIN;
    if (poll(&fds, 1, 0) <= 0)
      return false;
  }

  (*ibuflen) = 0; // 报文长度变量初始化为0。

  // 先读取报文长度,4个字节。
  if (Readn(sockfd, (char *)ibuflen, 4) == false)
    return false;

  (*ibuflen) = ntohl(*ibuflen); // 把报文长度由网络字节序转换为主机字节序。

  // 再读取报文内容。
  if (Readn(sockfd, buffer, (*ibuflen)) == false)
    return false;

  return true;
}

上面展示的TcpRead方法已经封装到_public.h,当itimeout为0时,会使用poll进行I/O复用,下为demo:

  • 代码尾部持续接收剩余的回复: 当客户端发送完所有请求报文后,仍有可能有很多来自服务端的响应没有被读取到。第二个 while (jj < 1000000) 循环就是为了处理这些“迟到”的响应消息,确保客户端可以接收服务端针对每一个请求所做出的应答,避免遗漏。
  • 为什么继续接收的部分可以不需要 poll

    • 单连接场景:在这种场景中,只有一个连接,继续接收的过程就是等待服务端返回所有的响应报文。使用 poll 等待 I/O 就显得不太必要,因为你只需要关注一个套接字的读取,不涉及多路复用或者并发处理。
    • 确保所有数据接收完毕:通过继续接收,客户端可以确保服务端针对每一个请求都做出了回应,并且这些回应最终都被完整地接收到了。
    • 简化代码逻辑:由于继续接收的过程不涉及其他文件描述符的监听,也不需要超时机制,直接使用 while (jj < 1000000) 这样的循环就足以完成所有数据的接收。

​ 所以,第二部分的接收循环是为了保证网络通信的完整性,确保客户端不会因为发送速度过快、网络延迟或服务端的处理速度等原因而丢失应答报文。

  • 根据日志统计出QPS大概在150000左右,比多线程/进程要低一点
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:16" demo33.log | wc
      0       0       0
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:17" demo33.log | wc
 105109  315327 10596458
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:18" demo33.log | wc
 164039  492117 16637196
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:19" demo33.log | wc
 150474  451422 15348208
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:20" demo33.log | wc
 151905  455715 15492980
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:21" demo33.log | wc
 146648  439944 14959216
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:22" demo33.log | wc
 149564  448692 15255528
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:23" demo33.log | wc
 148975  446925 15195240
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:24" demo33.log | wc
 151826  455478 15486112
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:25" demo33.log | wc
 161826  485478 16507092
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:26" demo33.log | wc
 152240  456720 15527780
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:27" demo33.log | wc
 147282  441846 15023044
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:28" demo33.log | wc
 154596  463788 15768092
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:29" demo33.log | wc
 150251  450753 15326232
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:30" demo33.log | wc
  65265  195795 6654722
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ grep "2024-11-04 18:15:31" demo33.log | wc
      0       0       0
  • 查看日志的最后25行
bill@DESKTOP-Q27QRF6:~/project/runtime/log$ tail -n 25 demo33.log
2024-11-04 18:15:30 发送:这是第999999个向服务端发送的请求报文,编号999999。(后面这里为中文后缀,增加报文长度 ,提高网络负载)
2024-11-04 18:15:30 发送:这是第1000000个向服务端发送的请求报文,编号1000000。(后面这里为中文后缀,增加报文长 度,提高网络负载)
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok
2024-11-04 18:15:30 接收:ok

​ 可以看到末尾的日志都是迟来的接收,由于本实验在单机上操作,效果不是很明显。如果是在局域网中,由于网络延迟,这种现象会更明显。

基于TCP的文件上传系统实现

:dart:目标​

  • 实现文件的上传功能
  • 采用异步通信机制(I/O复用),实现文件的快速传输
  • 优化休眠机制,当没有传输任务时,则休眠,否则不休眠一直上传
  • TCP连接由客户端发起,登录协商文件传输的参数
  • 由客户端发起文件传输请求,把客户端目录中的文件发送给服务端

文件上传客户端

/*
 * 程序名:tcpputfiles.cpp,采用TCP协议,实现文件上传客户端
 * 作者:Bill
 * 日期:2024-11-04
 */
#include <_public.h>

// 程序运行参数结构体
struct st_arg
{
  int clientType;          // 客户端类型,1:文件上传,2:文件下载
  char ip[32];             // 服务端IP
  int port;                // 服务端端口
  int ptype;               // 文件上传成功后的处理方式,1:删除,2:移动到备份目录
  char clientPath[256];    // 客户端文件路径
  char clientPathBak[256]; // 客户端文件备份路径
  bool andChild;           // 是否上传子目录
  char matchname[256];     // 匹配的文件名(正则表达式),如:*.txt
  char srvPath[256];       // 服务端存放目录
  int timetvl;             // 扫描本地目录的时间间隔,单位:秒
  int timeout;             // 心跳超时时间
  char pname[256];         // 本地进程名.建议以tcpupfiles_开头
} starg;

void EXIT(int sig);              // 退出函数,处理信号SIGINT(2)和SIGTERM(15)
void _help();                    // 运行帮助函数
bool _xmlToArg(char *xmlbuffer); // 解析xml数据到结构体starg中

bool srvHeartbeat();             // 心跳业务
bool srvLogin(const char *argv); // 登录业务
bool _tcpputfiles();             // 上传文件业务
bool bcontinue = true;           // 如果调用了_tcpputfiles()发送了文件,bcontinue=true,初始化为true

bool sendFile(const int sockfd, const char *filename, const int filesize); // 发送文件

char strRecvBuffer[1024], strSendBuffer[1024]; // 接收和发送缓冲区
void clearBuffer();                            // 清空缓冲区

bool ackMessage(const char *strRecvBuffer); // 删除或移动(转存)本地文件

CTcpClient client; // 定义一个客户端对象
CPActive active;   // 定义一个心跳对象
CLogFile logFile;  // 定义一个日志文件对象

int main(int argc, char *argv[])
{
  if (argc != 3)
  {
    _help();
    return -1;
  }

  // CloseIOAndSignal(true); // 关闭标准输入输出和信号的输入输出
  signal(SIGINT, EXIT);  // 注册信号SIGINT的处理函数
  signal(SIGTERM, EXIT); // 注册信号SIGTERM的处理函数

  if (logFile.Open(argv[1], "a+") == false) // 初始化日志文件
  {
    printf("logFile.Open(%s) failed\n", argv[1]);
    return -1;
  }

  if (_xmlToArg(argv[2]) == false) // 解析xml数据到结构体starg中
  {
    return -1;
  }

  active.AddPInfo(starg.timeout, starg.pname); // 添加心跳进程

  if (client.ConnectToServer(starg.ip, starg.port) == false) // 连接服务端
  {
    logFile.Write("ConnectToServer(%s,%d) failed\n", starg.ip, starg.port);
    EXIT(-1);
  }

  logFile.Write("connect(%s:%d) success\n", starg.ip, starg.port);

  if (srvLogin(argv[2]) == false) // 登录
  {
    logFile.Write("srvLogin() failed\n");
    EXIT(-1);
  }

  logFile.Write("login (%s:%d) success\n", starg.ip, starg.port);

  while (true)
  {

    if (_tcpputfiles() == false)
    {
      logFile.Write("_tcpputfiles() failed\n");
      EXIT(-1);
    }
    if (bcontinue == false)
    {
      sleep(starg.timetvl); // 休眠

      if (srvHeartbeat() == false)
        break;
    }

    active.UptATime(); // 发送心跳
  }

  EXIT(0);
}

bool _tcpputfiles()
{
  clearBuffer();

  // 遍历本地需要上传的目录文件
  CDir dir;
  if (dir.OpenDir(starg.clientPath, starg.matchname, 10000, starg.andChild) == false)
  {
    logFile.Write("OpenDir(%s) failed\n", starg.clientPath);
    return false;
  }

  int delayed = 0; // 传输文件后未收到服务端确认的数量
  int bufLen = 0;  // 用于存放接收到的数据长度

  bcontinue = false;

  while (dir.ReadDir())
  {
    clearBuffer();
    bcontinue = true;

    // 把filename、mtime、size先发送给服务端
    SPRINTF(strSendBuffer, sizeof(strSendBuffer),
            "<filename>%s</filename><mtime>%s</mtime><size>%d</size>",
            dir.m_FileName, dir.m_ModifyTime, dir.m_FileSize);
    // logFile.Write("发送:%s\n", strSendBuffer);
    if (client.Write(strSendBuffer) == false)
    {
      logFile.Write("client.Write() failed\n");
      return false;
    }

    // 把文件传到服务端
    logFile.Write("send %s(%d) ...", dir.m_FullFileName, dir.m_FileSize);
    if (sendFile(client.m_connfd, dir.m_FullFileName, dir.m_FileSize))
    {
      delayed++;
      logFile.WriteEx("ok\n");
    }
    else
    {
      logFile.WriteEx("failed\n");
      client.Close();
      return false;
    }

    active.UptATime(); // 发送心跳

    // 接收确认报文
    while (delayed > 0)
    {
      clearBuffer();
      if (TcpRead(client.m_connfd, strRecvBuffer, &bufLen, -1) == false)
        break;
      delayed--;
      // 删除或移动(转存)本地文件
      ackMessage(strRecvBuffer);
    }
  }

  bcontinue = false;

  // 继续接收确认报文
  while (delayed > 0)
  {
    clearBuffer();
    if (TcpRead(client.m_connfd, strRecvBuffer, &bufLen, 10) == false)
      break;
    delayed--;
    // 删除或移动(转存)本地文件
    ackMessage(strRecvBuffer);
  }

  return true;
}

bool ackMessage(const char *strRecvBuffer)
{
  char filename[256], fullFilename[512], fullFilenameBak[512];
  char result[32];

  GetXMLBuffer(strRecvBuffer, "filename", filename, sizeof(filename) - 1);
  GetXMLBuffer(strRecvBuffer, "result", result, sizeof(result) - 1);

  if (strcmp(result, "ok") == 0)
  {
    if (starg.ptype == 1) // 删除
    {
      SNPRINTF(fullFilename, sizeof(fullFilename), 1000, "%s/%s", starg.clientPath, filename);
      if (REMOVE(fullFilename) == false)
      {
        logFile.Write("remove(%s) failed\n", fullFilename);
        return false;
      }
    }
    else if (starg.ptype == 2) // 移动到备份目录
    {
      SNPRINTF(fullFilename, sizeof(fullFilename), 1000, "%s/%s", starg.clientPath, filename);
      SNPRINTF(fullFilenameBak, sizeof(fullFilenameBak), 1000, "%s/%s", starg.clientPathBak, filename);
      if (RENAME(fullFilename, fullFilenameBak) == false)
      {
        logFile.Write("rename(%s->%s) failed\n", fullFilename, fullFilenameBak);
        return false;
      }
    }
  }
  else
  {
    logFile.Write("ackMessage(%s) failed\n", strRecvBuffer);
    return false;
  }
  return true;
}

bool sendFile(const int sockfd, const char *filename, const int filesize)
{
  int onRead = 0;         // 每次预计读取的字节数
  int nRead = 0;          // 每次实际读取的字节数
  int sendBytes = 0;      // 每次实际发送的字节数
  int totalReadBytes = 0; // 总共读取的字节数
  char buffer[1025];      // 存放读取数据的缓冲区

  // 以rb方式打开文件
  FILE *fp = fopen(filename, "rb");
  if (fp == NULL)
  {
    logFile.Write("fopen(%s) failed\n", filename);
    return false;
  }

  while (true)
  {
    // 每次读取1024个字节,使用移位运算符加快计算速度
    memset(buffer, 0, sizeof(buffer));
    onRead = (filesize - totalReadBytes) > 1024 ? 1024 : (filesize - totalReadBytes);

    // 从文件读取
    nRead = fread(buffer, 1, onRead, fp);

    // 发送
    if (nRead > 0)
    {
      if (Writen(sockfd, buffer, nRead) == false)
      {
        fclose(fp);
        return false;
      }
    }

    // 计算累计已读字节
    totalReadBytes += nRead;

    if (totalReadBytes >= filesize)
      break;
  }

  fclose(fp);
  return true;
}

bool _xmlToArg(char *xmlbuffer)
{
  memset(&starg, 0, sizeof(starg));
  // 客户端类型,1:文件上传,2:文件下载
  // if (GetXMLBuffer(xmlbuffer, "clientType", &starg.clientType) == false)
  // {
  //   logFile.Write("Failed: <clientType> is null\n");
  //   return false;
  // }
  // 服务端IP
  if (GetXMLBuffer(xmlbuffer, "ip", starg.ip, sizeof(starg.ip) - 1) == false)
  {
    logFile.Write("Failed: <ip> is null\n");
    return false;
  }
  // 服务端端口
  if (GetXMLBuffer(xmlbuffer, "port", &starg.port) == false)
  {
    logFile.Write("Failed: <port> is null\n");
    return false;
  }
  // 文件上传成功后的处理方式,1:删除,2:移动到备份目录
  if (GetXMLBuffer(xmlbuffer, "ptype", &starg.ptype) == false)
  {
    logFile.Write("Failed: <ptype> is null\n");
    return false;
  }
  // 客户端文件路径
  if (GetXMLBuffer(xmlbuffer, "clientPath", starg.clientPath, sizeof(starg.clientPath) - 1) == false)
  {
    logFile.Write("Failed: <clientPath> is null\n");
    return false;
  }
  // 客户端文件备份路径,客户端类型为文件上传时有效
  if (starg.ptype == 2 && GetXMLBuffer(xmlbuffer, "clientPathBak", starg.clientPathBak, sizeof(starg.clientPathBak) - 1) == false)
  {
    logFile.Write("Failed: <clientPathBak> is null\n");
    return false;
  }
  // 是否上传子目录
  if (GetXMLBuffer(xmlbuffer, "andChild", &starg.andChild) == false)
  {
    logFile.Write("Failed: <andChild> is null\n");
    return false;
  }
  // 匹配的文件名(正则表达式),如:*.txt
  if (GetXMLBuffer(xmlbuffer, "matchname", starg.matchname, sizeof(starg.matchname)) == false)
  {
    logFile.Write("Failed: <matchname> is null\n");
    return false;
  }
  // 服务端存放目录
  if (GetXMLBuffer(xmlbuffer, "srvPath", starg.srvPath, sizeof(starg.srvPath) - 1) == false)
  {
    logFile.Write("Failed: <srvPath> is null\n");
    return false;
  }
  // 扫描本地目录的时间间隔,单位:秒,由于数据传输需要即时性,因此规定不得超过30秒
  GetXMLBuffer(xmlbuffer, "timetvl", &starg.timetvl);
  if (starg.timetvl == 0)
  {
    logFile.Write("Failed: <timetvl> is null\n");
    return false;
  }
  if (starg.timetvl > 30)
    starg.timetvl = 30;
  // 心跳超时时间,单位:秒,心跳不需要非常频繁,因此规定不得低于50秒
  // 此外,心跳超时时间不得低于扫描本地目录的时间间隔,否则会导致心跳超时
  GetXMLBuffer(xmlbuffer, "timeout", &starg.timeout);
  if (starg.timeout == 0)
  {
    logFile.Write("Failed: <timeout> is null\n");
    return false;
  }
  if (starg.timeout < 50)
    starg.timeout = 50;
  // 本地进程名.建议以tcpupfiles_开头
  if (GetXMLBuffer(xmlbuffer, "pname", starg.pname, sizeof(starg.pname) - 1) == false)
  {
    logFile.Write("Failed: <pname> is null\n");
    return false;
  }
  return true;
}

bool srvLogin(const char *argv)
{
  clearBuffer();
  starg.clientType = 1;
  SPRINTF(strSendBuffer, sizeof(strSendBuffer),
          "%s<clientType>%d</clientType>", argv, 1);
  if (client.Write(strSendBuffer) == false)
    return false;
  // printf("发送:%s\n", strSendBuffer);

  if (client.Read(strRecvBuffer, 20) == false)
    return false;
  printf("接收:login %s\n", strRecvBuffer);

  return true;
}

bool srvHeartbeat()
{
  clearBuffer();
  memset(strSendBuffer, 0, sizeof(strSendBuffer));
  strcpy(strSendBuffer, "<heart-beat/>");
  if (client.Write(strSendBuffer) == false)
    return false;
  // logFile.Write("发送:%s\n", strSendBuffer);

  if (client.Read(strRecvBuffer, 20) == false)
    return false;
  // logFile.Write("接收:%s\n", strRecvBuffer);

  return true;
}

void EXIT(int sig)
{
  printf("process exit\n");
  exit(0);
}

void _help()
{
  printf("Usage: \n"
         "/home/bill/project/tools1/bin/tcpputfiles "
         "/home/bill/project/runtime/log/tcpputfiles.log \""
         "<clientType>1</clientType>"
         "<ip>192.168.2.159</ip>"
         "<port>5005</port>"
         "<ptype>1</ptype>"
         "<clientPath>/home/bill/project/tools1/bin</clientPath>"
         "<clientPathBak>/home/bill/project/tools1/bin/bak</clientPathBak>"
         "<andChild>true</andChild>"
         "<matchname>*.cpp</matchname>"
         "<srvPath>/home/bill/project/tools1/bin</srvPath>"
         "<timetvl>5</timetvl>"
         "<timeout>10</timeout>"
         "<pname>tcpupfiles_me</pname>\"\n\n");
  printf("clientType:客户端类型,1:文件上传,2:文件下载\n");
  printf("ip:服务端IP\n");
  printf("port:服务端端口\n");
  printf("ptype:文件上传成功后的处理方式,1:删除,2:移动到备份目录\n");
  printf("clientPath:客户端文件路径\n");
  printf("clientPathBak:客户端文件备份路径\n");
  printf("andChild:是否上传子目录\n");
  printf("matchname:匹配的文件名(正则表达式),如:*.txt\n");
  printf("srvPath:服务端存放目录\n");
  printf("timetvl:扫描本地目录的时间间隔,单位:秒,扫描间隔最大不得超过30s\n");
  printf("timeout:心跳超时时间,规定不低于50秒\n");
  printf("pname:本地进程名.建议以tcpupfiles_开头\n\n");
}

void clearBuffer()
{
  memset(strRecvBuffer, 0, sizeof(strRecvBuffer));
  memset(strSendBuffer, 0, sizeof(strSendBuffer));
}

文件上传服务端

/*
 * 程序名: fileserver.cpp,文件传输的服务端。
 * 作者:Bill
 * 日期:2024-11-04
 */
#include <_public.h>

// 程序运行参数结构体
struct st_arg
{
  int clientType;          // 客户端类型,1:文件上传,2:文件下载
  char ip[32];             // 服务端IP
  int port;                // 服务端端口
  int ptype;               // 文件上传成功后的处理方式,1:删除,2:移动到备份目录
  char clientPath[256];    // 客户端文件路径
  char clientPathBak[256]; // 客户端文件备份路径
  bool andChild;           // 是否上传子目录
  char matchname[256];     // 匹配的文件名(正则表达式),如:*.txt
  char srvPath[256];       // 服务端存放目录
  int timetvl;             // 扫描本地目录的时间间隔,单位:秒
  int timeout;             // 心跳超时时间
  char pname[256];         // 本地进程名.建议以tcpupfiles_开头
} starg;

// 文件信息结构体
struct st_file
{
  char name[256]; // 文件名
  int size;       // 文件大小
  char mtime[32]; // 文件修改时间
} stfile;

CLogFile logFile;  // 定义一个日志文件对象
CPActive active;   // 定义一个心跳对象
CTcpServer server; // 定义一个服务端对象

void ParentExit(int sig); // 父进程退出函数
void ChildExit(int sig);  // 子进程退出函数

bool srvHeartbeat(char *strSendbuffer); // 心跳业务
bool clientLogin();                     // 登录业务

char strRecvbuffer[1024], strSendbuffer[1024]; // 接收和发送缓冲区
void clearBuffer();                            // 清空缓冲区

bool _xmlToArg(char *xmlbuffer); // 解析xml数据到结构体starg中

void recvFilesMain();                                                                         // 上传文件主函数(客户端上传,服务端接收)
bool recvFile(const int sockfd, const char *filename, const char *mtime, const int filesize); // 接收文件

void sendFilesMain(); // 下载文件主函数(客户端下载,服务端发送)

int main(int argc, char *argv[])
{
  if (argc != 3)
  {
    printf("Using:./fileserver port logfile\n"
           "Example:\n"
           "/home/bill/project/tools1/bin/fileserver 5005 /home/bill/project/runtime/tmp/fileserver.log\n\n");
    return -1;
  }

  CloseIOAndSignal();          // 关闭标准输入输出和信号的输入输出
  signal(SIGINT, ParentExit);  // 父进程注册信号SIGINT的处理函数
  signal(SIGTERM, ParentExit); // 父进程注册信号SIGTERM的处理函数

  // 初始化日志文件
  if (logFile.Open(argv[2], "a+") == false)
  {
    logFile.Write("logFile.Open(%s) failed\n", argv[2]);
    return -1;
  }

  // 服务端初始化
  if (server.InitServer(atoi(argv[1])) == false)
  {
    logFile.Write("server.InitServer(%s) failed\n", argv[1]);
    return -1;
  }

  while (1)
  {
    if (server.Accept() == false)
    {
      logFile.Write("server.Accept() failed\n");
      ParentExit(-1); // 父进程退出
    }

    // 打印客户端IP已连接
    logFile.Write("客户端IP已连接:%s\n", server.GetIP());

    /*
    if (fork() > 0)
    {
      continue;             // 父进程继续监听客户端连接回到Accept(),子进程负责与客户端通讯。
      server.CloseClient(); // 父进程关闭客户端连接。
    }

    // ==================== 子进程业务开始 ====================
    signal(SIGINT, ChildExit);  // 子进程注册信号SIGINT的处理函数
    signal(SIGTERM, ChildExit); // 子进程注册信号SIGTERM的处理函数

    // 子进程关闭监听。
    server.CloseListen();
    */

    // 获取登录报文
    if (clientLogin() == false)
    {
      ChildExit(-1); // 子进程退出
    }

    // logFile.Write("starg.clientType=%d\n", starg.clientType);

    // 如果clientType=1,则是文件上传,调用上传文件主函数
    if (starg.clientType == 1)
    {
      // 上传文件主函数(客户端上传,服务端接收)
      recvFilesMain();
    }

    // 如果clientType=2,则是文件下载,调用下载文件主函数(客户端下载,服务端发送)
    if (starg.clientType == 2)
    {
      // 下载文件主函数
      sendFilesMain();
    }

    // return 0; // 子进程结束,或使用exit(0);
    ChildExit(0); // 子进程退出
  }
}

// 上传文件主函数(客户端上传,服务端接收)
void recvFilesMain()
{
  active.AddPInfo(starg.timeout, starg.pname); // 添加心跳进程
  while (true)
  {
    // 接收客户端报文
    clearBuffer();

    active.UptATime(); // 发送心跳

    if (server.Read(strRecvbuffer, starg.timetvl + 10) == false)
    {
      logFile.Write("server.Read() failed\n");
      break;
    }
    // logFile.Write("recvbuffer:%s\n", strRecvbuffer);

    // 处理心跳报文
    if (strcmp(strRecvbuffer, "<heart-beat/>") == 0)
    {
      strcpy(strSendbuffer, "Ok");
      // logFile.Write("recvbuffer:%s\n", strSendbuffer);
      if (server.Write(strSendbuffer) == false)
      {
        logFile.Write("server.Write(%s) failed\n", strSendbuffer);
        return;
      }
      continue;
    }

    // 解析上传报文
    if (strncmp(strRecvbuffer, "<filename>", 10) == 0)
    {
      // 解析报文xml
      memset(&stfile, 0, sizeof(stfile));
      GetXMLBuffer(strRecvbuffer, "filename", stfile.name, sizeof(stfile.name) - 1);
      GetXMLBuffer(strRecvbuffer, "size", &stfile.size);
      GetXMLBuffer(strRecvbuffer, "mtime", stfile.mtime, sizeof(stfile.mtime) - 1);

      // 接收上传的文件内容
      char srvFullFilename[256]; // 服务端文件全路径
      memset(srvFullFilename, 0, sizeof(srvFullFilename));
      SNPRINTF(srvFullFilename, sizeof(srvFullFilename), 1000, "%s/%s", starg.srvPath, stfile.name);

      logFile.Write("recv %s ... ", stfile.name);
      if (recvFile(server.m_connfd, srvFullFilename, stfile.mtime, stfile.size) == false)
      {
        logFile.WriteEx("failed.\n");
        SNPRINTF(strSendbuffer, sizeof(strSendbuffer), 1000, "<filename>%s</filename><result>failed</result>", stfile.name);
        return;
      }
      else
      {
        SNPRINTF(strSendbuffer, sizeof(strSendbuffer), 1000, "<filename>%s</filename><result>ok</result>", stfile.name);
        logFile.WriteEx("ok.\n");
      }

      // 把接收结果返回给客户端
      // logFile.Write("recvbuffer:%s\n", strSendbuffer);
      if (server.Write(strSendbuffer) == false)
      {
        // logFile.Write("server.Write(%s) failed\n", strSendbuffer);
        return;
      }
    }
  }
}

// 接收文件
bool recvFile(const int sockfd, const char *filename, const char *mtime, const int filesize)
{
  // 生成临时文件名
  char strFullFileTmpName[256]; // 临时文件全路径
  SNPRINTF(strFullFileTmpName, sizeof(strFullFileTmpName), 1000, "%s.tmp", filename);

  int totalBytes = 0; // 累计已接收字节
  int onRead = 0;     // 每次预计读取的字节数
  int nRead = 0;      // 每次实际读取的字节数

  char buffer[1025]; // 存放读取数据的缓冲区

  // 创建临时文件
  FILE *fp = FOPEN(strFullFileTmpName, "wb");
  if (fp == NULL)
    return false;

  while (true)
  {
    memset(buffer, 0, sizeof(buffer));
    // 计算本次应该接收的字节数
    onRead = (filesize - totalBytes) > 1024 ? 1024 : (filesize - totalBytes);

    // 接收文件内容
    if (Readn(sockfd, buffer, onRead) == false)
    {
      fclose(fp);
      return false;
    }

    // 写入文件
    fwrite(buffer, 1, onRead, fp);

    // 计算累计已接收字节,如果已接收字节>=文件大小,则退出循环
    totalBytes += onRead;
    if (totalBytes >= filesize)
      break;
  }

  // 关闭临时文件
  fclose(fp);

  // 检查文件是否接收完整
  // if (totalBytes != filesize)
  // {
  //   logFile.Write("totalBytes=%d,filesize=%d\n", totalBytes, filesize);
  //   return false;
  // }

  // 重置文件时间
  UTime(strFullFileTmpName, mtime);

  // 临时文件改名为正式文件
  return RENAME(strFullFileTmpName, filename);
}

// 下载文件主函数(客户端下载,服务端发送)
void sendFilesMain()
{
}

// 登录业务
bool clientLogin()
{
  clearBuffer();
  if (server.Read(strRecvbuffer, 20) == false)
  {
    logFile.Write("server.Read() failed\n");
    return false;
  }
  // logFile.Write("recvbuffer:%s\n", strRecvbuffer);

  // 解析登录报文
  _xmlToArg(strRecvbuffer);

  if (starg.clientType != 1 && starg.clientType != 2)
    strcpy(strSendbuffer, "Fail");
  else
    strcpy(strSendbuffer, "Ok");

  if (server.Write(strSendbuffer) == false)
  {
    logFile.Write("server.Write() failed\n");
    return false;
  }
  logFile.Write("%s login %s.\n", server.GetIP(), strSendbuffer);
  if (server.Write(strSendbuffer) == false)
  {
    logFile.Write("server.Write(%s) failed\n", strSendbuffer);
    return false;
  }
  return true;
}

// 心跳业务
bool srvHeartbeat(char *strSendbuffer)
{
  strcpy(strSendbuffer, "<retcode>0</retcode><retmsg>心跳成功</retmsg>");
  if (server.Write(strSendbuffer) == false)
  {
    logFile.Write("server.Write(%s) failed\n", strSendbuffer);
    return false;
  }
  return true;
}

// 父进程退出函数
void ParentExit(int sig)
{
  signal(SIGINT, SIG_IGN);  // 父进程忽略信号SIGINT,避免重复退出
  signal(SIGTERM, SIG_IGN); // 父进程忽略信号SIGTERM,避免重复退出
  logFile.Write("parent get sig, sig=%d\n", sig);
  server.CloseListen(); // 父进程关闭监听。
  kill(0, SIGTERM);     // 向全部子进程发送退出信号。
  exit(0);
}

// 子进程退出函数
void ChildExit(int sig)
{
  signal(SIGINT, SIG_IGN);  // 子进程忽略信号SIGINT,避免重复退出
  signal(SIGTERM, SIG_IGN); // 子进程忽略信号SIGTERM,避免重复退出
  logFile.Write("子进程收到退出信号,sig=%d\n", sig);
  server.CloseClient(); // 子进程关闭客户端连接。
  exit(0);
}

// 清空缓冲区
void clearBuffer()
{
  memset(strRecvbuffer, 0, sizeof(strRecvbuffer));
  memset(strSendbuffer, 0, sizeof(strSendbuffer));
}

bool _xmlToArg(char *xmlbuffer)
{
  memset(&starg, 0, sizeof(starg));
  // 客户端已经校验过,这里不再校验
  GetXMLBuffer(xmlbuffer, "clientType", &starg.clientType);      // 客户端类型,1:文件上传,2:文件下载
  GetXMLBuffer(xmlbuffer, "ip", starg.ip);                       // 服务端IP
  GetXMLBuffer(xmlbuffer, "port", &starg.port);                  // 服务端端口
  GetXMLBuffer(xmlbuffer, "ptype", &starg.ptype);                // 文件上传成功后的处理方式,1:删除,2:移动到备份目录
  GetXMLBuffer(xmlbuffer, "clientPath", starg.clientPath);       // 客户端文件路径
  GetXMLBuffer(xmlbuffer, "clientPathBak", starg.clientPathBak); // 客户端文件备份路径
  GetXMLBuffer(xmlbuffer, "andChild", &starg.andChild);          // 是否上传子目录
  GetXMLBuffer(xmlbuffer, "matchname", starg.matchname);         // 匹配的文件名(正则表达式),如:*.txt
  GetXMLBuffer(xmlbuffer, "srvPath", starg.srvPath);             // 服务端存放目录
  GetXMLBuffer(xmlbuffer, "timetvl", &starg.timetvl);            // 扫描本地目录的时间间隔,单位:秒
  GetXMLBuffer(xmlbuffer, "timeout", &starg.timeout);            // 心跳超时时间
  GetXMLBuffer(xmlbuffer, "pname", starg.pname);                 // 本地进程名.建议以tcpupfiles_开头
  return true;
}
  • 用法:

​ 先启动服务端:

./fileserver 5005 /home/bill/project/runtime/tmp/fileserver.log

​ 再启动客户端,将形参改成实际参数做测试:

./tcpputfiles /home/bill/project/runtime/log/tcpputfiles.log "<clientType>1</clientType><ip>127.0.0.1</ip><port>5005</port><ptype>1</ptype><clientPath>/home/bill/project/runtime/idcdata/surfdata</clientPath><clientPathBak>/home/bill/project/runtime/idcdata/surfdata/bak</clientPathBak><andChild>true</andChild><matchname>*.*</matchname><srvPath>/home/bill/project/runtime/tcputest</srvPath><timetvl>5</timetvl><timeout>10</timeout><pname>tcpupfiles_me</pname>"

日志文件:

2024-11-04 18:45:26 connect(127.0.0.1:5005) success
2024-11-04 18:45:26 login (127.0.0.1:5005) success
2024-11-04 18:45:26 send /home/bill/project/runtime/idcdata/surfdata/SURF_ZH_20240210221640_133.xml(119717) ...ok
2024-11-04 18:45:26 ackMessage(Ok) failed
2024-11-04 18:45:26 send /home/bill/project/runtime/idcdata/surfdata/SURF_ZH_20240210223840_9047.xml(119700) ...ok
...............
...............
2024-11-04 18:45:26 send /home/bill/project/runtime/idcdata/surfdata/SURF_ZH_20240210224340_10104.xml(119673) ...ok
2024-11-04 18:45:26 send /home/bill/project/runtime/idcdata/surfdata/SURF_ZH_20240210222540_4609.xml(119696) ...ok
2024-11-04 18:45:26 send /home/bill/project/runtime/idcdata/surfdata/SURF_ZH_20240210221840_1144.xml(119677) ...ok

上传目标目录:

bill@DESKTOP-Q27QRF6:~/project/runtime/log$ ls /home/bill/project/runtime/tcputest
SURF_ZH_20240210190105_6932.xml   SURF_ZH_20240210222340_3619.xml   SURF_ZH_20240210224640_11597.xml
SURF_ZH_20240210190205_7424.xml   SURF_ZH_20240210222440_4118.xml   SURF_ZH_20240210224740_12088.xml
SURF_ZH_20240210190305_7917.xml   SURF_ZH_20240210222540_4609.xml   SURF_ZH_20240210224840_12587.xml
SURF_ZH_20240210221018_42869.xml  SURF_ZH_20240210222640_5112.xml   checkproc.cpp
SURF_ZH_20240210221033_43014.xml  SURF_ZH_20240210222740_5603.xml   deletefiles.cpp
SURF_ZH_20240210221118_43401.xml  SURF_ZH_20240210222840_6102.xml   fileserver.cpp
SURF_ZH_20240210221133_43532.xml  SURF_ZH_20240210222940_6593.xml   fileserver1.cpp
SURF_ZH_20240210221218_43920.xml  SURF_ZH_20240210223040_7092.xml   fileserver2.cpp
SURF_ZH_20240210221233_44052.xml  SURF_ZH_20240210223140_7587.xml   ftpclient.cpp
SURF_ZH_20240210221318_44434.xml  SURF_ZH_20240210223240_8086.xml   ftpgetfiles.cpp
SURF_ZH_20240210221333_44564.xml  SURF_ZH_20240210223340_8577.xml   ftpgetfiles1.cpp
SURF_ZH_20240210221418_44950.xml  SURF_ZH_20240210223440_8967.xml   ftpgetfiles2.cpp
SURF_ZH_20240210221433_45080.xml  SURF_ZH_20240210223540_8986.xml   ftpgetfiles3.cpp
SURF_ZH_20240210221518_45460.xml  SURF_ZH_20240210223640_9009.xml   ftpgetfiles4.cpp
SURF_ZH_20240210221533_45593.xml  SURF_ZH_20240210223740_9028.xml   ftpputfiles.cpp
SURF_ZH_20240210221618_45976.xml  SURF_ZH_20240210223840_9047.xml   gzipfiles.cpp
SURF_ZH_20240210221640_133.xml    SURF_ZH_20240210223940_9066.xml   procctl.cpp
SURF_ZH_20240210221740_642.xml    SURF_ZH_20240210224040_9085.xml   procctl1.cpp
SURF_ZH_20240210221840_1144.xml   SURF_ZH_20240210224140_9108.xml   tcpputfiles.cpp
SURF_ZH_20240210221940_1635.xml   SURF_ZH_20240210224240_9613.xml   tcpputfiles1.cpp
SURF_ZH_20240210222040_2134.xml   SURF_ZH_20240210224340_10104.xml  tcpputfiles2.cpp
SURF_ZH_20240210222140_2629.xml   SURF_ZH_20240210224440_10603.xml  testproc.cpp
SURF_ZH_20240210222240_3128.xml   SURF_ZH_20240210224540_11094.xml

基于TCP的文件下载系统实现

:dart:目标​

  • 实现文件的上传


c/c++ Linux Linux信号量 Linux共享内存 Linux进程心跳 Linux守护进程
Theme Jasmine by Kent Liao And Bill

本站由提供云存储服务