Build Your Own Custom Robot Assistant
Co-founder & CEO, InfinyOn
Many successful modern applications need to interact with their users in real-time, and this capability is quickly becoming the expected standard. However, building a real-time application from scratch is a daunting task that pulls focus away from the business problems a team is actually trying to solve. Fluvio is a real-time data streaming platform designed to make real-time application development easy.
In this blog post, we’re going to build a Robot Assistant, an add-on button on the website, that interacts with users in real-time.
We’ll build the frontend and backend, then use Fluvio as our data streaming layer. Fluvio data streaming gives us the ability to react in real-time, deploy to a massive audience, and preserve all data exchanges.
The project is also available for download on GitHub.
Prerequisites
This project uses websocket-glue for the client/server communication. For additional information on WebSocket, check out our blog post Websocket Glue for Data Streaming Apps.
Familiarity with the following software packages is useful but not required: JavaScript, TypeScript, Node.js, and WebSocket.
Overview
This blog takes a step-by-step approach to building a robot assistant, called Bot Assistant, from the ground up. The following outline shows the steps involved:
- Step 1: Create the project
- Step 2: Implement backend server
- Step 3: Implement frontend client
- Step 4: Add data streaming and persistency
Step 1: Create the project
Bot Assistant has a client and a server. The client runs in the web browser and controls the frontend user interaction, while the backend runs on a server and manages the websocket proxy and the state machine. The client and the server communicate with each other through websocket.
Let’s get started:
- Add project directory
- Add node.js server
- Add typescript configuration
- Add bot-assistant.ts server file
Add project directory
Create a project directory called bot-assistant
with two folders public
and src
:
$ mkdir -p bot-assistant/{public,src} && cd bot-assistant
The public
directory stores the client code, and the src
directory contains the server code (the “app server”). Both the client and the app server are served from the same web server that we’ll set up next.
Add node.js server
Create a Node.js project and implement the server. This project is using Node.js v13:
$ npm init -y
which yields the following package.json
file:
{
"name": "bot-assistant",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "fluvio <[email protected]> (fluvio.io)",
"license": "Apache 2.0"
}
Install express
, typescript
and a few other development services:
$ npm install typescript express ws @fluvio/client
Add watcher and typescript definitions:
$ npm install -D tsc-watch @types/ws @types/node @types/express
We installed the following packages:
- express: to serve the client and server files.
- ws: for client/server communication.
- @fluvio/client: node API library to communicate with fluvio.
- tsc-watch: to keep track of typescript file changes.
Update package.json file as follows:
{
"name": "bot-assistant",
"version": "1.0.0",
"description": "",
"main": "bot-assistant.js",
"scripts": {
"start:server": "tsc-watch --onSuccess \"node ./dist/bot-assistant.js $PARAMS\""
},
"keywords": [],
"author": "fluvio <[email protected]> (fluvio.io)",
"license": "Apache 2.0",
"dependencies": {
"@fluvio/client": "^0.6.0-beta.3",
"express": "^4.17.1",
"typescript": "^4.1.3",
"ws": "^7.4.2"
},
"devDependencies": {
"@types/express": "^4.17.9",
"@types/node": "^14.14.19",
"@types/ws": "^7.4.0",
"tsc-watch": "^4.2.9"
}
}
We changed main to reference bot-assistant.js and replaced the test script with a start:server script that starts the typescript watcher.
Add typescript configuration
The project is implemented in typescript which requires a typescript configuration file.
Add the tsconfig.json
typescript configuration file:
$ touch tsconfig.json
Paste the following content in the tsconfig.json
file:
{
"compilerOptions": {
"target": "es6",
"module": "commonjs",
"lib": [
"dom",
"ES2017",
"ES2015"
],
"outDir": "dist",
"strict": true,
"moduleResolution": "node",
"esModuleInterop": true,
},
"include": [
"src/*",
],
}
For additional information on the typescript configuration parameters, check out the documentation.
Add bot-assistant.ts
server file
The package.json file instructs Node.js to run bot-assistant.js when it initializes. In typescript, this file is compiled from bot-assistant.ts. This is the place where we provision the web server, add routes for the frontend, and initialize backend services.
Create the bot-assistant.ts
file in the src
directory:
$ touch src/bot-assistant.ts
Paste the following content in the src/bot-assistant.ts
file:
import http from "http";
import express from "express";
import path from 'path';
const PORT = 9998;
const startServer = async () => {
const app = express();
const publicPath = path.join(__dirname, '..', 'public')
app.get('/', (req, res) => {
res.sendFile(path.join(publicPath, 'index.html'));
});
app.use("/scripts", express.static(path.join(publicPath, 'scripts')));
app.use("/css", express.static(path.join(publicPath, 'css')));
app.use("/img", express.static(path.join(publicPath, 'img')));
const Server = http.createServer(app);
Server.listen(PORT, () => {
console.log(
`started bot assistant server at http://localhost:${PORT}...`
);
});
};
process.on("uncaughtException", (e) => { console.log(e); process.exit(1); });
process.on("unhandledRejection", (e) => { console.log(e); process.exit(1); });
startServer();
This code adds routes for the frontend client and starts a server on port 9998. The routes are as follows:
- / (root) => public/index.html
- /scripts => public/scripts
- /css => public/css
- /img => public/img
Next, we’ll implement the backend server followed by the frontend client.
Step 2: Implement backend server
The backend server has two core services, Proxy Service
and Workflow Service
.
The Proxy service
handles the connection between the client and the workflows. It accepts websocket connections, forwards client messages to the workflow service, and returns the replies to the originator.
The Workflow Service
manages state transitions. It initializes the state machine from a json file, accepts client state messages, computes the next state and returns a reply.
The backend server implementation has several steps:
- Add messages type definition file
- Add state machine
- Add workflow controller
- Add proxy service
- Add bot-server.ts file
If you prefer to skip ahead, you can download the source code from github and resume at start backend server.
Add messages type definition file
The messages file defines the types of the messages for the client/server communication. The messages are shared by both the proxy and workflow services, so the file lives at the top level of the src directory.
Let’s add messages.ts
file inside src
directory:
$ touch src/messages.ts
Paste the following message type definitions:
export type TimeStamp = string;
export type SID = string;
export interface Message {
sid: SID;
payload?: Payload;
timestamp: TimeStamp;
}
export type Payload =
| Request
| Response;
export interface Request {
kind: "Request",
message: RequestMessage,
}
export interface Response {
kind: "Response",
message: ResponseMessage,
}
export type RequestMessage =
| BotText
| ChoiceRequest
| StartChatSession
| EndChatSession;
export type ResponseMessage =
| ChoiceResponse
| UserText
export interface BotText {
kind: "BotText",
content: string
}
export interface ChoiceRequest {
kind: "ChoiceRequest",
question: string,
groupId: string,
choices: Array<Choice>,
}
export interface Choice {
itemId: string,
content: string
}
export interface StartChatSession {
kind: "StartChatSession",
sessionId: string,
chatPrompt?: string,
chatText?: string,
}
export interface EndChatSession {
kind: "EndChatSession",
sessionId: string,
}
export interface ChoiceResponse {
kind: "ChoiceResponse",
groupId: string,
itemId: string,
content?: string,
}
export interface UserText {
kind: "UserText",
sessionId: string,
content?: string,
}
export function buildInitMessage(sid: SID) {
return <Message>{
sid: sid,
timestamp: getDateTime(),
};
};
export function buildRequest(sid: SID, message: RequestMessage) {
return <Message>{
sid: sid,
payload: <Request>{ kind: "Request", message: message },
timestamp: getDateTime(),
};
};
export function buildResponse(sid: SID, message: ResponseMessage) {
return <Message>{
sid: sid,
payload: <Response>{ kind: "Response", message: message },
timestamp: getDateTime(),
};
};
export function isRequest(payload?: Payload) {
return (payload) ? (payload.kind == "Request") : false;
}
function getDateTime() {
return new Date(Date.now() - new Date().getTimezoneOffset() * 60000)
.toISOString()
.slice(0, -1);
}
The message definitions are as follows:
- Message: is the top level type definition.
- Payload: defines payload types: request or response.
- Request: defines request messages (BotText, ChoiceRequest, StartChatSession, EndChatSession).
- Response: defines response messages (ChoiceResponse, UserText).
- BotText: is a text message sent by the Bot (text parsed as HTML).
- ChoiceRequest: is an array of choices sent by the Bot.
- StartChatSession: is a request by the Bot to enable chat editor.
- EndChatSession: is a request sent by the Bot to disable chat editor.
- ChoiceResponse: is the response to a ChoiceRequest.
- UserText: is text sent by the User.
The definitions are followed by a series of helper APIs:
- buildInitMessage: creates a message without a payload that indicates a new connection.
- buildRequest: creates a Request message.
- buildResponse: creates a Response message.
- isRequest: checks if the message is of Request kind.
- getDateTime: generates a timestamp.
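To make the message envelope concrete, here is a minimal sketch of what a request built by buildRequest looks like and how isRequest tells it apart from an init message. The types are trimmed copies of the ones in messages.ts, the names sketchRequest and sketchIsRequest are hypothetical, and the timestamp is passed in rather than generated, so treat it as an illustration rather than the project code.

```typescript
// Trimmed copies of the types from messages.ts (illustration only).
type SID = string;
interface BotText { kind: "BotText"; content: string }
interface UserText { kind: "UserText"; sessionId: string; content?: string }
type SketchPayload =
  | { kind: "Request"; message: BotText }
  | { kind: "Response"; message: UserText };
interface SketchMessage { sid: SID; payload?: SketchPayload; timestamp: string }

// Same shape as buildRequest, with the timestamp passed in for determinism.
function sketchRequest(sid: SID, message: BotText, timestamp: string): SketchMessage {
  return { sid, payload: { kind: "Request", message }, timestamp };
}

// Same logic as isRequest: a missing payload is not a request.
function sketchIsRequest(payload?: SketchPayload): boolean {
  return payload ? payload.kind === "Request" : false;
}

const botRequest = sketchRequest(
  "abc123",
  { kind: "BotText", content: "Hi" },
  "2021-01-01T00:00:00.000"
);
// An init message carries only the session id and a timestamp.
const initMessage: SketchMessage = { sid: "abc123", timestamp: "2021-01-01T00:00:00.000" };

console.log(JSON.stringify(botRequest));
console.log(sketchIsRequest(botRequest.payload), sketchIsRequest(initMessage.payload));
```

The missing payload on the init message is what later lets the workflow controller recognize a new connection.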
The type definitions are used extensively by the state machine
defined in the following section.
Add state machine
This project uses a state machine to implement the behavior of the robot assistant. We may think of a state machine as a guided tour where all traffic follows a well-defined path. The state machine defines the choices and the order in which they are to be sent to the client. Upon receipt, the client generates a response and returns an answer. The state machine uses the answer to identify the location to resume and generates the next choice. This request/response exchange continues until the end state is reached.
Define state machine types
The state machine is a chain of states expressed in JSON format. Each state has one of two types: sendRequest or matchResponse. The sendRequest state instructs the workflow controller to generate a message and wait for the response. When the response arrives, the controller looks up the corresponding matchResponse state to identify where it should resume. Each request/response pair shares a unique identifier that defines the context of a client/server message exchange. The final state is defined by a state without a next field.
The workflow controller generates one of the following sendRequest
messages:
- BotText - sends the client an information field in text or HTML format.
- ChoiceRequest - sends a list of choices to the user. GroupId is the unique identifier paired with a ChoiceResponse.
- StartChatSession - asks the client to enable the chat editor. SessionId is the unique identifier paired with a UserText.
- EndChatSession - asks the client to disable the chat editor. Uses the SessionId paired with a StartChatSession.
The client replies with one of the following matchResponse messages:
- ChoiceResponse - sends one of the choices in the ChoiceRequest.
- UserText - sends text generated by the user.
Define state transitions
State transitions have two flows:
- internal flows - driven by one or more internal states.
- external flows - driven by an external state. An external state tells the engine to generate a request and wait for the response before resuming.
Internal states have a next
field whereas external states have a sessionId
or groupId
but no next
field. Internal states are chained internally, whereas external are chained externally through a client response.
State transitions are triggered by a new connection or a client response. If it begins at an internal state, the engine collects the state information and moves to the next state until it encounters an external state. At that time, it generates a client request and waits for the response before it can resume.
The client displays the request choices and asks the user to make a selection. Upon selection, the client generates a response and the cycle repeats.
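The internal flow can be sketched as a walk along the next links that stops at the first state without one. The state shape below is a trimmed, hypothetical version of the real one, and the visited set is an extra safeguard that is an assumption of this sketch.

```typescript
// Hypothetical, trimmed state shape: only the fields needed for traversal.
interface SketchState { sendRequest?: { kind: string }; next?: string }

const machine = new Map<string, SketchState>([
  ["greetings", { sendRequest: { kind: "BotText" }, next: "langChoices" }],
  ["langChoices", { sendRequest: { kind: "ChoiceRequest" } }], // external: no next
]);

// Follow `next` links, collecting request kinds, until a state without `next`.
function collectRequests(start: string): string[] {
  const kinds: string[] = [];
  const visited = new Set<string>(); // guards against accidental internal loops
  let current: string | undefined = start;
  while (current !== undefined && !visited.has(current)) {
    visited.add(current);
    const state: SketchState | undefined = machine.get(current);
    if (!state) break; // unknown state name: stop the walk
    if (state.sendRequest) kinds.push(state.sendRequest.kind);
    current = state.next;
  }
  return kinds;
}

const walkKinds = collectRequests("greetings");
console.log(walkKinds);
```

Starting at greetings, the walk emits the greeting text and the choice request, then stops and waits for the client because langChoices has no next field.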
Now that we have defined the state machine and the state transition, let’s start the implementation.
Create a state machine JSON file
We’ll create a state machine that asks users for their favorite programming language and collects their responses.
Let’s create a state-machines directory and add the bot-assistant.json file:
$ mkdir state-machines && touch state-machines/bot-assistant.json
Copy the following state machine definition into the JSON file:
{
"greetings": {
"sendRequest": {
"kind": "BotText",
"content": "Hi, I'm Bot! Nice to meet you."
},
"next": "langChoices"
},
"langChoices": {
"sendRequest": {
"kind": "ChoiceRequest",
"groupId": "lang",
"question": "What programming language do you use in your hobby projects?",
"choices": [
{
"itemId": "rust",
"content": "Rust"
},
{
"itemId": "go",
"content": "Go"
},
{
"itemId": "other",
"content": "Other"
}
]
}
},
"langChoiceRust": {
"matchResponse": {
"kind": "ChoiceResponse",
"groupId": "lang",
"itemId": "rust"
},
"next": "anyOtherChoices"
},
"langChoiceGo": {
"matchResponse": {
"kind": "ChoiceResponse",
"groupId": "lang",
"itemId": "go"
},
"next": "anyOtherChoices"
},
"langChoiceOther": {
"matchResponse": {
"kind": "ChoiceResponse",
"groupId": "lang",
"itemId": "other"
},
"next": "startLangPrefSession"
},
"startLangPrefSession": {
"sendRequest": {
"kind": "StartChatSession",
"sessionId": "langPreference",
"chatPrompt": "Type your preferred language here...",
"chatText": "I enabled chat editor"
}
},
"getLangPrefResponse": {
"matchResponse": {
"kind": "UserText",
"sessionId": "langPreference"
},
"next": "endLangPrefSession"
},
"endLangPrefSession": {
"sendRequest": {
"kind": "EndChatSession",
"sessionId": "langPreference"
},
"next": "anyOtherChoices"
},
"anyOtherChoices": {
"sendRequest": {
"kind": "ChoiceRequest",
"groupId": "others",
"question": "Any other?",
"choices": [
{
"itemId": "yes",
"content": "Yes"
},
{
"itemId": "no",
"content": "No"
}
]
}
},
"anyOtherYes": {
"matchResponse": {
"kind": "ChoiceResponse",
"groupId": "others",
"itemId": "yes"
},
"next": "langChoices"
},
"anyOtherNo": {
"matchResponse": {
"kind": "ChoiceResponse",
"groupId": "others",
"itemId": "no"
},
"next": "done"
},
"done": {
"sendRequest": {
"kind": "BotText",
"content": "Great, thanks!"
}
}
}
The state machine asks users for their favorite programming language and presents them with three options: Rust, Go, and Other.
If a user chooses Rust or Go, the state machine returns:
- anyOtherChoices - another choice with yes or no answers.
For Other, it runs through the following states:
- startLangPrefSession opens an interactive session,
- getLangPrefResponse captures the user response,
- endLangPrefSession ends the interaction session.
This basic state machine shows two different interaction models: a choice request/response and an interactive session. When the client receives a choice request, it presents the user with a series of choices. The user clicks on one of the choices and the client generates a response. For an interactive session, the client is asked to open an editor for the user to type their answer. After the server receives the response, it sends the client another request to close the interactive session. It is the responsibility of the server to manage access to the user editor.
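As an illustration of the choice model, the sketch below turns a click on one of the choices into a ChoiceResponse that carries the same groupId as the request. The interfaces mirror messages.ts, but the answerChoice helper is hypothetical and is not taken from the client code.

```typescript
interface Choice { itemId: string; content: string }
interface ChoiceRequest { kind: "ChoiceRequest"; question: string; groupId: string; choices: Choice[] }
interface ChoiceResponse { kind: "ChoiceResponse"; groupId: string; itemId: string; content?: string }

// Hypothetical client-side helper: turns a click on `itemId` into a response.
function answerChoice(request: ChoiceRequest, itemId: string): ChoiceResponse | undefined {
  const picked = request.choices.filter((c) => c.itemId === itemId)[0];
  if (!picked) return undefined; // the clicked item is not part of this request
  return {
    kind: "ChoiceResponse",
    groupId: request.groupId, // pairs the response with its request
    itemId: picked.itemId,
    content: picked.content,
  };
}

const langRequest: ChoiceRequest = {
  kind: "ChoiceRequest",
  groupId: "lang",
  question: "What programming language do you use in your hobby projects?",
  choices: [
    { itemId: "rust", content: "Rust" },
    { itemId: "go", content: "Go" },
    { itemId: "other", content: "Other" },
  ],
};

const goAnswer = answerChoice(langRequest, "go");
console.log(JSON.stringify(goAnswer));
```

On the server, the groupId and itemId of this response are what select the langChoiceGo state.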
Next, we need to load the JSON file into memory.
Add state-machine.ts
file
The state machine is part of the workflow service that we’ll define in the next section.
Create a workflow-service
directory and add the state-machine.ts
file:
$ mkdir src/workflow-service && touch src/workflow-service/state-machine.ts
Paste the following code in the state-machine.ts
file:
import Fs from "fs";
import { RequestMessage, ResponseMessage } from "../messages";
type Name = string;
/* State Machine definition */
export type StateMachine = Map<Name, State>;
export interface State {
sendRequest?: RequestMessage,
matchResponse?: ResponseMessage;
next?: string,
}
/* Load state machine from JSON file */
export function loadStateMachine(filePath: string) {
const jsonFile = Fs.readFileSync(filePath);
const jsonObject = JSON.parse(jsonFile.toString());
const state_machine: StateMachine = new Map();
for (var value in jsonObject) {
state_machine.set(value, jsonObject[value])
}
return state_machine;
}
The code reads the JSON file, and provisions an internal state machine variable.
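Since the states reference each other by name, a typo in a next field only surfaces at runtime. One possible safeguard, sketched below, is to scan the loaded map for next targets that do not exist. The findDanglingNext helper is hypothetical and not part of the project, and the sample uses an inline JSON string instead of a file.

```typescript
// Hypothetical helper, not part of the project: lists `next` targets that do not exist.
interface LinkedState { next?: string }

function findDanglingNext(states: Map<string, LinkedState>): string[] {
  const missing: string[] = [];
  states.forEach((state, stateName) => {
    if (state.next !== undefined && !states.has(state.next)) {
      missing.push(`${stateName} -> ${state.next}`);
    }
  });
  return missing;
}

// Same conversion loop as loadStateMachine, applied to an inline string instead of a file.
const parsed: Record<string, LinkedState> = JSON.parse(
  '{"greetings": {"next": "langChoices"}, "langChoices": {}, "anyOtherNo": {"next": "done"}}'
);
const loaded = new Map<string, LinkedState>();
for (const key in parsed) {
  loaded.set(key, parsed[key]);
}

// "done" is deliberately left out of the sample, so one dangling link is reported.
const dangling = findDanglingNext(loaded);
console.log(dangling);
```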
Add workflow controller
The workflow controller is the mediator between the websocket proxy and the state machine. The controller receives messages from the client, computes the next state, generates a reply, and sends a response.
Add workflow-controller.ts
file
Add the workflow-controller.ts
file to the workflow-service
directory:
$ touch src/workflow-service/workflow-controller.ts
Paste the following code:
import {
SID,
Message,
ResponseMessage,
ChoiceResponse,
UserText,
buildRequest,
isRequest
} from "../messages";
import { StateMachine, State } from "./state-machine";
import { SessionController } from "../proxy-service/session-controller";
export class WorkflowController {
private stateMachine: StateMachine;
private initState: string;
private sessionController: SessionController;
constructor(
stateMachine: StateMachine,
) {
this.stateMachine = stateMachine;
this.initState = stateMachine.keys().next().value;
this.sessionController = Object();
}
public init(sessionController: SessionController) {
this.sessionController = sessionController;
}
private processNewConnection(sid: SID) {
const nextStates = this.processNext(this.initState);
this.sendMessages(sid, nextStates);
}
private processNextState(sid: SID, response: ResponseMessage) {
const state: string = this.getState(response);
const nextStates = this.processNext(state);
this.sendMessages(sid, nextStates);
}
private getState(response: ResponseMessage) {
switch (response.kind) {
case "ChoiceResponse": {
return this.getChoiceResponseState(response);
}
case "UserText": {
return this.getUserTextState(response);
}
}
}
private processNext(startState: string) {
var nextStates: State[] = [];
var state = this.stateMachine.get(startState);
while (state) {
nextStates.push(state);
const next = state.next || "";
state = this.stateMachine.get(next);
if (next.length > 0 && !state) {
console.error(`Error: Cannot find next state: ${next}`);
}
}
return nextStates;
}
private getChoiceResponseState(choiceResponse: ChoiceResponse) {
for (let [key, state] of this.stateMachine.entries()) {
if (state.matchResponse &&
state.matchResponse.kind == choiceResponse.kind &&
state.matchResponse.groupId == choiceResponse.groupId &&
state.matchResponse.itemId == choiceResponse.itemId) {
return key;
}
}
console.error(`Error: cannot find choice ${JSON.stringify(choiceResponse)}`);
return this.initState;
}
private getUserTextState(userText: UserText) {
for (let [key, state] of this.stateMachine.entries()) {
if (state.matchResponse &&
state.matchResponse.kind == "UserText" &&
state.matchResponse.sessionId == userText.sessionId) {
return key;
}
}
console.error(`Error: cannot find user session ${JSON.stringify(userText)}`);
return this.initState;
}
private sendMessages(sid: SID, nextStates: State[]) {
for (let idx = 0; idx < nextStates.length; idx++) {
const state = nextStates[idx];
if (state.sendRequest) {
const message = buildRequest(sid, state.sendRequest);
this.sessionController.processBotMessage(JSON.stringify(message));
}
}
}
public processProxyMessage(clientMessage: string) {
const message: Message = JSON.parse(clientMessage);
if (!isRequest(message.payload)) {
const sid = message.sid;
if (message.payload) {
this.processNextState(
sid,
<ResponseMessage>message.payload.message
);
} else {
this.processNewConnection(sid);
}
}
}
}
The workflow controller performs the following functions:
- constructor: caches a reference to the stateMachine and computes the initial state.
- init: caches a reference to the sessionController. This is done outside of the constructor due to the circular reference. We’ll come back to this in the Fluvio data streaming section.
  - Note: The code does not compile until we add the session controller in the following section.
- processProxyMessage: is invoked by the session controller to process a new client message. If the message has a payload, it asks for the next request; otherwise it needs the initial request:
  - processNewConnection reads the state machine from the first state and produces a request.
  - processNextState parses the client response, looks up the resume state, and produces the next request.
The other APIs help the controller match a response and traverse the state machine to generate subsequent requests.
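The branching in processProxyMessage can be summarized with a small sketch that only classifies a message and leaves the state machine out. The dispatchFor function and its return labels are hypothetical names used for illustration.

```typescript
// Trimmed message shape: only what the dispatch decision looks at.
interface DispatchMessage { sid: string; payload?: { kind: "Request" | "Response" } }

type Dispatch = "ignore" | "newConnection" | "nextState";

// Mirrors the branching in processProxyMessage without touching the state machine.
function dispatchFor(message: DispatchMessage): Dispatch {
  if (message.payload && message.payload.kind === "Request") {
    return "ignore"; // requests originate from the bot and are not processed again
  }
  return message.payload ? "nextState" : "newConnection";
}

const dispatches = [
  dispatchFor({ sid: "s1" }),
  dispatchFor({ sid: "s1", payload: { kind: "Response" } }),
  dispatchFor({ sid: "s1", payload: { kind: "Request" } }),
];
console.log(dispatches);
```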
Add proxy service
The proxy service has three components: the incoming proxy ProxyIn, the outgoing proxy ProxyOut, and the session controller. The incoming proxy handles the websocket protocol, the outgoing proxy sends messages based on a session id, and the session controller manages the interaction between the proxy and other services.
For additional details, check out Websocket Glue for Data Streaming Apps.
Add outgoing proxy
Create a directory for the proxy-service
and add proxy-out.ts
file:
$ mkdir src/proxy-service && touch src/proxy-service/proxy-out.ts
Paste the following code:
import WS from "ws";
type SID = string;
export class WsProxyOut {
private sessions: Map<SID, WS>;
constructor() {
this.sessions = new Map();
}
public addSession(sid: SID, ws: WS) {
this.sessions.set(sid, ws);
}
public closeSession(sid: SID) {
const ws = this.sessions.get(sid);
if (ws) {
ws.close();
}
this.sessions.delete(sid);
}
public sendMessage(sid: SID, message: string) {
const ws = this.sessions.get(sid);
if (ws) {
ws.send(message);
}
}
}
As described in the websocket-glue blog, ProxyOut keeps a mapping between the session id and the websocket session.
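The behavior of this mapping can be tried without a network connection. The sketch below repeats the same logic against a stand-in socket; the SocketLike interface and SketchProxyOut class are hypothetical and exist only so the example runs without the ws package.

```typescript
// Minimal stand-in for a websocket: only the two methods ProxyOut calls.
interface SocketLike { send(data: string): void; close(): void }

class SketchProxyOut {
  private sessions = new Map<string, SocketLike>();

  addSession(sid: string, ws: SocketLike) { this.sessions.set(sid, ws); }

  closeSession(sid: string) {
    const ws = this.sessions.get(sid);
    if (ws) ws.close();
    this.sessions.delete(sid);
  }

  sendMessage(sid: string, message: string) {
    const ws = this.sessions.get(sid);
    if (ws) ws.send(message); // unknown session ids are silently ignored
  }
}

const sent: string[] = [];
const fakeSocket: SocketLike = { send: (data) => { sent.push(data); }, close: () => {} };

const proxyOut = new SketchProxyOut();
proxyOut.addSession("sid-1", fakeSocket);
proxyOut.sendMessage("sid-1", "hello");
proxyOut.sendMessage("sid-2", "dropped"); // no such session
proxyOut.closeSession("sid-1");
proxyOut.sendMessage("sid-1", "after close"); // session already removed
console.log(sent);
```

Only the first message reaches the socket; messages for unknown or closed sessions are dropped rather than raising an error.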
Add incoming proxy
Add proxy-in.ts
file to manage the websocket protocol:
$ touch src/proxy-service/proxy-in.ts
Paste the following code:
import crypto from 'crypto';
import WS from "ws";
import http from "http";
import { SessionController } from "./session-controller";
const COOKIE_NAME = "Fluvio-Bot-Assistant"
export class WsProxyIn {
private static wss: WS.Server;
private static sessionController: SessionController;
constructor(sessionController: SessionController) {
WsProxyIn.wss = new WS.Server({ clientTracking: false, noServer: true });
WsProxyIn.sessionController = sessionController;
}
public init(server: http.Server) {
this.onUpgrade(server);
this.onConnection();
}
private onUpgrade(server: http.Server) {
server.on("upgrade", (request, socket, head) => {
const session = WsProxyIn.parseCookie(COOKIE_NAME, request.headers.cookie);
if (session) {
request.headers.session = session;
}
WsProxyIn.wss.handleUpgrade(request, socket, head, function (ws: WS) {
WsProxyIn.wss.emit("connection", ws, request);
});
});
}
private onConnection() {
WsProxyIn.wss.on("headers", (headers: Array<string>, req) => {
const session = WsProxyIn.parseCookie(COOKIE_NAME, req.headers.cookie);
if (!session) {
let session = crypto.randomBytes(20).toString("hex");
req.headers.session = session;
headers.push("Set-Cookie: " + COOKIE_NAME + "=" + session);
}
});
WsProxyIn.wss.on("connection", async (ws, req) => {
const session_hdr = req.headers.session;
const sid = ((Array.isArray(session_hdr)) ? session_hdr[0] : session_hdr) || "";
await WsProxyIn.sessionController.sessionOpened(sid, ws);
ws.on("close", async () => {
await WsProxyIn.sessionController.sessionClosed(sid);
});
ws.on("message", async (clientMsg: string) => {
await WsProxyIn.sessionController.messageFromClient(sid, clientMsg);
});
});
}
private static parseCookie(cookieName: string, cookie_hdr?: string) {
if (cookie_hdr) {
const cookiePair = cookie_hdr.split(/; */).map((c: string) => {
const [key, v] = c.split('=', 2);
return [key, decodeURIComponent(v)];
}).find(res =>
(res[0] == cookieName)
);
if (Array.isArray(cookiePair) && cookiePair.length > 1) {
return cookiePair[1];
}
}
return undefined;
}
}
As described in the websocket-glue blog, the code accepts websocket connections, provisions cookies (Fluvio-Bot-Assistant), and passes the messages to the session controller.
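The cookie lookup is the part that ties a returning browser to its session id. Below is a simplified standalone variant of parseCookie, written as a loop and skipping pairs without a value, which is a small departure from the original. The function name parseCookieSketch is hypothetical.

```typescript
// Simplified variant of parseCookie: returns the decoded value of one cookie.
function parseCookieSketch(cookieName: string, cookieHeader?: string): string | undefined {
  if (!cookieHeader) return undefined;
  for (const part of cookieHeader.split(/; */)) {
    const [key, value] = part.split("=", 2);
    if (key === cookieName && value !== undefined) {
      return decodeURIComponent(value);
    }
  }
  return undefined;
}

const found = parseCookieSketch("Fluvio-Bot-Assistant", "theme=dark; Fluvio-Bot-Assistant=abc123");
const absent = parseCookieSketch("Fluvio-Bot-Assistant", "theme=dark");
const noHeader = parseCookieSketch("Fluvio-Bot-Assistant");
console.log(found, absent, noHeader);
```

When the lookup returns undefined, the incoming proxy generates a new session id and sets the cookie on the handshake response.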
Add session controller
Add session-controller.ts
file:
$ touch src/proxy-service/session-controller.ts
Paste the following code:
import WS from "ws";
import { WsProxyOut } from "./proxy-out";
import { Message, SID, buildInitMessage, buildResponse, isRequest } from "../messages";
import { WorkflowController } from "../workflow-service/workflow-controller";
type Messages = Array<Message>;
export class SessionController {
private sessionMessages: Map<SID, Messages>;
private proxyOut: WsProxyOut;
private workflowController: WorkflowController;
constructor(
proxyOut: WsProxyOut,
) {
this.sessionMessages = new Map();
this.proxyOut = proxyOut;
this.workflowController = Object();
}
public init(workflowController: WorkflowController) {
this.workflowController = workflowController;
this.show();
}
public sessionOpened(sid: SID, ws: WS) {
console.log(`start session - ${sid}`);
this.proxyOut.addSession(sid, ws);
const messages = this.sessionMessages.get(sid);
if (messages) {
this.sendMessagesToClient(messages);
} else {
const message = buildInitMessage(sid);
this.workflowController.processProxyMessage(JSON.stringify(message));
}
}
public sessionClosed(sid: SID) {
console.log(`end session - ${sid}`);
this.proxyOut.closeSession(sid);
}
public messageFromClient(sid: SID, clientMsg: string) {
console.log(`${sid} <== ${clientMsg}`);
const clientResponse = buildResponse(sid, JSON.parse(clientMsg));
this.addMessageToSession(clientResponse);
this.workflowController.processProxyMessage(JSON.stringify(clientResponse));
}
public sendMessagesToClient(messages: Messages) {
messages.forEach(message => {
this.sendMessageToClient(message);
});
}
public sendMessageToClient(message: Message) {
if (message.payload) {
const clientMessage = message.payload.message;
this.proxyOut.sendMessage(message.sid, JSON.stringify(clientMessage));
}
}
private addMessageToSession(message: Message) {
const sid = message.sid;
var messages = this.sessionMessages.get(sid);
if (!messages) {
messages = new Array();
}
messages.push(message);
this.sessionMessages.set(sid, messages);
}
public processBotMessage(botMessage: string) {
const message: Message = JSON.parse(botMessage);
this.addMessageToSession(message);
if (isRequest(message.payload)) {
this.sendMessageToClient(message);
}
}
private show() {
let table = new Map();
for (let [sid, value] of this.sessionMessages) {
table.set(sid, value.length);
}
console.table(table, ["SID", "Messages"]);
}
}
The session controller keeps a local copy of the message exchanges, keyed by session id. When a known session re-initiates a connection, the controller plays back the messages from memory. All other requests are passed along to the workflow controller.
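The playback idea reduces to a map from session id to an ordered list of messages. The sketch below uses plain strings instead of Message objects, and the function names are hypothetical.

```typescript
// Hypothetical, simplified store: messages are plain strings keyed by session id.
const sessionHistory = new Map<string, string[]>();

function recordMessage(sid: string, message: string): void {
  const messages = sessionHistory.get(sid) || [];
  messages.push(message);
  sessionHistory.set(sid, messages);
}

// Returns the messages to replay, or an empty list for a session seen for the first time.
function messagesToReplay(sid: string): string[] {
  return sessionHistory.get(sid) || [];
}

recordMessage("sid-1", "bot: Hi, I'm Bot! Nice to meet you.");
recordMessage("sid-1", "user: rust");

const replayed = messagesToReplay("sid-1"); // known session: replay in order
const fresh = messagesToReplay("sid-2");    // unknown session: nothing to replay
console.log(replayed.length, fresh.length);
```

In the session controller, the empty case is where an init message is built and handed to the workflow controller instead. Because the history lives in process memory, it is lost when the server restarts.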
We are now ready to add the bot-server
file and initialize all server components.
Add bot-server.ts
file
Add bot-server.ts
file in the src
directory:
$ touch src/bot-server.ts
Paste the following code:
import { Server } from "http";
import { WsProxyIn } from "./proxy-service/proxy-in";
import { WsProxyOut } from "./proxy-service/proxy-out";
import { StateMachine, loadStateMachine } from "./workflow-service/state-machine";
import { WorkflowController } from "./workflow-service/workflow-controller";
import { SessionController } from "./proxy-service/session-controller";
export const initBotAssistant = (server: Server) => {
const wsProxyOut = new WsProxyOut();
const sessionController = new SessionController(wsProxyOut);
const wsProxyIn = new WsProxyIn(sessionController);
let filePath = getFileName();
const stateMachine: StateMachine = loadStateMachine(filePath);
const workflowController = new WorkflowController(stateMachine);
sessionController.init(workflowController);
workflowController.init(sessionController);
wsProxyIn.init(server);
};
const getFileName = () => {
if (process.argv.length != 3) {
console.log("Usage: node bot-assistant.js <state-machine.json>");
process.exit(1);
}
return process.argv[2];
}
The bot server file initializes all server components: incoming proxy, outgoing proxy, session controller, state machine and workflow controller.
To address circular reference challenges, it initializes sessionController
and workflowController
separately from the constructors. We’ll come back to this in the Fluvio data streaming section.
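The two-phase initialization can be shown in isolation with two classes that need a reference to each other. The Ping and Pong classes are hypothetical, and the sketch uses an optional field where the project code assigns an Object() placeholder.

```typescript
// Two classes that reference each other, wired up after construction.
class Ping {
  private pong?: Pong;
  init(pong: Pong) { this.pong = pong; }
  hit(count: number): string {
    return this.pong ? this.pong.reply(count) : "not initialized";
  }
}

class Pong {
  private ping?: Ping;
  init(ping: Ping) { this.ping = ping; }
  isReady(): boolean { return this.ping !== undefined; }
  reply(count: number): string { return `pong ${count}`; }
}

// Phase 1: construct both objects without their peers.
const ping = new Ping();
const pong = new Pong();
const beforeInit = ping.hit(1);

// Phase 2: hand each object its peer, as bot-server.ts does with init().
ping.init(pong);
pong.init(ping);
const afterInit = ping.hit(1);
console.log(beforeInit, afterInit);
```

Neither constructor can take the other object as a parameter because one of them would have to exist first, which is why the wiring happens in a separate init call.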
Update bot-assistant.ts
file
The last step of the implementation integrates initBotAssistant
into the bot-assistant.ts
file.
Update the bot-assistant.ts
file as follows:
import http from "http";
import express from "express";
import path from 'path';
import { initBotAssistant } from "./bot-server";
const PORT = 9998;
const startServer = async () => {
const app = express();
const publicPath = path.join(__dirname, '..', 'public')
app.get('/', (req, res) => {
res.sendFile(path.join(publicPath, 'index.html'));
});
app.use("/scripts", express.static(path.join(publicPath, 'scripts')));
app.use("/css", express.static(path.join(publicPath, 'css')));
app.use("/img", express.static(path.join(publicPath, 'img')));
const Server = http.createServer(app);
await initBotAssistant(Server);
Server.listen(PORT, () => {
console.log(
`started bot assistant server at http://localhost:${PORT}...`
);
});
};
process.on("uncaughtException", (e) => { console.log(e); process.exit(1); });
process.on("unhandledRejection", (e) => { console.log(e); process.exit(1); });
startServer();
The code initializes the bot-server
which needs access to HTTP server. Hence initBotAssistant
is called after the Server
is provisioned, and the server is passed through the function parameter.
Start backend server
Let’s start the server using the bot-assistant.json state machine file. The start:server script forwards the PARAMS environment variable to node as a command line argument:
$ PARAMS=state-machines/bot-assistant.json npm run start:server
4:35:59 PM - Starting compilation in watch mode...
4:36:01 PM - Found 0 errors. Watching for file changes.
┌───────────────────┬─────┬────────┐
│ (iteration index) │ Key │ Values │
├───────────────────┼─────┼────────┤
└───────────────────┴─────┴────────┘
started bot assistant server at http://localhost:9998...
Step 3: Implement frontend client
The frontend client has two HTML components:
- Bot button
- Bot Assistant dialog box
The Bot button is displayed on the lower right-hand side of the screen and opens the Bot Assistant dialog box. When the dialog box is closed, the Bot button is shown again. In essence, the two components toggle each other on and off.
The client builds the HTML components dynamically through javascript and it communicates with the web server through websocket.
The client is implemented in several steps:
- Add index.html file
- Add stylesheet file
- Add assistant images
- Add assistant.js script
- Load reconnecting-socket.js file
- Test Bot Assistant (v1)
Add index.html
file
The front end client content is placed in the public
directory. Let’s add index.html
file:
$ touch public/index.html
Paste the following code in index.html
file:
<!DOCTYPE HTML>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="stylesheet" type="text/css" href="css/assistant.css"/>
<script type = "text/javascript" src="scripts/assistant.js"></script>
</head>
<body>
<div class="assistant"></div>
<!-- debugging area - begin -->
<textarea id="debugOutput" rows="20" cols="60" readonly></textarea>
<!-- debugging area - end -->
</body>
</html>
In the header we are referencing two files:
- css/assistant.css - styles file
- scripts/assistant.js - script file that builds the DOM elements.
In the body, there is a div with class named assistant. The script file looks up this div to attach DOM elements. For troubleshooting, there is a textarea that prints debugging information.
Add stylesheet file
The stylesheet controls the look and feel of the Bot Assistant button and dialog box.
Add a stylesheet called assistant.css to the public/css directory:
$ mkdir public/css && touch public/css/assistant.css
Paste the following code in the assistant.css file:
.assistant {
font-family: 'Lucida Sans', Geneva, Verdana, sans-serif;
position:fixed;
bottom:20px;
right:25px;
}
/* Assistant - Button */
.assistant button {
width: 45px;
height: 45px;
background:#008CBA;
border-radius:5px;
cursor:pointer;
border: none;
outline: none;
}
.assistant button img {
padding-top:5px;
width: 25px;
height: 25px;
}
.assistant button:focus {
border: none;
outline: none;
}
/* Assistant - Chat Box */
.assistant .chat{
display: none;
width:360px;
background:white;
border-radius:5px 5px 0px 0px;
border: 1px solid gray;
}
.assistant .header{
background: #008CBA;
color:white;
padding:8px;
font-weight:bold;
border-radius:5px 5px 0px 0px;
line-height: 32px;
}
.assistant .header span{
padding-left:0;
font-size: 11pt;
}
.assistant .header img {
width:18px;
height:35px;
margin-right: 10px;
float:right;
}
.assistant .header img.bot {
width:35px;
height:35px;
border-radius:50%;
background:#bbb;
float:left;
}
.assistant .header .overlay {
background-color: #f6f6f6;
padding: 1px;
position: absolute;
top: 32px;
left: 33px;
border-radius: 12px;
z-index: 1;
}
.assistant .header .status{
width: 12px;
height: 12px;
border-radius: 12px;
}
.assistant .header .status.off{
background-color: #FF3B28;
}
.assistant .header .status.on{
background-color: greenyellow;
}
.assistant .header .close{
float:right;
cursor:pointer;
width: 28px;
margin-right: 0;
}
.assistant .inner-body{
min-height: 250px;
max-height: calc(100vh - 300px);
overflow: auto;
overflow-x: hidden;
}
.assistant .msg-body {
font-size:12px;
padding: 10px 10px 5px 5px;
}
.assistant .msg-left{
margin-bottom:7px;
word-break: break-all;
}
.assistant .msg-left .avatar {
width: 50px;
margin-top: -40px;
}
.assistant .msg-left .operator {
margin-top: -40px;
padding: 1px;
font-size:1.6em;
width:35px;
height:35px;
line-height:1.8em;
text-align:center;
border-radius:50%;
background:plum;
color:white;
}
.assistant .msg-left img {
width:35px;
height:35px;
border-radius:50%;
background:#bbb;
border: 1px solid #eee;
}
.assistant .msg-left .msg {
background:#f2f2f2;
padding:10px;
min-height:15px;
margin: 3px;
border: 1px solid #ddd;
border-radius:7px;
margin-left: 44px;
margin-right: 30px;
}
.assistant .msg-left .button {
margin: -2px 30px 7px 50px;
}
.assistant .msg-right {
position: relative;
right: 0px;
margin: 3px;
margin-bottom:10px;
}
.assistant .msg-right .msg {
background:#d4e7fa;
padding:10px;
min-height:15px;
margin-left: 80px;
border-radius:7px;
word-break: break-all;
}
.assistant .msg-right .button {
float: right;
}
/* button */
.assistant .btn {
display: inline-block;
margin: 2px;
width: 100%;
}
.assistant .button {
width: max-content;
border-radius:15px;
padding: 10px 15px;
transition-duration: 0.2s;
background-color: white;
color: #006687;
border: 1px solid #008CBA;
}
.assistant .button.selected {
background-color: #008CBA;
color: white;
}
.assistant .button:hover {
cursor: pointer;
background-color: #008CBA;
color: white;
}
/* footer */
.assistant .footer {
background:white;
bottom: 0;
padding-bottom: 10px;
width: 100%;
}
.assistant .footer .textareaElement {
padding: 15px 10px 0 10px;
border-top: 1px solid #ccc;
min-height: 20px;
overflow-x: hidden;
overflow-y: auto;
font-size: 11pt;
font-family: Arial, Helvetica, sans-serif;
color: #333;
}
.assistant .footer .textareaElement:focus {
outline: none;
}
.assistant .footer [placeholder]:empty::before {
content: attr(placeholder);
color: #aaa;
}
.assistant .footer [placeholder]:empty:focus::before {
content: "";
}
In summary, the stylesheet has two main sections: Button and Chat Box.
The Chat Box has three subsections: a header with the bot icon, title and a close icon, the body area, and the footer. The footer has an editor for user input that is initially set to read-only.
Add assistant images
The assistant button and chat dialog box use several images to enhance the visualization.
Let’s create an img directory and use curl to download the images from GitHub:
mkdir -p public/img/assistant
curl -L https://raw.githubusercontent.com/infinyon/fluvio-demo-apps-node/master/bot-assistant/public/img/assistant/note.svg --output public/img/assistant/note.svg
curl -L https://raw.githubusercontent.com/infinyon/fluvio-demo-apps-node/master/bot-assistant/public/img/assistant/bot.svg --output public/img/assistant/bot.svg
curl -L https://raw.githubusercontent.com/infinyon/fluvio-demo-apps-node/master/bot-assistant/public/img/assistant/redo.svg --output public/img/assistant/redo.svg
curl -L https://raw.githubusercontent.com/infinyon/fluvio-demo-apps-node/master/bot-assistant/public/img/assistant/close.svg --output public/img/assistant/close.svg
The commands download four svg images: note, bot, redo, and close.
Add assistant.js script
The most important component of the frontend client is the assistant.js script. The script creates DOM elements, handles the user interaction, and communicates with the server.
Add the assistant.js file to the public/scripts directory:
$ mkdir public/scripts && touch public/scripts/assistant.js
Paste the following code in the assistant.js file:
window.onload = () => {
var webSocket = null;
var sessionId = "";
// Load reconnecting socket to DOM
loadScript("scripts/reconnecting-socket.js");
// Create and attach Bot Assistant HTML elements
function loadAssistant() {
// Add assistant button
var note = createElement("img", { "src": `img/assistant/note.svg` }),
aButton = createElement("button", {}, note);
// Append assistant dialog
var status = createElement("div", { "id": "bot-status", "class": "status off" }),
overlay = createElement("div", { "class": "overlay" }, status),
bot = createElement("img", { "src": `img/assistant/bot.svg`, "class": "bot" }),
title = createElement("span", {}, "Bot Assistant"),
aDialogClose = createElement("img", { "src": `img/assistant/close.svg`, "class": "close" }),
aDialogReset = createElement("img", { "src": `img/assistant/redo.svg` }),
header = createElement("div", { "class": "header" }, [bot, overlay, title, aDialogClose, aDialogReset]),
msgBody = createElement("div", { "class": "msg-body" }),
innerBody = createElement("div", { "class": "inner-body" }, msgBody),
body = createElement("div", { "class": "body-wrapper" }, innerBody),
userMsg = createElement("div", {
"id": "user-msg",
"class": "textareaElement",
"placeholder": "Choose an option",
"contenteditable": "false"
}),
footer = createElement("div", { "class": "footer" }, userMsg),
aDialog = createElement("div", { "class": "chat" }, [header, body, footer]);
// Attach event listeners
aButton.addEventListener('click', onOpenDialog, false);
aDialogClose.addEventListener('click', onCloseDialog, false);
aDialogReset.addEventListener('click', onResetSession, false);
// Add to document
document.querySelector(".assistant").appendChild(aButton);
document.querySelector(".assistant").appendChild(aDialog);
}
// On open assistant dialog callback
function onOpenDialog() {
document.querySelector(".assistant button").style.display = "none";
document.querySelector(".assistant .chat").style.display = "block";
openWSConnection();
}
// On close assistant dialog callback
function onCloseDialog() {
document.querySelector(".assistant .chat").style.display = "none";
document.querySelector(".assistant button").style.display = "block";
}
// Clear the cookie and restart connection to create a new session.
function onResetSession() {
document.cookie = "Fluvio-Bot-Assistant=; expires=Thu, 01 Jan 1970 00:00:00 UTC; path=/";
closeWsConnection();
clearMessages();
openWSConnection();
}
// Open WebSocket connection
function openWSConnection() {
try {
if (webSocket != null) {
return; // already connected
}
logOutput("Connecting to: ws://localhost:9998/");
webSocket = new ReconnectingWebSocket("ws://localhost:9998/");
webSocket.onopen = function (openEvent) {
clearMessages();
document.getElementById("bot-status").setAttribute("class", "status on");
logOutput("Connected!");
};
webSocket.onclose = function (closeEvent) {
document.getElementById("bot-status").setAttribute("class", "status off");
logOutput("Disconnected!");
};
webSocket.onerror = function (errorEvent) {
logOutput(`Error: ${JSON.stringify(errorEvent)}`);
};
webSocket.onmessage = function (messageEvent) {
var serverMsg = messageEvent.data;
logOutput(`<== ${serverMsg}`);
onMessageFromServer(serverMsg);
};
} catch (exception) {
logOutput(`error: ${JSON.stringify(exception)}`);
}
}
// Close WS Connection
function closeWsConnection() {
if (webSocket != null) { // guard: the connection may not have been opened yet
webSocket.close();
webSocket = null;
}
}
// On messages received from Websocket
function onMessageFromServer(value) {
const message = JSON.parse(value);
switch (message.kind) {
case "BotText":
showBotText(message.content);
break;
case "UserText":
showUserText(message.content);
break;
case "ChoiceRequest":
showBotText(message.question);
showChoiceButtons(message.groupId, message.choices);
break;
case "ChoiceResponse":
choicesToButton(message.groupId, message.content);
break;
case "StartChatSession":
sessionId = message.sessionId;
enableChatEditor(message.chatPrompt, message.chatText);
break;
case "EndChatSession":
disableChatEditor();
break;
};
}
// Send a message on WebSocket
function sendWsMessage(message) {
if (webSocket.readyState != WebSocket.OPEN) {
logOutput("WebSocket is not connected: " + webSocket.readyState);
return;
}
const msgObj = JSON.stringify(message)
logOutput(`==> ${msgObj}`);
webSocket.send(msgObj);
}
// Show text from bot assistant
function showBotText(content) {
if (content.length > 0) {
removeDuplicateAvatar("bot");
var img = createElement("img", { "src": `img/assistant/bot.svg` }),
avatar = createElement("div", { "class": "avatar", "id": "bot" }, img),
msg = createElement("div", { "class": "msg" }, content),
msgLeft = createElement("div", { "class": "msg-left" }, [msg, avatar]);
document.querySelector(".msg-body").appendChild(msgLeft);
scrollToBottom(".inner-body");
}
}
// Show text from user interactive session
function showUserText(content) {
if (content.length > 0) {
var msg = createElement("div", { "class": "msg" }, content),
msgRight = createElement("div", { "class": "msg-right" }, msg);
document.querySelector(".msg-body").appendChild(msgRight);
scrollToBottom(".inner-body");
}
}
// Show choices
function showChoiceButtons(groupId, choices) {
if (choices.length > 0) {
var buttons = [];
choices.forEach(choice => {
var button = createElement("div", { "class": "button" }, choice.content);
button.addEventListener('click', function () {
pickChoice(groupId, choice.itemId, choice.content);
}, false);
buttons.push(createElement("div", { "class": "btn" }, button));
});
var msgLeft = createElement("div", { "class": "msg-left", "id": groupId }, buttons);
document.querySelector(".msg-body").appendChild(msgLeft);
scrollToBottom(".inner-body");
}
}
// Callback invoked on user selection
function pickChoice(groupId, itemId, content) {
choicesToButton(groupId, content);
sendWsMessage({
kind: "ChoiceResponse",
groupId: groupId,
itemId: itemId,
content: content,
});
}
// Swap choices with a button representing the selection
function choicesToButton(groupId, content) {
document.getElementById(groupId).remove();
var button = createElement("div", { "class": "button selected" }, content),
btn = createElement("div", { "class": "btn" }, button),
msgRight = createElement("div", { "class": "msg-right" }, btn);
document.querySelector(".msg-body").appendChild(msgRight);
scrollToBottom(".inner-body");
}
// On multiple bot messages, ensure avatar is only displayed on last entry
function removeDuplicateAvatar(id) {
var messages = document.querySelector('.msg-body').children;
if (messages.length > 0) {
var lastMessage = messages[messages.length - 1];
if (lastMessage.getAttribute("class") === 'msg-left') {
if (lastMessage.lastChild.id == id) {
lastMessage.removeChild(lastMessage.lastChild);
}
}
}
}
// Enable interactive chat
function enableChatEditor(chatPrompt, chatText) {
if (chatText) {
showBotText(chatText);
}
var chatBox = document.getElementById("user-msg");
chatBox.setAttribute("contenteditable", true);
chatBox.setAttribute("placeholder", chatPrompt || "Type question here ...");
chatBox.addEventListener("keydown", onEditorKeys, false);
}
// Disable interactive chat
function disableChatEditor() {
var chatBox = document.getElementById("user-msg");
chatBox.removeEventListener("keydown", onEditorKeys, false); // detach the handler added in enableChatEditor
chatBox.setAttribute("contenteditable", false);
chatBox.setAttribute("placeholder", "Choose an option");
}
// Scroll to last messages
function scrollToBottom(tag) {
var div = document.querySelector(tag);
div.scrollTop = div.scrollHeight - div.clientHeight;
}
// Clear messages in both editors
function clearMessages() {
var parent = document.querySelector('.msg-body');
while (parent.firstChild) {
parent.removeChild(parent.firstChild);
}
var debugOutput = document.getElementById("debugOutput");
if (debugOutput) {
debugOutput.value = "";
}
}
// Capture editor keys
function onEditorKeys(e) {
var chatBox = document.getElementById("user-msg");
if (e.code == 'Enter' && chatBox.textContent.length > 0) {
e.preventDefault();
const content = chatBox.textContent;
sendWsMessage({
kind: "UserText",
sessionId: sessionId,
content: content,
});
showUserText(content);
chatBox.innerHTML = '';
}
}
// Load external javascript file to DOM
function loadScript(fileName) {
var js_script = document.createElement('script');
js_script.type = "text/javascript";
js_script.src = fileName;
js_script.async = false;
document.getElementsByTagName('head')[0].appendChild(js_script);
}
// Log output in the "debugOutput" textarea (if available) and the console
function logOutput(value) {
var debugOutput = document.getElementById("debugOutput");
if (debugOutput) {
debugOutput.value += value + "\n\n";
debugOutput.scrollTop = debugOutput.scrollHeight;
}
console.log(value);
}
// Create element utility function
function createElement(element, attribute, inner) {
if (typeof (element) === "undefined") { return false; }
if (typeof (inner) === "undefined") { inner = ""; }
var el = document.createElement(element);
if (typeof (attribute) === 'object') {
for (var key in attribute) {
el.setAttribute(key, attribute[key]);
}
}
if (!Array.isArray(inner)) {
inner = [inner];
}
for (var k = 0; k < inner.length; k++) {
if (inner[k].tagName) {
el.appendChild(inner[k]);
} else {
el.innerHTML = inner[k];
}
}
return el;
}
// Call main function
loadAssistant();
};
The script is invoked during window.onload event. The functions are as follows:
- loadAssistant - creates DOM elements for the button and editor, and attaches event listeners.
- onOpenDialog - shows dialog and hides button.
- onCloseDialog - shows button and hides dialog.
- onResetSession - clears the session cookie and establishes a new connection (a new cookie gets assigned).
- openWSConnection - connects to websocket server and attaches event listeners.
- closeWsConnection - closes the connection.
- onMessageFromServer - parses messages received from the server and publishes the result in the chat editor.
- sendWsMessage - sends a message to the server.
- loadScript - loads another script into the DOM.
- createElement - a utility function that makes it easy to create DOM elements.
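One detail of createElement is worth noting: string content is assigned through innerHTML, so any text passed in is interpreted as markup, including what a user types into the chat editor. One possible way to guard against that, sketched here as an assumption rather than as part of the original script, is to escape the text before it reaches createElement. The helper name escapeHtml is hypothetical:

```typescript
// Hypothetical helper, not part of the original assistant.js.
// Replaces the five characters that are significant in HTML with entities,
// so user-typed text is displayed literally when assigned through innerHTML.
// The ampersand is handled first so later entities are not escaped twice.
function escapeHtml(text: string): string {
  return text
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;")
    .replace(/'/g, "&#39;");
}

const escapedSample = escapeHtml('<img src=x onerror="alert(1)">');
// escapedSample is "&lt;img src=x onerror=&quot;alert(1)&quot;&gt;"
```

In the script, such a helper could wrap the content argument of showUserText. Bot messages that intentionally carry markup would be left as they are.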
The remaining functions manipulate DOM elements, enable or disable the chat editor, and clear the messages.
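The message kinds handled by onMessageFromServer can be written down as a TypeScript union, which makes the protocol easier to see at a glance. This is a sketch only: the field lists are inferred from the properties the script reads, the type names Choice and ServerMessage are hypothetical, and the function returns a description instead of touching the DOM:

```typescript
// Message shapes inferred from the fields that onMessageFromServer reads.
type Choice = { itemId: string; content: string };

type ServerMessage =
  | { kind: "BotText"; content: string }
  | { kind: "UserText"; content: string }
  | { kind: "ChoiceRequest"; question: string; groupId: string; choices: Choice[] }
  | { kind: "ChoiceResponse"; groupId: string; content: string }
  | { kind: "StartChatSession"; sessionId: string; chatPrompt?: string; chatText?: string }
  | { kind: "EndChatSession" };

// Returns a one-line description of what the client would render.
function describeServerMessage(raw: string): string {
  const message = JSON.parse(raw) as ServerMessage;
  switch (message.kind) {
    case "BotText":
      return `bot says: ${message.content}`;
    case "UserText":
      return `user says: ${message.content}`;
    case "ChoiceRequest":
      return `bot asks: ${message.question} [${message.choices.map(c => c.content).join(", ")}]`;
    case "ChoiceResponse":
      return `user picked: ${message.content}`;
    case "StartChatSession":
      return `chat editor enabled for session ${message.sessionId}`;
    case "EndChatSession":
      return "chat editor disabled";
    default:
      return "ignored: unknown message kind";
  }
}

// Sample values below are made up for illustration.
const sampleRequest = JSON.stringify({
  kind: "ChoiceRequest",
  question: "How can I help?",
  groupId: "help",
  choices: [{ itemId: "yes", content: "Yes" }, { itemId: "no", content: "No" }],
});
const sampleDescription = describeServerMessage(sampleRequest);
// sampleDescription is "bot asks: How can I help? [Yes, No]"
```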
The script also loads the reconnecting-socket.js file, which is discussed in the next section.
Load reconnecting-socket.js file
The client is responsible for establishing and maintaining the connection to the server. While vanilla websocket offers the primitives to connect and disconnect to and from the server, it leaves it up to the user to implement reconnects.
Joe Walnes has written a great utility called reconnecting-socket.js that implements the reconnection logic under the hood.
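To give a feel for what reconnection logic involves, here is a minimal sketch of an exponential backoff calculation, the usual way to space out retries. It is not the library's code: the option names and the default values below are assumptions chosen for illustration, so check reconnecting-socket.js for the actual settings.

```typescript
// Illustrative settings; the real values live in reconnecting-socket.js.
interface BackoffOptions {
  baseDelayMs: number; // delay before the first retry
  decay: number;       // multiplier applied for each failed attempt
  maxDelayMs: number;  // upper bound for any delay
}

const defaultBackoff: BackoffOptions = { baseDelayMs: 1000, decay: 1.5, maxDelayMs: 30000 };

// Delay before retry number `attempt` (0-based), growing exponentially up to the cap.
function reconnectDelayMs(attempt: number, options: BackoffOptions = defaultBackoff): number {
  const delay = options.baseDelayMs * Math.pow(options.decay, attempt);
  return Math.min(delay, options.maxDelayMs);
}

const firstDelays = [0, 1, 2, 3].map(attempt => reconnectDelayMs(attempt));
// firstDelays is [1000, 1500, 2250, 3375]
```

Growing the delay keeps a client from hammering a server that is still restarting, while the cap bounds how long a user waits once the server is back.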
Let’s copy the file into the public/scripts directory:
$ curl -L https://raw.githubusercontent.com/infinyon/fluvio-demo-apps-node/master/bot-assistant/public/scripts/reconnecting-socket.js --output public/scripts/reconnecting-socket.js
Let’s review the public directory hierarchy:
public
├── css
│   └── assistant.css
├── img
│   └── assistant
│       ├── bot.svg
│       ├── close.svg
│       ├── note.svg
│       └── redo.svg
├── index.html
└── scripts
    ├── assistant.js
    └── reconnecting-socket.js
The frontend client is available for download on GitHub.
Test Bot Assistant (v1)
Ensure the server is running, otherwise run the following command:
$ PARAMS=state-machines/bot-assistant.json npm run start:server
In the web browser, open http://localhost:9998/ (the address printed by the server at startup), then click on the “Bot Assistant” button. Click on the choices and see the bot assistant traverse through our state machine.
Congratulations, Bot Assistant is up and running.
Step 4: Add data streaming and persistency
As seen in the previous section, Bot Assistant works well, but it has some limitations. If the webserver restarts, all messages are lost and all user sessions are reset.
We’ll use Fluvio to remediate this issue. Fluvio is a high throughput, low latency data streaming platform that scales horizontally to handle persistency for a large number of concurrent messages.
We can deploy Fluvio between the connection proxy and the workflow controller, which also enables us to divide our monolith into two independent services (a.k.a. microservices): Proxy Service and Workflow Service.
When services are bridged by Fluvio we gain additional benefits:
- scale the proxy and workflow independently of each other.
- hand off a conversation to a human operator. We can do that by adding an operator service independently that interacts directly with the client through the data stream.
- add-on services such as: analytics, machine learning, or connectors to other products.
We can also remove the circular reference hack we implemented between session-controller and workflow-controller.
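The decoupling can be pictured with a small in-memory stand-in for a topic. The sketch below is not the Fluvio API: InMemoryTopic and its methods are hypothetical, and the record layout (sid plus a payload with a Request or Response kind) is borrowed from the messages this project writes. The point is that neither service calls the other; both only read from and write to the shared stream:

```typescript
// In-memory stand-in for a topic: an append-only log plus live subscribers.
// This is NOT the Fluvio API, only an illustration of the data flow.
type TopicListener = (record: string) => void;

class InMemoryTopic {
  private records: string[] = [];
  private listeners: TopicListener[] = [];

  // Append a record and notify every subscriber.
  send(record: string): void {
    this.records.push(record);
    this.listeners.forEach(listener => listener(record));
  }

  // Receive records appended from now on.
  subscribe(listener: TopicListener): void {
    this.listeners.push(listener);
  }

  // Read everything written so far, for example after a restart.
  fetchAll(): string[] {
    return this.records.slice();
  }
}

const sketchTopic = new InMemoryTopic();
const deliveredToClient: string[] = [];

// Workflow service: reacts to client responses by publishing a bot request.
sketchTopic.subscribe(record => {
  const msg = JSON.parse(record);
  if (msg.payload && msg.payload.kind === "Response") {
    sketchTopic.send(JSON.stringify({
      sid: msg.sid,
      payload: { kind: "Request", message: { kind: "BotText", content: "Great, thanks!" } },
    }));
  }
});

// Proxy service: forwards bot requests to the connected client.
sketchTopic.subscribe(record => {
  const msg = JSON.parse(record);
  if (msg.payload && msg.payload.kind === "Request") {
    deliveredToClient.push(msg.payload.message.content);
  }
});

// A client response enters through the proxy.
sketchTopic.send(JSON.stringify({
  sid: "session-1",
  payload: { kind: "Response", message: { kind: "ChoiceResponse", groupId: "others", itemId: "no", content: "No" } },
}));
// deliveredToClient is now ["Great, thanks!"] and the topic holds both records.
```

Because every record stays in the log, a third subscriber such as an operator or analytics service could be attached without changing either of the two existing ones.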
To integrate Fluvio data streaming we’ll make the following changes:
- Add fluvio to session-controller
- Add fluvio to workflow-controller
- Add fluvio to bot-server
- Add fluvio setup script
Add fluvio to session-controller
In the session-controller.ts file we replace references to workflow-controller with fluvio producers. In addition to that, the session controller can now use fluvio to look up all transactions for a specific session.
Update src/proxy-service/session-controller.ts with the following code changes:
import WS from "ws";
import { WsProxyOut } from "./proxy-out";
import { Message, SID, buildInitMessage, buildResponse, isRequest } from "../messages";
import { TopicProducer, PartitionConsumer, Offset } from "@fluvio/client";
type Messages = Array<Message>;
export class SessionController {
private sessionMessages: Map<SID, Messages>;
private proxyOut: WsProxyOut;
private fluvioProducer: TopicProducer;
private fluvioConsumer: PartitionConsumer;
constructor(
proxyOut: WsProxyOut,
fluvioProducer: TopicProducer,
fluvioConsumer: PartitionConsumer
) {
this.sessionMessages = new Map();
this.proxyOut = proxyOut;
this.fluvioProducer = fluvioProducer;
this.fluvioConsumer = fluvioConsumer;
}
public async init() {
(await this.fluvioConsumer.fetch(Offset.FromBeginning())).toRecords().forEach(msg => {
this.addMessageToSession(JSON.parse(msg));
});
this.show();
this.fluvioConsumer.stream(Offset.FromEnd(), (msg: string) => {
this.processBotMessage(msg);
});
}
public async sessionOpened(sid: SID, ws: WS) {
console.log(`start session - ${sid}`);
this.proxyOut.addSession(sid, ws);
const messages = this.sessionMessages.get(sid);
if (messages) {
this.sendMessagesToClient(messages);
} else {
const message = buildInitMessage(sid);
await this.fluvioProducer.sendRecord(JSON.stringify(message), 0);
}
}
public sessionClosed(sid: SID) {
console.log(`end session - ${sid}`);
this.proxyOut.closeSession(sid);
}
public async messageFromClient(sid: SID, clientMsg: string) {
console.log(`${sid} <== ${clientMsg}`);
const clientResponse = buildResponse(sid, JSON.parse(clientMsg));
await this.fluvioProducer.sendRecord(JSON.stringify(clientResponse), 0);
}
public sendMessagesToClient(messages: Messages) {
messages.forEach(message => {
this.sendMessageToClient(message);
});
}
public sendMessageToClient(message: Message) {
if (message.payload) {
const clientMessage = message.payload.message;
this.proxyOut.sendMessage(message.sid, JSON.stringify(clientMessage));
}
}
private addMessageToSession(message: Message) {
const sid = message.sid;
var messages = this.sessionMessages.get(sid);
if (!messages) {
messages = new Array();
}
messages.push(message);
this.sessionMessages.set(sid, messages);
}
public processBotMessage(botMessage: string) {
const message: Message = JSON.parse(botMessage);
this.addMessageToSession(message);
if (isRequest(message.payload)) {
this.sendMessageToClient(message);
}
}
private show() {
let table = new Map();
for (let [sid, value] of this.sessionMessages) {
table.set(sid, value.length);
}
console.table(table, ["SID", "Messages"]);
}
}
The code changes are as follows:
- constructor - saves the fluvio producer (TopicProducer) and consumer (PartitionConsumer) in member variables.
- init:
  - made async,
  - fetches the fluvio messages and caches them in the sessionMessages map,
  - registers the processBotMessage callback with fluvioConsumer.
- sessionOpened - made async; replays the cached messages for a known session, otherwise writes a new session message to the fluvio data stream.
- messageFromClient - made async to write client messages to the fluvio data stream (instead of calling workflow-controller).
That’s it, session-controller can now be deployed as a stand-alone service without any dependencies on workflow service.
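The replay performed by init can be pictured as a plain fold over the stored records. The sketch below is an illustration only: it takes an array of JSON strings in place of the records returned by the Fluvio consumer, and the names StoredMessage and rebuildSessions, as well as the session ids, are hypothetical.

```typescript
// Minimal message shape, based on the records this project writes to the topic.
interface StoredMessage {
  sid: string;
  payload?: { kind: string; message: unknown };
  timestamp?: string;
}

// Rebuild the per-session cache from raw records, in stream order.
function rebuildSessions(records: string[]): Map<string, StoredMessage[]> {
  const sessions = new Map<string, StoredMessage[]>();
  for (const raw of records) {
    const message = JSON.parse(raw) as StoredMessage;
    const messages = sessions.get(message.sid) ?? [];
    messages.push(message);
    sessions.set(message.sid, messages);
  }
  return sessions;
}

// Three records from two made-up sessions.
const rebuilt = rebuildSessions([
  JSON.stringify({ sid: "session-a" }),
  JSON.stringify({ sid: "session-b" }),
  JSON.stringify({ sid: "session-a", payload: { kind: "Request", message: { kind: "BotText", content: "Hi" } } }),
]);
// rebuilt has 2 entries: "session-a" with 2 messages and "session-b" with 1.
```

When a browser reconnects with a known session id, the cached list is what gets replayed to the client, which is why the conversation reappears after a page refresh.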
Add fluvio to workflow-controller
Similarly, in the workflow-controller.ts file we replace references to session-controller with fluvio producers.
Update src/workflow-service/workflow-controller.ts with the following code changes:
import {
SID,
Message,
ResponseMessage,
ChoiceResponse,
UserText,
buildRequest,
isRequest
} from "../messages";
import { StateMachine, State } from "./state-machine";
import { TopicProducer, PartitionConsumer, Offset } from "@fluvio/client";
export class WorkflowController {
private stateMachine: StateMachine;
private initState: string;
private fluvioProducer: TopicProducer;
private fluvioConsumer: PartitionConsumer;
constructor(
stateMachine: StateMachine,
fluvioProducer: TopicProducer,
fluvioConsumer: PartitionConsumer,
) {
this.stateMachine = stateMachine;
this.initState = stateMachine.keys().next().value;
this.fluvioProducer = fluvioProducer;
this.fluvioConsumer = fluvioConsumer;
}
public init() {
this.fluvioConsumer.stream(Offset.FromEnd(), async (sessionMsg: string) => {
await this.processProxyMessage(sessionMsg);
});
}
private async processNewConnection(sid: SID) {
const nextStates = this.processNext(this.initState);
await this.sendMessages(sid, nextStates);
}
private async processNextState(sid: SID, response: ResponseMessage) {
const state: string = this.getState(response);
const nextStates = this.processNext(state);
await this.sendMessages(sid, nextStates);
}
private getState(response: ResponseMessage) {
switch (response.kind) {
case "ChoiceResponse": {
return this.getChoiceResponseState(response);
}
case "UserText": {
return this.getUserTextState(response);
}
}
}
private processNext(startState: string) {
var nextStates: State[] = [];
var state = this.stateMachine.get(startState);
while (state) {
nextStates.push(state);
const next = state.next || "";
state = this.stateMachine.get(next);
if (next.length > 0 && !state) {
console.error(`Error: Cannot find next state: ${next}`);
}
}
return nextStates;
}
private getChoiceResponseState(choiceResponse: ChoiceResponse) {
for (let [key, state] of this.stateMachine.entries()) {
if (state.matchResponse &&
state.matchResponse.kind == choiceResponse.kind &&
state.matchResponse.groupId == choiceResponse.groupId &&
state.matchResponse.itemId == choiceResponse.itemId) {
return key;
}
}
console.error(`Error: cannot find choice ${JSON.stringify(choiceResponse)}`);
return this.initState;
}
private getUserTextState(userText: UserText) {
for (let [key, state] of this.stateMachine.entries()) {
if (state.matchResponse &&
state.matchResponse.kind == "UserText" &&
state.matchResponse.sessionId == userText.sessionId) {
return key;
}
}
console.error(`Error: cannot find user session ${JSON.stringify(userText)}`);
return this.initState;
}
private async sendMessages(sid: SID, nextStates: State[]) {
for (let idx = 0; idx < nextStates.length; idx++) {
const state = nextStates[idx];
if (state.sendRequest) {
const message = buildRequest(sid, state.sendRequest);
await this.fluvioProducer.sendRecord(JSON.stringify(message), 0);
}
}
}
public async processProxyMessage(clientMessage: string) {
const message: Message = JSON.parse(clientMessage);
if (!isRequest(message.payload)) {
const sid = message.sid;
if (message.payload) {
await this.processNextState(
sid,
<ResponseMessage>message.payload.message
);
} else {
await this.processNewConnection(sid);
}
}
}
}
The code changes are as follows:
- constructor - saves the fluvio producer (TopicProducer) and consumer (PartitionConsumer) in member variables.
- init - registers processProxyMessage as the fluvioConsumer callback.
- processNewConnection - made async to use sendMessages.
- processNextState - made async to use sendMessages.
- sendMessages - made async to write messages for the client to the fluvio data stream (instead of calling session-controller).
- processProxyMessage - made async to await processNewConnection.
The workflow controller is now a stand-alone service decoupled from session-controller. The Fluvio middle tier allows these services to be moved to different machines and scaled up independently. However, this improvement is beyond the scope of this blog.
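The traversal in processNext is the core of the workflow: starting from one state, it follows the next references and collects every state along the way. Here is a stripped-down sketch of that walk. The SketchState type is a simplification (the real State also carries matchResponse), and the state names and texts in the sample machine are made up for illustration:

```typescript
// Simplified state: only the fields needed for the traversal.
interface SketchState {
  sendRequest?: { kind: string; content: string };
  next?: string;
}

// Collect the start state and every state reachable by following `next`.
function collectStates(machine: Map<string, SketchState>, start: string): SketchState[] {
  const collected: SketchState[] = [];
  let current = machine.get(start);
  while (current) {
    collected.push(current);
    current = current.next ? machine.get(current.next) : undefined;
  }
  return collected;
}

// Hypothetical three-state machine.
const sketchMachine = new Map<string, SketchState>([
  ["greetings", { sendRequest: { kind: "BotText", content: "Hi, I'm Bot! Nice to meet you." }, next: "helpQuestion" }],
  ["helpQuestion", { sendRequest: { kind: "BotText", content: "Can I help you?" } }],
  ["unrelated", { sendRequest: { kind: "BotText", content: "Great, thanks!" } }],
]);

const outgoing = collectStates(sketchMachine, "greetings")
  .map(s => (s.sendRequest ? s.sendRequest.content : ""));
// outgoing is ["Hi, I'm Bot! Nice to meet you.", "Can I help you?"]
```

Note that, like the original loop, this sketch assumes the next references do not form a cycle; a state machine file that loops back without waiting for a response would never terminate.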
Add fluvio to bot-server
The bot-server is responsible for the initialization of the producer and consumer. After initialization, the producer and the consumer are passed to the session-controller and workflow-controller for processing.
Update src/bot-server.ts with the following code changes:
import { Server } from "http";
import { WsProxyIn } from "./proxy-service/proxy-in";
import { WsProxyOut } from "./proxy-service/proxy-out";
import { StateMachine, loadStateMachine } from "./workflow-service/state-machine";
import { WorkflowController } from "./workflow-service/workflow-controller";
import { SessionController } from "./proxy-service/session-controller";
import Fluvio from '@fluvio/client';
const BOT_ASSIST_MESSAGES = "bot-assist-messages";
export const initBotAssistant = async (server: Server) => {
const fluvio = await Fluvio.connect();
await checkTopic(fluvio);
const fluvioProducer = await fluvio.topicProducer(BOT_ASSIST_MESSAGES);
const fluvioConsumer = await fluvio.partitionConsumer(BOT_ASSIST_MESSAGES, 0);
const wsProxyOut = new WsProxyOut();
const sessionController = new SessionController(wsProxyOut, fluvioProducer, fluvioConsumer);
const wsProxyIn = new WsProxyIn(sessionController);
let filePath = getFileName();
const stateMachine: StateMachine = loadStateMachine(filePath);
const workflowController = new WorkflowController(stateMachine, fluvioProducer, fluvioConsumer);
await sessionController.init();
workflowController.init();
wsProxyIn.init(server);
};
const getFileName = () => {
if (process.argv.length != 3) {
console.log("Usage: node bot-assistant.js <state-machine.json>");
process.exit(1);
}
return process.argv[2];
}
const checkTopic = async (fluvio: Fluvio) => {
const admin = await fluvio.admin();
if (!await admin.findTopic(BOT_ASSIST_MESSAGES)) {
console.error("Error: Fluvio topic not found! Run `npm run setup`");
process.exit(1);
}
}
The code changes are as follows:
- BOT_ASSIST_MESSAGES - defines bot assistant topic name.
- fluvio - connects to fluvio, checks topic existence, and provisions fluvioProducer and fluvioConsumer.
- SessionController - passes fluvioProducer and fluvioConsumer to session controller.
- WorkflowController - passes fluvioProducer and fluvioConsumer to workflow controller.
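Since both controllers read the same topic, each one has to decide which records are meant for it. In the controllers above that decision is made with isRequest. The sketch below shows the idea under the assumption that isRequest simply checks for a payload of kind Request; the names RecordTarget and targetOf are hypothetical:

```typescript
type RecordTarget = "client" | "workflow";

// Decide which side acts on a record, based on payload.kind.
// Requests go to the client; responses and records without a payload
// (new session markers) are handled by the workflow.
function targetOf(raw: string): RecordTarget {
  const message = JSON.parse(raw) as { sid: string; payload?: { kind: string } };
  return message.payload !== undefined && message.payload.kind === "Request"
    ? "client"
    : "workflow";
}

const botRecord = '{"sid":"s1","payload":{"kind":"Request","message":{"kind":"BotText","content":"Great, thanks!"}}}';
const userRecord = '{"sid":"s1","payload":{"kind":"Response","message":{"kind":"ChoiceResponse","groupId":"others","itemId":"no","content":"No"}}}';
const newSessionRecord = '{"sid":"s2"}';

const targets = [botRecord, userRecord, newSessionRecord].map(r => targetOf(r));
// targets is ["client", "workflow", "workflow"]
```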
Congratulations! You made all code changes for Bot Assistant. Next, we’ll add a couple of scripts to add and remove the topic, and then we are ready for testing.
Add fluvio setup script
Fluvio needs a setup script to perform administrative operations such as add/remove topics. Let’s add a couple of files to perform these operations and link the files with npm.
Create a tools directory and add setup.sh and cleanup.sh files:
mkdir tools
touch tools/setup.sh && chmod +x tools/setup.sh
touch tools/cleanup.sh && chmod +x tools/cleanup.sh
Paste the following in the setup.sh file:
#!/bin/bash
fluvio topic create bot-assist-messages
Paste the following in the cleanup.sh file:
#!/bin/bash
fluvio topic delete bot-assist-messages
Finally, update the package.json file to link the script files:
{
"name": "bot-assistant",
"version": "1.0.0",
"description": "",
"main": "bot-assistant.js",
"scripts": {
"start:server": "tsc-watch --onSuccess \"node ./dist/bot-assistant.js $PARAMS\"",
"setup": "sh ./tools/setup.sh",
"cleanup": "sh ./tools/cleanup.sh"
},
"keywords": [],
"author": "fluvio <[email protected]> (fluvio.io)",
"license": "Apache 2.0",
"dependencies": {
"@fluvio/client": "^0.6.0-beta.3",
"express": "^4.17.1",
"typescript": "^4.1.3",
"ws": "^7.4.2"
},
"devDependencies": {
"@types/express": "^4.17.9",
"@types/node": "^14.14.19",
"@types/ws": "^7.4.0",
"tsc-watch": "^4.2.9"
}
}
Let’s run setup, to create the new topic:
$ npm run setup
> [email protected] setup /projects/bot-assistant
> sh ./tools/setup.sh
topic "bot-assist-messages" created
Congratulations! You have completed the changes for the Bot Assistant project.
Test Bot Assistant
Repeat the tests in Test Bot Assistant (v1) and refresh the screen. Note that the messages are restored, since they have been persisted by Fluvio.
The persistence also survives server reboots. Go ahead and reboot the server and refresh the browser screen. Notice how the messages are preserved.
Furthermore, you can now use Fluvio or other programs with a Fluvio consumer interface to read session messages.
Let’s read the last 5 messages with Fluvio CLI:
fluvio consume bot-assist-messages --offset="-4"
{"sid":"fb7e2971d989070361c30d825bf6a853a406916e","payload":{"kind":"Response","message":{"kind":"ChoiceResponse","groupId":"others","itemId":"no","content":"No"}},"timestamp":"2021-01-05T17:58:46.511"}
{"sid":"fb7e2971d989070361c30d825bf6a853a406916e","payload":{"kind":"Request","message":{"kind":"BotText","content":"Great, thanks!"}},"timestamp":"2021-01-05T17:58:46.514"}
{"sid":"fb7e2971d989070361c30d825bf6a853a406916e","payload":{"kind":"Response","message":{"kind":"ResetSession"}},"timestamp":"2021-01-05T18:04:20.811"}
{"sid":"0fb1574b3b6d7c98c7089aa4a2c58a80894bbc6e","timestamp":"2021-01-05T18:04:20.815"}
{"sid":"0fb1574b3b6d7c98c7089aa4a2c58a80894bbc6e","payload":{"kind":"Request","message":{"kind":"BotText","content":"Hi, I'm Bot! Nice to meet you."}},"timestamp":"2021-01-05T18:04:20.817"}
When you use fluvio to transfer real-time messages between services, you gain much more than a transport layer. Benefits range from decoupling services to recovering from errors, from monitoring to troubleshooting and much more.
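As a small illustration of such an add-on, the sketch below turns consumer output like the records above into a readable transcript for one session. It assumes one JSON record per line, treats Request payloads as bot messages and Response payloads as user messages, and skips records that carry no text. The names ConsumedRecord and transcriptFor and the session id are hypothetical:

```typescript
interface ConsumedRecord {
  sid: string;
  payload?: { kind: string; message: { kind: string; content?: string } };
}

// Build a readable transcript for one session from newline-separated records.
function transcriptFor(sid: string, output: string): string[] {
  const transcript: string[] = [];
  for (const raw of output.split("\n")) {
    if (raw.trim().length === 0) continue;                // skip blank lines
    const item = JSON.parse(raw) as ConsumedRecord;
    if (item.sid !== sid || !item.payload) continue;      // other session, or session marker
    const content = item.payload.message.content;
    if (content === undefined) continue;                  // e.g. ResetSession carries no text
    const speaker = item.payload.kind === "Request" ? "bot" : "user";
    transcript.push(`${speaker}: ${content}`);
  }
  return transcript;
}

const consumedOutput = [
  '{"sid":"s1","payload":{"kind":"Response","message":{"kind":"ChoiceResponse","groupId":"others","itemId":"no","content":"No"}}}',
  '{"sid":"s1","payload":{"kind":"Request","message":{"kind":"BotText","content":"Great, thanks!"}}}',
  '{"sid":"s1","payload":{"kind":"Response","message":{"kind":"ResetSession"}}}',
  '{"sid":"s2"}',
].join("\n");

const sessionTranscript = transcriptFor("s1", consumedOutput);
// sessionTranscript is ["user: No", "bot: Great, thanks!"]
```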
Conclusion
In this blog, we explored how to build a robot assistant that can interact with users in real-time. By using Fluvio, our application has a foundation that allows it to scale horizontally to meet the demands of a massive user audience. Since our backend services are stateless and decoupled, we can scale them independently to handle the particular load characteristics that we observe in production, preventing technical bottlenecks.
This project also just scratches the surface of what a real-time streaming application can do. By leveraging Fluvio’s persistent data streams, we can build improvements to our application by simply writing new microservices that interact with topic data. This gives us the power to develop new real-time features, as well as to analyze historical data for purposes such as Machine Learning personalization use-cases.
We hope you enjoyed the blog! If you have any questions or comments, or if you just want to come say hi, you can find us on our community Discord channel.