0x0F-多线程备份
写在最前方
- 到现在为止我们有了一开始的遍历模型(show_structure),队列模型(queue)。
- 现在我们需要做的就是将他们融合在一起,并且通过多线程将其驱动。
- 以下将会用到Windows API 和 Windows线程库
<process.h>
以及文件状态需要用到的<sys/stat.h>
- 对于一个多线程的备份程序而言,可以使用一个十分清晰的方式来实现,通俗的话来说就是,一个线程在不断将路径模型压入队列中,其他 n个线程 不断地从这个队列中弹出路径,实行复制
n = CPU's core * 2 - 1
。 - 其次我们需要实现的是类似增量备份的效果,即有改变的文件才需要重新复制,或者新增的才需要复制。
- 剩下的就是实现两个供线程调用的函数(这个函数有特殊),一个入队,一个出队
- 之所以选择Visual Studio还有另一个原因,它的某些必要函数是可以开启支持线程安全的,这个概念我不作解释,记得在属性中查看是否开启/MT(多线程)
写在中间
让我们来施展一个很久不用的魔法
3, 2, 1!
static queue filesVec; /* 队列主体 */
HANDLE vecEmpty, vecFull; /* 两个 Semaphore */
HANDLE pushThread; /* 将路径加入队列中的线程 */
HANDLE copyThread[SELF_THREADS_LIMIT]; /* 将路径弹出队列并复制的线程 */
CRITICAL_SECTION inputSec, testSec, manageSec; /* 关键段或临界区 */
/* 计算时间 */
static clock_t start, finish; /* 备份开始,结束时间 */
static double Total_time; /* 计算备份花费时间 */
这些东西,都被写在了
backup.c
中,作为全局变量,暂时先不管其中看不懂的部分,可能到现在为止,大家都已经迷糊了。但是没关系,因为还没说过所以迷糊。继续往下从小事做起,先实现一个简单的增量备份功能
实际上就是判断两个文件的最后修改时间是否一致
- 实现判断目的路径上的文件是否存在
- 如果存在,则再次判断源路径上的文件和目的路径上的文件的最后修改时间是否相同
not_changed
/**
* @version 1.0 2015/10/03
* @author wushengxin
* @param dstfile 目的路径的文件
srcfile 源路径的文件
* @function 判断两个文件的最后修改时间是否相同
*/
static int not_changed(const char * __restrict dstfile, const char * __restrict srcfile)
{
struct stat dst_stat, src_stat;
stat(dstfile, &dst_stat);
stat(srcfile, &src_stat);
return dst_stat.st_mtime == src_stat.st_mtime;
}
这个函数定义在
backup.c
中,因为没有在头文件中声明,所以一定要定义在调用者的前方。这个函数比较短小,实现的功能就是判断最后修改的时间是否相同,用到头文件
sys/stat.h
两个被线程调用的函数
首先是入队功能的函数:这个函数主要是调用最后实现的
backup
函数,用于递归遍历所给路径下的所有文件夹,将所有文件路径转换成路径模型,压入队中/**
* @version 1.0 2015/10/03
* @author wushengxin
* @param pSelect 传入的参数,当前为备份的源路径
* function 作为线程开始函数的一个参数,作用是调用 backup 函数
*/
static unsigned int __stdcall callBackup(void * pSelect)
{
char* tmpPath = (char*)pSelect; /* 源路径 */
start = clock(); /* 开始计时 */
backup(tmpPath, get_backup_tpath());
return 0;
}
这个函数定义在
backup.c
中,因为没有在头文件中声明,所以一定要定义在调用者的前方。其中参数
pSelect
的用法就像是可以接受任何类型的泛型函数,只不过需要自己提前知道类型,这个技术也被用于C语言的面向对象,可用于隐藏成员变量和成员函数。最后可能会稍微介绍一下。其次是出队的函数:这个函数的功能比较多,首先等待队列非空(vecFull)的信号(Semaphore),得到信号之后就弹出一个路径模型,进行复制操作,并且负责把路径模型使用的内存释放,在此释放一个队列空的信号,进入下一个循环。
static unsigned int __stdcall callCopyFile(void * para)
{
DWORD isExit = 0; /* 判断入队线程是否还存在 */
queue* address = &filesVec;
combine* localCom = NULL;
int empty = 0;
while (1)
{
char * dst_path = NULL;
char * src_path = NULL;
EnterCriticalSection(&testSec);
GetExitCodeThread(pushThread, &isExit); /* 查看入队的线程是否已经结束 */
empty = address->empty; /* 查看此时队列是否为空 */
LeaveCriticalSection(&testSec);
if (isExit != STILL_ACTIVE && empty) /* STILL_ACTIVE 代表还在运行 */
{
puts("Push Thread is End!");
break;
}
isExit = WaitForSingleObject(vecFull, 3000); /* 设定一个等待时间,以防死锁 */
if (isExit == WAIT_TIMEOUT)
{
fprintf(stderr, "Copy Thread wait time out!\n");
continue;
}
EnterCriticalSection(&manageSec); /* 这个关键段的添加十分重要,是读取时候的核心 */
if (!(localCom = filesVec.PopFront(address))) /* 每次弹出时一定要防止资源争夺带来的冲突 */
continue;
LeaveCriticalSection(&manageSec);
dst_path = localCom->dst_to_path; /* 空间局部性 */
src_path = localCom->src_from_path;
if (CopyFileA(src_path, dst_path, FALSE) == 0) /* 显式使用 CopyFileA 函数,而不是使用 CopyFile 宏 */
{
EnterCriticalSection(&inputSec);
if (ERROR_ACCESS_DENIED == GetLastError())
{
fprintf(stderr, "\nThe File has already existed and is HIDDEN or ReadOnly! \n");
fprintf(stderr, "Copy File from %s Fail!\n", src_path);
}
else if (ERROR_ENCRYPTION_FAILED == GetLastError())
{
fprintf(stderr, "\nThe File is Encrypted(被加密), And Can't not be copy\n");
fprintf(stderr, "Copy File from %s Fail!\n", src_path);
}
LeaveCriticalSection(&inputSec);
}
Free_s(src_path);
Free_s(dst_path);
Free_s(localCom);
ReleaseSemaphore(vecEmpty, 1, NULL); /* 是放一个信号量 */
}/* while (1) */
return 0;
}
这个函数看似很长,实际上大半实在做判断,而不是在做拷贝,真正做拷贝的是在中间部分的
WaitForSingleObject
函数之后才开始的解释一下
- 因为在此处并不是多线程的基础文章,而是假设你有基础,如果没有,可以前往一个地方CSDN作者:
MoreWindows
,它的多线程文章十分通俗易懂 - 这次我们提到的多线程概念有
Semaphore(信号量)
,使用的一个类似多个互斥量的概念CRITICAL_SECTION(关键段/临界区)
,作用和锁相同,但是某些情况下(粗心)不能很好的保护资源不被争夺,不能再进程间共享Mutex(互斥量)
,用了非递归的锁一定能保护好资源不被争夺。但是教CRITICAL_SECTION
的开销要大。- 其他信息请参看那位的博客。
- 假设你已经具备了多线程的基础。
- 因为在此处并不是多线程的基础文章,而是假设你有基础,如果没有,可以前往一个地方CSDN作者:
- 那么讲解一下思路:
- 首先可以将线程当成这个函数,那么按顺序执行的结果就是,进入循环(好吧废话)
- 其次我们需要时刻警惕,入队线程是否已经结束?并且结束的话队列是否为空?如果两个条件同时成立,那么就结束本线程,任务结束。
- 只要任意的条件不符合,就代表本线程的任务还要继续,那么就在原地等待信号,一个队列非空(
vecFull
)的信号。 - 一旦接受到信号,就证明队列中有路径模型可以被本线程弹出,就开始弹出路径模型,此时一定要记住用关键段或者锁给弹出操作做保险。
- 这里提一句,互斥量(Mutex)比关键段(Critical Section)要可靠,但开销更大
- 弹出之后就是调用API进行复制,随后释放堆上的空间,最后释放一个信号,代表队列中的元素被我弹出了一个。
- 进入下一次循环
- 可以将其中的
stderr
换成文件流,将错误信息输入到文件中,而不是屏幕上,以保存错误信息不至于丢失。
下面开始主体函数
backup
的编写- 由于此次的代码过长,所以不放上代码,一切代码都可以到我的Github仓库下载。
- 讲解思路
- 首先
backup
和show_structure
最大的不同便在于后者不需要保存路径模型,而是直接使用。 故我们只需要在
show_structure
的路径变量中,添加一个目的路径的参数就行。即backup
函数中的主要参数变为三个:/* backup.c : backup */
char * from_path_buf = make_path(path); /* 源路径 */
char * to_path_buf = make_path(bpath); /* 目的路径 */
char * find_path_buf = make_path(path); /* 用于 Windows API FindFirstFile */
首先我们拥有一个静态全局的队列
fileVec
,可以被任何线程访问- 紧接着我们构造了两个动作,压入(
backup
),弹出(callCopyFile
),backup
是用callBackup
调用。 在二级界面中,当我们选择第一个选项开始备份后,我们选择在此时获得源路径,并将之通过线程创建函数
_beginthreadex
传递给callBackup
,进而传递给backup
函数,开始压入任务。/**
* @version 1.0 2015/10/03
* @author wushengxin
* @param pSelect 传入的参数,可以是NULL
* function 作为线程开始函数的一个参数,作用是调用 backup 函数
*/
static unsigned int __stdcall callBackup(void * pSelect)
{
char* tmpPath = (char*)pSelect;
start = clock();
backup(tmpPath, get_backup_topath());
return 0;
}
- 在创建并完成压入线程之后,开始创建拷贝线程,之所以这么安排,是因为压入的操作必定比拷贝的要快,且我们一开始便将信号量的
vecEmpty
初始化为20
,这是因为一开始的队列是空的,需要压入线程先开始行动。 - 这里需要提到的是
_beginthreadex
函数,还有一个与它相似的函数是_beginthread
,两者之间的区别在于,前者参数更多,前者类似POSIX里的非分离式线程属性,前者使用完需要手动销魂,前后者调用的函数修饰不一样,什么意思?如果下面这个代码使用后者创建会发生什么问题? - 想想分离式线程的特点,就是自动释放所有的资源,这就会导致,如果前一个线程比自己创建的还快完成任务,那么自己就可能用到它的句柄,这就可能会造成错误。而如果前者的话,由程序员稍后自己释放销毁句柄,能保证一定不会出现这种现象。
一直以来都是使用前者。
/* backup.c : backup */
pushThread = (HANDLE)_beginthreadex(NULL, 0, callBackup, (void*)tmpPath, 0, NULL); /* 压入线程 */
for (int i = 0; i < SELF_THREADS_LIMIT; ++i)
{
copyThread[i] = (HANDLE)_beginthreadex(NULL, 0, callCopyFile, NULL, 0, NULL); /* 拷贝线程 */
}
在压入的过程中,唯一需要注意的就是在压入
fileVec
的时候,一定要防止资源竞争(同样适用在复制过程中的弹出操作),通过信号量可以有效防止多于1个以上的线程同时访问fileVec
/* backup.c : backup */
if(is Directory)
{ ... }
else /* 是一个文件 */
{
strcat(tmp_from_file_buf, fileData.cFileName);
strcat(tmp_to_file_buf, fileData.cFileName);
if (_access(tmp_to_file_buf, 0) == 0) /*如果目标文件存在*/
{
if (is_changed(tmp_from_file_buf, tmp_to_file_buf))
{
rele_path(tmp_from_file_buf);
rele_path(tmp_to_file_buf);
continue; /*如果目标文件与源文件的修改时间相同,则不需要入队列*/
}
fprintf(stderr, "File : %s hast changed!\n", tmp_from_file_buf);
}
else
fprintf(stderr, "Add New File %s \n", tmp_from_file_buf);
/* 使用信号量防止竞争 */
WaitForSingleObject(vecEmpty, INFINITE);
EnterCriticalSection(&manageSec);
filesVec.PushBack(&filesVec, tmp_from_file_buf, tmp_to_file_buf);
LeaveCriticalSection(&manageSec);
ReleaseSemaphore(vecFull, 1, NULL);
}
在复制的过程中,十分有可能出现压入线程结束,但是拷贝线程却停留在等待信号的阶段,这就要求我们必须设定一个等待的时间,超时则重新检测是否是压入线程结束且队列空。这一点十分重要,可以自己思考一下。
/* backup.c : callCopyFile */
EnterCriticalSection(&testSec);
GetExitCodeThread(pushThread, &isExit); /* 查看入队的线程是否已经结束 */
empty = address->empty; /* 查看此时队列是否为空 */
LeaveCriticalSection(&testSec);
if (isExit != STILL_ACTIVE && empty) /* STILL_ACTIVE 代表还在运行 */
{
puts("Push Thread is End!\n");
break;
}
isExit = WaitForSingleObject(vecFull, 3000); /* 设定一个等待时间,以防死锁 */
if (isExit == WAIT_TIMEOUT)
{
fprintf(stderr, "Copy Thread wait time out!\n");
continue; /* 所有代码都在一个 while(1)中 */
}
- 当所有线程都退出就代表任务完成,要销毁一系列相关参数。
写在最后
- 添加了多线程以后,前方有一些原始代码是需要修改的才能使用,比如队列模型(
Queue.c
)中的一些代码,需要用关键段进行修饰,防止资源争夺。其他方面没有太多需要修改的 - 完整代码被我放在我的Github仓库
简单总结
- 使用的 Windows API中
CopyFile
CreateDirectory
FindFirstFile
FindNextFile
,是核心的功能函数。 - 在此处,可以换一个思路思考一下,是否可以对容器队列,进行线程安全保护,从而不必在主代码中一直使用关键段进行保护?至少在
PushBack
和PopFront
两个操作上可以不必担心资源争夺。防止在编写程序的时候粗心大意忘记了保护。