/* windows 1.cpp */
#include "process.h"
#include "windows.h"
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
using namespace std;

/*
 * Parent side of the named-event demo.
 *
 * Creates the named auto-reset event "Event_of_my", launches "1_child.exe"
 * in a new console and then blocks until either the child signals the event
 * or the child terminates without signalling it.
 *
 * Returns 0 in every case (the exit status is not used to report errors).
 */
int main()
{
    /* Create the named event; the handle is kept so it can actually be waited on. */
    HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, "Event_of_my");
    if (hEvent == NULL)
    {
        printf("Failed to create the event!\n");
        return 0;
    }

    /* Preparation for creating the child process. */
    PROCESS_INFORMATION pi;
    STARTUPINFO sui;
    ZeroMemory(&pi, sizeof(pi));
    ZeroMemory(&sui, sizeof(sui));
    sui.cb = sizeof(STARTUPINFO);

    /* Create the child process. */
    if (!CreateProcess("1_child.exe", NULL, NULL, NULL, FALSE, CREATE_NEW_CONSOLE, NULL, NULL, &sui, &pi))
    {
        printf("Failed to create a process!\n");
        CloseHandle(hEvent);
        return 0;
    }

    /* Creation succeeded: the parent keeps running and waits. */
    printf("Wait for event\n");

    /*
     * Wait for the event, but also watch the child's process handle so the
     * parent does not block forever when the child exits without signalling.
     * With bWaitAll == FALSE the lowest signalled index is reported, so a
     * signalled event wins over a simultaneously finished child.
     */
    HANDLE waitSet[2] = { hEvent, pi.hProcess };
    const DWORD waitResult = WaitForMultipleObjects(2, waitSet, FALSE, INFINITE);

    /* None of the handles is needed after the wait; release them all. */
    CloseHandle(pi.hThread);
    CloseHandle(pi.hProcess);
    CloseHandle(hEvent);

    if (waitResult == WAIT_FAILED)
    {
        printf("Failed to get the signal of event");
        return 0;
    }

    if (waitResult == WAIT_OBJECT_0)
    {
        printf("Get the event\n");
    }
    else
    {
        printf("Child exited without signaling the event\n");
    }

    system("pause");
    return 0;
}
/* windows 1_child.cpp */
#include "process.h"
#include "windows.h"
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
using namespace std;

/*
 * Child side of the named-event demo.
 *
 * Opens the named event "Event_of_my", asks the user whether the event should
 * be signalled and calls SetEvent() when the answer is 'y'.
 *
 * Returns 0 in every case.
 */
int main()
{
    /* Defaults to "no" so a failed read never signals the event. */
    char Sig_Flag = 'n';

    /* Only SetEvent() is used, so modify-state access is sufficient and the
       handle does not need to be inheritable. */
    HANDLE E = OpenEvent(EVENT_MODIFY_STATE, FALSE, "Event_of_my");
    if (E == NULL)
    {
        printf("Failed to open the event!\n");
        system("pause");
        return 0;
    }

    Sleep(3000);

    /* The doubled backslash prints a single literal '\'. */
    printf("Signal the event to Parent?[y\\n]\n");

    /* scanf_s requires the buffer size (as unsigned) for %c conversions. */
    if (scanf_s("%c", &Sig_Flag, (unsigned)sizeof(Sig_Flag)) == 1 && Sig_Flag == 'y')
    {
        SetEvent(E);
    }

    CloseHandle(E);
    Sleep(3000);
    system("pause");
    return 0;
}
/* 2.cpp */
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>

/* pipe_default[0] is the read end, pipe_default[1] is the write end. */
int pipe_default[2];

/*
 * Anonymous-pipe demo: the parent ("server") writes one message into the pipe
 * and the forked child ("client") reads and prints it.
 *
 * Returns 0 on success and 1 when the pipe or the child cannot be created.
 */
int main()
{
    char buffer[32];
    memset(buffer, 0, sizeof(buffer));

    if (pipe(pipe_default) < 0)
    {
        printf("Failed to create pipe!\n");
        return 1;
    }

    const pid_t pid = fork();
    if (pid < 0)
    {
        /* Without a child nobody would ever read the pipe. */
        printf("Failed to fork!\n");
        close(pipe_default[0]);
        close(pipe_default[1]);
        return 1;
    }

    if (pid == 0)
    {
        /* Child: reader. */
        close(pipe_default[1]); /* close the write end */
        sleep(2);
        /* Leave the last byte untouched so the buffer stays NUL-terminated. */
        if (read(pipe_default[0], buffer, sizeof(buffer) - 1) > 0)
        {
            printf("[Client] Receive data from server: %s \n", buffer);
        }
        close(pipe_default[0]);
        return 0;
    }

    /* Parent: writer. */
    close(pipe_default[0]); /* close the read end */
    char msg[32] = "== hello world ==";
    if (-1 != write(pipe_default[1], msg, strlen(msg)))
    {
        printf("[Server] Send data to client: %s \n", msg);
    }
    close(pipe_default[1]);
    waitpid(pid, NULL, 0);
    return 0;
}
#coding:utf-8
import multiprocessing
import time


def proc1(pipe0, pipe1):
    """Send 0..99 to both peers, print each peer's reply, then tell them to stop.

    pipe0 -- connection to proc2
    pipe1 -- connection to proc3

    After the last number a ``None`` sentinel is sent on both connections so
    that the receiving loops can terminate.
    """
    for i in range(100):
        print("[proc1] 发送: ", i)
        pipe0.send(i)
        pipe1.send(i)
        print('[proc1] proc2 接收:', pipe0.recv())
        print('[proc1] proc3 接收:', pipe1.recv())
        time.sleep(1)
    # End-of-stream marker: without it the peers block in recv() forever.
    pipe0.send(None)
    pipe1.send(None)


def proc2(pipe):
    """Acknowledge every message received on ``pipe`` until ``None`` arrives."""
    while True:
        msg = pipe.recv()
        if msg is None:
            break
        print('[proc2] proc2 接收:', msg)
        pipe.send('proc2收到数据:' + str(msg))
        time.sleep(1)


def proc3(pipe):
    """Acknowledge every message received on ``pipe`` until ``None`` arrives."""
    while True:
        msg = pipe.recv()
        if msg is None:
            break
        print('[proc3] proc3 接收:', msg)
        pipe.send('proc3收到数据:' + str(msg))
        time.sleep(1)


# The guard keeps child processes from re-running the launcher when the
# module is re-imported under the spawn/forkserver start methods.
if __name__ == "__main__":
    # Build two duplex pipes
    pipe0 = multiprocessing.Pipe()
    pipe1 = multiprocessing.Pipe()
    print(pipe0)
    print(pipe1)

    # Process 1 gets one end of each pipe
    p1 = multiprocessing.Process(target=proc1, args=(pipe0[0], pipe1[0],))
    # Process 2 gets the other end of pipe0
    p2 = multiprocessing.Process(target=proc2, args=(pipe0[1],))
    # Process 3 gets the other end of pipe1
    p3 = multiprocessing.Process(target=proc3, args=(pipe1[1],))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()
#coding:utf-8
import multiprocessing
import threading
import time


def threadfun(pipe0, pipe1):
    """Thread task: forward every message read from ``pipe1`` to ``pipe0``.

    The loop ends when ``None`` is received; the sentinel itself is not
    forwarded.
    """
    while True:
        msg = pipe1.recv()
        if msg is None:
            break
        pipe0.send(msg)


def proc1(pipe0, pipe1):
    """Relay process: one thread per direction between the two connections."""
    t1 = threading.Thread(target=threadfun, args=(pipe0, pipe1))  # pipe1 -> pipe0
    t2 = threading.Thread(target=threadfun, args=(pipe1, pipe0))  # pipe0 -> pipe1
    t1.start()
    t2.start()
    t1.join()  # wait until the pipe1 -> pipe0 relay has seen its sentinel
    t2.join()  # wait until the pipe0 -> pipe1 relay has seen its sentinel
    time.sleep(1)


def proc2(pipe):
    """Send 0..99, printing the message received after each send."""
    for i in range(0, 100):
        print("[proc2] 发送: ", i)
        pipe.send(i)
        msg = pipe.recv()
        print('[proc2] proc2 接收:', msg)
        time.sleep(1)
    # Tell the relay thread reading this connection to stop.
    pipe.send(None)


def proc3(pipe):
    """Send -100..-1, printing the message received after each send."""
    for i in range(-100, 0):
        print("[proc3] 发送: ", i)
        pipe.send(i)
        msg = pipe.recv()
        print('[proc3] proc3 接收:', msg)
        time.sleep(1)
    # Tell the relay thread reading this connection to stop.
    pipe.send(None)


# The guard keeps child processes from re-running the launcher when the
# module is re-imported under the spawn/forkserver start methods.
if __name__ == "__main__":
    # Build two duplex pipes
    pipe0 = multiprocessing.Pipe()
    pipe1 = multiprocessing.Pipe()
    print(pipe0)
    print(pipe1)

    # Process 1 (the relay) gets one end of each pipe
    p1 = multiprocessing.Process(target=proc1, args=(pipe0[0], pipe1[0],))
    # Process 2 gets the other end of pipe0
    p2 = multiprocessing.Process(target=proc2, args=(pipe0[1],))
    # Process 3 gets the other end of pipe1
    p3 = multiprocessing.Process(target=proc3, args=(pipe1[1],))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()
/*Linux*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <errno.h>
#include <limits.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <iostream>

#define PIPE_NAME "/tmp/dpfifo"
/* PIPE_BUF comes from limits.h; it is used here as the transfer buffer size. */
#define BUFFER_SIZE PIPE_BUF

using namespace std;

/* Writer (child): sends the message twice through the FIFO; exits with status 1 on error. */
static void run_writer()
{
    char buffer[BUFFER_SIZE + 1]; /* buffer used for every write */
    int bytes_sent = 0;

    strcpy(buffer, "ailemon.me\n");
    printf("[writer] message: %s\n", buffer);
    const int len = (int)strlen(buffer);
    const int alllen = len * 2;

    /* Opening a FIFO for writing blocks until a reader has opened it. */
    printf("[writer] process %d opening pipe !\n", getpid());
    const int pipe_fd = open(PIPE_NAME, O_WRONLY);
    printf("[writer] process %d result %d\n", getpid(), pipe_fd);
    if (pipe_fd == -1)
    {
        perror("open pipe for writing");
        exit(1);
    }

    while (bytes_sent < alllen)
    {
        sleep(1);
        printf("[writer] %d bytes sending........\n", len);
        const int res = (int)write(pipe_fd, buffer, len);
        printf("[writer] current %d bytes sended\n", res);
        if (res == -1)
        {
            perror("write error on pipe\n");
            exit(1);
        }
        bytes_sent += res; /* res: bytes written this round; bytes_sent: running total */
    }
    close(pipe_fd);
    printf("[writer] %d bytes sended!\n", bytes_sent);
}

/* Reader (parent): prints what arrives on the FIFO; returns false when the FIFO cannot be opened. */
static bool run_reader()
{
    const int alllen = 20;
    char buffer[BUFFER_SIZE + 1]; /* buffer used for every read */
    int bytes_read = 0;

    sleep(1);
    printf("[reader] read process %d opening pipe !\n", getpid());
    const int pipe_fd = open(PIPE_NAME, O_RDONLY);
    printf("[reader] read process %d result %d\n", getpid(), pipe_fd);
    if (pipe_fd == -1)
    {
        perror("open pipe for reading");
        return false;
    }

    do
    {
        sleep(1);
        const ssize_t res = read(pipe_fd, buffer, BUFFER_SIZE);
        if (res < 0)
        {
            perror("read error on pipe");
            break;
        }
        if (res == 0)
        {
            /* End of file: every writer has closed, nothing more will arrive. */
            break;
        }
        buffer[res] = '\0'; /* print only the bytes received in this round */
        printf("[reader] %s", buffer);
        bytes_read += (int)res; /* res: bytes read this round; bytes_read: running total */
    } while (bytes_read <= alllen);

    close(pipe_fd);
    printf("[reader] %d bytes readed!\n", bytes_read);
    return true;
}

/*
 * Named-pipe (FIFO) demo: the child writes a short message twice and the
 * parent reads it back and prints it.
 *
 * Returns 0 normally (also when fork() fails, as before) and 1 when the
 * parent cannot open the FIFO; exits with status 1 when the FIFO cannot be
 * created.
 */
