Linux OS
An overview of the main inter-process communication (IPC) mechanisms on Linux:
- Files
- Anonymous pipes and named pipes (FIFOs): pipes can be used between related processes; a named pipe removes the restriction that a pipe has no name, so besides everything a plain pipe can do, it also allows communication between unrelated processes.
- Signals: a relatively involved mechanism, used to notify the receiving process that some event has occurred. Besides inter-process communication, a process can also send signals to itself.
- Message queues: a message queue is a linked list of messages; implementations include POSIX message queues and System V message queues. A process with sufficient permissions can append messages to a queue, and a process with read permission can take messages off it. Message queues overcome the small payload of signals, and the unformatted byte stream and limited buffer size of pipes.
- Shared memory: lets multiple processes access the same region of memory; the fastest available form of IPC, designed precisely because the other mechanisms are comparatively slow. Usually combined with another mechanism, such as semaphores, to achieve synchronization and mutual exclusion between processes.
- Memory-mapped files (mmap): map a file into memory and modify it directly through memory addresses, instead of writing to the file as a stream.
- Sockets: the most general IPC mechanism, also usable between processes on different machines. There is the byte-stream-oriented TCP, and the message-oriented UDP and SCTP.
    - Cross-machine communication is all built on network sockets; from this grew the various general-purpose RPC frameworks, which bundle transport with encoding/decoding.
- Unix domain sockets (UDS): similar to network sockets, except the communication stays inside the operating-system kernel.
    - HTTP-format requests and responses can be carried over a UDS, and RPC can be implemented on top of that HTTP layer.
Python implementation
- Pipes can be obtained via `multiprocessing.Pipe` (a full sketch appears in the pipe section below).
- Sockets are done with the `AF_UNIX` address family, analogous to ordinary network programming; see the sketch below.
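A minimal `AF_UNIX` sketch, assuming a Unix-like OS. `socket.socketpair()` skips the bind/listen/accept sequence that a path-bound server would use:

```python
import socket

# Two already-connected AF_UNIX sockets; handy between related processes.
# For unrelated processes, bind an AF_UNIX socket to a filesystem path
# and use listen()/accept()/connect() exactly as with TCP.
parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
parent_sock.sendall(b"ping")
print(child_sock.recv(4))   # b'ping'
parent_sock.close()
child_sock.close()
```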
References
- https://en.wikipedia.org/wiki/Inter-process_communication
- https://www.cnblogs.com/rianley/p/9014017.html
- https://eli.thegreenplace.net/2019/unix-domain-sockets-in-go/
Python
coroutine
Building on the process and thread prerequisites, this subsection takes a deeper look at coroutines, Python's language-level concurrency model. Inter-process communication can then be abstracted into communication between different kinds of concurrent objects.
- Concepts
    - awaitable (`inspect.isawaitable(obj)`)
        - An object that can be used in an await expression. Can be a coroutine or an object with an `__await__()` method.
        - We say that an object is an awaitable object if it can be used in an await expression. Many asyncio APIs are designed to accept awaitables.
        - There are three main types of awaitable objects: coroutines, Tasks, and Futures.
    - coroutine
        - coroutine object (`asyncio.iscoroutine(obj)`): Coroutines are a more generalized form of subroutines. Subroutines are entered at one point and exited at another point. Coroutines can be entered, exited, and resumed at many different points. They can be implemented with the `async def` statement.
        - coroutine function (`asyncio.iscoroutinefunction(func)`): A function which returns a coroutine object. A coroutine function may be defined with the `async def` statement, and may contain `await`, `async for`, and `async with` keywords. These were introduced by PEP 492.
- Task
- A Future-like object that runs a Python coroutine. Not thread-safe.
- Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.
- `asyncio.Task` inherits from `Future` all of its APIs except `Future.set_result()` and `Future.set_exception()`.
- event loop
- Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.
- Built-in implementations
    - `asyncio.SelectorEventLoop`: the default on Unix; a wrapper over the `selectors` module, which supports the following implementations:
        - Select
        - Poll
        - Epoll
        - Devpoll
        - Kqueue
    - How `selectors` picks its default:

```python
# Choose the best implementation, roughly:
#     epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector
```
    - `asyncio.ProactorEventLoop`: the default on Windows; uses "I/O Completion Ports" (IOCP)
- Future (`asyncio.isfuture(obj)`)
    - A Future represents an eventual result of an asynchronous operation.
    - Future is an awaitable object. Coroutines can await on Future objects until they either have a result or an exception set, or until they are cancelled. (See the sketch below.)
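A small sketch of a coroutine awaiting a Future that another task completes (names and the 0.1s delay are illustrative):

```python
import asyncio

async def set_after(fut, delay, value):
    await asyncio.sleep(delay)
    fut.set_result(value)   # completes the Future; awaiters resume

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    asyncio.create_task(set_after(fut, 0.1, "done"))
    print(await fut)        # suspends here until set_result() is called

asyncio.run(main())
```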
- Common functions
    - `asyncio.run(coroutine)`
    - `task = asyncio.create_task(coroutine); await task`
    - `asyncio.ensure_future(obj)`
    - Time-related
        - `await asyncio.sleep(3)`
        - `await asyncio.wait_for(aw, timeout=3.0)`: aw is an awaitable
            - Changed in version 3.7: When aw is cancelled due to a timeout, wait_for waits for aw to be cancelled. Previously, it raised asyncio.TimeoutError immediately.
        - `await asyncio.wait(aws, timeout=None, return_when=ALL_COMPLETED)`: aws is a set of awaitables
            - Deprecated since version 3.8: Passing coroutine objects to `wait()` directly is deprecated.
        - `await asyncio.as_completed(aws, timeout=None)`: returns an iterator of coroutines
            - Deprecated since version 3.8, will be removed in version 3.10: The loop parameter.
    - `results = await asyncio.gather(*aws)`
        - Deprecated since version 3.8, will be removed in version 3.10: The loop parameter.
        - If a sub-task is cancelled, gather itself is not cancelled, so the other sub-tasks are unaffected; if a sub-task raises an exception other than CancelledError, it propagates up to the gather call itself.
- Cancelling coroutines (see the sketch after this list)
    - `await asyncio.shield(awaitable)`: Protect an awaitable object from being cancelled.
    - `task.cancel()`
        - The exception it raises can be caught by the coroutine itself, so cancellation is not guaranteed.
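A minimal sketch of task creation, cancellation, and gather's error semantics (worker names and delays are arbitrary):

```python
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    fast = asyncio.create_task(worker("fast", 0.1))
    slow = asyncio.create_task(worker("slow", 10))
    slow.cancel()  # cancelling one sub-task does not cancel gather itself

    # With return_exceptions=True, the CancelledError is collected into
    # the result list instead of propagating to the awaiting coroutine.
    results = await asyncio.gather(fast, slow, return_exceptions=True)
    print(results)  # ['fast', CancelledError()]

asyncio.run(main())
```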
- Networking (see the echo sketch after this list)
    - TCP stream
        - `asyncio.open_connection`
        - `asyncio.start_server`
    - Unix socket
        - `asyncio.open_unix_connection`
        - `asyncio.start_unix_server`
- Common exceptions
    - `asyncio.CancelledError`
    - `asyncio.TimeoutError`
- Synchronization primitives
    - Caveats
        - The synchronization primitives in the asyncio module are not thread-safe; for cross-thread synchronization use the primitives from the threading module instead.
        - These primitives do not accept a timeout argument; use `asyncio.wait_for()` to enforce timeouts.
    - Primitive details (see the sketch after this list)
        - `asyncio.Lock`
        - `asyncio.Event`: An asyncio event can be used to notify multiple asyncio tasks that some event has happened.
        - `asyncio.Condition`
            - An asyncio condition primitive can be used by a task to wait for some event to happen and then get exclusive access to a shared resource.
            - In essence, a Condition object combines the functionality of an `Event` and a `Lock`. It is possible to have multiple Condition objects share one Lock, which allows coordinating exclusive access to a shared resource between different tasks interested in particular states of that shared resource.
            - Acquire the lock first (`async with`), then call `notify()` or `await wait()`.
        - `asyncio.Semaphore`
            - A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some task calls release().
            - Either use `async with`, or call `acquire()` and `release()` explicitly.
        - `asyncio.BoundedSemaphore`
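A short Condition sketch following the acquire-then-wait/notify rule above (task names and the 0.1s delay are illustrative):

```python
import asyncio

async def waiter(cond):
    async with cond:          # acquire the underlying lock first
        await cond.wait()     # releases the lock while waiting, reacquires after
        print("condition met")

async def notifier(cond):
    await asyncio.sleep(0.1)
    async with cond:          # the lock must be held to call notify()
        cond.notify_all()

async def main():
    cond = asyncio.Condition()
    await asyncio.gather(waiter(cond), notifier(cond))

asyncio.run(main())
```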
- Common exceptions
    - `RuntimeError`
signal
- The `signal.signal()` function allows defining custom handlers to be executed when a signal is received.
- A small number of default handlers are installed:
- SIGPIPE is ignored (so write errors on pipes and sockets can be reported as ordinary Python exceptions)
- SIGINT is translated into a KeyboardInterrupt exception if the parent process has not changed it.
- It makes little sense to catch synchronous errors like SIGFPE or SIGSEGV that are caused by an invalid operation in C code.
- A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.
- Signals and threads
- Python signal handlers are always executed in the main Python thread, even if the signal was received in another thread. This means that signals can’t be used as a means of inter-thread communication. You can use the synchronization primitives from the threading module instead.
- only the main thread is allowed to set a new signal handler
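A minimal handler sketch, assuming a Unix-like OS (`signal.alarm()` is Unix-only; the SIGINT override is just for demonstration):

```python
import signal
import time

def handler(signum, frame):
    # Python signal handlers always run in the main thread,
    # between two bytecode instructions.
    print(f"received signal {signum}")

# Replace the default SIGINT -> KeyboardInterrupt translation.
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGALRM, handler)

signal.alarm(1)   # the kernel delivers SIGALRM to us in ~1 second
time.sleep(2)     # interrupted, handler runs, sleep resumes (PEP 475)
print("done")
```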
queue
- Allows multiple producers and consumers.
- FIFO queue
asyncio.Queue(maxsize=0, *, loop=None)
queue.Queue(maxsize=0)
- put
- get
- task_done
- join
queue.SimpleQueue()
multiprocessing.Queue([maxsize])
- implemented using a pipe and a few locks/semaphores.
- When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
multiprocessing.SimpleQueue
- A simplified queue type, very close to a locked `Pipe`.
multiprocessing.JoinableQueue([maxsize])
- LIFO queue
queue.LifoQueue(maxsize=0)
asyncio.LifoQueue
- PriorityQueue
queue.PriorityQueue(maxsize=0)
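A thread-based producer/consumer sketch showing the put/get/task_done/join protocol (worker count and items are arbitrary):

```python
import queue
import threading

q = queue.Queue()          # FIFO; maxsize=0 means unbounded

def worker():
    while True:
        item = q.get()     # blocks until an item is available
        print("processed", item)
        q.task_done()      # pair every completed get() with one task_done()

threading.Thread(target=worker, daemon=True).start()

for i in range(3):
    q.put(i)

q.join()                   # blocks until every enqueued item is task_done
```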
pipe
multiprocessing.Pipe
- For a connection between two processes
- Returns a pair `(conn1, conn2)` of `Connection` objects representing the ends of a pipe.
    - Objects sent through the pipe must be picklable. Very large pickles (approximately 32 MiB+, though it depends on the OS) may raise a `ValueError` exception.
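A minimal sketch of the pair in use (the payload is arbitrary):

```python
import multiprocessing

def child(conn):
    # Anything picklable can be sent through a Connection.
    conn.send({"msg": "hello from child"})
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # the (conn1, conn2) pair
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # {'msg': 'hello from child'}
    p.join()
```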
Shared memory / mmap
- shared memory
multiprocessing.Manager
- Returns a started `SyncManager` object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.
- SyncManager
    - Its methods create and return Proxy Objects for a number of commonly used data types to be synchronized across processes. This notably includes shared lists and dictionaries.
multiprocessing.Value
multiprocessing.Array
multiprocessing.sharedctypes.XXX
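A sketch combining a Manager proxy with raw shared memory via `Value` (type code `"i"` is a C int; the key and values are illustrative):

```python
import multiprocessing

def work(shared, counter):
    shared["who"] = "child"       # proxy call, forwarded to the manager process
    with counter.get_lock():      # Value carries its own lock by default
        counter.value += 1

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared = manager.dict()                   # proxy to a dict in the manager process
    counter = multiprocessing.Value("i", 0)   # raw shared memory

    p = multiprocessing.Process(target=work, args=(shared, counter))
    p.start()
    p.join()
    print(dict(shared), counter.value)        # {'who': 'child'} 1
```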
- mmap
class mmap.mmap(fileno, length, tagname=None, access=ACCESS_DEFAULT[, offset])  # Windows
class mmap.mmap(fileno, length, flags=MAP_SHARED, prot=PROT_WRITE|PROT_READ, access=ACCESS_DEFAULT[, offset])  # Unix
mm = mmap.mmap(open('xx.txt', 'r+b').fileno(), 0)
mm = mmap.mmap(-1, 13)
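A short sketch of both constructors shown above; `xx.txt` mirrors the file name used above and must already exist with at least five bytes:

```python
import mmap

# Anonymous mapping (fileno=-1): backed by memory only, shared with
# child processes created by fork().
mm = mmap.mmap(-1, 13)
mm.write(b"Hello, world!")
mm.seek(0)
print(mm.read(13))          # b'Hello, world!'
mm.close()

# File-backed mapping: length 0 maps the whole file; slice assignment
# edits it in place through memory addresses rather than stream writes.
with open("xx.txt", "r+b") as f:
    fm = mmap.mmap(f.fileno(), 0)
    fm[:5] = b"HELLO"
    fm.flush()
    fm.close()
```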
Golang
- Standard library implementations
- anonymous pipe
exec.Cmd.StdinPipe()
exec.Cmd.StdoutPipe()
exec.Cmd.StderrPipe()
- named pipe
syscall.Mkfifo()
- anonymous pipe
- Third-party wrappers
- https://pkg.go.dev/bitbucket.org/avd/go-ipc?tab=doc
- fifo (unix and windows pipes)
- memory mapped files
- shared memory
- system message queues (Linux, FreeBSD, OSX)
- cross-platform priority message queue
- mutexes, rw mutexes
- semaphores
- events
- conditional variables
RPC
- https://colobu.com/2020/01/21/benchmark-2019-spring-of-popular-rpc-frameworks/
- dubbo
- grpc
- rpcx
- thrift
- tars
- hprose
- go-micro
- https://juejin.im/post/5c1a0cd651882504bd0e5dbe
- Dubbo: open-sourced in 2011 by Alibaba, Java only
- Motan: open-sourced in 2016 by Weibo, Java only
- Tars: open-sourced in 2017 by Tencent, C++ only
- Spring Cloud: open-sourced in 2014 by Pivotal, Java only
- Cross-language
    - gRPC: open-sourced in 2015 by Google
    - Thrift: 2007, Facebook
- https://capnproto.org/
- Cap’n Proto gets a perfect score because there is no encoding/decoding step. The Cap’n Proto encoding is appropriate both as a data interchange format and an in-memory representation, so once your structure is built, you can simply write the bytes straight out to disk!
RPC in CNCF
- JSON-REST: RESTful API with JSON format, common legacy choice
- Apache Thrift
- Dubbo
- gRPC
- NATS: https://github.com/nats-io/nats-server
- SofaRPC: https://github.com/sofastack/sofa-rpc
- Tars: https://github.com/TarsCloud/Tars
- CloudEvents: https://github.com/cloudevents/spec