Elasticsearch:使用 Node.js 将实时数据提取到 Elasticsearch 中(一)

语言: CN / TW / HK

Elasticsearch 是一个强大的 RESTful 搜索和分析引擎,能够处理越来越多的用例。 它将集中存储你的数据,以实现闪电般的快速搜索、微调相关性以及可轻松扩展的强大分析。 关于如何使用 Elastic Stack(又名 ELK 堆栈)将数据摄取到 Elasticsearch 的资源有很多。在今天的文章中,我将详细介绍如何使用 Node.js 从零开始来把地震的实时数据采集到 Elasticsearch 中。

如果你选择的编程语言是 JavaScript,并且你需要使用 RESTful API 方法从第三方应用程序获取数据,那么使用 Node.js 获取数据是一个不错的选择。 你还可以托管服务器,让它持续实时摄取数据。 该演示将向您展示如何设置一个 Node.js + Express.js 服务器,该服务器实时将数据提取到 Elasticsearch 中,然后可以对这些数据进行分析并以有意义的方式采取行动。

对于此演示,我们将使用 USGS 实时发布的公开可用的全球地震数据。

准备工作

Elasticsearch 及 Kibana

如果你还没有安装好自己的 Elasticsearch 及 Kibana 的话,那么请参考我之前的文章:

在今天的展示中,我将使用 Elastic Stack 8.x 来进行展示。在安装的时候,请参考相应的 Elastic Stack 8.x 的文章来进行安装。

Node.js

你需要安装好自己的 Node.js 来完成下面的练习。你可以参考 Node.js 链接来进行相应的安装。

实时数据

根据 USGS 网上所提供的信息,我们可以在地址  http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson 找到相应的地震信息数据。我们可以通过如下的命令来进行查看:

curl http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson | jq .

如上所示,它是一个以 JSON 格式给出来的数据信息。这个数据会实时发生变化,我们可以通过反复访问这个接口来得到所需要的地震信息。在这里,我们需要注意的是:

  • "time": 1672471359610,这是一个时间信息,可以作为我们的 timestamp 来对它进行分析。我们将最终把它存入到 @timestamp 里。
  • "id": "nc73827101",这是一个地震特有的 id,我们将以这个 id 成为数据的 id。

  • "geometry",这个是地震发生的地理位置。我们可以需要在 Elasticsearch 中为它定一下为 geo_point 数据类型。我们将把它变为:

虽然数据有很多,但是我们最终需要的数据格式是这样的:

```

  1. {
  2. "mag": 1.13,
  3. "place": "11km ENE of Coachella, CA",
  4. "@timestamp": 2022-05-02T20:07:53.266Z,
  5. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
  6. "sig": 20,
  7. "type": "earthquake",
  8. "depth": 2.09,
  9. "coordinates": {
  10. "lat": 33.7276667,
  11. "lon": -116.0736667
  12. }
  13. }

```

在接下来的步骤里,我来详细介绍如何达到我们最终的目的。

创建 Node.js 应用

创建最基本的 express 应用

我们将从 0 开始一步一步地创建 Node.js 应用。首先我们在自己的电脑中创建一个目录:

mkdir earthquake_app

```

  1. $ pwd
  2. /Users/liuxg/demos
  3. $ mkdir earthquake_app
  4. $ cd earthquake_app/

```

我们进入到该目录中,并打入如下的命令:

npm init -y

``

  1. $ npm init -y
  2. Wrote to /Users/liuxg/demos/earthquake_app/package.json:

  3. {

  4. "name": "earthquake_app",
  5. "version": "1.0.0",
  6. "description": "",
  7. "main": "index.js",
  8. "scripts": {
  9. "test": "echo \"Error: no test specified\" && exit 1"
  10. },
  11. "keywords": [],
  12. "author": "",
  13. "license": "ISC"
  14. }

  15. $ ls

  16. package.json

` ```

上述命令生成一个叫做 package.json 的文件。在以后安装的 packages,它也会自动添加到这个文件中。默认的设置显然不是我们想要的。我们需要对它做一些修改。

在接下来的代码中,我们将会使用如下的一些 packages:

  • @elastic/elasticsearch
  • axios
  • config
  • cors
  • express
  • log-timestamp
  • nodemon

我们可以通过如下的命令来进行安装:

npm i @elastic/elasticsearch axios config cors express log-timestamp nodemon

```

  1. $ npm i @elastic/elasticsearch axios config cors express log-timestamp nodemon
  2. npm notice Beginning October 4, 2021, all connections to the npm registry - including for package installation - must use TLS 1.2 or higher. You are currently using plaintext http to connect. Please visit the GitHub blog for more information: http://github.blog/2021-08-23-npm-registry-deprecating-tls-1-0-tls-1-1/
  3. npm notice Beginning October 4, 2021, all connections to the npm registry - including for package installation - must use TLS 1.2 or higher. You are currently using plaintext http to connect. Please visit the GitHub blog for more information: http://github.blog/2021-08-23-npm-registry-deprecating-tls-1-0-tls-1-1/

  4. added 118 packages in 17s

  5. 11 packages are looking for funding

  6. run npm fund for details

```