int main(void)
{
    /*
     * Create the FIFO before forking so that it already exists when either
     * side opens it. An existing FIFO is reused; calling mkfifo() directly
     * avoids the check-then-create race of access() followed by mkfifo().
     */
    if (mkfifo(PIPE_NAME, 0777) != 0 && errno != EEXIST)
    {
        perror("create pipe error!");
        exit(1);
    }

    const pid_t pid = fork();
    if (pid < 0)
    {
        cout << "Create Error!" << endl;
        return 0;
    }

    if (pid == 0)
    {
        /* Child process: write end. */
        run_writer();
        return 0;
    }

    /* Parent process: read end. */
    const bool opened = run_reader();
    if (opened)
    {
        /* Reap the writer so it does not linger as a zombie. */
        waitpid(pid, NULL, 0);
    }
    unlink(PIPE_NAME);
    return opened ? 0 : 1;
}
- 生产者进程生产产品,消费者进程消费产品。
- 当生产者进程生产产品时,如果没有空缓冲区可用,那么生产者进程必须等待消费者进程释放出一个缓冲区。
- 当消费者进程消费产品时,如果缓冲区中没有产品,那么消费者进程将被阻塞,直到新的产品被生产出来。
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/sem.h>

/* NOTE(review): the literal does not fit in 32 bits; the cast narrows it
   wherever key_t is a 32-bit type. Left unchanged so the key stays the same. */
#define KEY (key_t)15030110070
#define N 20

/* Indices of the semaphores inside the set. */
enum
{
    IDX_MUTEX = 0, /* mutual exclusion for operations on the buffer */
    IDX_EMPTY = 1, /* number of free slots, starts at N */
    IDX_FULL = 2   /* number of products, starts at 0 */
};

static void p(int semid ,int semNum);
static void v(int semid ,int semNum);

union semun
{
    int val;
    struct semid_ds *buf;
    unsigned short *array;
};

/* Consumer loop for consumer number `id`; never returns. */
static void consume_forever(int semid, int id)
{
    while (1)
    {
        printf("消费者 %d 等待中...\n", id);
        sleep(2);
        int product = rand() % 2 + 1;
        for (int i = 0; i < product; i++)
        {
            p(semid, IDX_FULL);  /* consume */
            p(semid, IDX_MUTEX); /* lock */
            printf(" [消费者 %d] 消费产品 1. 剩余: %d\n", id, semctl(semid, IDX_FULL, GETVAL));
            v(semid, IDX_MUTEX); /* unlock */
            v(semid, IDX_EMPTY); /* release a free slot */
        }
        sleep(2);
    }
}

/* Producer loop; never returns. */
static void produce_forever(int semid)
{
    while (1)
    {
        printf("生产者开始生产...\n");
        int product = rand() % 5 + 1;
        for (int i = 0; i < product; i++)
        {
            p(semid, IDX_EMPTY); /* take a free slot */
            p(semid, IDX_MUTEX); /* lock */
            /* +1 because the product being made is only counted by the v() below. */
            printf(" [生产者] 生产产品 1. 剩余: %d\n", semctl(semid, IDX_FULL, GETVAL) + 1);
            v(semid, IDX_MUTEX); /* unlock */
            v(semid, IDX_FULL);  /* produce */
        }
        sleep(2);
    }
}

/*
 * Producer/consumer demo on a System V semaphore set: the parent produces
 * and two forked children consume, all running until they are killed.
 *
 * Returns 1 when the semaphore set cannot be created or initialised.
 */
int main(int argc ,char* argv[])
{
    (void)argc;
    (void)argv;

    const int semid = semget(KEY, 3, IPC_CREAT | 0660);
    if (semid == -1)
    {
        perror("semget");
        return 1;
    }

    const int initial[3] = { 1, N, 0 }; /* mutex, empty, full */
    for (int i = 0; i < 3; i++)
    {
        union semun arg;
        arg.val = initial[i];
        if (semctl(semid, i, SETVAL, arg) == -1)
        {
            perror("semctl");
            return 1;
        }
    }

    const pid_t p1 = fork();
    if (p1 == 0)
    {
        consume_forever(semid, 1); /* child 1: consumer */
    }
    if (p1 < 0)
    {
        perror("fork"); /* carry on with whatever consumers could be started */
    }

    const pid_t p2 = fork();
    if (p2 == 0)
    {
        consume_forever(semid, 2); /* child 2: consumer */
    }
    if (p2 < 0)
    {
        perror("fork");
    }

    produce_forever(semid); /* parent: producer */
    return 0;
}

