Sequencer server class.
More...
|
| host = attr.ib(default="localhost") |
|
| port = attr.ib(default=8000) |
|
| log_level = attr.ib(default=logging.INFO) |
|
| proc = attr.ib(default=None, init=False) |
|
| q = attr.ib(default=attr.Factory(asyncio.Queue), init=False) |
|
| running_flag = attr.ib(default=True, init=False) |
|
| reader_flag = attr.ib(default=True, init=False) |
|
| reader_task = attr.ib(default=None, init=False) |
|
◆ do_clear()
def cli.seq_server.myBobServer.do_clear |
( |
|
self, |
|
|
|
arg |
|
) |
| |
◆ do_quit()
def cli.seq_server.myBobServer.do_quit |
( |
|
self, |
|
|
|
line |
|
) |
| |
◆ final_cleanup()
def cli.seq_server.myBobServer.final_cleanup |
( |
|
self | ) |
|
Attempts to finish the process cleanly.
◆ proc_reader()
def cli.seq_server.myBobServer.proc_reader |
( |
|
self | ) |
|
Read stdout from connected pipe.
◆ sig_handler()
def cli.seq_server.myBobServer.sig_handler |
( |
|
self | ) |
|
◆ start_seq_shell()
def cli.seq_server.myBobServer.start_seq_shell |
( |
|
self | ) |
|
Starts the seqtool shell
subprocess.
The `seqtool server` is a wrapper around a `seqtool shell` process.
The `seqtool server` listens for commands from the network and forwards
them to the `seqtool shell` process.
The replies from `seqtool shell` in stdout are read in the asynchronous
function proc_reader() which forwards them to the `seqtool shell`
stdout.
When a clean session is needed, the process is terminated (clear
command) and a new one is started.
◆ start_server()
def cli.seq_server.myBobServer.start_server |
( |
|
self | ) |
|
◆ stop_seq_shell()
def cli.seq_server.myBobServer.stop_seq_shell |
( |
|
self | ) |
|
◆ talk_to_client()
def cli.seq_server.myBobServer.talk_to_client |
( |
|
self, |
|
|
|
reader, |
|
|
|
writer |
|
) |
| |
Handles incomming strings from the Socket Server and forwards them to the stdin of the Subprocess seqtool shell
.
It also echoes the incomming strings back to the client.
◆ host
cli.seq_server.myBobServer.host = attr.ib(default="localhost") |
|
static |
◆ log_level
cli.seq_server.myBobServer.log_level = attr.ib(default=logging.INFO) |
|
static |
◆ port
cli.seq_server.myBobServer.port = attr.ib(default=8000) |
|
static |
◆ proc
cli.seq_server.myBobServer.proc = attr.ib(default=None, init=False) |
|
static |
cli.seq_server.myBobServer.q = attr.ib(default=attr.Factory(asyncio.Queue), init=False) |
|
static |
◆ reader_flag
cli.seq_server.myBobServer.reader_flag = attr.ib(default=True, init=False) |
|
static |
◆ reader_task
cli.seq_server.myBobServer.reader_task = attr.ib(default=None, init=False) |
|
static |
◆ running_flag
cli.seq_server.myBobServer.running_flag = attr.ib(default=True, init=False) |
|
static |
The documentation for this class was generated from the following file: