A Slack bot is a regular Slack app designed to interact with users through conversation. When a user mentions the bot from anywhere in Slack, the app receives the event, calls the Slack APIs, and does all of the magical things a Slack app can do.
Let's get started
1. Setup Slack App
Go to the Slack apps home page and click the Create New App button.
Add scopes under the OAuth & Permissions menu.
Install the app to your workspace.
This will create a bot user; you can check it on the App Home page.
2. Configure Docker Image to run the Lambda function in the local environment
Pull the lambci/lambda:ruby2.7 docker image
# Open command prompt in your local machine and
# hit this command to pull the docker image of Lambda with runtime Ruby-2.7
$ docker pull lambci/lambda:ruby2.7
3. Create a Lambda function that responds to Slack
Set up an application directory to store the Lambda Ruby function
# Creating directory
$ mkdir greeting-bot
# Move to directory
$ cd greeting-bot
Create the file greeting_bot.rb under the greeting-bot directory
# Creating ruby file under greeting-bot directory
$ touch greeting_bot.rb
Set environment variables
Find the Bot User OAuth Access Token under the OAuth & Permissions menu of the Slack app
$ export BOT_OAUTH_TOKEN='bot_oauth_token'
Find the Verification Token under the Basic Information menu of the Slack app
$ export VERIFICATION_TOKEN='verification_token'
Update the file greeting_bot.rb
require 'net/http'
require 'uri'

class GreetingBot
  def self.main(event:, context:)
    new.run(event)
  end

  def run(event)
    case event['type']
    when 'url_verification'
      verify(event['token'], event['challenge'])
    when 'event_callback'
      if event['event']['type'] == 'app_mention'
        process(event['event']['text'], event['event']['channel'])
      end
    end
  end

  private

  # Verify the request coming from Slack
  def verify(token, challenge)
    if token == ENV['VERIFICATION_TOKEN']
      { body: { challenge: challenge } }
    else
      { body: 'Invalid token' }
    end
  end

  # Decide how to reply to the mention
  def process(text, channel)
    body = if text.strip.downcase.include?('hello')
             'Hi, How are you?'
           else
             'How may I help you?'
           end
    send_message(body, channel)
  end

  # Call the Slack API to post the reply to the mentioned channel
  def send_message(text, channel)
    uri = URI('https://slack.com/api/chat.postMessage')
    params = {
      token: ENV['BOT_OAUTH_TOKEN'],
      text: text,
      channel: channel
    }
    uri.query = URI.encode_www_form(params)
    Net::HTTP.get_response(uri)
  end
end
Run the Lambda function in the local environment; it responds back to the channel mentioned in the request
# Make sure the mentioned channel (e.g. greeting-channel) has already invited the greeting-bot we created in step 1
docker run \
-e BOT_OAUTH_TOKEN=$BOT_OAUTH_TOKEN \
--rm -v "$PWD":/var/task lambci/lambda:ruby2.7 \
greeting_bot.GreetingBot.main \
'{"type": "event_callback","event":{"type":"app_mention","text":"<@U>hello!","channel":"greeting-channel"}}'
Result in the local environment:
4. Upload Lambda function to the AWS Lambda
Make sure you have created a Lambda function in AWS.
# Create a zip file
$ zip function.zip greeting_bot.rb
# Upload the zip file to Lambda
$ aws lambda update-function-code --function-name slack-greeting-bot --zip-file fileb://function.zip
# add `--profile profile_name` if you get an AccessDeniedException
Set environment variables BOT_OAUTH_TOKEN and VERIFICATION_TOKEN in Lambda
After uploading to Lambda, it looks like this:
5. Create a REST API endpoint using API Gateway that calls the Lambda function
I have created an API Gateway endpoint that triggers this Lambda function.
How to create a REST API using API Gateway? Read more.
This blog will cover a method for combining unsupervised learning with supervised learning. I will show how to build an autoencoder and combine it with a neural network for a classification problem in PyTorch.
Data Processing:
The first step is easy, as the same data loaders can be used for both training the autoencoder and training the neural network.
I will be using the CIFAR-10 dataset, as it is available to everyone and is easy to work with.
import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import datasets, transforms

# Basic transforms
SIZE = (32, 32)  # Resize the image to this shape
# Test/basic transform: resize, then convert the raw image into a tensor for PyTorch
basic = transforms.Compose([transforms.Resize(SIZE),
                            transforms.ToTensor()])

# Normalization values retrieved from https://github.com/kuangliu/pytorch-cifar/issues/19
mean = (0.4914, 0.4822, 0.4465)  # Mean
std = (0.247, 0.243, 0.261)      # Standard deviation
# Normalized transform: resize the image and then normalize it
norm_tran = transforms.Compose([transforms.Resize(SIZE),
                                transforms.ToTensor(),
                                transforms.Normalize(mean=mean, std=std)])

# Simple data augmentation:
# - randomly flip images horizontally to cover more orientations
# - randomly rotate by up to 15 degrees, limiting the black-border issue of larger rotations
# - randomly resize and crop, which acts like a zoom feature
# - convert to a tensor and normalize
aug_tran = transforms.Compose([transforms.RandomHorizontalFlip(),
                               transforms.RandomRotation(15),
                               transforms.RandomResizedCrop(SIZE, scale=(0.08, 1.0), ratio=(0.75, 1.3333333333333333), interpolation=3),
                               transforms.ToTensor(),
                               transforms.Normalize(mean=mean, std=std)])
# Create the datasets (TRAIN_DIR / TEST_DIR are the CIFAR-10 image folders, defined elsewhere in the post)
train_dataset = datasets.ImageFolder(TRAIN_DIR, transform=aug_tran)
test_dataset = datasets.ImageFolder(TEST_DIR, transform=norm_tran)  # No augmentation for the test set

# Parameters for setting up the data loaders
BATCH_SIZE = 32
NUM_WORKERS = 4
VALIDATION_SIZE = 0.15

# Validation split
num_train = len(train_dataset)                          # Number of training samples
indices = list(range(num_train))                        # Create indices for each set
np.random.shuffle(indices)                              # Randomly sample each of these by shuffling
split = int(np.floor(VALIDATION_SIZE * num_train))      # Create the split for validation
train_idx, val_idx = indices[split:], indices[:split]   # Create the train and validation sets
train_sampler = SubsetRandomSampler(train_idx)          # Subsample using PyTorch
validation_sampler = SubsetRandomSampler(val_idx)       # Same here but for validation

# Create the data loaders
train_loader = DataLoader(train_dataset,
                          batch_size=BATCH_SIZE,
                          sampler=train_sampler,
                          num_workers=NUM_WORKERS)
validation_loader = DataLoader(train_dataset,
                               batch_size=BATCH_SIZE,
                               sampler=validation_sampler,
                               num_workers=NUM_WORKERS)
test_loader = DataLoader(test_dataset,
                         batch_size=BATCH_SIZE,
                         shuffle=False,
                         num_workers=NUM_WORKERS)
An autoencoder is an unsupervised method of learning an encoding of the data that can be processed efficiently. This is done through dimensionality reduction and by ignoring noise in the dataset. An autoencoder has two sides: the encoder and the decoder. The encoder's job is to create a useful encoding that removes unwanted noise while keeping the most important parts of the data. The decoder's job is to take that encoding and reassemble it into the original input form. Below is the autoencoder that we will use as the feature-extraction system in our combined model.
The approach taken here is to train the autoencoder separately rather than together with the NN. This allows us to check the output of the encoder as well as the decoder and see how well it works.
import torch.nn as nn

# Define the autoencoder architecture
class ConvAutoencoder(nn.Module):
    def __init__(self):
        super(ConvAutoencoder, self).__init__()
        ## encoder layers ##
        # conv layer (depth from 3 --> 16), 3x3 kernels
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        # conv layer (depth from 16 --> 4), 3x3 kernels
        self.conv2 = nn.Conv2d(16, 4, 3, padding=1)
        # pooling layer to reduce x-y dims by two; kernel and stride of 2
        self.pool = nn.MaxPool2d(2, 2)
        ## decoder layers ##
        # a kernel of 2 and a stride of 2 will increase the spatial dims by 2
        self.t_conv1 = nn.ConvTranspose2d(4, 16, 2, stride=2)
        self.t_conv2 = nn.ConvTranspose2d(16, 3, 2, stride=2)

    def forward(self, x):
        ## encode ##
        # add hidden layer with relu activation function and maxpooling after
        x = torch.relu(self.conv1(x))
        x = self.pool(x)
        # add second hidden layer
        x = torch.relu(self.conv2(x))
        x = self.pool(x)  # compressed representation
        ## decode ##
        # add transpose conv layer, with relu activation function
        x = torch.relu(self.t_conv1(x))
        # output layer (with sigmoid for scaling from 0 to 1)
        x = torch.sigmoid(self.t_conv2(x))
        return x
# Instantiate the autoencoder and move it to the GPU if one is available
ae_model = ConvAutoencoder()
use_gpu = torch.cuda.is_available()
if use_gpu:
    ae_model = ae_model.cuda()

# Loss and optimizer
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(ae_model.parameters(), lr=0.001)
# Automatically reduce the learning rate on a plateau
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, mode='min',
                                                       factor=0.1, patience=3, verbose=True)

# Number of epochs to train the model
n_epochs = 35
ae_model_filename = 'cifar_autoencoder.pt'
train_loss_min = np.Inf  # track change in training loss

ae_train_loss_matrix = []
for epoch in range(1, n_epochs + 1):
    # monitor training loss
    train_loss = 0.0

    ###################
    # train the model #
    ###################
    for data in train_loader:
        # _ stands in for labels; no need to flatten the images here
        images, _ = data
        if use_gpu:
            images = images.cuda()
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        # forward pass: compute predicted outputs by passing inputs to the model
        outputs = ae_model(images)
        # calculate the loss
        loss = loss_function(outputs, images)
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # update running training loss
        train_loss += loss.item() * images.size(0)

    # print average training statistics
    train_loss = train_loss / len(train_loader.sampler)
    scheduler.step(train_loss)
    ae_train_loss_matrix.append([train_loss, epoch])
    print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch, train_loss))

    # save the model if the training loss has decreased
    if train_loss <= train_loss_min:
        print('Training loss decreased ({:.6f} --> {:.6f}). Saving model ...'.format(
            train_loss_min, train_loss))
        torch.save(ae_model.state_dict(), ae_model_filename)
        train_loss_min = train_loss
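The original post includes a figure comparing a batch of test images with their reconstructions. A minimal sketch of how such a comparison can be produced with matplotlib is shown below (it assumes the ae_model, test_loader, and use_gpu defined above; because the test transform normalizes the inputs, the colours of the originals may look slightly off):
import matplotlib.pyplot as plt

# Grab one batch of test images and run them through the trained autoencoder
images, _ = next(iter(test_loader))
if use_gpu:
    images = images.cuda()

ae_model.eval()
with torch.no_grad():
    reconstructions = ae_model(images)

# Plot the first 8 originals (top row) against their reconstructions (bottom row)
fig, axes = plt.subplots(2, 8, figsize=(16, 4))
for idx in range(8):
    for row, batch in enumerate([images, reconstructions]):
        img = batch[idx].cpu().numpy().transpose(1, 2, 0)  # CHW -> HWC for imshow
        axes[row, idx].imshow(img.clip(0, 1))
        axes[row, idx].axis('off')
plt.show()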
Looking at the reconstructions, the encoder works well enough that we can use it with confidence.
Neural Network:
This is the classification, supervised-learning part of the model. The first thing we need to do is freeze the autoencoder so that its weights and biases do not get updated during training. We then define the NN using the encoder part of the autoencoder (up to its pooling layer) as the feature extractor, and add fully connected layers on top, with dropout to help regularize the output.
Here are the model definition and training code.
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        image_modules = list(ae_model.children())[:-2]  # keep only the encoder layers
        self.modelA = nn.Sequential(*image_modules)
        # Shape of the encoder output for 32x32 inputs = 4 x 16 x 16
        self.fc1 = nn.Linear(4*16*16, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.out = nn.Linear(512, 10)
        self.drop = nn.Dropout(0.2)

    def forward(self, x):
        x = self.modelA(x)
        x = x.view(x.size(0), 4*16*16)
        x = torch.relu(self.fc1(x))
        x = self.drop(x)
        x = torch.relu(self.fc2(x))
        x = self.drop(x)
        x = self.out(x)
        return x

# Instantiate the combined model and move it to the GPU if available
model = MyModel()
if use_gpu:
    model = model.cuda()

# Freeze the autoencoder layers so they do not train; train only the linear layers
for child in model.children():
    if isinstance(child, nn.Linear):
        print("Setting Layer {} to be trainable".format(child))
        for param in child.parameters():
            param.requires_grad = True
    else:
        for param in child.parameters():
            param.requires_grad = False

# Optimizer and loss function
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
# Reduce the learning rate by a factor of 0.1 when the validation loss plateaus
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, mode='min',
                                                       factor=0.1, patience=3, verbose=True)
model_filename = 'model_cifar10.pt'
n_epochs = 40
valid_loss_min = np.Inf  # track change in validation loss

train_loss_matrix = []
val_loss_matrix = []
val_acc_matrix = []

for epoch in range(1, n_epochs + 1):
    # keep track of training and validation loss
    train_loss = 0.0
    valid_loss = 0.0
    val_acc = 0.0

    ###################
    # train the model #
    ###################
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # move tensors to GPU if CUDA is available
        if use_gpu:
            data, target = data.cuda(), target.cuda()
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        # forward pass: compute predicted outputs by passing inputs to the model
        output = model(data)
        # calculate the batch loss
        loss = criterion(output, target)
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # update training loss
        train_loss += loss.item() * data.size(0)

    ######################
    # validate the model #
    ######################
    model.eval()
    for batch_idx, (data, target) in enumerate(validation_loader):
        # move tensors to GPU if CUDA is available
        if use_gpu:
            data, target = data.cuda(), target.cuda()
        # forward pass: compute predicted outputs by passing inputs to the model
        output = model(data)
        # calculate the batch loss
        loss = criterion(output, target)
        # update average validation loss
        valid_loss += loss.item() * data.size(0)
        # calc_accuracy is a small helper from the original post (not shown here)
        val_acc += calc_accuracy(output, target)

    # calculate average losses
    train_loss = train_loss / len(train_loader.sampler)
    valid_loss = valid_loss / len(validation_loader.sampler)
    scheduler.step(valid_loss)

    # store losses and accuracy to plot later
    train_loss_matrix.append([train_loss, epoch])
    val_loss_matrix.append([valid_loss, epoch])
    val_acc_matrix.append([val_acc, epoch])

    # print training/validation statistics
    print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f} \tValidation Accuracy: {:.6f}'.format(
        epoch, train_loss, valid_loss, val_acc))

    # save the model if the validation loss has decreased
    if valid_loss <= valid_loss_min:
        print('Validation loss decreased ({:.6f} --> {:.6f}). Saving model ...'.format(
            valid_loss_min, valid_loss))
        torch.save(model.state_dict(), model_filename)
        valid_loss_min = valid_loss
Training the model and then evaluating it on the test set gives the final accuracy for each class:
Test Accuracy of airplane: 45% (231/504)
Test Accuracy of automobile: 61% (312/504)
Test Accuracy of bird: 18% (91/496)
Test Accuracy of cat: 11% (55/496)
Test Accuracy of deer: 27% (139/504)
Test Accuracy of dog: 35% (181/504)
Test Accuracy of frog: 63% (315/496)
Test Accuracy of horse: 49% (244/496)
Test Accuracy of ship: 59% (298/504)
Test Accuracy of truck: 46% (234/504)
Test Accuracy (Overall): 41% (2100/5008)
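These per-class numbers come from evaluating the saved model on the test set. A minimal sketch of that evaluation loop (my reconstruction, not the author's exact code; it assumes model, test_loader, and use_gpu from above) looks like this:
classes = ['airplane', 'automobile', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck']
class_correct = [0.0] * 10
class_total = [0.0] * 10

model.eval()
with torch.no_grad():
    for data, target in test_loader:
        if use_gpu:
            data, target = data.cuda(), target.cuda()
        output = model(data)
        _, pred = torch.max(output, 1)   # predicted class per image
        correct = pred.eq(target)
        for i in range(target.size(0)):
            label = target[i].item()
            class_correct[label] += correct[i].item()
            class_total[label] += 1

for i in range(10):
    print('Test Accuracy of {}: {:.0f}% ({}/{})'.format(
        classes[i], 100 * class_correct[i] / class_total[i],
        int(class_correct[i]), int(class_total[i])))
print('Test Accuracy (Overall): {:.0f}% ({}/{})'.format(
    100 * sum(class_correct) / sum(class_total),
    int(sum(class_correct)), int(sum(class_total))))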
Conclusion:
Looking at the loss and validation accuracy, the accuracy is moving up steadily (albeit a little jumpily) while both losses are decreasing, with the validation loss consistently below the training loss. This suggests the model is neither overfitting nor underfitting and is learning well. The accuracy is a little low compared to purely supervised learning, but given enough time the accuracy could get higher.
This blog will discuss best practices for using AWS technology when developing and deploying Machine Learning models on AWS.
It will cover the moving parts of AWS services that can be used with Machine Learning models: what they do, and when and where you should use them.
AWS Data Stores for Machine Learning:
S3 Buckets:
S3 is the standard object storage for AWS. It is basically your computer's disk in the cloud, storing files under separate folders. There are five main S3 storage classes.
S3 General Purpose (GP) Standard is the standard storage class, easy to access in real time.
S3 Standard Infrequent Access (IA) is ideal for items that you do not use very often. It is very similar to the standard class but is intended for files that are not needed often, to keep them separate. It is just as fast and available as GP storage. IA is ideal for long-term storage and backup files.
S3 One Zone Infrequent Access is for long-term storage of infrequently accessed data. Unlike IA and GP, the data is stored in only a single Availability Zone; if that Availability Zone is destroyed, the data will be lost.
S3 Intelligent Tiering is a smart system that automatically moves data to the optimal storage class based on access patterns, without performance impact or operational burden. It moves data between frequent and infrequent access tiers, which can save money and reduce management time and cost.
S3 Glacier comes in two flavors: S3 Glacier and S3 Glacier Deep Archive. Glacier is very secure and durable, and possibly less expensive than a local solution (buying your own storage system). It also increases the geographic availability of the data far more easily than if you set up the access points yourself. Data can be moved into Glacier using S3 lifecycle rules, which is preferred over migrating the data manually. Deep Archive is the cheapest of all storage classes. It is ideal for data that is accessed only once or twice a year, and for data that needs to meet strict regulations, as in FinTech, healthcare, or government. Deep Archive targets retention of 7 to 10 years or more, which helps meet retention requirements such as MiFID II.
S3 Lifecycle:
S3 lifecycle rules are very important to establish and use; otherwise you will have to manage your data manually, which is nearly impossible with big data and other complex datasets.
Using rules to move data from one storage class to another is important, as it can not only save money but also keep your data well organized and safe. For example: after an object is created, move it from GP storage to IA; after 6 months, move it from IA to Glacier for archival.
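As a rough illustration, a lifecycle rule like that example can be attached with boto3 (the bucket name, prefix, and transition days below are placeholders, not values from this post):
import boto3

s3 = boto3.client('s3')

# Move objects under "data/" to Standard-IA after 30 days, then to Glacier after 180 days
s3.put_bucket_lifecycle_configuration(
    Bucket='my-ml-bucket',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'archive-old-data',
            'Filter': {'Prefix': 'data/'},
            'Status': 'Enabled',
            'Transitions': [
                {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                {'Days': 180, 'StorageClass': 'GLACIER'},
            ],
        }]
    },
)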
S3 Encryption:
Data encryption is very important for security, especially if you are working with Personally Identifiable Information (PII). There are 4 types of encryption for S3; an example of the first two follows the list.
SSE-S3 – AWS-managed keys for encryption.
SSE-KMS – AWS Key Management Service manages the keys, adding extra security and an audit trail of KMS usage.
SSE-C – Self-managed encryption keys used with AWS.
Client-side encryption – Encrypt the data before it is sent to AWS.
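For illustration, here is how the first two options might look when uploading an object with boto3 (the bucket, key, and KMS key alias are placeholders):
import boto3

s3 = boto3.client('s3')

# SSE-S3: let S3 manage the encryption keys
s3.put_object(Bucket='my-ml-bucket', Key='models/model.pt',
              Body=open('model.pt', 'rb'), ServerSideEncryption='AES256')

# SSE-KMS: encrypt with a customer-managed KMS key
s3.put_object(Bucket='my-ml-bucket', Key='models/model.pt',
              Body=open('model.pt', 'rb'),
              ServerSideEncryption='aws:kms',
              SSEKMSKeyId='alias/my-kms-key')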
S3 Access Control:
Managing who and what can access your data is extremely important. There are two main ways to manage access to an S3 Bucket.
User-Based:
IAM policies – Controls which API can be called as well as what a user can do.
Resource-Based:
Bucket Policy – Bucket wide rules as to what can be done and what data can be accessed and how.
Object Access Control List – ACL; fine-grained access control for individual objects.
Bucket Access Control List – ACL for bucket-wide access control; less common than bucket policies and object ACLs.
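As a sketch of the resource-based approach, a bucket policy that lets a single IAM role read objects can be attached with boto3 (the account ID, role, and bucket name are made up for the example):
import json
import boto3

s3 = boto3.client('s3')

# Hypothetical bucket policy: allow one role to read objects from the bucket
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "AllowMLRoleRead",
        "Effect": "Allow",
        "Principal": {"AWS": "arn:aws:iam::123456789012:role/ml-training-role"},
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::my-ml-bucket/*"
    }]
}
s3.put_bucket_policy(Bucket='my-ml-bucket', Policy=json.dumps(policy))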
Redshift:
Redshift's primary use is analytics, not Machine Learning. Redshift is the main data warehousing service and primarily uses SQL analytics to analyze the data. Redshift is built for Online Analytical Processing (OLAP). Data can be moved from S3 into Redshift for storage.
RDS/Aurora:
Another data storage system: it is relational storage that uses SQL queries to find data. This storage service uses Online Transaction Processing (OLTP) and must be provisioned before use.
DynamoDB:
DynamoDB is a NoSQL data storage solution that is serverless and can scale as needed, so there is no need for provisioning as with RDS or Redshift. You do, however, have to provision its read and write capacity. This is a very good place to store saved Machine Learning models.
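As a small illustration of that last point, a saved model can be recorded in DynamoDB with boto3; the table name, key schema, and attributes below are assumptions for the example (the item stores a pointer to the model artifact in S3 plus some metadata):
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('ml-models')  # assumed to exist with 'model_id' as the partition key

table.put_item(Item={
    'model_id': 'cifar10-classifier-v1',
    'created_at': '2020-05-01T12:00:00Z',
    's3_path': 's3://my-ml-bucket/models/model_cifar10.pt',
    'val_accuracy': '0.41',
})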
Streams:
AWS Kinesis:
Kinesis is ideal for streaming data in real time, enabling real-time analytics and insight that help decision making and improve response time; it is a managed process/replay alternative to Apache Kafka. It is ideal for use with logs, IoT, clickstreams, big data, and real-time data applications. Kinesis goes hand in hand with Spark or other stream-processing frameworks.
There are 4 types of data streams for Kinesis:
Kinesis Streams – Low latency Streaming for consuming data at scale.
Kinesis Analytics – Real-time Analytics on streams using SQL.
Kinesis Firehose – Delivers data to storage services like S3, Redshift, Elasticsearch, Splunk, etc.
Kinesis Video Stream – For real-time video analysis.
Kinesis Streams are made up of shards, which control how much throughput can go through each stream. These shards must be provisioned beforehand, which requires capacity planning and knowledge of the input volume.
Data retention is 24 hours by default but can be extended up to 7 days by configuring each stream/shard. This gives the ability to reprocess/replay the data in the stream without reloading it, and multiple applications/analysis systems can use the same data from the same stream/shard. However, the data in the stream is immutable and cannot be removed manually. The ingestion limit is 1 MB of data per second per shard.
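To make the shard/partition idea concrete, here is a minimal boto3 sketch of a producer putting one record onto a stream (the stream name and payload are placeholders); the partition key determines which shard receives the record:
import json
import boto3

kinesis = boto3.client('kinesis')

# Push one click event onto the stream
kinesis.put_record(
    StreamName='clickstream',
    Data=json.dumps({'user_id': 42, 'page': '/home', 'ts': 1588334400}).encode('utf-8'),
    PartitionKey='42',
)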
Kinesis Firehose:
This stream is a fully managed service that needs no configuration or admin intervention/setup. Firehose is not real-time but near real-time processing, as the latency is about 60 seconds for a non-full batch. The primary purpose of Firehose is data delivery into S3, Redshift, Elasticsearch, and Splunk. Firehose auto-scales to meet the needs of data transmission. It can do some limited data conversion for S3 using AWS Lambda, for example converting between CSV, JSON, and Parquet. Firehose also allows data to be compressed with ZIP, GZIP, or Snappy, which is very good for long-term storage.
Kinesis Analytics:
This stream is for real-time analytics of data. Analytics has two types of input, Kinesis Stream and Firehose.
Use Cases:
Streaming ETL: You can use Analytics to transform columns of data on a stream.
Continuous Metric Generation: Live updates on data streams.
Responsive Analytics: Set up alerts in real-time.
Analytics streams are serverless and scale automatically to meet traffic. You will have to set up IAM privileges to read from certain sources and write to certain destinations like S3. You will also need to use Flink or SQL for all computations. You can also use Lambdas for preprocessing and schema discovery.
There are two built-in Machine Learning algorithms in Analytics: Random Cut Forest (RCF) and hotspot analysis. RCF is a SQL function for anomaly detection on numerical column data. The model is updated as new data comes into the stream, which is a big benefit because it keeps the model accurate as your data changes over time. The hotspot algorithm is used for finding relatively dense regions in the data, similar to k-NN or other clustering algorithms. Unlike RCF, this model is not dynamically trained and must be retrained for each dataset.
Kinesis Video Stream:
This stream is intended for video analysis and processing. The producers for this stream are security cameras, body cams, IoT cameras, and other video-capturing devices. There is a restriction of 1 producer per stream (1 to 1). Data can be retained from 1 hour (the default) up to 10 years with configuration. Video Streams also have video playback capability. The consumers are limited compared to other streams; there are 3 types of consumers for this stream.
Build your own custom consumer (Pytorch models, Tensorflow models)
AWS SageMaker
Amazon Rekognition Video.
With these 3 approaches, you can apply Machine learning or Deep Learning models to video streams.
These are the types of streams that you can use in Kinesis.
Processors:
Glue Data Catalog:
Glue is often overlooked, as its main purpose is to be a metadata repository for all your table data. Glue can generate the schema for your dataset automatically by looking over the data. Glue Crawlers go through your datasets and build out the Data Catalog, and they can help infer schemas and partitions in the data. This works with JSON, CSV, and Parquet data. Glue can be used with all the storage systems (S3, Redshift, RDS, and Glacier). You can run Glue on a schedule or manually to create/update the Glue catalog. Glue will have to be given IAM permissions to access each storage service.
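A minimal boto3 sketch of creating and starting such a crawler looks like this (the crawler name, IAM role, database, and S3 path are placeholders):
import boto3

glue = boto3.client('glue')

# Crawl a raw-data prefix in S3 and register the inferred tables in the Data Catalog
glue.create_crawler(
    Name='raw-data-crawler',
    Role='arn:aws:iam::123456789012:role/glue-crawler-role',
    DatabaseName='raw_data',
    Targets={'S3Targets': [{'Path': 's3://my-ml-bucket/raw/'}]},
)
glue.start_crawler(Name='raw-data-crawler')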
Glue ETL (Extract Transform Load):
This is one of the most important aspects of Glue: one of its main uses is to preprocess and manage your data on AWS. Glue can transform, clean, and even enrich data before sending it on for analysis. ETL code can be written in either Python or Scala, and for big data, Spark/PySpark can be used. S3, JDBC, RDS, Redshift, and the Glue Data Catalog can be the targets for ETL.
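A bare-bones Glue ETL job script using the PySpark-based awsglue library might look like the following (the database, table, and output path are placeholders; a real job would add transforms between the read and the write):
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read a table registered in the Glue Data Catalog
raw = glueContext.create_dynamic_frame.from_catalog(database='raw_data', table_name='events')

# ... cleaning / enrichment transforms would go here ...

# Write the result back to S3 as Parquet
glueContext.write_dynamic_frame.from_options(
    frame=raw,
    connection_type='s3',
    connection_options={'path': 's3://my-ml-bucket/clean/events/'},
    format='parquet',
)
job.commit()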
AWS Data Pipeline:
AWS Data Pipeline is exactly what it sounds like: its main goal is to aid the movement of data from source to destination throughout all parts of an AWS architecture. Typical destinations are S3, RDS, DynamoDB, Redshift, and EMR. It can also manage task dependencies, and it can handle local or on-premises data and push it into AWS systems. The pipeline orchestrates the services involved and manages everything.
AWS Batch:
AWS Batch allows batch jobs to run on AWS as Docker images. It dynamically provisions instances (EC2 or Spot Instances), automatically adjusting to the optimal quantity and type based on the volume and requirements of the input/task. It is serverless, so no cluster management is needed. CloudWatch Events can be used to run batch jobs automatically as needed, and batch jobs can be orchestrated with AWS Step Functions.
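For example, a training job might be submitted to a Batch queue with boto3 like this (the job name, queue, job definition, and command are placeholders):
import boto3

batch = boto3.client('batch')

# Submit a containerized training job to an existing queue and job definition
batch.submit_job(
    jobName='train-cifar10-classifier',
    jobQueue='ml-training-queue',
    jobDefinition='pytorch-training:1',
    containerOverrides={
        'command': ['python', 'train.py', '--epochs', '40'],
        'environment': [{'name': 'S3_DATA_PATH', 'value': 's3://my-ml-bucket/data/'}],
    },
)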
Database Migration Service (DMS):
This allows quick and easy migration from a local database to AWS. It is also resilient and self-healing, which makes it a far safer method of data transfer, and the source database remains available for use during the migration. It supports both homogeneous and heterogeneous database migrations. DMS needs an EC2 instance to be started before the transfer can happen; that instance is responsible for moving the source database to the target database.
Step Functions:
Step Functions are used to orchestrate steps and processes in workflows. They have advanced error handling and sophisticated retry mechanisms, and they make it simple to audit workflows and execution history. A step function can wait an arbitrary amount of time until a function/task is complete, but the maximum execution time of a state machine is 1 year. A step function consists of the steps needed to achieve the desired outcome. For example, training a Machine Learning model could look like this:
Start -> Generate Training dataset -> Hyperparameter training (XGBoost) -> Extract Model Path -> Hyperparameter testing -> Extract Model Name -> Batch Transfer -> End
Step Functions are ideal for flow design and for ensuring that steps happen one after another in a defined order.
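To make that concrete, here is a pared-down sketch of defining a two-step version of such a workflow in the Amazon States Language and creating it with boto3 (the Lambda ARNs, role, and state machine name are placeholders; a full pipeline would chain all of the steps listed above):
import json
import boto3

sfn = boto3.client('stepfunctions')

# Minimal two-state workflow: generate the dataset, then train the model
definition = {
    "StartAt": "GenerateTrainingDataset",
    "States": {
        "GenerateTrainingDataset": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:generate-dataset",
            "Next": "TrainModel"
        },
        "TrainModel": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:train-model",
            "End": True
        }
    }
}

sfn.create_state_machine(
    name='ml-training-pipeline',
    definition=json.dumps(definition),
    roleArn='arn:aws:iam::123456789012:role/step-functions-role',
)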