이전 kafka-spark streaming 을 사용하여 인기검색어에 대한 로그를 수집하려고 하였지만, 마땅히 테스트 할 서버가 없어서 우선은 logstash를 사용해서 바로 로그를 elasticsearch에 저장하기로 했다.그래서 기존에 kafka 카테고리에 연재를 하였는데 elasticsearch로 옮기게 되었다. 이전의 글들은 kafka 카테고리에 있습니다.

로그 저장 방법 변경

컴퓨터의 리소스부족으로 kafka-sparkstreaming을 사용하지 않고 logstash를 사용해서 바로 elasticsearch로 변경하기로 했다.ㅠㅠ 다음에 돈 많이 모아서 aws 서버하나 구성해야겠네..요...

할일 : 기존에는 search: 검색단어 에서 search: 검색단어 timestamp: 201910293847 로 변경 하기

grok filter를 사용해서 로그에서 searchword와 timestamp를 추가하기로 했다.

Apache 로그는 아래와 같다.

timestamp는 apache 로그 앞에 있어서 바로 사용하면 되겠다고 생각했다.

2019-10-05 14:13:19,069[http-nio-8080-exec-28]INFO  c.k.k.c.HomeKDYController - 가위 Sat Oct 05 14:13:19 KST 2019

그럼 파일을 logstash 를 사용해서 각 필드에 넣기 위해서. Grok 필터를 사용하기로 함.

grok 필터에 대해 알아보는데 스터디 시간(1시간)동안 시간을 소요한것같은데

이제서야 해답을 찾았다.

우선 grok 필터는 다음과 같이 설정 하면될 것 같다.

%{TIMESTAMP_ISO8601:timestamp},%{NUMBER:num}\[%{DATA:httpio}\]%{LOGLEVEL:log-level}  %{DATA:class} \- (?<searchWd>[a-힣].)

로그의 구성을 보면 timestamp,number,[httpio]loglevel class - searchword time 으로 동일했다.

이것을 각각에 맞게 다 작성해주고, searchWd만 개별로 customizing 해서 만들어줬다.

그럼 결과는 아래로 나온다. 이제 logstash의 conf에 추가를 해주면될것같다.

{
  "timestamp": [
    [
      "2019-10-05 14:13:19"
    ]
  ],
  "YEAR": [
    [
      "2019"
    ]
  ],
  "MONTHNUM": [
    [
      "10"
    ]
  ],
  "MONTHDAY": [
    [
      "05"
    ]
  ],
  "HOUR": [
    [
      "14",
      null
    ]
  ],
  "MINUTE": [
    [
      "13",
      null
    ]
  ],
  "SECOND": [
    [
      "19"
    ]
  ],
  "ISO8601_TIMEZONE": [
    [
      null
    ]
  ],
  "num": [
    [
      "069"
    ]
  ],
  "BASE10NUM": [
    [
      "069"
    ]
  ],
  "httpio": [
    [
      "http-nio-8080-exec-28"
    ]
  ],
  "log": [
    [
      "INFO"
    ]
  ],
  "class": [
    [
      "c.k.k.c.HomeKDYController"
    ]
  ],
  "searchWd": [
    [
      "가위"
    ]
  ]
}

Log stash 내부 grok 필터

filter {
        grok {
                match => {"message" => "%{TIMESTAMP_ISO8601:timestamp},%{NUMBER:num}\[%{DATA:httpio}\]%{LOGLEVEL:log-level}  %{DATA:class} \- (?<searchWd>[a-힣].)"}
        }
}

https://github.com/DaeyunKim/elasticsearchStudy/blob/master/koreanDictionary/(8)%20log_structure_change.md

swap 파티션 : 디스크의 용량일부를 가상 메모리 공간으로 할당해 실제 메모리 공간이 부족하거나 어플리케이션에 따라 실제 메모리와 스왑 메모리를 혼용해서 사용하게 되는것

기존의 스왑 메모리가 있는지 확인

$swapon -s 
Filename                                Type            Size    Used    Priority
/dev/nvme0n1p3                          partition       8388604 81020   -2
  1. 스왑 파일 생성

    $ sudo fallocate -l 30GB /swapfile

  2. 파일을 시스템에서만 접근이 가능하도록 권한설정

    $sudo 600 /swapfile

  3. 스왑 포맷형태로 변환

    $sudo mkswap /swapfile

  4. 스왑 파일 시스템 등록

    $sudo swap /swapfile

