Sequencer server class.
More...
|
| | host = attr.ib(default="localhost") |
| |
| | port = attr.ib(default=8000) |
| |
| | log_level = attr.ib(default=logging.INFO) |
| |
| | proc = attr.ib(default=None, init=False) |
| |
| | q = attr.ib(default=attr.Factory(asyncio.Queue), init=False) |
| |
| | running_flag = attr.ib(default=True, init=False) |
| |
| | reader_flag = attr.ib(default=True, init=False) |
| |
| | reader_task = attr.ib(default=None, init=False) |
| |
◆ do_clear()
| def cli.seq_server.myBobServer.do_clear |
( |
|
self, |
|
|
|
arg |
|
) |
| |
◆ do_quit()
| def cli.seq_server.myBobServer.do_quit |
( |
|
self, |
|
|
|
line |
|
) |
| |
◆ final_cleanup()
| def cli.seq_server.myBobServer.final_cleanup |
( |
|
self | ) |
|
Attempts to finish the process cleanly.
◆ proc_reader()
| def cli.seq_server.myBobServer.proc_reader |
( |
|
self | ) |
|
Read stdout from connected pipe.
◆ sig_handler()
| def cli.seq_server.myBobServer.sig_handler |
( |
|
self | ) |
|
◆ start_seq_shell()
| def cli.seq_server.myBobServer.start_seq_shell |
( |
|
self | ) |
|
Starts the seqtool shell subprocess.
The `seqtool server` is a wrapper around a `seqtool shell` process.
The `seqtool server` listens for commands from the network and forwards
them to the `seqtool shell` process.
The replies from `seqtool shell` in stdout are read in the asynchronous
function proc_reader() which forwards them to the `seqtool shell`
stdout.
When a clean session is needed, the process is terminated (clear
command) and a new one is started.
◆ start_server()
| def cli.seq_server.myBobServer.start_server |
( |
|
self | ) |
|
◆ stop_seq_shell()
| def cli.seq_server.myBobServer.stop_seq_shell |
( |
|
self | ) |
|
◆ talk_to_client()
| def cli.seq_server.myBobServer.talk_to_client |
( |
|
self, |
|
|
|
reader, |
|
|
|
writer |
|
) |
| |
Handles incoming strings from the socket server and forwards them to the stdin of the `seqtool shell` subprocess.
It also echoes the incoming strings back to the client.
◆ host
| cli.seq_server.myBobServer.host = attr.ib(default="localhost") |
|
static |
◆ log_level
| cli.seq_server.myBobServer.log_level = attr.ib(default=logging.INFO) |
|
static |
◆ port
| cli.seq_server.myBobServer.port = attr.ib(default=8000) |
|
static |
◆ proc
| cli.seq_server.myBobServer.proc = attr.ib(default=None, init=False) |
|
static |
◆ q
| cli.seq_server.myBobServer.q = attr.ib(default=attr.Factory(asyncio.Queue), init=False) |
|
static |
◆ reader_flag
| cli.seq_server.myBobServer.reader_flag = attr.ib(default=True, init=False) |
|
static |
◆ reader_task
| cli.seq_server.myBobServer.reader_task = attr.ib(default=None, init=False) |
|
static |
◆ running_flag
| cli.seq_server.myBobServer.running_flag = attr.ib(default=True, init=False) |
|
static |
The documentation for this class was generated from the following file: