You are currently viewing 如何制作一个实时图像处理的数据科学Web App

如何制作一个实时图像处理的数据科学Web App

在这篇文章中,我会展示如何快速制作一个Data-Driven的AI App。我会通过一个完整的例子,告诉你如何制作一个简单的Web App,并调用一个已训练好的模型。

1. 做数据科学为什么需要制作App?

大部分的数据科学家,或者机器学习工程师的主攻方向是算法,比如研究一些最新的算法并尝试使用在公司的业务数据上。或者分析业务数据,从中抽取有价值的内容。通常,我们会制作Dashboard,或者直接使用PPT做汇报工作。但是近几年,特别是2019年之后,很多企业已不满足于研究层面的数据科学工作,他们更多地要求一个能够实际使用的App帮助他们解决实际问题。另外我们在向客户汇报或做演示时,一个简单的App也是能够帮助我们更好地展示和说明解决方案的工具之一。

2. 用什么制作App

丰富的开源框架让我们有非常多的选择。

  • 做数据科学通常会用到Jupyter Notebook这个工具。笔者曾经尝试过将Jupyter Notebook直接封装成一个App验收给客户。但是代码极其不易管理,发生软件工程上的问题时对应非常麻烦。
  • 另一个选择是使用一些先进的数据科学平台。比如Dataiku,Databricks。它们能够帮助我们把建模直接过渡到应用上,但是这些平台都不是免费的。
  • 使用Web框架制作简单的Web App。这是免费并且最容易部署的方案了。比起Mobile App或Desktop App,Web App搭建简单,是做Prototype的较好选择。而我们并不需要对框架有深刻的了解就能快速上手。

在接下来的文章中,我们会使用Python的轻量级Web框架Flask

3. 本次着手的Web App简单概要

这次我们要制作的App的概要如下:

这是一个使用了Deep Learning技术实现异常检测的Web App,在实际的工厂生产线上,会有需要监控生产出来的部件是否能够正常运转的需求。这个App的作用是监控生产线上的电风扇的运转情况,并将异常检测结果(比如风扇异常停止运转等异常情况)实时显示在浏览器的画面上。

从技术实现角度来讲,我们会用到Kafka来处理从摄像头接收到的实时图像数据。我稍后会介绍Kafka的Producer和Consumer。简单地讲,Producer把实时图像数据(流数据)存放到Kafka中,Consumer去Kafka中读取数据。当我们在浏览器中显示App的画面时,Flask框架会去调用这个Consumer,实时显示异常检测结果。

4. 实现

硬件设备:

  • Mac x 1
  • 外接摄像头 x 1

软件环境:

  • 开发语言:Python3
  • Python库: Flask, Kafka, OpenCV, Keras

环境的配置在此不详述。使用conda或pip都能够非常简单地安装。

4.1. 关于Apache Kafka

Kafka在我们的App中是一个关键部分,我们详细地了解一下。当然对于我们这个App你只需要了解Producer和Consumer两个组件就足够了。

对于Apache Kafka,主要有两大应用,Data Hub和Streaming Processing。而Streaming Processing正是我们需要的。Apache Kafka是一种Publish-Subscribe型的系统。最初由LinkedIn提出,后来成为了开源项目。

Kafka的几个重要概念:

  1. Broker:发送给Kafka的数据存储在Broker中。许多Broker形成了一个Kafka Cluster。 各个Broker之间通过Apache ZooKeeper进行通信。
  2. Producer:Producer用于将数据发送到Broker。Producer有几种类型。 Kafka自身的Producer是用Java编写的,但是还有许多第三方Kafka库可用其他语言比如Python。
  3. Consumer:Consumer是向Producer请求数据的实体。
  4. Topic:Broker中真正存储数据的地方。Consumer订阅Topic并检索信息,Producer发布Topic。

光看Kafka的概念还是比较抽象,具体到我们的App中,我们的摄像头实时摄影,由Kafka的Producer实时保存到Broker的Topic中。而Consumer会去实时读取Topic中的未处理数据,调用我们训练完的模型做异常检测。

我们在程序中要实现的是Producer和Consumer,其他的Kafka框架都会为我们准备好了。

4.2 关于Flask的RESTful接口

我们之前提到这次我们制作的是一个Web App。所以我们需要简单了解一下RESTful接口。RESTful接口是一种用于Web Service的通信技术。RESTful使用HTTP协议来传送数据。通常我们在启动App后,通过在浏览器中输入接口对应的URL就能调用我们写在接口中的业务逻辑。在Flask中,在一个函数前加上@app.route这个Annotation,一个RESTful接口就定义完成了。我们会在就下来的代码中看到。

4.3. 代码

下面我们来看Producer和Consumer的实现。

Producer的代码:

import cv2
from kafka import KafkaProducer

topic = "distributed-video"
def publish_camera():
    # Start up producer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    camera = cv2.VideoCapture(0)
    try:
        while(True):
            success, frame = camera.read()
            ret, buffer = cv2.imencode('.jpg', frame)
            producer.send(topic, buffer.tobytes())
            
            # Choppier stream, reduced load on processor
            time.sleep(0.2)

    except:
        print("\nExiting.")
        sys.exit(1)
    camera.release()

if __name__ == '__main__':
    print("publishing feed!")
    publish_camera() 

代码非常简洁,我们在localhost:9092初始化了一个Producer实例。然后我们使用OpenCV初始化了一个摄像头(camera)实例,实时读取图像数据。然后使用Producer实例发送给Kafka的Broker。Broker的Topic名为distributed-video。

那么实时向Kafka的Broker保存数据的Producer完成了,接下来要实现Consumer从Broker中读取数据,进行异常检测处理。

以下是Consumer的代码:

from flask import Flask, Response
from kafka import KafkaConsumer
import numpy as np
import cv2

# Fire up the Kafka Consumer
topic = "distributed-video"

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=['localhost:9092'])

# Set the consumer in a Flask App
app = Flask(__name__)

@app.route('/video', methods=['GET'])
def video():
    return Response(
        get_video_stream(),
        mimetype='multipart/x-mixed-replace; boundary=frame') 
我们刚才在Producer中定义了 名为distributed-video的Topic。现在我们从Topic中读取数据。数据保存在consumer变量中。 然后我们使用@app.route Annotation定义了一个Flask的RESTful接口。接口名为video。我们可以在启动这个Flask App后再浏览器中输入http://localhost:5000/video调用这个接口。 我们的Consumer的业务逻辑都写在get_video_stream()函数中,以被接口调用。

get_video_stream()函数的实现如下:

from keras.models import model_from_json
from skimage.measure import compare_ssim

def get_video_stream():

    ssim_score_list = []
    # load keras model
    model = model_from_json(open('./model/vae_anomaly_detection_model.json', 'r').read())
    model.load_weights('./model/vae_anomaly_detection_weight.h5')
    model._make_predict_function()
    
    msg_buff = []
    for msg in consumer:
        input_img = np.frombuffer(msg.value, dtype=np.uint8)

        # decode msg
        input_img = cv2.imdecode(input_img, -1)

        # prereprocessing
        input_img = prereprocess(input_img)
        
        # predict(generate) output
        decoded_img_arr = model.predict(input_img, batch_size=1)
        decoded_img = decoded_img_arr[0]

        # compare decode_img with input_img
        ssim_score = compare_ssim(input_img, decoded_img, multichannel=True)

        ssim_score_list.append(ssim_score)

        # concat ssim and the original image
        im_v = make_score_graph(ssim_score_list)
        ret, buffer = cv2.imencode('.jpg', im_v)

        yield (b'--frame\r\n'
               b'Content-Type: image/jpg\r\n\r\n' + buffer.tobytes() + b'\r\n\r\n') 

Consumer中会对实时接受到的图像进行基本的预处理,比如物体检出,领域切割,调整颜色等。而这些预处理不是这次要讲的内容,我们会在另一篇关于VAE模型训练的文章中详细介绍,这里我们已经用Keras训练好了模型,并导出为h5格式的模型文件。当然如果你使用其他框架比如Pytorch等,你也可以同样把模型文件放在这里,只是在Consumer中加载模型的代码要换成Pytorch版的。

最后,我们把模型的推理(inference)结果显示出来。我们的模型是一种VAE生成模型。也就是说,模型会根据接收的图像生成一张新的图像,而我们通过比较接收的图像生成的图像的相似度,就能够形成一种对模型的评价体系。我们的模型只通过运转中电风扇的图像进行学习的,因此,对于停止运转的图像由于对模型的过拟合会生成与原图非常不相似的图像。所以通过这种相似度的评价体系能够检测出电风扇的异常运转情况。

下面是使用matplotlib实时地将异常检测结果,也就是把计算出来的图像相似度显示出来。

def make_score_graph(ssim_score_list):

        if len(ssim_score_list) > 50:
            ssim_score_list.pop(0)
        else:
            pass

        # make ssim score graph
        # make an agg figure
        fig, ax = plt.subplots()
        ax.plot(ssim_score_index[:len(ssim_score_list)], ssim_score_list)
        #ax.set_title('a simple figure')
        ax.set_ylim(0.1, 0.9)
        ax.set_xticks([])
        ax.set_ylabel("SSIM Score")
        fig.canvas.draw()
        # grab the pixel buffer and dump it into a numpy array
        ssim_graph = np.fromstring(fig.canvas.tostring_rgb(), dtype='uint8')
        ssim_graph = ssim_graph.reshape(fig.canvas.get_width_height()[::-1] + (3,))
        print(ssim_graph.shape)

        # resize original image and ssim_graph to the same height
        ssim_graph = cv2.resize(ssim_graph, (400, 400))
        src_img = cv2.resize(src_img, (400, 400))

        # concat ssim and the original image
        im_v = np.concatenate((src_img, ssim_graph), axis=1)
        
        return im_v 

在实际操作中,如果我们将电风扇突然关闭,在画面上显示出来的相似度的数值会随之突然下降,因此我们就得知运转异常出现了。

5. 启动并运行我们的App

我们需要先启动ZooKeeper,这样我们才能将数据保存到Broker中。

$ brew install kafka
$ brew services start zookeeper
$ brew services start kafka

然后启动Flask(启动时不会调用Consumer的业务逻辑)

$ python consumer.py
启动结果如下所示。在浏览器中访问 http://0.0.0.0:5000/video 就能看到App的画面了。
* Serving Flask app "videoConsumer" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
* Restarting with stat

最后启动Producer开始从摄像头收集数据。启动后,浏览器画面上会有实时地异常检测结果显示出来。

$ python producer.py

6. 总结

至此,我们看到了一个完整而简单的图像流处理App,我们在App中对图像进行异常检测,并将结果显示在了浏览器中。