Cách đồng bộ hóa dữ liệu đã chuyển đổi từ MongoDB sang Elasticsearch với Transporter trên Ubuntu 16.04
Transporter là một công cụ open-souce để di chuyển dữ liệu qua các repodata khác nhau. Các nhà phát triển thường viết kịch bản một lần cho các việc như di chuyển dữ liệu qua database , di chuyển dữ liệu từ file sang database hoặc ngược lại, nhưng sử dụng một công cụ như Transporter có một số lợi thế.Trong Transporter, bạn xây dựng các đường ống dẫn , xác định stream dữ liệu từ một nguồn (nơi dữ liệu được đọc) đến phần chìm (nơi dữ liệu được ghi). Nguồn và phần chìm có thể là database SQL hoặc NoSQL, file phẳng hoặc các tài nguyên khác. Transporter sử dụng bộ điều hợp , là phần mở rộng có thể cắm được, để giao tiếp với các tài nguyên này và dự án bao gồm một số bộ điều hợp cho database phổ biến theo mặc định.
Ngoài dữ liệu di chuyển, Transporter cũng cho phép bạn thay đổi dữ liệu khi nó di chuyển qua đường ống sử dụng máy biến áp . Giống như bộ điều hợp, có một số máy biến áp được bao gồm theo mặc định. Bạn cũng có thể viết biến thế của riêng mình để tùy chỉnh sửa đổi dữ liệu .
Trong hướng dẫn này, ta sẽ giới thiệu cho các bạn một ví dụ về việc di chuyển và xử lý dữ liệu từ database MongoDB sang Elasticsearch bằng cách sử dụng các bộ điều hợp tích hợp của Transporter và một máy biến áp tùy chỉnh được viết bằng JavaScript.
Yêu cầu
Để làm theo hướng dẫn này, bạn cần :
- Một server Ubuntu 16.04 được cài đặt theo hướng dẫn cài đặt server ban đầu Ubuntu 16.04 này , bao gồm user không phải root có quyền sudo và firewall .
- MongoDB được cài đặt theo hướng dẫn MongoDB này trên Ubuntu 16.04 hoặc cài đặt MongoDB hiện có.
- Đã cài đặt Elasticsearch theo hướng dẫn Elasticsearch trên Ubuntu 16.04 này hoặc cài đặt Elasticsearch hiện có.
Đường ống vận chuyển được viết bằng JavaScript. Bạn sẽ không cần bất kỳ kiến thức hoặc kinh nghiệm nào về JavaScript trước để làm theo hướng dẫn này, nhưng bạn có thể tìm hiểu thêm trong các hướng dẫn JavaScript này .
Bước 1 - Cài đặt Trình vận chuyển
Transporter cung cấp mã binary cho hầu hết các hệ điều hành phổ biến. Quá trình cài đặt Ubuntu bao gồm hai bước: download bản binary Linux và làm cho nó có thể thực thi được.
Đầu tiên, hãy lấy liên kết cho version mới nhất từ trang phát hành mới nhất của Transporter trên GitHub . Sao chép liên kết kết thúc bằng -linux-amd64
. Hướng dẫn này sử dụng v0.5.2, version mới nhất tại thời điểm viết bài.
Tải file binary vào folder chính của bạn.
- cd
- wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64
Di chuyển nó vào /usr/local/bin
hoặc folder cài đặt bạn muốn .
- mv transporter-*-linux-amd64 /usr/local/bin/transporter
Sau đó, làm cho nó có thể thực thi để bạn có thể chạy nó.
- chmod +x /usr/local/bin/transporter
Bạn có thể kiểm tra xem Transporter có được cài đặt chính xác hay không bằng cách chạy file binary .
- transporter
Bạn sẽ thấy kết quả trợ giúp sử dụng và số version :
OutputUSAGE transporter <command> [flags] COMMANDS run run pipeline loaded from a file . . . VERSION 0.5.2
Để sử dụng Transporter để di chuyển dữ liệu từ MongoDB sang Elasticsearch, ta cần hai thứ: dữ liệu trong MongoDB mà ta muốn di chuyển và một đường ống cho Transporter biết cách di chuyển nó. Bước tiếp theo tạo một số dữ liệu ví dụ, nhưng nếu bạn đã có database MongoDB mà bạn muốn di chuyển, bạn có thể bỏ qua bước tiếp theo và chuyển thẳng sang Bước 3.
Bước 2 - Thêm dữ liệu mẫu vào MongoDB (Tùy chọn)
Trong bước này, ta sẽ tạo một database mẫu với một bộ sưu tập duy nhất trong MongoDB và thêm một vài tài liệu vào bộ sưu tập đó. Sau đó, trong phần còn lại của hướng dẫn, ta sẽ di chuyển và chuyển đổi dữ liệu ví dụ này với một đường ống Transporter.
Đầu tiên, kết nối với database MongoDB của bạn.
- mongo
Điều này sẽ thay đổi dấu nhắc của bạn thành mongo>
, cho biết rằng bạn đang sử dụng MongoDB shell.
Từ đây, chọn một database để làm việc. Ta sẽ gọi my_application
ta .
- use my_application
Trong MongoDB
, bạn không cần phải tạo database hoặc tập hợp một cách rõ ràng. Khi bạn bắt đầu thêm dữ liệu vào database mà bạn đã chọn theo tên, database đó sẽ tự động được tạo.
Vì vậy, để tạo database my_application
, hãy lưu hai tài liệu vào bộ sưu tập users
của nó: một tài liệu đại diện cho Sammy Shark và một tài liệu đại diện cho Gilly Glowfish. Đây sẽ là dữ liệu thử nghiệm của ta .
- db.users.save({"firstName": "Sammy", "lastName": "Shark"});
- db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});
Sau khi thêm tài liệu, bạn có thể truy vấn bộ sưu tập users
để xem profile của bạn .
- db.users.find().pretty();
Đầu ra sẽ giống với kết quả bên dưới, nhưng các cột _id
sẽ khác. MongoDB tự động thêm ID đối tượng để xác định duy nhất các tài liệu trong một bộ sưu tập.
output{ "_id" : ObjectId("59299ac7f80b31254a916456"), "firstName" : "Sammy", "lastName" : "Shark" } { "_id" : ObjectId("59299ac7f80b31254a916457"), "firstName" : "Gilly", "lastName" : "Glowfish" }
Nhấn CTRL+C
để thoát khỏi shell MongoDB.
Tiếp theo, hãy tạo một đường ống Transporter để di chuyển dữ liệu này từ MongoDB sang Elasticsearch.
Bước 3 - Tạo một đường ống cơ bản
Đường ống trong Transporter được xác định bởi file JavaScript có tên là pipeline.js
theo mặc định. Lệnh init
tạo một tệp cấu hình cơ bản trong đúng folder , được cung cấp nguồn và tệp chìm.
Khởi tạo một pipeline.js
khởi động.js với MongoDB làm nguồn và Elasticsearch làm phần chìm.
- transporter init mongodb elasticsearch
Bạn sẽ thấy kết quả sau:
OutputWriting pipeline.js...
Bạn sẽ không cần phải sửa đổi pipeline.js
cho bước này, nhưng hãy xem nó hoạt động như thế nào.
Tệp trông giống như thế này, nhưng bạn cũng có thể xem nội dung của file bằng lệnh cat pipeline.js
, less pipeline.js
(thoát less
bằng cách nhấn q
) hoặc bằng cách mở nó bằng editor yêu thích của bạn.
var source = mongodb({ "uri": "${MONGODB_URI}" // "timeout": "30s", // "tail": false, // "ssl": false, // "cacerts": ["/path/to/cert.pem"], // "wc": 1, // "fsync": false, // "bulk": false, // "collection_filters": "{}", // "read_preference": "Primary" }) var sink = elasticsearch({ "uri": "${ELASTICSEARCH_URI}" // "timeout": "10s", // defaults to 30s // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch }) t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
Các dòng bắt đầu bằng var source
và var sink
xác định các biến JavaScript cho các bộ điều hợp MongoDB và Elasticsearch tương ứng. Ta sẽ xác định MONGODB_URI
và ELASTICSEARCH_URI
biến môi trường mà các adapter cần sau này trong bước này.
Các dòng bắt đầu bằng //
là comment . Chúng nêu bật một số tùy chọn cấu hình phổ biến mà bạn có thể đặt cho đường dẫn của bạn , nhưng ta không sử dụng chúng cho đường dẫn cơ bản mà ta đang tạo ở đây.
Dòng cuối cùng nối nguồn và bồn rửa. Người transporter
biến hoặc t
cho phép ta truy cập vào đường ống của ta . Ta sử dụng .Source()
và .Save()
chức năng để thêm nguồn và chìm bằng cách sử dụng source
và sink
biến xác định trước đó trong file.
Đối số thứ ba cho các hàm Source()
và Save()
là namespace.
Chuyển /.*/
làm đối số cuối cùng nghĩa là ta muốn chuyển tất cả dữ liệu từ MongoDB và lưu nó dưới cùng một không gian tên trong Elasticsearch.
Trước khi có thể chạy đường ống này, ta cần đặt các biến môi trường cho URI MongoDB và URI Elasticsearch . Trong ví dụ mà ta đang sử dụng, cả hai đều được lưu trữ local với cài đặt mặc định, nhưng hãy đảm bảo bạn tùy chỉnh các tùy chọn này nếu bạn đang sử dụng các version MongoDB hoặc Elasticsearch hiện có.
- export MONGODB_URI='mongodb://localhost/my_application'
- export ELASTICSEARCH_URI='http://localhost:9200/my_application'
Bây giờ ta đã sẵn sàng để chạy đường ống.
- transporter run pipeline.js
Bạn sẽ thấy kết quả kết thúc như thế này:
Output. . . INFO[0001] metrics source records: 2 path=source ts=1522942118483391242 INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960 INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878
Trong dòng thứ hai và thứ ba đến dòng cuối cùng, kết quả này cho biết rằng có 2 bản ghi hiện diện trong nguồn và 2 bản ghi đã được chuyển đến bồn rửa.
Để xác nhận cả hai bản ghi đã được xử lý, bạn có thể truy vấn Elasticsearch để biết nội dung của database my_application
, hiện sẽ tồn tại.
- curl $ELASTICSEARCH_URI/_search?pretty=true
Tham số ?pretty=true
giúp kết quả dễ đọc hơn:
Output{ "took" : 5, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "lastName" : "Shark" } } ] } }
Database và bộ sưu tập trong MongoDB tương tự như các index và loại trong Elasticsearch. Với suy nghĩ đó, bạn sẽ thấy:
- Trường
_index
được đặt thànhmy_application,
tên của database MongoDB ban đầu). - Trường
_type
được đặt chousers,
tên của bộ sưu tập MongoDB. - Các
firstName
vàlastName
lĩnh vực điền với “Sammy” “Shark” và “Gilly” “Glowfish”, tương ứng.
Điều này xác nhận cả hai bản ghi từ MongoDB đã được xử lý thành công thông qua Transporter và được tải vào Elasticsearch. Để xây dựng dựa trên đường dẫn cơ bản này, ta sẽ thêm một bước xử lý trung gian có thể biến đổi dữ liệu đầu vào.
Bước 4 - Tạo máy biến áp
Như tên cho thấy, máy biến áp sửa đổi dữ liệu nguồn trước khi tải nó vào bồn rửa. Ví dụ: chúng cho phép bạn thêm trường mới, xóa trường hoặc thay đổi dữ liệu của trường. Transporter đi kèm với một số máy biến áp được định nghĩa cũng như hỗ trợ cho các máy biến áp tùy chỉnh.
Thông thường, máy biến áp tùy chỉnh được viết dưới dạng hàm JavaScript và được lưu trong một file riêng biệt. Để sử dụng chúng, bạn thêm một tham chiếu đến file biến áp trong pipeline.js
Transporter bao gồm cả công cụ JavaScript Otto và Goja. Vì Goja mới hơn và thường nhanh hơn, ta sẽ sử dụng nó ở đây. Sự khác biệt về chức năng duy nhất là cú pháp.
Tạo một file có tên là transform.js
, ta sẽ sử dụng file này để viết hàm chuyển đổi của bạn .
- nano transform.js
Đây là hàm ta sẽ sử dụng, sẽ tạo ra một trường mới có tên là fullName
, giá trị của nó sẽ là trường firstName
và lastName
nối với nhau, được phân tách bằng dấu cách (như Sammy Shark
).
function transform(msg) { msg.data.fullName = msg.data.firstName + " " + msg.data.lastName; return msg }
Hãy xem qua các dòng của file này:
- Dòng đầu tiên của file ,
function transform(msg),
là định nghĩa hàm . -
msg
là một đối tượng JavaScript chứa các chi tiết của tài liệu nguồn. Ta sử dụng đối tượng này để truy cập dữ liệu đi qua đường ống. - Dòng đầu tiên của hàm nối hai trường hiện có và gán giá trị đó cho trường
fullName
mới. - Dòng cuối cùng của hàm trả về đối tượng
msg
mới được sửa đổi để phần còn lại của đường dẫn sử dụng.
Lưu và đóng file .
Tiếp theo, ta cần sửa đổi đường ống để sử dụng máy biến áp này. Mở file pipeline.js
để chỉnh sửa.
- nano pipeline.js
Trong dòng cuối cùng, ta cần thêm một lệnh gọi vào hàm Transform()
để thêm biến áp vào đường ống giữa các lệnh gọi đến Source()
và Save()
, như sau:
. . . t.Source("source", source, "/.*/") .Transform(goja({"filename": "transform.js"})) .Save("sink", sink, "/.*/")
Đối số được truyền cho Transform()
là kiểu biến đổi, trong trường hợp này là Goja. Sử dụng hàm goja
, ta chỉ định tên file của máy biến áp bằng đường dẫn tương đối của nó.
Lưu và đóng file . Trước khi ta chạy lại đường ống để kiểm tra máy biến áp, hãy xóa dữ liệu hiện có trong Elasticsearch từ thử nghiệm trước.
- curl -XDELETE $ELASTICSEARCH_URI
Bạn sẽ thấy kết quả này thông báo sự thành công của lệnh.
Output{"acknowledged":true}
Bây giờ chạy lại đường ống.
- transporter run pipeline.js
Đầu ra sẽ trông rất giống với thử nghiệm trước đó và bạn có thể xem trong vài dòng cuối cùng liệu đường ống có hoàn thành như trước hay không. Để chắc chắn, ta có thể kiểm tra lại Elasticsearch để xem dữ liệu có tồn tại ở định dạng mà ta mong đợi hay không.
- curl $ELASTICSEARCH_URI/_search?pretty=true
Bạn có thể thấy trường fullName
trong kết quả mới:
Output{ "took" : 9, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "fullName" : "Gilly Glowfish", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "fullName" : "Sammy Shark", "lastName" : "Shark" } } ] } }
Lưu ý trường fullName
đã được thêm vào cả hai tài liệu với các giá trị được đặt chính xác. Với điều này, bây giờ ta biết cách thêm các phép biến đổi tùy chỉnh vào đường ống Transporter.
Kết luận
Bạn đã xây dựng một đường ống vận chuyển cơ bản với một máy biến áp để sao chép và sửa đổi dữ liệu từ MongoDB sang Elasticsearch. Bạn có thể áp dụng các phép biến đổi phức tạp hơn theo cùng một cách, chuỗi nhiều phép biến đổi trong cùng một đường dẫn và hơn thế nữa. MongoDB và Elasticsearch chỉ là hai trong số các bộ điều hợp Transporter hỗ trợ. Nó cũng hỗ trợ các file phẳng, database SQL như Postgres và nhiều nguồn dữ liệu khác.
Bạn có thể xem dự án Transporter trên GitHub để được cập nhật những thay đổi mới nhất trong API và truy cập wiki Transporter để biết thêm thông tin chi tiết về cách sử dụng bộ điều hợp, máy biến áp và các tính năng khác của Máy biến áp.
Các tin liên quan
Cách triển khai trang web Jekyll bằng Git Hooks trên Ubuntu 16.042018-03-29
Cách chặn nỗ lực đăng nhập SSH không mong muốn với PyFilter trên Ubuntu 16.04
2018-03-27
Cách tự động triển khai ứng dụng Laravel với Trình triển khai trên Ubuntu 16.04
2018-03-23
Cách thiết lập trang web phát triển Jekyll trên Ubuntu 16.04
2018-03-20
Cách cài đặt Ruby on Rails với rbenv trên Ubuntu 16.04
2018-03-15
Cách cài đặt Node.js trên Ubuntu 16.04
2018-03-07
Cách cài đặt và bảo mật Memcached trên Ubuntu 16.04
2018-03-06
Cách cài đặt Buildbot trên Ubuntu 16.04
2018-03-06
Cách quản lý an toàn bí mật với HashiCorp Vault trên Ubuntu 16.04
2018-02-28
Cách bảo mật Roundcube trên Ubuntu 16.04
2018-02-24