결과

$ free -h
              total        used        free      shared  buff/cache   available
Mem:            30G        1.1G         29G        2.3M        570M         29G
Swap:           35G        1.5G         34G
  1. 재부팅 후에도 시스템에서 스왑 파일을 사용할 수 있게 변경

/etc/fstab 파일 수정

/swapfile   none    swap    sw    0   0

스왑 파일 삭제 하기

스왑 끄기

$sudo swapoff /swapfile

파일 삭제

$rm /swapfile

/etc/fstab 에 등록한 항목 삭제

참고 : https://extrememanual.net/12975

'BackEnd > Linux' 카테고리의 다른 글

Yum Repository 만들기 (1)  (0) 2019.12.23
[Linux]NoLogin  (0) 2019.10.08
cache 삭제하기  (0) 2019.08.27
Bash 프로그래밍 기초 - 파라미터 받아오기  (0) 2019.08.20
Bash 프로그래밍 기초 - If  (0) 2019.08.20

Message Bus & Message Queue

Message Queue 와 Message Bus의 차이

 

Message Queue

  • 두 개 이상의 프로세스 가 공통 시스템 메시지 큐에 대한 액세스를 통해 정보를 교환

하나 이상의 어플리케이션에서 만들어진 데이터들을 FIFO 방식으로 사용될수 있다.

A,B,C 의 어플리케이션이 있을경우에는 각 어플리케이션 마다 별도의 메세지 큐가 추가된다.

메세지는 일반적으로 읽을때 삭제되므로 여러 다른 종속응용 프로그램간에 대기열을 공유하는것이 일반적이지 않다.

일반적으로는 메시지 큐와 종속 응용프로그램 간에는 1:1 의 통신 관계가 있다.

 

Message Bus

  • 다른 시스템이 통해 통신 할 수 있도록 메시징 인프라 인터페이스를 공유 세트 ( 메시지 버스 ).

메시지 버스 또는 서비스 버스는 하나 또는 그 이상의 응용프로그램이 하나 이상의 다른 응용 프로그램에 메시지를 전달하는 방법을 제공

선입 선출의 주문이 보장 되지 않을 수 있으며, 버스 가입자는 메시지 발신자의 지식 없이도 출입이 가능하다.

보내는 응용 프로그램이 모든 큐에 메시지를 명시적으로 추가하는 큐와 달리 메세지를 게시 하면 버스로 보내고, 버스에 연결된 어플리케이션들이 메시지를 각자 가지고가는 방식

참고

https://ardalis.com/bus-or-queue

https://stackoverflow.com/questions/7793927/message-queue-vs-message-bus-what-are-the-differences

RDD 와 RDD 에서 호출하는 액션들에 대한 모든 의존성을 재연산하게 되는데, 호출하는 액션들에 대한 모든의존성을 재연산하게 된다.

이때 데이터를 여러번 스캔하는 반복알고리즘들에 대해서는 매우 무거운 작업일 수 있다.

여러번 반복 연산하는 것을 피하려면 스파크에 데이터 영속화(persist/persistence)를 요청할 수 있다.

RDD 영속화에 대한 요청을 하면 RDD를 계산한 노드들은 그 파티션들을 저장하고 있게 된다.

자바에서는 기본적으로 persist()가 데이터를 JVM 힙(heap) 에 직렬화되지 않는 객체 형태로 저장.

레벨 공간사용 CPU 사용시간 메모리에 저장 디스크에 저장 비고
MEMORY_ONLY 높음 낮음 예 아니오
MEMORY_ONLY_SER 낮음 높음 예 아니오
MEMORY_AND_DISK 높음 중간 일부 일부 메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장
MEMORY_AND_DISK_SER 낮음 높음 일부 일부 메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장.메모리에 직렬화된 형태로 저장
DISK_ONLY 낮음 높음 아니오 예

persist() 호출은 연산을 강제로 수행하지않는다.

메모리에 많은 데이터를 올리려고 시도하면 스파크는 LRU 캐시 정책에 따라 오래된 파티션들을 자동으로 버림.

예제 코드

import org.apache.spark.storage.StorageLevel