/*
 * Apply `delta` to semaphore `semNum`, blocking as needed.
 *
 * SEM_UNDO is requested for the mutex only: there every process pairs its p()
 * with a v(), so the undo count stays balanced and a lock held by a process
 * that dies is released. On the counting semaphores each process only ever
 * moves the value in one direction, so SEM_UNDO would accumulate an unbounded
 * adjustment and would revert that process's work when it exits.
 */
static void sem_change(int semid, int semNum, short delta)
{
    struct sembuf sb;
    sb.sem_num = (unsigned short)semNum;
    sb.sem_op = delta;
    sb.sem_flg = (semNum == IDX_MUTEX) ? SEM_UNDO : 0;
    while (semop(semid, &sb, 1) == -1)
    {
        if (errno == EINTR)
        {
            continue; /* interrupted by a signal: retry */
        }
        /* Carrying on without the semaphore would corrupt the accounting. */
        perror("semop");
        exit(1);
    }
}

/* P operation: decrement (wait on) semaphore `semNum`. */
void p(int semid ,int semNum)
{
    sem_change(semid, semNum, -1);
}

/* V operation: increment (signal) semaphore `semNum`. */
void v(int semid ,int semNum)
{
    sem_change(semid, semNum, 1);
}
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import time
import random
from multiprocessing.sharedctypes import RawArray

# Shared memory: a[0] is the running operation counter, a[1] the current stock.
a = RawArray('i', 2)
a[0] = 0
a[1] = 0
semaphore_num = multiprocessing.Semaphore(0)      # number of products
semaphore_empty = multiprocessing.Semaphore(20)   # number of free buffer slots
semaphore_mutex = multiprocessing.Semaphore(1)    # mutual exclusion for buffer operations


def consumer(flag, shared=None, sem_items=None, sem_slots=None, sem_lock=None):
    """Consume products forever.

    flag      -- label printed to identify this consumer
    shared    -- two-element shared array (operation counter, stock)
    sem_items -- semaphore counting available products
    sem_slots -- semaphore counting free buffer slots
    sem_lock  -- semaphore guarding ``shared``

    The shared objects should be passed explicitly: module-level objects are
    created anew in every child under the spawn/forkserver start methods and
    would then not be shared at all. Omitted arguments fall back to the
    module-level objects.
    """
    shared = a if shared is None else shared
    sem_items = semaphore_num if sem_items is None else sem_items
    sem_slots = semaphore_empty if sem_slots is None else sem_slots
    sem_lock = semaphore_mutex if sem_lock is None else sem_lock

    time.sleep(3)
    while True:
        time.sleep(1)
        print("消费者 " + str(flag) + " 等待中...")
        item = random.randint(1, 3)
        for _ in range(item):
            sem_items.acquire()  # consume
            sem_lock.acquire()   # lock
            no = shared[0] + 1
            num = shared[1] - 1
            shared[0] = no
            shared[1] = num
            print(str(no) + " [消费者 " + str(flag) + "] 消费产品 1. 剩余: " + str(num))
            sem_lock.release()   # unlock
            sem_slots.release()  # release a free slot


def producer(shared=None, sem_items=None, sem_slots=None, sem_lock=None):
    """Produce products forever; the parameters match those of ``consumer``."""
    shared = a if shared is None else shared
    sem_items = semaphore_num if sem_items is None else sem_items
    sem_slots = semaphore_empty if sem_slots is None else sem_slots
    sem_lock = semaphore_mutex if sem_lock is None else sem_lock

    while True:
        time.sleep(1)
        print("生产者开始生产...")
        item = random.randint(1, 6)
        # Reserve every slot of this batch before producing anything.
        for _ in range(item):
            sem_slots.acquire()  # take a free slot
        for _ in range(item):
            sem_lock.acquire()   # lock
            no = shared[0] + 1
            num = shared[1] + 1
            shared[0] = no
            shared[1] = num
            print(str(no) + " [生产者] 生产产品 1. 剩余: " + str(num))
            sem_lock.release()   # unlock
            sem_items.release()  # produce


