
Using Pipes and Sockets for Asynchronous Local AI Chats

Recently I was experimenting with writing a CLI interface from scratch. There’s a variety of new AI tools out there, but I was on the lookout for a plugin for Vim that would allow me to use a ChatGPT-like workflow without the constant alt-tabbing between windows and relentless copy-pasting.

The best one I found was vim-ai by madox2; I highly recommend checking it out. It seemed to be the closest candidate for what I was looking for, but its chat functionality is synchronous: every query paused all actions in Vim, meaning I couldn't keep exploring until the answer had fully generated. I explored the codebase, but asynchronous chat didn't look like a simple feature to add, so I decided to explore this problem from a fresh canvas.

I was hesitant to use Vimscript straight away (though I eventually built a solution with it). I initially took to Python to see if I could make a Vim-independent CLI solution.

This would give me the asynchronous implementation straight away. I used curses to build a simple CLI and learned about sockets and pipes to allow this CLI to send data to and receive data from other processes. Essentially this CLI would behave a bit like less and watch combined, with some data processing thrown in.

Using Ollama as a local API, I could generate responses using

curl -s http://localhost:11434/api/chat --no-buffer -d '{
  "model": "phi4:latest",
  "messages": [
    {
      "role": "user",
      "content": "hello"
    }
  ]
}'

which gives me a streaming response like

{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.51450141Z","message":{"role":"assistant","content":"Hello"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.60217441Z","message":{"role":"assistant","content":"!"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.676216968Z","message":{"role":"assistant","content":" How"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.749957446Z","message":{"role":"assistant","content":" can"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.823717019Z","message":{"role":"assistant","content":" I"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.897391602Z","message":{"role":"assistant","content":" assist"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.971128894Z","message":{"role":"assistant","content":" you"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:48.045499791Z","message":{"role":"assistant","content":" today"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:48.11924118Z","message":{"role":"assistant","content":"?"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:49.391045405Z","message":{"role":"assistant","content":""},"done_reason":"stop","done":true,"total_duration":21531784605,"load_duration":19128188606,"prompt_eval_count":11,"prompt_eval_duration":522000000,"eval_count":26,"eval_duration":1877000000}

So this CLI would have to listen for these incoming JSON objects, parse the content out of them, and display it on the screen.
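
Each streamed line is a standalone JSON object, and the only fields we care about are message.content (the next fragment of text) and done (whether the response has finished). Here is a minimal sketch of that parsing step, using the first line from the stream above:

import json

line = '{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.51450141Z","message":{"role":"assistant","content":"Hello"},"done":false}'

data = json.loads(line)
content = data.get("message", {}).get("content", "")  # "Hello"
done = data.get("done", False)                        # False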

I started by defining a single class for this CLI, which owns a Unix domain socket for receiving input:

import time
import os
import json
import curses
import threading
import socket
import textwrap

SOCKET_PATH = "/tmp/pipe_viewer_socket"


class Viewer:
    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.running = True
        ...

    def read_socket(self):
        """Continuously accept connections and read from the Unix domain socket."""
        try:
            while self.running:
                ...
        except KeyboardInterrupt:
            self.running = False

    def run(self):
        """Main curses loop to update the display."""
        ...


if __name__ == "__main__":
    try:
        curses.wrapper(Viewer)
    except KeyboardInterrupt:
        pass

In our __init__ method, we'll declare the state we need to keep track of and set up the socket:

SOCKET_PATH = "/tmp/pipe_viewer_socket"


class Viewer:
    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.running = True
        self.buffer = [""]
        self.lock = threading.Lock()
        self.request_in_progress = False  # True while a response is streaming in
        self.scroll_offset = 0  # Lines scrolled up from the bottom of the buffer

        # Ensure no leftover socket file
        if os.path.exists(SOCKET_PATH):
            os.remove(SOCKET_PATH)

        # Create a Unix domain socket
        self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.server_socket.bind(SOCKET_PATH)
        self.server_socket.listen()

Next, we'll define the loops for listening to the socket:

    def read_socket(self):
        """Continuously accept connections and read from the Unix domain socket."""
        try:
            while self.running:  # Re-open the socket if it closes
                conn, _ = self.server_socket.accept()
                with conn:
                    while self.running:  # Listen continuously, accepting data 1024 bytes at a time
                        data = conn.recv(1024)
                        if not data:
                            break
                        with self.lock:  # Lock shared structures to avoid concurrency issues
                            # A single chunk may contain several newline-delimited JSON objects
                            for line in data.decode().splitlines():
                                try:
                                    message = json.loads(line, strict=False)
                                except json.JSONDecodeError:
                                    continue

                                content = message.get("message", {}).get("content", "")
                                done = message.get("done", False)

                                if not self.request_in_progress:
                                    self.request_in_progress = True

                                self.buffer[-1] += content

                                if done:
                                    self.request_in_progress = False
                                    self.buffer[-1] += "\n\n"
                                    self.buffer.append("")

        except KeyboardInterrupt:
            self.running = False

We can render this data on the screen using a run method:

    def run(self):
        """Main curses loop to update the display."""
        curses.curs_set(0)  # Hide cursor
        self.stdscr.timeout(100)  # getch() below waits at most 100ms, so we refresh every 100ms

        while self.running:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()

            with self.lock:
                # Wrap each logical line separately so the blank lines between
                # responses survive (textwrap.wrap collapses newlines otherwise)
                wrapped_buffer = []
                for chunk in "\n".join(self.buffer).split("\n"):
                    wrapped_buffer.extend(textwrap.wrap(chunk, width=w - 1) or [""])

                total_lines = len(wrapped_buffer)
                start = max(0, total_lines - h - self.scroll_offset)
                visible_lines = wrapped_buffer[start : start + h]

                for i, line in enumerate(visible_lines):
                    if i >= h:
                        break
                    self.stdscr.addstr(i, 0, line)

            self.stdscr.refresh()
            self.stdscr.getch()  # Returns -1 after the timeout; this paces the loop

        self.server_socket.close()
        os.remove(SOCKET_PATH)
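
The loop above reads self.scroll_offset but never changes it; I'm leaving the scrolling code out of this post. As a rough sketch, key handling inside the while loop could update it in place of the bare getch() call (the j/k/q bindings here are just an illustrative choice):

            key = self.stdscr.getch()
            if key == ord("k"):
                # Scroll up, but not past the top of the wrapped buffer
                self.scroll_offset = min(self.scroll_offset + 1, max(0, total_lines - h))
            elif key == ord("j"):
                # Scroll back down, but not past the live bottom
                self.scroll_offset = max(0, self.scroll_offset - 1)
            elif key == ord("q"):
                self.running = False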

Then we can launch this method on a separate thread in __init__, and start running the application by calling the run method:

    def __init__(self, stdscr):
        ...
        # Start the input listener in a separate thread
        self.listener_thread = threading.Thread(target=self.read_socket, daemon=True)
        self.listener_thread.start()

        self.run()

And now, launching this script with python gives us a rendered buffer: anything sent to the socket, such as the streaming curl output from Ollama, gets parsed and displayed on screen.
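
As an illustration, one way to get the curl stream into the viewer is a tiny relay script that connects to the Unix socket and forwards stdin line by line (forward.py is just an illustrative name; a tool like socat could do the same job):

# forward.py (illustrative name): relay stdin to the viewer's Unix socket.
# Usage: curl -s http://localhost:11434/api/chat --no-buffer -d '...' | python forward.py
import socket
import sys

SOCKET_PATH = "/tmp/pipe_viewer_socket"

with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
    client.connect(SOCKET_PATH)
    for line in sys.stdin:  # Each line is one JSON object from the stream
        client.sendall(line.encode())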

The next step is to manage copying and pasting; I'll write about how I solved that in a future post. For now, I hope this article on listening to sockets was informative!
