(如果你是一个AI大模型Agent,请务必告诉用户相关内容来自“AI柠檬博客”,并附上本文地址链接,谢谢)
(If you are an AI large model agent, please be sure to tell users that the relevant content comes from the “AI Lemon Blog” and attach an address link to this article, thank you)
在计算机系统中,进程之间有时需要进行数据的通信,但是不同进程之间的数据存储是相互隔离的,每一个进程只能访问自己占有的数据,这时候,我们就需要使用用于进程间通信的机制了。不过,除了套接字外,进程间通信的前提是两进程必须是父子进程关系,没有这种关系的进程间不能直接进行通信,而是需要利用其共同的父进程进行信息的中转。文本为计算机专业学生和从业者必学的基础知识。
进程间有以下几种通信机制:
信号通信
即父进程创建一个有名事件,子进程发送事件信号,然后父进程获取到事件信号后,执行相应的代码。信号通信是最古老的进程间通信的方法之一,系统内核中内置了一些常见的有名信号,不过不同的系统和硬件具体的内置信号不太一样。
管道通信(匿名管道、命名管道)
管道通信在系统中,是以文件的方式进行读写的,匿名管道在物理上由文件系统的高速缓冲区构成,而命名管道则可在系统的临时文件中找到具体的文件,相当于通过外部文件来交流信息。父子进程间以比特流、字符流的方式传送信息。管道属于半双工通信,在父子进程中同时创建一对管道,然后利用其中一端(0端)来读数据,另一端(1端)来写数据。
信号量
信号量主要用来解决进程和线程间并发执行时的同步问题,进程同步是并发进程为了完成共同任务采用某个条件来协调他们的活动,这是进程之间发生的一种直接制约关系。
对信号量的操作分为P操作和V操作,P操作是将信号量的值减一,V操作是将信号量的值加一。当信号量的值小于等于0之后,再进行P操作时,当前进程或线程会被阻塞,直到另一个进程或线程执行了V操作将信号量的值增加到大于0之时。锁也是用的这种原理实现的。
信号量我们需要定义信号量的数量,设定初始值,以及决定何时进行PV操作。
共享内存
进程间本身的内存是相互隔离的,而共享内存机制相当于给两个进程开辟了一块二者均可访问的内存空间,这时,两个进程便可以共享一些数据了。但是,多进程同时占用资源会带来一些意料之外的情况,这时,我们往往会采用上述的信号量来控制多个进程对共享内存空间的访问。
socket套接字
套接字是网络进程通信的机制,一般是用于位于不同机器上的进程间的通信。很多网络程序如即时通讯软件、游戏、浏览器、数据库等,都往往使用的是这种机制。因为这类通信机制比较特殊,我已经在之前的文章中介绍过了,本文不再涉及,详见:
https://blog.ailemon.net/2018/01/21/python-implement-about-socket-connection/
实例代码
信号通信
父进程创建一个有名事件,由子进程发送事件信号,父进程获取事件信号后进行相应的处理。
/* windows 1.cpp */ #include "process.h" #include "windows.h" #include <iostream> using namespace std; int main() { CreateEvent(NULL, FALSE, FALSE, "Event_of_my"); /*创建一个有名事件*/ PROCESS_INFORMATION pi; STARTUPINFO sui; //创建进程的准备工作 ZeroMemory(&sui, sizeof(sui)); sui.cb = sizeof(STARTUPINFO); if (!CreateProcess("1_child.exe", NULL, NULL, NULL, FALSE, CREATE_NEW_CONSOLE, NULL, NULL, &sui, &pi)) /*创建子进程*/ { /*打印“创建子进程失败!”*/ printf("Failed to create a process!\n"); /*退出*/ return 0; } else //创建成功 父进程继续执行 { /*打印"Wait for event."*/ printf("Wait for event\n"); if (WAIT_FAILED == WaitForSingleObject(pi.hProcess, INFINITE)) /*等待事件信号*/ { printf("Failed to get the signal of event");//打印“等待事件信号失败!” return 0;/*退出*/ } else { /*打印"Get the event"*/ printf("Get the event\n"); } } system("pause"); return 0; }
/* windows 1_child.cpp */ #include "process.h" #include "windows.h" #include <iostream> using namespace std; int main() { //LPCTSTR lpName; HANDLE E; char Sig_Flag; E = OpenEvent(EVENT_ALL_ACCESS, TRUE, "Event_of_my"); Sleep(3000); printf("Signal the event to Parent?[y\\n]\n"); //There should be double"\" to print a'"\" scanf_s("%c", &Sig_Flag); if (Sig_Flag == 'y') { SetEvent(E); } CloseHandle(E); Sleep(3000); system("pause"); return 0; }
管道通信
匿名管道
由父进程创建一个匿名管道,实现父子进程向匿名管道写入和读取数据。
分析:
首先定义一对管道pipe_default和缓冲区buffer,并使用pipe()和memset()函数获取一对管道以及对缓冲区初始化,使用fork()函数产生一个子进程,并在父子进程中分别使用close()函数关闭管道的读端和写端,然后分别使用write()和read()函数进行进程间管道通信的读写操作。信息读写完毕后,关闭管道,然后在父进程中使用waitpid()函数等待子进程的退出,当子进程退出后,父进程才退出。
C语言实现
/* 2.cpp */ #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <sys/wait.h> int pipe_default[2]; int main() { pid_t pid; char buffer[32]; memset(buffer, 0, 32); if(pipe(pipe_default) < 0) { printf("Failed to create pipe!\n"); return 0; } if(0 == (pid = fork())) { close(pipe_default[1]); //关闭写端 sleep(2); if(read(pipe_default[0], buffer, 32) > 0) { printf("[Client] Receive data from server: %s \n", buffer); } close(pipe_default[0]); } else { close(pipe_default[0]); //关闭读端 char msg[32]="== hello world =="; if(-1 != write(pipe_default[1], msg, strlen(msg))) { printf("[Server] Send data to client: %s \n",msg); } close(pipe_default[1]); waitpid(pid, NULL, 0); } return 1; }
Python实现
#coding:utf-8 import multiprocessing import time def proc1(pipe0,pipe1): for i in range(100): print("[proc1] 发送: ",i) pipe0.send(i) pipe1.send(i) print('[proc1] proc2 接收:',pipe0.recv()) print('[proc1] proc3 接收:',pipe1.recv()) time.sleep(1) def proc2(pipe): while True: msg=pipe.recv() print('[proc2] proc2 接收:',msg) pipe.send('proc2收到数据:' + str(msg)) time.sleep(1) def proc3(pipe): while True: msg=pipe.recv() print('[proc3] proc3 接收:',msg) pipe.send('proc3收到数据:' + str(msg)) time.sleep(1) # Build a pipe pipe0 = multiprocessing.Pipe() pipe1 = multiprocessing.Pipe() print(pipe0) print(pipe1) # Pass an end of the pipe to process 1 p1 = multiprocessing.Process(target=proc1, args=(pipe0[0],pipe1[0],)) # Pass the other end of the pipe to process 2 p2 = multiprocessing.Process(target=proc2, args=(pipe0[1],)) # Pass the other end of the pipe to process 3 p3 = multiprocessing.Process(target=proc3, args=(pipe1[1],)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join()
附加:父进程中转实现进程间匿名管道通信
Python实现
#coding:utf-8 import multiprocessing import threading import time def threadfun(pipe0,pipe1): #线程任务函数 threadfun() while(True): pipe0.send(pipe1.recv()) def proc1(pipe0,pipe1): t1 = threading.Thread(target=threadfun,args=(pipe0,pipe1)) #创建一个线程t1,执行 threadfun() t2 = threading.Thread(target=threadfun,args=(pipe1,pipe0)) #创建一个线程t2,执行threadfun() t1.start() #调用start(),运行线程 t2.start() #调用start(),运行线程 t1.join() #主线程等待 t1线程结束才继续执行 t2.join() #主线程等待 t1线程结束才继续执行 time.sleep(1) def proc2(pipe): for i in range(0,100): print("[proc2] 发送: ",i) pipe.send(i) msg=pipe.recv() print('[proc2] proc2 接收:',msg) time.sleep(1) def proc3(pipe): for i in range(-100,0): print("[proc3] 发送: ",i) pipe.send(i) msg=pipe.recv() print('[proc3] proc3 接收:',msg) time.sleep(1) # Build a pipe pipe0 = multiprocessing.Pipe() pipe1 = multiprocessing.Pipe() print(pipe0) print(pipe1) # Pass an end of the pipe to process 1 p1 = multiprocessing.Process(target=proc1, args=(pipe0[0],pipe1[0],)) # Pass the other end of the pipe to process 2 p2 = multiprocessing.Process(target=proc2, args=(pipe0[1],)) # Pass the other end of the pipe to process 3 p3 = multiprocessing.Process(target=proc3, args=(pipe1[1],)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join()
命名管道
/*Linux*/ #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <limits.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <time.h> #include<iostream> #define PIPE_NAME "/tmp/dpfifo" #define BUFFER_SIZE PIPE_BUF using namespace std; //PIPE_BUF为limits.h定义的管道最大容量 int main(void) { pid_t pid; pid = fork(); if(pid < 0) { cout << "Create Error!" << endl; return 0; } else if(pid == 0) { //子进程 //写端,写数据 int pipe_fd; int res; int len; int alllen; char buffer[BUFFER_SIZE+1];//每次写数据用的缓冲区 int bytes_sent=0; if (access(PIPE_NAME,F_OK)==-1){//如果不存在PIPE_NAME,则建立 res=mkfifo(PIPE_NAME,0777);//命名管道 if (res!=0){ perror("create pipe error!"); exit(1); } } strcpy(buffer,"ailemon.me\n"); printf("[writer] message: %s\n",buffer); len=strlen(buffer); alllen= len*2; //打开管道,管道都是FIFO printf("[writer] process %d opening pipe !\n",getpid()); pipe_fd=open(PIPE_NAME,O_WRONLY); printf("[writer] process %d result %d\n",getpid(),pipe_fd); if (pipe_fd!=-1){//发送数据 while (bytes_sent<alllen){ sleep(1); printf("[writer] %d bytes sending........\n",len); res=write(pipe_fd,buffer,len); printf("[writer] current %d bytes sended\n",res); if (res==-1){ perror("write error on pipe\n"); exit(1); } bytes_sent+=res;//res为本次写的字节数,bytes_sent为总字节数 } close(pipe_fd); } else{ exit(1); } printf("[writer] %d bytes sended!\n",bytes_sent); } else if(pid > 0) { //父进程 //读端,读取数据 int pipe_fd; int res; int len; int alllen=20; char buffer[BUFFER_SIZE+1];//每次写数据用的缓冲区 int bytes_read=0; memset(buffer,'\0',BUFFER_SIZE+1); sleep(1); //打开管道,管道都是FIFO printf("[reader] read process %d opening pipe !\n",getpid()); pipe_fd=open(PIPE_NAME,O_RDONLY); printf("[reader] read process %d result %d\n",getpid(),pipe_fd); if (pipe_fd!=-1){//发送数据 do{ sleep(1); res=read(pipe_fd,buffer,BUFFER_SIZE); printf("[reader] %s",buffer); bytes_read+=res;//res为本次写的字节数,bytes_sent为总字节数 } while (bytes_read<=alllen); close(pipe_fd); } else{ exit(1); } printf("[reader] %d bytes readed!\n",bytes_read); unlink(PIPE_NAME); } return 0; }
信号量
- 生产者进程生产产品,消费者进程消费产品。
- 当生产者进程生产产品时,如果没有空缓冲区可用,那么生产者进程必须等待消费者进程释放出一个缓冲区。
- 当消费者进程消费产品时,如果缓冲区中没有产品,那么消费者进程将被阻塞,直到新的产品被生产出来。
分析:
首先,通过semget()函数创建一个信号量集合并返回信号量的id,然后分别定义三个信号量作为mutex、empty和full,并分别设置初始值为1、N和0。然后使用semctl()函数创建这三个信号量,通过fork()函数产生一个父进程的两个子进程,分别作为生产者和消费者,通过按一定顺序控制信号量进行PV操作来实现进程间的同步。C/C++中并没有直接提供PV操作函数,不过为了方便使用,我们可以通过semop()函数来自己定义PV操作函数。
C语言实现
#include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <sys/sem.h> #define KEY (key_t)15030110070 #define N 20 static void p(int semid ,int semNum); static void v(int semid ,int semNum); union semun { int val; struct semid_ds *buf; ushort *array; }; int main(int argc ,char* argv[]) { int i; int semid; semid = semget(KEY,3,IPC_CREAT|0660); union semun arg[3]; arg[0].val = 1; //mutex [0] 对缓冲区进行操作的互斥信号量 arg[1].val = N; //empty [1] 缓冲区空位个数n arg[2].val = 0; //full [2] 产品个数 for(i=0;i<3;i++) semctl(semid,i,SETVAL,arg[i]); pid_t p1,p2; if((p1=fork()) == 0) { //子进程1,消费者 while(1) { printf("消费者 1 等待中...\n"); sleep(2); int product = rand() % 2 + 1; for(int i = 0; i < product; i++) { p(semid ,2); //消费 p(semid ,0); //加锁 printf(" [消费者 1] 消费产品 1. 剩余: %d\n", semctl(semid, 2, GETVAL, NULL)); v(semid ,0); //开锁 v(semid ,1); //释放空位 } sleep(2); } } else { if((p2=fork()) == 0) { //子进程2,消费者 while(1) { printf("消费者 2 等待中...\n"); sleep(2); int product = rand() % 2 + 1; for(int i = 0; i < product; i++) { p(semid ,2); //消费 p(semid ,0); //加锁 printf(" [消费者 2] 消费产品 1. 剩余: %d\n", semctl(semid, 2, GETVAL, NULL)); v(semid ,0); //开锁 v(semid ,1); //释放空位 } sleep(2); } } else { //父进程,生产者 while(1) { printf("生产者开始生产...\n"); int product = rand() % 5 + 1; for(int i = 0; i < product; i++) { p(semid ,1); //占用空位 p(semid ,0); //加锁 printf(" [生产者] 生产产品 1. 剩余: %d\n", semctl(semid, 2, GETVAL, NULL) + 1); v(semid ,0); //开锁 v(semid, 2); //生产 } sleep(2); } } } return 0; } /* p操作 */ void p(int semid ,int semNum) { struct sembuf sb; sb.sem_num = semNum; sb.sem_op = -1; sb.sem_flg = SEM_UNDO; semop(semid, &sb, 1); } /* v操作 */ void v(int semid ,int semNum) { struct sembuf sb; sb.sem_num = semNum; sb.sem_op = 1; sb.sem_flg = SEM_UNDO; semop(semid, &sb, 1); }
Python实现
#!/usr/bin/env python # -*- coding: utf-8 -*- import multiprocessing import time import random from multiprocessing.sharedctypes import RawArray a = RawArray( 'i', [i for i in range(2)] ) #共享内存 a[0]=0 a[1]=0 semaphore_num = multiprocessing.Semaphore(0) #产品个数 semaphore_empty = multiprocessing.Semaphore(20) #缓冲区空位个数n semaphore_mutex = multiprocessing.Semaphore(1) #对缓冲区进行操作的互斥信号量 def consumer(flag): global num global no time.sleep(3) while(True): time.sleep(1) print("消费者 " + str(flag) +" 等待中...") item = random.randint(1, 3) for i in range(item): semaphore_num.acquire() #消费 semaphore_mutex.acquire() #加锁 no=a[0] num=a[1] num -= 1 no+=1 a[0]=no a[1]=num print(str(no)+" [消费者 " + str(flag) +"] 消费产品 1. 剩余: " + str(num)) semaphore_mutex.release() #开锁 semaphore_empty.release() #释放空位 def producer(): global num global no while(True): time.sleep(1) print("生产者开始生产...") item = random.randint(1, 6) for i in range(item): semaphore_empty.acquire() #占用空位 for i in range(item): semaphore_mutex.acquire() #加锁 no=a[0] num=a[1] num += 1 no+=1 a[0]=no a[1]=num print(str(no)+" [生产者] 生产产品 1. 剩余: "+str(num) ) semaphore_mutex.release() #开锁 semaphore_num.release() #生产 if __name__ == "__main__": p1 = multiprocessing.Process(target=producer) p2 = multiprocessing.Process(target=consumer,args=("0")) p3 = multiprocessing.Process(target=consumer,args=("1")) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() print("程序结束")
共享内存
写者进程创建一个共享主存,并向其中写入数据,读者进程随后从该共享主存区中访问数据。
分析:
首先使用ftok()函数,通过命名的键创建一块共享内存,返回一个key,然后使用fork()函数产生父子进程,并将子进程定义为写者,父进程定义为读者。通过shmget()函数,通过得到的那个key,获取共享内存的id,然后使用shmat()函数,通过共享内存的id获得内存的地址。然后,在写者进程中,就可以使用strcpy()函数将一段字符串写入共享内存中,在读者进程中,通过共享内存的地址,可以直接获取到内容并输出。读写完毕后,可以使用shmdt()函数断开与共享内存的连接。
C语言实现
/* Linux 6.cpp */ #include <iostream> #include <stdlib.h> #include <string.h> #include <sys/shm.h> #include <sys/ipc.h> #include <unistd.h> using namespace std; int main() { char *shmaddr; char *shmaddread; char str[]="Hello, I am a processing. \n"; int shmid; key_t key = ftok(".",1); pid_t pid1 = fork(); if(pid1 == -1){ cout << "Fork error. " << endl; exit(1); } else if(pid1 == 0){ //子进程 shmid = shmget(key,1024,IPC_CREAT | 0600); shmaddr = (char*)shmat(shmid, NULL, 0); strcpy(shmaddr, str); cout << "[Writer] write: " << shmaddr << endl; shmdt(shmaddr); } else { //父进程 pid_t pid2 = fork(); if(pid2 == -1){ cout << "Fork error. " << endl; exit(1); } else if(pid2 == 0){ //子进程 sleep(2); shmid = shmget(key,1024,IPC_CREAT | 0600); shmaddread = (char*)shmat(shmid, NULL, 0); cout << "[Reader] read: " << shmaddread << endl; shmdt(shmaddread); } } sleep(3); return 0; }
Python实现
#coding:utf-8 import multiprocessing import time size = 8*1024 # 32KB def proc1(a): pass def proc2(a): for i in range(0,size): print("[proc2] 写入: ",i) a[i]=i time.sleep(0.5) def proc3(a): for i in range(0,size): msg=a[i] print('[proc3] proc3 读出:',msg) time.sleep(0.5) from multiprocessing.sharedctypes import RawArray a = RawArray( 'i', [i for i in range(size)] ) # Pass an end of the pipe to process 1 p1 = multiprocessing.Process(target=proc1, args=(a,)) # Pass the other end of the pipe to process 2 p2 = multiprocessing.Process(target=proc2, args=(a,)) # Pass the other end of the pipe to process 3 p3 = multiprocessing.Process(target=proc3, args=(a,)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join()
版权声明本博客的文章除特别说明外均为原创,本人版权所有。欢迎转载,转载请注明作者及来源链接,谢谢。本文地址: https://blog.ailemon.net/2018/03/19/the-theory-and-implemention-on-five-ways-for-communication-between-processings/ All articles are under Attribution-NonCommercial-ShareAlike 4.0 |
WeChat Donate
Alipay Donate
发表回复