if __name__ == "__main__":
    shared_objects = (a, semaphore_num, semaphore_empty, semaphore_mutex)
    p1 = multiprocessing.Process(target=producer, args=shared_objects)
    p2 = multiprocessing.Process(target=consumer, args=("0",) + shared_objects)
    p3 = multiprocessing.Process(target=consumer, args=("1",) + shared_objects)

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    print("程序结束")
/* Linux 6.cpp */
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/wait.h>
#include <unistd.h>

using namespace std;

/* Attach the segment; prints a diagnostic and exits with status 1 on failure. */
static char *attach_segment(int shmid)
{
    void *addr = shmat(shmid, NULL, 0);
    if (addr == (void *)-1)
    {
        perror("shmat");
        exit(1);
    }
    return (char *)addr;
}

/*
 * System V shared-memory demo: one child writes a string into a shared
 * segment and a second child reads it back.
 *
 * The parent creates the segment, runs the writer to completion before the
 * reader is started, and removes the segment once both children are done.
 *
 * Returns 0 on success; exits with status 1 on any setup failure.
 */
int main()
{
    char str[] = "Hello, I am a processing. \n";

    const key_t key = ftok(".", 1);
    if (key == (key_t)-1)
    {
        perror("ftok");
        exit(1);
    }

    const int shmid = shmget(key, 1024, IPC_CREAT | 0600);
    if (shmid == -1)
    {
        perror("shmget");
        exit(1);
    }

    const pid_t pid1 = fork();
    if (pid1 == -1)
    {
        cout << "Fork error. " << endl;
        shmctl(shmid, IPC_RMID, NULL);
        exit(1);
    }
    if (pid1 == 0)
    {
        /* Child process: writer. */
        char *shmaddr = attach_segment(shmid);
        strcpy(shmaddr, str);
        cout << "[Writer] write: " << shmaddr << endl;
        shmdt(shmaddr);
        return 0;
    }

    /* Wait for the writer so the reader is guaranteed to see its data. */
    waitpid(pid1, NULL, 0);

    const pid_t pid2 = fork();
    if (pid2 == -1)
    {
        cout << "Fork error. " << endl;
        shmctl(shmid, IPC_RMID, NULL);
        exit(1);
    }
    if (pid2 == 0)
    {
        /* Child process: reader. */
        char *shmaddread = attach_segment(shmid);
        cout << "[Reader] read: " << shmaddread << endl;
        shmdt(shmaddread);
        return 0;
    }

    waitpid(pid2, NULL, 0);

    /* Remove the segment; otherwise it stays in the system after exit. */
    shmctl(shmid, IPC_RMID, NULL);
    return 0;
}
#coding:utf-8
import multiprocessing
import time
from multiprocessing.sharedctypes import RawArray

size = 8*1024  # number of ints in the shared array (32KB assuming 4-byte ints)


def proc1(a):
    """Placeholder process body; does nothing with the shared array."""
    pass


def proc2(a, delay=0.5):
    """Write each index into the matching slot of ``a``.

    a     -- shared array (any mutable sequence works)
    delay -- seconds to sleep after every write
    """
    for i in range(0, len(a)):
        print("[proc2] 写入: ", i)
        a[i] = i
        time.sleep(delay)


def proc3(a, delay=0.5):
    """Read and print every slot of ``a``.

    a     -- shared array (any sequence works)
    delay -- seconds to sleep after every read
    """
    for i in range(0, len(a)):
        msg = a[i]
        print('[proc3] proc3 读出:', msg)
        time.sleep(delay)


# The guard keeps child processes from re-running the launcher when the
# module is re-imported under the spawn/forkserver start methods.
if __name__ == "__main__":
    a = RawArray('i', [i for i in range(size)])

    # All three processes receive the same shared array
    p1 = multiprocessing.Process(target=proc1, args=(a,))
    p2 = multiprocessing.Process(target=proc2, args=(a,))
    p3 = multiprocessing.Process(target=proc3, args=(a,))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()