由于我之前已经安装过,所以我上面显示的信息和你的可能会有所不同。我们再次来查看 package.json 文件:

``

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ ls
  4. node_modules package-lock.json package.json
  5. $ cat package.json
  6. {
  7. "name": "earthquake_app",
  8. "version": "1.0.0",
  9. "description": "",
  10. "main": "index.js",
  11. "scripts": {
  12. "test": "echo \"Error: no test specified\" && exit 1"
  13. },
  14. "keywords": [],
  15. "author": "",
  16. "license": "ISC",
  17. "dependencies": {
  18. "@elastic/elasticsearch": "^8.5.0",
  19. "axios": "^1.2.2",
  20. "config": "^3.3.8",
  21. "cors": "^2.8.5",
  22. "express": "^4.18.2",
  23. "log-timestamp": "^0.3.0",
  24. "nodemon": "^2.0.20"
  25. }
  26. }

` ```

很显然,我们最新安装的 packages 已经自动添加到 package.json 文件中了。

我们接下来创建一个叫做 server 的子目录,并在它里面创建一个叫做 server.js 的文件:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ mkdir server
  4. $ touch server/server.js

```

在上面,我们创建了一个叫做 server.js 的文件。这个将来就是我们需要运行的 server 脚本。为了能够让我们的 package.json 文件的配置能让 npm 进行运行,我们需要对它进行修改。

``

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ cat package.json
  4. {
  5. "name": "earthquake_app",
  6. "version": "1.0.0",
  7. "description": "",
  8. "main": "sever.js",
  9. "scripts": {
  10. "start": "nodemon server/server.js",
  11. "test": "echo \"Error: no test specified\" && exit 1"
  12. },
  13. "keywords": [],
  14. "author": "",
  15. "license": "ISC",
  16. "dependencies": {
  17. "@elastic/elasticsearch": "^8.5.0",
  18. "axios": "^1.2.2",
  19. "config": "^3.3.8",
  20. "cors": "^2.8.5",
  21. "express": "^4.18.2",
  22. "log-timestamp": "^0.3.0",
  23. "nodemon": "^2.0.20"
  24. }
  25. }

` ```

很多人可能会奇怪,为啥使用 nodemon 来启动脚本。它的好处是当我们修改好 server.js 里的脚本,那么它会自动重新启动服务器的运行,而不需要我们每次都需要打入如下的命令:

npm start

接下为了验证我们的 express 应用是否能成功地运行,我们修改 server.js 为如下的代码:

server/server.js

```

  1. onst express = require('express');

  2. const app = express();

  3. const port = 5001;

  4. app.get('/', (req, res) => {

  5. res.send('Hello World!')
  6. })

  7. app.listen(port, () => console.log(Server listening at http://localhost:${port}));

```

我们接下来使用如下的命令来进行启动:

npm start

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ npm start

  4. [email protected] start

  5. nodemon server/server.js

  6. [nodemon] 2.0.20

  7. [nodemon] to restart at any time, enter rs
  8. [nodemon] watching path(s): .
  9. [nodemon] watching extensions: js,mjs,json
  10. [nodemon] starting node server/server.js
  11. Server listening at http://localhost:5001

```

我们可以看到服务器已经成功地运行起来了,并且它运行于 5001 端口上。我们可以通过浏览器来进行访问它的网址:

上面显示我们的服务器运行正常。

安全地连接 Node.js 服务器到 Elasticsearch

接下来,我们需要创建代码来安全地连接 Node.js 服务到我们本地部署的 Elasticsearch 中。我们可以参考之前的文章 “Elasticsearch:使用最新的 Nodejs client 8.x 来创建索引并搜索”。我们可以在项目的更目录下创建如下的两个子目录:

```

  1. mkdir config
  2. mkdir -p server/elasticsearch

```

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ mkdir config
  4. $ mkdir -p server/elasticsearch
  5. $ ls -d */
  6. config/ node_modules/ server/

```

在 config 子目录下,我们创建如下的一个叫做 default.json 的文件。这个是用来配置如何连接到 Elasticsearch 的:

config/default.json

```

  1. {
  2. "elastic": {
  3. "elasticsearch_endpoint": "http://localhost:9200",
  4. "username": "elastic",
  5. "password": "-pK6Yth+mU8O-f+Q*F3i",
  6. "apiKey": "eVBKOFhJVUJUN1gwSDQyLU5halY6R1BVRjNOUmpRYUtkTmpXTUZHdWZVUQ==",
  7. "certificate": "/Users/liuxg/elastic/elasticsearch-8.5.3/config/certs/http_ca.crt",
  8. "caFingerprint": "E3D36275D9FA80CF96F74E6537FC74E7952511A75E01605EBCFB8FC9F08F598C"
  9. }
  10. }

```

我们先不要着急来了解这些配置参数。有些我们可能并不一定要用到。这些设置针对我们每个人的 Elasticsearch 的安装的不同而不同。在上面的参数解释如下:

  • elasticsearch_endpoint:这个是 Elasticsearch 的访问地址
  • username:这个是访问 Elasticsearch 的用户名,你可以不选用超级用户 elastic,而且在生产环境中,也不是推荐的方法
  • password:这个是上面 username 账号的密码
  • apiKey:这个是访问 Elasticsearch 所需要的 apiKey。你可以参考  “Elasticsearch:使用最新的 Nodejs client 8.x 来创建索引并搜索” 来了解如何进行生产。在下面的代码中,我们也可以使用 code 来进行生成
  • certificate:这个是证书的位置。每个 Elasticsearch 集群都会有一个生成的证书位置。我们需要填入这个位置信息
  • caFingerprint:这个是证书的 fingerprint 信息。我们可以采用 fingerprint 来进行连接。在本演示中,我将不使用这种方式。更多信息,请参考 Connecting | Elasticsearch JavaScript Client [master] | Elastic

我们在 elasticsearch 目录下创建一个叫做 client.js 的文件:

server/elasticsearch/client.js

``1. const { Client } = require('@elastic/elasticsearch'); 2. const config = require('config'); 3. const fs = require('fs')

  1. const elasticConfig = config.get('elastic');

  2. // const client = new Client ( {

  3. // node: elasticConfig.elasticsearch_endpoint,
  4. // auth: {
  5. // apiKey: elasticConfig.apiKey
  6. // },
  7. // tls: {
  8. // ca: fs.readFileSync(elasticConfig.certificate),
  9. // rejectUnauthorized: true
  10. // }
  11. // });

  12. const client = new Client ( {

  13. node: elasticConfig.elasticsearch_endpoint,
  14. auth: {
  15. username: elasticConfig.username,
  16. password: elasticConfig.password
  17. },
  18. tls: {
  19. ca: fs.readFileSync(elasticConfig.certificate),
  20. rejectUnauthorized: true
  21. }
  22. });

  23. client.ping()

  24. .then(response => console.log("You are connected to Elasticsearch!"))
  25. .catch(error => console.error("Elasticsearch is not connected."))

  26. module.exports = client;` ```

在上面,我使用了两种方法来连接到 Elasticsearch。一种是通过 username/password 的方式来进行连接:

```

  1. const client = new Client ( {
  2. node: elasticConfig.elasticsearch_endpoint,
  3. auth: {
  4. username: elasticConfig.username,
  5. password: elasticConfig.password
  6. },
  7. tls: {
  8. ca: fs.readFileSync(elasticConfig.certificate),
  9. rejectUnauthorized: true
  10. }
  11. });

```

而另外一种就是被注释掉的那个方法:

```

  1. const client = new Client ( {
  2. node: elasticConfig.elasticsearch_endpoint,
  3. auth: {
  4. apiKey: elasticConfig.apiKey
  5. },
  6. tls: {
  7. ca: fs.readFileSync(elasticConfig.certificate),
  8. rejectUnauthorized: true
  9. }
  10. });

```

这个也是被推荐的方法。在实际的使用中,我们更推荐使用 API key 来进行连接。

我们首先来使用 username/password 的方式来进行连接。我们需要修改我们的 server.js 来进行验证:

server/server.js

```

  1. const express = require('express');
  2. const client = require('./elasticsearch/client');

  3. const app = express();

  4. const port = 5001;

  5. app.get('/', (req, res) => {

  6. res.send('Hello World!')
  7. })

  8. app.listen(port, () => console.log(Server listening at http://localhost:${port}));

```

我们重新运行服务器。我们可以看到如下的输出:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ npm start

  4. [email protected] start

  5. nodemon server/server.js

  6. [nodemon] 2.0.20

  7. [nodemon] to restart at any time, enter rs
  8. [nodemon] watching path(s): .
  9. [nodemon] watching extensions: js,mjs,json
  10. [nodemon] starting node server/server.js
  11. Server listening at http://localhost:5001
  12. You are connected to Elasticsearch!

```

上面的输出表明我们已经能够成功地连接到 Elasticsearch 了。

使用代码获取 API key

我们接下来可以通过代码来获得 API key,尽管我们可以通过其它的方法来获得。请详细阅读 “Elasticsearch:创建 API key 接口访问 Elasticsearch”。在这里,我们可以使用 Node.js 代码来动态地生成一个 API key。我们在 server 目录下创建如下的一个文件:

sever/create-api-key.js

`

  1. const client = require('./elasticsearch/client');

  2. async function generateApiKeys(opts) {

  3. const body = await client.security.createApiKey({
  4. body: {
  5. name: 'earthquake_app',
  6. role_descriptors: {
  7. earthquakes_example_writer: {
  8. cluster: ['monitor'],
  9. index: [
  10. {
  11. names: ['earthquakes'],
  12. privileges: ['create_index', 'write', 'read', 'manage'],
  13. },
  14. ],
  15. },
  16. },
  17. },
  18. });
  19. return Buffer.from(${body.id}:${body.api_key}).toString('base64');
  20. }

  21. generateApiKeys()

  22. .then(console.log)
  23. .catch((err) => {
  24. console.error(err);
  25. process.exit(1);
  26. });

`` ```

我们使用如下的命令来运行这个 Node.js 的代码:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ ls server/create-api-key.js
  4. server/create-api-key.js
  5. $ node server/create-api-key.js
  6. You are connected to Elasticsearch!
  7. emZJSGFZVUJUN1gwSDQyLWRLaS06LVpHaXR1bm5RQnEybE4zOWoyd0g5Zw==

```

我们可以把上面命令生成的 API key 写入到之前的 default.json 文件中。这样我们也可以通过 API key 的方式来访问 Elasticsearch 了,如果我们需要的话。这样 client.js 实际上可以写成:

server/elasticsearch/client.js

``1. const { Client } = require('@elastic/elasticsearch'); 2. const config = require('config'); 3. const fs = require('fs')

  1. const elasticConfig = config.get('elastic');

  2. const client = new Client ( {

  3. node: elasticConfig.elasticsearch_endpoint,
  4. auth: {
  5. apiKey: elasticConfig.apiKey
  6. },
  7. tls: {
  8. ca: fs.readFileSync(elasticConfig.certificate),
  9. rejectUnauthorized: true
  10. }
  11. });

  12. // const client = new Client ( {

  13. // node: elasticConfig.elasticsearch_endpoint,
  14. // auth: {
  15. // username: elasticConfig.username,
  16. // password: elasticConfig.password
  17. // },
  18. // tls: {
  19. // ca: fs.readFileSync(elasticConfig.certificate),
  20. // rejectUnauthorized: true
  21. // }
  22. // });

  23. client.ping()

  24. .then(response => console.log("You are connected to Elasticsearch!"))
  25. .catch(error => console.error("Elasticsearch is not connected."))

  26. module.exports = client;` ```

我们重新运行 server.js,我们可以看到如下的输出:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ npm start

  4. [email protected] start

  5. nodemon server/server.js

  6. [nodemon] 2.0.20

  7. [nodemon] to restart at any time, enter rs
  8. [nodemon] watching path(s): .
  9. [nodemon] watching extensions: js,mjs,json
  10. [nodemon] starting node server/server.js
  11. Server listening at http://localhost:5001
  12. You are connected to Elasticsearch!

```

很显然,我们的 API key 方式是成功的。使用 API key 的好处是我们不必要暴露用户的密码在代码中,而且,我们甚至可以为这个 API key 来设置有效时间及权限。可以授予最小所需要的权限,以确保安全。

设置 RESTful API 调用以从源检索数据

现在我们的服务器正在运行并且 Elasticsearch 已连接,我们需要测试对 USGS 的 API 调用以接收初始数据。 在项目的根目录下,创建一个名为 routes 的文件夹和一个名为 api 的子文件夹。 在 api 文件夹中,创建一个名为 data.js 的文件并添加以下代码:

```

  1. $ pwd
  2. /Users/liuxg/demos/earthquake_app
  3. $ mkdir -p server/routes/api

```

我在 routes/api 目录下创建一个如下的 data.js 文件:

server/routes/api/data.js

`

  1. require('log-timestamp');
  2. const express = require('express');
  3. const router = express.Router();
  4. const axios = require('axios')
  5. const client = require('../../elasticsearch/client');
  6. const URL = http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson;

  7. router.get('/earthquakes', async function (req, res) {

  8. console.log('Loading Application...');

  9. //======= Check that Elasticsearch is up and running =======\

  10. pingElasticsearch = async () => {
  11. await client.ping(
  12. function(error,res) {
  13. if (error) {
  14. console.error('elasticsearch cluster is down!');
  15. } else {
  16. console.log('Elasticsearch Ready');
  17. }
  18. }
  19. );
  20. }

  21. // ====== Get Data From USGS and then index into Elasticsearch

  22. indexAllDocs = async () => {
  23. try {
  24. console.log('Getting Data From Host')

  25. const EARTHQUAKES = await axios.get(${URL},{

  26. headers: {
  27. 'Content-Type': [
  28. 'application/json',
  29. 'charset=utf-8'
  30. ]
  31. }
  32. });

  33. console.log('Data Received!')

  34. results = EARTHQUAKES.data.features

  35. console.log('Indexing Data...')

  36. console.log(results)

  37. res.json(results)

  38. if (EARTHQUAKES.data.length) {

  39. indexAllDocs();
  40. } else {
  41. console.log('All Data Has Been Indexed!');
  42. };
  43. } catch (err) {
  44. console.log(err)
  45. };

  46. console.log('Preparing For The Next Data Check...');

  47. }

  48. console.log("Ping the Elasticsearch server");

  49. pingElasticsearch()

  50. console.log("Get data from USGS");

  51. indexAllDocs()
  52. });

  53. module.exports = router;

`` ```

上面的代码使用 npm 包 Axios 对 USGS 地震 API 进行异步 API 调用。 收到数据后,它将显示为 JSON。 你还可以看到我们在页面顶部导入了一个名为 log-timestamp 的依赖项。 这将允许我们将时间戳添加到每个 console.log。

我们接下来修改 server.js 如下:

server/server.js

`

  1. const express = require('express');
  2. const client = require('./elasticsearch/client');

  3. const app = express();

  4. const port = 5001;

  5. //Define Routes

  6. const data = require('./routes/api/data')
  7. app.use('/api/data', data);

  8. app.get('/', (req, res) => {

  9. res.send('Hello World!')
  10. })

  11. app.listen(port, () => console.log(Server listening at http://localhost:${port}));

`` ```

重新运行我们的 server.js。我们通过 Postman 或者其它的工具来对我们的 REST 接口进行访问:

localhost:5000/api/data/earthquakes

从上面的输出中,我们可以看出来设计的 REST 接口工作是正常的。它含有一些收集来的数据。在所收集来的数据中,有一些数据是我们并不需要的。我们最终需要的数据是这样的:

```

  1. {
  2. "mag": 1.13,
  3. "place": "11km ENE of Coachella, CA",
  4. "time": 2022-05-02T20:07:53.266Z,
  5. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
  6. "sig": 20,
  7. "type": "earthquake",
  8. "depth": 2.09,
  9. "coordinates": {
  10. "lat": 33.7276667,
  11. "lon": -116.0736667
  12. }
  13. }

```

也就是说我们可以删除一下不需要的字段,并且我们需要转换一些字段,比如把 time 字段转换为我们想要的 @timestamp 字段。另外在写入 Elasticsearch 时,我们需要预先针对 coodinates 字段进行定义。它是一个 geo_point 类型的字段。

定义 mapping 及 pipeline

如上所示,我们需要的字段如上。我们可以如下的一个 earthquakes 索引。我们在 Kibana 的 console 中打入如下的命令:

``

  1. PUT earthquakes
  2. {
  3. "mappings": {
  4. "properties": {
  5. "@timestamp": {
  6. "type": "date"
  7. },
  8. "coordinates": {
  9. "type": "geo_point"
  10. },
  11. "depth": {
  12. "type": "float"
  13. },
  14. "mag": {
  15. "type": "float"
  16. },
  17. "place": {
  18. "type": "text",
  19. "fields": {
  20. "keyword": {
  21. "type": "keyword"
  22. }
  23. }
  24. },
  25. "sig": {
  26. "type": "short"
  27. },
  28. "type": {
  29. "type": "keyword"
  30. },
  31. "url": {
  32. "enabled": false
  33. }
  34. }
  35. }
  36. }

` ```

在上面,我们针对索引的字段类型做如下的说明:

  • @timestamp:这是一个 date 字段类型的字段。我们希望的格式是 2022-05-02T20:07:53.266Z 而不是以 EPOC 形式显示的值,比如 1672471359610。这个字段有 time 转换而来
  • coordinates:这个是一个 geo_point 的字段。是地震发生的地理位置
  • place:这是一个 multi-field 字段。我们希望对这个字段进行统计,也可以针对它进行搜索
  • sig:这字段我们使用 short 类型,而不是 long。这样可以省去存储空间
  • type:这是一个 keyword 类型的字段。它只可以做数据分析统计之用
  • url:这个字段,我们既不想对它进行搜索,也不想对它进行统计,所有设置 enabled 为 false。这样可以省去分词的时间,从而提高摄入数据的速度

为此,我们可以针对上面的 data.js 做更进一步的修改:

server/routes/api/data.js

`

  1. const express = require('express');
  2. const router = express.Router();
  3. const axios = require('axios')
  4. const client = require('../../elasticsearch/client');
  5. const URL = http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson;

  6. //======= Check that Elasticsearch is up and running =======\

  7. function pingElasticsearch() {
  8. console.log("ping .....")
  9. client.ping({
  10. requestTimeout: 30000,
  11. }, function(error,res) {
  12. if (error) {
  13. console.error('elasticsearch cluster is down!');
  14. } else {
  15. console.log('Elasticsearch Ready');
  16. }
  17. });
  18. };

  19. // ====== Get Data From USGS and then index into Elasticsearch

  20. indexAllDocs = async () => {
  21. try {
  22. const EARTHQUAKES = await axios.get(${URL},{
  23. headers: {
  24. 'Content-Type': [
  25. 'application/json',
  26. 'charset=utf-8'
  27. ]
  28. }
  29. });

  30. console.log('Getting Data From Host')

  31. results = EARTHQUAKES.data.features

  32. results.map(

  33. async (results) => (
  34. (earthquakeObject = {
  35. place: results.properties.place, //
  36. time: results.properties.time, //
  37. url: results.properties.url, //
  38. sig: results.properties.sig, //
  39. mag: results.properties.mag, //
  40. type: results.properties.type, //
  41. longitude: results.geometry.coordinates[0], //
  42. latitude: results.geometry.coordinates[1], //
  43. depth: results.geometry.coordinates[2], //
  44. }),
  45. await client.index({
  46. index: 'earthquakes',
  47. id: results.id,
  48. body: earthquakeObject
  49. })
  50. )
  51. );

  52. if (EARTHQUAKES.data.length) {

  53. indexAllDocs();
  54. } else {
  55. console.log('All Data Has Been Indexed!');
  56. };
  57. } catch (err) {
  58. console.log(err)
  59. };

  60. console.log('Preparing For The Next Data Check...');

  61. }

  62. //================== Official API Call ==================\

  63. router.get('/earthquakes', function (req, res) {
  64. res.send('Running Application...');
  65. console.log('Loading Application...')

  66. indexAllDocs(res);

  67. });

  68. module.exports = router;

`` ```

在上面,我们添加了把文档写入 Elasticsearch 的代码部分。我们使用地震数据的 id 作为 Elasticsearch 文档的 id。等服务器运行起来后,我们需要在 terminal 中打入如下的命令:

curl -XGET http://localhost:5001/api/data/earthquakes

我们可以在 Kibana 中通过如下的命令来查看文档:

GET earthquakes/_search?filter_path=**.hits

我们可以看到如下的结果:

``

  1. {
  2. "hits": {
  3. "hits": [
  4. {
  5. "_index": "earthquakes",
  6. "_id": "nc73827281",
  7. "_score": 1,
  8. "_source": {
  9. "place": "10km S of Laytonville, CA",
  10. "time": 1672505649740,
  11. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/nc73827281",
  12. "sig": 63,
  13. "mag": 2.02,
  14. "type": "earthquake",
  15. "longitude": -123.4981689,
  16. "latitude": 39.5991669,
  17. "depth": 4.59
  18. }
  19. },
  20. ...

` ```

很显然,这个文档的 source 和我们之前的想要的格式还是不太一样。为了能够使的 time 转换为 @timestamp,我们可以在 Node.js 的代码中进行相应的转换。我们也可以采用 ingest pipeline 来实现相应的操作。我们定义如下的 ingest pipeine。

``

  1. POST _ingest/pipeline/_simulate
  2. {
  3. "pipeline": {
  4. "description": "This is for data transform for earthquake data",
  5. "processors": [
  6. {
  7. "date": {
  8. "field": "time",
  9. "formats": [
  10. "UNIX_MS"
  11. ]
  12. }
  13. }
  14. ]
  15. },
  16. "docs": [
  17. {
  18. "_source": {
  19. "place": "16km N of Borrego Springs, CA",
  20. "time": 1672507053210,
  21. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
  22. "sig": 10,
  23. "mag": 0.81,
  24. "type": "earthquake",
  25. "longitude": -116.368,
  26. "latitude": 33.4013333,
  27. "depth": 2.91
  28. }
  29. }
  30. ]
  31. }

` ```

在上面的命令中,我们使用 date processor 来把 time 转换为所需要的格式,并在默认的情况下把 target 设置为 @timestamp。上面命令运行的结果为:

``

  1. {
  2. "docs": [
  3. {
  4. "doc": {
  5. "_index": "_index",
  6. "_id": "_id",
  7. "_version": "-3",
  8. "_source": {
  9. "sig": 10,
  10. "mag": 0.81,
  11. "depth": 2.91,
  12. "@timestamp": "2022-12-31T17:17:33.210Z",
  13. "latitude": 33.4013333,
  14. "place": "16km N of Borrego Springs, CA",
  15. "time": 1672507053210,
  16. "type": "earthquake",
  17. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
  18. "longitude": -116.368
  19. },
  20. "_ingest": {
  21. "timestamp": "2023-01-01T00:31:03.544821Z"
  22. }
  23. }
  24. }
  25. ]
  26. }

` ```

从上面的输出中,我们可以看出来 @timestamp 字段已经生成。它的值由 time 字段转换而来。我们还发现 latitude 及 longitude 并不是按照我们需要的格式来显示的。我们需要把它转化为另外一个像如下的对象:

我们可以通过 rename processor 来操作:

``

  1. POST _ingest/pipeline/_simulate
  2. {
  3. "pipeline": {
  4. "description": "This is for data transform for earthquake data",
  5. "processors": [
  6. {
  7. "date": {
  8. "field": "time",
  9. "formats": [
  10. "UNIX_MS"
  11. ]
  12. }
  13. },
  14. {
  15. "rename": {
  16. "field": "latitude",
  17. "target_field": "coordinates.lat"
  18. }
  19. },
  20. {
  21. "rename": {
  22. "field": "longitude",
  23. "target_field": "coordinates.lon"
  24. }
  25. }
  26. ]
  27. },
  28. "docs": [
  29. {
  30. "_source": {
  31. "place": "16km N of Borrego Springs, CA",
  32. "time": 1672507053210,
  33. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
  34. "sig": 10,
  35. "mag": 0.81,
  36. "type": "earthquake",
  37. "longitude": -116.368,
  38. "latitude": 33.4013333,
  39. "depth": 2.91
  40. }
  41. }
  42. ]
  43. }

` ```

在上面的命令中,我们通过 rename processor 来重新命名 longitude 及 latitude 两个字段。运行上面的代码,我们可以看到如下的结果:

``

  1. {
  2. "docs": [
  3. {
  4. "doc": {
  5. "_index": "_index",
  6. "_id": "_id",
  7. "_version": "-3",
  8. "_source": {
  9. "sig": 10,
  10. "mag": 0.81,
  11. "depth": 2.91,
  12. "@timestamp": "2022-12-31T17:17:33.210Z",
  13. "coordinates": {
  14. "lon": -116.368,
  15. "lat": 33.4013333
  16. },
  17. "place": "16km N of Borrego Springs, CA",
  18. "time": 1672507053210,
  19. "type": "earthquake",
  20. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/ci40152271"
  21. },
  22. "_ingest": {
  23. "timestamp": "2023-01-01T00:38:42.729604Z"
  24. }
  25. }
  26. }
  27. ]
  28. }

` ```

很显然,我们看到了一个新的 coordinates 的字段。它是一个 object。我们发现有一个多余的字段叫做 time。这个并不是我们所需要的。我们可以通过 remove processor 来删除这个字段。

``

  1. POST _ingest/pipeline/_simulate
  2. {
  3. "pipeline": {
  4. "description": "This is for data transform for earthquake data",
  5. "processors": [
  6. {
  7. "date": {
  8. "field": "time",
  9. "formats": [
  10. "UNIX_MS"
  11. ]
  12. }
  13. },
  14. {
  15. "rename": {
  16. "field": "latitude",
  17. "target_field": "coordinates.lat"
  18. }
  19. },
  20. {
  21. "rename": {
  22. "field": "longitude",
  23. "target_field": "coordinates.lon"
  24. }
  25. },
  26. {
  27. "remove": {
  28. "field": "time"
  29. }
  30. }
  31. ]
  32. },
  33. "docs": [
  34. {
  35. "_source": {
  36. "place": "16km N of Borrego Springs, CA",
  37. "time": 1672507053210,
  38. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
  39. "sig": 10,
  40. "mag": 0.81,
  41. "type": "earthquake",
  42. "longitude": -116.368,
  43. "latitude": 33.4013333,
  44. "depth": 2.91
  45. }
  46. }
  47. ]
  48. }

` ```

我们运行上面的命令。我们再次查看输出的结果:

``

  1. {
  2. "docs": [
  3. {
  4. "doc": {
  5. "_index": "_index",
  6. "_id": "_id",
  7. "_version": "-3",
  8. "_source": {
  9. "sig": 10,
  10. "mag": 0.81,
  11. "depth": 2.91,
  12. "@timestamp": "2022-12-31T17:17:33.210Z",
  13. "coordinates": {
  14. "lon": -116.368,
  15. "lat": 33.4013333
  16. },
  17. "place": "16km N of Borrego Springs, CA",
  18. "type": "earthquake",
  19. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/ci40152271"
  20. },
  21. "_ingest": {
  22. "timestamp": "2023-01-01T00:44:46.919265Z"
  23. }
  24. }
  25. }
  26. ]
  27. }

` ```

很显然这个时候,我们的 time 字段不见了。

在上面,我们通过 _simulate 的端点测试好了我们的 ingest pipeline。接下来,是我们使用命令来创建这个 pipeline 的时候了。我们使用如下的命令来创建这个 pipeline:

``

  1. PUT _ingest/pipeline/earthquake_data_pipeline
  2. {
  3. "description": "This is for data transform for earthquake data",
  4. "processors": [
  5. {
  6. "date": {
  7. "field": "time",
  8. "formats": [
  9. "UNIX_MS"
  10. ]
  11. }
  12. },
  13. {
  14. "rename": {
  15. "field": "latitude",
  16. "target_field": "coordinates.lat"
  17. }
  18. },
  19. {
  20. "rename": {
  21. "field": "longitude",
  22. "target_field": "coordinates.lon"
  23. }
  24. },
  25. {
  26. "remove": {
  27. "field": "time"
  28. }
  29. }
  30. ]
  31. }

` ```

运行上面的命令。这样我们就创建了一个叫做 earthquake_data_pipeline 的 ingest pipeline。

接下来,我们需要删除之前所创建的索引,因为它包含我们不需要的一些字段:

DELETE earthquakes

我们再次运行之前创建索引 earthquakes 的命令:

``

  1. PUT earthquakes
  2. {
  3. "mappings": {
  4. "properties": {
  5. "@timestamp": {
  6. "type": "date"
  7. },
  8. "coordinates": {
  9. "type": "geo_point"
  10. },
  11. "depth": {
  12. "type": "float"
  13. },
  14. "mag": {
  15. "type": "float"
  16. },
  17. "place": {
  18. "type": "text",
  19. "fields": {
  20. "keyword": {
  21. "type": "keyword"
  22. }
  23. }
  24. },
  25. "sig": {
  26. "type": "short"
  27. },
  28. "type": {
  29. "type": "keyword"
  30. },
  31. "url": {
  32. "enabled": false
  33. }
  34. }
  35. }
  36. }

` ```

我们接下来需要修改 data.js 文件来使用这个 ingest pipeline:

server/routes/api/data.js

`

  1. const express = require('express');
  2. const router = express.Router();
  3. const axios = require('axios')
  4. const client = require('../../elasticsearch/client');
  5. const URL = http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson;

  6. //======= Check that Elasticsearch is up and running =======\

  7. function pingElasticsearch() {
  8. console.log("ping .....")
  9. client.ping({
  10. requestTimeout: 30000,
  11. }, function(error,res) {
  12. if (error) {
  13. console.error('elasticsearch cluster is down!');
  14. } else {
  15. console.log('Elasticsearch Ready');
  16. }
  17. });
  18. };

  19. // ====== Get Data From USGS and then index into Elasticsearch

  20. indexAllDocs = async () => {
  21. try {
  22. const EARTHQUAKES = await axios.get(${URL},{
  23. headers: {
  24. 'Content-Type': [
  25. 'application/json',
  26. 'charset=utf-8'
  27. ]
  28. }
  29. });

  30. console.log('Getting Data From Host')

  31. results = EARTHQUAKES.data.features

  32. results.map(

  33. async (results) => (
  34. (earthquakeObject = {
  35. place: results.properties.place,
  36. time: results.properties.time,
  37. url: results.properties.url,
  38. sig: results.properties.sig,
  39. mag: results.properties.mag,
  40. type: results.properties.type,
  41. longitude: results.geometry.coordinates[0],
  42. latitude: results.geometry.coordinates[1],
  43. depth: results.geometry.coordinates[2],
  44. }),
  45. await client.index({
  46. index: 'earthquakes',
  47. id: results.id,
  48. body: earthquakeObject,
  49. pipeline: 'earthquake_data_pipeline'
  50. })
  51. )
  52. );

  53. if (EARTHQUAKES.data.length) {

  54. indexAllDocs();
  55. } else {
  56. console.log('All Data Has Been Indexed!');
  57. };
  58. } catch (err) {
  59. console.log(err)
  60. };

  61. console.log('Preparing For The Next Data Check...');

  62. }

  63. //================== Official API Call ==================\

  64. router.get('/earthquakes', function (req, res) {
  65. res.send('Running Application...');
  66. console.log('Loading Application...')

  67. setInterval(() => {

  68. pingElasticsearch()
  69. indexAllDocs(res);
  70. }, 120000);

  71. });

  72. module.exports = router;

`` ```

在上面的代码中,我对一下的两处做了修改:

我们再次使用如下的命令来启动对数据的采集:

curl -XGET http://localhost:5001/api/data/earthquakes

稍等一点时间(超过2分钟),我们到 Kibana 中来查看数据:

GET earthquakes/_search

我们可以看到如下的数据:

``

  1. {
  2. "took": 0,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 1,
  6. "successful": 1,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 9,
  13. "relation": "eq"
  14. },
  15. "max_score": 1,
  16. "hits": [
  17. {
  18. "_index": "earthquakes",
  19. "_id": "us7000j1cr",
  20. "_score": 1,
  21. "_source": {
  22. "sig": 340,
  23. "mag": 4.7,
  24. "depth": 181.449,
  25. "@timestamp": "2023-01-01T06:39:45.239Z",
  26. "coordinates": {
  27. "lon": 70.8869,
  28. "lat": 36.5351
  29. },
  30. "place": "36 km S of Jurm, Afghanistan",
  31. "type": "earthquake",
  32. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/us7000j1cr"
  33. }
  34. },
  35. ...

` ```

从上面,我们可以看出来有9个地震数据已经被写入。我们可以让应用运行一段时间,它可能会有更多的数据进来。比如:

``

  1. {
  2. "took": 0,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 1,
  6. "successful": 1,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 10,
  13. "relation": "eq"
  14. },
  15. "max_score": 1,
  16. "hits": [
  17. {
  18. "_index": "earthquakes",
  19. "_id": "nc73827436",
  20. "_score": 1,
  21. "_source": {
  22. "sig": 252,
  23. "mag": 4.04,
  24. "depth": 4.51,
  25. "@timestamp": "2023-01-01T06:49:08.930Z",
  26. "coordinates": {
  27. "lon": -121.220665,
  28. "lat": 36.5789986
  29. },
  30. "place": "9km NW of Pinnacles, CA",
  31. "type": "earthquake",
  32. "url": "http://earthquake.usgs.gov/earthquakes/eventpage/nc73827436"
  33. }
  34. },

` ```

我们可以看到10个数据。

从上面的数据中,我们可以看到最终的数据结构就是我们想要的数据结构。

在接下来的文章中,我将详细描述如何对这个数据进行可视化。我将使用 Kibana 来进行展示,也会使用 Web 来进行搜索。敬请期待!

为了方便大家的学习,我把源代码放在这里:http://github.com/liu-xiao-guo/earthquakes-app

参考:

【1】http://medium.com/@webdevmark16/ingesting-real-time-data-into-elasticsearch-with-node-js-a7aa9b5acf8c