Using Pipes and Sockets for Asynchronous Local AI Chats
Recently I was experimenting with writing a CLI interface from scratch. There’s a variety of new AI tools out there, but I was on the lookout for a plugin for Vim that would allow me to use a ChatGPT-like workflow without the constant alt-tabbing between windows and relentless copy-pasting.
The best one I found was vim-ai by madox2; I highly recommend checking it out. It seems to be the closest candidate for what I was looking for, but its chat functionality is synchronous: every query paused all actions in Vim, meaning I couldn’t keep exploring until after the answer had fully generated. I looked through the codebase, but it didn’t seem like a simple feature to add, so I decided to explore this problem from a fresh canvas.
I was hesitant to use vimscript straight away (though I eventually built a solution with it). I initially took to Python to see if I could make a Vim-independent CLI solution so that:
- Chats would be launched through :terminal buffers (for easy out-of-the-box asynchronous execution)
- Answers could be piped directly to the clipboard or a Vim register
- The chat buffer would automatically close when the user quit the chat CLI
This would give me the asynchronous implementation straight away. I used ncurses to generate a simple CLI and learned about sockets and pipes to allow this CLI to send data to and receive data from other processes. Essentially, this CLI would behave a bit like less and watch combined, with some data processing thrown in.
Using Ollama as a local API, I could generate responses using:
curl -s http://localhost:11434/api/chat --no-buffer -d '{
  "model": "phi4:latest",
  "messages": [
    {
      "role": "user",
      "content": "hello"
    }
  ]
}'
which gives me a streaming response like
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.51450141Z","message":{"role":"assistant","content":"Hello"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.60217441Z","message":{"role":"assistant","content":"!"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.676216968Z","message":{"role":"assistant","content":" How"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.749957446Z","message":{"role":"assistant","content":" can"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.823717019Z","message":{"role":"assistant","content":" I"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.897391602Z","message":{"role":"assistant","content":" assist"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.971128894Z","message":{"role":"assistant","content":" you"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:48.045499791Z","message":{"role":"assistant","content":" today"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:48.11924118Z","message":{"role":"assistant","content":"?"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:49.391045405Z","message":{"role":"assistant","content":""},"done_reason":"stop","done":true,"total_duration":21531784605,"load_duration":19128188606,"prompt_eval_count":11,"prompt_eval_duration":522000000,"eval_count":26,"eval_duration":1877000000}
So this CLI would have to listen for these incoming JSON objects, parse the content out of them, and display it on the screen.
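As a rough sketch (not the final implementation), pulling the text out of a single streamed line looks something like this, where line is one of the JSON objects shown above with the created_at field trimmed for brevity:

import json

# One streamed line from the Ollama output above (created_at trimmed for brevity)
line = '{"model":"phi4:latest","message":{"role":"assistant","content":"Hello"},"done":false}'

data = json.loads(line)
content = data.get("message", {}).get("content", "")  # the text fragment to display
done = data.get("done", False)                        # True on the final object of a response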
So I started by defining a single class for this CLI, which has a socket for receiving input:
import time
import os
import json
import curses
import threading
import socket
import textwrap

SOCKET_PATH = "/tmp/my_socket"


class Viewer:
    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.running = True
        ...

    def read_socket(self):
        """Continuously accept connections and read from the Unix domain socket."""
        try:
            while self.running:
                ...
        except KeyboardInterrupt:
            self.running = False

    def run(self):
        """Main curses loop to update the display."""
        ...


if __name__ == "__main__":
    try:
        curses.wrapper(Viewer)
    except KeyboardInterrupt:
        pass
In our __init__ method, we’ll declare the state we need to keep track of and set up the socket:
SOCKET_PATH = "/tmp/my_socket"


class Viewer:
    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.running = True
        self.buffer = [""]                # accumulated chat text
        self.scroll_offset = 0            # how far the user has scrolled back from the bottom
        self.request_in_progress = False  # True while an answer is still streaming in
        self.lock = threading.Lock()

        # Ensure no leftover socket file
        if os.path.exists(SOCKET_PATH):
            os.remove(SOCKET_PATH)

        # Create a Unix domain socket
        self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.server_socket.bind(SOCKET_PATH)
        self.server_socket.listen()
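To check that the socket works before wiring up the display, we can connect to it from another Python process and send a fake Ollama-style message (a throwaway test, not part of the viewer itself):

import socket

# Connect to the viewer's Unix domain socket and send one fake message
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
    client.connect("/tmp/my_socket")
    client.sendall(b'{"message": {"role": "assistant", "content": "hi"}, "done": true}\n')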
First we’ll define the loop for listening to the socket:
    def read_socket(self):
        """Continuously accept connections and read from the Unix domain socket."""
        try:
            while self.running:  # Re-open the socket if it closes
                conn, _ = self.server_socket.accept()
                with conn:
                    while self.running:  # Listen continuously, accepting data 1024 bytes at a time
                        data = conn.recv(1024)
                        if not data:
                            break
                        with self.lock:  # Lock structures to this thread to avoid concurrency issues
                            line = data.decode()
                            try:
                                data = json.loads(line, strict=False)
                            except json.JSONDecodeError:
                                continue

                            content = data.get("message", {}).get("content", "")
                            done = data.get("done", False)

                            if not self.request_in_progress:
                                self.request_in_progress = True

                            self.buffer[-1] += content

                            if done:
                                self.request_in_progress = False
                                self.buffer[-1] += "\n\n"
                                self.buffer.append("")

        except KeyboardInterrupt:
            self.running = False
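One caveat with this approach: recv(1024) isn’t guaranteed to hand back exactly one JSON object per call; a chunk can contain a partial object or several glued together. Since Ollama streams one JSON object per line, a more robust variation (shown here as an alternative sketch, not what the version above does) buffers the bytes and splits on newlines:

    def read_socket(self):
        """Like the version above, but buffer incoming bytes and split on
        newlines so partial or concatenated JSON objects are handled."""
        try:
            while self.running:
                conn, _ = self.server_socket.accept()
                with conn:
                    pending = b""
                    while self.running:
                        data = conn.recv(1024)
                        if not data:
                            break
                        pending += data
                        while b"\n" in pending:
                            line, pending = pending.split(b"\n", 1)
                            if not line.strip():
                                continue
                            try:
                                msg = json.loads(line.decode())
                            except json.JSONDecodeError:
                                continue
                            with self.lock:
                                self.buffer[-1] += msg.get("message", {}).get("content", "")
                                if msg.get("done", False):
                                    self.buffer[-1] += "\n\n"
                                    self.buffer.append("")
        except KeyboardInterrupt:
            self.running = False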
We can render this data on the screen using a run method:
    def run(self):
        """Main curses loop to update the display."""
        curses.curs_set(0)        # Hide cursor
        self.stdscr.timeout(100)  # getch() below waits at most 100ms, so we redraw ~10 times a second

        while self.running:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()

            with self.lock:
                wrapped_buffer = textwrap.wrap("\n".join(self.buffer), width=w - 1)

            # Clamp the scroll offset so we never scroll back past the start of the buffer
            self.scroll_offset = min(self.scroll_offset, max(0, len(wrapped_buffer) - h))

            total_lines = len(wrapped_buffer)
            start = max(0, total_lines - h - self.scroll_offset)
            visible_lines = wrapped_buffer[start : start + h]

            for i, line in enumerate(visible_lines):
                if i >= h:
                    break
                self.stdscr.addstr(i, 0, line)

            self.stdscr.refresh()
            if self.stdscr.getch() == ord("q"):  # 'q' quits the viewer
                self.running = False

        self.server_socket.close()
        os.remove(SOCKET_PATH)
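A side note on the wrapping above: textwrap.wrap treats newlines as ordinary whitespace, so the blank lines inserted between answers get collapsed. If that bothers you, a small (hypothetical) helper can wrap each stored line separately instead:

    def wrap_buffer(self, width):
        """Wrap each stored line separately so blank lines between answers survive."""
        wrapped = []
        for chunk in self.buffer:
            for line in chunk.split("\n"):
                # textwrap.wrap returns [] for an empty string, so keep blank lines explicitly
                wrapped.extend(textwrap.wrap(line, width=width) or [""])
        return wrapped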
Then we can launch the read_socket method on a separate thread in __init__, and start running the application by calling the run method:
    def __init__(self, stdscr):
        ...
        # Start the input listener in a separate thread
        self.listener_thread = threading.Thread(target=self.read_socket, daemon=True)
        self.listener_thread.start()

        self.run()
And now, launching this script with python, we get a rendered buffer that we can pipe curl outputs from Ollama into so they can be parsed and displayed.
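To tie it together, here is one way (a sketch, assuming the viewer is running and Ollama is serving locally as in the curl example above) to stream a response into the viewer’s socket from another process:

import json
import socket
import subprocess

SOCKET_PATH = "/tmp/my_socket"


def send_prompt(prompt, model="phi4:latest"):
    """Run the same streaming curl request as above and forward each JSON line
    to the viewer's Unix domain socket."""
    payload = json.dumps({"model": model, "messages": [{"role": "user", "content": prompt}]})
    proc = subprocess.Popen(
        ["curl", "-s", "--no-buffer", "-d", payload, "http://localhost:11434/api/chat"],
        stdout=subprocess.PIPE,
    )
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
        client.connect(SOCKET_PATH)
        for line in proc.stdout:  # one streamed JSON object per line
            client.sendall(line)


if __name__ == "__main__":
    send_prompt("hello")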
The next step is to manage copying and pasting; I’ll write about how I solved that in a future post. For now, I hope this article on listening to sockets was informative!