val result = input map(x => x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html

Docker-02

컨테이너 기반 가상화 플랫폼 도커의 이해 2강

Mac 에서는 xhyve, Window에서는 Hyper-V 로 사용

Docker Meetup 9/17/14  Docker Client, Server, and Containers  client server  docker run ... docker -d  init  grandchild  G...

https://www.slideshare.net/SaiedKazemi/lpc-docker-container-cr-with-criu

client-server

//1. client
$docker run ~~~~ 


//2. host
$docker daemon

//3. client (output)
$output

Run Command

옵션 설명
-d Detached mode 백그라운드모드
-p 호스트와 컨테이너의 포트를 연결(포트 포워딩)
예제) localport: hostport
-v 호스트와 컨테이너의 디렉토리를 연결(마운트)
-e 컨테이너 내에서 사용할 환경 변수 설정
예제) -e PARAM_NAME test
--name 컨테이너 이름
--rm 프로세스 종료시 컨테이너 자동 제거
-it -i 와 -t를 동시에 사용한것으로 터미널입력을 위한 옵션
--network 네트워크 연결

컨테이너 실행

CentOs 실행

$docker run centos:7
Unable to find image 'centos:7' locally
7: Pulling from library/centos
d8d02d457314: Pull complete
Digest: sha256:307835c385f656ec2e2fec602cf093224173c51119bbebd602c53c3653a3d6eb
Status: Downloaded newer image for centos:7

이미지 파일이 있으면 실행하고 없으면 pull 로 이미지를 받아온뒤 centos:7 이미지 를 실행

bash 접속

$docker run --rm -it centos:7 /bin/bash
[root@7bead8a2b28b /]#ls
anaconda-post.log  dev  home  lib64  mnt  proc  run   srv  tmp  var
bin                etc  lib   media  opt  root  sbin  sys  usr

--it : 터미널로 입력을 하겠다는 의믜 (input terminal 인가??)

--rm : 터미널종료가 되면 컨테이너가 자동으로 종료됨

Redis

메모리에 key value를 사용하여 쓸수있는 메모리

$docker run --name=kdy_redis -d -p 1234:6379 redis
Unable to find image 'redis:latest' locally
latest: Pulling from library/redis
1ab2bdfe9778: Pull complete
966bc436cc8b: Pull complete
c1b01f4f76d9: Pull complete
8a9a85c968a2: Pull complete
8e4f9890211f: Pull complete
93e8c2071125: Pull complete
Digest: sha256:9755880356c4ced4ff7745bafe620f0b63dd17747caedba72504ef7bac882089
Status: Downloaded newer image for redis:latest
747b5ec128b3c79e54a1e7e1b49bdf9fe06542fa2dcc1824038812149da491cf
$

실행은 됬는데

텔넷으로 접속 redis

redis접속 하기 위해서 다시 telnet이미지 설치

별도로 설치 하지않고 이미지를 사용해서 command 라인 프로그램 사용 가능

$docker run --rm -it mikesplain/telnet docker.for.mac.localhost 1234

Unable to find image 'mikesplain/telnet:latest' locally
latest: Pulling from mikesplain/telnet
[DEPRECATION NOTICE] registry v2 schema1 support will be removed in an upcoming release. Please contact admins of the docker.io registry NOW to avoid future disruption.
c52e3ed763ff: Pull complete
a3ed95caeb02: Pull complete
Digest: sha256:11fa2b96776f5d34511452ad2b76eafad69321e456ac6653482eadb9d98c52a7
Status: Downloaded newer image for mikesplain/telnet:latest

keys * // 키의 모든것을 보여주라는 것
*0
set hello morris // key value설정
+OK
get hello //가지고오기
$6
morris//가지고 온 value

docker run --rm -it mikesplain/telnet docker.for.mac.localhost 1234

이미지의 호스트에 아이피가 필요한데 이것을 모르기 때문에 도커에서 제공하는 docker.for.mac.localhost 라는 dns을 사용해서 접속 가능하다.

docker.for.win.localhost 는 윈도우 설정

logs 확인하기

이미지에 대한 로그 확인하기

$docker logs [option] [container_id]

$docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                    NAMES
747b5ec128b3        redis               "docker-entrypoint.s…"   9 minutes ago       Up 9 minutes        0.0.0.0:1234->6379/tcp   kdy_redis

$docker logs -f 747
1:C 27 Aug 2019 12:07:51.161 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
1:C 27 Aug 2019 12:07:51.161 # Redis version=5.0.5, bits=64, commit=00000000, modified=0, pid=1, just started
1:C 27 Aug 2019 12:07:51.161 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf

이미지확인

도커 이미지 확인 하기

$docker images

이미지 다운로드

$docker pull [name]

이미지 삭제

$docker rmi [name]

컨테이너가 실행 중일때는 삭제가 되지 않는다.

네트워크 만들기

$docker network create [option] NETWORK

네트워크를 만들어서 이미지를 실행할때 아래와같은 네트워크이름을 입력하면된다.

$docker run --name=centos_morris -it --network=[docker_network_name] centos:7 /bin/bash

아이피를 모르더라도 네트워크의 이름만으로도 해결이 가능하다.

Docker Compose

운영 환경에서 많이 사용

$docker-compose version
docker-compose version 1.24.1, build 4667896b
docker-py version: 3.7.3
CPython version: 3.6.8
OpenSSL version: OpenSSL 1.1.0j  20 Nov 2018

하나하나 이미지를 실행했던 것을 하나의 파일에서 환경변수, volumnes , port 등의 여러이미지에 대한 설정을 한번에 실행

docker-compose.yml 파일을 만들어서 내용 입력(이미지, 이미지에 대한 옵션을 설정으로 입력)

해당하는 파일을 읽어서 컨테이너를 실행함

$docker-compose up

해당파일을 읽어서 컨테이너 종료 및 삭제

$docker-compose down

Q : docker compose 사용할때 무조건 docker-compose.yml 의 이름으로 실행해야하는건지?

기본적으로는 docker-compose.yml , docker-compose.yaml 파일을 찾지만

-f 옵션을 사용하여 특정 파일을 지정해 줄수 있음

$docker-compose -f 파일이름 up

강의 내용
youtube.com/watch?v=RlezLB66KPg&list=PL9mhQYIlKEhfw7ZKPgHIm9opAm2ZpmzDq&index=3&t=2423s

'BackEnd > Docker' 카테고리의 다른 글

도커 명령어  (1) 2020.02.09
컨테이너 기반 가상화 플랫폼 '도커(Docker)' 의 이해 1강  (0) 2019.08.26
[docker] MySQL 접속하기 (2)  (0) 2019.06.30
[Docker] install Mysql  (0) 2019.06.23

# echo 3 > /proc/sys/vm/drop_caches

 

https://linux-mm.org/Drop_Caches

'BackEnd > Linux' 카테고리의 다른 글

[Linux]NoLogin  (0) 2019.10.08
Swap 메모리 늘리기  (0) 2019.09.17
Bash 프로그래밍 기초 - 파라미터 받아오기  (0) 2019.08.20
Bash 프로그래밍 기초 - If  (0) 2019.08.20
Bash 프로그래밍 기초 - for  (0) 2019.08.20

도커에 대해 T-Academy로 형들과 도커에 대한 이해하는것으로 스터디를 해보기로했다.

실습을 위해 아래의 주소로 들어가서 확인

https://bit.ly/docker-sk

도커의 등장

솔로몬 하이크 -> 파이콘 2013 에서 등장

The future of Linux containers

Mac OS에서 실행 해보자

  • ubuntu(docker container)

  • CentOs(docker container)

  • Busybox(docker container)

    • 도커 커널을 위한 최소한의 이미지
    • 하나의 실행 파일 안에 스트립 다운된 일부 유닉스 도구들을 제공하는 소프트웨어
    • 다수가 리눅스 커널이 제공하는 인터페이스와 함께 동작하도록 설계되어있음 (자원이 매우 적은 임베디드 운영체제를 위해 작성됨)

우분투 에서 실행된 hello, world

$docker run -it ubuntu:latest echo "hello,world!"

우분투의 bash로 접속하기

$docker run -it ubuntu:latest bash

/etc/lsb-realease

centos

/etc/centos-release

busybox

docker run -it busybox:latest sh

컨테이너

컨테이너 : 가상머신

각각의 VM = 서로 다른 환경

각각의 컨테이너 = 서로 다른 환경

Virtual Machine == Container ?같을까 ?

Virtual Machine : 하드웨어 가상화

소프트웨어로 구현된 하드웨어 

컨테이너 : 하드웨어 가상화가 아님

OS에서 지원하는 기능을 사용 (리눅스 커널을 사용)

격리된 환경에서 프로세스를 실행 

한마디로 한 이미지의 프로세스

가상화 된 하드웨어 필요없이 os의 커널을 사용해서 동작함

img

chroot

실제 루트는 / 이지만 ~/box 가 루트인것처럼 인식되게함

자신의 루트를 ~/box로 인식함

#chroot /root/box/ bash

ldd 의존하고 있는 파일의 목록을 볼 수 있다.

예시) ldd bash

도커는 마치 OS의 커널을 위의 chroot 처럼 process를 격리 시켜서 만들어져 동작함

Linux (croups, namespaces, nettling, selinux, netfilter, capabilities, apparmpr) 등의 리눅스 커널을 Docker(lib container, libvirt, lxc, system-nspawn) 등으로 따로 격리 시킨것임

예전에는 lxc를 사용했지만...추후에 ...


실습

  • mysql

  • wordpress

MySQL

$docker run -d -p 3306:3306 --name mysql mysql:latest

앞의 포트 : 안에서 접속할 포트

뒤의 포트 : 밖에서 접속할 포트

-d : 백그라운드에서 실행한다는 뜻

WordPress

이미지

특정 프로세스를 실행하기 위한 환경

  • 계층화된 파일 시스템
  • 이미지는 파일들의 집합
  • 프로세스가 실행되는 환경도 결국 파일들의 집합

도커의 기본 아키텍처

linux

docker run -> server , container(process)

macOS

docker run ->xhyve라는 가상화된 머신에다가 server와 container가 실행됨

xhyve

macOS의 가상화 방식(경량 가상 머신)

컨테이너 = xhyve에서 실행된 프로세스

호스트 머신과 자연스럽게 결합

네트워크 / 볼륨 등 

호스트 머신처럼 사용가능

왜 도커를 사용하는가 ??

컨테이너가 필요한 이유

보편적 물리법칙 언제 어디서나 동일하게 적용된다.

하지만 컴퓨터의 환경은 보편적이지 않다

각각의 OS에서 설치되는 프로그램은 다 다르다. 하나로 통일이 되지 않기 때문에

서버 관리도 어렵다.

도커를 사용하게 되면 어플리케이션 실행 환경까지의 최소한의 파일을 모아둔것이기 때문에 깔끔하게 설치를 할 수 있다.

이미지 : 동작되는게 보장이된다. 사용할 프로세스를 미리 만들어놓기 때문에

개발환경의 pc의 환경을 미리 보장하고 사용하는것이 가능하기 때문에

Q1 : 도커는 시스템전체를 다 사용하나요?

cgroups는 프로세스의 자원을 사용할지 제한을두게 되는데, docker에서도 자원의 제약을 한정해서 쓸 수 있다.

'BackEnd > Docker' 카테고리의 다른 글

도커 명령어  (1) 2020.02.09
컨테이너 기반 가상화 플랫폼 '도커(Docker)' 의 이해 2강  (0) 2019.08.27
[docker] MySQL 접속하기 (2)  (0) 2019.06.30
[Docker] install Mysql  (0) 2019.06.23

Spark ElasticSearch 배포하기

위에서 spark의 코드를작성하고 테스트 까지 완료 하였다.

이제 배포를 해야하는데 어떻게 할까 ?

실행 가능한 fat jar로 만들기 위해서 하위 부분 jar 를 추가 해주었다.

plugins {
    id 'scala'

}

group 'KafkaEsSpark'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    // https://mvnrepository.com/artifact/org.scala-lang/scala-library
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.7'
    compile 'org.apache.spark:spark-core_2.11:2.4.3'
    compile 'com.crealytics:spark-excel_2.11:0.12.0'
    compile 'org.apache.spark:spark-sql_2.11:2.4.3'
    compile 'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.2'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
    compile 'org.apache.spark:spark-streaming_2.11:2.4.3'
    compile 'org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3'
}

jar {
    classifier = 'all'
    manifest {
        attributes "Main-Class": "com.kafkastream.word.KafkaStreaming"
    }
    from{
        configurations.compile.collect { it.isDirectory() ? it: zipTree(it)}
    }
    baseName = 'SparkEsKafka'
    zip64 true
}

task run(type:JavaExec, dependsOn: classes) {
    main = "com.kafkastream.word.KafkaStreaming"
    classpath sourceSets.main.runtimeClasspath
    classpath configurations.runtime
}
task fatJar(type: Jar){
    zip64 true
    description = "Assembles a Hadoop ready fat jar file"
    baseName = project.name + '-all'
    doFirst {
        from {
            configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
        }
    }
    manifest {
        attributes "Main-Class": "com.kafkastream.word.KafkaStreaming"
    }
    exclude 'META-INF/*.RSA','META-INF/*.SF','META-INF/*.DSA'
    with jar
}

프로젝트 루트 디렉토리에있는 gradlew 파일이 있는 곳에서

$./gradlew fatJar

를 하게 되면 프로젝트 루트폴더 build> libs 에 kafkaspark-all-1.0-SNAPSHOT.jar 파일이 보인다.

/kafkaspark/build/libs$ls

SparkEsKafka-1.0-SNAPSHOT-all.jar 
kafkaspark-all-1.0-SNAPSHOT.jar

모든 라이브러리들의 의존성을 한번에 묶어줬기 때문에 spark-submit을 통해 제출하지 않아도 실행이 되는것같다.

$java -jar kafkaspark-all-1.0-SNAPSHOT.jar

실행을 해보면 ?

/build/libs$java -jar kafkaspark-all-1.0-SNAPSHOT.jar 
hello
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/08/24 12:02:40 INFO SparkContext: Running Spark version 2.4.3
19/08/24 12:02:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/08/24 12:02:40 INFO SparkContext: Submitted application: KafkaSpark
19/08/24 12:02:40 INFO SecurityManager: Changing view acls to: daeyunkim
19/08/24 12:02:40 INFO SecurityManager: Changing modify acls to: daeyunkim
19/08/24 12:02:40 INFO SecurityManager: Changing view acls groups to: 
19/08/24 12:02:40 INFO SecurityManager: Changing modify acls groups to: 
19/08/24 12:02:40 INFO SecurityManager: SecurityManager: authentication disabled;
...
19/08/24 12:02:41 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
19/08/24 12:02:41 WARN KafkaUtils: overriding enable.auto.commit to false for executor
19/08/24 12:02:41 WARN KafkaUtils: overriding auto.offset.reset to none for executor
19/08/24 12:02:41 WARN KafkaUtils: overriding executor group.id to spark-executor-testGroup
19/08/24 12:02:41 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
....
19/08/24 12:02:41 INFO MappedDStream: Initialized and validated 
-------------------------------------------
Time: 1566615780000 ms
-------------------------------------------
...

실행이 잘된다 .

실행한 상태에서

search_word에 토픽에 전달을 하게 되면 잘 받아와서 elasticsearch에 잘 저장됨

소스는 여기에서 볼수 있다. (약간 수정해서 kafka broker의 아이디와 elasticsearch의 주소를 적을수 있게 변경했다.)

https://github.com/DaeyunKim/Kafka-SparkStreaming-Elasticsearch

실행 순서 :

  1. kafka 를 위한 zookeeper 실행
  2. Kafka 서버 실행 - 토픽 (search_word) 추가
  3. elasticsearch 실행
  4. Build 한 jar 파일 만들기
    ProjectRoot에서 실행
    $./gradlew fatJar
  5. (테스트) kafka console producer를 통해서 테스트 실행 함
    kafkaspark-all-1.0-SNAPSHOT.jar 는 프로젝트 루트의 아래 /build/lib에 위치해있다.
    $java -jar kafkaspark-all-1.0-SNAPSHOT.jar 127.0.0.1 9200 localhost:9092

spark streaming 으로 kafka-0.10 consumer api를 사용하여 elasticsearch에 저장하는 프로젝트

참고자료

http://www.hongyusu.com/amt/spark-streaming-kafka-avro-and-registry.html

파라미터 받아오기

$ vi param.sh
#!/bin/bash

echo "Total Param = $#, PROG: $0, param1 = $1, param2 = $2"

파라미터 출력하기

$# : 파라미터의 갯수

$0 : 프로그램이름

$[num] : 입력된 순서의 파라미터

$ sh param.sh linux basic
Total Param = 2, PROG: param.sh, param1 = linux, param2 = basic

'BackEnd > Linux' 카테고리의 다른 글

Swap 메모리 늘리기  (0) 2019.09.17
cache 삭제하기  (0) 2019.08.27
Bash 프로그래밍 기초 - If  (0) 2019.08.20
Bash 프로그래밍 기초 - for  (0) 2019.08.20
Bash 프로그래밍 기초 - array  (0) 2019.08.19

+ Recent posts